Re: Multithreading in GR blocks

2020-04-13 Thread Albin Stigö
An alternative is to use non-blocking I/O with poll or select in your
worker thread, plus another fd created with pipe(2) to signal that you
are done... Or a timer fd like Sylvain suggests. Not sure if Boost has a
portable wrapper for this?

--Albin
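
For illustration, the pipe(2) wake-up described above might look like the
sketch below. It is only a sketch under assumptions: the names
wait_for_data_or_wake and request_wake are made up for this example, the
code is POSIX-only, and it uses poll() directly rather than any Boost
wrapper.

```cpp
#include <cassert>
#include <cerrno>
#include <poll.h>
#include <unistd.h>

// Hypothetical helper (not from the thread): waits until either data_fd is
// readable or something arrives on wake_fd, the read end of a pipe.
// Returns 1 if data_fd is ready, 0 if a wake-up was signalled, -1 on error.
int wait_for_data_or_wake(int data_fd, int wake_fd)
{
    assert(data_fd >= 0 && wake_fd >= 0);

    struct pollfd fds[2];
    fds[0].fd = data_fd;
    fds[0].events = POLLIN;
    fds[0].revents = 0;
    fds[1].fd = wake_fd;
    fds[1].events = POLLIN;
    fds[1].revents = 0;

    for (;;) {
        // No timeout is needed: the pipe is what wakes the thread up.
        int rc = poll(fds, 2, -1);
        if (rc < 0) {
            if (errno == EINTR)
                continue; // interrupted by a signal, just retry
            return -1;
        }
        if (fds[1].revents & (POLLIN | POLLHUP))
            return 0; // shutdown request takes priority over data
        if (fds[0].revents & (POLLIN | POLLHUP | POLLERR))
            return 1;
    }
}

// Hypothetical helper for stop(): writing one byte to the write end of the
// pipe makes the poll() above return.
bool request_wake(int wake_write_fd)
{
    const char byte = 'x';
    return write(wake_write_fd, &byte, 1) == 1;
}
```

In a block, the worker thread would call wait_for_data_or_wake() with the
socket and the pipe's read end before each accept() or recv(), and stop()
would call request_wake() on the write end before joining the thread.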

On Mon, Apr 13, 2020, 19:59 Sylvain Munaut <246...@gmail.com> wrote:

> The m_finished thing only works if you're not using any blocking calls.
>
> But you're using `accept` and `recv` etc ... all calls that can block
> forever until they get something.
> You need to use `select` on the file descriptors while waiting for
> events / data and set a timeout on that select so your code has a
> periodic opportunity to check the m_finished flag.
>
> Cheers,
>
> Sylvain
>
>


Re: Multithreading in GR blocks

2020-04-13 Thread Marcus Müller
Hi Moses,

ah sorry, I was misreading your code!
Looking at this, yes, it's a classic socket programming problem:

Looks like your ListenLoop probably simply hangs on `accept`; that's
because the socket is in blocking mode by default.

So, you'd "simply" do the following:

1. Prepare your socket as done in Setup(), but make it non-blocking.
2. In your ListenLoop, use `select()`, which blocks, on the socket fd.
3. When select() returns:
   a. try to accept(). That can either work instantly (then, select()
      finished because there was a connection ready to be accepted), or
      fail instantly (because select() finished for any other reason,
      e.g. the socket being closed)
   b. check whether the socket is still open && !m_bFinished

Or don't. Honestly, writing another TCP socket acceptor in 2020 does
sound like a somewhat tedious thing to do ;) Look at GNU Radio's
gr-network in-tree component, it should have everything you need.

Best regards,
Marcus
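
As a rough illustration of steps 1 to 3a above, a minimal POSIX sketch
could look like this. The helper names open_nonblocking_listener and
wait_and_accept are hypothetical and not part of the posted block or of
GNU Radio. The select() call here has no timeout, so something still has
to wake it at shutdown; the behaviour of select() when its fd is closed
from another thread is not something to rely on portably, which is why a
timeout, as suggested elsewhere in this thread, is the safer wake-up.

```cpp
#include <arpa/inet.h>
#include <cassert>
#include <cerrno>
#include <cstring>
#include <fcntl.h>
#include <netinet/in.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <unistd.h>

// Step 1: create a listening TCP socket and switch it to non-blocking mode.
// Returns the fd, or -1 on failure. (Hypothetical helper name.)
int open_nonblocking_listener(unsigned short port)
{
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0)
        return -1;

    struct sockaddr_in addr;
    std::memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons(port);

    int flags = fcntl(fd, F_GETFL, 0);
    if (flags < 0 ||
        bind(fd, (struct sockaddr*)&addr, sizeof(addr)) < 0 ||
        listen(fd, 1) < 0 ||
        fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {
        close(fd);
        return -1;
    }
    return fd;
}

// Steps 2 and 3a: block in select() until the listening fd is readable,
// then try accept(). Returns the client fd, or -1 if select() or accept()
// failed (for example EAGAIN/EWOULDBLOCK when nothing was pending).
int wait_and_accept(int listen_fd)
{
    assert(listen_fd >= 0 && listen_fd < FD_SETSIZE);

    fd_set readfds;
    FD_ZERO(&readfds);
    FD_SET(listen_fd, &readfds);
    if (select(listen_fd + 1, &readfds, nullptr, nullptr, nullptr) < 0)
        return -1;
    return accept(listen_fd, nullptr, nullptr);
}
```

Step 3b then becomes the loop condition around wait_and_accept(): keep
going only while the socket is still open and the finished flag is unset.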

On 13.04.20 19:21, Moses Browne Mwakyanjala wrote:
> Hi Marcus,
> I was trying to emulate how the ZMQ block handles multithreading.
> Basically, the ZMQ block overrides the stop() function and joins the
> threads. This is the same thing I tried to do to stop the receive and
> listen threads. Is there any other way of doing this?
> 
> bool
> TCPServer::stop()
> {
>     m_bFinished = true;
>     m_ReceiveThread->join();
>     m_ListenThread->join();
>     return true;
> }
> 
> Regards,
> 
> Moses.
> 
> 
> On Mon, Apr 13, 2020 at 7:11 PM Marcus Müller wrote:
> 
> Hi Moses,
> 
> your code doesn't show how your GNU Radio block's stop() function would
> tell your TCP Server thread that it's time to shut down, so I presume
> that doesn't happen – that would explain why the flow graph can't ever
> shut down!
> 
> Best regards,
> Marcus
> 
> On 13.04.20 18:54, Moses Browne Mwakyanjala wrote:
> > Hello everyone,
> > I have created a TCP/IP block by adapting the ZMQ message pub block.
> > Both blocks make use of boost multithreading. The TCP/IP block is used
> > by a standalone C++ program. To run the gnuradio topblock, the C++
> > program calls tb->start() function. To stop the topblock, the
> > functions tb->stop() and tb->wait() are called. However, the program
> > "hangs" when tb->stop() is called. This suggests there is something
> > wrong with the way I use boost multithreading. 
> > All help is appreciated.
> >
> > Regards,
> > Moses. 
> 




Re: Multithreading in GR blocks

2020-04-13 Thread Sylvain Munaut
The m_finished thing only works if you're not using any blocking calls.

But you're using `accept` and `recv` etc ... all calls that can block
forever until they get something.
You need to use `select` on the file descriptors while waiting for
events / data and set a timeout on that select so your code has a
periodic opportunity to check the m_finished flag.

Cheers,

Sylvain
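
A minimal sketch of that select-with-timeout pattern, for illustration
only. The function name receive_until_finished and its parameters are
assumptions made for this example. It also uses std::atomic<bool> for the
flag, which differs from the plain bool m_bFinished in the posted code, so
that the write in stop() and the reads in the worker thread do not race.

```cpp
#include <atomic>
#include <cassert>
#include <cerrno>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>

// Hypothetical receive loop: returns the number of bytes received once
// `finished` becomes true, the peer closes the connection, or an error
// occurs. It never blocks for longer than timeout_ms at a time.
long receive_until_finished(int fd, const std::atomic<bool>& finished, int timeout_ms)
{
    assert(fd >= 0 && fd < FD_SETSIZE);

    long total = 0;
    char buf[1500];
    while (!finished.load()) {
        fd_set readfds;
        FD_ZERO(&readfds);
        FD_SET(fd, &readfds);

        // Re-initialise the timeout on every pass; select() may modify it.
        struct timeval tv;
        tv.tv_sec = timeout_ms / 1000;
        tv.tv_usec = (timeout_ms % 1000) * 1000;

        int rc = select(fd + 1, &readfds, nullptr, nullptr, &tv);
        if (rc < 0) {
            if (errno == EINTR)
                continue; // interrupted by a signal, retry
            break;
        }
        if (rc == 0)
            continue; // timeout: go back and re-check the flag

        ssize_t n = recv(fd, buf, sizeof(buf), 0);
        if (n <= 0)
            break; // 0 means the peer closed, negative means error
        total += n;
    }
    return total;
}
```

With a timeout of, say, 100 ms, a stop() that sets the flag and then joins
the thread waits at most roughly one timeout period for the loop to exit.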



Re: Multithreading in GR blocks

2020-04-13 Thread Moses Browne Mwakyanjala
Hi Marcus,
I was trying to emulate how the ZMQ block handles multithreading.
Basically, the ZMQ block overrides the stop() function and joins the
threads. This is the same thing I tried to do to stop the receive and
listen threads. Is there any other way of doing this?

bool
TCPServer::stop()
{
    m_bFinished = true;
    m_ReceiveThread->join();
    m_ListenThread->join();
    return true;
}

Regards,

Moses.
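
One detail of a stop() like the one above: it joins both threads
unconditionally, although the receive thread only exists once a client
has connected, and the destructor in the posted code calls stop() again.
The sketch below shows one way to make stop() safe to call repeatedly. It
is an illustration only: the Worker class is a hypothetical stand-in for
the block, and it uses std::thread and std::atomic rather than the
boost::thread pointers and plain bool of the original.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

// Hypothetical stand-in for the block: a single worker polling a flag.
class Worker
{
public:
    ~Worker() { stop(); }

    bool start()
    {
        assert(!d_thread.joinable()); // start() twice without stop() is a bug
        d_finished = false;
        d_thread = std::thread([this] {
            while (!d_finished) {
                // Stands in for a select()/poll() call with a short timeout.
                std::this_thread::sleep_for(std::chrono::milliseconds(10));
            }
        });
        return true;
    }

    // Safe to call more than once, and safe if start() was never called.
    bool stop()
    {
        d_finished = true;
        if (d_thread.joinable())
            d_thread.join();
        return true;
    }

    bool running() const { return d_thread.joinable(); }

private:
    std::atomic<bool> d_finished{ false };
    std::thread d_thread;
};
```

The joinable() check is what makes the second call harmless: joining a
std::thread that is not joinable throws std::system_error.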


On Mon, Apr 13, 2020 at 7:11 PM Marcus Müller wrote:

> Hi Moses,
>
> your code doesn't show how your GNU Radio block's stop() function would
> tell your TCP Server thread that it's time to shut down, so I presume
> that doesn't happen – that would explain why the flow graph can't ever
> shut down!
>
> Best regards,
> Marcus
>
> On 13.04.20 18:54, Moses Browne Mwakyanjala wrote:
> > Hello everyone,
> > I have created a TCP/IP block by adapting the ZMQ message pub block.
> > Both blocks make use of boost multithreading. The TCP/IP block is used
> > by a standalone C++ program. To run the gnuradio topblock, the C++
> > program calls tb->start() function. To stop the topblock, the
> > functions tb->stop() and tb->wait() are called. However, the program
> > "hangs" when tb->stop() is called. This suggests there is something
> > wrong with the way I use boost multithreading.
> > All help is appreciated.
> >
> > Regards,
> > Moses.
>
>


Re: Multithreading in GR blocks

2020-04-13 Thread Marcus Müller
Hi Moses,

your code doesn't show how your GNU Radio block's stop() function would
tell your TCP Server thread that it's time to shut down, so I presume
that doesn't happen – that would explain why the flow graph can't ever
shut down!

Best regards,
Marcus

On 13.04.20 18:54, Moses Browne Mwakyanjala wrote:
> Hello everyone,
> I have created a TCP/IP block by adapting the ZMQ message pub block.
> Both blocks make use of boost multithreading. The TCP/IP block is used
> by a standalone C++ program. To run the gnuradio topblock, the C++
> program calls tb->start() function. To stop the topblock, the
> functions tb->stop() and tb->wait() are called. However, the program
> "hangs" when tb->stop() is called. This suggests there is something
> wrong with the way I use boost multithreading. 
> All help is appreciated.
> 
> Regards,
> Moses. 





Multithreading in GR blocks

2020-04-13 Thread Moses Browne Mwakyanjala
Hello everyone,
I have created a TCP/IP block by adapting the ZMQ message pub block. Both
blocks make use of Boost multithreading. The TCP/IP block is used by a
standalone C++ program. To run the GNU Radio top block, the C++ program
calls the tb->start() function. To stop the top block, the functions
tb->stop() and tb->wait() are called. However, the program "hangs" when
tb->stop() is called. This suggests there is something wrong with the way
I use Boost multithreading.
All help is appreciated.

Regards,
Moses.
/* -*- c++ -*- */
/*
 * Copyright 2013 Free Software Foundation, Inc.
 *
 * This file is part of GNU Radio
 *
 * SPDX-License-Identifier: GPL-3.0-or-later
 *
 */

#ifndef INCLUDED_BLOCKS_TCPSERVER_H
#define INCLUDED_BLOCKS_TCPSERVER_H

#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include "src/pcm/LineCode.h"

namespace icd {
namespace net {
class TCPServer;
typedef boost::shared_ptr<TCPServer> TCPServer_sptr;
TCPServer_sptr
make_TCPServer(std::string addr,
               int port,
               int MTU = 1);

class TCPServer : public gr::block
{
    friend TCPServer_sptr
    make_TCPServer(std::string addr,
                   int port,
                   int MTU);

private:
    boost::thread *m_ListenThread, *m_ReceiveThread;
    int m_nPort;
    std::string m_pzAddr;
    int m_nSockFd, m_nNewSockFd;
    struct sockaddr_in m_ServerAddress;
    struct sockaddr_in m_ClientAddress;
    void Setup();
    void ListenLoop();
    void ReceiveLoop();
    void TransmitLoop(pmt::pmt_t pmt_msg);
    bool m_bFinished;
    int m_nMTU;
    pcm::LineCode m_LineCode;

public:
    TCPServer(std::string addr,
              int port,
              int MTU = 1);
    ~TCPServer();
    bool start();
    bool stop();
};

} /* namespace net */
} /* namespace icd */

#endif /* INCLUDED_BLOCKS_TCPSERVER_H */
/* -*- c++ -*- */
/*
 * Copyright 2013,2019 Free Software Foundation, Inc.
 *
 * This file is part of GNU Radio
 *
 * SPDX-License-Identifier: GPL-3.0-or-later
 *
 */


#include "TCPServer.h"
#include 

namespace icd {
namespace net {

TCPServer_sptr
make_TCPServer(std::string addr,
               int port,
               int MTU)
{
    return gnuradio::get_initial_sptr(
        new TCPServer(addr, port, MTU));
}

TCPServer::TCPServer(std::string addr,
                     int port,
                     int MTU)
    : block("TCPServer", gr::io_signature::make(0, 0, 0), gr::io_signature::make(0, 0, 0)),
      m_nPort(port), m_pzAddr(addr), m_nMTU(MTU)
{
    Setup();
    message_port_register_in(pmt::mp("in"));
    message_port_register_out(pmt::mp("out"));

    set_msg_handler(
        pmt::mp("in"),
        boost::bind(&TCPServer::TransmitLoop, this, _1)
    );
    //std::cout << "TCP SERVER CONSTRUCTED" << std::endl;
}

void
TCPServer::Setup()
{
    m_nSockFd = socket(AF_INET, SOCK_STREAM, 0);
    memset(&m_ServerAddress, 0, sizeof(m_ServerAddress));
    m_ServerAddress.sin_family = AF_INET;
    m_ServerAddress.sin_addr.s_addr = htonl(INADDR_ANY);
    m_ServerAddress.sin_port = htons(m_nPort);
    bind(m_nSockFd, (struct sockaddr *)&m_ServerAddress, sizeof(m_ServerAddress));
    int reusePort = 1;
    setsockopt(m_nSockFd, SOL_SOCKET, SO_REUSEPORT, &reusePort, sizeof(reusePort));
    listen(m_nSockFd, 1);
}

void
TCPServer::ListenLoop()
{
    int n = 0;
    while (!m_bFinished)
    {
        socklen_t sosize = sizeof(m_ClientAddress);
        m_nNewSockFd = accept(m_nSockFd, (struct sockaddr*)&m_ClientAddress, &sosize);

        // Receiving thread
        m_ReceiveThread = new boost::thread(boost::bind(&TCPServer::ReceiveLoop, this));
    }
}

void
TCPServer::TransmitLoop(pmt::pmt_t pmt_msg)
{
    // Extracting message from pmt
    pmt::pmt_t msg = pmt::cdr(pmt_msg);
    size_t offset(0);
    int msg_len = pmt::length(msg);
    unsigned char *msg_char = (unsigned char*)malloc(msg_len);
    memcpy(msg_char, pmt::uniform_vector_elements(msg, offset), msg_len);

    if (m_nNewSockFd != -1)
    {
        if (send(m_nNewSockFd, msg_char, msg_len, 0) < 0)
        {
            //std::cout << "Send failed : " << std::endl;
            //return false;
        }
    }
}

void
TCPServer::ReceiveLoop()
{
    int n = 0;
    while (!m_bFinished)
    {
        char msg[m_nMTU];
        n = recv(m_nNewSockFd, msg, m_nMTU, 0);
        if (n == 0)
        {
            close(m_nNewSockFd);
            break;
        }
        m_LineCode.UCharArrayDump(msg, n);
        std::cout << "NewSockFd : " << m_nNewSockFd << std::endl;
        std::cout << "Value of n : " << n << std::endl;
        std::cout << "Port number: " << m_nPort << std::endl;
        if (n > 0) {
            std::cout << "Alias : " << this->alias() << std::endl;
            pmt::pmt_t pdu(pmt::cons(pmt::PMT_NIL, pmt::make_blob(msg, n)));
            message_port_pub(pmt::mp("out"), pdu);
        }
    }
}

TCPServer::~TCPServer()
{
    stop();
}

bool
TCPServer::start()
{
    m_bFinished = false;
    m_ListenThread = new b