Revision: 7210
          http://playerstage.svn.sourceforge.net/playerstage/?rev=7210&view=rev
Author:   thjc
Date:     2008-12-19 02:50:12 +0000 (Fri, 19 Dec 2008)

Log Message:
-----------
Fixes for:
 - Player server shutdown logic.
 - Filewatcher: reduced the time the lock is held so that other threads get a
   chance to add watches.
 - localbb: unsubscribing was removing random subscriptions.

Modified Paths:
--------------
    code/player/trunk/libplayercore/devicetable.cc
    code/player/trunk/libplayercore/devicetable.h
    code/player/trunk/libplayercore/driver.h
    code/player/trunk/libplayercore/error.h
    code/player/trunk/libplayercore/filewatcher.cc
    code/player/trunk/libplayercore/globals.cc
    code/player/trunk/libplayercore/message.cc
    code/player/trunk/libplayercore/threaded_driver.cc
    code/player/trunk/server/drivers/blackboard/localbb/localbb.cpp
    code/player/trunk/server/drivers/position/nav200/sicknav200.cc
    code/player/trunk/server/server.cc

Modified: code/player/trunk/libplayercore/devicetable.cc
===================================================================
--- code/player/trunk/libplayercore/devicetable.cc      2008-12-19 02:36:48 UTC 
(rev 7209)
+++ code/player/trunk/libplayercore/devicetable.cc      2008-12-19 02:50:12 UTC 
(rev 7210)
@@ -272,6 +272,29 @@
   return(0);
 }
 
+int
+DeviceTable::StopAlwaysonDrivers()
+{
+  Device* thisentry;
+
+  // We don't lock here, on the assumption that the caller is also the only
+  // thread that can make changes to the device table.
+  for(thisentry=head;thisentry;thisentry=thisentry->next)
+  {
+    if(thisentry->driver->alwayson)
+    {
+         QueuePointer Temp = QueuePointer();
+      if(thisentry->Unsubscribe(Temp) != 0)
+      {
+        PLAYER_ERROR2("alwayson unsubscription failed for device %s:%d",
+                      interf_to_str(thisentry->addr.interf), 
thisentry->addr.index);
+        return(-1);
+      }
+    }
+  }
+  return(0);
+}
+
 // Register a factory creation function.  It will be called when
 // GetDevice fails to find a device in the deviceTable.  This function
 // might, for example, locate the device on a remote host (in a

Modified: code/player/trunk/libplayercore/devicetable.h
===================================================================
--- code/player/trunk/libplayercore/devicetable.h       2008-12-19 02:36:48 UTC 
(rev 7209)
+++ code/player/trunk/libplayercore/devicetable.h       2008-12-19 02:50:12 UTC 
(rev 7210)
@@ -109,6 +109,15 @@
     // than just driver-specific.
     int StartAlwaysonDrivers();
 
+    // Unsubscribe from each device whose driver is marked 'alwayson'.  Returns
+    // 0 on success, -1 on error (at least one unsubscription failed).
+    //
+    // Only used for coordinating server shutdown
+    //
+    // TODO: change the semantics of alwayson to be device-specific, rather
+    // than just driver-specific.
+    int StopAlwaysonDrivers();
+
     // Register a factory creation function.  It will be called when
     // GetDevice fails to find a device in the deviceTable.  This function
     // might, for example, locate the device on a remote host (in a

Modified: code/player/trunk/libplayercore/driver.h
===================================================================
--- code/player/trunk/libplayercore/driver.h    2008-12-19 02:36:48 UTC (rev 
7209)
+++ code/player/trunk/libplayercore/driver.h    2008-12-19 02:50:12 UTC (rev 
7210)
@@ -577,6 +577,14 @@
     @returns Returns 0 on success. */
     virtual int Shutdown();
 
