geniusjoe commented on code in PR #1464:
URL: https://github.com/apache/pulsar-client-go/pull/1464#discussion_r2746603886


##########
pulsar/consumer_partition.go:
##########
@@ -1456,31 +1456,137 @@ func (pc *partitionConsumer) 
processMessageChunk(compressedPayload internal.Buff
                partitionIdx: pc.partitionIdx,
        }
 
-       if msgMeta.GetChunkId() == 0 {
+       if chunkID == 0 {
+               // Handle ack hole case when receive duplicated chunks.
+               // There are two situation that receives chunks with the same 
sequence ID and chunk ID.
+               // Situation 1 - Message redeliver:
+               // For example:
+               //     Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
+               //     Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
+               //     Chunk-3 sequence ID: 0, chunk ID: 0, msgID: 1:1
+               //     Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:2
+               //     Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:3
+               // In this case, chunk-3 and chunk-4 have the same msgID with 
chunk-1 and chunk-2.
+               // This may be caused by message redeliver, we can't ack any 
chunk in this case here.
+               // Situation 2 - Corrupted chunk message
+               // For example:
+               //     Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
+               //     Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
+               //     Chunk-3 sequence ID: 0, chunk ID: 0, msgID: 1:3
+               //     Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4
+               //     Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5
+               // In this case, all the chunks with different msgIDs and are 
persistent in the topic.
+               // But Chunk-1 and Chunk-2 belong to a corrupted chunk message 
that must be skipped since
+               // they will not be delivered to end users. So we should ack 
them here to avoid ack hole.
+               if pc.chunkedMsgCtxMap.get(uuid) != nil {
+                       ctx := pc.chunkedMsgCtxMap.get(uuid)
+                       isCorruptedChunkMessageDetected := true
+                       for _, previousChunkMsgID := range ctx.chunkedMsgIDs {
+                               if previousChunkMsgID == nil {
+                                       continue
+                               }
+                               if previousChunkMsgID.equal(msgID) {
+                                       isCorruptedChunkMessageDetected = false
+                                       break
+                               }
+                       }
+                       if isCorruptedChunkMessageDetected {
+                               ctx.discard(pc)
+                       }
+                       // The first chunk of a new chunked-message received
+                       // before receiving other chunks of previous 
chunked-message
+                       // so, remove previous chunked-message from map and 
release buffer
+                       pc.log.Warnf(fmt.Sprintf(
+                               "[%s] [%s] Receive a duplicated chunk id=0 
message with messageId [%s], sequenceId [%d], "+
+                                       "uuid [%s]. Remove previous chunk 
context with lastChunkedMsgID [%d]",
+                               pc.name,
+                               pc.options.subscription,
+                               msgID.String(),
+                               msgMeta.GetSequenceId(),
+                               msgMeta.GetUuid(),
+                               ctx.lastChunkedMsgID,
+                       ))
+                       ctx.chunkedMsgBuffer.Clear()
+                       pc.chunkedMsgCtxMap.remove(uuid)
+               }
                pc.chunkedMsgCtxMap.addIfAbsent(uuid,
                        numChunks,
                        totalChunksSize,
                )
        }
 
+       // discard message if chunk is out-of-order
        ctx := pc.chunkedMsgCtxMap.get(uuid)
-
        if ctx == nil || ctx.chunkedMsgBuffer == nil || chunkID != 
ctx.lastChunkedMsgID+1 {
+               // Filter and ack duplicated chunks instead of discard ctx.
+               // For example:
+               //     Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
+               //     Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
+               //     Chunk-3 sequence ID: 0, chunk ID: 2, msgID: 1:3
+               //     Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4
+               //     Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5
+               //     Chunk-6 sequence ID: 0, chunk ID: 3, msgID: 1:6
+               // We should filter and ack chunk-4 and chunk-5.
+               if ctx != nil && chunkID <= ctx.lastChunkedMsgID {
+                       pc.log.Warnf(fmt.Sprintf(
+                               "[%s] [%s] Receive a duplicated chunk message 
with messageId [%s], "+
+                                       "last-chunk-Id [%d], chunkId [%d], 
sequenceId [%d], uuid [%s]",
+                               pc.name,
+                               pc.options.subscription,
+                               msgID.String(),
+                               ctx.lastChunkedMsgID,
+                               chunkID,
+                               msgMeta.GetSequenceId(),
+                               msgMeta.GetUuid(),
+                       ))
+                       // Just like the above logic of receiving the first 
chunk again.
+                       // We only ack this chunk in the message duplication 
case.
+                       isCorruptedChunkMessageDetected := true
+                       for _, previousChunkMsgID := range ctx.chunkedMsgIDs {
+                               if previousChunkMsgID == nil {
+                                       continue
+                               }
+                               if previousChunkMsgID.equal(msgID) {
+                                       isCorruptedChunkMessageDetected = false
+                                       break
+                               }
+                       }
+                       if isCorruptedChunkMessageDetected {
+                               pc.AckID(toTrackingMessageID(msgID))
+                       }
+                       return nil
+               }
+               // Chunked messages rely on TCP to ensure that chunk IDs are 
strictly increasing within a partition.
+               // If the current chunk ID is greater than ctx.lastChunkedMsgID 
+ 1,
+               // it indicates that the current chunk is corrupted and 
requires resource cleanup.
                lastChunkedMsgID := -1
                totalChunks := -1
                if ctx != nil {
                        lastChunkedMsgID = int(ctx.lastChunkedMsgID)
                        totalChunks = int(ctx.totalChunks)
-                       ctx.chunkedMsgBuffer.Clear()
                }
                pc.log.Warnf(fmt.Sprintf(
-                       "Received unexpected chunk messageId %s, last-chunk-id 
%d, chunkId = %d, total-chunks %d",
-                       msgID.String(), lastChunkedMsgID, chunkID, totalChunks))
-               pc.chunkedMsgCtxMap.remove(uuid)
-               pc.availablePermits.inc()
+                       "[%s] [%s] Received unexpected chunk messageId [%s], 
last-chunk-id [%d], "+
+                               "chunkId = [%d], total-chunks [%d], sequenceId 
[%d], uuid [%s]",
+                       pc.Topic(),
+                       pc.options.subscription,
+                       msgID.String(),
+                       lastChunkedMsgID,
+                       chunkID,
+                       totalChunks,
+                       msgMeta.GetSequenceId(),
+                       msgMeta.GetUuid()),
+               )
+               if ctx != nil {
+                       ctx.chunkedMsgBuffer.Clear()
+                       pc.chunkedMsgCtxMap.remove(uuid)
+               }
+               pc.AckID(toTrackingMessageID(msgID))

Review Comment:
   Fixed. Following the Java implementation, I moved 
`pc.availablePermits.inc()` to the beginning of the function.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to