[ https://issues.apache.org/jira/browse/BOOKKEEPER-79?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13152068#comment-13152068 ]

Sijie Guo commented on BOOKKEEPER-79:
-------------------------------------

> What error do you get which shows this problem?

It core dumps when running the test case.

> there's a lock, which explicitly stops this situation.

The lock only protects changes to the receiving flag. It does not stop stopReceiving from setting receiving to false, because stopReceiving can still obtain the lock.

> I think this needs to be fixed so that the state of the channel is recorded 
> when you stopReceiving

We don't need to record the read state of the channel, because the receiving check is in readSize, which ensures the channel stops exactly at a message frame boundary.

{code}

/*static*/ void DuplexChannel::readSize(DuplexChannelPtr channel) {
  if (!channel->isReceiving()) {
    return;
  }

  int toread = sizeof(uint32_t) - channel->in_buf.size();
  LOG4CXX_DEBUG(logger, " size of incoming message " << sizeof(uint32_t)
                << ", currently in buffer " << channel->in_buf.size()
                << " channel(" << channel.get() << ")");

  if (toread < 0) {
    DuplexChannel::sizeReadCallbackHandler(channel, boost::system::error_code(), 0);
  } else {
    //  in_buf_size.prepare(sizeof(uint32_t));
    boost::asio::async_read(channel->socket, channel->in_buf,
                            boost::asio::transfer_at_least(sizeof(uint32_t)),
                            boost::bind(&DuplexChannel::sizeReadCallbackHandler,
                                        channel,
                                        boost::asio::placeholders::error,
                                        boost::asio::placeholders::bytes_transferred));
  }
}
{code}

The race is between #startReceiving, #stopReceiving, and #readSize, as follows:

1. #readSize has passed the isReceiving() check but has not yet called async_read.
2. The main thread calls #startReceiving and #stopReceiving; #startReceiving inserts a new async_read op to read the size.
3. #readSize then calls async_read and inserts its own async_read op to read the size, so two size reads are now outstanding.
But your comment reminds me that the patch only fixes the case above. It fails in the following case:

1. The main thread calls #stopReceiving first.
2. #readSize checks the receiving flag and returns, so no more async_read ops are inserted.
3. The main thread calls #startReceiving, which doesn't insert any async_read op because stopReceivingBefore is true. The channel is now receiving, but no read is pending.
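The second interleaving can be replayed the same way. This is again a hypothetical model rather than the patch itself: it encodes only what step 3 states, namely that the patched #startReceiving inserts no read once stopReceivingBefore is true. `PatchedChannelModel`, `pendingReads`, and `replayRace2` are illustrative names.

{code}
#include <cassert>
#include <mutex>

// Hypothetical model of the first patch, reduced to what step 3 describes.
struct PatchedChannelModel {
  std::mutex receiving_lock;
  bool receiving = true;            // channel starts out receiving
  bool stopReceivingBefore = false;
  int pendingReads = 0;             // async_read size ops in the io_service

  void startReceiving() {
    std::lock_guard<std::mutex> lock(receiving_lock);
    receiving = true;
    if (!stopReceivingBefore) {     // step 3: skipped once stop was called
      pendingReads++;
    }
  }

  void stopReceiving() {
    std::lock_guard<std::mutex> lock(receiving_lock);
    receiving = false;
    stopReceivingBefore = true;
  }

  // readSize, reached after the previous read completed: unlocked check.
  void readSize() {
    if (!receiving) return;
    pendingReads++;
  }
};

// Returns true if the channel ends up receiving with no read scheduled.
bool replayRace2() {
  PatchedChannelModel ch;           // previous size read has just completed
  ch.stopReceiving();               // 1. main thread stops first
  ch.readSize();                    // 2. readSize sees !receiving and returns
  ch.startReceiving();              // 3. no op inserted: stopReceivingBefore set
  return ch.receiving && ch.pendingReads == 0;
}
{code}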

I think the root cause of the race is between checking the receiving flag and changing it. #startReceiving and #stopReceiving take the lock, but #readSize, which checks the receiving flag, does not.

The right fix may be to add a flag that indicates whether an async_read size op is already pending in the io_service. #startReceiving then only calls readSize if no async_read size op is pending. Call the flag reading, guarded by reading_lock. The code is shown below.

{code}
void DuplexChannel::startReceiving() {
  LOG4CXX_DEBUG(logger, "DuplexChannel::startReceiving channel(" << this << ") currently receiving = " << receiving);

  boost::lock_guard<boost::mutex> lock(receiving_lock);
  if (receiving) {
    return;
  }
  receiving = true;

  {
    // Only start a new size read if none is already pending in the io_service.
    boost::lock_guard<boost::mutex> rlock(reading_lock);
    if (!reading) {
      reading = true;
      DuplexChannel::doReadSize(shared_from_this());
    }
  }
}

// no change in #stopReceiving
void DuplexChannel::stopReceiving() {
  LOG4CXX_DEBUG(logger, "DuplexChannel::stopReceiving channel(" << this << ")");

  boost::lock_guard<boost::mutex> lock(receiving_lock);
  receiving = false;
  stopReceivingBefore = true;
}

/*static*/ void DuplexChannel::readSize(DuplexChannelPtr channel) {
  // Hold receiving_lock across both the check and the read, so that
  // #startReceiving/#stopReceiving cannot run in between.
  boost::lock_guard<boost::mutex> lock(channel->receiving_lock);
  // Read the flag directly: we already hold receiving_lock, and
  // boost::mutex is not recursive.
  if (!channel->receiving) {
    // The read loop stops here; the next #startReceiving restarts it.
    boost::lock_guard<boost::mutex> rlock(channel->reading_lock);
    channel->reading = false;
    return;
  }

  doReadSize(channel);
}

// Issues the next size read. Both callers hold receiving_lock, and reading is true.
/*static*/ void DuplexChannel::doReadSize(DuplexChannelPtr channel) {

  int toread = sizeof(uint32_t) - channel->in_buf.size();
  LOG4CXX_DEBUG(logger, " size of incoming message " << sizeof(uint32_t)
                << ", currently in buffer " << channel->in_buf.size()
                << " channel(" << channel.get() << ")");

  if (toread < 0) {
    DuplexChannel::sizeReadCallbackHandler(channel, boost::system::error_code(), 0);
  } else {
    //  in_buf_size.prepare(sizeof(uint32_t));
    boost::asio::async_read(channel->socket, channel->in_buf,
                            boost::asio::transfer_at_least(sizeof(uint32_t)),
                            boost::bind(&DuplexChannel::sizeReadCallbackHandler,
                                        channel,
                                        boost::asio::placeholders::error,
                                        boost::asio::placeholders::bytes_transferred));
  }
}
{code}
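One way to sanity-check the reading flag is to model it and replay both interleavings. The sketch below is a hypothetical single-threaded model of the proposed fix, not Hedwig code: `FixedChannelModel`, `issueRead`, `onReadComplete`, `pendingReads`, and `maxPending` are illustrative names, `pendingReads` counts queued async_read size ops, and `onReadComplete` stands in for the read loop coming back to readSize. Because readSize holds receiving_lock across its check and its action, it is modeled as one atomic step.

{code}
#include <cassert>
#include <mutex>

// Hypothetical model of the proposed fix: no sockets, no boost::asio.
struct FixedChannelModel {
  std::mutex receiving_lock;
  std::mutex reading_lock;
  bool receiving = false;
  bool reading = false;   // true while the read loop is running
  int pendingReads = 0;   // async_read size ops queued in the io_service
  int maxPending = 0;     // highest pendingReads ever observed

  void issueRead() {      // stands in for doReadSize -> async_read
    pendingReads++;
    if (pendingReads > maxPending) maxPending = pendingReads;
  }

  void startReceiving() {
    std::lock_guard<std::mutex> lock(receiving_lock);
    if (receiving) return;
    receiving = true;
    std::lock_guard<std::mutex> rlock(reading_lock);
    if (!reading) {       // restart the read loop only if it has ended
      reading = true;
      issueRead();
    }
  }

  void stopReceiving() {
    std::lock_guard<std::mutex> lock(receiving_lock);
    receiving = false;
  }

  // A queued size read completes and the loop comes back to readSize.
  void onReadComplete() {
    std::lock_guard<std::mutex> lock(receiving_lock);
    pendingReads--;
    if (!receiving) {
      std::lock_guard<std::mutex> rlock(reading_lock);
      reading = false;    // loop ends; the next startReceiving restarts it
      return;
    }
    issueRead();          // loop continues with the next size read
  }
};

// Stop, then start again while the old read is still queued.
int replayRestartWhileReading() {
  FixedChannelModel ch;
  ch.startReceiving();    // read loop starts: one read queued
  ch.stopReceiving();
  ch.startReceiving();    // reading is still true, so no second read
  ch.onReadComplete();    // receiving again, so the loop simply continues
  return ch.maxPending;
}

// The second interleaving: stop, readSize exits the loop, then start.
int replayStopThenStart() {
  FixedChannelModel ch;
  ch.startReceiving();
  ch.stopReceiving();     // 1. main thread stops first
  ch.onReadComplete();    // 2. readSize sees !receiving and clears reading
  ch.startReceiving();    // 3. reading is false, so a new read is issued
  return ch.pendingReads;
}
{code}

In both replays at most one size read is ever queued, and a stop followed by a start leaves exactly one read pending. The model does not capture re-entrancy: in the toread < 0 branch, doReadSize calls sizeReadCallbackHandler synchronously while receiving_lock is held, so if that handler can reach readSize again on the same thread, the non-recursive boost::mutex would deadlock. That path may be worth checking.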

                
> randomly startDelivery/stopDelivery will core dump in c++ hedwig client
> -----------------------------------------------------------------------
>
>                 Key: BOOKKEEPER-79
>                 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-79
>             Project: Bookkeeper
>          Issue Type: Bug
>          Components: hedwig-client
>    Affects Versions: 4.0.0
>            Reporter: Sijie Guo
>            Assignee: Sijie Guo
>             Fix For: 4.0.0
>
>         Attachments: BOOKKEEPER-79.patch_v2, bookkeeper-79.patch
>
>
> In our test program, we randomly called startDelivery/stopDelivery on different
> subscriptions, and it core dumped.

--
This message is automatically generated by JIRA.
