Author: sayer
Date: 2008-09-23 21:03:46 +0200 (Tue, 23 Sep 2008)
New Revision: 1087
Modified:
trunk/core/AmEvent.cpp
trunk/core/AmEvent.h
trunk/core/AmEventDispatcher.cpp
trunk/core/AmEventDispatcher.h
trunk/core/AmEventQueue.cpp
trunk/core/AmEventQueue.h
trunk/core/AmMediaProcessor.cpp
trunk/core/AmMediaProcessor.h
trunk/core/AmRtpReceiver.cpp
trunk/core/AmRtpReceiver.h
trunk/core/AmRtpStream.cpp
trunk/core/AmServer.cpp
trunk/core/AmServer.h
trunk/core/AmSession.cpp
trunk/core/AmSession.h
trunk/core/AmSessionContainer.cpp
trunk/core/AmSessionContainer.h
trunk/core/sems.cpp
Log:
first steps on proper shutdown:
- active sessions and other event receivers
get a SystemEvent::ServerShutdown,
default behaviour of AmSession is setStopped()
- session container waits for all sessions to be ended
- signaling server, rtp receiver, media processor, event dispatcher
are stopped and deleted
based on a patch by Rui Jin Zheng rjzheng at boronetworks dot com
Modified: trunk/core/AmEvent.cpp
===================================================================
--- trunk/core/AmEvent.cpp 2008-09-23 15:06:22 UTC (rev 1086)
+++ trunk/core/AmEvent.cpp 2008-09-23 19:03:46 UTC (rev 1087)
@@ -5,7 +5,16 @@
{
}
+AmEvent::AmEvent(const AmEvent& rhs)
+: event_id(rhs.event_id), processed(rhs.processed)
+{
+}
+
AmEvent::~AmEvent()
{
}
+AmEvent* AmEvent::clone() {
+ return new AmEvent(*this);
+}
+
Modified: trunk/core/AmEvent.h
===================================================================
--- trunk/core/AmEvent.h 2008-09-23 15:06:22 UTC (rev 1086)
+++ trunk/core/AmEvent.h 2008-09-23 19:03:46 UTC (rev 1087)
@@ -34,6 +34,7 @@
using std::string;
#define E_PLUGIN 100
+#define E_SYSTEM 101
/** \brief base event class */
struct AmEvent
@@ -42,7 +43,11 @@
bool processed;
AmEvent(int event_id);
+ AmEvent(const AmEvent& rhs);
+
virtual ~AmEvent();
+
+ virtual AmEvent* clone();
};
/**
@@ -62,7 +67,26 @@
: AmEvent(E_PLUGIN), name(n), data(d) {}
};
+/**
+ * \brief named event for system events (e.g. server stopped)
+ */
+struct AmSystemEvent : public AmEvent
+{
+ enum EvType {
+ ServerShutdown = 0
+ };
+ EvType sys_event;
+
+ AmSystemEvent(EvType e)
+ : AmEvent(E_SYSTEM), sys_event(e) { }
+
+ AmSystemEvent(const AmSystemEvent& rhs)
+ : AmEvent(rhs), sys_event(rhs.sys_event) { }
+
+ AmEvent* clone() { return new AmSystemEvent(*this); };
+};
+
/** \brief event handler interface */
class AmEventHandler
{
@@ -71,4 +95,10 @@
virtual ~AmEventHandler() { };
};
+/* class AmEventFactory */
+/* { */
+/* virtual AmEvent* generateEvent(const string& receiver_id) = 0; */
+/* virtual ~AmEventFactory() { } */
+/* }; */
+
#endif
Modified: trunk/core/AmEventDispatcher.cpp
===================================================================
--- trunk/core/AmEventDispatcher.cpp 2008-09-23 15:06:22 UTC (rev 1086)
+++ trunk/core/AmEventDispatcher.cpp 2008-09-23 19:03:46 UTC (rev 1087)
@@ -107,6 +107,7 @@
return posted;
}
+
bool AmEventDispatcher::post(const string& callid, const string& remote_tag,
AmEvent* ev)
{
bool posted = false;
@@ -126,6 +127,46 @@
return posted;
}
+bool AmEventDispatcher::broadcast(AmEvent* ev)
+{
+ if (!ev)
+ return false;
+
+ bool posted = false;
+ m_queues.lock();
+
+ for (EvQueueMapIter it = queues.begin();
+ it != queues.end(); it++) {
+ it->second->postEvent(ev->clone());
+ posted = true;
+ }
+
+ m_queues.unlock();
+
+ delete ev;
+
+ return posted;
+}
+
+bool AmEventDispatcher::empty() {
+ bool res = false;
+
+ m_queues.lock();
+ res = queues.empty();
+ m_queues.unlock();
+
+ return res;
+}
+
+void AmEventDispatcher::dispose()
+{
+ if(_instance != NULL) {
+ // todo: add locking here
+ delete _instance;
+ _instance = NULL;
+ }
+}
+
bool AmEventDispatcher::postSipRequest(const string& callid, const string&
remote_tag,
const AmSipRequest& req)
{
Modified: trunk/core/AmEventDispatcher.h
===================================================================
--- trunk/core/AmEventDispatcher.h 2008-09-23 15:06:22 UTC (rev 1086)
+++ trunk/core/AmEventDispatcher.h 2008-09-23 19:03:46 UTC (rev 1087)
@@ -65,6 +65,7 @@
public:
static AmEventDispatcher* instance();
+ static void dispose();
bool postSipRequest(const string& callid, const string& remote_tag,
const AmSipRequest& req);
@@ -72,6 +73,9 @@
bool post(const string& local_tag, AmEvent* ev);
bool post(const string& callid, const string& remote_tag, AmEvent* ev);
+ /* send event to all event queues. Note: event instances will be cloned */
+ bool broadcast(AmEvent* ev);
+
bool addEventQueue(const string& local_tag,
AmEventQueueInterface* q,
const string& callid="",
@@ -80,6 +84,8 @@
AmEventQueueInterface* delEventQueue(const string& local_tag,
const string& callid="",
const string& remote_tag="");
+
+ bool empty();
};
#endif
Modified: trunk/core/AmEventQueue.cpp
===================================================================
--- trunk/core/AmEventQueue.cpp 2008-09-23 15:06:22 UTC (rev 1086)
+++ trunk/core/AmEventQueue.cpp 2008-09-23 19:03:46 UTC (rev 1087)
@@ -82,6 +82,11 @@
ev_pending.wait_for();
}
+void AmEventQueue::wakeup()
+{
+ ev_pending.set(true);
+}
+
void AmEventQueue::processSingleEvent()
{
m_queue.lock();
Modified: trunk/core/AmEventQueue.h
===================================================================
--- trunk/core/AmEventQueue.h 2008-09-23 15:06:22 UTC (rev 1086)
+++ trunk/core/AmEventQueue.h 2008-09-23 19:03:46 UTC (rev 1087)
@@ -63,6 +63,7 @@
void postEvent(AmEvent*);
void processEvents();
void waitForEvent();
+ void wakeup();
void processSingleEvent();
};
Modified: trunk/core/AmMediaProcessor.cpp
===================================================================
--- trunk/core/AmMediaProcessor.cpp 2008-09-23 15:06:22 UTC (rev 1086)
+++ trunk/core/AmMediaProcessor.cpp 2008-09-23 19:03:46 UTC (rev 1087)
@@ -50,14 +50,16 @@
/* session scheduler */
-AmMediaProcessor* AmMediaProcessor::_instance;
+AmMediaProcessor* AmMediaProcessor::_instance = NULL;
AmMediaProcessor::AmMediaProcessor()
+ : threads(NULL),num_threads(0)
{
}
AmMediaProcessor::~AmMediaProcessor()
{
+ INFO("Media processor has been recycled.\n");
}
void AmMediaProcessor::init() {
@@ -167,10 +169,49 @@
threads[sched_thread]->postRequest(new SchedRequest(r_type,s));
}
-/* the actual session scheduler thread */
+void AmMediaProcessor::stop() {
+ assert(threads);
+ for (unsigned int i=0;i<num_threads;i++) {
+ if(threads[i] != NULL) {
+ threads[i]->stop();
+ }
+ }
+ bool threads_stopped = true;
+ do {
+ usleep(10000);
+ for (unsigned int i=0;i<num_threads;i++) {
+ if((threads[i] != NULL) &&(!threads[i]->is_stopped())) {
+ threads_stopped = false;
+ break;
+ }
+ }
+ } while(!threads_stopped);
+
+ for (unsigned int i=0;i<num_threads;i++) {
+ if(threads[i] != NULL) {
+ delete threads[i];
+ threads[i] = NULL;
+ }
+ }
+ delete [] threads;
+ threads = NULL;
+}
+void AmMediaProcessor::dispose()
+{
+ if(_instance != NULL) {
+ if(_instance->threads != NULL) {
+ _instance->stop();
+ }
+ delete _instance;
+ _instance = NULL;
+ }
+}
+
+/* the actual media processing thread */
+
AmMediaProcessorThread::AmMediaProcessorThread()
- : events(this)
+ : events(this), stop_requested(false)
{
}
AmMediaProcessorThread::~AmMediaProcessorThread()
@@ -179,10 +220,13 @@
void AmMediaProcessorThread::on_stop()
{
+ INFO("requesting media processor to stop.\n");
+ stop_requested.set(true);
}
void AmMediaProcessorThread::run()
{
+ stop_requested = false;
struct timeval now,next_tick,diff,tick;
// wallclock time
unsigned int ts = 0;
@@ -193,7 +237,7 @@
gettimeofday(&now,NULL);
timeradd(&tick,&now,&next_tick);
- while(true){
+ while(!stop_requested.get()){
gettimeofday(&now,NULL);
Modified: trunk/core/AmMediaProcessor.h
===================================================================
--- trunk/core/AmMediaProcessor.h 2008-09-23 15:06:22 UTC (rev 1086)
+++ trunk/core/AmMediaProcessor.h 2008-09-23 19:03:46 UTC (rev 1087)
@@ -62,6 +62,7 @@
// AmThread interface
void run();
void on_stop();
+ AmSharedVar<bool> stop_requested;
// AmEventHandler interface
void process(AmEvent* e);
@@ -120,6 +121,8 @@
void changeCallgroup(AmSession* s,
const string& new_callgroup);
+ void stop();
+ static void dispose();
};
Modified: trunk/core/AmRtpReceiver.cpp
===================================================================
--- trunk/core/AmRtpReceiver.cpp 2008-09-23 15:06:22 UTC (rev 1086)
+++ trunk/core/AmRtpReceiver.cpp 2008-09-23 19:03:46 UTC (rev 1087)
@@ -57,7 +57,12 @@
return _instance;
}
+bool AmRtpReceiver::haveInstance() {
+ return NULL != _instance;
+}
+
AmRtpReceiver::AmRtpReceiver()
+ : stop_requested(false)
{
fds = new struct pollfd[MAX_RTP_SESSIONS];
nfds = 0;
@@ -66,18 +71,36 @@
AmRtpReceiver::~AmRtpReceiver()
{
delete [] (fds);
+ INFO("RTP receiver has been recycled.\n");
}
void AmRtpReceiver::on_stop()
{
+ INFO("requesting RTP receiver to stop.\n");
+ stop_requested.set(true);
}
+void AmRtpReceiver::dispose()
+{
+ if(_instance != NULL) {
+ if(!_instance->is_stopped()) {
+ _instance->stop();
+
+ while(!_instance->is_stopped())
+ usleep(10000);
+ }
+ // todo: add locking here
+ delete _instance;
+ _instance = NULL;
+ }
+}
+
void AmRtpReceiver::run()
{
unsigned int tmp_nfds = 0;
struct pollfd* tmp_fds = new struct pollfd[MAX_RTP_SESSIONS];
- while(true){
+ while(!stop_requested.get()){
fds_mut.lock();
tmp_nfds = nfds;
@@ -126,6 +149,8 @@
streams_mut.unlock();
}
}
+
+ delete[] (tmp_fds);
}
void AmRtpReceiver::addStream(int sd, AmRtpStream* stream)
Modified: trunk/core/AmRtpReceiver.h
===================================================================
--- trunk/core/AmRtpReceiver.h 2008-09-23 15:06:22 UTC (rev 1086)
+++ trunk/core/AmRtpReceiver.h 2008-09-23 19:03:46 UTC (rev 1087)
@@ -49,7 +49,6 @@
typedef std::map<int, AmRtpStream*, greater<int> > Streams;
static AmRtpReceiver* _instance;
-
Streams streams;
AmMutex streams_mut;
@@ -64,11 +63,15 @@
void run();
void on_stop();
-
+ AmSharedVar<bool> stop_requested;
+
public:
static AmRtpReceiver* instance();
+ static bool haveInstance();
void addStream(int sd, AmRtpStream* stream);
void removeStream(int sd);
+
+ static void dispose();
};
#endif
Modified: trunk/core/AmRtpStream.cpp
===================================================================
--- trunk/core/AmRtpStream.cpp 2008-09-23 15:06:22 UTC (rev 1086)
+++ trunk/core/AmRtpStream.cpp 2008-09-23 19:03:46 UTC (rev 1087)
@@ -379,7 +379,8 @@
AmRtpStream::~AmRtpStream()
{
if(l_sd){
- AmRtpReceiver::instance()->removeStream(l_sd);
+ if (AmRtpReceiver::haveInstance())
+ AmRtpReceiver::instance()->removeStream(l_sd);
close(l_sd);
}
}
Modified: trunk/core/AmServer.cpp
===================================================================
--- trunk/core/AmServer.cpp 2008-09-23 15:06:22 UTC (rev 1086)
+++ trunk/core/AmServer.cpp 2008-09-23 19:03:46 UTC (rev 1087)
@@ -42,6 +42,22 @@
return _instance ? _instance : ((_instance = new AmServer()));
}
+void AmServer::dispose()
+{
+ if(_instance != NULL) {
+ if(_instance->ctrlIface != NULL) {
+ _instance->ctrlIface->join();
+ }
+ delete _instance;
+ _instance = NULL;
+ }
+}
+
+AmServer::~AmServer()
+{
+ INFO("Signaling Server has been recycled.\n");
+}
+
void AmServer::run()
{
ctrlIface->start();
Modified: trunk/core/AmServer.h
===================================================================
--- trunk/core/AmServer.h 2008-09-23 15:06:22 UTC (rev 1086)
+++ trunk/core/AmServer.h 2008-09-23 19:03:46 UTC (rev 1087)
@@ -50,11 +50,12 @@
static AmCtrlInterface *ctrlIface;
/** Avoid external instantiation. @see instance(). */
- ~AmServer(){}
+ ~AmServer();
public:
/** Get a fifo server instance. */
static AmServer* instance();
+ static void dispose();
/** Runs the fifo server. */
void run();
@@ -69,8 +70,8 @@
static bool sendRequest(const AmSipRequest &, char* serKey, unsigned int&
serKeyLen);
static bool sendReply(const AmSipReply &);
static string getContact(const string &displayName,
- const string &userName, const string &hostName,
- const string &uriParams, const string &hdrParams);
+ const string &userName, const string &hostName,
+ const string &uriParams, const string &hdrParams);
};
#endif
Modified: trunk/core/AmSession.cpp
===================================================================
--- trunk/core/AmSession.cpp 2008-09-23 15:06:22 UTC (rev 1086)
+++ trunk/core/AmSession.cpp 2008-09-23 19:03:46 UTC (rev 1087)
@@ -371,8 +371,9 @@
session_num--;
// wait at least until session is out of RtpScheduler
+ //detached.wait_for();
+
DBG("session is stopped.\n");
- //detached.wait_for();
}
void AmSession::on_stop()
@@ -484,6 +485,15 @@
DBG("AmSession::process\n");
+ if (ev->event_id == E_SYSTEM) {
+ AmSystemEvent* sys_ev = dynamic_cast<AmSystemEvent*>(ev);
+ if(sys_ev){
+ DBG("Session received system Event\n");
+ onSystemEvent(sys_ev);
+ return;
+ }
+ }
+
AmSipEvent* sip_ev = dynamic_cast<AmSipEvent*>(ev);
if(sip_ev){
DBG("Session received SIP Event\n");
@@ -752,6 +762,13 @@
return -1;
}
+void AmSession::onSystemEvent(AmSystemEvent* ev) {
+ if (ev->sys_event == AmSystemEvent::ServerShutdown) {
+ setStopped();
+ return;
+ }
+}
+
void AmSession::onSendRequest(const string& method, const string& content_type,
const string& body, string& hdrs, int flags,
unsigned int cseq)
{
Modified: trunk/core/AmSession.h
===================================================================
--- trunk/core/AmSession.h 2008-09-23 15:06:22 UTC (rev 1086)
+++ trunk/core/AmSession.h 2008-09-23 19:03:46 UTC (rev 1087)
@@ -419,6 +419,10 @@
virtual void onSipReply(const AmSipReply& reply);
+ /**
+ * entry point for system events
+ */
+ virtual void onSystemEvent(AmSystemEvent* ev);
#ifdef WITH_ZRTP
/**
Modified: trunk/core/AmSessionContainer.cpp
===================================================================
--- trunk/core/AmSessionContainer.cpp 2008-09-23 15:06:22 UTC (rev 1086)
+++ trunk/core/AmSessionContainer.cpp 2008-09-23 19:03:46 UTC (rev 1087)
@@ -41,80 +41,118 @@
// AmSessionContainer methods
-AmSessionContainer* AmSessionContainer::_SessionContainer=0;
+AmSessionContainer* AmSessionContainer::_instance=NULL;
AmSessionContainer::AmSessionContainer()
- : _run_cond(false)
+ : _run_cond(false), _container_closed(false)
{
}
AmSessionContainer* AmSessionContainer::instance()
{
- if(!_SessionContainer)
- _SessionContainer = new AmSessionContainer();
+ if(!_instance)
+ _instance = new AmSessionContainer();
- return _SessionContainer;
+ return _instance;
}
-void AmSessionContainer::on_stop()
-{
+void AmSessionContainer::dispose()
+{
+ if(_instance != NULL) {
+ if(!_instance->is_stopped()) {
+ _instance->stop();
+
+ while(!_instance->is_stopped())
+ usleep(10000);
+ }
+ // todo: add locking here
+ delete _instance;
+ _instance = NULL;
+ }
}
+bool AmSessionContainer::clean_sessions() {
+ ds_mut.lock();
+ DBG("Session cleaner starting its work\n");
+
+ try {
+ SessionQueue n_sessions;
+
+ while(!d_sessions.empty()){
+
+ AmSession* cur_session = d_sessions.front();
+ d_sessions.pop();
+
+ ds_mut.unlock();
+
+ if(cur_session->is_stopped() && cur_session->detached.get()){
+
+ DBG("session %p has been destroyed'\n",(void*)cur_session->_pid);
+ delete cur_session;
+ }
+ else {
+ DBG("session %p still running\n",(void*)cur_session->_pid);
+ n_sessions.push(cur_session);
+ }
+
+ ds_mut.lock();
+ }
+
+ swap(d_sessions,n_sessions);
+
+ }catch(std::exception& e){
+ ERROR("exception caught in session cleaner: %s\n", e.what());
+ throw; /* throw again as this is fatal (because unlocking the mutex
fails!! */
+ }catch(...){
+ ERROR("unknown exception caught in session cleaner!\n");
+ throw; /* throw again as this is fatal (because unlocking the mutex
fails!! */
+ }
+ bool more = !d_sessions.empty();
+ ds_mut.unlock();
+ return more;
+}
+
void AmSessionContainer::run()
{
- while(1){
+ while(!_container_closed.get()){
_run_cond.wait_for();
- // Let some time for the Sessions
- // to stop by themselves
+ if(_container_closed.get())
+ break;
+
+ // Give the Sessions some time to stop by themselves
sleep(5);
- ds_mut.lock();
- DBG("Session cleaner starting its work\n");
-
- try {
- SessionQueue n_sessions;
+ bool more = clean_sessions();
- while(!d_sessions.empty()){
+ DBG("Session cleaner finished\n");
+ if(!more && (!_container_closed.get()))
+ _run_cond.set(false);
+ }
+ DBG("Session cleaner terminating\n");
+}
- AmSession* cur_session = d_sessions.front();
- d_sessions.pop();
+void AmSessionContainer::on_stop()
+{
+ _container_closed.set(true);
- ds_mut.unlock();
+ DBG("brodcasting ServerShutdown system event to sessions...\n");
+ AmEventDispatcher::instance()->
+ broadcast(new AmSystemEvent(AmSystemEvent::ServerShutdown));
+
+ DBG("waiting for active event queues to stop...\n");
- if(cur_session->is_stopped() && cur_session->detached.get()){
-
- DBG("session %p has been destroyed'\n",(void*)cur_session->_pid);
- delete cur_session;
- }
- else {
- DBG("session %p still running\n",(void*)cur_session->_pid);
- n_sessions.push(cur_session);
- }
+ while (!AmEventDispatcher::instance()->empty())
+ sleep(1);
+
+ DBG("cleaning sessions...\n");
+ while (clean_sessions())
+ sleep(1);
- ds_mut.lock();
- }
-
- swap(d_sessions,n_sessions);
-
- }catch(std::exception& e){
- ERROR("exception caught in session cleaner: %s\n", e.what());
- throw; /* throw again as this is fatal (because unlocking the mutex
fails!! */
- }catch(...){
- ERROR("unknown exception caught in session cleaner!\n");
- throw; /* throw again as this is fatal (because unlocking the mutex
fails!! */
- }
-
- bool more = !d_sessions.empty();
- ds_mut.unlock();
-
- DBG("Session cleaner finished\n");
- if(!more)
- _run_cond.set(false);
- }
+ _run_cond.set(true); // so that thread stops
}
void AmSessionContainer::stopAndQueue(AmSession* s)
@@ -314,14 +352,20 @@
const string& local_tag,
AmSession* session)
{
- return AmEventDispatcher::instance()->
- addEventQueue(local_tag,(AmEventQueue*)session,
- callid,remote_tag);
+ if(_container_closed.get())
+ return false;
+
+ return AmEventDispatcher::instance()->
+ addEventQueue(local_tag,(AmEventQueue*)session,
+ callid,remote_tag);
}
bool AmSessionContainer::addSession(const string& local_tag,
AmSession* session)
{
- return AmEventDispatcher::instance()->
- addEventQueue(local_tag,(AmEventQueue*)session);
+ if(_container_closed.get())
+ return false;
+
+ return AmEventDispatcher::instance()->
+ addEventQueue(local_tag,(AmEventQueue*)session);
}
Modified: trunk/core/AmSessionContainer.h
===================================================================
--- trunk/core/AmSessionContainer.h 2008-09-23 15:06:22 UTC (rev 1086)
+++ trunk/core/AmSessionContainer.h 2008-09-23 19:03:46 UTC (rev 1087)
@@ -47,7 +47,7 @@
*/
class AmSessionContainer : public AmThread
{
- static AmSessionContainer* _SessionContainer;
+ static AmSessionContainer* _instance;
typedef std::queue<AmSession*> SessionQueue;
@@ -56,6 +56,9 @@
/** Mutex to protect the dead session container */
AmMutex ds_mut;
+ /** is container closed for new sessions? */
+ AmCondition<bool> _container_closed;
+
/** the daemon only runs if this is true */
AmCondition<bool> _run_cond;
@@ -72,9 +75,13 @@
/** @see AmThread::on_stop() */
void on_stop();
+ bool clean_sessions();
+
public:
static AmSessionContainer* instance();
+ static void dispose();
+
/**
* Creates a new session.
* @param req local request
Modified: trunk/core/sems.cpp
===================================================================
--- trunk/core/sems.cpp 2008-09-23 15:06:22 UTC (rev 1086)
+++ trunk/core/sems.cpp 2008-09-23 19:03:46 UTC (rev 1087)
@@ -34,6 +34,7 @@
#include "AmMediaProcessor.h"
#include "AmIcmpWatcher.h"
#include "AmRtpReceiver.h"
+#include "AmEventDispatcher.h"
#include "AmZRTP.h"
@@ -92,7 +93,31 @@
if (!main_pid || (main_pid == getpid())) {
unlink(pid_file.c_str());
+
+ static AmMutex clean_up_mut;
+ static AmCondition<bool> need_clean(true);
+
+ clean_up_mut.lock();
+ if(need_clean.get()) {
+
+ need_clean.set(false);
+
+ AmRtpReceiver::dispose();
+
+ AmSessionContainer::dispose();
+
+ AmServer::dispose();
+
+ AmMediaProcessor::dispose();
+
+ AmEventDispatcher::dispose();
+
+ }
+
+ clean_up_mut.unlock();
+
INFO("Finished.\n");
+
exit(0);
}
_______________________________________________
Semsdev mailing list
[email protected]
http://lists.iptel.org/mailman/listinfo/semsdev