TanYuxin-tyx commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1431327842


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionIndexSet.java:
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import java.io.Serializable;
+
+/** A collection of subpartition indexes. */
+public interface ResultSubpartitionIndexSet extends Iterable<Integer>, Serializable {

Review Comment:
   Does the interface need to extend `Iterable`? Would it be more flexible to define a method in the interface instead?
   This is not a strong objection, since the current approach also works, but I think it is worth discussing carefully to make subsequent implementations easier.
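   To make the alternative concrete, here is a minimal sketch of what "define a method in the interface" could look like. The names `IndexSetSketch`, `IndexRangeSketch`, and `values()` are hypothetical and only for discussion; they are not part of this PR, and the sketch omits `Serializable` for brevity.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Hypothetical variant of the index set that does not extend Iterable. */
interface IndexSetSketch {
    /** Returns the number of subpartition indexes in this set. */
    int size();

    /** Returns the indexes as an Iterable, so callers can still use for-each. */
    Iterable<Integer> values();
}

/** Hypothetical inclusive range [start, end] implementing the sketch above. */
final class IndexRangeSketch implements IndexSetSketch {
    private final int start;
    private final int end;

    IndexRangeSketch(int start, int end) {
        // Rejecting Integer.MAX_VALUE keeps size() and the cursor below free of overflow.
        if (start < 0 || end < start || end == Integer.MAX_VALUE) {
            throw new IllegalArgumentException("Invalid range: [" + start + ", " + end + "]");
        }
        this.start = start;
        this.end = end;
    }

    @Override
    public int size() {
        return end - start + 1;
    }

    @Override
    public Iterable<Integer> values() {
        // Each call to iterator() on the returned Iterable starts a fresh pass.
        return () ->
                new Iterator<Integer>() {
                    private int cursor = start;

                    @Override
                    public boolean hasNext() {
                        return cursor <= end;
                    }

                    @Override
                    public Integer next() {
                        if (!hasNext()) {
                            throw new NoSuchElementException();
                        }
                        return cursor++;
                    }
                };
    }
}
```

   With this shape, call sites would read `for (int index : indexSet.values())`, and the interface stays free to change how the indexes are exposed later.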



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionIndexRange.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.executiongraph.IndexRange;
+
+import java.util.Iterator;
+
+/**
+ * A {@link ResultSubpartitionIndexSet} represented as a range of indexes. The range is inclusive.
+ */
+public class ResultSubpartitionIndexRange extends IndexRange implements ResultSubpartitionIndexSet {

Review Comment:
   Does this class extend `IndexRange` for compatibility, or for other reasons?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionViewTest.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.util.TestBufferFactory;
+
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link UnionResultSubpartitionView}. */
+public class UnionResultSubpartitionViewTest {

Review Comment:
   Remove the `public` here.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionViewTest.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.util.TestBufferFactory;
+
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link UnionResultSubpartitionView}. */
+public class UnionResultSubpartitionViewTest {
+
+    private UnionResultSubpartitionView view;
+
+    private List<Buffer> buffers0;
+
+    private ResultSubpartitionView view0;
+
+    private List<Buffer> buffers1;
+
+    private ResultSubpartitionView view1;
+
+    @BeforeEach
+    void before() {
+        view = new UnionResultSubpartitionView((ResultSubpartitionView x) -> {});
+
+        buffers0 =
+                Arrays.asList(
+                        TestBufferFactory.createBuffer(10),
+                        TestBufferFactory.createBuffer(10),
+                        TestBufferFactory.createBuffer(10, Buffer.DataType.EVENT_BUFFER));
+        view0 = new TestingResultSubpartitionView(view, buffers0);
+        view.notifyViewCreated(0, view0);
+
+        buffers1 =
+                Arrays.asList(
+                        TestBufferFactory.createBuffer(10),
+                        TestBufferFactory.createBuffer(10),
+                        TestBufferFactory.createBuffer(10, Buffer.DataType.EVENT_BUFFER));
+        view1 = new TestingResultSubpartitionView(view, buffers1);
+        view.notifyViewCreated(1, view1);
+    }
+
+    @Test
+    void testGetNextBuffer() throws IOException {
+        assertThat(view.getNextBuffer()).isNull();
+        view0.notifyDataAvailable();
+        ResultSubpartition.BufferAndBacklog bufferAndBacklog = view.getNextBuffer();
+        assertThat(bufferAndBacklog.buffer()).isEqualTo(buffers0.get(0));
+        assertThat(bufferAndBacklog.buffersInBacklog()).isEqualTo(buffers0.size() - 1);
+
+        view1.notifyDataAvailable();
+        assertThat(view.getNextBuffer().buffer()).isEqualTo(buffers0.get(1));
+
+        List<Buffer> buffers = new ArrayList<>();
+        while (view.getAvailabilityAndBacklog(true).isAvailable()) {
+            buffers.add(view.getNextBuffer().buffer());
+        }
+        assertThat(buffers)
+                .hasSize(buffers0.size() + buffers1.size() - 2)
+                .containsSubsequence(buffers0.subList(2, buffers0.size()))
+                .containsSubsequence(buffers1);
+    }
+
+    @Test
+    void testGetAvailabilityAndBacklog() throws IOException {
+        view0.notifyDataAvailable();
+        view1.notifyDataAvailable();
+
+        ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog1 =
+                view.getAvailabilityAndBacklog(false);
+        assertThat(availabilityAndBacklog1.getBacklog()).isPositive();
+        assertThat(availabilityAndBacklog1.isAvailable()).isEqualTo(false);
+        ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog2 =
+                view.getAvailabilityAndBacklog(true);
+        assertThat(availabilityAndBacklog2.getBacklog()).isPositive();
+        assertThat(availabilityAndBacklog2.isAvailable()).isEqualTo(true);
+
+        for (int i = 1; i < buffers0.size() + buffers1.size(); i++) {
+            view.getNextBuffer();
+        }
+
+        ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog3 =
+                view.getAvailabilityAndBacklog(false);
+        assertThat(availabilityAndBacklog3.getBacklog()).isEqualTo(0);
+        assertThat(availabilityAndBacklog3.isAvailable()).isEqualTo(true);
+        ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog4 =
+                view.getAvailabilityAndBacklog(true);
+        assertThat(availabilityAndBacklog4.getBacklog()).isEqualTo(0);
+        assertThat(availabilityAndBacklog4.isAvailable()).isEqualTo(true);

Review Comment:
   I suggest the more specific AssertJ assertions here:
   `assertThat(availabilityAndBacklog4.getBacklog()).isZero();`
   `assertThat(availabilityAndBacklog4.isAvailable()).isTrue();`



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionIndexSet.java:
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import java.io.Serializable;
+
+/** A collection of subpartition indexes. */
+public interface ResultSubpartitionIndexSet extends Iterable<Integer>, Serializable {
+    int size();

Review Comment:
   Because this is an interface, its methods should have comments describing their contracts.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionView.java:
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+
+/**
+ * A wrapper to union the output from multiple {@link ResultSubpartitionView}s. This class provides
+ * the following guarantees to the output buffers.
+ *
+ * <ul>
+ *   <li>Each output buffer corresponds to a buffer in one of the subpartitions.
+ *   <li>Buffers in the same subpartition are output without their order changed.
+ *   <li>If a record is split and placed into multiple adjacent buffers due to the capacity limit of
+ *       the buffer, these buffers will be output consecutively without the entry of buffers from
+ *       other subpartitions in between.
+ * </ul>
+ */
+public class UnionResultSubpartitionView
+        implements ResultSubpartitionView, BufferAvailabilityListener {
+    private static final Logger LOGGER = LoggerFactory.getLogger(UnionResultSubpartitionView.class);

Review Comment:
   `LOGGER` -> `LOG`



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerClient.java:
##########
@@ -52,33 +55,39 @@ public class TieredStorageConsumerClient {
                     Map<TieredStorageSubpartitionId, Tuple2<TierConsumerAgent, Integer>>>
             currentConsumerAgentAndSegmentIds = new HashMap<>();
 
+    private final List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs;
+
     public TieredStorageConsumerClient(
             List<TierFactory> tierFactories,
             List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs,
             TieredStorageNettyService nettyService) {
         this.tierFactories = tierFactories;
         this.nettyService = nettyService;
         this.tierConsumerAgents = createTierConsumerAgents(tieredStorageConsumerSpecs);
+        this.tieredStorageConsumerSpecs = tieredStorageConsumerSpecs;
     }
 
     public void start() {
         for (TierConsumerAgent tierConsumerAgent : tierConsumerAgents) {
             tierConsumerAgent.start();
+            for (TieredStorageConsumerSpec spec : tieredStorageConsumerSpecs) {
+                tierConsumerAgent.notifyRequiredSegmentId(

Review Comment:
   The partition request was only just sent at this point. Is it possible that the subpartition view has not been created yet (because the message is processed asynchronously)?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java:
##########
@@ -533,7 +533,7 @@ public void close() {
     }
 
     @Override
-    public ResultSubpartitionView createSubpartitionView(
+    protected ResultSubpartitionView createSubpartitionView(

Review Comment:
   Why are the `createSubpartitionView` methods changed to `protected`? That may be unnecessary, since all the other implementation methods are `public`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java:
##########
@@ -224,8 +224,9 @@ public void notifyNewBufferSize(RemoteInputChannel inputChannel, int bufferSize)
     }
 
     @Override
-    public void notifyRequiredSegmentId(RemoteInputChannel inputChannel, int segmentId) {
-        sendToChannel(new SegmentIdMessage(inputChannel, segmentId));
+    public void notifyRequiredSegmentId(
+            RemoteInputChannel inputChannel, int subpartitionIndex, int segmentId) {
+        sendToChannel(new SegmentIdMessage(inputChannel, subpartitionIndex, segmentId));

Review Comment:
   `subpartitionIndex` -> `subpartitionId`. We should also check other similar variables.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionIndexRange.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.executiongraph.IndexRange;
+
+import java.util.Iterator;
+
+/**
+ * A {@link ResultSubpartitionIndexSet} represented as a range of indexes. The range is inclusive.
+ */
+public class ResultSubpartitionIndexRange extends IndexRange implements ResultSubpartitionIndexSet {

Review Comment:
   I notice that both `subpartitionId` and `subpartitionIndex` are used in different code paths. Maybe we can rename `ResultSubpartitionIndexSet` -> `ResultSubpartitionIdSet` and `ResultSubpartitionIndexRange` -> `ResultSubpartitionIdRange`, and then use `subpartitionId` consistently in other similar places.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java:
##########
@@ -1736,7 +1738,8 @@ private RemoteInputChannel createRemoteInputChannel(SingleInputGate inputGate) {
     private RemoteInputChannel createRemoteInputChannel(
             SingleInputGate inputGate, int consumedSubpartitionIndex, int initialCredits) {
         return InputChannelBuilder.newBuilder()
-                .setConsumedSubpartitionIndex(consumedSubpartitionIndex)
+                .setSubpartitionIndexSet(

Review Comment:
   When modifying a test class, we should also migrate it to JUnit 5. Because this change is already complicated enough, I think it is also acceptable to leave these migrations as follow-up work.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java:
##########
@@ -281,15 +271,9 @@ public SingleInputGate(
         this.bufferDebloater = bufferDebloater;
         this.throughputCalculator = checkNotNull(throughputCalculator);
 
-        this.tieredStorageConsumerClient = tieredStorageConsumerClient;
-        this.tieredStorageConsumerSpecs = tieredStorageConsumerSpecs;
-        if (enabledTieredStorage()) {
-            this.availabilityNotifier = new AvailabilityNotifierImpl();
-            setupTieredStorageNettyService(nettyService, tieredStorageConsumerSpecs);
-            tieredStorageConsumerClient.registerAvailabilityNotifier(availabilityNotifier);
-        } else {
-            this.availabilityNotifier = null;
-        }
+        this.tieredStorageConsumerClient = null;

Review Comment:
   Why is the initialization of these fields moved into `setTieredStorageService`, making them non-final?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionRequestClient.java:
##########
@@ -62,9 +62,11 @@ void requestSubpartition(
      * Notifies the id of segment required from one remote input channel.
      *
      * @param inputChannel The remote input channel who requires segment.
+     * @param subpartitionIndex The id of the corresponding subpartition.

Review Comment:
   We should use `subpartitionId` consistently (as in, e.g., `NetworkSequenceViewReader`) instead of `subpartitionIndex`.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java:
##########
@@ -100,7 +102,12 @@ void testCancelPartitionRequest() throws Exception {
             Channel ch = connect(serverAndClient);
 
             // Request for non-existing input channel => results in cancel request

Review Comment:
   Should `channel` be `subpartition` here? There are other similar comments in this class.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SubpartitionSelector.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+/**
+ * {@link SubpartitionSelector} helps to choose from multiple available subpartitions when their
+ * output buffers should union into one stream.
+ *
+ * @param <T> data type that could be used to represent a subpartition.
+ */
+public interface SubpartitionSelector<T> {
+    /**
+     * Marks a subpartition as having data available.
+     *
+     * @return true if this selector did not already contain the subpartition.
+     */
+    boolean notifyDataAvailable(T subpartition);
+
+    /** Returns the next subpartition to consume data. */
+    T getNextSubpartitionToConsume();
+
+    /**
+     * Records the status of the last consumption attempt on the subpartition returned by the last
+     * invocation of {@link #getNextSubpartitionToConsume()}.
+     *
+     * <p>This method must be invoked every time a subpartition acquired from this class is
+     * consumed.
+     *
+     * @param isDataAvailable whether the consumption returned a valid data.
+     * @param isPartialRecord whether the returned data contains partial record. Ignored if there
+     *     was no data available.
+     */
+    void markLastConsumptionStatus(boolean isDataAvailable, boolean isPartialRecord);
+
+    /**
+     * Whether the invoker can get a different subpartition in the next invocation of {@link
+     * #getNextSubpartitionToConsume()}.
+     */
+    boolean isMoreSubpartitionSwitchable();

Review Comment:
   Would renaming this to `hasMoreSubpartitionToConsume` be better here?
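   One thing to weigh when choosing the name: going by the Javadoc above, "can switch" and "has more to consume" may not mean the same thing while a partial record is being read. The following is a minimal sketch of a round-robin selector written only from that Javadoc. It is not the `RoundRobinSubpartitionSelector` from this PR; the names `SelectorSketch` and `RoundRobinSelectorSketch` and the `LinkedHashSet` bookkeeping are assumptions made for illustration.

```java
import java.util.Iterator;
import java.util.LinkedHashSet;

/** Hypothetical stand-in with the same four methods as the interface quoted above. */
interface SelectorSketch<T> {
    boolean notifyDataAvailable(T subpartition);

    T getNextSubpartitionToConsume();

    void markLastConsumptionStatus(boolean isDataAvailable, boolean isPartialRecord);

    boolean isMoreSubpartitionSwitchable();
}

/** Illustrative round-robin selector; an assumption, not the implementation under review. */
class RoundRobinSelectorSketch<T> implements SelectorSketch<T> {
    // Insertion-ordered set: the head is the next candidate, duplicates are ignored.
    private final LinkedHashSet<T> available = new LinkedHashSet<>();
    private T lastReturned;
    private boolean lastWasPartialRecord;

    @Override
    public boolean notifyDataAvailable(T subpartition) {
        // Returns true only if the subpartition was not already tracked.
        return available.add(subpartition);
    }

    @Override
    public T getNextSubpartitionToConsume() {
        if (lastWasPartialRecord) {
            // Stay on the same subpartition until its record is complete.
            return lastReturned;
        }
        Iterator<T> it = available.iterator();
        if (!it.hasNext()) {
            return null;
        }
        lastReturned = it.next();
        it.remove();
        available.add(lastReturned); // move the head to the tail
        return lastReturned;
    }

    @Override
    public void markLastConsumptionStatus(boolean isDataAvailable, boolean isPartialRecord) {
        if (isDataAvailable) {
            lastWasPartialRecord = isPartialRecord;
        } else {
            // No data: drop the subpartition until it is notified again.
            // isPartialRecord is ignored here, as the Javadoc describes.
            available.remove(lastReturned);
        }
    }

    @Override
    public boolean isMoreSubpartitionSwitchable() {
        return !lastWasPartialRecord && !available.isEmpty();
    }
}
```

   In this sketch, after `markLastConsumptionStatus(true, true)` the selector still has data to consume, yet `isMoreSubpartitionSwitchable()` returns false because the caller is pinned to the current subpartition. If the PR's implementation behaves similarly, the new name should still convey that distinction.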



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionView.java:
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+
+/**
+ * A wrapper to union the output from multiple {@link ResultSubpartitionView}s. This class provides
+ * the following guarantees to the output buffers.
+ *
+ * <ul>
+ *   <li>Each output buffer corresponds to a buffer in one of the subpartitions.
+ *   <li>Buffers in the same subpartition are output without their order changed.
+ *   <li>If a record is split and placed into multiple adjacent buffers due to the capacity limit of
+ *       the buffer, these buffers will be output consecutively without the entry of buffers from
+ *       other subpartitions in between.
+ * </ul>
+ */
+public class UnionResultSubpartitionView
+        implements ResultSubpartitionView, BufferAvailabilityListener {
+    private static final Logger LOGGER = LoggerFactory.getLogger(UnionResultSubpartitionView.class);
+
+    /** The maximum number of buffers to be cached in an instance of this class. */
+    private static final int CACHE_CAPACITY = 10;
+
+    private final Object lock = new Object();
+
+    /** All the {@link ResultSubpartitionView}s managed by this class. */
+    private final Map<Integer, ResultSubpartitionView> allViews = new HashMap<>();
+
+    /** All the {@link ResultSubpartitionView}s that have data available. */
+    private final SubpartitionSelector<ResultSubpartitionView> availableViews =
+            new RoundRobinSubpartitionSelector<>();
+
+    private final BufferAvailabilityListener availabilityListener;
+
+    private final Queue<ResultSubpartition.BufferAndBacklog> cachedBuffers = new LinkedList<>();
+
+    private boolean isReleased;
+
+    private int sequenceNumber;
+
+    public UnionResultSubpartitionView(BufferAvailabilityListener availabilityListener) {
+        this.availabilityListener = availabilityListener;
+        this.isReleased = false;
+        this.sequenceNumber = 0;
+    }
+
+    public void notifyViewCreated(int subpartitionIndex, ResultSubpartitionView view) {
+        allViews.put(subpartitionIndex, view);

Review Comment:
   `subpartitionIndex` -> `subpartitionId`



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionView.java:
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+
+/**
+ * A wrapper to union the output from multiple {@link ResultSubpartitionView}s. This class provides
+ * the following guarantees to the output buffers.
+ *
+ * <ul>
+ *   <li>Each output buffer corresponds to a buffer in one of the subpartitions.
+ *   <li>Buffers in the same subpartition are output without their order changed.
+ *   <li>If a record is split and placed into multiple adjacent buffers due to the capacity limit of
+ *       the buffer, these buffers will be output consecutively without the entry of buffers from
+ *       other subpartitions in between.
+ * </ul>
+ */
+public class UnionResultSubpartitionView
+        implements ResultSubpartitionView, BufferAvailabilityListener {
+    private static final Logger LOGGER = LoggerFactory.getLogger(UnionResultSubpartitionView.class);
+
+    /** The maximum number of buffers to be cached in an instance of this class. */
+    private static final int CACHE_CAPACITY = 10;
+
+    private final Object lock = new Object();
+
+    /** All the {@link ResultSubpartitionView}s managed by this class. */
+    private final Map<Integer, ResultSubpartitionView> allViews = new HashMap<>();
+
+    /** All the {@link ResultSubpartitionView}s that have data available. */
+    private final SubpartitionSelector<ResultSubpartitionView> availableViews =
+            new RoundRobinSubpartitionSelector<>();
+
+    private final BufferAvailabilityListener availabilityListener;
+
+    private final Queue<ResultSubpartition.BufferAndBacklog> cachedBuffers = new LinkedList<>();
+
+    private boolean isReleased;
+
+    private int sequenceNumber;
+
+    public UnionResultSubpartitionView(BufferAvailabilityListener availabilityListener) {
+        this.availabilityListener = availabilityListener;
+        this.isReleased = false;
+        this.sequenceNumber = 0;
+    }
+
+    public void notifyViewCreated(int subpartitionIndex, ResultSubpartitionView view) {
+        allViews.put(subpartitionIndex, view);
+    }
+
+    @Nullable
+    @Override
+    public ResultSubpartition.BufferAndBacklog getNextBuffer() throws IOException {
+        synchronized (lock) {
+            cacheBuffer();
+            ResultSubpartition.BufferAndBacklog buffer = cachedBuffers.poll();
+
+            if (buffer == null) {
+                return null;
+            }
+
+            return new ResultSubpartition.BufferAndBacklog(
+                    buffer.buffer(),
+                    cachedBuffers.size(),
+                    cachedBuffers.isEmpty()
+                            ? Buffer.DataType.NONE
+                            : cachedBuffers.peek().buffer().getDataType(),
+                    sequenceNumber++);
+        }
+    }
+
+    private void cacheBuffer() throws IOException {
+        do {
+            ResultSubpartitionView currentView = availableViews.getNextSubpartitionToConsume();
+            if (currentView == null) {
+                break;
+            }
+
+            ResultSubpartition.BufferAndBacklog buffer = currentView.getNextBuffer();
+            if (buffer == null) {
+                availableViews.markLastConsumptionStatus(false, false);
+                if (!availableViews.isMoreSubpartitionSwitchable()) {
+                    break;
+                } else {
+                    continue;
+                }
+            }
+
+            boolean isLastBufferPartialRecord =

Review Comment:
   Why is `isLastBufferPartialRecord` determined from the buffer data type? Please add a comment here to describe the logic.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionViewTest.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.util.TestBufferFactory;
+
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link UnionResultSubpartitionView}. */
+public class UnionResultSubpartitionViewTest {
+
+    private UnionResultSubpartitionView view;
+
+    private List<Buffer> buffers0;
+
+    private ResultSubpartitionView view0;
+
+    private List<Buffer> buffers1;
+
+    private ResultSubpartitionView view1;
+
+    @BeforeEach
+    void before() {
+        view = new UnionResultSubpartitionView((ResultSubpartitionView x) -> {});
+
+        buffers0 =
+                Arrays.asList(
+                        TestBufferFactory.createBuffer(10),
+                        TestBufferFactory.createBuffer(10),
+                        TestBufferFactory.createBuffer(10, Buffer.DataType.EVENT_BUFFER));
+        view0 = new TestingResultSubpartitionView(view, buffers0);
+        view.notifyViewCreated(0, view0);
+
+        buffers1 =
+                Arrays.asList(
+                        TestBufferFactory.createBuffer(10),
+                        TestBufferFactory.createBuffer(10),
+                        TestBufferFactory.createBuffer(10, Buffer.DataType.EVENT_BUFFER));
+        view1 = new TestingResultSubpartitionView(view, buffers1);
+        view.notifyViewCreated(1, view1);
+    }
+
+    @Test
+    void testGetNextBuffer() throws IOException {
+        assertThat(view.getNextBuffer()).isNull();
+        view0.notifyDataAvailable();
+        ResultSubpartition.BufferAndBacklog bufferAndBacklog = view.getNextBuffer();
+        assertThat(bufferAndBacklog.buffer()).isEqualTo(buffers0.get(0));
+        assertThat(bufferAndBacklog.buffersInBacklog()).isEqualTo(buffers0.size() - 1);
+
+        view1.notifyDataAvailable();
+        assertThat(view.getNextBuffer().buffer()).isEqualTo(buffers0.get(1));
+
+        List<Buffer> buffers = new ArrayList<>();
+        while (view.getAvailabilityAndBacklog(true).isAvailable()) {
+            buffers.add(view.getNextBuffer().buffer());
+        }
+        assertThat(buffers)
+                .hasSize(buffers0.size() + buffers1.size() - 2)
+                .containsSubsequence(buffers0.subList(2, buffers0.size()))
+                .containsSubsequence(buffers1);
+    }
+
+    @Test
+    void testGetAvailabilityAndBacklog() throws IOException {
+        view0.notifyDataAvailable();
+        view1.notifyDataAvailable();
+
+        ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog1 =
+                view.getAvailabilityAndBacklog(false);
+        assertThat(availabilityAndBacklog1.getBacklog()).isPositive();
+        assertThat(availabilityAndBacklog1.isAvailable()).isEqualTo(false);
+        ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog2 =
+                view.getAvailabilityAndBacklog(true);
+        assertThat(availabilityAndBacklog2.getBacklog()).isPositive();
+        assertThat(availabilityAndBacklog2.isAvailable()).isEqualTo(true);
+
+        for (int i = 1; i < buffers0.size() + buffers1.size(); i++) {
+            view.getNextBuffer();
+        }
+
+        ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog3 =
+                view.getAvailabilityAndBacklog(false);
+        assertThat(availabilityAndBacklog3.getBacklog()).isEqualTo(0);
+        assertThat(availabilityAndBacklog3.isAvailable()).isEqualTo(true);
+        ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog4 =
+                view.getAvailabilityAndBacklog(true);
+        assertThat(availabilityAndBacklog4.getBacklog()).isEqualTo(0);
+        assertThat(availabilityAndBacklog4.isAvailable()).isEqualTo(true);
+    }
+
+    @Test
+    void testReleaseAllResources() throws IOException {
+        assertThat(view.isReleased()).isFalse();
+        assertThat(view0.isReleased()).isFalse();
+        assertThat(view1.isReleased()).isFalse();
+
+        view.releaseAllResources();
+
+        assertThat(view.isReleased()).isTrue();
+        assertThat(view0.isReleased()).isTrue();
+        assertThat(view1.isReleased()).isTrue();
+    }
+
+    private static class TestingResultSubpartitionView extends NoOpResultSubpartitionView {

Review Comment:
   We could define a standalone `TestingResultSubpartitionView` class (see 
`TestingHsDataView` for reference) instead of extending 
`NoOpResultSubpartitionView`, so that it can be reused by other tests when needed.
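
   To illustrate the idea, here is a minimal sketch of a builder-style test 
double. It is written against a simplified, hypothetical stand-in interface 
(`SimpleSubpartitionView`, with `String` in place of real buffers) rather than 
Flink's actual `ResultSubpartitionView`, and all class and method names below 
are assumptions made for illustration only.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.function.Supplier;

/** Simplified stand-in for ResultSubpartitionView (assumption, not Flink's real interface). */
interface SimpleSubpartitionView {
    String getNextBuffer();

    boolean isReleased();

    void releaseAllResources();
}

/** Reusable test double: each behavior is injected through the builder. */
final class TestingSimpleSubpartitionView implements SimpleSubpartitionView {
    private final Supplier<String> nextBufferSupplier;
    private final Runnable releaseAction;
    private boolean released;

    private TestingSimpleSubpartitionView(
            Supplier<String> nextBufferSupplier, Runnable releaseAction) {
        this.nextBufferSupplier = nextBufferSupplier;
        this.releaseAction = releaseAction;
    }

    @Override
    public String getNextBuffer() {
        return nextBufferSupplier.get();
    }

    @Override
    public boolean isReleased() {
        return released;
    }

    @Override
    public void releaseAllResources() {
        released = true;
        releaseAction.run();
    }

    static Builder builder() {
        return new Builder();
    }

    static final class Builder {
        // Defaults describe an empty view, so a test only overrides what it needs.
        private Supplier<String> nextBufferSupplier = () -> null;
        private Runnable releaseAction = () -> {};

        /** Serves the given buffers in order, then returns null. */
        Builder setBuffers(List<String> buffers) {
            Queue<String> queue = new ArrayDeque<>(buffers);
            this.nextBufferSupplier = queue::poll;
            return this;
        }

        Builder setReleaseAction(Runnable releaseAction) {
            this.releaseAction = releaseAction;
            return this;
        }

        TestingSimpleSubpartitionView build() {
            return new TestingSimpleSubpartitionView(nextBufferSupplier, releaseAction);
        }
    }
}
```

   Because every behavior has a default, other tests could reuse such a class 
and configure only the methods they care about.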



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java:
##########
@@ -63,9 +63,11 @@ public void broadcastEmit(T record) throws IOException {
         // ResultPartitionWriter#broadcastRecord because the broadcastRecord

Review Comment:
   `Emitting to all channels` -> `Emitting to all subpartitions`



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionView.java:
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+
+/**
+ * A wrapper to union the output from multiple {@link ResultSubpartitionView}s. This class provides
+ * the following guarantees to the output buffers.
+ *
+ * <ul>
+ *   <li>Each output buffer corresponds to a buffer in one of the subpartitions.
+ *   <li>Buffers in the same subpartition are output without their order changed.
+ *   <li>If a record is split and placed into multiple adjacent buffers due to the capacity limit of
+ *       the buffer, these buffers will be output consecutively without the entry of buffers from
+ *       other subpartitions in between.
+ * </ul>
+ */
+public class UnionResultSubpartitionView
+        implements ResultSubpartitionView, BufferAvailabilityListener {
+    private static final Logger LOGGER = LoggerFactory.getLogger(UnionResultSubpartitionView.class);
+
+    /** The maximum number of buffers to be cached in an instance of this class. */

Review Comment:
   We should add more comments explaining why it is necessary to cache 
some buffers in this view.
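
   For reference, the ordering guarantees listed in the class Javadoc can be 
illustrated with a small standalone sketch. It is not the implementation in 
this PR: `SketchBuffer`, `UnionOrderSketch` and the `partialRecord` flag are 
hypothetical names, and the sketch assumes all buffers are already available, 
so it does not model caching or availability notifications.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical buffer model: a payload plus a flag saying the record continues in the next buffer. */
final class SketchBuffer {
    final String payload;
    final boolean partialRecord;

    SketchBuffer(String payload, boolean partialRecord) {
        this.payload = payload;
        this.partialRecord = partialRecord;
    }
}

final class UnionOrderSketch {
    /**
     * Round-robin merge that only switches to another subpartition at a record boundary, so
     * buffers belonging to one split record are never interleaved with other subpartitions.
     * The input queues are drained by this method.
     */
    static List<String> union(List<Deque<SketchBuffer>> subpartitions) {
        List<String> output = new ArrayList<>();
        int remaining = 0;
        for (Deque<SketchBuffer> queue : subpartitions) {
            remaining += queue.size();
        }

        int current = 0;
        while (remaining > 0) {
            SketchBuffer buffer = subpartitions.get(current).poll();
            if (buffer == null) {
                // This subpartition is drained; move on to the next one.
                current = (current + 1) % subpartitions.size();
                continue;
            }
            output.add(buffer.payload);
            remaining--;
            if (!buffer.partialRecord) {
                // Record boundary reached: it is safe to switch subpartitions.
                current = (current + 1) % subpartitions.size();
            }
            // Otherwise keep reading from the same subpartition until the record completes.
        }
        return output;
    }
}
```

   In this sketch, buffers of one subpartition keep their relative order, and 
a record split across several buffers is always emitted contiguously.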



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java:
##########
@@ -304,7 +306,15 @@ enum DataType {
         RECOVERY_COMPLETION(false, true, true, false, false),
 
         /** {@link #END_OF_SEGMENT} indicates that a segment is finished in a subpartition. */
-        END_OF_SEGMENT(false, true, false, false, false);
+        END_OF_SEGMENT(false, true, false, false, false),
+
+        /**
+         * {@link #END_OF_DATA} indicates that there will be no more data buffer in a subpartition.
+         */
+        END_OF_DATA(false, true, false, false, false),

Review Comment:
   We already have the `EndOfData` and `EndOfPartitionEvent` events. Why can't 
we reuse them here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

