gaoran10 commented on a change in pull request #12720: URL: https://github.com/apache/pulsar/pull/12720#discussion_r749986299
########## File path: pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java ########## @@ -706,4 +730,94 @@ private void initEntryCacheSizeAllocator(PulsarConnectorConfig connectorConfig) } } + private RawMessage processChunkedMessages(RawMessage message) { + final String uuid = message.getUUID(); + final int chunkId = message.getChunkId(); + final int totalChunkMsgSize = message.getTotalChunkMsgSize(); + final int numChunks = message.getNumChunksFromMsg(); + + RawMessageIdImpl rawMessageId = (RawMessageIdImpl) message.getMessageId(); + if (rawMessageId.getLedgerId() > pulsarSplit.getEndPositionLedgerId() + && !chunkedMessagesMap.containsKey(uuid)) { + // If the message is out of the split range, we only care about the incomplete chunked messages. + message.release(); + return null; + } + if (chunkId == 0) { + ByteBuf chunkedMsgBuffer = Unpooled.directBuffer(totalChunkMsgSize, totalChunkMsgSize); + chunkedMessagesMap.computeIfAbsent(uuid, (key) -> ChunkedMessageCtx.get(numChunks, chunkedMsgBuffer)); + } + + ChunkedMessageCtx chunkedMsgCtx = chunkedMessagesMap.get(uuid); + if (chunkedMsgCtx == null || chunkedMsgCtx.chunkedMsgBuffer == null + || chunkId != (chunkedMsgCtx.lastChunkedMessageId + 1) || chunkId >= numChunks) { + // Means we lost the first chunk, it will happens when the beginning chunk didn't belong to this split. Review comment: Thanks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org