OpenTracker

An Open Architecture for Reconfigurable Tracking based on XML | Contact

NetworkSinkModule.cxx

Go to the documentation of this file.
00001 /* ========================================================================
00002  * Copyright (c) 2006,
00003  * Institute for Computer Graphics and Vision
00004  * Graz University of Technology
00005  * All rights reserved.
00006  *
00007  * Redistribution and use in source and binary forms, with or without
00008  * modification, are permitted provided that the following conditions are
00009  * met:
00010  *
00011  * Redistributions of source code must retain the above copyright notice,
00012  * this list of conditions and the following disclaimer.
00013  *
00014  * Redistributions in binary form must reproduce the above copyright
00015  * notice, this list of conditions and the following disclaimer in the
00016  * documentation and/or other materials provided with the distribution.
00017  *
00018  * Neither the name of the Graz University of Technology nor the names of
00019  * its contributors may be used to endorse or promote products derived from
00020  * this software without specific prior written permission.
00021  *
00022  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
00023  * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
00024  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
00025  * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
00026  * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
00027  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00028  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00029  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
00030  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
00031  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00032  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00033  * ========================================================================
00034  * PROJECT: OpenTracker
00035  * ======================================================================== */
00042 /* ======================================================================== */
00043 
// a trick to avoid warnings when ace includes the STL headers
#ifdef WIN32
#pragma warning(disable:4786)
#endif
#include <stdlib.h>
#include <string.h>
#include <string>
#include <algorithm>
#include <vector>

#include <ace/ACE.h>
#include <ace/INET_Addr.h>
#include <ace/SOCK_Dgram_Mcast.h>
#include <ace/Time_Value.h>
#include <ace/Unbounded_Set.h>
#include <ace/Thread.h>
#include <ace/Thread_Mutex.h>
#include <ace/Guard_T.h>

#include "NetworkSinkModule.h"

#include <iostream>

