jiazhai commented on a change in pull request #43: Support partition consumer receive async and fix batch logic URL: https://github.com/apache/pulsar-client-go/pull/43#discussion_r313375713
########## File path: pulsar/impl_partition_consumer.go ########## @@ -238,32 +252,64 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub *handleUnsubscribe) { unsub.waitGroup.Done() } -func (pc *partitionConsumer) Receive(ctx context.Context) (Message, error) { - select { - case <-ctx.Done(): - return nil, ctx.Err() - case cm, ok := <-pc.subQueue: - if ok { - id := &pb.MessageIdData{} - err := proto.Unmarshal(cm.ID().Serialize(), id) - if err != nil { - pc.log.WithError(err).Errorf("unserialize message id error:%s", err.Error()) - return nil, err - } - if pc.unAckTracker != nil { - pc.unAckTracker.Add(id) - } - return cm.Message, nil +func (pc *partitionConsumer) trackMessage(msgID MessageID) error { + id := &pb.MessageIdData{} + err := proto.Unmarshal(msgID.Serialize(), id) + if err != nil { + pc.log.WithError(err).Errorf("unserialize message id error:%s", err.Error()) + return err + } + if pc.unAckTracker != nil { + pc.unAckTracker.Add(id) + } + return nil +} + +func (pc *partitionConsumer) increaseAvailablePermits(receivedSinceFlow uint32) error { + highwater := uint32(math.Max(float64(cap(pc.options.MessageChannel)/2), 1)) + if receivedSinceFlow >= highwater { + if err := pc.internalFlow(receivedSinceFlow); err != nil { + pc.log.Errorf("Send Flow cmd error:%s", err.Error()) + return err } - return nil, newError(ResultConnectError, "receive queue closed") + receivedSinceFlow = 0 + } + return nil +} + +func (pc *partitionConsumer) messageProcessed(msgID MessageID, receivedSinceFlow uint32) error { + err := pc.trackMessage(msgID) + if err != nil { + return err + } + receivedSinceFlow++ + + err = pc.increaseAvailablePermits(receivedSinceFlow) + if err != nil { + return err } + + return nil +} + +func (pc *partitionConsumer) Receive(ctx context.Context) (message Message, err error) { + wg := &sync.WaitGroup{} + wg.Add(1) + pc.ReceiveAsyncWithCallback(ctx, func(msg Message, e error) { + message = msg + err = e + wg.Done() + }) + wg.Wait() + + return message, 
err } func (pc *partitionConsumer) ReceiveAsync(ctx context.Context, msgs chan<- ConsumerMessage) error { - highwater := uint32(math.Max(float64(cap(pc.options.MessageChannel)/2), 1)) + highWater := uint32(math.Max(float64(cap(pc.options.MessageChannel)/2), 1)) // request half the buffer's capacity - if err := pc.internalFlow(highwater); err != nil { + if err := pc.internalFlow(highWater); err != nil { Review comment: also here, in background ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services