Copilot commented on code in PR #587:
URL: https://github.com/apache/pulsar-client-cpp/pull/587#discussion_r3338289300


##########
lib/ConsumerImpl.cc:
##########
@@ -512,45 +563,104 @@ optional<SharedBuffer> 
ConsumerImpl::processMessageChunk(const SharedBuffer& pay
         }
         it = chunkedMessageCache_.putIfAbsent(
             uuid, ChunkedMessageCtx{metadata.num_chunks_from_msg(), 
metadata.total_chunk_msg_size()});
+        it->second.appendChunk(messageId, payload);
+        lock.unlock();
+        return {};
     }
 
-    auto& chunkedMsgCtx = it->second;
-    if (it == chunkedMessageCache_.end() || 
!chunkedMsgCtx.validateChunkId(chunkId)) {
+    // Part 2: chunkId != 0 but chunk context not found in cache.
+    // This happens when the first chunk was not received (e.g., consumer used 
seek() or started
+    // consuming from a specific message position that falls in the middle of 
a chunked message,
+    // or the context was evicted due to maxPendingChunkedMessage limit).
+    // In this case, the chunk message cannot be assembled, so just discard it.
+    if (it == chunkedMessageCache_.end()) {
         auto startMessageId = getStartMessageId();
         if (!config_.isStartMessageIdInclusive() && startMessageId &&
             startMessageId->ledgerId() == messageId.ledgerId() &&
             startMessageId->entryId() == messageId.entryId()) {
-            // When the start message id is not inclusive, the last chunk of 
the previous chunked message will
-            // be delivered, which is expected and we only need to filter it 
out.
-            chunkedMessageCache_.remove(uuid);
             LOG_INFO("Filtered the chunked message before the start message id 
(uuid: "
                      << uuid << " chunkId: " << chunkId << ", messageId: " << 
messageId << ")");
-        } else if (it == chunkedMessageCache_.end()) {
+        } else {
             LOG_ERROR("Received an uncached chunk (uuid: " << uuid << " 
chunkId: " << chunkId
                                                            << ", messageId: " 
<< messageId << ")");
-        } else {
-            LOG_ERROR("Received a chunk whose chunk id is invalid (uuid: "
-                      << uuid << " chunkId: " << chunkId << ", messageId: " << 
messageId << ")");
-            chunkedMessageCache_.remove(uuid);
         }
         lock.unlock();
-        increaseAvailablePermits(cnx);
-        trackMessage(messageId);
         return {};

Review Comment:
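   Permits are now returned up-front only for non-last chunks. If a *last* 
chunk is discarded here because its chunk context is missing (seek/cache 
eviction), its permit is never returned, because the 
`increaseAvailablePermits(cnx)` call was removed from this path. The permit 
leaks and consumption can eventually stall.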



##########
lib/ConsumerImpl.cc:
##########
@@ -512,45 +563,104 @@ optional<SharedBuffer> 
ConsumerImpl::processMessageChunk(const SharedBuffer& pay
         }
         it = chunkedMessageCache_.putIfAbsent(
             uuid, ChunkedMessageCtx{metadata.num_chunks_from_msg(), 
metadata.total_chunk_msg_size()});
+        it->second.appendChunk(messageId, payload);
+        lock.unlock();
+        return {};
     }
 
-    auto& chunkedMsgCtx = it->second;
-    if (it == chunkedMessageCache_.end() || 
!chunkedMsgCtx.validateChunkId(chunkId)) {
+    // Part 2: chunkId != 0 but chunk context not found in cache.
+    // This happens when the first chunk was not received (e.g., consumer used 
seek() or started
+    // consuming from a specific message position that falls in the middle of 
a chunked message,
+    // or the context was evicted due to maxPendingChunkedMessage limit).
+    // In this case, the chunk message cannot be assembled, so just discard it.
+    if (it == chunkedMessageCache_.end()) {
         auto startMessageId = getStartMessageId();
         if (!config_.isStartMessageIdInclusive() && startMessageId &&
             startMessageId->ledgerId() == messageId.ledgerId() &&
             startMessageId->entryId() == messageId.entryId()) {
-            // When the start message id is not inclusive, the last chunk of 
the previous chunked message will
-            // be delivered, which is expected and we only need to filter it 
out.
-            chunkedMessageCache_.remove(uuid);
             LOG_INFO("Filtered the chunked message before the start message id 
(uuid: "
                      << uuid << " chunkId: " << chunkId << ", messageId: " << 
messageId << ")");
-        } else if (it == chunkedMessageCache_.end()) {
+        } else {
             LOG_ERROR("Received an uncached chunk (uuid: " << uuid << " 
chunkId: " << chunkId
                                                            << ", messageId: " 
<< messageId << ")");
-        } else {
-            LOG_ERROR("Received a chunk whose chunk id is invalid (uuid: "
-                      << uuid << " chunkId: " << chunkId << ", messageId: " << 
messageId << ")");
-            chunkedMessageCache_.remove(uuid);
         }
         lock.unlock();
-        increaseAvailablePermits(cnx);
-        trackMessage(messageId);
         return {};
     }
 
+    // Part 3: chunkId does not match the expected next chunk ID 
(out-of-order).
+    // Two sub-cases:
+    //   a) chunkId <= lastChunkedMessageId: duplicated chunk caused by 
redeliver or corruption.
+    //      Filter and ack the duplicated chunk if it's corrupted, then 
discard it.
+    //   b) chunkId > lastChunkedMessageId + 1: gap detected, the chunked 
message is corrupted.
+    //      Remove the context and ack the current chunk if it has expired.
+    auto& chunkedMsgCtx = it->second;
+    if (!chunkedMsgCtx.validateChunkId(chunkId)) {
+        const int lastChunkedMessageId = 
static_cast<int>(chunkedMsgCtx.getChunkedMessageIds().size()) - 1;
+        // For example (duplicated chunk):
+        //     Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
+        //     Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
+        //     Chunk-3 sequence ID: 0, chunk ID: 2, msgID: 1:3
+        //     Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4
+        //     Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5
+        //     Chunk-6 sequence ID: 0, chunk ID: 3, msgID: 1:6
+        if (chunkId <= lastChunkedMessageId) {
+            bool isCorruptedChunk = true;
+            for (const MessageId& cachedMsgId : 
chunkedMsgCtx.getChunkedMessageIds()) {
+                if (cachedMsgId.ledgerId() == messageId.ledgerId() &&
+                    cachedMsgId.entryId() == messageId.entryId()) {
+                    isCorruptedChunk = false;
+                    break;
+                }
+            }
+            LOG_WARN("Received a duplicated chunk message (uuid: "
+                     << uuid << " chunkId: " << chunkId << ", 
lastChunkedMessageId: " << lastChunkedMessageId
+                     << ", messageId: " << messageId << ")");
+            if (isCorruptedChunk) {
+                LOG_INFO("Acking corrupted duplicated chunk to avoid ack hole, 
uuid: "
+                         << uuid << ", messageId: " << messageId);
+                acknowledgeAsync(messageId, [uuid, messageId](Result result) {
+                    if (result != ResultOk) {
+                        LOG_WARN("Failed to acknowledge duplicated chunk, 
uuid: "
+                                 << uuid << ", messageId: " << messageId);
+                    }
+                });
+            }
+            lock.unlock();
+            return {};
+        }
+        // chunkId > lastChunkedMessageId + 1, the chunked message is 
corrupted.
+        LOG_WARN("Received unexpected chunk (uuid: "
+                 << uuid << " chunkId: " << chunkId << ", 
lastChunkedMessageId: " << lastChunkedMessageId
+                 << ", messageId: " << messageId << ")");
+        chunkedMessageCache_.remove(uuid);
+        lock.unlock();

Review Comment:
   In the gap/out-of-order path, a *last* chunk can be discarded without 
returning its permit (permits are currently only returned early for non-last 
chunks). This can leak permits and stop further message delivery.



##########
tests/MessageChunkingTest.cc:
##########
@@ -96,6 +96,18 @@ class MessageChunkingTest : public 
::testing::TestWithParam<CompressionType> {
 
 std::string MessageChunkingTest::largeMessage = createLargeMessage();
 
+// Helper function: send a single chunk message
+static void sendSingleChunk(Producer& producer, const std::string& uuid, int 
chunkId, int totalChunks) {
+    std::string content = "chunk-" + uuid + "-" + std::to_string(chunkId) + 
"|";
+    auto msg = MessageBuilder().setContent(content).build();
+    auto& metadata = PulsarFriend::getMessageMetadata(msg);
+    metadata.set_num_chunks_from_msg(totalChunks);
+    metadata.set_chunk_id(chunkId);
+    metadata.set_uuid(uuid);
+    metadata.set_total_chunk_msg_size(100);
+    producer.send(msg);

Review Comment:
   `sendSingleChunk` calls the deprecated `Producer::send(const Message&)` 
overload and ignores the returned `Result`, which makes test failures less 
actionable (a send failure can later look like a receive timeout).



##########
lib/ConsumerImpl.cc:
##########
@@ -488,19 +488,70 @@ optional<SharedBuffer> 
ConsumerImpl::processMessageChunk(const SharedBuffer& pay
     LOG_DEBUG("Process message chunk (chunkId: " << chunkId << ", uuid: " << 
uuid
                                                  << ", messageId: " << 
messageId << ") of "
                                                  << payload.readableBytes() << 
" bytes");
-
     Lock lock(chunkProcessMutex_);
-
+    // For non-last chunks, increase available permits immediately since they 
don't occupy receiver queue.
+    if (chunkId != metadata.num_chunks_from_msg() - 1) {
+        increaseAvailablePermits(cnx);
+    }
     // Lazy task scheduling to expire incomplete chunk message
     bool expected = false;
     if (expireTimeOfIncompleteChunkedMessageMs_ > 0 &&
         expireChunkMessageTaskScheduled_.compare_exchange_strong(expected, 
true)) {
         triggerCheckExpiredChunkedTimer();
     }
 
+    // Part 1: chunkId == 0, this is the first chunk of a chunked message.
+    // If a previous incomplete context with the same uuid exists, it means 
either:
+    //   a) Message redeliver: the first chunk's messageId matches one of the 
cached chunk messageIds.
+    //      In this case, the old context is simply removed and a new context 
is created to restart
+    //      assembling from scratch. No ack is needed since the old chunks 
will be redelivered.
+    //   b) Corrupted chunk message: the first chunk's messageId does NOT 
match any cached chunk messageId,
+    //      meaning a new producer sent a message with the same uuid. In this 
case, ack the old cached
+    //      chunks to avoid ack holes, then remove the old context and create 
a new one.
+    // After handling the old context, check maxPendingChunkedMessage limit 
and create a new context.
     auto it = chunkedMessageCache_.find(uuid);
-
-    if (chunkId == 0 && it == chunkedMessageCache_.end()) {
+    if (chunkId == 0) {
+        // Handle ack hole when receiving duplicated first chunk.
+        // For example (message redeliver):
+        //     Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
+        //     Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
+        //     Chunk-3 sequence ID: 0, chunk ID: 0, msgID: 1:1
+        //     Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:2
+        //     Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:3
+        // For example (corrupted chunk message):
+        //     Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
+        //     Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
+        //     Chunk-3 sequence ID: 0, chunk ID: 0, msgID: 1:3
+        //     Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4
+        //     Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5
+        if (it != chunkedMessageCache_.end()) {
+            auto& existingCtx = it->second;
+            bool isCorruptedChunkMessage = true;
+            for (const MessageId& cachedMsgId : 
existingCtx.getChunkedMessageIds()) {
+                if (cachedMsgId.ledgerId() == messageId.ledgerId() &&
+                    cachedMsgId.entryId() == messageId.entryId()) {
+                    isCorruptedChunkMessage = false;
+                    break;

Review Comment:
   Corruption vs redelivery detection should use full `MessageId` equality 
(`operator==` includes ledgerId/entryId/batchIndex/partition). Comparing only 
ledgerId/entryId can misclassify IDs and lead to incorrect acking/cleanup 
decisions.



##########
lib/ConsumerImpl.cc:
##########
@@ -512,45 +563,104 @@ optional<SharedBuffer> 
ConsumerImpl::processMessageChunk(const SharedBuffer& pay
         }
         it = chunkedMessageCache_.putIfAbsent(
             uuid, ChunkedMessageCtx{metadata.num_chunks_from_msg(), 
metadata.total_chunk_msg_size()});
+        it->second.appendChunk(messageId, payload);
+        lock.unlock();
+        return {};
     }
 
-    auto& chunkedMsgCtx = it->second;
-    if (it == chunkedMessageCache_.end() || 
!chunkedMsgCtx.validateChunkId(chunkId)) {
+    // Part 2: chunkId != 0 but chunk context not found in cache.
+    // This happens when the first chunk was not received (e.g., consumer used 
seek() or started
+    // consuming from a specific message position that falls in the middle of 
a chunked message,
+    // or the context was evicted due to maxPendingChunkedMessage limit).
+    // In this case, the chunk message cannot be assembled, so just discard it.
+    if (it == chunkedMessageCache_.end()) {
         auto startMessageId = getStartMessageId();
         if (!config_.isStartMessageIdInclusive() && startMessageId &&
             startMessageId->ledgerId() == messageId.ledgerId() &&
             startMessageId->entryId() == messageId.entryId()) {
-            // When the start message id is not inclusive, the last chunk of 
the previous chunked message will
-            // be delivered, which is expected and we only need to filter it 
out.
-            chunkedMessageCache_.remove(uuid);
             LOG_INFO("Filtered the chunked message before the start message id 
(uuid: "
                      << uuid << " chunkId: " << chunkId << ", messageId: " << 
messageId << ")");
-        } else if (it == chunkedMessageCache_.end()) {
+        } else {
             LOG_ERROR("Received an uncached chunk (uuid: " << uuid << " 
chunkId: " << chunkId
                                                            << ", messageId: " 
<< messageId << ")");
-        } else {
-            LOG_ERROR("Received a chunk whose chunk id is invalid (uuid: "
-                      << uuid << " chunkId: " << chunkId << ", messageId: " << 
messageId << ")");
-            chunkedMessageCache_.remove(uuid);
         }
         lock.unlock();
-        increaseAvailablePermits(cnx);
-        trackMessage(messageId);
         return {};
     }
 
+    // Part 3: chunkId does not match the expected next chunk ID 
(out-of-order).
+    // Two sub-cases:
+    //   a) chunkId <= lastChunkedMessageId: duplicated chunk caused by 
redeliver or corruption.
+    //      Filter and ack the duplicated chunk if it's corrupted, then 
discard it.
+    //   b) chunkId > lastChunkedMessageId + 1: gap detected, the chunked 
message is corrupted.
+    //      Remove the context and ack the current chunk if it has expired.
+    auto& chunkedMsgCtx = it->second;
+    if (!chunkedMsgCtx.validateChunkId(chunkId)) {
+        const int lastChunkedMessageId = 
static_cast<int>(chunkedMsgCtx.getChunkedMessageIds().size()) - 1;
+        // For example (duplicated chunk):
+        //     Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
+        //     Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
+        //     Chunk-3 sequence ID: 0, chunk ID: 2, msgID: 1:3
+        //     Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4
+        //     Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5
+        //     Chunk-6 sequence ID: 0, chunk ID: 3, msgID: 1:6
+        if (chunkId <= lastChunkedMessageId) {
+            bool isCorruptedChunk = true;
+            for (const MessageId& cachedMsgId : 
chunkedMsgCtx.getChunkedMessageIds()) {
+                if (cachedMsgId.ledgerId() == messageId.ledgerId() &&
+                    cachedMsgId.entryId() == messageId.entryId()) {
+                    isCorruptedChunk = false;
+                    break;

Review Comment:
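   Same issue as in the first-chunk check: use `cachedMsgId == messageId` to 
decide whether a duplicated chunk is a redelivery. Matching only 
ledgerId/entryId can classify a chunk as a redelivery even when its other 
MessageId fields (batchIndex/partition) differ, in which case a corrupted 
duplicate would not be acked.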



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to