geniusjoe commented on code in PR #1464:
URL: https://github.com/apache/pulsar-client-go/pull/1464#discussion_r2746602400
##########
pulsar/consumer_partition.go:
##########
@@ -1456,31 +1456,137 @@ func (pc *partitionConsumer)
processMessageChunk(compressedPayload internal.Buff
partitionIdx: pc.partitionIdx,
}
- if msgMeta.GetChunkId() == 0 {
+ if chunkID == 0 {
+ // Handle ack hole case when receive duplicated chunks.
+ // There are two situation that receives chunks with the same
sequence ID and chunk ID.
+ // Situation 1 - Message redeliver:
+ // For example:
+ // Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
+ // Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
+ // Chunk-3 sequence ID: 0, chunk ID: 0, msgID: 1:1
+ // Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:2
+ // Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:3
+ // In this case, chunk-3 and chunk-4 have the same msgID with
chunk-1 and chunk-2.
+ // This may be caused by message redeliver, we can't ack any
chunk in this case here.
+ // Situation 2 - Corrupted chunk message
+ // For example:
+ // Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
+ // Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
+ // Chunk-3 sequence ID: 0, chunk ID: 0, msgID: 1:3
+ // Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4
+ // Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5
+ // In this case, all the chunks with different msgIDs and are
persistent in the topic.
+ // But Chunk-1 and Chunk-2 belong to a corrupted chunk message
that must be skipped since
+ // they will not be delivered to end users. So we should ack
them here to avoid ack hole.
+ if pc.chunkedMsgCtxMap.get(uuid) != nil {
+ ctx := pc.chunkedMsgCtxMap.get(uuid)
+ isCorruptedChunkMessageDetected := true
+ for _, previousChunkMsgID := range ctx.chunkedMsgIDs {
+ if previousChunkMsgID == nil {
+ continue
+ }
+ if previousChunkMsgID.equal(msgID) {
+ isCorruptedChunkMessageDetected = false
+ break
+ }
+ }
+ if isCorruptedChunkMessageDetected {
+ ctx.discard(pc)
+ }
+ // The first chunk of a new chunked-message received
+ // before receiving other chunks of previous
chunked-message
+ // so, remove previous chunked-message from map and
release buffer
+ pc.log.Warnf(fmt.Sprintf(
+ "[%s] [%s] Receive a duplicated chunk id=0
message with messageId [%s], sequenceId [%d], "+
+ "uuid [%s]. Remove previous chunk
context with lastChunkedMsgID [%d]",
+ pc.name,
+ pc.options.subscription,
+ msgID.String(),
+ msgMeta.GetSequenceId(),
+ msgMeta.GetUuid(),
+ ctx.lastChunkedMsgID,
+ ))
+ ctx.chunkedMsgBuffer.Clear()
+ pc.chunkedMsgCtxMap.remove(uuid)
+ }
pc.chunkedMsgCtxMap.addIfAbsent(uuid,
numChunks,
totalChunksSize,
)
}
+ // discard message if chunk is out-of-order
ctx := pc.chunkedMsgCtxMap.get(uuid)
-
if ctx == nil || ctx.chunkedMsgBuffer == nil || chunkID !=
ctx.lastChunkedMsgID+1 {
+ // Filter and ack duplicated chunks instead of discard ctx.
+ // For example:
+ // Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
+ // Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
+ // Chunk-3 sequence ID: 0, chunk ID: 2, msgID: 1:3
+ // Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4
+ // Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5
+ // Chunk-6 sequence ID: 0, chunk ID: 3, msgID: 1:6
+ // We should filter and ack chunk-4 and chunk-5.
+ if ctx != nil && chunkID <= ctx.lastChunkedMsgID {
+ pc.log.Warnf(fmt.Sprintf(
+ "[%s] [%s] Receive a duplicated chunk message
with messageId [%s], "+
+ "last-chunk-Id [%d], chunkId [%d],
sequenceId [%d], uuid [%s]",
+ pc.name,
+ pc.options.subscription,
+ msgID.String(),
+ ctx.lastChunkedMsgID,
+ chunkID,
+ msgMeta.GetSequenceId(),
+ msgMeta.GetUuid(),
+ ))
+ // Just like the above logic of receiving the first
chunk again.
+ // We only ack this chunk in the message duplication
case.
+ isCorruptedChunkMessageDetected := true
+ for _, previousChunkMsgID := range ctx.chunkedMsgIDs {
+ if previousChunkMsgID == nil {
+ continue
+ }
+ if previousChunkMsgID.equal(msgID) {
+ isCorruptedChunkMessageDetected = false
+ break
+ }
+ }
+ if isCorruptedChunkMessageDetected {
+ pc.AckID(toTrackingMessageID(msgID))
+ }
Review Comment:
Fixed. Following the Java implementation, I moved
`pc.availablePermits.inc()` to the beginning of the function.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]