Copilot commented on code in PR #1464:
URL: https://github.com/apache/pulsar-client-go/pull/1464#discussion_r2746104916
##########
pulsar/consumer_partition.go:
##########
@@ -1456,31 +1456,137 @@ func (pc *partitionConsumer) processMessageChunk(compressedPayload internal.Buff
partitionIdx: pc.partitionIdx,
}
- if msgMeta.GetChunkId() == 0 {
+ if chunkID == 0 {
+ // Handle ack hole case when receive duplicated chunks.
+ // There are two situations in which chunks with the same sequence ID and chunk ID are received.
+ // Situation 1 - Message redeliver:
+ // For example:
+ // Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
+ // Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
+ // Chunk-3 sequence ID: 0, chunk ID: 0, msgID: 1:1
+ // Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:2
+ // Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:3
+ // In this case, chunk-3 and chunk-4 have the same msgIDs as chunk-1 and chunk-2.
+ // This may be caused by message redelivery; we can't ack any chunk here in this case.
+ // Situation 2 - Corrupted chunk message
+ // For example:
+ // Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
+ // Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
+ // Chunk-3 sequence ID: 0, chunk ID: 0, msgID: 1:3
+ // Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4
+ // Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5
+ // In this case, all the chunks have different msgIDs and are persisted in the topic.
+ // But Chunk-1 and Chunk-2 belong to a corrupted chunk message that must be skipped, since
+ // they will not be delivered to end users. So we should ack them here to avoid an ack hole.
+ if pc.chunkedMsgCtxMap.get(uuid) != nil {
+ ctx := pc.chunkedMsgCtxMap.get(uuid)
Review Comment:
Race condition: the context is retrieved twice via get() without holding a lock
across the calls. Between lines 1481 and 1482 the context could be modified or
removed by another goroutine. Consider storing the result of a single get() call in
a variable to avoid a potential nil pointer dereference or inconsistent state.
```suggestion
ctx := pc.chunkedMsgCtxMap.get(uuid)
if ctx != nil {
```
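
As an aside, here is a minimal, self-contained sketch of the look-up-once pattern the comment suggests. The `ctxMap` and `chunkCtx` types below are hypothetical stand-ins for the client's chunk-context map, not its actual API; the point is that the single `get` result is kept in a local variable, so a concurrent removal between two separate lookups can no longer lead to a nil dereference.

```go
package main

import "sync"

// chunkCtx and ctxMap are hypothetical stand-ins for the client's chunk-context map.
type chunkCtx struct{ lastChunkedMsgID int32 }

type ctxMap struct {
	mu sync.Mutex
	m  map[string]*chunkCtx
}

func (c *ctxMap) get(uuid string) *chunkCtx {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.m[uuid]
}

func handleFirstChunk(c *ctxMap, uuid string) {
	// Fetch the context exactly once and keep the result in a local variable.
	if ctx := c.get(uuid); ctx != nil {
		// Even if another goroutine removes the map entry now, this local
		// pointer stays valid and non-nil for the rest of the branch.
		_ = ctx.lastChunkedMsgID
	}
}

func main() {
	handleFirstChunk(&ctxMap{m: map[string]*chunkCtx{}}, "uuid-0")
}
```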
##########
pulsar/consumer_partition.go:
##########
@@ -1456,31 +1456,137 @@ func (pc *partitionConsumer) processMessageChunk(compressedPayload internal.Buff
partitionIdx: pc.partitionIdx,
}
- if msgMeta.GetChunkId() == 0 {
+ if chunkID == 0 {
+ // Handle ack hole case when receive duplicated chunks.
+ // There are two situations in which chunks with the same sequence ID and chunk ID are received.
+ // Situation 1 - Message redeliver:
+ // For example:
+ // Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
+ // Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
+ // Chunk-3 sequence ID: 0, chunk ID: 0, msgID: 1:1
+ // Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:2
+ // Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:3
+ // In this case, chunk-3 and chunk-4 have the same msgIDs as chunk-1 and chunk-2.
+ // This may be caused by message redelivery; we can't ack any chunk here in this case.
+ // Situation 2 - Corrupted chunk message
+ // For example:
+ // Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
+ // Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
+ // Chunk-3 sequence ID: 0, chunk ID: 0, msgID: 1:3
+ // Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4
+ // Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5
+ // In this case, all the chunks have different msgIDs and are persisted in the topic.
+ // But Chunk-1 and Chunk-2 belong to a corrupted chunk message that must be skipped, since
+ // they will not be delivered to end users. So we should ack them here to avoid an ack hole.
+ if pc.chunkedMsgCtxMap.get(uuid) != nil {
+ ctx := pc.chunkedMsgCtxMap.get(uuid)
+ isCorruptedChunkMessageDetected := true
+ for _, previousChunkMsgID := range ctx.chunkedMsgIDs {
+ if previousChunkMsgID == nil {
+ continue
+ }
+ if previousChunkMsgID.equal(msgID) {
+ isCorruptedChunkMessageDetected = false
+ break
+ }
+ }
+ if isCorruptedChunkMessageDetected {
+ ctx.discard(pc)
+ }
+ // The first chunk of a new chunked-message was received
+ // before the remaining chunks of the previous chunked-message,
+ // so remove the previous chunked-message from the map and release its buffer.
+ pc.log.Warnf(fmt.Sprintf(
+ "[%s] [%s] Receive a duplicated chunk id=0
message with messageId [%s], sequenceId [%d], "+
+ "uuid [%s]. Remove previous chunk
context with lastChunkedMsgID [%d]",
+ pc.name,
+ pc.options.subscription,
+ msgID.String(),
+ msgMeta.GetSequenceId(),
+ msgMeta.GetUuid(),
+ ctx.lastChunkedMsgID,
+ ))
+ ctx.chunkedMsgBuffer.Clear()
+ pc.chunkedMsgCtxMap.remove(uuid)
+ }
pc.chunkedMsgCtxMap.addIfAbsent(uuid,
numChunks,
totalChunksSize,
)
}
+ // discard message if chunk is out-of-order
ctx := pc.chunkedMsgCtxMap.get(uuid)
-
if ctx == nil || ctx.chunkedMsgBuffer == nil || chunkID != ctx.lastChunkedMsgID+1 {
+ // Filter and ack duplicated chunks instead of discard ctx.
+ // For example:
+ // Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
+ // Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
+ // Chunk-3 sequence ID: 0, chunk ID: 2, msgID: 1:3
+ // Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4
+ // Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5
+ // Chunk-6 sequence ID: 0, chunk ID: 3, msgID: 1:6
+ // We should filter and ack chunk-4 and chunk-5.
+ if ctx != nil && chunkID <= ctx.lastChunkedMsgID {
+ pc.log.Warnf(fmt.Sprintf(
+ "[%s] [%s] Receive a duplicated chunk message
with messageId [%s], "+
+ "last-chunk-Id [%d], chunkId [%d],
sequenceId [%d], uuid [%s]",
+ pc.name,
+ pc.options.subscription,
+ msgID.String(),
+ ctx.lastChunkedMsgID,
+ chunkID,
+ msgMeta.GetSequenceId(),
+ msgMeta.GetUuid(),
+ ))
+ // Just like the above logic of receiving the first chunk again.
+ // We only ack this chunk in the message duplication case.
+ isCorruptedChunkMessageDetected := true
+ for _, previousChunkMsgID := range ctx.chunkedMsgIDs {
+ if previousChunkMsgID == nil {
+ continue
+ }
+ if previousChunkMsgID.equal(msgID) {
+ isCorruptedChunkMessageDetected = false
+ break
+ }
+ }
+ if isCorruptedChunkMessageDetected {
+ pc.AckID(toTrackingMessageID(msgID))
+ }
Review Comment:
Missing flow control update: When a duplicated chunk is filtered and
returned early (line 1557), the available permits are not incremented. This
could lead to flow control issues as the broker sent a message that was
consumed but not counted. Consider adding pc.availablePermits.inc() before the
return statement to maintain proper flow control, similar to how it's done at
line 1593 for non-final chunks.
```suggestion
}
			pc.availablePermits.inc()
```
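
For illustration, a hedged, self-contained sketch of the permit-accounting idea behind this comment, using a hypothetical `permits` counter rather than the client's actual `availablePermits` type: any message taken off the wire but filtered out (such as a duplicated chunk) still has to return its permit before the early return.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// permits is a hypothetical stand-in for the consumer's available-permits counter;
// the real client's type and method names may differ.
type permits struct{ n atomic.Int64 }

func (p *permits) inc() { p.n.Add(1) }

// handleChunk sketches the duplicate-chunk path: the chunk is acked/filtered
// instead of being delivered, but the permit it consumed is still returned
// before the early return.
func handleChunk(p *permits, duplicate bool) (delivered bool) {
	if duplicate {
		p.inc() // filtered messages still counted against the broker's permit budget
		return false
	}
	return true
}

func main() {
	p := &permits{}
	handleChunk(p, true)
	fmt.Println("permits returned after filtering:", p.n.Load())
}
```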
##########
pulsar/message_chunking_test.go:
##########
@@ -578,3 +581,167 @@ func sendSingleChunk(p Producer, uuid string, chunkID int, totalChunks int) {
uint32(internal.MaxMessageSize),
)
}
+
+func TestChunkWithReconnection(t *testing.T) {
+ sLogger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ Logger: log.NewLoggerWithSlog(sLogger),
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := newTopicName()
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ DisableBatching: true,
+ EnableChunking: true,
+ ChunkMaxMessageSize: 100,
+ MaxPendingMessages: 200000,
+ SendTimeout: 60 * time.Second,
+ })
+ assert.NoError(t, err)
+ assert.NotNil(t, producer)
+
+ c, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ Type: Exclusive,
+ SubscriptionName: "chunk-subscriber",
+ })
+ assert.NoError(t, err)
+ assert.NotNil(t, c)
+ defer c.Close()
+
+ // Reduce the publish rate to prevent the producer from sending messages too fast
+ url := adminURL + "/" + "admin/v2/persistent/public/default/" + topic + "/publishRate"
+ makeHTTPCall(t, http.MethodPost, url, "{\"publishThrottlingRateInMsg\": 1,\"publishThrottlingRateInByte\": 100}")
+ // Need to wait some time to let the rate limiter take effect
+ time.Sleep(2 * time.Second)
+
+ // payload/ChunkMaxMessageSize = 1000/100 = 10 msgs, and publishThrottlingRateInMsg = 1,
+ // so this chunked message will take about 10 seconds to finish sending
+ producer.SendAsync(context.Background(), &ProducerMessage{
+ Payload: createTestMessagePayload(1000),
+ }, func(_ MessageID, _ *ProducerMessage, err error) {
+ assert.Nil(t, err)
+ })
+ assert.NoError(t, err)
+
+ time.Sleep(5 * time.Second)
+ // trigger topic unload to test sending chunk msg with reconnection
+ url = adminURL + "/" + "admin/v2/persistent/public/default/" + topic + "/unload"
+ makeHTTPCall(t, http.MethodPut, url, "")
+ // Need to wait some time to receive all chunk messages
+ time.Sleep(10 * time.Second)
+
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ msg, err := c.Receive(ctx)
+ cancel()
+ assert.NoError(t, err)
+ assert.NotNil(t, msg.ID())
+}
+
+func TestResendChunkMessages(t *testing.T) {
+ sLogger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ Logger: log.NewLoggerWithSlog(sLogger),
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := newTopicName()
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ DisableBatching: true,
+ EnableChunking: true,
+ ChunkMaxMessageSize: 100,
+ })
+ assert.NoError(t, err)
+ assert.NotNil(t, producer)
+
+ c, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ Type: Exclusive,
+ SubscriptionName: "chunk-subscriber",
+ MaxPendingChunkedMessage: 10,
+ })
+ assert.NoError(t, err)
+ assert.NotNil(t, c)
+ defer c.Close()
+
+ sendSingleChunk(producer, "0", 0, 2)
+ sendSingleChunk(producer, "0", 0, 2) // Resending the first chunk
+ sendSingleChunk(producer, "1", 0, 3) // This is for testing the
interwoven chunked message
+ sendSingleChunk(producer, "1", 1, 3)
+ sendSingleChunk(producer, "1", 0, 3) // Resending the UUID-1 chunked
message
+ sendSingleChunk(producer, "0", 1, 2)
+
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ msg, err := c.Receive(ctx)
+ cancel()
+ assert.NoError(t, err)
+ assert.Equal(t, "chunk-0-0|chunk-0-1|", string(msg.Payload()))
+ c.Ack(msg)
+
+ sendSingleChunk(producer, "1", 1, 3)
+ sendSingleChunk(producer, "1", 2, 3)
+ ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
+ msg, err = c.Receive(ctx)
+ cancel()
+ assert.NoError(t, err)
+ assert.Equal(t, "chunk-1-0|chunk-1-1|chunk-1-2|", string(msg.Payload()))
+ c.Ack(msg)
+}
+
+func TestResendChunkWithAckHoleMessages(t *testing.T) {
+ sLogger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ Logger: log.NewLoggerWithSlog(sLogger),
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := newTopicName()
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ DisableBatching: true,
+ EnableChunking: true,
+ ChunkMaxMessageSize: 100,
+ })
+ assert.NoError(t, err)
+ assert.NotNil(t, producer)
+
+ c, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ Type: Exclusive,
+ SubscriptionName: "chunk-subscriber",
+ MaxPendingChunkedMessage: 10,
+ })
+ assert.NoError(t, err)
+ assert.NotNil(t, c)
+ defer c.Close()
+
+ sendSingleChunk(producer, "0", 0, 4)
+ sendSingleChunk(producer, "0", 1, 4)
+ sendSingleChunk(producer, "0", 2, 4)
+ sendSingleChunk(producer, "0", 1, 4) // Resending previous chunk
+ sendSingleChunk(producer, "0", 2, 4)
+ sendSingleChunk(producer, "0", 3, 4)
+
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ msg, err := c.Receive(ctx)
+ cancel()
+ assert.NoError(t, err)
+ assert.Equal(t, "chunk-0-0|chunk-0-1|chunk-0-2|chunk-0-3|",
string(msg.Payload()))
+ c.Ack(msg)
+
+ sendSingleChunk(producer, "1", 0, 4)
+ sendSingleChunk(producer, "1", 1, 4)
+ sendSingleChunk(producer, "1", 4, 4) // send broken chunk
+ ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
+ msg, err = c.Receive(ctx)
+ cancel()
+ assert.Error(t, context.DeadlineExceeded, err)
Review Comment:
Incorrect test assertion: assert.Error takes the actual error followed by optional
message arguments, but here it is called with (expected error, actual error), so it
always passes because context.DeadlineExceeded is non-nil. This should be
assert.ErrorIs(t, err, context.DeadlineExceeded) to check that the error is the
expected deadline-exceeded error, or simply assert.Error(t, err) if checking for any
error is sufficient.
```suggestion
assert.ErrorIs(t, err, context.DeadlineExceeded)
```
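
For reference, `assert.ErrorIs` is essentially a wrapper over `errors.Is`. A small standalone example of the check the corrected assertion performs on a timed-out context (illustrative only, independent of the test above):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
	defer cancel()
	<-ctx.Done()
	err := ctx.Err()
	// assert.ErrorIs(t, err, context.DeadlineExceeded) boils down to this check:
	fmt.Println(errors.Is(err, context.DeadlineExceeded)) // true
}
```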
##########
pulsar/consumer_partition.go:
##########
@@ -1456,31 +1456,137 @@ func (pc *partitionConsumer) processMessageChunk(compressedPayload internal.Buff
partitionIdx: pc.partitionIdx,
}
- if msgMeta.GetChunkId() == 0 {
+ if chunkID == 0 {
+ // Handle ack hole case when receive duplicated chunks.
+ // There are two situations in which chunks with the same sequence ID and chunk ID are received.
+ // Situation 1 - Message redeliver:
+ // For example:
+ // Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
+ // Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
+ // Chunk-3 sequence ID: 0, chunk ID: 0, msgID: 1:1
+ // Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:2
+ // Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:3
+ // In this case, chunk-3 and chunk-4 have the same msgIDs as chunk-1 and chunk-2.
+ // This may be caused by message redelivery; we can't ack any chunk here in this case.
+ // Situation 2 - Corrupted chunk message
+ // For example:
+ // Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
+ // Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
+ // Chunk-3 sequence ID: 0, chunk ID: 0, msgID: 1:3
+ // Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4
+ // Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5
+ // In this case, all the chunks have different msgIDs and are persisted in the topic.
+ // But Chunk-1 and Chunk-2 belong to a corrupted chunk message that must be skipped, since
+ // they will not be delivered to end users. So we should ack them here to avoid an ack hole.
+ if pc.chunkedMsgCtxMap.get(uuid) != nil {
+ ctx := pc.chunkedMsgCtxMap.get(uuid)
+ isCorruptedChunkMessageDetected := true
+ for _, previousChunkMsgID := range ctx.chunkedMsgIDs {
+ if previousChunkMsgID == nil {
+ continue
+ }
+ if previousChunkMsgID.equal(msgID) {
+ isCorruptedChunkMessageDetected = false
+ break
+ }
+ }
+ if isCorruptedChunkMessageDetected {
+ ctx.discard(pc)
+ }
+ // The first chunk of a new chunked-message was received
+ // before the remaining chunks of the previous chunked-message,
+ // so remove the previous chunked-message from the map and release its buffer.
+ pc.log.Warnf(fmt.Sprintf(
+ "[%s] [%s] Receive a duplicated chunk id=0
message with messageId [%s], sequenceId [%d], "+
+ "uuid [%s]. Remove previous chunk
context with lastChunkedMsgID [%d]",
+ pc.name,
+ pc.options.subscription,
+ msgID.String(),
+ msgMeta.GetSequenceId(),
+ msgMeta.GetUuid(),
+ ctx.lastChunkedMsgID,
+ ))
+ ctx.chunkedMsgBuffer.Clear()
+ pc.chunkedMsgCtxMap.remove(uuid)
+ }
pc.chunkedMsgCtxMap.addIfAbsent(uuid,
numChunks,
totalChunksSize,
)
}
+ // discard message if chunk is out-of-order
ctx := pc.chunkedMsgCtxMap.get(uuid)
-
if ctx == nil || ctx.chunkedMsgBuffer == nil || chunkID != ctx.lastChunkedMsgID+1 {
+ // Filter and ack duplicated chunks instead of discard ctx.
+ // For example:
+ // Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
+ // Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
+ // Chunk-3 sequence ID: 0, chunk ID: 2, msgID: 1:3
+ // Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4
+ // Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5
+ // Chunk-6 sequence ID: 0, chunk ID: 3, msgID: 1:6
+ // We should filter and ack chunk-4 and chunk-5.
+ if ctx != nil && chunkID <= ctx.lastChunkedMsgID {
+ pc.log.Warnf(fmt.Sprintf(
+ "[%s] [%s] Receive a duplicated chunk message
with messageId [%s], "+
+ "last-chunk-Id [%d], chunkId [%d],
sequenceId [%d], uuid [%s]",
+ pc.name,
+ pc.options.subscription,
+ msgID.String(),
+ ctx.lastChunkedMsgID,
+ chunkID,
+ msgMeta.GetSequenceId(),
+ msgMeta.GetUuid(),
+ ))
+ // Just like the above logic of receiving the first chunk again.
+ // We only ack this chunk in the message duplication case.
+ isCorruptedChunkMessageDetected := true
+ for _, previousChunkMsgID := range ctx.chunkedMsgIDs {
+ if previousChunkMsgID == nil {
+ continue
+ }
+ if previousChunkMsgID.equal(msgID) {
+ isCorruptedChunkMessageDetected = false
+ break
+ }
+ }
+ if isCorruptedChunkMessageDetected {
+ pc.AckID(toTrackingMessageID(msgID))
+ }
+ return nil
+ }
+ // Chunked messages rely on TCP to ensure that chunk IDs are strictly increasing within a partition.
+ // If the current chunk ID is greater than ctx.lastChunkedMsgID + 1,
+ // it indicates that the current chunk is corrupted and requires resource cleanup.
lastChunkedMsgID := -1
totalChunks := -1
if ctx != nil {
lastChunkedMsgID = int(ctx.lastChunkedMsgID)
totalChunks = int(ctx.totalChunks)
- ctx.chunkedMsgBuffer.Clear()
}
pc.log.Warnf(fmt.Sprintf(
- "Received unexpected chunk messageId %s, last-chunk-id
%d, chunkId = %d, total-chunks %d",
- msgID.String(), lastChunkedMsgID, chunkID, totalChunks))
- pc.chunkedMsgCtxMap.remove(uuid)
- pc.availablePermits.inc()
+ "[%s] [%s] Received unexpected chunk messageId [%s],
last-chunk-id [%d], "+
+ "chunkId = [%d], total-chunks [%d], sequenceId
[%d], uuid [%s]",
+ pc.Topic(),
+ pc.options.subscription,
+ msgID.String(),
+ lastChunkedMsgID,
+ chunkID,
+ totalChunks,
+ msgMeta.GetSequenceId(),
+ msgMeta.GetUuid()),
+ )
+ if ctx != nil {
+ ctx.chunkedMsgBuffer.Clear()
+ pc.chunkedMsgCtxMap.remove(uuid)
+ }
+ pc.AckID(toTrackingMessageID(msgID))
Review Comment:
Missing flow control update: When an unexpected chunk is discarded and
acknowledged (line 1584), the available permits are not incremented. The old
implementation had pc.availablePermits.inc() after removing the context.
Without this increment, the consumer may stop receiving new messages when the
available permits reach zero. Add pc.availablePermits.inc() after line 1584 to
maintain proper flow control.
```suggestion
pc.AckID(toTrackingMessageID(msgID))
pc.availablePermits.inc()
```
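
To show why the missing increment matters, here is a toy model of the flow-control loop, not the client's actual implementation: the consumer only asks the broker for more messages once enough permits have accumulated, so a path that takes a message off the wire without returning its permit can eventually stall delivery. The threshold below is illustrative.

```go
package main

import "fmt"

// Toy model only: the consumer sends a FLOW request once returned permits
// reach a threshold tied to the receiver queue size. Exact thresholds in the
// real client may differ.
const receiverQueueSize = 4

func main() {
	available := 0
	for i := 0; i < 8; i++ {
		// Every message taken off the wire, delivered or discarded, must add a permit.
		available++
		if available >= receiverQueueSize/2 {
			fmt.Println("send FLOW request, reset permits")
			available = 0
		}
	}
	// If discarded chunks skipped the increment, `available` could stall below
	// the threshold and no further FLOW request would be sent.
}
```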