Module: sems Branch: master Commit: 831a64dac4e0d934971b6c2a0c51714308c26fdb URL: http://git.sip-router.org/cgi-bin/gitweb.cgi/sems/?a=commit;h=831a64dac4e0d934971b6c2a0c51714308c26fdb
Author: Stefan Sayer <[email protected]> Committer: Raphael Coeffic <[email protected]> Date: Sun Jan 22 00:10:40 2012 +0100 b/f: fix MT RTP receiver with RTP relay - reversed the introduction of the rtp receiver thread index. - simplest hashing (modulo) on socket descriptor seems to be sufficient. --- core/AmB2BSession.cpp | 18 +++++++----------- core/AmB2BSession.h | 8 ++------ core/AmRtpReceiver.cpp | 11 +++++------ core/AmRtpReceiver.h | 5 ++--- core/AmRtpStream.cpp | 6 +++--- core/AmRtpStream.h | 6 ------ 6 files changed, 19 insertions(+), 35 deletions(-) diff --git a/core/AmB2BSession.cpp b/core/AmB2BSession.cpp index df94b52..7d8f3a6 100644 --- a/core/AmB2BSession.cpp +++ b/core/AmB2BSession.cpp @@ -46,7 +46,7 @@ AmB2BSession::AmB2BSession() relay_rtp_streams(NULL), relay_rtp_streams_cnt(0), est_invite_cseq(0),est_invite_other_cseq(0) { - memset(other_streams,0,sizeof(OtherStreamInfo)*MAX_RELAY_STREAMS); + memset(other_stream_fds,0,sizeof(int)*MAX_RELAY_STREAMS); } AmB2BSession::AmB2BSession(const string& other_local_tag) @@ -57,7 +57,7 @@ AmB2BSession::AmB2BSession(const string& other_local_tag) relay_rtp_streams(NULL), relay_rtp_streams_cnt(0), est_invite_cseq(0),est_invite_other_cseq(0) { - memset(other_streams,0,sizeof(OtherStreamInfo)*MAX_RELAY_STREAMS); + memset(other_stream_fds,0,sizeof(int)*MAX_RELAY_STREAMS); } AmB2BSession::~AmB2BSession() @@ -869,9 +869,7 @@ void AmB2BSession::setupRelayStreams(AmB2BSession* other_session) { // link the other streams as our relay streams for (unsigned int i=0; i<relay_rtp_streams_cnt; i++) { other_session->relay_rtp_streams[i]->setRelayStream(relay_rtp_streams[i]); - other_streams[i].fd = other_session->relay_rtp_streams[i]->getLocalSocket(); - other_streams[i].recver_index = - other_session->relay_rtp_streams[i]->getReceiverIndex(); + other_stream_fds[i] = other_session->relay_rtp_streams[i]->getLocalSocket(); relay_rtp_streams[i]->setLocalIP(localRTPIP()); relay_rtp_streams[i]->enableRtpRelay(); } @@ -880,15 +878,13 @@ void AmB2BSession::setupRelayStreams(AmB2BSession* other_session) { void AmB2BSession::clearRtpReceiverRelay() { for (unsigned int i=0; i<relay_rtp_streams_cnt; i++) { // clear the other call's RTP relay streams from RTP receiver - if (other_streams[i].fd) { - AmRtpReceiver::instance()->removeStream(other_streams[i].fd, - other_streams[i].recver_index); - memset(&(other_streams[i]),0,sizeof(OtherStreamInfo)); + if (other_stream_fds[i]) { + AmRtpReceiver::instance()->removeStream(other_stream_fds[i]); + other_stream_fds[i] = 0; } // clear our relay streams from RTP receiver if (relay_rtp_streams[i]->hasLocalSocket()) { - AmRtpReceiver::instance()->removeStream(relay_rtp_streams[i]->getLocalSocket(), - relay_rtp_streams[i]->getReceiverIndex()); + AmRtpReceiver::instance()->removeStream(relay_rtp_streams[i]->getLocalSocket()); } } } diff --git a/core/AmB2BSession.h b/core/AmB2BSession.h index c6ef81e..6c5fbae 100644 --- a/core/AmB2BSession.h +++ b/core/AmB2BSession.h @@ -240,13 +240,9 @@ class AmB2BSession: public AmSession /** number of relay RTP streams */ unsigned int relay_rtp_streams_cnt; - struct OtherStreamInfo { - int fd; - unsigned int recver_index; - }; - /** fd of the other streams' sockets (to remove from + /** fds of the other streams' sockets (to remove from RtpReceiver at end of relaying) */ - OtherStreamInfo other_streams[MAX_RELAY_STREAMS]; + int other_stream_fds[MAX_RELAY_STREAMS]; /** clear our and the other side's RTP streams from RTPReceiver */ void clearRtpReceiverRelay(); diff --git a/core/AmRtpReceiver.cpp b/core/AmRtpReceiver.cpp index d249bdb..2ebaf1a 100644 --- a/core/AmRtpReceiver.cpp +++ b/core/AmRtpReceiver.cpp @@ -195,15 +195,14 @@ void _AmRtpReceiver::start() receivers[i].start(); } -unsigned int _AmRtpReceiver::addStream(int sd, AmRtpStream* stream) +void _AmRtpReceiver::addStream(int sd, AmRtpStream* stream) { - unsigned int i = next_index.inc() % n_receivers; + unsigned int i = sd % n_receivers; receivers[i].addStream(sd,stream); - return i; } -void _AmRtpReceiver::removeStream(int sd, unsigned int thread_index) +void _AmRtpReceiver::removeStream(int sd) { - assert(thread_index < n_receivers); - receivers[thread_index].removeStream(sd); + unsigned int i = sd % n_receivers; + receivers[i].removeStream(sd); } diff --git a/core/AmRtpReceiver.h b/core/AmRtpReceiver.h index fe5eeeb..6c1d87b 100644 --- a/core/AmRtpReceiver.h +++ b/core/AmRtpReceiver.h @@ -102,9 +102,8 @@ public: void start(); - // returns the receiver thread index - unsigned int addStream(int sd, AmRtpStream* stream); - void removeStream(int sd, unsigned int thread_index); + void addStream(int sd, AmRtpStream* stream); + void removeStream(int sd); }; typedef singleton<_AmRtpReceiver> AmRtpReceiver; diff --git a/core/AmRtpStream.cpp b/core/AmRtpStream.cpp index 60d7066..22e85b9 100644 --- a/core/AmRtpStream.cpp +++ b/core/AmRtpStream.cpp @@ -158,8 +158,8 @@ void AmRtpStream::setLocalPort() } l_port = port; - recver_index = AmRtpReceiver::instance()->addStream(l_sd,this); - DBG("added to RTP receiver #%i (%s:%i)\n", recver_index, + AmRtpReceiver::instance()->addStream(l_sd, this); + DBG("added stream [%p] to RTP receiver (%s:%i)\n", this, get_addr_str(l_saddr.sin_addr).c_str(),l_port); } @@ -376,7 +376,7 @@ AmRtpStream::~AmRtpStream() { if(l_sd){ if (AmRtpReceiver::haveInstance()) - AmRtpReceiver::instance()->removeStream(l_sd,recver_index); + AmRtpReceiver::instance()->removeStream(l_sd); close(l_sd); } } diff --git a/core/AmRtpStream.h b/core/AmRtpStream.h index 71702ae..4c7950d 100644 --- a/core/AmRtpStream.h +++ b/core/AmRtpStream.h @@ -171,9 +171,6 @@ protected: /** Local socket */ int l_sd; - /** Receiver thread index */ - unsigned int recver_index; - /** Timestamp of the last received RTP packet */ struct timeval last_recv_time; @@ -279,9 +276,6 @@ public: /** initializes and gets the socket descriptor for local socket */ int getLocalSocket(); - /** returns the receiver thread index */ - unsigned int getReceiverIndex() { return recver_index; } - /** * This function must be called before setLocalPort, because * setLocalPort will bind the socket and it will be not _______________________________________________ Semsdev mailing list [email protected] http://lists.iptel.org/mailman/listinfo/semsdev