#include <ace/Log_Msg.h>
#include "../tool/OT_ACE_Log.h"
00067 
00068 
00069 #ifndef OT_NO_NETWORK_SUPPORT
00070 
00071 
00072 namespace ot {
00073 
// Constants of the Network Data protocol.

// position format identifiers (not referenced elsewhere in this file)
const int positionQuaternion = 1;
const int positionAngles     = 2;
const int positionMatrix     = 3;

// magic number and protocol revision written into the packet header
const int magicNum = 0xBEEF;
const int revNum   = 0x0200;
00081 
00087     struct NetworkSender {
00088         FlexibleTrackerDataRecord data;
00089         char *dataBuffer;
00090         char *nextRecord;
00091 
00092         NetworkSender( const FlexibleTrackerDataRecord & data_ ) :
00093             data( data_ )
00094         {
00095         }
00096     };
00097 
00098     struct UdpSender: NetworkSender {
00099         unsigned short port;
00100         std::string nic;
00101         ACE_SOCK_Dgram socket;
00102 
00103         UdpSender( const FlexibleTrackerDataRecord & data_, unsigned short port_, const std::string & nic_ ) :
00104             NetworkSender( data_ ), port( port_ ), nic( nic_ )
00105         {}
00106     };
00107 
00108     struct MulticastSender: UdpSender {
00109         std::string group;
00110         ACE_INET_Addr address;
00111 
00112         MulticastSender( const FlexibleTrackerDataRecord & data_, const std::string & group_, unsigned short port_, const std::string & nic_ ) :
00113             UdpSender( data_, port_, nic_ ), group( group_ ), address( port, group.c_str() )
00114         {}
00115     };
00116 
00117     struct UnicastSender: UdpSender {
00118         ACE_Thread_Mutex mutex;
00119         int stop;
00120         ACE_Unbounded_Set<ACE_INET_Addr> addresses;
00121 
00122         UnicastSender( const FlexibleTrackerDataRecord & data_, unsigned short port_, const std::string & nic_ ) :
00123             UdpSender( data_, port_, nic_ ), stop(0)
00124         {}
00125     };
00126 
00128     struct FindMulticastSender {
00129         std::string group;
00130         unsigned short port;
00131         std::string nic;
00132 
00133         FindMulticastSender( const std::string & group_, unsigned short & port_, const std::string & nic_ ) :
00134             group( group_ ), port( port_), nic( nic_ )
00135         {}
00136 
00137         bool operator()( const MulticastSender * other )
00138         {
00139             return (group.compare( other->group ) == 0
00140                     && port == other->port
00141                     && nic.compare( other->nic ) == 0 );
00142         }
00143     };
00144 
00146     struct FindUnicastSender {
00147         unsigned short port;
00148         std::string nic;
00149 
00150         FindUnicastSender( unsigned short & port_, const std::string & nic_ ) :
00151             port( port_), nic( nic_ )
00152         {}
00153 
00154         bool operator()( const UnicastSender * other )
00155         {
00156             return (    port == other->port
00157                         && nic.compare( other->nic ) == 0 );
00158         }
00159     };
00160 
00161     NetworkSinkModule::NetworkSinkModule() : Module(), NodeFactory()
00162     {
00163     }
00164 
00165     // destructor cleans up any allocated memory
00166     NetworkSinkModule::~NetworkSinkModule()
00167     {
00168         for( MulticastSenderVector::iterator mc_it = multicasts.begin() ; mc_it != multicasts.end(); ++mc_it )
00169         {
00170             delete (*mc_it);
00171         }
00172         for( UnicastSenderVector::iterator uc_it = unicasts.begin() ; uc_it != unicasts.end(); ++uc_it )
00173         {
00174             delete (*uc_it);
00175         }
00176     }
00177 
00178     // initializes ConsoleModule
00179     void NetworkSinkModule::init(StringTable& attributes,  ConfigNode * localTree)
00180     {
00181         Module::init( attributes, localTree );
00182         if( attributes.containsKey("name"))
00183         {
00184             serverName = attributes.get("name");
00185         }
00186         else
00187         {
00188             serverName = "OpenTracker";
00189         }
00190     }
00191 
00192     // This method is called to construct a new Node.
00193     Node * NetworkSinkModule::createNode( const std::string& name,  StringTable& attributes)
00194     {
00195         if( name.compare("NetworkSink") == 0 )
00196         {
00197             // initialize Network data buffer
00198             FlexibleTrackerDataRecord data;
00199             data.headerId = htons(magicNum);
00200             data.revNum = htons(revNum);
00201             data.maxStationNum = 0;
00202             data.numOfStations = 0;
00203 
00204             std::string name = attributes.get("name");
00205             int number;
00206             unsigned short port;
00207             int num = sscanf(attributes.get("number").c_str(), " %i", &number );
00208             if( num == 0 ){
00209                 ACE_DEBUG((LM_ERROR, ACE_TEXT("ot:Error in converting NetworkSink number !\n")));
00210                 return NULL;
00211             }
00212             num = sscanf(attributes.get("port").c_str(), " %hu", &port );
00213             if( num == 0 ){
00214                 ACE_DEBUG((LM_ERROR, ACE_TEXT("ot:Error in converting NetworkSink port number !\n")));
00215                 return NULL;
00216             }
00217 
00218             std::string nic = attributes.get("interface");
00219             if( attributes.get("mode").compare("multicast") == 0 )
00220             {
00221                 std::string group = attributes.get("multicast-address");
00222 
00223                 MulticastSenderVector::iterator it = std::find_if( multicasts.begin(), multicasts.end(), FindMulticastSender( group, port, nic ));
00224                 MulticastSender * multicastData;
00225                 if( multicasts.end() == it )
00226                 {
00227                     multicastData = new MulticastSender( data, group, port, nic );
00228                     multicasts.push_back( multicastData );
00229                 } else
00230                 {
00231                     multicastData = *it;
00232                 }
00233                 // increase maximum of station numbers to suit the given number
00234                 multicastData->data.maxStationNum = (multicastData->data.maxStationNum < number) ? (number) : (multicastData->data.maxStationNum);
00235 
00236                 NetworkSink * sink = new NetworkSink( name, number, multicastData );
00237                 nodes.push_back( sink );
00238                 LOG_ACE_INFO("ot:Built NetworkSink node %s .\n", name.c_str());
00239                 return sink;
00240             }
00241             if( attributes.get("mode").compare("unicast") == 0 )
00242             {
00243                 UnicastSenderVector::iterator it = std::find_if( unicasts.begin(), unicasts.end(), FindUnicastSender( port, nic ));
00244                 UnicastSender * unicastData;
00245                 if( unicasts.end() == it )
00246                 {
00247                     unicastData = new UnicastSender( data, port, nic );
00248                     unicasts.push_back( unicastData );
00249                 } else
00250                 {
00251                     unicastData = *it;
00252                 }
00253                 // increase maximum of station numbers to suit the given number
00254                 unicastData->data.maxStationNum = (unicastData->data.maxStationNum < number)?(number):(unicastData->data.maxStationNum);
00255 
00256                 NetworkSink * sink = new NetworkSink( name, number, unicastData );
00257                 nodes.push_back( sink );
00258                 LOG_ACE_INFO("ot:Built NetworkSink node %s .\n", name.c_str());
00259                 return sink;
00260             }
00261         }
00262         return NULL;
00263     }
00264 
00265     // opens the network socket to use
00266     void NetworkSinkModule::start()
00267     {
00268         // only open a network connection if we actually have something to do
00269         if( nodes.size() > 0 )
00270         {
00271             // sets maxStationNum to network byte order
00272             for( MulticastSenderVector::iterator mc_it = multicasts.begin() ; mc_it != multicasts.end(); ++mc_it )
00273             {
00274                 if( (*mc_it)->socket.open(ACE_Addr::sap_any) == -1 )
00275                 {
00276                     ACE_DEBUG((LM_ERROR, ACE_TEXT("ot:Error opening socket in NetworkSinkModule !\n")));
00277                     exit(1);
00278                 }
00279                 if((*mc_it)->nic.compare("") != 0 )
00280                 {
00281                     (*mc_it)->socket.set_nic(ACE_TEXT_CHAR_TO_TCHAR((*mc_it)->nic.c_str()));
00282                 }
00283                 (*mc_it)->data.maxStationNum = htons((*mc_it)->data.maxStationNum);
00284             }
00285             for( UnicastSenderVector::iterator uc_it = unicasts.begin() ; uc_it != unicasts.end(); ++uc_it )
00286             {
00287                 if( (*uc_it)->socket.open(ACE_INET_Addr((*uc_it)->port)) == -1 )
00288                 {
00289                     ACE_DEBUG((LM_ERROR, ACE_TEXT("ot:Error opening socket in NetworkSinkModule !\n")));
00290                     exit(1);
00291                 }
00292                 if((*uc_it)->nic.compare("") != 0 )
00293                 {
00294                     (*uc_it)->socket.set_nic(ACE_TEXT_CHAR_TO_TCHAR((*uc_it)->nic.c_str()));
00295                 }
00296                 (*uc_it)->data.maxStationNum = htons((*uc_it)->data.maxStationNum);
00297                 ACE_Thread::spawn((ACE_THR_FUNC)NetworkSinkModule::runUnicastTransceiver, *uc_it );
00298             }
00299         }
00300         Module::start();
00301     }
00302 
00303     // closes the network connection
00304     void NetworkSinkModule::close()
00305     {
00306         for( MulticastSenderVector::iterator mc_it = multicasts.begin() ; mc_it != multicasts.end(); ++mc_it )
00307         {
00308             if( (*mc_it)->socket.close() == -1 )
00309             {
00310                 ACE_DEBUG((LM_ERROR, ACE_TEXT("ot:Error closing socket in NetworkSinkModule !\n")));
00311             }
00312         }
00313         for( UnicastSenderVector::iterator uc_it = unicasts.begin() ; uc_it != unicasts.end(); ++uc_it )
00314         {
00315             ACE_Guard<ACE_Thread_Mutex> guard( (*uc_it)->mutex );
00316             (*uc_it)->stop = 1;
00317         }
00318     }
00319 
00320     // checks the NetworkSink nodes and sends any new data to the network
00321     void NetworkSinkModule::pullEvent()
00322     {
00323         if( nodes.size() == 0 )
00324             return;
00325 
00326         // clear the network buffers
00327         MulticastSenderVector::iterator mc_it;
00328         for( mc_it = multicasts.begin(); mc_it != multicasts.end(); ++mc_it )
00329         {
00330             (*mc_it)->data.numOfStations = 0;
00331             (*mc_it)->data.bufferLength = 0;
00332         }
00333         UnicastSenderVector::iterator uc_it;
00334         for( uc_it = unicasts.begin(); uc_it != unicasts.end(); ++uc_it )
00335         {
00336             (*uc_it)->data.numOfStations = 0;
00337             (*uc_it)->data.bufferLength = 0;
00338         }
00339 
00340         // for each NetworkSink node: copy data to sink's buffer
00341         unsigned int i;
00342         char** sinkBuffers = (char**)malloc(nodes.size() * sizeof(char*));
00343         short* sinkBufferLengths = (short*)malloc(nodes.size() * sizeof(short));
00344 
00345         for (i = 0; i < nodes.size(); i++)
00346         {
00347             NetworkSink *sink = nodes[i];
00348             sinkBufferLengths[i] = 0;
00349 
00350             if( sink->modified == 1 )
00351             {
00352                 const std::string &eventStr = sink->event.serialize();
00353                 short si[3];
00354                 char *index;
00355 
00356                 // sink buffer length
00357                 sinkBufferLengths[i] = 3 * sizeof(short)
00358                     + sink->stationName.length() + eventStr.length() + 2*sizeof(char);
00359 
00360                 // header
00361                 si[0] = htons(sink->stationNumber);
00362                 si[1] = htons(sinkBufferLengths[i]);
00363                 si[2] = htons((short)sink->stationName.length());
00364 
00365                 // copy data to this sink's buffer
00366                 sinkBuffers[i] = (char*)malloc(sinkBufferLengths[i]);
00367                 index = sinkBuffers[i];
00368 
00369                 memcpy(index, &si, 3 * sizeof(short));
00370                 index += 3 * sizeof(short);
00371                 memcpy(index, sink->stationName.c_str(), sink->stationName.length()+1);
00372                 index += sink->stationName.length()+1;
00373                 memcpy(index, eventStr.c_str(), eventStr.length()+1);
00374                 //std::cerr << "sbl: " << sinkBufferLengths[i] << " - " << (int)((char*)(sinkBuffers[i]) - (char*)(index+eventStr.length()+1)) << std::endl;
00375 
00376                 // add to bufferLength and numOfStations
00377                 sink->networkSender->data.bufferLength += sinkBufferLengths[i];
00378                 sink->networkSender->data.numOfStations += 1;
00379             }
00380         }
00381 
00382         // allocate memory for each networkSender
00383         for (mc_it = multicasts.begin(); mc_it != multicasts.end(); ++mc_it)
00384         {
00385             (*mc_it)->dataBuffer = (char*)malloc((*mc_it)->data.bufferLength);
00386             (*mc_it)->nextRecord = (*mc_it)->dataBuffer;
00387         }
00388         for (uc_it = unicasts.begin(); uc_it != unicasts.end(); uc_it++)
00389         {
00390             (*uc_it)->dataBuffer = (char*)malloc((*uc_it)->data.bufferLength);
00391             (*uc_it)->nextRecord = (*uc_it)->dataBuffer;
00392         }
00393 
00394         // add data of each network sink node to according networkSender
00395         for (i = 0; i < nodes.size(); i++)
00396         {
00397             NetworkSink *sink = nodes[i];
00398             if (sink->modified == 1)
00399             {
00400                 memcpy(sink->networkSender->nextRecord, sinkBuffers[i], sinkBufferLengths[i]);
00401                 sink->networkSender->nextRecord += sinkBufferLengths[i];
00402                 free(sinkBuffers[i]);
00403             }
00404             sink->modified = 0;
00405         }
00406         free(sinkBuffers);
00407         free(sinkBufferLengths);
00408 
00409         // send any non-empty network data buffers
00410         for( mc_it = multicasts.begin(); mc_it != multicasts.end(); mc_it++ )
00411         {
00412             if( (*mc_it)->data.numOfStations > 0 )
00413             {
00414                 (*mc_it)->data.numOfStations = htons( (*mc_it)->data.numOfStations );
00415                 (*mc_it)->data.bufferLength = htons( (*mc_it)->data.bufferLength );
00416                 short headerLength = sizeof(short) * 5;
00417                 short bufferLength = ntohs((*mc_it)->data.bufferLength);
00418 
00419                 // copy header and data to the same address
00420                 short sendBufferSize = headerLength + bufferLength;
00421                 char *sendBuffer = (char*)malloc(sendBufferSize);
00422                 memcpy(sendBuffer, &(*mc_it)->data, headerLength);
00423                 memcpy(sendBuffer + headerLength, (*mc_it)->dataBuffer, bufferLength);
00424 
00425                 // send without blocking to avoid stalls in the mainloop, packet is thrown away !
00426                 if ((*mc_it)->socket.send(sendBuffer, sendBufferSize, (*mc_it)->address, 0, &ACE_Time_Value::zero) < 0)
00427                 {
00428                     LOG_ACE_ERROR("ot:NetworkSinkModule : Error sending packet for %s:%hu\n", (*mc_it)->address.get_host_name(), (*mc_it)->address.get_port_number());
00429                 }
00430                 free(sendBuffer);
00431             }
00432             free((*mc_it)->dataBuffer);
00433         }
00434         for( uc_it = unicasts.begin(); uc_it != unicasts.end(); uc_it++ )
00435         {
00436             if( (*uc_it)->data.numOfStations > 0 )
00437             {
00438                 ACE_Guard<ACE_Thread_Mutex> guard( (*uc_it)->mutex );
00439                 (*uc_it)->data.numOfStations = htons( (*uc_it)->data.numOfStations );
00440                 (*uc_it)->data.bufferLength = htons( (*uc_it)->data.bufferLength );
00441                 short headerLength = sizeof(short) * 5;
00442                 short bufferLength = ntohs((*uc_it)->data.bufferLength);
00443 
00444                 // copy header and data to the same address
00445                 short sendBufferSize = headerLength + bufferLength;
00446                 char *sendBuffer = (char*)malloc(sendBufferSize);
00447                 memcpy(sendBuffer, &(*uc_it)->data, headerLength);
00448                 memcpy(sendBuffer + headerLength, (*uc_it)->dataBuffer, bufferLength);
00449 
00450                 // send without blocking to avoid stalls in the mainloop, packet is thrown away !
00451                 for( ACE_Unbounded_Set_Iterator<ACE_INET_Addr> it = (*uc_it)->addresses.begin() ; ! it.done(); it.advance() )
00452                     if( (*uc_it)->socket.send(sendBuffer, sendBufferSize, *it, 0, &ACE_Time_Value::zero ) < 0 )
00453                     {
00454                         LOG_ACE_ERROR("ot:NetworkSinkModule : Error sending packet for %s:%hu\n", (*it).get_host_name(), (*it).get_port_number());
00455                     }
00456                 free(sendBuffer);
00457             }
00458             free((*uc_it)->dataBuffer);
00459         }
00460     }
00461 
00462     // Converts num floats to network byte order
00463 
00464     void NetworkSinkModule::convertFloatsHToNl(std::vector<float>& floats, float* result, int num)
00465     {
00466         int i;
00467         union
00468         {
00469             float f;
00470             long int l;
00471         } convert;
00472 
00473         for (i=0; i<num; i++)
00474         {
00475             convert.f = floats[i];
00476             convert.l = htonl(convert.l);    // Convert host to network byte order
00477             result[i] = convert.f;
00478         }
00479     }
00480 
00481     void NetworkSinkModule::runUnicastTransceiver( void * data )
00482     {
00483         UnicastSender * uc = (UnicastSender *) data;
00484         ACE_INET_Addr remoteAddr;
00485         ACE_Time_Value timeOut( 1, 0 );
00486         int retval;
00487         char command;
00488         while(1)
00489         {
00490             do
00491             {
00492                 if((retval = uc->socket.recv( &command, sizeof( command ), remoteAddr, 0,
00493                                               &timeOut )) == -1 )
00494                 {
00495                     if( errno != ETIME && errno != 0 )
00496                     {
00497                         ACE_DEBUG((LM_ERROR, ACE_TEXT("ot:Error %d receiving data !\n"), errno));
00498                         exit( -1 );
00499                     }
00500                 }
00501             } while( retval < 0 && uc->stop == 0);
00502             if( uc->stop != 0 )
00503                 break;
00504             {
00505                 ACE_Guard<ACE_Thread_Mutex> guard( uc->mutex );
00506                 const char poll = 'P';
00507                 const char leave = 'L';
00508                 switch (command)
00509                 {
00510                     case poll:
00511                         if( uc->addresses.find( remoteAddr ) )
00512                             uc->addresses.insert( remoteAddr );
00513                         break;
00514                     case leave:
00515                         if( !uc->addresses.find( remoteAddr ) )
00516                             uc->addresses.remove( remoteAddr );
00517                 }
00518             }
00519         }
00520         if( uc->socket.close() == -1)
00521         {
00522             ACE_DEBUG((LM_ERROR, ACE_TEXT("ot:Error closing socket in NetworkSinkModule !\n")));
00523         }
00524         ACE_DEBUG((LM_INFO, ACE_TEXT("ot:Stopping thread\n")));
00525     }
00526 
00527 } // namespace ot
00528 
00529 
00530 
00531 #else
00532 #pragma message(">>> OT_NO_NETWORK_SUPPORT")
00533 #endif // OT_NO_NETWORK_SUPPORT
00534 
00535 /* 
00536  * ------------------------------------------------------------
00537  *   End of NetworkSinkModule.cxx
00538  * ------------------------------------------------------------
00539  *   Automatic Emacs configuration follows.
00540  *   Local Variables:
00541  *   mode:c++
00542  *   c-basic-offset: 4
00543  *   eval: (c-set-offset 'substatement-open 0)
00544  *   eval: (c-set-offset 'case-label '+)
00545  *   eval: (c-set-offset 'statement 'c-lineup-runin-statements)
00546  *   eval: (setq indent-tabs-mode nil)
00547  *   End:
00548  * ------------------------------------------------------------ 
00549  */

copyright (c) 2006 Graz University of Technology