Hello everyone,
I have created a TCP/IP block by adapting the ZMQ message pub block. Both
blocks use Boost multithreading. The TCP/IP block is used by a
standalone C++ program. To run the GNU Radio top block, the C++ program calls
tb->start(). To stop the top block, it calls tb->stop() and then
tb->wait(). However, the program hangs when tb->stop() is
called, which suggests that something is wrong with the way I use Boost
multithreading.
All help is appreciated.

Regards,
Moses.
/* -*- c++ -*- */
/*
 * Copyright 2013 Free Software Foundation, Inc.
 *
 * This file is part of GNU Radio
 *
 * SPDX-License-Identifier: GPL-3.0-or-later
 *
 */

#ifndef INCLUDED_BLOCKS_TCPSERVER_H
#define INCLUDED_BLOCKS_TCPSERVER_H

#include <gnuradio/block.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <string.h>
#include <arpa/inet.h>
#include "src/pcm/LineCode.h"

namespace icd {
namespace net {
class TCPServer;

/*! Shared-pointer handle type used to hold TCPServer blocks. */
using TCPServer_sptr = boost::shared_ptr<TCPServer>;

/*!
 * Factory for TCPServer blocks.
 *
 * \param addr address string handed to the block's constructor
 * \param port TCP port number handed to the block's constructor
 * \param MTU  size value handed to the block's constructor (defaults to 10000)
 * \return shared pointer owning the newly created block
 */
TCPServer_sptr make_TCPServer(std::string addr, int port, int MTU = 10000);
class TCPServer : public gr::block
{
    friend
    TCPServer_sptr
    make_TCPServer(std::string addr,
                   int port,
                   int MTU);
private:
    boost::thread* m_ListenThread, *m_ReceiveThread;
    int m_nPort;
    std::string m_pzAddr;
    int m_nSockFd, m_nNewSockFd;
    struct sockaddr_in m_ServerAddress;
    struct sockaddr_in m_ClientAddress;
    void Setup();
    void ListenLoop();
    void ReceiveLoop();
    void TransmitLoop(pmt::pmt_t pmt_msg);
    bool m_bFinished;
    int m_nMTU;
    pcm::LineCode m_LineCode;

public:
    TCPServer(std::string addr,
              int port,
              int MTU = 10000);
    ~TCPServer();
    bool start();
    bool stop();
};

} /* namespace net */
} /* namespace icd */

#endif /* INCLUDED_BLOCKS_TCPSERVER_H */
/* -*- c++ -*- */
/*
 * Copyright 2013,2019 Free Software Foundation, Inc.
 *
 * This file is part of GNU Radio
 *
 * SPDX-License-Identifier: GPL-3.0-or-later
 *
 */


#include "TCPServer.h"
#include <gnuradio/io_signature.h>

#include <cerrno>
#include <iostream>
#include <stdexcept>
#include <string>
#include <vector>

