panszobe opened a new issue, #993: URL: https://github.com/apache/pulsar-client-go/issues/993
#### Expected behavior

All messages should be acked when `EnableBatchIndexAck` is false, the same as when `EnableBatchIndexAck` is true.

#### Actual behavior

With `EnableBatchIndexAck` set to false, the number of unacked messages keeps growing, as the Grafana dashboard below shows:

<img width="1377" alt="image" src="https://user-images.githubusercontent.com/75996911/226072338-060010cb-73ef-451a-a7ff-0212222f921e.png">

As a result, the Pulsar server uses more and more memory to track unacked messages. The client can still consume messages, but the backlog keeps growing, and restarting the consumer causes duplicate messages to be consumed.

#### System configuration

pulsar-client-go version: master(**v0.9.1-0.20230313030101-bcbac9f2ae5c**)

#### Code Review

I think the root cause is as below:

```
func (pc *partitionConsumer) ackID(msgID MessageID, withResponse bool) error {
	if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
		pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer")
		return errors.New("consumer state is closed")
	}

	if cmid, ok := msgID.(*chunkMessageID); ok {
		return pc.unAckChunksTracker.ack(cmid)
	}

	trackingID := toTrackingMessageID(msgID)

	if trackingID != nil && trackingID.ack() {
		pc.metrics.AcksCounter.Inc()
		pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-trackingID.receivedTime.UnixNano()) / 1.0e9)
	} else if !pc.options.enableBatchIndexAck {
		return nil
	}

	var ackReq *ackRequest
	if withResponse {
		ackReq := pc.sendIndividualAck(trackingID)
		<-ackReq.doneCh
	} else {
		pc.ackGroupingTracker.add(trackingID)
	}
	pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID)
	if ackReq == nil {
		return nil
	}
	return ackReq.err
}

func (t *timedAckGroupingTracker) tryAddIndividual(id MessageID) map[[2]uint64]*bitset.BitSet {
	t.Lock()
	defer t.Unlock()
	key := [2]uint64{uint64(id.LedgerID()), uint64(id.EntryID())}

	batchIdx := id.BatchIdx()
	batchSize := id.BatchSize()

	if batchIdx >= 0 && batchSize > 0 {
		bs, found := t.pendingAcks[key]
		if !found {
			if batchSize > 1 {
				bs = bitset.New(uint(batchSize))
				for i := uint(0); i < uint(batchSize); i++ {
					bs.Set(i)
				}
			}
			t.pendingAcks[key] = bs
		}
		if bs != nil {
			bs.Clear(uint(batchIdx))
		}
	} else {
		t.pendingAcks[key] = nil
	}

	if len(t.pendingAcks) >= t.maxNumAcks {
		pendingAcks := t.pendingAcks
		t.pendingAcks = make(map[[2]uint64]*bitset.BitSet)
		return pendingAcks
	}
	return nil
}
```

When `EnableBatchIndexAck` is false and `trackingID.ack()` returns false, meaning that not all messages of the batch have been acked yet, the message is not added to the `ackGroupingTracker`, so it is never pushed into `pendingAcks`.

In this scenario, messages of the same batch are often split across different flush batches of ACK requests because of the limit in `AckGroupingOptions`. The `ackTracker` does record the batch, and the last message of the batch is added to `pendingAcks` of the `ackGroupingTracker`. However, when a batch message is added for the first time, `timedAckGroupingTracker.tryAddIndividual()` sets every bit of the bitset to 1 and clears only the bit for that one message. This is correct when `EnableBatchIndexAck` is true, but when it is false it leaves the other messages of the batch unacked: only one batchIdx of the batch can be acked.
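To make the suspected interaction easier to follow, here is a minimal, self-contained sketch of it. It is not the client's code: `tracker`, `pendingBatch`, and `unackedAfter` are hypothetical names, a `[]bool` stands in for the bitset, and a simple counter stands in for `trackingID.ack()`. It only models one batch with more than one message and ignores flushing.

```go
package main

import "fmt"

// pendingBatch is a simplified stand-in for the per-entry bitset:
// true means the tracker still considers that batch index unacked.
type pendingBatch []bool

// tracker mimics only the bitset bookkeeping described for tryAddIndividual.
type tracker struct {
	pending map[int64]pendingBatch // keyed by a hypothetical entry ID
}

// add marks one batch index as acked. On first sight of an entry it
// creates a bitset with every index set, then clears only the given index.
func (t *tracker) add(entryID int64, batchIdx, batchSize int) {
	bs, found := t.pending[entryID]
	if !found {
		bs = make(pendingBatch, batchSize)
		for i := range bs {
			bs[i] = true
		}
		t.pending[entryID] = bs
	}
	bs[batchIdx] = false
}

// unackedAfter acks every index of one batch in order and returns the
// indexes the tracker would still report as unacked.
func unackedAfter(batchSize int, enableBatchIndexAck bool) []int {
	t := &tracker{pending: map[int64]pendingBatch{}}
	acked := 0
	for idx := 0; idx < batchSize; idx++ {
		acked++
		batchComplete := acked == batchSize // stands in for trackingID.ack()
		if !batchComplete && !enableBatchIndexAck {
			continue // mirrors the early `return nil` in ackID
		}
		t.add(1, idx, batchSize)
	}
	var left []int
	for i, unacked := range t.pending[1] {
		if unacked {
			left = append(left, i)
		}
	}
	return left
}

func main() {
	fmt.Println(unackedAfter(3, true))  // every index reaches the tracker
	fmt.Println(unackedAfter(3, false)) // only the last index reaches the tracker
}
```

In this model, a batch of three messages ends with indexes 0 and 1 still marked unacked when batch index acks are disabled, which matches the symptom described above.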
