Copilot commented on code in PR #1464:
URL: https://github.com/apache/pulsar-client-go/pull/1464#discussion_r2746104916


##########
pulsar/consumer_partition.go:
##########
@@ -1456,31 +1456,137 @@ func (pc *partitionConsumer) 
processMessageChunk(compressedPayload internal.Buff
                partitionIdx: pc.partitionIdx,
        }
 
-       if msgMeta.GetChunkId() == 0 {
+       if chunkID == 0 {
+               // Handle ack hole case when receive duplicated chunks.
+               // There are two situation that receives chunks with the same 
sequence ID and chunk ID.
+               // Situation 1 - Message redeliver:
+               // For example:
+               //     Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
+               //     Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
+               //     Chunk-3 sequence ID: 0, chunk ID: 0, msgID: 1:1
+               //     Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:2
+               //     Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:3
+               // In this case, chunk-3 and chunk-4 have the same msgID with 
chunk-1 and chunk-2.
+               // This may be caused by message redeliver, we can't ack any 
chunk in this case here.
+               // Situation 2 - Corrupted chunk message
+               // For example:
+               //     Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
+               //     Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
+               //     Chunk-3 sequence ID: 0, chunk ID: 0, msgID: 1:3
+               //     Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4
+               //     Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5
+               // In this case, all the chunks with different msgIDs and are 
persistent in the topic.
+               // But Chunk-1 and Chunk-2 belong to a corrupted chunk message 
that must be skipped since
+               // they will not be delivered to end users. So we should ack 
them here to avoid ack hole.
+               if pc.chunkedMsgCtxMap.get(uuid) != nil {
+                       ctx := pc.chunkedMsgCtxMap.get(uuid)

Review Comment:
   Race condition: The context is retrieved twice using get() without holding a 
lock between calls. Between lines 1481 and 1482, the context could be modified 
or removed by another goroutine. Consider storing the result of get() in a 
single variable to avoid potential nil pointer dereference or inconsistent 
state.
   ```suggestion
                ctx := pc.chunkedMsgCtxMap.get(uuid)
                if ctx != nil {
   ```



##########
pulsar/consumer_partition.go:
##########
@@ -1456,31 +1456,137 @@ func (pc *partitionConsumer) 
processMessageChunk(compressedPayload internal.Buff
                partitionIdx: pc.partitionIdx,
        }
 
-       if msgMeta.GetChunkId() == 0 {
+       if chunkID == 0 {
+               // Handle ack hole case when receive duplicated chunks.
+               // There are two situation that receives chunks with the same 
sequence ID and chunk ID.
+               // Situation 1 - Message redeliver:
+               // For example:
+               //     Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
+               //     Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
+               //     Chunk-3 sequence ID: 0, chunk ID: 0, msgID: 1:1
+               //     Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:2
+               //     Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:3
+               // In this case, chunk-3 and chunk-4 have the same msgID with 
chunk-1 and chunk-2.
+               // This may be caused by message redeliver, we can't ack any 
chunk in this case here.
+               // Situation 2 - Corrupted chunk message
+               // For example:
+               //     Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
+               //     Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
+               //     Chunk-3 sequence ID: 0, chunk ID: 0, msgID: 1:3
+               //     Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4
+               //     Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5
+               // In this case, all the chunks with different msgIDs and are 
persistent in the topic.
+               // But Chunk-1 and Chunk-2 belong to a corrupted chunk message 
that must be skipped since
+               // they will not be delivered to end users. So we should ack 
them here to avoid ack hole.
+               if pc.chunkedMsgCtxMap.get(uuid) != nil {
+                       ctx := pc.chunkedMsgCtxMap.get(uuid)
+                       isCorruptedChunkMessageDetected := true
+                       for _, previousChunkMsgID := range ctx.chunkedMsgIDs {
+                               if previousChunkMsgID == nil {
+                                       continue
+                               }
+                               if previousChunkMsgID.equal(msgID) {
+                                       isCorruptedChunkMessageDetected = false
+                                       break
+                               }
+                       }
+                       if isCorruptedChunkMessageDetected {
+                               ctx.discard(pc)
+                       }
+                       // The first chunk of a new chunked-message received
+                       // before receiving other chunks of previous 
chunked-message
+                       // so, remove previous chunked-message from map and 
release buffer
+                       pc.log.Warnf(fmt.Sprintf(
+                               "[%s] [%s] Receive a duplicated chunk id=0 
message with messageId [%s], sequenceId [%d], "+
+                                       "uuid [%s]. Remove previous chunk 
context with lastChunkedMsgID [%d]",
+                               pc.name,
+                               pc.options.subscription,
+                               msgID.String(),
+                               msgMeta.GetSequenceId(),
+                               msgMeta.GetUuid(),
+                               ctx.lastChunkedMsgID,
+                       ))
+                       ctx.chunkedMsgBuffer.Clear()
+                       pc.chunkedMsgCtxMap.remove(uuid)
+               }
                pc.chunkedMsgCtxMap.addIfAbsent(uuid,
                        numChunks,
                        totalChunksSize,
                )
        }
 
+       // discard message if chunk is out-of-order
        ctx := pc.chunkedMsgCtxMap.get(uuid)
-
        if ctx == nil || ctx.chunkedMsgBuffer == nil || chunkID != 
ctx.lastChunkedMsgID+1 {
+               // Filter and ack duplicated chunks instead of discard ctx.
+               // For example:
+               //     Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
+               //     Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
+               //     Chunk-3 sequence ID: 0, chunk ID: 2, msgID: 1:3
+               //     Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4
+               //     Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5
+               //     Chunk-6 sequence ID: 0, chunk ID: 3, msgID: 1:6
+               // We should filter and ack chunk-4 and chunk-5.
+               if ctx != nil && chunkID <= ctx.lastChunkedMsgID {
+                       pc.log.Warnf(fmt.Sprintf(
+                               "[%s] [%s] Receive a duplicated chunk message 
with messageId [%s], "+
+                                       "last-chunk-Id [%d], chunkId [%d], 
sequenceId [%d], uuid [%s]",
+                               pc.name,
+                               pc.options.subscription,
+                               msgID.String(),
+                               ctx.lastChunkedMsgID,
+                               chunkID,
+                               msgMeta.GetSequenceId(),
+                               msgMeta.GetUuid(),
+                       ))
+                       // Just like the above logic of receiving the first 
chunk again.
+                       // We only ack this chunk in the message duplication 
case.
+                       isCorruptedChunkMessageDetected := true
+                       for _, previousChunkMsgID := range ctx.chunkedMsgIDs {
+                               if previousChunkMsgID == nil {
+                                       continue
+                               }
+                               if previousChunkMsgID.equal(msgID) {
+                                       isCorruptedChunkMessageDetected = false
+                                       break
+                               }
+                       }
+                       if isCorruptedChunkMessageDetected {
+                               pc.AckID(toTrackingMessageID(msgID))
+                       }

Review Comment:
   Missing flow control update: When a duplicated chunk is filtered and 
returned early (line 1557), the available permits are not incremented. This 
could lead to flow control issues as the broker sent a message that was 
consumed but not counted. Consider adding pc.availablePermits.inc() before the 
return statement to maintain proper flow control, similar to how it's done at 
line 1593 for non-final chunks.
   ```suggestion
                        }
                        pc.availablePermits.Inc()
   ```



##########
pulsar/message_chunking_test.go:
##########
@@ -578,3 +581,167 @@ func sendSingleChunk(p Producer, uuid string, chunkID 
int, totalChunks int) {
                uint32(internal.MaxMessageSize),
        )
 }
+
+func TestChunkWithReconnection(t *testing.T) {
+       sLogger := slog.New(slog.NewTextHandler(os.Stdout, 
&slog.HandlerOptions{Level: slog.LevelDebug}))
+       client, err := NewClient(ClientOptions{
+               URL:    lookupURL,
+               Logger: log.NewLoggerWithSlog(sLogger),
+       })
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := newTopicName()
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:               topic,
+               DisableBatching:     true,
+               EnableChunking:      true,
+               ChunkMaxMessageSize: 100,
+               MaxPendingMessages:  200000,
+               SendTimeout:         60 * time.Second,
+       })
+       assert.NoError(t, err)
+       assert.NotNil(t, producer)
+
+       c, err := client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               Type:             Exclusive,
+               SubscriptionName: "chunk-subscriber",
+       })
+       assert.NoError(t, err)
+       assert.NotNil(t, c)
+       defer c.Close()
+
+       // Reduce publish rate to prevent the producer sending messages too fast
+       url := adminURL + "/" + "admin/v2/persistent/public/default/" + topic + 
"/publishRate"
+       makeHTTPCall(t, http.MethodPost, url, "{\"publishThrottlingRateInMsg\": 
1,\"publishThrottlingRateInByte\": 100}")
+       // Need to wait some time to let the rate limiter take effect
+       time.Sleep(2 * time.Second)
+
+       // payload/ChunkMaxMessageSize = 1000/100 = 10 msg, and 
publishThrottlingRateInMsg = 1
+       // so that this chunk msg will send finish after 10 seconds
+       producer.SendAsync(context.Background(), &ProducerMessage{
+               Payload: createTestMessagePayload(1000),
+       }, func(_ MessageID, _ *ProducerMessage, err error) {
+               assert.Nil(t, err)
+       })
+       assert.NoError(t, err)
+
+       time.Sleep(5 * time.Second)
+       //      trigger topic unload to test sending chunk msg with reconnection
+       url = adminURL + "/" + "admin/v2/persistent/public/default/" + topic + 
"/unload"
+       makeHTTPCall(t, http.MethodPut, url, "")
+       // Need to wait some time to receive all chunk messages
+       time.Sleep(10 * time.Second)
+
+       ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+       msg, err := c.Receive(ctx)
+       cancel()
+       assert.NoError(t, err)
+       assert.NotNil(t, msg.ID())
+}
+
+func TestResendChunkMessages(t *testing.T) {
+       sLogger := slog.New(slog.NewTextHandler(os.Stdout, 
&slog.HandlerOptions{Level: slog.LevelDebug}))
+       client, err := NewClient(ClientOptions{
+               URL:    lookupURL,
+               Logger: log.NewLoggerWithSlog(sLogger),
+       })
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := newTopicName()
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:               topic,
+               DisableBatching:     true,
+               EnableChunking:      true,
+               ChunkMaxMessageSize: 100,
+       })
+       assert.NoError(t, err)
+       assert.NotNil(t, producer)
+
+       c, err := client.Subscribe(ConsumerOptions{
+               Topic:                    topic,
+               Type:                     Exclusive,
+               SubscriptionName:         "chunk-subscriber",
+               MaxPendingChunkedMessage: 10,
+       })
+       assert.NoError(t, err)
+       assert.NotNil(t, c)
+       defer c.Close()
+
+       sendSingleChunk(producer, "0", 0, 2)
+       sendSingleChunk(producer, "0", 0, 2) // Resending the first chunk
+       sendSingleChunk(producer, "1", 0, 3) // This is for testing the 
interwoven chunked message
+       sendSingleChunk(producer, "1", 1, 3)
+       sendSingleChunk(producer, "1", 0, 3) // Resending the UUID-1 chunked 
message
+       sendSingleChunk(producer, "0", 1, 2)
+
+       ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+       msg, err := c.Receive(ctx)
+       cancel()
+       assert.NoError(t, err)
+       assert.Equal(t, "chunk-0-0|chunk-0-1|", string(msg.Payload()))
+       c.Ack(msg)
+
+       sendSingleChunk(producer, "1", 1, 3)
+       sendSingleChunk(producer, "1", 2, 3)
+       ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
+       msg, err = c.Receive(ctx)
+       cancel()
+       assert.NoError(t, err)
+       assert.Equal(t, "chunk-1-0|chunk-1-1|chunk-1-2|", string(msg.Payload()))
+       c.Ack(msg)
+}
+
+func TestResendChunkWithAckHoleMessages(t *testing.T) {
+       sLogger := slog.New(slog.NewTextHandler(os.Stdout, 
&slog.HandlerOptions{Level: slog.LevelDebug}))
+       client, err := NewClient(ClientOptions{
+               URL:    lookupURL,
+               Logger: log.NewLoggerWithSlog(sLogger),
+       })
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := newTopicName()
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:               topic,
+               DisableBatching:     true,
+               EnableChunking:      true,
+               ChunkMaxMessageSize: 100,
+       })
+       assert.NoError(t, err)
+       assert.NotNil(t, producer)
+
+       c, err := client.Subscribe(ConsumerOptions{
+               Topic:                    topic,
+               Type:                     Exclusive,
+               SubscriptionName:         "chunk-subscriber",
+               MaxPendingChunkedMessage: 10,
+       })
+       assert.NoError(t, err)
+       assert.NotNil(t, c)
+       defer c.Close()
+
+       sendSingleChunk(producer, "0", 0, 4)
+       sendSingleChunk(producer, "0", 1, 4)
+       sendSingleChunk(producer, "0", 2, 4)
+       sendSingleChunk(producer, "0", 1, 4) // Resending previous chunk
+       sendSingleChunk(producer, "0", 2, 4)
+       sendSingleChunk(producer, "0", 3, 4)
+
+       ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+       msg, err := c.Receive(ctx)
+       cancel()
+       assert.NoError(t, err)
+       assert.Equal(t, "chunk-0-0|chunk-0-1|chunk-0-2|chunk-0-3|", 
string(msg.Payload()))
+       c.Ack(msg)
+
+       sendSingleChunk(producer, "1", 0, 4)
+       sendSingleChunk(producer, "1", 1, 4)
+       sendSingleChunk(producer, "1", 4, 4) // send broken chunk
+       ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
+       msg, err = c.Receive(ctx)
+       cancel()
+       assert.Error(t, context.DeadlineExceeded, err)

