gaoran10 commented on a change in pull request #12720:
URL: https://github.com/apache/pulsar/pull/12720#discussion_r749986299



##########
File path: 
pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
##########
@@ -706,4 +730,94 @@ private void 
initEntryCacheSizeAllocator(PulsarConnectorConfig connectorConfig)
         }
     }
 
+    private RawMessage processChunkedMessages(RawMessage message) {
+        final String uuid = message.getUUID();
+        final int chunkId = message.getChunkId();
+        final int totalChunkMsgSize = message.getTotalChunkMsgSize();
+        final int numChunks = message.getNumChunksFromMsg();
+
+        RawMessageIdImpl rawMessageId = (RawMessageIdImpl) 
message.getMessageId();
+        if (rawMessageId.getLedgerId() > pulsarSplit.getEndPositionLedgerId()
+                && !chunkedMessagesMap.containsKey(uuid)) {
+            // If the message is out of the split range, we only care about 
the incomplete chunked messages.
+            message.release();
+            return null;
+        }
+        if (chunkId == 0) {
+            ByteBuf chunkedMsgBuffer = 
Unpooled.directBuffer(totalChunkMsgSize, totalChunkMsgSize);
+            chunkedMessagesMap.computeIfAbsent(uuid, (key) -> 
ChunkedMessageCtx.get(numChunks, chunkedMsgBuffer));
+        }
+
+        ChunkedMessageCtx chunkedMsgCtx = chunkedMessagesMap.get(uuid);
+        if (chunkedMsgCtx == null || chunkedMsgCtx.chunkedMsgBuffer == null
+                || chunkId != (chunkedMsgCtx.lastChunkedMessageId + 1) || 
chunkId >= numChunks) {
+            // Means we lost the first chunk, it will happens when the 
beginning chunk didn't belong to this split.

Review comment:
       Thanks




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to