+    /** @brief Terminate the driver.
+
+    This method doesn't go through the niceties of unsubscribing etc.; only use
+    it when the server is actually shutting down, as it takes a few shortcuts.
+
+    @returns Returns 0 on success. */
+    virtual int Terminate();
+
     /** @brief Main method for driver thread.
 
     drivers have their own thread of execution, created using

Modified: code/player/trunk/libplayercore/error.h
===================================================================
--- code/player/trunk/libplayercore/error.h     2008-12-19 02:36:48 UTC (rev 
7209)
+++ code/player/trunk/libplayercore/error.h     2008-12-19 02:50:12 UTC (rev 
7210)
@@ -66,6 +66,7 @@
 #define PLAYER_ERROR2(msg, a, b)  ErrorPrint(PLAYER_ERR_ERR, 0, __FILE__, 
__LINE__, "error   : " msg "\n", a, b)
 #define PLAYER_ERROR3(msg, a, b, c)  ErrorPrint(PLAYER_ERR_ERR, 0, __FILE__, 
__LINE__,  "error   : " msg "\n", a, b, c)
 #define PLAYER_ERROR4(msg, a, b, c,d)  ErrorPrint(PLAYER_ERR_ERR, 0, __FILE__, 
__LINE__,  "error   : " msg "\n", a, b, c, d)
+#define PLAYER_ERROR5(msg, a, b, c, d, e)  ErrorPrint(PLAYER_ERR_ERR, 0, 
__FILE__, __LINE__,  "error   : " msg "\n", a, b, c, d, e)
 
 /// Warning message macros
 #define PLAYER_WARN(msg)        ErrorPrint(PLAYER_ERR_WARN, 0, __FILE__, 
__LINE__, "warning : " msg "\n")

Modified: code/player/trunk/libplayercore/filewatcher.cc
===================================================================
--- code/player/trunk/libplayercore/filewatcher.cc      2008-12-19 02:36:48 UTC 
(rev 7209)
+++ code/player/trunk/libplayercore/filewatcher.cc      2008-12-19 02:50:12 UTC 
(rev 7210)
@@ -77,20 +77,25 @@
        t.tv_sec = static_cast<int> (floor(Timeout));
        t.tv_usec = static_cast<int> ((Timeout - static_cast<int> 
(floor(Timeout))) * 1e6);
 
+       // unlock for the select call
+       // in the worst case if the list gets modified during the call we will 
either
+       // not be able to match an event on a deleted fd, or will get spurious 
wake ups
+       // on a newly added fd all of which are non fatal
+       Unlock();
        int ret = select (maxfd+1,&ReadFds,&WriteFds,&ExceptFds,&t);
 
        if (ret < 0)
        {
                PLAYER_ERROR2("Select called failed in File Watcher: %d 
%s",errno,strerror(errno));
-               Unlock();
                return ret;
        }
        else if (ret == 0)
        {
-               Unlock();
                return 0;
        }
 
+       Lock();
+
        int queueless_count = 0;
        int match_count = 0;
 

Modified: code/player/trunk/libplayercore/globals.cc
===================================================================
--- code/player/trunk/libplayercore/globals.cc  2008-12-19 02:36:48 UTC (rev 
7209)
+++ code/player/trunk/libplayercore/globals.cc  2008-12-19 02:50:12 UTC (rev 
7210)
@@ -74,7 +74,7 @@
 bool player_quit;
 bool player_quiet_startup;
 
-// global access to the cmdlihe arguments
+// global access to the cmdline arguments
 int player_argc;
 char** player_argv;
 

Modified: code/player/trunk/libplayercore/message.cc
===================================================================
--- code/player/trunk/libplayercore/message.cc  2008-12-19 02:36:48 UTC (rev 
7209)
+++ code/player/trunk/libplayercore/message.cc  2008-12-19 02:50:12 UTC (rev 
7210)
@@ -132,7 +132,8 @@
     }
     else
     {
-      PLAYER_ERROR3 ("failed to find clone function for  message %s: %s, %d", 
interf_to_str (Header.addr.interf), msgtype_to_str (Header.type), 
Header.subtype);
+      this->Data = NULL;
+      PLAYER_ERROR5 ("failed to find clone function for  message %s[%d]: 
%s[%d], %d", interf_to_str (Header.addr.interf),Header.addr.interf, 
msgtype_to_str (Header.type), Header.type, Header.subtype);
     }
   }
   else
@@ -529,8 +530,8 @@
   // If it was a response, then mark it , to prompt
   // processing of the queue.
   if(!this->data_requested &&
-     (hdr->type == PLAYER_MSGTYPE_RESP_ACK) ||
-     (hdr->type == PLAYER_MSGTYPE_RESP_NACK))
+     (hdr->type == PLAYER_MSGTYPE_RESP_ACK ||
+     hdr->type == PLAYER_MSGTYPE_RESP_NACK ))
     this->SetDataRequested(true,true);
 
   this->Unlock();

Modified: code/player/trunk/libplayercore/threaded_driver.cc
===================================================================
--- code/player/trunk/libplayercore/threaded_driver.cc  2008-12-19 02:36:48 UTC 
(rev 7209)
+++ code/player/trunk/libplayercore/threaded_driver.cc  2008-12-19 02:50:12 UTC 
(rev 7210)
@@ -89,6 +89,17 @@
 // destructor, to free up allocated queue.
 ThreadedDriver::~ThreadedDriver()
 {
+       // if our thread is still running request it to be stopped
+       if (ThreadState == PLAYER_THREAD_STATE_RUNNING || ThreadState == 
PLAYER_THREAD_STATE_RESTARTING)
+       {
+               StopThread();
+       }
+       // wait for the thread to actually stop
+       while (ThreadState != PLAYER_THREAD_STATE_STOPPED)
+       {
+               usleep(100000);
+       }
+
        pthread_barrier_destroy(&threadSetupBarrier);
 }
 
@@ -129,6 +140,7 @@
 {
   if (ThreadState == PLAYER_THREAD_STATE_RUNNING)
   {
+       PLAYER_MSG2(5,"Cancelling thread %p belonging to driver 
%p",driverthread,this);
     pthread_cancel(driverthread);
     if(pthread_detach(driverthread))
       perror("ThreadedDriver::StopThread:pthread_detach()");
@@ -224,6 +236,16 @@
        return 0;
 }
 
+int ThreadedDriver::Terminate()
+{
+       int ret = Driver::Terminate();
+       // wait for the thread to actually stop
+       while (ret == 0 && ThreadState != PLAYER_THREAD_STATE_STOPPED)
+       {
+               usleep(100000);
+       }
+       return ret;
+}
 
 bool ThreadedDriver::Wait(double TimeOut)
 {

Modified: code/player/trunk/server/drivers/blackboard/localbb/localbb.cpp
===================================================================
--- code/player/trunk/server/drivers/blackboard/localbb/localbb.cpp     
2008-12-19 02:36:48 UTC (rev 7209)
+++ code/player/trunk/server/drivers/blackboard/localbb/localbb.cpp     
2008-12-19 02:50:12 UTC (rev 7210)
@@ -185,7 +185,7 @@
                /** @brief Driver de-initialisation. */
                int Shutdown();
                /** Override the unsubscribe to stop sending events to devices 
which are no longer subscribed. */
-               int Unsubscribe(player_devaddr_t addr);
+               int Unsubscribe(QueuePointer &queue, player_devaddr_t addr);
 
                // MessageHandler
                /** @brief Process incoming messages.
@@ -307,15 +307,11 @@
                * map<group, vector<device queue> >
                */
                map<std::string, vector<QueuePointer> > group_listeners;
