albertobastos commented on code in PR #15571:
URL: https://github.com/apache/pinot/pull/15571#discussion_r2063163586
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -229,4 +254,64 @@ public DataBlock visit(ErrorMseBlock block,
List<DataBuffer> serializedStats) {
}
}
}
+
+ @VisibleForTesting
+ public static List<ByteString> toByteStrings(DataBlock dataBlock, int
maxByteStringSize)
Review Comment:
Yeah that's enough, forgot to change it after moving them from the Utils
class.
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -229,4 +254,64 @@ public DataBlock visit(ErrorMseBlock block,
List<DataBuffer> serializedStats) {
}
}
}
+
+ @VisibleForTesting
+ public static List<ByteString> toByteStrings(DataBlock dataBlock, int
maxByteStringSize)
+ throws IOException {
+ return toByteStrings(dataBlock.serialize(), maxByteStringSize);
+ }
+
+ @VisibleForTesting
+ public static List<ByteString> toByteStrings(List<ByteBuffer> bytes, int
maxByteStringSize) {
+ if (bytes.isEmpty()) {
+ return EMPTY_BYTEBUFFER_LIST;
+ }
+
+ int totalBytes = 0;
+ for (ByteBuffer bb : bytes) {
+ totalBytes += bb.remaining();
+ }
+ int initialCapacity = (totalBytes / maxByteStringSize) + bytes.size();
+ List<ByteString> result = new ArrayList<>(initialCapacity);
+
+ ByteString acc = ByteString.EMPTY;
+ int available = maxByteStringSize;
+
+ for (ByteBuffer bb: bytes) {
+ int from = bb.position();
+ int remaining = bb.limit() - from;
+ while (remaining > 0) {
+ if (remaining <= available) {
+ acc = acc.concat(UnsafeByteOperations.unsafeWrap(sliceByteBuffer(bb,
from, from + remaining)));
+ available -= remaining;
+ remaining = 0;
+ } else {
+ acc = acc.concat(UnsafeByteOperations.unsafeWrap(sliceByteBuffer(bb,
from, from + available)));
+ from += available;
+ remaining -= available;
+ result.add(acc);
+ acc = ByteString.EMPTY;
+ available = maxByteStringSize;
+ }
+ }
+ }
+ result.add(acc);
+
+ return result;
+ }
+
+ // polyfill because ByteBuffer.slice(pos, lim) is not available until Java 13
+ private static ByteBuffer sliceByteBuffer(ByteBuffer bb, int position, int
limit) {
+ int oldPosition = bb.position();
+ int oldLimit = bb.limit();
+
+ try {
+ bb.position(position);
+ bb.limit(limit);
+ return bb.slice();
+ } finally {
+ bb.position(oldPosition);
+ bb.limit(oldLimit);
+ }
Review Comment:
As we talked offline, it depends on what we want exactly to achieve during
the splitting process.
If we want to optimize the amount of output chunks (filling chunks of
`maxByteStringSize` until the very last chunk) it doesn't get much better than
this. If we only want to bother splitting when a `ByteBuffer` exceeds the
`maxByteStringSize` and we're ok with just pushing as-is the ones smaller than
that, we could get rid of the slicing (although we will still need to update
the buffer internal limit and position during the splitting).
I'm not convinced yet about changing it. Talking about performance, wouldn't
say that updating limit and position cursors are too expensive (internally is
just some simple assignations and comparisons). Talking about readibility... I
believe current code is quite readable, to be honest 😅
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]