yashmayya commented on code in PR #15571:
URL: https://github.com/apache/pinot/pull/15571#discussion_r2067868126


##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/BaseDataBlock.java:
##########
@@ -402,7 +403,14 @@ public List<ByteBuffer> serialize()
     if (_serialized == null) {
       _serialized = DataBlockUtils.serialize(this);
     }
-    return _serialized;
+    // Return a copy of the serialized data to avoid external modification.
+    List<ByteBuffer> copy = new ArrayList<>(_serialized.size());
+    for (ByteBuffer page: _serialized) {
+      ByteBuffer pageCopy = page.duplicate();
+      pageCopy.order(page.order());
+      copy.add(pageCopy);
+    }
+    return copy;

Review Comment:
   Was this required because we're modifying buffer positions / limits in the 
new logic in `GrpcSendingMailbox`? And the buffer contents themselves aren't 
being duplicated, but shared, correct?



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java:
##########
@@ -60,10 +63,15 @@ public void onNext(MailboxContent mailboxContent) {
     if (_mailbox == null) {
       _mailbox = _mailboxService.getReceivingMailbox(mailboxId);
     }
+    _mailboxBuffers.add(mailboxContent.getPayload().asReadOnlyByteBuffer());
+    if (mailboxContent.getWaitForMore()) {
+      return;

Review Comment:
   Might be worth adding a debug log here to help with edge case scenarios 
where only subsets of block splits are successfully transmitted for some reason.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -52,24 +58,32 @@
 public class GrpcSendingMailbox implements SendingMailbox {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(GrpcSendingMailbox.class);
 
+  private static final List<ByteString> EMPTY_BYTEBUFFER_LIST = 
Collections.emptyList();
   private final String _id;
   private final ChannelManager _channelManager;
   private final String _hostname;
   private final int _port;
   private final long _deadlineMs;
   private final StatMap<MailboxSendOperator.StatKey> _statMap;
   private final MailboxStatusObserver _statusObserver = new 
MailboxStatusObserver();
+  private final int _maxByteStringSize;
 
   private StreamObserver<MailboxContent> _contentObserver;
 
-  public GrpcSendingMailbox(String id, ChannelManager channelManager, String 
hostname, int port, long deadlineMs,
+  public GrpcSendingMailbox(
+      PinotConfiguration config, String id, ChannelManager channelManager, 
String hostname, int port, long deadlineMs,
       StatMap<MailboxSendOperator.StatKey> statMap) {
     _id = id;
     _channelManager = channelManager;
     _hostname = hostname;
     _port = port;
     _deadlineMs = deadlineMs;
     _statMap = statMap;
+    // so far we ensure payload is not bigger than maxBlockSize/2, we can fine 
tune this later
+    _maxByteStringSize = Math.max(config.getProperty(
+        
CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES,
+        
CommonConstants.MultiStageQueryRunner.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES
+    ) / 2, 1);

Review Comment:
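   I don't really follow the logic here. Why is the max byte string size set 
to half of the configured max inbound block size (with a floor of 1)?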



##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java:
##########
@@ -87,7 +87,7 @@ public void testLocalHappyPathSendFirst()
     SendingMailbox sendingMailbox =
         _mailboxService1.getSendingMailbox("localhost", 
_mailboxService1.getPort(), mailboxId, Long.MAX_VALUE, _stats);
     for (int i = 0; i < ReceivingMailbox.DEFAULT_MAX_PENDING_BLOCKS - 1; i++) {
-      sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new 
Object[]{i}));
+      sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new 
Object[]{Integer.toString(i)}));

Review Comment:
   Why were all these int -> string changes here needed?



##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java:
##########
@@ -252,7 +252,7 @@ public static DataBlock readFrom(ByteBuffer buffer)
   public static DataBlock deserialize(List<ByteBuffer> buffers)
       throws IOException {
     List<DataBuffer> dataBuffers = buffers.stream()
-        .map(PinotByteBuffer::wrap)
+        .map(PinotByteBuffer::slice)

Review Comment:
   Why was this change required?



##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcSendingMailboxTest.java:
##########
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox;
+
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Collectors;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.datablock.DataBlockEquals;
+import org.apache.pinot.common.datablock.DataBlockUtils;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.datablock.DataBlockBuilder;
+import org.apache.pinot.segment.spi.memory.CompoundDataBuffer;
+import org.apache.pinot.segment.spi.memory.DataBuffer;
+import org.apache.pinot.segment.spi.memory.PinotByteBuffer;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+public class GrpcSendingMailboxTest {
+
+  @Test(dataProvider = "byteBuffersDataProvider")
+  public void testByteBuffersToByteStrings(int[] byteBufferSizes, int 
maxByteStringSize) {
+    List<ByteBuffer> input = Arrays.stream(byteBufferSizes)
+        .mapToObj(this::randomByteBuffer).collect(Collectors.toList());
+    ByteBuffer expected = concatenateBuffers(input);
+
+    List<ByteString> output = GrpcSendingMailbox.toByteStrings(input, 
maxByteStringSize);
+    for (ByteString chunk: output) {
+      assertTrue(chunk.size() <= maxByteStringSize);
+    }

Review Comment:
   Minor suggestion to strengthen the assertion:
   ```suggestion
       for (ByteString chunk: output.subList(0, output.size() - 1)) {
         assertEquals(chunk.size(), maxByteStringSize);
       }
       assertTrue(output.get(output.size() - 1).size() <= maxByteStringSize);
   ```



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -229,4 +254,64 @@ public DataBlock visit(ErrorMseBlock block, 
List<DataBuffer> serializedStats) {
       }
     }
   }
+
+  @VisibleForTesting
+  public static List<ByteString> toByteStrings(DataBlock dataBlock, int 
maxByteStringSize)
+      throws IOException {
+    return toByteStrings(dataBlock.serialize(), maxByteStringSize);
+  }
+
+  @VisibleForTesting
+  public static List<ByteString> toByteStrings(List<ByteBuffer> bytes, int 
maxByteStringSize) {
+    if (bytes.isEmpty()) {
+      return EMPTY_BYTEBUFFER_LIST;
+    }
+
+    int totalBytes = 0;
+    for (ByteBuffer bb : bytes) {
+      totalBytes += bb.remaining();
+    }
+    int initialCapacity = (totalBytes / maxByteStringSize) + bytes.size();
+    List<ByteString> result = new ArrayList<>(initialCapacity);
+
+    ByteString acc = ByteString.EMPTY;
+    int available = maxByteStringSize;
+
+    for (ByteBuffer bb: bytes) {
+      int from = bb.position();
+      int remaining = bb.limit() - from;
+      while (remaining > 0) {
+        if (remaining <= available) {
+          acc = acc.concat(UnsafeByteOperations.unsafeWrap(sliceByteBuffer(bb, 
from, from + remaining)));
+          available -= remaining;
+          remaining = 0;
+        } else {
+          acc = acc.concat(UnsafeByteOperations.unsafeWrap(sliceByteBuffer(bb, 
from, from + available)));
+          from += available;
+          remaining -= available;
+          result.add(acc);
+          acc = ByteString.EMPTY;
+          available = maxByteStringSize;
+        }
+      }
+    }
+    result.add(acc);
+
+    return result;
+  }
+
+  // polyfill because ByteBuffer.slice(pos, lim) is not available until Java 13
+  private static ByteBuffer sliceByteBuffer(ByteBuffer bb, int position, int 
limit) {
+    int oldPosition = bb.position();
+    int oldLimit = bb.limit();
+
+    try {
+      bb.position(position);
+      bb.limit(limit);
+      return bb.slice();
+    } finally {
+      bb.position(oldPosition);
+      bb.limit(oldLimit);
+    }

Review Comment:
   Yeah, agreed. Moving the position / limit is a relatively cheap operation, 
and we're most likely not going to be doing that a huge number of times 
(since the max byte string size will almost always be a reasonable factor of 
the block size). And +1 on readability: the current code is quite clear about 
what it is trying to do. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to