This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a2ff05  fix: avoid assert panic (#73)
8a2ff05 is described below

commit 8a2ff054d85bcbdba433b18639745920bbabc64c
Author: xujianhai666 <52450794+xujianhai...@users.noreply.github.com>
AuthorDate: Thu Oct 24 18:09:44 2019 +0800

    fix: avoid assert panic (#73)
    
    - add assert check
    
    Fixes #64
    
    Change-Id: Ibd355440ccf3b06ac60575a9306cfb66cb80d759
---
 pulsar/impl_partition_producer.go | 18 +++++++++++++-----
 1 file changed, 13 insertions(+), 5 deletions(-)

diff --git a/pulsar/impl_partition_producer.go 
b/pulsar/impl_partition_producer.go
index 07e2da0..f09cd42 100644
--- a/pulsar/impl_partition_producer.go
+++ b/pulsar/impl_partition_producer.go
@@ -23,10 +23,11 @@ import (
        "sync/atomic"
        "time"
 
+       "github.com/golang/protobuf/proto"
+
        "github.com/apache/pulsar-client-go/pkg/pb"
        "github.com/apache/pulsar-client-go/pulsar/internal"
        "github.com/apache/pulsar-client-go/util"
-       "github.com/golang/protobuf/proto"
 
        log "github.com/sirupsen/logrus"
 )
@@ -293,7 +294,12 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
 func (p *partitionProducer) internalFlush(fr *flushRequest) {
        p.internalFlushCurrentBatch()
 
-       pi := p.pendingQueue.PeekLast().(*pendingItem)
+       pi, ok := p.pendingQueue.PeekLast().(*pendingItem)
+       if !ok {
+               fr.waitGroup.Done()
+               return
+       }
+
        pi.sendRequests = append(pi.sendRequests, &sendRequest{
                msg: nil,
                callback: func(id MessageID, message *ProducerMessage, e error) 
{
@@ -349,12 +355,14 @@ func (p *partitionProducer) internalSendAsync(ctx 
context.Context, msg *Producer
 }
 
 func (p *partitionProducer) ReceivedSendReceipt(response 
*pb.CommandSendReceipt) {
-       pi := p.pendingQueue.Peek().(*pendingItem)
+       pi, ok := p.pendingQueue.Peek().(*pendingItem)
 
-       if pi == nil {
+       if !ok {
                p.log.Warnf("Received ack for %v although the pending queue is 
empty", response.GetMessageId())
                return
-       } else if pi.sequenceID != response.GetSequenceId() {
+       }
+
+       if pi.sequenceID != response.GetSequenceId() {
                p.log.Warnf("Received ack for %v on sequenceId %v - expected: 
%v", response.GetMessageId(),
                        response.GetSequenceId(), pi.sequenceID)
                return

Reply via email to