RobertIndie commented on code in PR #1410:
URL: https://github.com/apache/pulsar-client-go/pull/1410#discussion_r2422712821
##########
pulsar/negative_acks_tracker.go:
##########
@@ -95,15 +100,19 @@ func putNackEntry(t *negativeAcksTracker, batchMsgID
*messageID, delay time.Dura
defer t.Unlock()
targetTime := time.Now().Add(delay)
- trimmedTime := time.UnixMilli(trimLowerBit(targetTime.UnixMilli(),
t.nackPrecisionBit))
+ trimmedTime := time.UnixMilli(trimLowerBit(targetTime.UnixMilli(),
*t.nackPrecisionBit))
// try get trimmedTime
value, exists := t.negativeAcks.Get(trimmedTime)
if !exists {
newMap := make(map[LedgerID]*roaring64.Bitmap)
t.negativeAcks.Put(trimmedTime, newMap)
value = newMap
}
- bitmapMap := value.(map[LedgerID]*roaring64.Bitmap)
+ bitmapMap, ok := value.(map[LedgerID]*roaring64.Bitmap)
+ if !ok {
+ t.log.Errorf("negativeAcksTracker: value for time %v is not of
expected type map[LedgerID]*roaring64.Bitmap", trimmedTime)
+ return
+ }
Review Comment:
This will hide the error. Why don't we panic it here?
##########
pulsar/consumer_impl.go:
##########
@@ -121,6 +121,10 @@ func newConsumer(client *client, options ConsumerOptions)
(Consumer, error) {
options.NackBackoffPolicy = new(defaultNackBackoffPolicy)
}
+ if options.NackPrecisionBit == nil || *options.NackPrecisionBit < 0 {
Review Comment:
We should return the error if it's set to be <0.
##########
pulsar/consumer_test.go:
##########
@@ -1216,13 +1217,86 @@ func TestConsumerNack(t *testing.T) {
// Failed messages should be resent
// We should only receive the odd messages
- for i := 1; i < N; i += 2 {
+ receivedOdd := 0
+ expectedOdd := (N + 1) / 2 // Expected number of odd message IDs
+
+ for receivedOdd < expectedOdd {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
- assert.Equal(t, fmt.Sprintf("msg-content-%d", i),
string(msg.Payload()))
+ // Extract message ID from the payload (e.g., "msg-content-15")
+ var id int
+ _, err = fmt.Sscanf(string(msg.Payload()), "msg-content-%d",
&id)
+ assert.Nil(t, err)
+
+ // Count only odd message IDs
+ if id%2 == 1 {
+ assert.True(t, id%2 == 1) // Optional check, included
for clarity
+ receivedOdd++
+ }
+
+ // Acknowledge message to mark it as processed
consumer.Ack(msg)
}
+
+ // Verify that the correct number of odd messages were received
+ assert.Equal(t, expectedOdd, receivedOdd)
+}
+
+func TestNegativeAckPrecisionBitCnt(t *testing.T) {
+ const delay = 1 * time.Second
+
+ for precision := 1; precision <= 8; precision++ {
+ topicName :=
fmt.Sprintf("testNegativeAckPrecisionBitCnt-%d-%d", precision,
time.Now().UnixNano())
+ ctx := context.Background()
+ client, err := NewClient(ClientOptions{URL: lookupURL})
+ assert.Nil(t, err)
+ defer client.Close()
+
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: "sub-1",
+ Type: Shared,
+ NackRedeliveryDelay: delay,
+ NackPrecisionBit: int64(precision),
+ })
+ assert.Nil(t, err)
+ defer consumer.Close()
+
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ // Send single message
+ content := "test-0"
+ _, err = producer.Send(ctx, &ProducerMessage{
+ Payload: []byte(content),
+ })
+ assert.Nil(t, err)
+
+ // Receive and send negative ack
+ msg, err := consumer.Receive(ctx)
+ assert.Nil(t, err)
+ assert.Equal(t, content, string(msg.Payload()))
+ consumer.Nack(msg)
+
+ // Calculate expected redelivery window
+ expectedRedelivery := time.Now().Add(delay)
+ deviation := time.Duration(int64(1)<<precision) *
time.Millisecond
+
+ // Wait for redelivery
+ redelivered, err := consumer.Receive(ctx)
+ assert.Nil(t, err)
+ assert.Equal(t, content, string(redelivered.Payload()))
+
+ now := time.Now()
+ // Assert that redelivery happens >= expected - deviation
+ assert.GreaterOrEqual(t, now.UnixMilli(),
expectedRedelivery.UnixMilli()-deviation.Milliseconds())
Review Comment:
This doesn't seem to be resolved.
##########
pulsar/consumer_test.go:
##########
@@ -1185,6 +1185,7 @@ func TestConsumerNack(t *testing.T) {
SubscriptionName: "sub-1",
Type: Shared,
NackRedeliveryDelay: 1 * time.Second,
+ NackPrecisionBit: 8,
Review Comment:
This doesn't seem to be resolved.
##########
pulsar/negative_acks_tracker.go:
##########
@@ -153,3 +203,5 @@ func (t *negativeAcksTracker) Close() {
t.doneCh <- nil
})
}
+
+var defaultNackPrecisionBit = int64(8)
Review Comment:
It's better to make it as a const value.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]