jiazhai commented on a change in pull request #43: Support partition consumer receive async and fix batch logic
URL: https://github.com/apache/pulsar-client-go/pull/43#discussion_r313375713
 
 

 ##########
 File path: pulsar/impl_partition_consumer.go
 ##########
 @@ -238,32 +252,64 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub 
*handleUnsubscribe) {
        unsub.waitGroup.Done()
 }
 
-func (pc *partitionConsumer) Receive(ctx context.Context) (Message, error) {
-       select {
-       case <-ctx.Done():
-               return nil, ctx.Err()
-       case cm, ok := <-pc.subQueue:
-               if ok {
-                       id := &pb.MessageIdData{}
-                       err := proto.Unmarshal(cm.ID().Serialize(), id)
-                       if err != nil {
-                               pc.log.WithError(err).Errorf("unserialize 
message id error:%s", err.Error())
-                               return nil, err
-                       }
-                       if pc.unAckTracker != nil {
-                               pc.unAckTracker.Add(id)
-                       }
-                       return cm.Message, nil
+func (pc *partitionConsumer) trackMessage(msgID MessageID) error {
+       id := &pb.MessageIdData{}
+       err := proto.Unmarshal(msgID.Serialize(), id)
+       if err != nil {
+               pc.log.WithError(err).Errorf("unserialize message id error:%s", 
err.Error())
+               return err
+       }
+       if pc.unAckTracker != nil {
+               pc.unAckTracker.Add(id)
+       }
+       return nil
+}
+
+func (pc *partitionConsumer) increaseAvailablePermits(receivedSinceFlow 
uint32) error {
+       highwater := uint32(math.Max(float64(cap(pc.options.MessageChannel)/2), 
1))
+       if receivedSinceFlow >= highwater {
+               if err := pc.internalFlow(receivedSinceFlow); err != nil {
+                       pc.log.Errorf("Send Flow cmd error:%s", err.Error())
+                       return err
                }
-               return nil, newError(ResultConnectError, "receive queue closed")
+               receivedSinceFlow = 0
+       }
+       return nil
+}
+
+func (pc *partitionConsumer) messageProcessed(msgID MessageID, 
receivedSinceFlow uint32) error {
+       err := pc.trackMessage(msgID)
+       if err != nil {
+               return err
+       }
+       receivedSinceFlow++
+
+       err = pc.increaseAvailablePermits(receivedSinceFlow)
+       if err != nil {
+               return err
        }
+
+       return nil
+}
+
+func (pc *partitionConsumer) Receive(ctx context.Context) (message Message, 
err error) {
+       wg := &sync.WaitGroup{}
+       wg.Add(1)
+       pc.ReceiveAsyncWithCallback(ctx, func(msg Message, e error) {
+               message = msg
+               err = e
+               wg.Done()
+       })
+       wg.Wait()
+
+       return message, err
 }
 
 func (pc *partitionConsumer) ReceiveAsync(ctx context.Context, msgs chan<- 
ConsumerMessage) error {
-       highwater := uint32(math.Max(float64(cap(pc.options.MessageChannel)/2), 
1))
+       highWater := uint32(math.Max(float64(cap(pc.options.MessageChannel)/2), 
1))
 
        // request half the buffer's capacity
-       if err := pc.internalFlow(highwater); err != nil {
+       if err := pc.internalFlow(highWater); err != nil {
 
 Review comment:
   Same here: this `internalFlow` call could also be done in the background.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to