[
https://issues.apache.org/jira/browse/BOOKKEEPER-79?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13152068#comment-13152068
]
Sijie Guo commented on BOOKKEEPER-79:
-------------------------------------
> What error do you get which shows this problem?
It core dumps when running the test case.
> there's a lock, which explicitly stops this situation.
The lock only guards changes to the receiving value. stopReceiving can still
set receiving to false, since it can obtain the lock.
> I think this needs to be fixed so that the state of the channel is recorded
> when you stopReceiving
We don't need to record the reading state of the channel, since the receiving
check is in readSize, which ensures that it stops exactly on a message frame
boundary.
{code}
/*static*/ void DuplexChannel::readSize(DuplexChannelPtr channel) {
  if (!channel->isReceiving()) {
    return;
  }
  int toread = sizeof(uint32_t) - channel->in_buf.size();
  LOG4CXX_DEBUG(logger, " size of incoming message " << sizeof(uint32_t)
                << ", currently in buffer " << channel->in_buf.size()
                << " channel(" << channel.get() << ")");
  if (toread < 0) {
    DuplexChannel::sizeReadCallbackHandler(channel,
                                           boost::system::error_code(), 0);
  } else {
    // in_buf_size.prepare(sizeof(uint32_t));
    boost::asio::async_read(channel->socket, channel->in_buf,
                            boost::asio::transfer_at_least(sizeof(uint32_t)),
                            boost::bind(&DuplexChannel::sizeReadCallbackHandler,
                                        channel,
                                        boost::asio::placeholders::error,
                                        boost::asio::placeholders::bytes_transferred));
  }
}
{code}
The race here is between #startReceiving, #stopReceiving, and #readSize, as
follows:
1. #readSize passes the isReceiving() check but has not yet called async_read.
2. The main thread calls #startReceiving/#stopReceiving, and #startReceiving
inserts a new async_read op to read the size.
3. #readSize then calls async_read and inserts its own async_read op to read
the size, so two size reads are outstanding on the same channel.
But your comment reminds me that the patch only fixes the above case. In the
following case, it fails:
1. The main thread calls #stopReceiving first.
2. #readSize checks receiving and returns. No more async_read ops are inserted.
3. The main thread calls #startReceiving, which doesn't insert any async_read
op, since stopReceivingBefore is true.
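The three steps above can be replayed in a small single-threaded model. This is only a sketch: PatchedChannelModel, pendingReads, and replayFailingCase are hypothetical names, the async_read op is replaced by a counter, and the way #startReceiving consults stopReceivingBefore is an assumption based on the description above, not the actual patch.

```cpp
// Hypothetical single-threaded model of the patched channel.
// pendingReads stands in for async_read size ops queued in the io service.
struct PatchedChannelModel {
    bool receiving = true;             // delivery is currently on
    bool stopReceivingBefore = false;  // flag named in the description above
    int pendingReads = 1;              // one size read is outstanding

    void stopReceiving() {
        receiving = false;
        stopReceivingBefore = true;
    }

    // Assumption: no new read is issued when stopReceivingBefore is set.
    void startReceiving() {
        if (receiving) return;
        receiving = true;
        if (!stopReceivingBefore) ++pendingReads;
    }

    // Runs when the outstanding read chain comes back to readSize.
    void readSize() {
        --pendingReads;          // the op that led here has completed
        if (!receiving) return;  // chain ends, nothing is re-issued
        ++pendingReads;          // otherwise issue the next size read
    }
};

// Replays steps 1-3 and reports whether the channel ends up stalled:
// receiving is on, but no read is queued to deliver anything.
bool replayFailingCase() {
    PatchedChannelModel c;
    c.stopReceiving();   // step 1
    c.readSize();        // step 2
    c.startReceiving();  // step 3
    return c.receiving && c.pendingReads == 0;
}
```

In this model replayFailingCase() returns true, which corresponds to a channel that is marked as receiving but never reads again.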
I think the root cause of the race is the gap between checking the receiving
flag and changing it. We hold a lock in #startReceiving and #stopReceiving,
but not in #readSize, which checks the receiving flag.
The right fix may be:
We need a flag that indicates whether there is an async_read size op in the io
service. In #startReceiving we only call readSize if there is no async_read
size op in the io service. Say the flag is reading and its lock is
reading_lock. The code is shown below.
{code}
void DuplexChannel::startReceiving() {
  LOG4CXX_DEBUG(logger, "DuplexChannel::startReceiving channel(" << this
                << ") currently receiving = " << receiving);
  boost::lock_guard<boost::mutex> lock(receiving_lock);
  if (receiving) {
    return;
  }
  receiving = true;
  {
    // Only issue a size read if none is already pending in the io service.
    boost::lock_guard<boost::mutex> reading_guard(reading_lock);
    if (!reading) {
      reading = true;
      DuplexChannel::doReadSize(shared_from_this());
    }
  }
}
// no change in #stopReceiving
void DuplexChannel::stopReceiving() {
  LOG4CXX_DEBUG(logger, "DuplexChannel::stopReceiving channel(" << this << ")");
  boost::lock_guard<boost::mutex> lock(receiving_lock);
  receiving = false;
  stopReceivingBefore = true;
}
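/*static*/ void DuplexChannel::readSize(DuplexChannelPtr channel) {
  // readSize is static, so members are reached through the channel pointer.
  // This assumes isReceiving() does not itself take receiving_lock.
  boost::lock_guard<boost::mutex> lock(channel->receiving_lock);
  if (!channel->isReceiving()) {
    // The read chain ends here, so clear the pending-read flag.
    boost::lock_guard<boost::mutex> reading_guard(channel->reading_lock);
    channel->reading = false;
    return;
  }
  doReadSize(channel);
}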
/*static*/ void DuplexChannel::doReadSize(DuplexChannelPtr channel) {
  int toread = sizeof(uint32_t) - channel->in_buf.size();
  LOG4CXX_DEBUG(logger, " size of incoming message " << sizeof(uint32_t)
                << ", currently in buffer " << channel->in_buf.size()
                << " channel(" << channel.get() << ")");
  if (toread < 0) {
    DuplexChannel::sizeReadCallbackHandler(channel,
                                           boost::system::error_code(), 0);
  } else {
    // in_buf_size.prepare(sizeof(uint32_t));
    boost::asio::async_read(channel->socket, channel->in_buf,
                            boost::asio::transfer_at_least(sizeof(uint32_t)),
                            boost::bind(&DuplexChannel::sizeReadCallbackHandler,
                                        channel,
                                        boost::asio::placeholders::error,
                                        boost::asio::placeholders::bytes_transferred));
  }
}
{code}
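To sanity-check the flag protocol above without Boost.Asio, here is a minimal self-contained sketch. ChannelModel and pendingReads are hypothetical names, std::mutex stands in for boost::mutex, and a counter stands in for the async_read op, so this only models the bookkeeping and not the real I/O path.

```cpp
#include <mutex>

// Hypothetical stand-in for DuplexChannel. pending_reads_ counts the size
// reads that would be queued in the io service.
class ChannelModel {
public:
    void startReceiving() {
        std::lock_guard<std::mutex> g(receiving_lock_);
        if (receiving_) return;
        receiving_ = true;
        std::lock_guard<std::mutex> r(reading_lock_);
        if (!reading_) {       // issue a read only if none is pending
            reading_ = true;
            ++pending_reads_;  // stands in for async_read
        }
    }

    void stopReceiving() {
        std::lock_guard<std::mutex> g(receiving_lock_);
        receiving_ = false;
    }

    // Models readSize being reached after a pending size read completes.
    // The check and the re-issue happen under the same lock.
    void readSize() {
        std::lock_guard<std::mutex> g(receiving_lock_);
        std::lock_guard<std::mutex> r(reading_lock_);
        --pending_reads_;      // the op that led here has completed
        if (!receiving_) {
            reading_ = false;  // chain ends, a later start may re-issue
            return;
        }
        ++pending_reads_;      // issue the next size read
    }

    int pendingReads() const {
        std::lock_guard<std::mutex> r(reading_lock_);
        return pending_reads_;
    }

private:
    mutable std::mutex receiving_lock_;
    mutable std::mutex reading_lock_;
    bool receiving_ = false;
    bool reading_ = false;
    int pending_reads_ = 0;
};
```

In this model the stop, readSize, start sequence ends with exactly one pending read, and a stop followed by start while a read is still outstanding does not add a second one. Both locks are always taken in the same order (receiving, then reading), which avoids lock-order inversion within the model.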
> randomly startDelivery/stopDelivery will core dump in c++ hedwig client
> -----------------------------------------------------------------------
>
> Key: BOOKKEEPER-79
> URL: https://issues.apache.org/jira/browse/BOOKKEEPER-79
> Project: Bookkeeper
> Issue Type: Bug
> Components: hedwig-client
> Affects Versions: 4.0.0
> Reporter: Sijie Guo
> Assignee: Sijie Guo
> Fix For: 4.0.0
>
> Attachments: BOOKKEEPER-79.patch_v2, bookkeeper-79.patch
>
>
> In our test program, we tried to startDelivery/stopDelivery different
> subscriptions randomly, and it core dumped.