00001 
00008 #include <EDDSourceLib/inc/EDDMsgLoop.h>
00009 #include <EDDSourceLib/inc/EDDDataSource.h>
00010 #include <EDDSourceLib/inc/EDDThreadManager.h>
00011 #include <EDDSourceLib/inc/EDDDataPipe.h>
00012 #include <EDDSourceLib/inc/EDDLockManager.h>
00013 #include <proto/EDDMsgHdr.h>
00014 #include <proto/EDDSourceMsg.h>
00015 
00016 
00017 tcEDDMsgLoop* tcEDDMsgLoop::gpMsgLoop = NULL;
00018 
00026 tcEDDMsgLoop* 
00027 tcEDDMsgLoop::GetInstance(void)
00028 {
00029    if (NULL == gpMsgLoop)
00030    {
00031       gpMsgLoop = new tcEDDMsgLoop();
00032    }
00033    return gpMsgLoop;
00034 }
00035 
00045 int 
00046 tcEDDMsgLoop::AddDataPipe(tcEDDDataPipe* apDataPipe)
00047 {
00048     int rv = -1;
00049     mpLockManager->TakeLock(mhTableLock);
00050     for (int index = 0; index < gnMaxDataPipes; index++)
00051     {
00052         if (NULL == maDataPipes[index])
00053         {
00054             maDataPipes[index] = apDataPipe;
00055             rv = index;
00056             break;
00057         }
00058     }
00059     
00060     if (rv >= 0)
00061     {
00062         
00063         maShutdownThread[rv] = false;
00064         maRcvThreadHandles[rv] =
00065             mpThreadManager->StartThread(DispatchReceiverThread,
00066                                          tcEDDThreadManager::eeMedium,
00067                                          (void*)rv);
00068     }    
00069     mpLockManager->ReleaseLock(mhTableLock);
00070     return rv;
00071 }
00072 
00073 int 
00074 tcEDDMsgLoop::RemoveDataPipe(int anPipeID)
00075 {
00076     int rv = -1;
00077     mpLockManager->TakeLock(mhTableLock);
00078     if (NULL != maDataPipes[anPipeID])
00079     {
00080         
00081         maShutdownThread[anPipeID] = true;
00082         
00083         mpThreadManager->StopThread(maRcvThreadHandles[anPipeID]);
00084         
00085         
00086         for (int index = 0; index < gnMaxDataSources; index++)
00087         {
00088             maSubscriptionTable[anPipeID][index] = 0;
00089         }
00090         maDataPipes[anPipeID] = 0;
00091         rv = 0;
00092     }
00093     mpLockManager->ReleaseLock(mhTableLock);
00094     return rv;
00095 }
00096 
00097 int 
00098 tcEDDMsgLoop::AddDataSource(tcEDDDataSource* apDataSource)
00099 {
00100     int rv = -1;
00101     mpLockManager->TakeLock(mhTableLock);
00102     for (int index = 0; index < gnMaxDataSources; index++)
00103     {
00104         if (NULL == maDataSources[index])
00105         {
00106             maDataSources[index] = apDataSource;
00107             rv = index;
00108             break;
00109         }
00110     }
00111     mpLockManager->ReleaseLock(mhTableLock);
00112     return rv;
00113 }
00114 
00115 int 
00116 tcEDDMsgLoop::RemoveDataSource(int anSourceID)
00117 {
00118     int rv = -1;
00119     mpLockManager->TakeLock(mhTableLock);
00120     if (NULL != maDataSources[anSourceID])
00121     {
00122         for (int index = 0; index < gnMaxDataPipes; index++)
00123         {
00124             maSubscriptionTable[index][anSourceID] = 0;
00125         }
00126         maDataSources[anSourceID] = NULL;
00127         rv = 0;
00128     }
00129     mpLockManager->ReleaseLock(mhTableLock);
00130     return rv;
00131 }
00132     
00133 int 
00134 tcEDDMsgLoop::SendMsg(tsEDDMsgHdr* apMsg, bool BrdCast)
00135 {
00136     int rv = 0;
00137     mpLockManager->TakeLock(mhTableLock);
00138     for (int index = 0; index < gnMaxDataPipes; index++)
00139     {
00140         if ( (maSubscriptionTable[index][apMsg->mnDataSource] ) ||
00141              (BrdCast && (NULL != maDataPipes[index])) )
00142         {
00143             apMsg->mnSeqNo = maDataPipes[index]->NextSeqNo();
00144             rv |= maDataPipes[index]->SendMsg((void*)apMsg,apMsg->mnMsgLen);
00145         }
00146     }
00147     mpLockManager->ReleaseLock(mhTableLock);
00148     return rv;
00149 }
00150 
00151 bool 
00152 tcEDDMsgLoop::IsSubscribed(uint16_t anSourceId)
00153 {
00154     bool rv = false;
00155     mpLockManager->TakeLock(mhTableLock);
00156     for (int index = 0; index < gnMaxDataPipes; index++)
00157     {
00158         if ( maSubscriptionTable[index][anSourceId] )
00159         {
00160             rv = true;
00161             break;
00162         }
00163     }
00164     mpLockManager->ReleaseLock(mhTableLock);
00165     return rv;
00166 }
00167 
00168 int 
00169 tcEDDMsgLoop::GetMaxMessageLen(int anSourceID)
00170 {
00171     int MaxLen = -1;
00172     mpLockManager->TakeLock(mhTableLock);
00173     for (int index = 0; index < gnMaxDataPipes; index++)
00174     {
00175         if (maSubscriptionTable[index][anSourceID])
00176         {
00177             int ml = maDataPipes[index]->GetMaxMessageLen();
00178             if ((0 > ml) && (ml > MaxLen))
00179             {
00180                 MaxLen = ml;
00181             }
00182         }
00183     }
00184     mpLockManager->ReleaseLock(mhTableLock);
00185     return MaxLen;
00186 }
00187     
00188 int 
00189 tcEDDMsgLoop::DispatchReceiverThread(void* anPipeID, void* apFoo, void* apGoo)
00190 {
00191     unsigned int rv = 0;
00192     
00193     rv = GetInstance()->ReceiverThread((int)anPipeID);
00194     return rv;
00195 }
00196 
00197 int 
00198 tcEDDMsgLoop::ReceiverThread(int anPipeID)
00199 {
00200     unsigned int rv = 0;
00201     int          bufpos = 0;
00202     char*        buffer;
00203     teParseState state = eeWaitSync;
00204     int          readlen = 4;
00205     tsEDDMsgHdr* lpMsg;
00206     
00207     int buflen = maDataPipes[anPipeID]->GetMaxMessageLen();
00208     if (buflen < 8)
00209     {
00210         buflen = 4096;
00211     }
00212     buffer = new char[buflen];
00213     lpMsg = (tsEDDMsgHdr*)buffer;
00214     
00215     
00216     while(1)
00217     {
00218         
00219                  readlen = (readlen > buflen) ? buflen : readlen;
00220         mpLockManager->TakeLock(mhTableLock);
00221         if (maShutdownThread[anPipeID])
00222         {
00223             mpLockManager->ReleaseLock(mhTableLock);
00224             break;
00225         }
00226         int bytes = maDataPipes[anPipeID]->GetData(&buffer[bufpos],readlen,0);
00227         mpLockManager->ReleaseLock(mhTableLock);
00228         
00229         if (bytes > 0)
00230         {
00231             bool done = false;
00232             while(!done)
00233             {
00234                 switch(state)
00235                 {
00236                 case eeGetHeader:
00237                     
00238                     if (bufpos+bytes >= sizeof(tsEDDMsgHdr))
00239                     {
00240                         state = eeGetBytes;
00241                     }
00242                     else
00243                     {
00244                         readlen = sizeof(tsEDDMsgHdr)-(bufpos+bytes);
00245                         done = true;
00246                     }
00247                     break;
00248                 case eeGetBytes:    
00249                     
00250                     if (lpMsg->mnMsgLen <= bufpos+bytes)
00251                     {
00252                         
00253                         HandleMessage(lpMsg, anPipeID);
00254                         
00255                         
00256                         int temp = lpMsg->mnMsgLen;
00257                         for (int i = 0; i < bufpos+bytes-temp; i++)
00258                         {
00259                             buffer[i] = buffer[i+temp];
00260                         } 
00261                         
00262                         bytes  = bufpos+bytes-temp;
00263                         bufpos = 0;
00264                         
00265                         
00266                         state = eeWaitSync;
00267                     }
00268                     else  
00269                     {
00270                         readlen = lpMsg->mnMsgLen-(bufpos+bytes);
00271                         done = true;
00272                     }
00273                     break;
00274                 case eeWaitSync:
00275                 default:
00276                     
00277                     if (bufpos+bytes >= 4)
00278                     {
00279                         uint32_t temp1 = buffer[0];
00280                         uint32_t temp2 = buffer[1]<<8;
00281                         uint32_t temp3 = buffer[2]<<16;
00282                         uint32_t temp4 = buffer[3]<<24;
00283                         uint32_t check = temp1|temp2|temp3|temp4;
00284                         if (EDD_MSG_SYNC == check)
00285                         {
00286                            state = eeGetHeader;
00287                         }
00288                         else
00289                         {
00290                            
00291                            for(int i = 1; i < bytes+bufpos; i++)
00292                            {
00293                               buffer[i-1] = buffer[i];
00294                            }
00295                            bytes = bytes-1;
00296                         }
00297                     }
00298                     else 
00299                     {
00300                         readlen = 4-(bufpos+bytes);
00301                         done = true;
00302                     }
00303                     break;
00304                 }
00305                 bufpos += bytes;
00306                 bytes = 0;
00307             } 
00308         }
00309         else if (bytes < 0) 
00310         {
00311             
00312             break;
00313         }
00314         else if (bytes == 0)
00315         {
00316            mpThreadManager->Sleep(10, anPipeID);
00317         }
00318     }
00319     delete [] buffer;
00320     return rv;
00321 }
00322         
00323 tcEDDMsgLoop::tcEDDMsgLoop(void)
00324 : mpThreadManager(NULL)
00325 , mpLockManager(NULL)
00326 {
00327     int index = 0;
00328     InitOSDependencies();
00329     
00330     mhTableLock = mpLockManager->CreateLock();
00331     
00332     mpLockManager->TakeLock(mhTableLock);
00333     for (index = 0; index < gnMaxDataPipes; index++)
00334     {
00335         maDataPipes[index] = NULL;
00336         for (int j = 0; j < gnMaxDataSources; j++)
00337         {
00338             maSubscriptionTable[index][j] = 0;
00339         }
00340     }
00341     for (index = 0; index < gnMaxDataSources; index++)
00342     {
00343         maDataSources[index] = NULL;
00344     }
00345     mpLockManager->ReleaseLock(mhTableLock);
00346 }
00347 
00348 tcEDDMsgLoop::~tcEDDMsgLoop(void)
00349 {
00350     
00351 }
00352 
00353 void
00354 tcEDDMsgLoop::HandleMessage(tsEDDMsgHdr* apMsg, int anPipeID)
00355 {
00356     
00357     mpLockManager->TakeLock(mhTableLock);
00358     
00359     if (apMsg->mnAllSources) 
00360     {
00361         for (int index = 0; index < gnMaxDataSources; index++)
00362         {
00363             if ( maDataSources[index] != NULL)
00364             {
00365                 maDataSources[index]->HandleMsg(apMsg);
00366             }            
00367         }
00368     }
00369     else 
00370     {
00371         
00372         if (apMsg->mnMsgId == EDDMSGID_SOURCEDATAREQMSG)
00373         {
00374             tsEDDSourceDataReqMsg* lpDataReqMsg = (tsEDDSourceDataReqMsg*)apMsg;
00375             if ( (apMsg->mnDataSource < gnMaxDataSources) &&
00376                  (maDataSources[apMsg->mnDataSource] != NULL) )
00377             {
00378                 maSubscriptionTable[anPipeID][apMsg->mnDataSource] = lpDataReqMsg->mnStart;
00379             }
00380         } 
00381         else if ( (apMsg->mnDataSource < gnMaxDataSources) &&
00382              (maDataSources[apMsg->mnDataSource] != NULL) )
00383         {
00384             maDataSources[apMsg->mnDataSource]->HandleMsg(apMsg);
00385         }
00386     }
00387     mpLockManager->ReleaseLock(mhTableLock);    
00388 }