reswqa commented on code in PR #25738:
URL: https://github.com/apache/flink/pull/25738#discussion_r1870538342
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileReader.java:
##########
@@ -48,7 +51,7 @@ class PartitionedFileReader {
private final PartitionedFile partitionedFile;
/** Target subpartition to read. */
Review Comment:
```suggestion
/** Target subpartitions to read. */
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java:
##########
@@ -396,15 +412,26 @@ ByteBuf write(ByteBufAllocator allocator) throws
IOException {
private ByteBuf fillHeader(ByteBufAllocator allocator) {
// only allocate header buffer - we will combine it with the data
buffer below
ByteBuf headerBuf =
- allocateBuffer(allocator, ID, MESSAGE_HEADER_LENGTH,
bufferSize, false);
+ allocateBuffer(
+ allocator,
+ ID,
+ MESSAGE_HEADER_LENGTH + Integer.BYTES *
numOfPartialBuffers,
+ bufferSize,
+ false);
receiverId.writeTo(headerBuf);
headerBuf.writeInt(subpartitionId);
+ headerBuf.writeInt(numOfPartialBuffers);
headerBuf.writeInt(sequenceNumber);
headerBuf.writeInt(backlog);
headerBuf.writeByte(dataType.ordinal());
headerBuf.writeBoolean(isCompressed);
headerBuf.writeInt(buffer.readableBytes());
+ for (int i = 0; i < numOfPartialBuffers; i++) {
+ int bytes = ((FullyFilledBuffer)
buffer).getPartialBuffers().get(i).readableBytes();
Review Comment:
Could we add a `checkState` here and log some info?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileReader.java:
##########
@@ -65,9 +68,11 @@ class PartitionedFileReader {
/** Number of remaining bytes in the current data region read. */
private long currentRegionRemainingBytes;
+ private Queue<Tuple2<Long, Long>> offsetAndSizes = new ArrayDeque<>();
Review Comment:
Please add a Javadoc comment for this field.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java:
##########
@@ -358,17 +364,75 @@ private void decodeBufferOrEvent(
if (bufferOrEvent.isBuffer() && bufferOrEvent.bufferSize == 0) {
inputChannel.onEmptyBuffer(bufferOrEvent.sequenceNumber,
bufferOrEvent.backlog);
} else if (bufferOrEvent.getBuffer() != null) {
- inputChannel.onBuffer(
- bufferOrEvent.getBuffer(),
- bufferOrEvent.sequenceNumber,
- bufferOrEvent.backlog,
- bufferOrEvent.subpartitionId);
+ if (bufferOrEvent.numOfPartialBuffers > 0) {
+ int offset = 0;
+
+ int seq = bufferOrEvent.sequenceNumber;
+ AtomicInteger waitToBeReleased =
+ new AtomicInteger(bufferOrEvent.numOfPartialBuffers);
+ AtomicInteger processedPartialBuffers = new AtomicInteger(0);
+ try {
+ for (int i = 0; i < bufferOrEvent.numOfPartialBuffers;
i++) {
+ int size =
bufferOrEvent.getPartialBufferSizes().get(i);
+
+ processedPartialBuffers.incrementAndGet();
+ inputChannel.onBuffer(
+ sliceBuffer(
+ bufferOrEvent,
+ memorySegment -> {
+ if
(waitToBeReleased.decrementAndGet() == 0) {
+
bufferOrEvent.getBuffer().recycleBuffer();
+ }
+ },
+ offset,
+ size),
+ seq++,
+ i == bufferOrEvent.numOfPartialBuffers - 1
+ ? bufferOrEvent.backlog
+ : -1,
+ bufferOrEvent.subpartitionId);
+ offset += size;
+ }
+ } catch (Throwable throwable) {
+ LOG.error("Failed to process partial buffers.", throwable);
+ if (processedPartialBuffers.get() !=
bufferOrEvent.numOfPartialBuffers) {
+ bufferOrEvent.getBuffer().recycleBuffer();
+ }
+ throw throwable;
+ }
+ } else {
+ inputChannel.onBuffer(
+ bufferOrEvent.getBuffer(),
+ bufferOrEvent.sequenceNumber,
+ bufferOrEvent.backlog,
+ bufferOrEvent.subpartitionId);
+ }
+
} else {
throw new IllegalStateException(
"The read buffer is null in credit-based input channel.");
}
}
+ private static NetworkBuffer sliceBuffer(
+ NettyMessage.BufferResponse bufferOrEvent,
+ BufferRecycler recycler,
+ int offset,
+ int size) {
+ ByteBuffer nioBuffer = bufferOrEvent.getBuffer().getNioBuffer(offset,
size);
Review Comment:
I wonder whether we have to deal with MemorySegment at all. Could we slice
`bufferOrEvent.getBuffer()` via `Buffer.readOnlySlice()` and `retainBuffer`?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileReader.java:
##########
@@ -77,22 +82,81 @@ class PartitionedFileReader {
checkNotNull(indexFileChannel).isOpen(), "Index file channel
must be opened.");
this.partitionedFile = checkNotNull(partitionedFile);
- this.targetSubpartition = targetSubpartition;
+ this.subpartitionIndexSet = subpartitionIndexSet;
this.dataFileChannel = dataFileChannel;
this.indexFileChannel = indexFileChannel;
this.headerBuf = headerBuffer;
this.indexEntryBuf = indexEntryBuffer;
}
private void moveToNextReadableRegion(ByteBuffer indexEntryBuf) throws
IOException {
- while (currentRegionRemainingBytes <= 0
- && nextRegionToRead < partitionedFile.getNumRegions()) {
+ while (currentRegionRemainingBytes <= 0 && hasNextRegionToRead()) {
+ if (!offsetAndSizes.isEmpty()) {
+ Tuple2<Long, Long> offsetAndSize = offsetAndSizes.poll();
+ nextOffsetToRead = offsetAndSize.f0;
+ currentRegionRemainingBytes = offsetAndSize.f1;
+ } else {
+ // move to next region which has buffers
+ if (nextRegionToRead < partitionedFile.getNumRegions()) {
+ offsetAndSizes = computeReadOffsetAndSize(indexEntryBuf);
+ ++nextRegionToRead;
+ }
+ }
+ }
+ }
+
+ private boolean hasNextRegionToRead() {
+ return !offsetAndSizes.isEmpty() || nextRegionToRead <
partitionedFile.getNumRegions();
+ }
+
+ private Queue<Tuple2<Long, Long>> computeReadOffsetAndSize(ByteBuffer
indexEntryBuf)
Review Comment:
java doc to explain the algorithm here
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/FullyFilledBuffer.java:
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
+import org.apache.flink.shaded.netty4.io.netty.buffer.CompositeByteBuf;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * An implementation of {@link Buffer} represents a fully filled buffer which
contains multiple
+ * partial buffers for network data communication.
+ *
+ * <p>All sub-buffers must have the same data type and compression status.
+ */
+public class FullyFilledBuffer implements Buffer {
Review Comment:
Can we use it directly or make some minor changes to CompositeBuffer? There
is a lot of duplicated code.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileReader.java:
##########
@@ -77,22 +82,81 @@ class PartitionedFileReader {
checkNotNull(indexFileChannel).isOpen(), "Index file channel
must be opened.");
this.partitionedFile = checkNotNull(partitionedFile);
- this.targetSubpartition = targetSubpartition;
+ this.subpartitionIndexSet = subpartitionIndexSet;
this.dataFileChannel = dataFileChannel;
this.indexFileChannel = indexFileChannel;
this.headerBuf = headerBuffer;
this.indexEntryBuf = indexEntryBuffer;
}
private void moveToNextReadableRegion(ByteBuffer indexEntryBuf) throws
IOException {
Review Comment:
```suggestion
private void moveToNextReadablePosition(ByteBuffer indexEntryBuf) throws
IOException {
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java:
##########
@@ -536,8 +537,15 @@ public void close() {
protected ResultSubpartitionView createSubpartitionView(
int subpartitionIndex, BufferAvailabilityListener
availabilityListener)
throws IOException {
+ throw new UnsupportedEncodingException();
Review Comment:
Why is this constructor still needed? Alternatively, you could just forward it to
`ResultSubpartitionIndexSet(subpartitionIndex)`, assuming the start and end indexes
are inclusive.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java:
##########
@@ -358,17 +364,75 @@ private void decodeBufferOrEvent(
if (bufferOrEvent.isBuffer() && bufferOrEvent.bufferSize == 0) {
inputChannel.onEmptyBuffer(bufferOrEvent.sequenceNumber,
bufferOrEvent.backlog);
} else if (bufferOrEvent.getBuffer() != null) {
- inputChannel.onBuffer(
- bufferOrEvent.getBuffer(),
- bufferOrEvent.sequenceNumber,
- bufferOrEvent.backlog,
- bufferOrEvent.subpartitionId);
+ if (bufferOrEvent.numOfPartialBuffers > 0) {
+ int offset = 0;
+
+ int seq = bufferOrEvent.sequenceNumber;
+ AtomicInteger waitToBeReleased =
+ new AtomicInteger(bufferOrEvent.numOfPartialBuffers);
+ AtomicInteger processedPartialBuffers = new AtomicInteger(0);
+ try {
+ for (int i = 0; i < bufferOrEvent.numOfPartialBuffers;
i++) {
+ int size =
bufferOrEvent.getPartialBufferSizes().get(i);
+
+ processedPartialBuffers.incrementAndGet();
+ inputChannel.onBuffer(
+ sliceBuffer(
+ bufferOrEvent,
+ memorySegment -> {
+ if
(waitToBeReleased.decrementAndGet() == 0) {
+
bufferOrEvent.getBuffer().recycleBuffer();
+ }
+ },
+ offset,
+ size),
+ seq++,
+ i == bufferOrEvent.numOfPartialBuffers - 1
+ ? bufferOrEvent.backlog
+ : -1,
+ bufferOrEvent.subpartitionId);
Review Comment:
IIUC, `subpartitionId` doesn't make sense here; could we just pass -1?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]