-
-               /** Map of queues to devices. Used to remove unneeded queues 
when a device is unsubscribed. */
-               map<player_devaddr_t, QueuePointer> subscribed_devices;
 };
 
////////////////////////////////////////////////////////////////////////////////
 // Override the unsubscribe. Stop sending out events to unsubscribed devices.
-int LocalBB::Unsubscribe(player_devaddr_t addr)
+int LocalBB::Unsubscribe(QueuePointer &qp, player_devaddr_t addr)
 {
-       QueuePointer &qp = subscribed_devices[addr];
        for (map<std::string, map<std::string, vector<QueuePointer> > 
>::iterator itr = listeners.begin(); itr != listeners.end(); itr++)
        {
                map<std::string, vector<QueuePointer> > &keys = (*itr).second;
@@ -354,7 +350,7 @@
                }
        }
        
-       return Driver::Unsubscribe(addr);
+       return Driver::Unsubscribe(qp,addr);
 }
 
////////////////////////////////////////////////////////////////////////////////
 // Factory method.
@@ -676,7 +672,6 @@
 BlackBoardEntry LocalBB::SubscribeKey(const std::string &key, const 
std::string &group, const QueuePointer &resp_queue, const player_devaddr_t addr)
 {
        listeners[group][key].push_back(resp_queue);
-       subscribed_devices[addr] = resp_queue;
        BlackBoardEntry entry = entries[group][key];
        if (entry.key == "")
        {
@@ -707,7 +702,6 @@
 vector<BlackBoardEntry> LocalBB::SubscribeGroup(const std::string &group, 
const QueuePointer &qp, const player_devaddr_t addr)
 {
        group_listeners[group].push_back(qp);
-       subscribed_devices[addr] = qp;
 
        vector<BlackBoardEntry> group_entries;
        //map<group, map<key, entry> >

Modified: code/player/trunk/server/drivers/position/nav200/sicknav200.cc
===================================================================
--- code/player/trunk/server/drivers/position/nav200/sicknav200.cc      
2008-12-19 02:36:48 UTC (rev 7209)
+++ code/player/trunk/server/drivers/position/nav200/sicknav200.cc      
2008-12-19 02:50:12 UTC (rev 7210)
@@ -968,7 +968,7 @@
                uint32_t type = *reinterpret_cast<uint32_t*>(wkb + 1);
                bool extendedWKB = type >> 24 == 32; // Extended WKBs seem to 
store a flag in the type variable.
                int headerSize = extendedWKB ? wkbHeaderSize + 4 : 
wkbHeaderSize;
-               if (type & 0xffffff != 4) // Ignore the most significant byte, 
it might have a flag in.
+               if ((type & 0xffffff) != 4) // Ignore the most significant 
byte, it might have a flag in.
                {
                        PLAYER_WARN1(
                                        "InterpretLayerData only supports 
MultiPoint data %d\n",

Modified: code/player/trunk/server/server.cc
===================================================================
--- code/player/trunk/server/server.cc  2008-12-19 02:36:48 UTC (rev 7209)
+++ code/player/trunk/server/server.cc  2008-12-19 02:50:12 UTC (rev 7210)
@@ -347,6 +347,12 @@
 {
   delete ptcp;
   delete pudp;
+
+  if(deviceTable->StopAlwaysonDrivers() != 0)
+  {
+    PLAYER_ERROR("failed to stop alwayson drivers");
+  }
+
   player_globals_fini();
   delete cf;
 }


This was sent by the SourceForge.net collaborative development platform, the 
world's largest Open Source development site.

------------------------------------------------------------------------------
SF.Net email is Sponsored by MIX09, March 18-20, 2009 in Las Vegas, Nevada.
The future of the web can't happen without you.  Join us at MIX09 to help
pave the way to the Next Web now. Learn more and register at
http://ad.doubleclick.net/clk;208669438;13503038;i?http://2009.visitmix.com/
_______________________________________________
Playerstage-commit mailing list
Playerstage-commit@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/playerstage-commit

Reply via email to