panszobe opened a new issue, #993:
URL: https://github.com/apache/pulsar-client-go/issues/993

   #### Expected behavior
   
   All messages should be acked when `EnableBatchIndexAck` is false, the same 
as when `EnableBatchIndexAck` is true.
   
   #### Actual behavior
   
   When `EnableBatchIndexAck` is set to false, the number of unacked messages 
keeps growing, as the Grafana dashboard below shows:
   
   <img width="1377" alt="image" src="https://user-images.githubusercontent.com/75996911/226072338-060010cb-73ef-451a-a7ff-0212222f921e.png">
   
   As a result, the Pulsar server uses more and more memory to track unacked 
messages. The client keeps consuming messages, but the backlog keeps growing, 
and the consumer receives duplicate messages when it restarts.
   
   #### System configuration
   
   pulsar-client-go version: master(**v0.9.1-0.20230313030101-bcbac9f2ae5c**)
   
   
   #### Code Review
   I think the root cause is in the following code:
   
   ```
   func (pc *partitionConsumer) ackID(msgID MessageID, withResponse bool) error {
        if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
                pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer")
                return errors.New("consumer state is closed")
        }
   
        if cmid, ok := msgID.(*chunkMessageID); ok {
                return pc.unAckChunksTracker.ack(cmid)
        }
   
        trackingID := toTrackingMessageID(msgID)
   
        if trackingID != nil && trackingID.ack() {
                pc.metrics.AcksCounter.Inc()
                pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-trackingID.receivedTime.UnixNano()) / 1.0e9)
        } else if !pc.options.enableBatchIndexAck {
                return nil
        }
   
        var ackReq *ackRequest
        if withResponse {
                ackReq := pc.sendIndividualAck(trackingID)
                <-ackReq.doneCh
        } else {
                pc.ackGroupingTracker.add(trackingID)
        }
        pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID)
        if ackReq == nil {
                return nil
        }
        return ackReq.err
   }
   
   func (t *timedAckGroupingTracker) tryAddIndividual(id MessageID) map[[2]uint64]*bitset.BitSet {
        t.Lock()
        defer t.Unlock()
        key := [2]uint64{uint64(id.LedgerID()), uint64(id.EntryID())}
   
        batchIdx := id.BatchIdx()
        batchSize := id.BatchSize()
   
        if batchIdx >= 0 && batchSize > 0 {
                bs, found := t.pendingAcks[key]
                if !found {
                        if batchSize > 1 {
                                bs = bitset.New(uint(batchSize))
                                for i := uint(0); i < uint(batchSize); i++ {
                                        bs.Set(i)
                                }
                        }
                        t.pendingAcks[key] = bs
                }
                if bs != nil {
                        bs.Clear(uint(batchIdx))
                }
        } else {
                t.pendingAcks[key] = nil
        }
   
        if len(t.pendingAcks) >= t.maxNumAcks {
                pendingAcks := t.pendingAcks
                t.pendingAcks = make(map[[2]uint64]*bitset.BitSet)
                return pendingAcks
        }
        return nil
   }
   ```
   
   When `EnableBatchIndexAck` is false and `trackingID.ack()` returns false, 
which means that not all messages in the batch have been acked yet, `ackID` 
returns early: the message is not added to `ackGroupingTracker` and never 
reaches `pendingAcks`.
   In this scenario, some messages in the same batch are always split across 
different flush batches when the ACK requests are sent, because of the limits 
in `AckGroupingOptions`. The `ackTracker` does record the batch, and the last 
message of the batch is added to the `pendingAcks` of `ackGroupingTracker`. 
However, when a batch is added for the first time, 
`timedAckGroupingTracker.tryAddIndividual()` sets every bit of the batch's 
bitset to 1 and clears only the bit for that one message. This is correct when 
`EnableBatchIndexAck` is true, but when it is false the other messages in the 
batch stay unacked: only one batchIdx of the batch can be acked.
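
The behavior described above can be reproduced with a small standalone model. This is only a sketch, not the client's code: `tracker`, `addIndividual`, `unacked`, and `simulate` are hypothetical names, a `[]bool` stands in for `bitset.BitSet`, and `batchComplete` stands in for the result of `trackingID.ack()`. Flushing and locking are left out.

```go
package main

import "fmt"

// entryKey identifies a batch entry by (ledgerID, entryID), as in the quoted code.
type entryKey [2]uint64

// tracker is a simplified, hypothetical stand-in for timedAckGroupingTracker.
// A []bool replaces bitset.BitSet: true means "this batch index is still unacked".
type tracker struct {
	pendingAcks map[entryKey][]bool
}

func newTracker() *tracker {
	return &tracker{pendingAcks: make(map[entryKey][]bool)}
}

// addIndividual mirrors the bitset handling of tryAddIndividual:
// on first sight of a batch every bit is set, then only batchIdx is cleared.
func (t *tracker) addIndividual(key entryKey, batchIdx, batchSize int) {
	if batchIdx >= 0 && batchSize > 0 {
		bs, found := t.pendingAcks[key]
		if !found {
			if batchSize > 1 {
				bs = make([]bool, batchSize)
				for i := range bs {
					bs[i] = true
				}
			}
			t.pendingAcks[key] = bs
		}
		if bs != nil {
			bs[batchIdx] = false
		}
	} else {
		t.pendingAcks[key] = nil
	}
}

// unacked counts the batch indexes still marked unacked for key.
func (t *tracker) unacked(key entryKey) int {
	n := 0
	for _, set := range t.pendingAcks[key] {
		if set {
			n++
		}
	}
	return n
}

// simulate acks every message of one batch in order and returns how many
// batch indexes the tracker still considers unacked afterwards.
func simulate(batchSize int, enableBatchIndexAck bool) int {
	t := newTracker()
	key := entryKey{1, 1}
	acked := 0
	for idx := 0; idx < batchSize; idx++ {
		acked++
		batchComplete := acked == batchSize // stands in for trackingID.ack()
		if !batchComplete && !enableBatchIndexAck {
			continue // models the early "return nil" in ackID
		}
		t.addIndividual(key, idx, batchSize)
	}
	return t.unacked(key)
}

func main() {
	fmt.Println(simulate(4, true))  // prints 0
	fmt.Println(simulate(4, false)) // prints 3
}
```

In this model, with the flag disabled only the final message of the batch reaches the tracker, so three of the four indexes remain marked as unacked, which matches the symptom reported above.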
   
   
   
   
   
   

