00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00042
00043
00044
00045 #ifdef WIN32
00046 #pragma warning(disable:4786)
00047 #endif
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <algorithm>
#include <string>
#include <vector>
00051
00052 #include <ace/ACE.h>
00053 #include <ace/INET_Addr.h>
00054 #include <ace/SOCK_Dgram_Mcast.h>
00055 #include <ace/Time_Value.h>
00056 #include <ace/Unbounded_Set.h>
00057 #include <ace/Thread.h>
00058 #include <ace/Thread_Mutex.h>
00059 #include <ace/Guard_T.h>
00060
00061 #include "NetworkSinkModule.h"
00062
00063 #include <iostream>
00064
00065 #include <ace/Log_Msg.h>
00066 #include "../tool/OT_ACE_Log.h"
00067
00068
00069 #ifndef OT_NO_NETWORK_SUPPORT
00070
00071
00072 namespace ot {
00073
00074
// Small integer identifiers. They are not referenced in the visible part of
// this file; presumably they tag the orientation encoding of a record
// (quaternion / angles / matrix) - TODO confirm against the protocol users.
const int positionQuaternion = 1;
const int positionAngles     = 2;
const int positionMatrix     = 3;

// Values stored (via htons) in the headerId and revNum fields of every
// FlexibleTrackerDataRecord built by NetworkSinkModule::createNode.
const int magicNum = 0xbeef;
const int revNum   = 0x0200;
00081
00087 struct NetworkSender {
00088 FlexibleTrackerDataRecord data;
00089 char *dataBuffer;
00090 char *nextRecord;
00091
00092 NetworkSender( const FlexibleTrackerDataRecord & data_ ) :
00093 data( data_ )
00094 {
00095 }
00096 };
00097
00098 struct UdpSender: NetworkSender {
00099 unsigned short port;
00100 std::string nic;
00101 ACE_SOCK_Dgram socket;
00102
00103 UdpSender( const FlexibleTrackerDataRecord & data_, unsigned short port_, const std::string & nic_ ) :
00104 NetworkSender( data_ ), port( port_ ), nic( nic_ )
00105 {}
00106 };
00107
00108 struct MulticastSender: UdpSender {
00109 std::string group;
00110 ACE_INET_Addr address;
00111
00112 MulticastSender( const FlexibleTrackerDataRecord & data_, const std::string & group_, unsigned short port_, const std::string & nic_ ) :
00113 UdpSender( data_, port_, nic_ ), group( group_ ), address( port, group.c_str() )
00114 {}
00115 };
00116
00117 struct UnicastSender: UdpSender {
00118 ACE_Thread_Mutex mutex;
00119 int stop;
00120 ACE_Unbounded_Set<ACE_INET_Addr> addresses;
00121
00122 UnicastSender( const FlexibleTrackerDataRecord & data_, unsigned short port_, const std::string & nic_ ) :
00123 UdpSender( data_, port_, nic_ ), stop(0)
00124 {}
00125 };
00126
00128 struct FindMulticastSender {
00129 std::string group;
00130 unsigned short port;
00131 std::string nic;
00132
00133 FindMulticastSender( const std::string & group_, unsigned short & port_, const std::string & nic_ ) :
00134 group( group_ ), port( port_), nic( nic_ )
00135 {}
00136
00137 bool operator()( const MulticastSender * other )
00138 {
00139 return (group.compare( other->group ) == 0
00140 && port == other->port
00141 && nic.compare( other->nic ) == 0 );
00142 }
00143 };
00144
00146 struct FindUnicastSender {
00147 unsigned short port;
00148 std::string nic;
00149
00150 FindUnicastSender( unsigned short & port_, const std::string & nic_ ) :
00151 port( port_), nic( nic_ )
00152 {}
00153
00154 bool operator()( const UnicastSender * other )
00155 {
00156 return ( port == other->port
00157 && nic.compare( other->nic ) == 0 );
00158 }
00159 };
00160
00161 NetworkSinkModule::NetworkSinkModule() : Module(), NodeFactory()
00162 {
00163 }
00164
00165
00166 NetworkSinkModule::~NetworkSinkModule()
00167 {
00168 for( MulticastSenderVector::iterator mc_it = multicasts.begin() ; mc_it != multicasts.end(); ++mc_it )
00169 {
00170 delete (*mc_it);
00171 }
00172 for( UnicastSenderVector::iterator uc_it = unicasts.begin() ; uc_it != unicasts.end(); ++uc_it )
00173 {
00174 delete (*uc_it);
00175 }
00176 }
00177
00178
00179 void NetworkSinkModule::init(StringTable& attributes, ConfigNode * localTree)
00180 {
00181 Module::init( attributes, localTree );
00182 if( attributes.containsKey("name"))
00183 {
00184 serverName = attributes.get("name");
00185 }
00186 else
00187 {
00188 serverName = "OpenTracker";
00189 }
00190 }
00191
00192
00193 Node * NetworkSinkModule::createNode( const std::string& name, StringTable& attributes)
00194 {
00195 if( name.compare("NetworkSink") == 0 )
00196 {
00197
00198 FlexibleTrackerDataRecord data;
00199 data.headerId = htons(magicNum);
00200 data.revNum = htons(revNum);
00201 data.maxStationNum = 0;
00202 data.numOfStations = 0;
00203
00204 std::string name = attributes.get("name");
00205 int number;
00206 unsigned short port;
00207 int num = sscanf(attributes.get("number").c_str(), " %i", &number );
00208 if( num == 0 ){
00209 ACE_DEBUG((LM_ERROR, ACE_TEXT("ot:Error in converting NetworkSink number !\n")));
00210 return NULL;
00211 }
00212 num = sscanf(attributes.get("port").c_str(), " %hu", &port );
00213 if( num == 0 ){
00214 ACE_DEBUG((LM_ERROR, ACE_TEXT("ot:Error in converting NetworkSink port number !\n")));
00215 return NULL;
00216 }
00217
00218 std::string nic = attributes.get("interface");
00219 if( attributes.get("mode").compare("multicast") == 0 )
00220 {
00221 std::string group = attributes.get("multicast-address");
00222
00223 MulticastSenderVector::iterator it = std::find_if( multicasts.begin(), multicasts.end(), FindMulticastSender( group, port, nic ));
00224 MulticastSender * multicastData;
00225 if( multicasts.end() == it )
00226 {
00227 multicastData = new MulticastSender( data, group, port, nic );
00228 multicasts.push_back( multicastData );
00229 } else
00230 {
00231 multicastData = *it;
00232 }
00233
00234 multicastData->data.maxStationNum = (multicastData->data.maxStationNum < number) ? (number) : (multicastData->data.maxStationNum);
00235
00236 NetworkSink * sink = new NetworkSink( name, number, multicastData );
00237 nodes.push_back( sink );
00238 LOG_ACE_INFO("ot:Built NetworkSink node %s .\n", name.c_str());
00239 return sink;
00240 }
00241 if( attributes.get("mode").compare("unicast") == 0 )
00242 {
00243 UnicastSenderVector::iterator it = std::find_if( unicasts.begin(), unicasts.end(), FindUnicastSender( port, nic ));
00244 UnicastSender * unicastData;
00245 if( unicasts.end() == it )
00246 {
00247 unicastData = new UnicastSender( data, port, nic );
00248 unicasts.push_back( unicastData );
00249 } else
00250 {
00251 unicastData = *it;
00252 }
00253
00254 unicastData->data.maxStationNum = (unicastData->data.maxStationNum < number)?(number):(unicastData->data.maxStationNum);
00255
00256 NetworkSink * sink = new NetworkSink( name, number, unicastData );
00257 nodes.push_back( sink );
00258 LOG_ACE_INFO("ot:Built NetworkSink node %s .\n", name.c_str());
00259 return sink;
00260 }
00261 }
00262 return NULL;
00263 }
00264
00265
00266 void NetworkSinkModule::start()
00267 {
00268
00269 if( nodes.size() > 0 )
00270 {
00271
00272 for( MulticastSenderVector::iterator mc_it = multicasts.begin() ; mc_it != multicasts.end(); ++mc_it )
00273 {
00274 if( (*mc_it)->socket.open(ACE_Addr::sap_any) == -1 )
00275 {
00276 ACE_DEBUG((LM_ERROR, ACE_TEXT("ot:Error opening socket in NetworkSinkModule !\n")));
00277 exit(1);
00278 }
00279 if((*mc_it)->nic.compare("") != 0 )
00280 {
00281 (*mc_it)->socket.set_nic(ACE_TEXT_CHAR_TO_TCHAR((*mc_it)->nic.c_str()));
00282 }
00283 (*mc_it)->data.maxStationNum = htons((*mc_it)->data.maxStationNum);
00284 }
00285 for( UnicastSenderVector::iterator uc_it = unicasts.begin() ; uc_it != unicasts.end(); ++uc_it )
00286 {
00287 if( (*uc_it)->socket.open(ACE_INET_Addr((*uc_it)->port)) == -1 )
00288 {
00289 ACE_DEBUG((LM_ERROR, ACE_TEXT("ot:Error opening socket in NetworkSinkModule !\n")));
00290 exit(1);
00291 }
00292 if((*uc_it)->nic.compare("") != 0 )
00293 {
00294 (*uc_it)->socket.set_nic(ACE_TEXT_CHAR_TO_TCHAR((*uc_it)->nic.c_str()));
00295 }
00296 (*uc_it)->data.maxStationNum = htons((*uc_it)->data.maxStationNum);
00297 ACE_Thread::spawn((ACE_THR_FUNC)NetworkSinkModule::runUnicastTransceiver, *uc_it );
00298 }
00299 }
00300 Module::start();
00301 }
00302
00303
00304 void NetworkSinkModule::close()
00305 {
00306 for( MulticastSenderVector::iterator mc_it = multicasts.begin() ; mc_it != multicasts.end(); ++mc_it )
00307 {
00308 if( (*mc_it)->socket.close() == -1 )
00309 {
00310 ACE_DEBUG((LM_ERROR, ACE_TEXT("ot:Error closing socket in NetworkSinkModule !\n")));
00311 }
00312 }
00313 for( UnicastSenderVector::iterator uc_it = unicasts.begin() ; uc_it != unicasts.end(); ++uc_it )
00314 {
00315 ACE_Guard<ACE_Thread_Mutex> guard( (*uc_it)->mutex );
00316 (*uc_it)->stop = 1;
00317 }
00318 }
00319
00320
00321 void NetworkSinkModule::pullEvent()
00322 {
00323 if( nodes.size() == 0 )
00324 return;
00325
00326
00327 MulticastSenderVector::iterator mc_it;
00328 for( mc_it = multicasts.begin(); mc_it != multicasts.end(); ++mc_it )
00329 {
00330 (*mc_it)->data.numOfStations = 0;
00331 (*mc_it)->data.bufferLength = 0;
00332 }
00333 UnicastSenderVector::iterator uc_it;
00334 for( uc_it = unicasts.begin(); uc_it != unicasts.end(); ++uc_it )
00335 {
00336 (*uc_it)->data.numOfStations = 0;
00337 (*uc_it)->data.bufferLength = 0;
00338 }
00339
00340
00341 unsigned int i;
00342 char** sinkBuffers = (char**)malloc(nodes.size() * sizeof(char*));
00343 short* sinkBufferLengths = (short*)malloc(nodes.size() * sizeof(short));
00344
00345 for (i = 0; i < nodes.size(); i++)
00346 {
00347 NetworkSink *sink = nodes[i];
00348 sinkBufferLengths[i] = 0;
00349
00350 if( sink->modified == 1 )
00351 {
00352 const std::string &eventStr = sink->event.serialize();
00353 short si[3];
00354 char *index;
00355
00356
00357 sinkBufferLengths[i] = 3 * sizeof(short)
00358 + sink->stationName.length() + eventStr.length() + 2*sizeof(char);
00359
00360
00361 si[0] = htons(sink->stationNumber);
00362 si[1] = htons(sinkBufferLengths[i]);
00363 si[2] = htons((short)sink->stationName.length());
00364
00365
00366 sinkBuffers[i] = (char*)malloc(sinkBufferLengths[i]);
00367 index = sinkBuffers[i];
00368
00369 memcpy(index, &si, 3 * sizeof(short));
00370 index += 3 * sizeof(short);
00371 memcpy(index, sink->stationName.c_str(), sink->stationName.length()+1);
00372 index += sink->stationName.length()+1;
00373 memcpy(index, eventStr.c_str(), eventStr.length()+1);
00374
00375
00376
00377 sink->networkSender->data.bufferLength += sinkBufferLengths[i];
00378 sink->networkSender->data.numOfStations += 1;
00379 }
00380 }
00381
00382
00383 for (mc_it = multicasts.begin(); mc_it != multicasts.end(); ++mc_it)
00384 {
00385 (*mc_it)->dataBuffer = (char*)malloc((*mc_it)->data.bufferLength);
00386 (*mc_it)->nextRecord = (*mc_it)->dataBuffer;
00387 }
00388 for (uc_it = unicasts.begin(); uc_it != unicasts.end(); uc_it++)
00389 {
00390 (*uc_it)->dataBuffer = (char*)malloc((*uc_it)->data.bufferLength);
00391 (*uc_it)->nextRecord = (*uc_it)->dataBuffer;
00392 }
00393
00394
00395 for (i = 0; i < nodes.size(); i++)
00396 {
00397 NetworkSink *sink = nodes[i];
00398 if (sink->modified == 1)
00399 {
00400 memcpy(sink->networkSender->nextRecord, sinkBuffers[i], sinkBufferLengths[i]);
00401 sink->networkSender->nextRecord += sinkBufferLengths[i];
00402 free(sinkBuffers[i]);
00403 }
00404 sink->modified = 0;
00405 }
00406 free(sinkBuffers);
00407 free(sinkBufferLengths);
00408
00409
00410 for( mc_it = multicasts.begin(); mc_it != multicasts.end(); mc_it++ )
00411 {
00412 if( (*mc_it)->data.numOfStations > 0 )
00413 {
00414 (*mc_it)->data.numOfStations = htons( (*mc_it)->data.numOfStations );
00415 (*mc_it)->data.bufferLength = htons( (*mc_it)->data.bufferLength );
00416 short headerLength = sizeof(short) * 5;
00417 short bufferLength = ntohs((*mc_it)->data.bufferLength);
00418
00419
00420 short sendBufferSize = headerLength + bufferLength;
00421 char *sendBuffer = (char*)malloc(sendBufferSize);
00422 memcpy(sendBuffer, &(*mc_it)->data, headerLength);
00423 memcpy(sendBuffer + headerLength, (*mc_it)->dataBuffer, bufferLength);
00424
00425
00426 if ((*mc_it)->socket.send(sendBuffer, sendBufferSize, (*mc_it)->address, 0, &ACE_Time_Value::zero) < 0)
00427 {
00428 LOG_ACE_ERROR("ot:NetworkSinkModule : Error sending packet for %s:%hu\n", (*mc_it)->address.get_host_name(), (*mc_it)->address.get_port_number());
00429 }
00430 free(sendBuffer);
00431 }
00432 free((*mc_it)->dataBuffer);
00433 }
00434 for( uc_it = unicasts.begin(); uc_it != unicasts.end(); uc_it++ )
00435 {
00436 if( (*uc_it)->data.numOfStations > 0 )
00437 {
00438 ACE_Guard<ACE_Thread_Mutex> guard( (*uc_it)->mutex );
00439 (*uc_it)->data.numOfStations = htons( (*uc_it)->data.numOfStations );
00440 (*uc_it)->data.bufferLength = htons( (*uc_it)->data.bufferLength );
00441 short headerLength = sizeof(short) * 5;
00442 short bufferLength = ntohs((*uc_it)->data.bufferLength);
00443
00444
00445 short sendBufferSize = headerLength + bufferLength;
00446 char *sendBuffer = (char*)malloc(sendBufferSize);
00447 memcpy(sendBuffer, &(*uc_it)->data, headerLength);
00448 memcpy(sendBuffer + headerLength, (*uc_it)->dataBuffer, bufferLength);
00449
00450
00451 for( ACE_Unbounded_Set_Iterator<ACE_INET_Addr> it = (*uc_it)->addresses.begin() ; ! it.done(); it.advance() )
00452 if( (*uc_it)->socket.send(sendBuffer, sendBufferSize, *it, 0, &ACE_Time_Value::zero ) < 0 )
00453 {
00454 LOG_ACE_ERROR("ot:NetworkSinkModule : Error sending packet for %s:%hu\n", (*it).get_host_name(), (*it).get_port_number());
00455 }
00456 free(sendBuffer);
00457 }
00458 free((*uc_it)->dataBuffer);
00459 }
00460 }
00461
00462
00463
00464 void NetworkSinkModule::convertFloatsHToNl(std::vector<float>& floats, float* result, int num)
00465 {
00466 int i;
00467 union
00468 {
00469 float f;
00470 long int l;
00471 } convert;
00472
00473 for (i=0; i<num; i++)
00474 {
00475 convert.f = floats[i];
00476 convert.l = htonl(convert.l);
00477 result[i] = convert.f;
00478 }
00479 }
00480
00481 void NetworkSinkModule::runUnicastTransceiver( void * data )
00482 {
00483 UnicastSender * uc = (UnicastSender *) data;
00484 ACE_INET_Addr remoteAddr;
00485 ACE_Time_Value timeOut( 1, 0 );
00486 int retval;
00487 char command;
00488 while(1)
00489 {
00490 do
00491 {
00492 if((retval = uc->socket.recv( &command, sizeof( command ), remoteAddr, 0,
00493 &timeOut )) == -1 )
00494 {
00495 if( errno != ETIME && errno != 0 )
00496 {
00497 ACE_DEBUG((LM_ERROR, ACE_TEXT("ot:Error %d receiving data !\n"), errno));
00498 exit( -1 );
00499 }
00500 }
00501 } while( retval < 0 && uc->stop == 0);
00502 if( uc->stop != 0 )
00503 break;
00504 {
00505 ACE_Guard<ACE_Thread_Mutex> guard( uc->mutex );
00506 const char poll = 'P';
00507 const char leave = 'L';
00508 switch (command)
00509 {
00510 case poll:
00511 if( uc->addresses.find( remoteAddr ) )
00512 uc->addresses.insert( remoteAddr );
00513 break;
00514 case leave:
00515 if( !uc->addresses.find( remoteAddr ) )
00516 uc->addresses.remove( remoteAddr );
00517 }
00518 }
00519 }
00520 if( uc->socket.close() == -1)
00521 {
00522 ACE_DEBUG((LM_ERROR, ACE_TEXT("ot:Error closing socket in NetworkSinkModule !\n")));
00523 }
00524 ACE_DEBUG((LM_INFO, ACE_TEXT("ot:Stopping thread\n")));
00525 }
00526
00527 }
00528
00529
00530
00531 #else
00532 #pragma message(">>> OT_NO_NETWORK_SUPPORT")
00533 #endif // OT_NO_NETWORK_SUPPORT
00534
00535
00536
00537
00538
00539
00540
00541
00542
00543
00544
00545
00546
00547
00548
00549