Hello everyone, I have created a TCP/IP block by adapting the ZMQ message pub block. Both blocks make use of boost multithreading. The TCP/IP block is used by a standalone C++ program. To run the gnuradio topblock, the C++ program calls tb->start() function. To stop the topblock, the functions tb->stop() and tb->wait() are called.However, the program "hangs" when tb->stop() is called. This suggests there is something wrong with the way I use boost multithreading. All help is appreciated.
Regards, Moses.
/* -*- c++ -*- */ /* * Copyright 2013 Free Software Foundation, Inc. * * This file is part of GNU Radio * * SPDX-License-Identifier: GPL-3.0-or-later * */ #ifndef INCLUDED_BLOCKS_TCPSERVER_H #define INCLUDED_BLOCKS_TCPSERVER_H #include <gnuradio/block.h> #include <unistd.h> #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <string.h> #include <arpa/inet.h> #include "src/pcm/LineCode.h" namespace icd { namespace net { class TCPServer; typedef boost::shared_ptr<TCPServer> TCPServer_sptr; TCPServer_sptr make_TCPServer(std::string addr, int port, int MTU = 10000); class TCPServer : public gr::block { friend TCPServer_sptr make_TCPServer(std::string addr, int port, int MTU); private: boost::thread* m_ListenThread, *m_ReceiveThread; int m_nPort; std::string m_pzAddr; int m_nSockFd, m_nNewSockFd; struct sockaddr_in m_ServerAddress; struct sockaddr_in m_ClientAddress; void Setup(); void ListenLoop(); void ReceiveLoop(); void TransmitLoop(pmt::pmt_t pmt_msg); bool m_bFinished; int m_nMTU; pcm::LineCode m_LineCode; public: TCPServer(std::string addr, int port, int MTU = 10000); ~TCPServer(); bool start(); bool stop(); }; } /* namespace blocks */ } /* namespace gr */ #endif /* INCLUDED_BLOCKS_TCPServer_H */
/* -*- c++ -*- */ /* * Copyright 2013,2019 Free Software Foundation, Inc. * * This file is part of GNU Radio * * SPDX-License-Identifier: GPL-3.0-or-later * */ #include "TCPServer.h" #include <gnuradio/io_signature.h> namespace icd { namespace net { TCPServer_sptr make_TCPServer(std::string addr, int port, int MTU) { return gnuradio::get_initial_sptr( new TCPServer(addr, port, MTU)); } TCPServer::TCPServer(std::string addr, int port, int MTU) : block("TCPServer", gr::io_signature::make(0, 0, 0), gr::io_signature::make(0, 0, 0)), m_nPort(port),m_pzAddr(addr),m_nMTU(MTU) { Setup(); message_port_register_in(pmt::mp("in")); message_port_register_out(pmt::mp("out")); set_msg_handler( pmt::mp("in"), boost::bind(&TCPServer::TransmitLoop, this, _1) ); //std::cout << "TCP SERVER CONSTRUCTED" << std::endl; } void TCPServer::Setup() { m_nSockFd=socket(AF_INET,SOCK_STREAM,0); memset(&m_ServerAddress,0,sizeof(m_ServerAddress)); m_ServerAddress.sin_family=AF_INET; m_ServerAddress.sin_addr.s_addr=htonl(INADDR_ANY); m_ServerAddress.sin_port=htons(m_nPort); bind(m_nSockFd,(struct sockaddr *)&m_ServerAddress, sizeof(m_ServerAddress)); int reusePort = 1; setsockopt(m_nSockFd, SOL_SOCKET, SO_REUSEPORT, &reusePort, sizeof(reusePort)); listen(m_nSockFd,1); } void TCPServer::ListenLoop() { int n = 0; while(!m_bFinished) { socklen_t sosize = sizeof(m_ClientAddress); m_nNewSockFd = accept(m_nSockFd,(struct sockaddr*)&m_ClientAddress,&sosize); // Receving Thread m_ReceiveThread = new boost::thread(boost::bind(&TCPServer::ReceiveLoop, this)); } } void TCPServer::TransmitLoop(pmt::pmt_t pmt_msg) { // Extracting message from pmt pmt::pmt_t msg = pmt::cdr(pmt_msg); size_t offset(0); int msg_len = pmt::length(msg); unsigned char *msg_char = (unsigned char*)malloc(msg_len); memcpy(msg_char,pmt::uniform_vector_elements(msg, offset),msg_len); if(m_nNewSockFd != -1) { if( send(m_nNewSockFd , msg_char , msg_len , 0) < 0) { //std::cout << "Send failed : " << std::endl; //return false; } } } void TCPServer::ReceiveLoop() { int n = 0; while(!m_bFinished) { char msg[m_nMTU]; n=recv(m_nNewSockFd,msg,m_nMTU,0); if(n==0) { close(m_nNewSockFd); break; } m_LineCode.UCharArrayDump(msg,n); std::cout << "NewSockFd : " << m_nNewSockFd << std::endl; std::cout << "Value of n : " << n << std::endl; std::cout << "Port number: " << m_nPort << std::endl; if (n > 0){ std::cout << "Alias : " << this->alias() << std::endl; pmt::pmt_t pdu(pmt::cons(pmt::PMT_NIL,pmt::make_blob(msg,n))); message_port_pub(pmt::mp("out"), pdu); } } } TCPServer::~TCPServer() { stop(); } bool TCPServer::start() { m_bFinished = false; m_ListenThread = new boost::thread(boost::bind(&TCPServer::ListenLoop, this)); return true; } bool TCPServer::stop() { m_bFinished = true; //m_ReceiveThread->detach(); //m_ReceiveThread->join(); m_ListenThread->detach(); std::cout << "DETACHED" << std::endl; m_ListenThread->join(); std::cout << "JOINED" << std::endl; std::cout << "RECEIVE THREAD STOPPED" << std::endl; return true; } } /* namespace blocks */ } /* namespace gr */