00001
00008 #include <EDDSourceLib/inc/EDDMsgLoop.h>
00009 #include <EDDSourceLib/inc/EDDDataSource.h>
00010 #include <EDDSourceLib/inc/EDDThreadManager.h>
00011 #include <EDDSourceLib/inc/EDDDataPipe.h>
00012 #include <EDDSourceLib/inc/EDDLockManager.h>
00013 #include <proto/EDDMsgHdr.h>
00014 #include <proto/EDDSourceMsg.h>
00015
00016
00017 tcEDDMsgLoop* tcEDDMsgLoop::gpMsgLoop = NULL;
00018
00026 tcEDDMsgLoop*
00027 tcEDDMsgLoop::GetInstance(void)
00028 {
00029 if (NULL == gpMsgLoop)
00030 {
00031 gpMsgLoop = new tcEDDMsgLoop();
00032 }
00033 return gpMsgLoop;
00034 }
00035
00045 int
00046 tcEDDMsgLoop::AddDataPipe(tcEDDDataPipe* apDataPipe)
00047 {
00048 int rv = -1;
00049 mpLockManager->TakeLock(mhTableLock);
00050 for (int index = 0; index < gnMaxDataPipes; index++)
00051 {
00052 if (NULL == maDataPipes[index])
00053 {
00054 maDataPipes[index] = apDataPipe;
00055 rv = index;
00056 break;
00057 }
00058 }
00059
00060 if (rv >= 0)
00061 {
00062
00063 maShutdownThread[rv] = false;
00064 maRcvThreadHandles[rv] =
00065 mpThreadManager->StartThread(DispatchReceiverThread,
00066 tcEDDThreadManager::eeMedium,
00067 (void*)rv);
00068 }
00069 mpLockManager->ReleaseLock(mhTableLock);
00070 return rv;
00071 }
00072
00073 int
00074 tcEDDMsgLoop::RemoveDataPipe(int anPipeID)
00075 {
00076 int rv = -1;
00077 mpLockManager->TakeLock(mhTableLock);
00078 if (NULL != maDataPipes[anPipeID])
00079 {
00080
00081 maShutdownThread[anPipeID] = true;
00082
00083 mpThreadManager->StopThread(maRcvThreadHandles[anPipeID]);
00084
00085
00086 for (int index = 0; index < gnMaxDataSources; index++)
00087 {
00088 maSubscriptionTable[anPipeID][index] = 0;
00089 }
00090 maDataPipes[anPipeID] = 0;
00091 rv = 0;
00092 }
00093 mpLockManager->ReleaseLock(mhTableLock);
00094 return rv;
00095 }
00096
00097 int
00098 tcEDDMsgLoop::AddDataSource(tcEDDDataSource* apDataSource)
00099 {
00100 int rv = -1;
00101 mpLockManager->TakeLock(mhTableLock);
00102 for (int index = 0; index < gnMaxDataSources; index++)
00103 {
00104 if (NULL == maDataSources[index])
00105 {
00106 maDataSources[index] = apDataSource;
00107 rv = index;
00108 break;
00109 }
00110 }
00111 mpLockManager->ReleaseLock(mhTableLock);
00112 return rv;
00113 }
00114
00115 int
00116 tcEDDMsgLoop::RemoveDataSource(int anSourceID)
00117 {
00118 int rv = -1;
00119 mpLockManager->TakeLock(mhTableLock);
00120 if (NULL != maDataSources[anSourceID])
00121 {
00122 for (int index = 0; index < gnMaxDataPipes; index++)
00123 {
00124 maSubscriptionTable[index][anSourceID] = 0;
00125 }
00126 maDataSources[anSourceID] = NULL;
00127 rv = 0;
00128 }
00129 mpLockManager->ReleaseLock(mhTableLock);
00130 return rv;
00131 }
00132
00133 int
00134 tcEDDMsgLoop::SendMsg(tsEDDMsgHdr* apMsg, bool BrdCast)
00135 {
00136 int rv = 0;
00137 mpLockManager->TakeLock(mhTableLock);
00138 for (int index = 0; index < gnMaxDataPipes; index++)
00139 {
00140 if ( (maSubscriptionTable[index][apMsg->mnDataSource] ) ||
00141 (BrdCast && (NULL != maDataPipes[index])) )
00142 {
00143 apMsg->mnSeqNo = maDataPipes[index]->NextSeqNo();
00144 rv |= maDataPipes[index]->SendMsg((void*)apMsg,apMsg->mnMsgLen);
00145 }
00146 }
00147 mpLockManager->ReleaseLock(mhTableLock);
00148 return rv;
00149 }
00150
00151 bool
00152 tcEDDMsgLoop::IsSubscribed(uint16_t anSourceId)
00153 {
00154 bool rv = false;
00155 mpLockManager->TakeLock(mhTableLock);
00156 for (int index = 0; index < gnMaxDataPipes; index++)
00157 {
00158 if ( maSubscriptionTable[index][anSourceId] )
00159 {
00160 rv = true;
00161 break;
00162 }
00163 }
00164 mpLockManager->ReleaseLock(mhTableLock);
00165 return rv;
00166 }
00167
00168 int
00169 tcEDDMsgLoop::GetMaxMessageLen(int anSourceID)
00170 {
00171 int MaxLen = -1;
00172 mpLockManager->TakeLock(mhTableLock);
00173 for (int index = 0; index < gnMaxDataPipes; index++)
00174 {
00175 if (maSubscriptionTable[index][anSourceID])
00176 {
00177 int ml = maDataPipes[index]->GetMaxMessageLen();
00178 if ((0 > ml) && (ml > MaxLen))
00179 {
00180 MaxLen = ml;
00181 }
00182 }
00183 }
00184 mpLockManager->ReleaseLock(mhTableLock);
00185 return MaxLen;
00186 }
00187
00188 int
00189 tcEDDMsgLoop::DispatchReceiverThread(void* anPipeID, void* apFoo, void* apGoo)
00190 {
00191 unsigned int rv = 0;
00192
00193 rv = GetInstance()->ReceiverThread((int)anPipeID);
00194 return rv;
00195 }
00196
00197 int
00198 tcEDDMsgLoop::ReceiverThread(int anPipeID)
00199 {
00200 unsigned int rv = 0;
00201 int bufpos = 0;
00202 char* buffer;
00203 teParseState state = eeWaitSync;
00204 int readlen = 4;
00205 tsEDDMsgHdr* lpMsg;
00206
00207 int buflen = maDataPipes[anPipeID]->GetMaxMessageLen();
00208 if (buflen < 8)
00209 {
00210 buflen = 4096;
00211 }
00212 buffer = new char[buflen];
00213 lpMsg = (tsEDDMsgHdr*)buffer;
00214
00215
00216 while(1)
00217 {
00218
00219 readlen = (readlen > buflen) ? buflen : readlen;
00220 mpLockManager->TakeLock(mhTableLock);
00221 if (maShutdownThread[anPipeID])
00222 {
00223 mpLockManager->ReleaseLock(mhTableLock);
00224 break;
00225 }
00226 int bytes = maDataPipes[anPipeID]->GetData(&buffer[bufpos],readlen,0);
00227 mpLockManager->ReleaseLock(mhTableLock);
00228
00229 if (bytes > 0)
00230 {
00231 bool done = false;
00232 while(!done)
00233 {
00234 switch(state)
00235 {
00236 case eeGetHeader:
00237
00238 if (bufpos+bytes >= sizeof(tsEDDMsgHdr))
00239 {
00240 state = eeGetBytes;
00241 }
00242 else
00243 {
00244 readlen = sizeof(tsEDDMsgHdr)-(bufpos+bytes);
00245 done = true;
00246 }
00247 break;
00248 case eeGetBytes:
00249
00250 if (lpMsg->mnMsgLen <= bufpos+bytes)
00251 {
00252
00253 HandleMessage(lpMsg, anPipeID);
00254
00255
00256 int temp = lpMsg->mnMsgLen;
00257 for (int i = 0; i < bufpos+bytes-temp; i++)
00258 {
00259 buffer[i] = buffer[i+temp];
00260 }
00261
00262 bytes = bufpos+bytes-temp;
00263 bufpos = 0;
00264
00265
00266 state = eeWaitSync;
00267 }
00268 else
00269 {
00270 readlen = lpMsg->mnMsgLen-(bufpos+bytes);
00271 done = true;
00272 }
00273 break;
00274 case eeWaitSync:
00275 default:
00276
00277 if (bufpos+bytes >= 4)
00278 {
00279 uint32_t temp1 = buffer[0];
00280 uint32_t temp2 = buffer[1]<<8;
00281 uint32_t temp3 = buffer[2]<<16;
00282 uint32_t temp4 = buffer[3]<<24;
00283 uint32_t check = temp1|temp2|temp3|temp4;
00284 if (EDD_MSG_SYNC == check)
00285 {
00286 state = eeGetHeader;
00287 }
00288 else
00289 {
00290
00291 for(int i = 1; i < bytes+bufpos; i++)
00292 {
00293 buffer[i-1] = buffer[i];
00294 }
00295 bytes = bytes-1;
00296 }
00297 }
00298 else
00299 {
00300 readlen = 4-(bufpos+bytes);
00301 done = true;
00302 }
00303 break;
00304 }
00305 bufpos += bytes;
00306 bytes = 0;
00307 }
00308 }
00309 else if (bytes < 0)
00310 {
00311
00312 break;
00313 }
00314 else if (bytes == 0)
00315 {
00316 mpThreadManager->Sleep(10, anPipeID);
00317 }
00318 }
00319 delete [] buffer;
00320 return rv;
00321 }
00322
00323 tcEDDMsgLoop::tcEDDMsgLoop(void)
00324 : mpThreadManager(NULL)
00325 , mpLockManager(NULL)
00326 {
00327 int index = 0;
00328 InitOSDependencies();
00329
00330 mhTableLock = mpLockManager->CreateLock();
00331
00332 mpLockManager->TakeLock(mhTableLock);
00333 for (index = 0; index < gnMaxDataPipes; index++)
00334 {
00335 maDataPipes[index] = NULL;
00336 for (int j = 0; j < gnMaxDataSources; j++)
00337 {
00338 maSubscriptionTable[index][j] = 0;
00339 }
00340 }
00341 for (index = 0; index < gnMaxDataSources; index++)
00342 {
00343 maDataSources[index] = NULL;
00344 }
00345 mpLockManager->ReleaseLock(mhTableLock);
00346 }
00347
00348 tcEDDMsgLoop::~tcEDDMsgLoop(void)
00349 {
00350
00351 }
00352
00353 void
00354 tcEDDMsgLoop::HandleMessage(tsEDDMsgHdr* apMsg, int anPipeID)
00355 {
00356
00357 mpLockManager->TakeLock(mhTableLock);
00358
00359 if (apMsg->mnAllSources)
00360 {
00361 for (int index = 0; index < gnMaxDataSources; index++)
00362 {
00363 if ( maDataSources[index] != NULL)
00364 {
00365 maDataSources[index]->HandleMsg(apMsg);
00366 }
00367 }
00368 }
00369 else
00370 {
00371
00372 if (apMsg->mnMsgId == EDDMSGID_SOURCEDATAREQMSG)
00373 {
00374 tsEDDSourceDataReqMsg* lpDataReqMsg = (tsEDDSourceDataReqMsg*)apMsg;
00375 if ( (apMsg->mnDataSource < gnMaxDataSources) &&
00376 (maDataSources[apMsg->mnDataSource] != NULL) )
00377 {
00378 maSubscriptionTable[anPipeID][apMsg->mnDataSource] = lpDataReqMsg->mnStart;
00379 }
00380 }
00381 else if ( (apMsg->mnDataSource < gnMaxDataSources) &&
00382 (maDataSources[apMsg->mnDataSource] != NULL) )
00383 {
00384 maDataSources[apMsg->mnDataSource]->HandleMsg(apMsg);
00385 }
00386 }
00387 mpLockManager->ReleaseLock(mhTableLock);
00388 }