Revision: 7210
http://playerstage.svn.sourceforge.net/playerstage/?rev=7210&view=rev
Author: thjc
Date: 2008-12-19 02:50:12 +0000 (Fri, 19 Dec 2008)
Log Message:
-----------
Fixes for:
Player server shutdown logic
Reduced locking time for Filewatcher so other threads get a chance to add
watches
Unsubscription from localbb was removing random subscriptions
Modified Paths:
--------------
code/player/trunk/libplayercore/devicetable.cc
code/player/trunk/libplayercore/devicetable.h
code/player/trunk/libplayercore/driver.h
code/player/trunk/libplayercore/error.h
code/player/trunk/libplayercore/filewatcher.cc
code/player/trunk/libplayercore/globals.cc
code/player/trunk/libplayercore/message.cc
code/player/trunk/libplayercore/threaded_driver.cc
code/player/trunk/server/drivers/blackboard/localbb/localbb.cpp
code/player/trunk/server/drivers/position/nav200/sicknav200.cc
code/player/trunk/server/server.cc
Modified: code/player/trunk/libplayercore/devicetable.cc
===================================================================
--- code/player/trunk/libplayercore/devicetable.cc 2008-12-19 02:36:48 UTC
(rev 7209)
+++ code/player/trunk/libplayercore/devicetable.cc 2008-12-19 02:50:12 UTC
(rev 7210)
@@ -272,6 +272,29 @@
return(0);
}
+// Unsubscribe from every device whose driver is marked 'alwayson'.
+//
+// Every alwayson device is attempted even if an earlier one fails, so a
+// single misbehaving driver cannot leave the rest subscribed while the
+// server is shutting down.
+//
+// Returns 0 if every unsubscription succeeded, -1 if at least one failed.
+int
+DeviceTable::StopAlwaysonDrivers()
+{
+  int result = 0;
+
+  // We don't lock here, on the assumption that the caller is also the only
+  // thread that can make changes to the device table.
+  for(Device* thisentry=head;thisentry;thisentry=thisentry->next)
+  {
+    if(!thisentry->driver->alwayson)
+      continue;
+
+    // Unsubscribe with a default-constructed queue pointer.
+    QueuePointer Temp = QueuePointer();
+    if(thisentry->Unsubscribe(Temp) != 0)
+    {
+      PLAYER_ERROR2("alwayson unsubscription failed for device %s:%d",
+                    interf_to_str(thisentry->addr.interf),
+                    thisentry->addr.index);
+      // Remember the failure but keep going through the remaining devices.
+      result = -1;
+    }
+  }
+  return(result);
+}
+
// Register a factory creation function. It will be called when
// GetDevice fails to find a device in the deviceTable. This function
// might, for example, locate the device on a remote host (in a
Modified: code/player/trunk/libplayercore/devicetable.h
===================================================================
--- code/player/trunk/libplayercore/devicetable.h 2008-12-19 02:36:48 UTC
(rev 7209)
+++ code/player/trunk/libplayercore/devicetable.h 2008-12-19 02:50:12 UTC
(rev 7210)
@@ -109,6 +109,15 @@
// than just driver-specific.
int StartAlwaysonDrivers();
+ // Unsubscribe from each device whose driver is marked 'alwayson'. Returns
+ // 0 on success, -1 on error (at least one driver failed to unsubscribe).
+ //
+ // Only used for coordinating server shutdown
+ //
+ // TODO: change the semantics of alwayson to be device-specific, rather
+ // than just driver-specific.
+ int StopAlwaysonDrivers();
+
// Register a factory creation function. It will be called when
// GetDevice fails to find a device in the deviceTable. This function
// might, for example, locate the device on a remote host (in a
Modified: code/player/trunk/libplayercore/driver.h
===================================================================
--- code/player/trunk/libplayercore/driver.h 2008-12-19 02:36:48 UTC (rev
7209)
+++ code/player/trunk/libplayercore/driver.h 2008-12-19 02:50:12 UTC (rev
7210)
@@ -577,6 +577,14 @@
@returns Returns 0 on success. */
virtual int Shutdown();
+ /** @brief Terminate the driver.
+
+ This method doesn't go through the niceties of unsubscribing etc.; only use it
+ when the server is actually shutting down, as it takes a few shortcuts.
+
+ @returns Returns 0 on success. */
+ virtual int Terminate();
+
/** @brief Main method for driver thread.
drivers have their own thread of execution, created using
Modified: code/player/trunk/libplayercore/error.h
===================================================================
--- code/player/trunk/libplayercore/error.h 2008-12-19 02:36:48 UTC (rev
7209)
+++ code/player/trunk/libplayercore/error.h 2008-12-19 02:50:12 UTC (rev
7210)
@@ -66,6 +66,7 @@
#define PLAYER_ERROR2(msg, a, b) ErrorPrint(PLAYER_ERR_ERR, 0, __FILE__,
__LINE__, "error : " msg "\n", a, b)
#define PLAYER_ERROR3(msg, a, b, c) ErrorPrint(PLAYER_ERR_ERR, 0, __FILE__,
__LINE__, "error : " msg "\n", a, b, c)
#define PLAYER_ERROR4(msg, a, b, c,d) ErrorPrint(PLAYER_ERR_ERR, 0, __FILE__,
__LINE__, "error : " msg "\n", a, b, c, d)
+#define PLAYER_ERROR5(msg, a, b, c, d, e) ErrorPrint(PLAYER_ERR_ERR, 0,
__FILE__, __LINE__, "error : " msg "\n", a, b, c, d, e)
/// Warning message macros
#define PLAYER_WARN(msg) ErrorPrint(PLAYER_ERR_WARN, 0, __FILE__,
__LINE__, "warning : " msg "\n")
Modified: code/player/trunk/libplayercore/filewatcher.cc
===================================================================
--- code/player/trunk/libplayercore/filewatcher.cc 2008-12-19 02:36:48 UTC
(rev 7209)
+++ code/player/trunk/libplayercore/filewatcher.cc 2008-12-19 02:50:12 UTC
(rev 7210)
@@ -77,20 +77,25 @@
t.tv_sec = static_cast<int> (floor(Timeout));
t.tv_usec = static_cast<int> ((Timeout - static_cast<int>
(floor(Timeout))) * 1e6);
+ // unlock for the select call
+ // in the worst case if the list gets modified during the call we will
either
+ // not be able to match an event on a deleted fd, or will get spurious
wake ups
+ // on a newly added fd all of which are non fatal
+ Unlock();
int ret = select (maxfd+1,&ReadFds,&WriteFds,&ExceptFds,&t);
if (ret < 0)
{
PLAYER_ERROR2("Select called failed in File Watcher: %d
%s",errno,strerror(errno));
- Unlock();
return ret;
}
else if (ret == 0)
{
- Unlock();
return 0;
}
+ Lock();
+
int queueless_count = 0;
int match_count = 0;
Modified: code/player/trunk/libplayercore/globals.cc
===================================================================
--- code/player/trunk/libplayercore/globals.cc 2008-12-19 02:36:48 UTC (rev
7209)
+++ code/player/trunk/libplayercore/globals.cc 2008-12-19 02:50:12 UTC (rev
7210)
@@ -74,7 +74,7 @@
bool player_quit;
bool player_quiet_startup;
-// global access to the cmdlihe arguments
+// global access to the cmdline arguments
int player_argc;
char** player_argv;
Modified: code/player/trunk/libplayercore/message.cc
===================================================================
--- code/player/trunk/libplayercore/message.cc 2008-12-19 02:36:48 UTC (rev
7209)
+++ code/player/trunk/libplayercore/message.cc 2008-12-19 02:50:12 UTC (rev
7210)
@@ -132,7 +132,8 @@
}
else
{
- PLAYER_ERROR3 ("failed to find clone function for message %s: %s, %d",
interf_to_str (Header.addr.interf), msgtype_to_str (Header.type),
Header.subtype);
+ this->Data = NULL;
+ PLAYER_ERROR5 ("failed to find clone function for message %s[%d]:
%s[%d], %d", interf_to_str (Header.addr.interf),Header.addr.interf,
msgtype_to_str (Header.type), Header.type, Header.subtype);
}
}
else
@@ -529,8 +530,8 @@
// If it was a response, then mark it , to prompt
// processing of the queue.
if(!this->data_requested &&
- (hdr->type == PLAYER_MSGTYPE_RESP_ACK) ||
- (hdr->type == PLAYER_MSGTYPE_RESP_NACK))
+ (hdr->type == PLAYER_MSGTYPE_RESP_ACK ||
+ hdr->type == PLAYER_MSGTYPE_RESP_NACK ))
this->SetDataRequested(true,true);
this->Unlock();
Modified: code/player/trunk/libplayercore/threaded_driver.cc
===================================================================
--- code/player/trunk/libplayercore/threaded_driver.cc 2008-12-19 02:36:48 UTC
(rev 7209)
+++ code/player/trunk/libplayercore/threaded_driver.cc 2008-12-19 02:50:12 UTC
(rev 7210)
@@ -89,6 +89,17 @@
// destructor, to free up allocated queue.
ThreadedDriver::~ThreadedDriver()
{
+ // If our thread is still running or restarting, request that it be stopped.
+ if (ThreadState == PLAYER_THREAD_STATE_RUNNING || ThreadState ==
PLAYER_THREAD_STATE_RESTARTING)
+ {
+ StopThread();
+ }
+ // Poll every 100 ms (no timeout) until ThreadState reads STOPPED.
+ while (ThreadState != PLAYER_THREAD_STATE_STOPPED)
+ {
+ usleep(100000);
+ }
+
pthread_barrier_destroy(&threadSetupBarrier);
}
@@ -129,6 +140,7 @@
{
if (ThreadState == PLAYER_THREAD_STATE_RUNNING)
{
+ PLAYER_MSG2(5,"Cancelling thread %p belonging to driver
%p",driverthread,this);
pthread_cancel(driverthread);
if(pthread_detach(driverthread))
perror("ThreadedDriver::StopThread:pthread_detach()");
@@ -224,6 +236,16 @@
return 0;
}
+// Terminate the driver, then block until its thread has stopped.
+// Returns whatever Driver::Terminate() returned; when that is non-zero
+// the wait is skipped entirely.
+int ThreadedDriver::Terminate()
+{
+  // Delegate to the base-class Terminate() first.
+  const int status = Driver::Terminate();
+  if (status != 0)
+    return status;
+
+  // Poll every 100 ms until the thread state reads STOPPED.
+  while (ThreadState != PLAYER_THREAD_STATE_STOPPED)
+    usleep(100000);
+
+  return status;
+}
bool ThreadedDriver::Wait(double TimeOut)
{
Modified: code/player/trunk/server/drivers/blackboard/localbb/localbb.cpp
===================================================================
--- code/player/trunk/server/drivers/blackboard/localbb/localbb.cpp
2008-12-19 02:36:48 UTC (rev 7209)
+++ code/player/trunk/server/drivers/blackboard/localbb/localbb.cpp
2008-12-19 02:50:12 UTC (rev 7210)
@@ -185,7 +185,7 @@
/** @brief Driver de-initialisation. */
int Shutdown();
/** Override the unsubscribe to stop sending events to devices
which are no longer subscribed. */
- int Unsubscribe(player_devaddr_t addr);
+ int Unsubscribe(QueuePointer &queue, player_devaddr_t addr);
// MessageHandler
/** @brief Process incoming messages.
@@ -307,15 +307,11 @@
* map<group, vector<device queue> >
*/
map<std::string, vector<QueuePointer> > group_listeners;
-
- /** Map of queues to devices. Used to remove unneeded queues
when a device is unsubscribed. */
- map<player_devaddr_t, QueuePointer> subscribed_devices;
};
////////////////////////////////////////////////////////////////////////////////
// Override the unsubscribe. Stop sending out events to unsubscribed devices.
-int LocalBB::Unsubscribe(player_devaddr_t addr)
+int LocalBB::Unsubscribe(QueuePointer &qp, player_devaddr_t addr)
{
- QueuePointer &qp = subscribed_devices[addr];
for (map<std::string, map<std::string, vector<QueuePointer> >
>::iterator itr = listeners.begin(); itr != listeners.end(); itr++)
{
map<std::string, vector<QueuePointer> > &keys = (*itr).second;
@@ -354,7 +350,7 @@
}
}
- return Driver::Unsubscribe(addr);
+ return Driver::Unsubscribe(qp,addr);
}
////////////////////////////////////////////////////////////////////////////////
// Factory method.
@@ -676,7 +672,6 @@
BlackBoardEntry LocalBB::SubscribeKey(const std::string &key, const
std::string &group, const QueuePointer &resp_queue, const player_devaddr_t addr)
{
listeners[group][key].push_back(resp_queue);
- subscribed_devices[addr] = resp_queue;
BlackBoardEntry entry = entries[group][key];
if (entry.key == "")
{
@@ -707,7 +702,6 @@
vector<BlackBoardEntry> LocalBB::SubscribeGroup(const std::string &group,
const QueuePointer &qp, const player_devaddr_t addr)
{
group_listeners[group].push_back(qp);
- subscribed_devices[addr] = qp;
vector<BlackBoardEntry> group_entries;
//map<group, map<key, entry> >
Modified: code/player/trunk/server/drivers/position/nav200/sicknav200.cc
===================================================================
--- code/player/trunk/server/drivers/position/nav200/sicknav200.cc
2008-12-19 02:36:48 UTC (rev 7209)
+++ code/player/trunk/server/drivers/position/nav200/sicknav200.cc
2008-12-19 02:50:12 UTC (rev 7210)
@@ -968,7 +968,7 @@
uint32_t type = *reinterpret_cast<uint32_t*>(wkb + 1);
bool extendedWKB = type >> 24 == 32; // Extended WKBs seem to
store a flag in the type variable.
int headerSize = extendedWKB ? wkbHeaderSize + 4 :
wkbHeaderSize;
- if (type & 0xffffff != 4) // Ignore the most significant byte,
it might have a flag in.
+ if ((type & 0xffffff) != 4) // Ignore the most significant
byte, it might have a flag in.
{
PLAYER_WARN1(
"InterpretLayerData only supports
MultiPoint data %d\n",
Modified: code/player/trunk/server/server.cc
===================================================================
--- code/player/trunk/server/server.cc 2008-12-19 02:36:48 UTC (rev 7209)
+++ code/player/trunk/server/server.cc 2008-12-19 02:50:12 UTC (rev 7210)
@@ -347,6 +347,12 @@
{
delete ptcp;
delete pudp;
+
+ if(deviceTable->StopAlwaysonDrivers() != 0)
+ {
+ PLAYER_ERROR("failed to stop alwayson drivers");
+ }
+
player_globals_fini();
delete cf;
}
This was sent by the SourceForge.net collaborative development platform, the
world's largest Open Source development site.
------------------------------------------------------------------------------
SF.Net email is Sponsored by MIX09, March 18-20, 2009 in Las Vegas, Nevada.
The future of the web can't happen without you. Join us at MIX09 to help
pave the way to the Next Web now. Learn more and register at
http://ad.doubleclick.net/clk;208669438;13503038;i?http://2009.visitmix.com/
_______________________________________________
Playerstage-commit mailing list
Playerstage-commit@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/playerstage-commit