Review Comment:
   Incorrect test assertion: assert.Error expects two arguments (the error 
value and a message string), but here it's being called with (expected error, 
actual error). This should be assert.ErrorIs(err, context.DeadlineExceeded) to 
check if the error is the expected deadline exceeded error, or simply 
assert.Error(t, err) if just checking for any error is sufficient.
   ```suggestion
        assert.ErrorIs(t, err, context.DeadlineExceeded)
   ```



##########
pulsar/consumer_partition.go:
##########
@@ -1456,31 +1456,137 @@ func (pc *partitionConsumer) 
processMessageChunk(compressedPayload internal.Buff
                partitionIdx: pc.partitionIdx,
        }
 
-       if msgMeta.GetChunkId() == 0 {
+       if chunkID == 0 {
+               // Handle ack hole case when receive duplicated chunks.
+               // There are two situation that receives chunks with the same 
sequence ID and chunk ID.
+               // Situation 1 - Message redeliver:
+               // For example:
+               //     Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
+               //     Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
+               //     Chunk-3 sequence ID: 0, chunk ID: 0, msgID: 1:1
+               //     Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:2
+               //     Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:3
+               // In this case, chunk-3 and chunk-4 have the same msgID with 
chunk-1 and chunk-2.
+               // This may be caused by message redeliver, we can't ack any 
chunk in this case here.
+               // Situation 2 - Corrupted chunk message
+               // For example:
+               //     Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
+               //     Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
+               //     Chunk-3 sequence ID: 0, chunk ID: 0, msgID: 1:3
+               //     Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4
+               //     Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5
+               // In this case, all the chunks with different msgIDs and are 
persistent in the topic.
+               // But Chunk-1 and Chunk-2 belong to a corrupted chunk message 
that must be skipped since
+               // they will not be delivered to end users. So we should ack 
them here to avoid ack hole.
+               if pc.chunkedMsgCtxMap.get(uuid) != nil {
+                       ctx := pc.chunkedMsgCtxMap.get(uuid)
+                       isCorruptedChunkMessageDetected := true
+                       for _, previousChunkMsgID := range ctx.chunkedMsgIDs {
+                               if previousChunkMsgID == nil {
+                                       continue
+                               }
+                               if previousChunkMsgID.equal(msgID) {
+                                       isCorruptedChunkMessageDetected = false
+                                       break
+                               }
+                       }
+                       if isCorruptedChunkMessageDetected {
+                               ctx.discard(pc)
+                       }
+                       // The first chunk of a new chunked-message received
+                       // before receiving other chunks of previous 
chunked-message
+                       // so, remove previous chunked-message from map and 
release buffer
+                       pc.log.Warnf(fmt.Sprintf(
+                               "[%s] [%s] Receive a duplicated chunk id=0 
message with messageId [%s], sequenceId [%d], "+
+                                       "uuid [%s]. Remove previous chunk 
context with lastChunkedMsgID [%d]",
+                               pc.name,
+                               pc.options.subscription,
+                               msgID.String(),
+                               msgMeta.GetSequenceId(),
+                               msgMeta.GetUuid(),
+                               ctx.lastChunkedMsgID,
+                       ))
+                       ctx.chunkedMsgBuffer.Clear()
+                       pc.chunkedMsgCtxMap.remove(uuid)
+               }
                pc.chunkedMsgCtxMap.addIfAbsent(uuid,
                        numChunks,
                        totalChunksSize,
                )
        }
 
+       // discard message if chunk is out-of-order
        ctx := pc.chunkedMsgCtxMap.get(uuid)
-
        if ctx == nil || ctx.chunkedMsgBuffer == nil || chunkID != 
ctx.lastChunkedMsgID+1 {
+               // Filter and ack duplicated chunks instead of discard ctx.
+               // For example:
+               //     Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
+               //     Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
+               //     Chunk-3 sequence ID: 0, chunk ID: 2, msgID: 1:3
+               //     Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4
+               //     Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5
+               //     Chunk-6 sequence ID: 0, chunk ID: 3, msgID: 1:6
+               // We should filter and ack chunk-4 and chunk-5.
+               if ctx != nil && chunkID <= ctx.lastChunkedMsgID {
+                       pc.log.Warnf(fmt.Sprintf(
+                               "[%s] [%s] Receive a duplicated chunk message 
with messageId [%s], "+
+                                       "last-chunk-Id [%d], chunkId [%d], 
sequenceId [%d], uuid [%s]",
+                               pc.name,
+                               pc.options.subscription,
+                               msgID.String(),
+                               ctx.lastChunkedMsgID,
+                               chunkID,
+                               msgMeta.GetSequenceId(),
+                               msgMeta.GetUuid(),
+                       ))
+                       // Just like the above logic of receiving the first 
chunk again.
+                       // We only ack this chunk in the message duplication 
case.
+                       isCorruptedChunkMessageDetected := true
+                       for _, previousChunkMsgID := range ctx.chunkedMsgIDs {
+                               if previousChunkMsgID == nil {
+                                       continue
+                               }
+                               if previousChunkMsgID.equal(msgID) {
+                                       isCorruptedChunkMessageDetected = false
+                                       break
+                               }
+                       }
+                       if isCorruptedChunkMessageDetected {
+                               pc.AckID(toTrackingMessageID(msgID))
+                       }
+                       return nil
+               }
+               // Chunked messages rely on TCP to ensure that chunk IDs are 
strictly increasing within a partition.
+               // If the current chunk ID is greater than ctx.lastChunkedMsgID 
+ 1,
+               // it indicates that the current chunk is corrupted and 
requires resource cleanup.
                lastChunkedMsgID := -1
                totalChunks := -1
                if ctx != nil {
                        lastChunkedMsgID = int(ctx.lastChunkedMsgID)
                        totalChunks = int(ctx.totalChunks)
-                       ctx.chunkedMsgBuffer.Clear()
                }
                pc.log.Warnf(fmt.Sprintf(
-                       "Received unexpected chunk messageId %s, last-chunk-id 
%d, chunkId = %d, total-chunks %d",
-                       msgID.String(), lastChunkedMsgID, chunkID, totalChunks))
-               pc.chunkedMsgCtxMap.remove(uuid)
-               pc.availablePermits.inc()
+                       "[%s] [%s] Received unexpected chunk messageId [%s], 
last-chunk-id [%d], "+
+                               "chunkId = [%d], total-chunks [%d], sequenceId 
[%d], uuid [%s]",
+                       pc.Topic(),
+                       pc.options.subscription,
+                       msgID.String(),
+                       lastChunkedMsgID,
+                       chunkID,
+                       totalChunks,
+                       msgMeta.GetSequenceId(),
+                       msgMeta.GetUuid()),
+               )
+               if ctx != nil {
+                       ctx.chunkedMsgBuffer.Clear()
+                       pc.chunkedMsgCtxMap.remove(uuid)
+               }
+               pc.AckID(toTrackingMessageID(msgID))

Review Comment:
   Missing flow control update: When an unexpected chunk is discarded and 
acknowledged (line 1584), the available permits are not incremented. The old 
implementation had pc.availablePermits.inc() after removing the context. 
Without this increment, the consumer may stop receiving new messages when the 
available permits reach zero. Add pc.availablePermits.inc() after line 1584 to 
maintain proper flow control.
   ```suggestion
                pc.AckID(toTrackingMessageID(msgID))
                pc.availablePermits.inc()
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to