albertobastos commented on code in PR #15571:
URL: https://github.com/apache/pinot/pull/15571#discussion_r2055538832
##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java:
##########
@@ -230,6 +230,28 @@ public static ByteString toByteString(DataBlock dataBlock)
return byteString;
}
+ public static List<ByteString> toByteStrings(DataBlock dataBlock, int maxBlockSize)
+ throws IOException {
+ List<ByteBuffer> bytes = dataBlock.serialize();
+ if (bytes.isEmpty()) {
+ return List.of(ByteString.EMPTY);
Review Comment:
I just copy-pasted it from the previous version, where `ByteString.EMPTY` was
returned. But now that we are dealing with a list, I guess it's OK to return
just an empty list.
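The two return values differ for a caller that emits one message per chunk. A minimal sketch, with `byte[]` standing in for protobuf's `ByteString` and a hypothetical `send` helper (not a Pinot API): one empty chunk still yields one message, while an empty list yields none. Whether that matters depends on whether the receiving side expects at least one message per block, which the comment does not say.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: byte[] stands in for protobuf's ByteString so the
// example compiles without dependencies; send() is not a Pinot method.
final class EmptyChunksSketch {
  // Simulates a sender that emits exactly one message per chunk.
  static List<String> send(List<byte[]> chunks) {
    List<String> sent = new ArrayList<>();
    for (byte[] chunk : chunks) {
      sent.add("message with " + chunk.length + " bytes");
    }
    return sent;
  }

  public static void main(String[] args) {
    // Old contract: one empty chunk, so one (empty) message goes out.
    System.out.println(send(List.of(new byte[0])).size()); // prints 1
    // New contract: an empty list, so nothing is sent for this block.
    System.out.println(send(List.of()).size()); // prints 0
  }
}
```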
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -157,19 +167,28 @@ private StreamObserver<MailboxContent> getContentObserver() {
.open(_statusObserver);
}
- private MailboxContent toMailboxContent(MseBlock block, List<DataBuffer> serializedStats)
+ private List<MailboxContent> toMailboxContents(MseBlock block, List<DataBuffer> serializedStats)
Review Comment:
Let's go with the stream approach for once.
##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java:
##########
@@ -230,6 +230,28 @@ public static ByteString toByteString(DataBlock dataBlock)
return byteString;
}
+ public static List<ByteString> toByteStrings(DataBlock dataBlock, int maxBlockSize)
+ throws IOException {
+ List<ByteBuffer> bytes = dataBlock.serialize();
+ if (bytes.isEmpty()) {
+ return List.of(ByteString.EMPTY);
+ }
+
+ List<ByteString> byteStrings = new ArrayList<>();
+ ByteString current = UnsafeByteOperations.unsafeWrap(bytes.get(0));
+ for (int i = 1; i < bytes.size(); i++) {
+ ByteBuffer bb = bytes.get(i);
+ if (current.size() + bb.remaining() > maxBlockSize) {
+ byteStrings.add(current);
+ current = UnsafeByteOperations.unsafeWrap(bb);
+ } else {
+ current = current.concat(UnsafeByteOperations.unsafeWrap(bb));
+ }
+ }
Review Comment:
Yeah, we are assuming that none of the `ByteBuffer`s returned by block
serialization exceeds the given maximum. If one does, that big buffer is
converted to a single `ByteString` as-is, so that chunk ends up larger than the
limit.
I thought of ways to optimize that, but all of them require array copies
instead of relying on unsafe wrap operations. I believe that would add
unwanted overhead, so for now I just documented the current behaviour.
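The grouping logic in `toByteStrings` and the oversized-buffer caveat above can be sketched as follows. This is an illustration under stated assumptions, not the PR's code: groups of `ByteBuffer` references stand in for `ByteString.concat` over `UnsafeByteOperations.unsafeWrap`, `GreedyChunker` and `size` are hypothetical names, and because the quoted hunk is cut off before the method ends, the explicit flush of the last group after the loop is an assumption about what the full method does.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the greedy grouping in toByteStrings, using plain
// ByteBuffer lists instead of ByteString.concat so it runs without protobuf.
final class GreedyChunker {
  // Groups consecutive buffers so each group totals at most maxChunkSize bytes.
  // A single buffer larger than maxChunkSize ends up alone in its own group,
  // unsplit, matching the behaviour described in the comment.
  static List<List<ByteBuffer>> chunk(List<ByteBuffer> buffers, int maxChunkSize) {
    List<List<ByteBuffer>> groups = new ArrayList<>();
    if (buffers.isEmpty()) {
      return groups;
    }
    List<ByteBuffer> current = new ArrayList<>();
    current.add(buffers.get(0));
    long currentSize = buffers.get(0).remaining();
    for (int i = 1; i < buffers.size(); i++) {
      ByteBuffer bb = buffers.get(i);
      if (currentSize + bb.remaining() > maxChunkSize) {
        groups.add(current); // adding bb would overflow the limit: close the group
        current = new ArrayList<>();
        currentSize = 0;
      }
      current.add(bb); // only references are grouped, no bytes are copied
      currentSize += bb.remaining();
    }
    groups.add(current); // flush the trailing group after the loop
    return groups;
  }

  // Total readable bytes in one group.
  static long size(List<ByteBuffer> group) {
    long total = 0;
    for (ByteBuffer bb : group) {
      total += bb.remaining();
    }
    return total;
  }

  public static void main(String[] args) {
    List<ByteBuffer> serialized = List.of(
        ByteBuffer.allocate(3), ByteBuffer.allocate(4), ByteBuffer.allocate(10),
        ByteBuffer.allocate(2), ByteBuffer.allocate(2));
    List<Long> sizes = new ArrayList<>();
    for (List<ByteBuffer> group : chunk(serialized, 8)) {
      sizes.add(size(group));
    }
    System.out.println(sizes); // prints [7, 10, 4]
  }
}
```

Grouping references rather than copying bytes keeps the sketch in line with the zero-copy approach the comment prefers; the price is the oversized group (10 bytes against a limit of 8 in the example). The running total is a `long` to rule out `int` overflow when two large sizes are added.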
##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java:
##########
@@ -230,6 +230,28 @@ public static ByteString toByteString(DataBlock dataBlock)
return byteString;
}
+ public static List<ByteString> toByteStrings(DataBlock dataBlock, int maxBlockSize)
+ throws IOException {
+ List<ByteBuffer> bytes = dataBlock.serialize();
+ if (bytes.isEmpty()) {
+ return List.of(ByteString.EMPTY);
+ }
+
+ List<ByteString> byteStrings = new ArrayList<>();
Review Comment:
Got it.
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -157,19 +167,28 @@ private StreamObserver<MailboxContent> getContentObserver() {
.open(_statusObserver);
}
- private MailboxContent toMailboxContent(MseBlock block, List<DataBuffer> serializedStats)
+ private List<MailboxContent> toMailboxContents(MseBlock block, List<DataBuffer> serializedStats)
throws IOException {
_statMap.merge(MailboxSendOperator.StatKey.RAW_MESSAGES, 1);
long start = System.currentTimeMillis();
try {
DataBlock dataBlock = MseBlockSerializer.toDataBlock(block, serializedStats);
- ByteString byteString = DataBlockUtils.toByteString(dataBlock);
- int sizeInBytes = byteString.size();
+ // so far we ensure payload is not bigger than maxBlockSize/2, we can fine tune this later
+ List<ByteString> byteStrings = DataBlockUtils.toByteStrings(dataBlock, getMaxBlockSize() / 2);
+ int sizeInBytes = byteStrings.stream().map(ByteString::size).reduce(0, Integer::sum);
Review Comment:
Done.
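The `sizeInBytes` line in the hunk above sums chunk sizes through a boxed `Stream<Integer>`. A minimal sketch comparing it with `mapToInt(...).sum()`, which gives the same total without boxing each size; `byte[]` stands in for `ByteString`, `b.length` for `ByteString::size`, and `SizeSumSketch` is a hypothetical name.

```java
import java.util.List;

// Hypothetical sketch: byte[] stands in for ByteString, and b.length for
// ByteString::size.
final class SizeSumSketch {
  // Boxed form, as in the diff: each size becomes an Integer before summing.
  static int boxedSum(List<byte[]> chunks) {
    return chunks.stream().map(b -> b.length).reduce(0, Integer::sum);
  }

  // Primitive form: mapToInt keeps the sizes as ints, so nothing is boxed.
  static int primitiveSum(List<byte[]> chunks) {
    return chunks.stream().mapToInt(b -> b.length).sum();
  }

  public static void main(String[] args) {
    List<byte[]> chunks = List.of(new byte[3], new byte[4], new byte[10]);
    System.out.println(boxedSum(chunks) + " " + primitiveSum(chunks)); // prints 17 17
  }
}
```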
##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java:
##########
@@ -230,6 +230,28 @@ public static ByteString toByteString(DataBlock dataBlock)
return byteString;
}
+ public static List<ByteString> toByteStrings(DataBlock dataBlock, int maxBlockSize)
Review Comment:
Yeah, I see that. Renamed to `maxByteStringSize` and added some simple doc.
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -157,19 +167,28 @@ private StreamObserver<MailboxContent> getContentObserver() {
.open(_statusObserver);
}
- private MailboxContent toMailboxContent(MseBlock block, List<DataBuffer> serializedStats)
+ private List<MailboxContent> toMailboxContents(MseBlock block, List<DataBuffer> serializedStats)
throws IOException {
_statMap.merge(MailboxSendOperator.StatKey.RAW_MESSAGES, 1);
long start = System.currentTimeMillis();
try {
DataBlock dataBlock = MseBlockSerializer.toDataBlock(block, serializedStats);
- ByteString byteString = DataBlockUtils.toByteString(dataBlock);
- int sizeInBytes = byteString.size();
+ // so far we ensure payload is not bigger than maxBlockSize/2, we can fine tune this later
+ List<ByteString> byteStrings = DataBlockUtils.toByteStrings(dataBlock, getMaxBlockSize() / 2);
Review Comment:
You're right. Moved to the mailbox initialization.
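Moving the limit to mailbox initialization amounts to computing it once and storing it in a field. A minimal sketch with hypothetical names (`SendingMailboxSketch`, `maxByteStringSize()`); the argument check is an addition of the sketch, and the halving simply mirrors the `getMaxBlockSize() / 2` in the diff, whose rationale is not given there.

```java
// Hypothetical sketch: the class and method names are illustrative, not Pinot's.
final class SendingMailboxSketch {
  // Computed once at construction instead of on every block conversion.
  private final int _maxByteStringSize;

  SendingMailboxSketch(int maxBlockSize) {
    if (maxBlockSize <= 0) {
      throw new IllegalArgumentException("maxBlockSize must be positive: " + maxBlockSize);
    }
    // Same halving as the diff's getMaxBlockSize() / 2.
    _maxByteStringSize = maxBlockSize / 2;
  }

  int maxByteStringSize() {
    return _maxByteStringSize;
  }

  public static void main(String[] args) {
    SendingMailboxSketch mailbox = new SendingMailboxSketch(4 * 1024 * 1024);
    System.out.println(mailbox.maxByteStringSize()); // prints 2097152
  }
}
```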
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java:
##########
@@ -58,10 +61,18 @@ public void onNext(MailboxContent mailboxContent) {
if (_mailbox == null) {
_mailbox = _mailboxService.getReceivingMailbox(mailboxId);
}
+ if (_mailboxBuffers == null) {
+ _mailboxBuffers = new ArrayList<>();
+ }
Review Comment:
I just wanted to avoid allocating the list until the observer is actually used,
but I guess we will rarely create an observer and not use it. Changed it to
eager allocation.
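The eager version can be sketched as a `final` field initialized at declaration, which also removes the null check from `onNext`. Only `_mailboxBuffers` comes from the diff; the class, `bufferedChunks()`, and the `ByteBuffer` element type are assumptions, and what the real observer does with the accumulated buffers is not shown in the quoted hunk.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of eager allocation; only the field name comes from the diff.
final class ContentObserverSketch {
  // Eager: created together with the observer. Being final, it is never
  // null, so onNext needs no lazy-initialization check.
  private final List<ByteBuffer> _mailboxBuffers = new ArrayList<>();

  void onNext(ByteBuffer payload) {
    _mailboxBuffers.add(payload);
  }

  int bufferedChunks() {
    return _mailboxBuffers.size();
  }

  public static void main(String[] args) {
    ContentObserverSketch observer = new ContentObserverSketch();
    observer.onNext(ByteBuffer.allocate(8));
    observer.onNext(ByteBuffer.allocate(8));
    System.out.println(observer.bufferedChunks()); // prints 2
  }
}
```

The cost of eagerness is small: in OpenJDK, `new ArrayList<>()` does not allocate its backing array until the first `add`.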
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.