This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
     new 8a2ff05  fix: avoid assert panic (#73)
8a2ff05 is described below

commit 8a2ff054d85bcbdba433b18639745920bbabc64c
Author: xujianhai666 <52450794+xujianhai...@users.noreply.github.com>
AuthorDate: Thu Oct 24 18:09:44 2019 +0800

    fix: avoid assert panic (#73)

    - add assert check

    Fixes #64

    Change-Id: Ibd355440ccf3b06ac60575a9306cfb66cb80d759
---
 pulsar/impl_partition_producer.go | 18 +++++++++++++-----
 1 file changed, 13 insertions(+), 5 deletions(-)

diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go
index 07e2da0..f09cd42 100644
--- a/pulsar/impl_partition_producer.go
+++ b/pulsar/impl_partition_producer.go
@@ -23,10 +23,11 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/golang/protobuf/proto"
+
 	"github.com/apache/pulsar-client-go/pkg/pb"
 	"github.com/apache/pulsar-client-go/pulsar/internal"
 	"github.com/apache/pulsar-client-go/util"
-	"github.com/golang/protobuf/proto"
 	log "github.com/sirupsen/logrus"
 )
 
@@ -293,7 +294,12 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
 func (p *partitionProducer) internalFlush(fr *flushRequest) {
 	p.internalFlushCurrentBatch()
 
-	pi := p.pendingQueue.PeekLast().(*pendingItem)
+	pi, ok := p.pendingQueue.PeekLast().(*pendingItem)
+	if !ok {
+		fr.waitGroup.Done()
+		return
+	}
+
 	pi.sendRequests = append(pi.sendRequests, &sendRequest{
 		msg: nil,
 		callback: func(id MessageID, message *ProducerMessage, e error) {
@@ -349,12 +355,14 @@ func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *Producer
 }
 
 func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) {
-	pi := p.pendingQueue.Peek().(*pendingItem)
+	pi, ok := p.pendingQueue.Peek().(*pendingItem)
 
-	if pi == nil {
+	if !ok {
 		p.log.Warnf("Received ack for %v although the pending queue is empty", response.GetMessageId())
 		return
-	} else if pi.sequenceID != response.GetSequenceId() {
+	}
+
+	if pi.sequenceID != response.GetSequenceId() {
 		p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v", response.GetMessageId(), response.GetSequenceId(), pi.sequenceID)
 		return