geniusjoe commented on code in PR #587:
URL: https://github.com/apache/pulsar-client-cpp/pull/587#discussion_r3339459897
##########
lib/ConsumerImpl.cc:
##########
@@ -512,45 +563,104 @@ optional<SharedBuffer>
ConsumerImpl::processMessageChunk(const SharedBuffer& pay
}
it = chunkedMessageCache_.putIfAbsent(
uuid, ChunkedMessageCtx{metadata.num_chunks_from_msg(),
metadata.total_chunk_msg_size()});
+ it->second.appendChunk(messageId, payload);
+ lock.unlock();
+ return {};
}
- auto& chunkedMsgCtx = it->second;
- if (it == chunkedMessageCache_.end() ||
!chunkedMsgCtx.validateChunkId(chunkId)) {
+ // Part 2: chunkId != 0 but chunk context not found in cache.
+ // This happens when the first chunk was not received (e.g., consumer used
seek() or started
+ // consuming from a specific message position that falls in the middle of
a chunked message,
+ // or the context was evicted due to maxPendingChunkedMessage limit).
+ // In this case, the chunk message cannot be assembled, so just discard it.
+ if (it == chunkedMessageCache_.end()) {
auto startMessageId = getStartMessageId();
if (!config_.isStartMessageIdInclusive() && startMessageId &&
startMessageId->ledgerId() == messageId.ledgerId() &&
startMessageId->entryId() == messageId.entryId()) {
- // When the start message id is not inclusive, the last chunk of
the previous chunked message will
- // be delivered, which is expected and we only need to filter it
out.
- chunkedMessageCache_.remove(uuid);
LOG_INFO("Filtered the chunked message before the start message id
(uuid: "
<< uuid << " chunkId: " << chunkId << ", messageId: " <<
messageId << ")");
- } else if (it == chunkedMessageCache_.end()) {
+ } else {
LOG_ERROR("Received an uncached chunk (uuid: " << uuid << "
chunkId: " << chunkId
<< ", messageId: "
<< messageId << ")");
- } else {
- LOG_ERROR("Received a chunk whose chunk id is invalid (uuid: "
- << uuid << " chunkId: " << chunkId << ", messageId: " <<
messageId << ")");
- chunkedMessageCache_.remove(uuid);
}
lock.unlock();
- increaseAvailablePermits(cnx);
- trackMessage(messageId);
return {};
}
+ // Part 3: chunkId does not match the expected next chunk ID
(out-of-order).
+ // Two sub-cases:
+ // a) chunkId <= lastChunkedMessageId: duplicated chunk caused by
redeliver or corruption.
+ // Filter and ack the duplicated chunk if it's corrupted, then
discard it.
+ // b) chunkId > lastChunkedMessageId + 1: gap detected, the chunked
message is corrupted.
+ // Remove the context and ack the current chunk if it has expired.
+ auto& chunkedMsgCtx = it->second;
+ if (!chunkedMsgCtx.validateChunkId(chunkId)) {
+ const int lastChunkedMessageId =
static_cast<int>(chunkedMsgCtx.getChunkedMessageIds().size()) - 1;
+ // For example (duplicated chunk):
+ // Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
+ // Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
+ // Chunk-3 sequence ID: 0, chunk ID: 2, msgID: 1:3
+ // Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4
+ // Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5
+ // Chunk-6 sequence ID: 0, chunk ID: 3, msgID: 1:6
+ if (chunkId <= lastChunkedMessageId) {
+ bool isCorruptedChunk = true;
+ for (const MessageId& cachedMsgId :
chunkedMsgCtx.getChunkedMessageIds()) {
+ if (cachedMsgId.ledgerId() == messageId.ledgerId() &&
+ cachedMsgId.entryId() == messageId.entryId()) {
+ isCorruptedChunk = false;
+ break;
+ }
+ }
+ LOG_WARN("Received a duplicated chunk message (uuid: "
+ << uuid << " chunkId: " << chunkId << ",
lastChunkedMessageId: " << lastChunkedMessageId
+ << ", messageId: " << messageId << ")");
+ if (isCorruptedChunk) {
+ LOG_INFO("Acking corrupted duplicated chunk to avoid ack hole,
uuid: "
+ << uuid << ", messageId: " << messageId);
+ acknowledgeAsync(messageId, [uuid, messageId](Result result) {
+ if (result != ResultOk) {
+ LOG_WARN("Failed to acknowledge duplicated chunk,
uuid: "
+ << uuid << ", messageId: " << messageId);
+ }
+ });
+ }
+ lock.unlock();
+ return {};
+ }
+ // chunkId > lastChunkedMessageId + 1, the chunked message is
corrupted.
+ LOG_WARN("Received unexpected chunk (uuid: "
+ << uuid << " chunkId: " << chunkId << ",
lastChunkedMessageId: " << lastChunkedMessageId
+ << ", messageId: " << messageId << ")");
+ chunkedMessageCache_.remove(uuid);
+ lock.unlock();
Review Comment:
Fixed. This path had the same permit leak issue, so I added the same guard here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]