namespace icd {
namespace net {

/*!
 * Allocates a TCPServer block and wraps it in the shared pointer type that
 * GNU Radio expects for blocks.
 *
 * \param addr forwarded unchanged to the TCPServer constructor
 * \param port forwarded unchanged to the TCPServer constructor
 * \param MTU  forwarded unchanged to the TCPServer constructor
 * \return shared pointer owning the new block
 */
TCPServer_sptr make_TCPServer(std::string addr, int port, int MTU)
{
    TCPServer* const new_block = new TCPServer(addr, port, MTU);
    return gnuradio::get_initial_sptr(new_block);
}

/*!
 * Builds the block: opens the listening socket, registers the "in" and "out"
 * message ports and routes messages arriving on "in" to TransmitLoop().
 *
 * Every member that other methods inspect (thread handles, socket
 * descriptors, stop flag) gets a defined initial value here, so that stop(),
 * the destructor and TransmitLoop() never read an indeterminate value when
 * start() has not run or no client has connected yet.
 *
 * \param addr address string, stored in m_pzAddr
 * \param port TCP port the server listens on
 * \param MTU  receive buffer size in bytes; must be positive
 * \throws std::invalid_argument if MTU is not positive
 */
TCPServer::TCPServer(std::string addr,
                     int port,
                     int MTU)
    : block("TCPServer", gr::io_signature::make(0, 0, 0), gr::io_signature::make(0, 0, 0)),
      // Initialisers follow the declaration order of the members.
      m_ListenThread(nullptr),
      m_ReceiveThread(nullptr),
      m_nPort(port),
      m_pzAddr(addr),
      m_nSockFd(-1),
      m_nNewSockFd(-1), // -1 is the "no client" value TransmitLoop() tests for
      m_bFinished(true), // nothing is running until start() is called
      m_nMTU(MTU)
{
    // Reject a useless buffer size before any socket is opened, so nothing
    // has to be cleaned up when this throws.
    if (MTU <= 0) {
        throw std::invalid_argument("TCPServer: MTU must be positive");
    }

    Setup();
    message_port_register_in(pmt::mp("in"));
    message_port_register_out(pmt::mp("out"));

    set_msg_handler(
                pmt::mp("in"),
                boost::bind(&TCPServer::TransmitLoop, this, _1)
                );
}
/*!
 * Creates the listening socket, binds it to m_nPort on all local interfaces
 * and puts it into the listening state with a backlog of one.
 *
 * Failures are reported instead of being ignored: if the socket cannot be
 * created, bound or put into listening mode, the descriptor is released,
 * m_nSockFd is reset to -1 and an exception is thrown.
 *
 * NOTE(review): m_pzAddr is not consulted here; the socket is always bound
 * to INADDR_ANY. Confirm whether binding to a specific address is wanted.
 * NOTE(review): if the goal of the reuse option is a quick restart on the
 * same port, SO_REUSEADDR may be the intended option - confirm.
 *
 * \throws std::runtime_error when socket(), bind() or listen() fails
 */
void
TCPServer::Setup()
{
    m_nSockFd = socket(AF_INET, SOCK_STREAM, 0);
    if (m_nSockFd < 0) {
        throw std::runtime_error(std::string("TCPServer: socket() failed: ") +
                                 strerror(errno));
    }

    // Address-reuse options only influence bind() when they are set before
    // it is called, so this has to happen first. A failure here is not fatal.
    int reusePort = 1;
    if (setsockopt(m_nSockFd, SOL_SOCKET, SO_REUSEPORT, &reusePort, sizeof(reusePort)) < 0) {
        std::cerr << "TCPServer: setsockopt(SO_REUSEPORT) failed: "
                  << strerror(errno) << std::endl;
    }

    memset(&m_ServerAddress, 0, sizeof(m_ServerAddress));
    m_ServerAddress.sin_family = AF_INET;
    m_ServerAddress.sin_addr.s_addr = htonl(INADDR_ANY);
    m_ServerAddress.sin_port = htons(m_nPort);

    const char* failed_call = nullptr;
    if (bind(m_nSockFd,
             reinterpret_cast<struct sockaddr*>(&m_ServerAddress),
             sizeof(m_ServerAddress)) < 0) {
        failed_call = "bind";
    } else if (listen(m_nSockFd, 1) < 0) {
        failed_call = "listen";
    }

    if (failed_call != nullptr) {
        // Save errno first: close() may overwrite it.
        const int saved_errno = errno;
        close(m_nSockFd);
        m_nSockFd = -1;
        throw std::runtime_error(std::string("TCPServer: ") + failed_call +
                                 "() failed on port " + std::to_string(m_nPort) +
                                 ": " + strerror(saved_errno));
    }
}

void
TCPServer::ListenLoop()
{
    int n = 0;
    while(!m_bFinished)
    {
        socklen_t sosize  = sizeof(m_ClientAddress);
        m_nNewSockFd = accept(m_nSockFd,(struct sockaddr*)&m_ClientAddress,&sosize);

        // Receving Thread
        m_ReceiveThread = new boost::thread(boost::bind(&TCPServer::ReceiveLoop, this));

        }
}

/*!
 * Handler for messages arriving on the "in" port: writes the payload of the
 * PDU to the connected client.
 *
 * The payload is the cdr of the incoming pair and must be a PMT uniform
 * vector. Its bytes are sent straight from the PMT's own storage, so no
 * temporary buffer is allocated (and none can leak). The byte count reported
 * by PMT is used, and the send is repeated until every byte has been handed
 * to the socket, because send() may accept only part of the data.
 *
 * Nothing is sent while m_nNewSockFd is -1.
 *
 * \param pmt_msg PDU as a pair (metadata . uniform vector payload)
 */
void
TCPServer::TransmitLoop(pmt::pmt_t pmt_msg)
{
    // Extract the payload bytes from the PDU.
    pmt::pmt_t msg = pmt::cdr(pmt_msg);
    size_t num_bytes(0);
    const char* data =
        static_cast<const char*>(pmt::uniform_vector_elements(msg, num_bytes));

    // Work on a snapshot so the descriptor cannot change between iterations.
    const int fd = m_nNewSockFd;
    if (fd == -1) {
        return;
    }

    // Where available, ask for an error return instead of SIGPIPE when the
    // peer has gone away; the default SIGPIPE action would end the process.
#ifdef MSG_NOSIGNAL
    const int send_flags = MSG_NOSIGNAL;
#else
    const int send_flags = 0;
#endif

    size_t sent_total = 0;
    while (sent_total < num_bytes) {
        const ssize_t sent = send(fd, data + sent_total, num_bytes - sent_total, send_flags);
        if (sent < 0) {
            if (errno == EINTR) {
                continue; // interrupted before anything was written, retry
            }
            std::cerr << "TCPServer: send() failed: " << strerror(errno) << std::endl;
            return;
        }
        if (sent == 0) {
            return; // no progress; give up rather than spin
        }
        sent_total += static_cast<size_t>(sent);
    }
}

/*!
 * Body of the receiving thread: reads from the client socket until the peer
 * closes the connection, an error occurs or m_bFinished is set, and
 * publishes every received chunk as a PDU on the "out" port.
 *
 * The descriptor is captured once on entry, so this thread keeps working on
 * (and finally closes) its own connection even if m_nNewSockFd is changed
 * elsewhere. On exit the descriptor is always closed, and m_nNewSockFd is
 * reset to -1 if it still refers to this connection, so that TransmitLoop()
 * stops writing to it.
 *
 * A negative recv() result is treated as the end of the connection (after a
 * retry for EINTR) rather than as data.
 */
void
TCPServer::ReceiveLoop()
{
    const int fd = m_nNewSockFd;
    if (fd < 0) {
        return; // no connection to serve
    }

    if (m_nMTU > 0) {
        // Heap buffer instead of a variable-length array on the stack.
        std::vector<char> buffer(static_cast<size_t>(m_nMTU));

        while (!m_bFinished) {
            const int n = static_cast<int>(recv(fd, buffer.data(), buffer.size(), 0));
            if (n < 0 && errno == EINTR) {
                continue; // interrupted before any data arrived, retry
            }
            if (n <= 0) {
                break; // 0: peer closed the connection, <0: receive error
            }

            m_LineCode.UCharArrayDump(buffer.data(), n);
            std::cout << "NewSockFd : " << fd << std::endl;
            std::cout << "Value of n : " << n << std::endl;
            std::cout << "Port number: " << m_nPort << std::endl;
            std::cout << "Alias : " << this->alias() << std::endl;

            pmt::pmt_t pdu(pmt::cons(pmt::PMT_NIL, pmt::make_blob(buffer.data(), n)));
            message_port_pub(pmt::mp("out"), pdu);
        }
    }

    // Publish "no client" before releasing the descriptor, to keep the
    // window in which a stale descriptor is visible as small as possible.
    if (m_nNewSockFd == fd) {
        m_nNewSockFd = -1;
    }
    close(fd);
}

/*!
 * Stops the block and releases the listening socket that Setup() opened,
 * which was otherwise never closed.
 */
TCPServer::~TCPServer()
{
    stop();

    if (m_nSockFd >= 0) {
        close(m_nSockFd);
        m_nSockFd = -1;
    }
}

bool
TCPServer::start()
{
    m_bFinished = false;
    m_ListenThread = new boost::thread(boost::bind(&TCPServer::ListenLoop, this));
    return true;
}

bool
TCPServer::stop()
{
    m_bFinished = true;
    //m_ReceiveThread->detach();
    //m_ReceiveThread->join();
    m_ListenThread->detach();
    std::cout << "DETACHED" << std::endl;
    m_ListenThread->join();
    std::cout << "JOINED" << std::endl;

    std::cout << "RECEIVE THREAD STOPPED" << std::endl;
    return true;
}



} /* namespace net */
} /* namespace icd */

Reply via email to