rdhabalia commented on a change in pull request #12403:
URL: https://github.com/apache/pulsar/pull/12403#discussion_r759618530
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1206,6 +1206,24 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, List<Long> ac
if (uncompressedPayload == null) {
return;
}
+
+ // last chunk received: so, stitch chunked-messages and clear up chunkedMsgBuffer
+ if (log.isDebugEnabled()) {
+ log.debug("Chunked message completed chunkId {}, total-chunks {}, msgId {} sequenceId {}",
+ msgMetadata.getChunkId(), msgMetadata.getNumChunksFromMsg(), msgId,
+ msgMetadata.getSequenceId());
+ }
+
+ // remove buffer from the map, set the chunk message id
+ ChunkedMessageCtx chunkedMsgCtx = chunkedMessagesMap.remove(msgMetadata.getUuid());
+ if (chunkedMsgCtx.chunkedMessageIds.length > 0) {
+ msgId = new ChunkMessageIdImpl(chunkedMsgCtx.chunkedMessageIds[0],
+ chunkedMsgCtx.chunkedMessageIds[chunkedMsgCtx.chunkedMessageIds.length - 1]);
Review comment:
Can we remove all the producer-side changes then? The producer can't
guarantee that it sets the messageId (ledgerId, entryId) of the chunk.