Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]

2024-01-16 Thread via GitHub


reswqa merged PR #23927:
URL: https://github.com/apache/flink/pull/23927


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]

2024-01-15 Thread via GitHub


yunfengzhou-hub commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1452075751


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionIndexSet.java:
##
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.executiongraph.IndexRange;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+import java.util.Iterator;
+
+/** A collection of subpartition indexes. */
+public class ResultSubpartitionIndexSet extends IndexRange {

Review Comment:
   For now, the implementations of `IndexRange` and this class are the same, so I used inheritance to avoid duplicating code, and I plan to remove the `extends IndexRange` once their implementations diverge.
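   To make the current relationship concrete, here is a minimal sketch, assuming `IndexRange` exposes an inclusive `(startIndex, endIndex)` constructor as the quoted diff suggests; it illustrates the plan above and is not the PR's exact code.
   ```java
   import org.apache.flink.runtime.executiongraph.IndexRange;

   /** A collection of subpartition indexes (currently backed by a contiguous range). */
   public class ResultSubpartitionIndexSet extends IndexRange {

       public ResultSubpartitionIndexSet(int startIndex, int endIndex) {
           super(startIndex, endIndex);
       }

       // Once the implementations diverge, the plan is to drop "extends IndexRange"
       // and let this class carry its own state, e.g. a possibly non-contiguous set.
   }
   ```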



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]

2024-01-15 Thread via GitHub


reswqa commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1452042917


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/BufferAccumulator.java:
##
@@ -65,4 +63,11 @@ void receive(
  * Close the accumulator. This will flush all the remaining data and 
release all the resources.
  */
 void close();
+
+/** Represents an operation that accepts three input arguments and returns 
no result. */
+@FunctionalInterface
+interface TriConsumer {

Review Comment:
   Why do we need this one instead of `org.apache.flink.util.function.TriConsumer`?
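   For context, a tri-consumer is simply the three-argument analogue of `java.util.function.BiConsumer`; the existing utility referenced above is assumed to have roughly the following shape (self-contained sketch, not Flink's actual source):
   ```java
   /** Three-argument analogue of java.util.function.BiConsumer (illustrative sketch). */
   @FunctionalInterface
   interface TriConsumer<S, T, U> {
       void accept(S s, T t, U u);
   }

   class TriConsumerDemo {
       public static void main(String[] args) {
           // Example usage: consume a (partition, subpartition, finished flag) triple.
           TriConsumer<String, Integer, Boolean> printer =
                   (partition, subpartition, finished) ->
                           System.out.println(partition + "/" + subpartition + " finished=" + finished);
           printer.accept("partition-0", 3, true);
       }
   }
   ```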



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]

2024-01-15 Thread via GitHub


reswqa commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1452042605


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/BufferAccumulator.java:
##
@@ -65,4 +63,11 @@ void receive(
  * Close the accumulator. This will flush all the remaining data and 
release all the resources.
  */
 void close();
+
+/** Represents an operation that accepts three input arguments and returns 
no result. */
+@FunctionalInterface
+interface TriConsumer {

Review Comment:
   Why do we need this one instead of `org.apache.flink.util.function.TriConsumer`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]

2024-01-15 Thread via GitHub


reswqa commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1452038225


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionIndexSet.java:
##
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.executiongraph.IndexRange;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+import java.util.Iterator;
+
+/** A collection of subpartition indexes. */
+public class ResultSubpartitionIndexSet extends IndexRange {

Review Comment:
   In that case, extending `IndexRange` sounds a bit unreasonable. I assume a range always means contiguous values.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]

2024-01-14 Thread via GitHub


yunfengzhou-hub commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1452012771


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java:
##
@@ -69,10 +69,10 @@ default void notifyPriorityEvent(int priorityBufferNumber) 
{}
  * Get the availability and backlog of the view. The availability 
represents if the view is
  * ready to get buffer from it. The backlog represents the number of 
available data buffers.
  *
- * @param numCreditsAvailable the available credits for this {@link 
ResultSubpartitionView}.
+ * @param isCreditAvailable the availability of credits for this {@link 
ResultSubpartitionView}.
  * @return availability and backlog.
  */
-AvailabilityWithBacklog getAvailabilityAndBacklog(int numCreditsAvailable);
+AvailabilityWithBacklog getAvailabilityAndBacklog(boolean 
isCreditAvailable);

Review Comment:
   This is a must-have change, because otherwise `UnionResultSubpartitionView` 
would need to determine how to distribute credits to child views, which is 
unnecessary.
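   To illustrate why the boolean form composes better, here is a self-contained sketch with local stub types; the names are illustrative and not the PR's code.
   ```java
   /** Sketch: a union view can forward a boolean credit flag to a child view directly. */
   class UnionViewSketch {

       interface ChildView {
           boolean isAvailable(boolean isCreditAvailable);
       }

       private final ChildView nextChildToConsume;

       UnionViewSketch(ChildView nextChildToConsume) {
           this.nextChildToConsume = nextChildToConsume;
       }

       boolean isAvailable(boolean isCreditAvailable) {
           // With an int credit count, the union view would have to decide how many
           // credits each child view receives; with a boolean it simply forwards the flag.
           return nextChildToConsume.isAvailable(isCreditAvailable);
       }
   }
   ```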



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]

2024-01-14 Thread via GitHub


yunfengzhou-hub commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1452009484


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionIndexSet.java:
##
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.executiongraph.IndexRange;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+import java.util.Iterator;
+
+/** A collection of subpartition indexes. */
+public class ResultSubpartitionIndexSet extends IndexRange {

Review Comment:
   In the future we may need to dynamically assign subpartitions to an input channel at runtime, in which case the indexes of the subpartitions may not be adjacent to each other. The `IndexSet` naming offers an abstraction that is general enough for these future uses.
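   As a hypothetical illustration of that future case, the abstraction could be backed by a sparse set rather than a range; the names below are illustrative and not part of the PR.
   ```java
   import java.util.Arrays;
   import java.util.LinkedHashSet;
   import java.util.Set;

   /** Hypothetical index set whose members need not form a contiguous range. */
   class SparseSubpartitionIndexes {

       private final Set<Integer> indexes;

       SparseSubpartitionIndexes(Integer... indexes) {
           this.indexes = new LinkedHashSet<>(Arrays.asList(indexes));
       }

       boolean contains(int subpartitionIndex) {
           return indexes.contains(subpartitionIndex);
       }
   }
   ```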



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]

2024-01-14 Thread via GitHub


reswqa commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1450161095


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java:
##
@@ -291,13 +291,18 @@ public void releaseAllResources() throws IOException {
 }
 
 @Override
-public void notifyDataAvailable() {
+public void notifyDataAvailable(ResultSubpartitionView view) {
 requestQueue.notifyReaderNonEmpty(this);
 }
 
 @Override
 public void notifyPriorityEvent(int prioritySequenceNumber) {
-notifyDataAvailable();
+notifyDataAvailable(this.subpartitionView);
+}
+
+@VisibleForTesting
+public void notifyDataAvailable() {
+notifyDataAvailable(subpartitionView);

Review Comment:
   `subpartitionView` -> `this.subpartitionView`, to be consistent with `notifyPriorityEvent`.



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java:
##
@@ -69,10 +69,10 @@ default void notifyPriorityEvent(int priorityBufferNumber) 
{}
  * Get the availability and backlog of the view. The availability 
represents if the view is
  * ready to get buffer from it. The backlog represents the number of 
available data buffers.
  *
- * @param numCreditsAvailable the available credits for this {@link 
ResultSubpartitionView}.
+ * @param isCreditAvailable the availability of credits for this {@link 
ResultSubpartitionView}.
  * @return availability and backlog.
  */
-AvailabilityWithBacklog getAvailabilityAndBacklog(int numCreditsAvailable);
+AvailabilityWithBacklog getAvailabilityAndBacklog(boolean 
isCreditAvailable);

Review Comment:
   Is this commit a must-have change, or just a refactor? 



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionIndexSet.java:
##
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.executiongraph.IndexRange;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+import java.util.Iterator;
+
+/** A collection of subpartition indexes. */
+public class ResultSubpartitionIndexSet extends IndexRange {

Review Comment:
   I wonder what the difference is between `IndexRange` and `IndexSet`. From my side, it's better to align with the name of the parent class.



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java:
##
@@ -291,13 +291,18 @@ public void releaseAllResources() throws IOException {
 }
 
 @Override
-public void notifyDataAvailable() {
+public void notifyDataAvailable(ResultSubpartitionView view) {
 requestQueue.notifyReaderNonEmpty(this);
 }
 
 @Override
 public void notifyPriorityEvent(int prioritySequenceNumber) {
-notifyDataAvailable();
+notifyDataAvailable(this.subpartitionView);
+}
+
+@VisibleForTesting

Review Comment:
   Why is this method `@VisibleForTesting`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]

2024-01-12 Thread via GitHub


yunfengzhou-hub commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1450088659


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java:
##
@@ -229,6 +245,14 @@ ResultSubpartitionView.AvailabilityWithBacklog 
hasBuffersAvailable() {
 return subpartitionView.getAvailabilityAndBacklog(Integer.MAX_VALUE);
 }
 
+@Override
+public int peekNextBufferSubpartitionId() throws IOException {

Review Comment:
   Tests have been added to CreditBasedSequenceNumberingViewReaderTest, 
UnionResultSubpartitionViewTest and NettyConnectionReaderTest.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]

2024-01-12 Thread via GitHub


yunfengzhou-hub commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1450086954


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java:
##
@@ -301,6 +302,24 @@ public void setMetricGroup(TaskIOMetricGroup metrics) {
 partitionId.getPartitionId(), resultPartitionBytes);
 }
 
+@Override
+public ResultSubpartitionView createSubpartitionView(
+ResultSubpartitionIndexSet indexSet, BufferAvailabilityListener 
availabilityListener)
+throws IOException {
+// The ability to support multiple indexes is to be provided in 
subsequent commits of
+// the corresponding pull request. As the function is about to be 
supported uniformly with
+// one set of code, they will be placed in a common method shared by 
all shuffle
+// implementations, and that will be this method.
+Iterator<Integer> iterator = indexSet.values().iterator();
+int index = iterator.next();
+Preconditions.checkState(!iterator.hasNext());
+return createSubpartitionView(index, availabilityListener);
+}
+
+/** Returns a reader for the subpartition with the given index. */

Review Comment:
   A more complete javadoc has been added. As the doc mentions 
UnionResultSubpartitionView, it is completed in the commit that introduced this 
class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]

2024-01-09 Thread via GitHub


reswqa commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1446953814


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java:
##
@@ -58,6 +59,12 @@ class CreditBasedSequenceNumberingViewReader
 
 private final int initialCredit;
 
+/**
+ * Cache of the index of the only subpartition if the underlying {@link ResultSubpartitionView}
+ * only consumes one subpartition.
+ */
+private int subpartitionId;

Review Comment:
   We do need some explanation about the default value `-1`.
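   For example, the explanation could take the form of a field-level Javadoc along these lines; the semantics of `-1` below are only an assumption for illustration, which is exactly what should be documented.
   ```java
   /**
    * Cache of the index of the only subpartition if the underlying ResultSubpartitionView
    * consumes exactly one subpartition; -1 if no single subpartition index applies
    * (assumed semantics, to be confirmed in the actual Javadoc).
    */
   private int subpartitionId = -1;
   ```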



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java:
##
@@ -301,6 +302,24 @@ public void setMetricGroup(TaskIOMetricGroup metrics) {
 partitionId.getPartitionId(), resultPartitionBytes);
 }
 
+@Override
+public ResultSubpartitionView createSubpartitionView(
+ResultSubpartitionIndexSet indexSet, BufferAvailabilityListener 
availabilityListener)
+throws IOException {
+// The ability to support multiple indexes is to be provided in 
subsequent commits of
+// the corresponding pull request. As the function is about to be 
supported uniformly with
+// one set of code, they will be placed in a common method shared by 
all shuffle
+// implementations, and that will be this method.
+Iterator<Integer> iterator = indexSet.values().iterator();
+int index = iterator.next();
+Preconditions.checkState(!iterator.hasNext());
+return createSubpartitionView(index, availabilityListener);
+}
+
+/** Returns a reader for the subpartition with the given index. */

Review Comment:
   We need full Javadoc to explain the differences and connections between this method and the previous ones.



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java:
##
@@ -229,6 +245,14 @@ ResultSubpartitionView.AvailabilityWithBacklog 
hasBuffersAvailable() {
 return subpartitionView.getAvailabilityAndBacklog(Integer.MAX_VALUE);
 }
 
+@Override
+public int peekNextBufferSubpartitionId() throws IOException {

Review Comment:
   Do we have tests that cover methods like this one introduced in this commit?



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerClient.java:
##
@@ -100,6 +104,13 @@ public Optional getNextBuffer(
 }
 Buffer bufferData = buffer.get();
 if (bufferData.getDataType() == Buffer.DataType.END_OF_SEGMENT) {
+EndOfSegmentEvent event =
+(EndOfSegmentEvent)
+EventSerializer.fromSerializedEvent(
+bufferData.getNioBufferReadable(), 
getClass().getClassLoader());
+Preconditions.checkState(
+subpartitionId.equals(
+new TieredStorageSubpartitionId(event.getSubpartitionId())));

Review Comment:
   Is this deserialization only for a sanity check?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]

2024-01-08 Thread via GitHub


yunfengzhou-hub commented on PR #23927:
URL: https://github.com/apache/flink/pull/23927#issuecomment-1882465840

   Hi @reswqa, could you please take a look at this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]

2023-12-25 Thread via GitHub


xintongsong commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1436062729


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionIndexRange.java:
##
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.executiongraph.IndexRange;
+
+import java.util.Iterator;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A {@link ResultSubpartitionIndexSet} represented as a range of indexes. The 
range is inclusive.
+ */
+public class ResultSubpartitionIndexRange implements 
ResultSubpartitionIndexSet {

Review Comment:
   How is this different from `org.apache.flink.runtime.executiongraph.IndexRange`? Can we extract a common abstraction from these two?



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerClient.java:
##
@@ -52,33 +55,39 @@ public class TieredStorageConsumerClient {
 Map>>
 currentConsumerAgentAndSegmentIds = new HashMap<>();
 
+private final List tieredStorageConsumerSpecs;
+
 public TieredStorageConsumerClient(
 List tierFactories,
 List tieredStorageConsumerSpecs,
 TieredStorageNettyService nettyService) {
 this.tierFactories = tierFactories;
 this.nettyService = nettyService;
 this.tierConsumerAgents = 
createTierConsumerAgents(tieredStorageConsumerSpecs);
+this.tieredStorageConsumerSpecs = tieredStorageConsumerSpecs;
 }
 
 public void start() {
 for (TierConsumerAgent tierConsumerAgent : tierConsumerAgents) {
 tierConsumerAgent.start();
+for (TieredStorageConsumerSpec spec : tieredStorageConsumerSpecs) {
+tierConsumerAgent.notifyRequiredSegmentId(
+spec.getPartitionId(), spec.getSubpartitionId(), 0);
+}

Review Comment:
   Why do we notify for all sub-partitions at start-up?



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionReader.java:
##
@@ -19,18 +19,26 @@
 package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
 
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent;
 
 import java.util.Optional;
 
 /** {@link NettyConnectionReader} is used by {@link TierConsumerAgent} to read 
buffer from netty. */
 public interface NettyConnectionReader {
+/**
+ * Notify the upstream the id of required segment that should be sent to 
netty connection.
+ *
+ * @param subpartitionId The id of the corresponding subpartition.
+ * @param segmentId The id of required segment.
+ */
+void notifyRequiredSegmentId(TieredStorageSubpartitionId subpartitionId, 
int segmentId);

Review Comment:
   Could you remind me why we need to separate the notify from readBuffer? It feels like this notify interface is used for multiple purposes.



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionIndexSet.java:
##
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing 

Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]

2023-12-25 Thread via GitHub


TanYuxin-tyx commented on PR #23927:
URL: https://github.com/apache/flink/pull/23927#issuecomment-1869187119

   @yunfengzhou-hub Thanks for the update. I have no more comments on the 
change.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]

2023-12-24 Thread via GitHub


yunfengzhou-hub commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1435935387


##
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java:
##
@@ -1736,7 +1738,8 @@ private RemoteInputChannel 
createRemoteInputChannel(SingleInputGate inputGate) {
 private RemoteInputChannel createRemoteInputChannel(
 SingleInputGate inputGate, int consumedSubpartitionIndex, int 
initialCredits) {
 return InputChannelBuilder.newBuilder()
-.setConsumedSubpartitionIndex(consumedSubpartitionIndex)
+.setSubpartitionIndexSet(

Review Comment:
   Tests have been migrated to JUnit5 according to the following ticket.
   https://issues.apache.org/jira/browse/FLINK-32850



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]

2023-12-24 Thread via GitHub


TanYuxin-tyx commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1435920016


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriterImpl.java:
##
@@ -30,24 +30,33 @@ public class NettyConnectionWriterImpl implements 
NettyConnectionWriter {
 
 private final NettyConnectionId connectionId;
 
-private final BufferAvailabilityListener availabilityListener;
+private Runnable availabilityListener;
 
+public NettyConnectionWriterImpl(NettyPayloadManager nettyPayloadManager) {
+this.nettyPayloadManager = nettyPayloadManager;
+this.connectionId = NettyConnectionId.newId();
+}
+
+@VisibleForTesting

Review Comment:
   Do we need this `VisibleForTesting`? Could we test the logic via the existing public/protected methods?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]

2023-12-21 Thread via GitHub


yunfengzhou-hub commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1433793487


##
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionViewTest.java:
##
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.util.TestBufferFactory;
+
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link UnionResultSubpartitionView}. */
+public class UnionResultSubpartitionViewTest {
+
+private UnionResultSubpartitionView view;
+
+private List buffers0;
+
+private ResultSubpartitionView view0;
+
+private List buffers1;
+
+private ResultSubpartitionView view1;
+
+@BeforeEach
+void before() {
+view = new UnionResultSubpartitionView((ResultSubpartitionView x) -> 
{});
+
+buffers0 =
+Arrays.asList(
+TestBufferFactory.createBuffer(10),
+TestBufferFactory.createBuffer(10),
+TestBufferFactory.createBuffer(10, 
Buffer.DataType.EVENT_BUFFER));
+view0 = new TestingResultSubpartitionView(view, buffers0);
+view.notifyViewCreated(0, view0);
+
+buffers1 =
+Arrays.asList(
+TestBufferFactory.createBuffer(10),
+TestBufferFactory.createBuffer(10),
+TestBufferFactory.createBuffer(10, 
Buffer.DataType.EVENT_BUFFER));
+view1 = new TestingResultSubpartitionView(view, buffers1);
+view.notifyViewCreated(1, view1);
+}
+
+@Test
+void testGetNextBuffer() throws IOException {
+assertThat(view.getNextBuffer()).isNull();
+view0.notifyDataAvailable();
+ResultSubpartition.BufferAndBacklog bufferAndBacklog = 
view.getNextBuffer();
+assertThat(bufferAndBacklog.buffer()).isEqualTo(buffers0.get(0));
+
assertThat(bufferAndBacklog.buffersInBacklog()).isEqualTo(buffers0.size() - 1);
+
+view1.notifyDataAvailable();
+assertThat(view.getNextBuffer().buffer()).isEqualTo(buffers0.get(1));
+
+List buffers = new ArrayList<>();
+while (view.getAvailabilityAndBacklog(true).isAvailable()) {
+buffers.add(view.getNextBuffer().buffer());
+}
+assertThat(buffers)
+.hasSize(buffers0.size() + buffers1.size() - 2)
+.containsSubsequence(buffers0.subList(2, buffers0.size()))
+.containsSubsequence(buffers1);
+}
+
+@Test
+void testGetAvailabilityAndBacklog() throws IOException {
+view0.notifyDataAvailable();
+view1.notifyDataAvailable();
+
+ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog1 
=
+view.getAvailabilityAndBacklog(false);
+assertThat(availabilityAndBacklog1.getBacklog()).isPositive();
+assertThat(availabilityAndBacklog1.isAvailable()).isEqualTo(false);
+ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog2 
=
+view.getAvailabilityAndBacklog(true);
+assertThat(availabilityAndBacklog2.getBacklog()).isPositive();
+assertThat(availabilityAndBacklog2.isAvailable()).isEqualTo(true);
+
+for (int i = 1; i < buffers0.size() + buffers1.size(); i++) {
+view.getNextBuffer();
+}
+
+ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog3 
=
+view.getAvailabilityAndBacklog(false);
+assertThat(availabilityAndBacklog3.getBacklog()).isEqualTo(0);
+assertThat(availabilityAndBacklog3.isAvailable()).isEqualTo(true);
+ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog4 
=
+view.getAvailabilityAndBacklog(true);
+

Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]

2023-12-21 Thread via GitHub


yunfengzhou-hub commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1433794402


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java:
##
@@ -304,7 +306,15 @@ enum DataType {
 RECOVERY_COMPLETION(false, true, true, false, false),
 
 /** {@link #END_OF_SEGMENT} indicates that a segment is finished in a 
subpartition. */
-END_OF_SEGMENT(false, true, false, false, false);
+END_OF_SEGMENT(false, true, false, false, false),
+
+/**
+ * {@link #END_OF_DATA} indicates that there will be no more data 
buffer in a subpartition.
+ */
+END_OF_DATA(false, true, false, false, false),

Review Comment:
   EndOfData and EndOfPartitionEvent are RuntimeEvent subclasses, while here we 
need Buffer.DataType enum values.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]

2023-12-21 Thread via GitHub


yunfengzhou-hub commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1433753103


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SubpartitionSelector.java:
##
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+/**
+ * {@link SubpartitionSelector} helps to choose from multiple available 
subpartitions when their
+ * output buffers should union into one stream.
+ *
+ * @param  data type that could be used to represent a subpartition.
+ */
+public interface SubpartitionSelector {
+/**
+ * Marks a subpartition as having data available.
+ *
+ * @return true if this selector did not already contain the subpartition.
+ */
+boolean notifyDataAvailable(T subpartition);
+
+/** Returns the next subpartition to consume data. */
+T getNextSubpartitionToConsume();
+
+/**
+ * Records the status of the last consumption attempt on the subpartition 
returned by the last
+ * invocation of {@link #getNextSubpartitionToConsume()}.
+ *
+ * This method must be invoked every time a subpartition acquired from 
this class is
+ * consumed.
+ *
+ * @param isDataAvailable whether the consumption returned a valid data.
+ * @param isPartialRecord whether the returned data contains partial 
record. Ignored if there
+ * was no data available.
+ */
+void markLastConsumptionStatus(boolean isDataAvailable, boolean 
isPartialRecord);
+
+/**
+ * Whether the invoker can get a different subpartition in the next 
invocation of {@link
+ * #getNextSubpartitionToConsume()}.
+ */
+boolean isMoreSubpartitionSwitchable();

Review Comment:
   `isMoreSubpartitionSwitchable` = `hasMoreSubpartitionToConsume` && the last buffer does not contain a partial record.
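   In sketch form, with illustrative field names that are not necessarily the PR's:
   ```java
   /** Minimal sketch of the condition described above. */
   class SwitchabilitySketch {

       private boolean hasMoreSubpartitionsToConsume;
       private boolean lastBufferContainedPartialRecord;

       boolean isMoreSubpartitionSwitchable() {
           // Switching is allowed only when another subpartition has data to consume
           // and the last returned buffer did not end in the middle of a record.
           return hasMoreSubpartitionsToConsume && !lastBufferContainedPartialRecord;
       }
   }
   ```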



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]

2023-12-21 Thread via GitHub


yunfengzhou-hub commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1433752308


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java:
##
@@ -281,15 +271,9 @@ public SingleInputGate(
 this.bufferDebloater = bufferDebloater;
 this.throughputCalculator = checkNotNull(throughputCalculator);
 
-this.tieredStorageConsumerClient = tieredStorageConsumerClient;
-this.tieredStorageConsumerSpecs = tieredStorageConsumerSpecs;
-if (enabledTieredStorage()) {
-this.availabilityNotifier = new AvailabilityNotifierImpl();
-setupTieredStorageNettyService(nettyService, 
tieredStorageConsumerSpecs);
-
tieredStorageConsumerClient.registerAvailabilityNotifier(availabilityNotifier);
-} else {
-this.availabilityNotifier = null;
-}
+this.tieredStorageConsumerClient = null;

Review Comment:
   In this pull request we added `TieredStorageInputChannelId` to `TieredStorageConsumerSpec`, which in theory can only be acquired after the input channel is created (even though `TieredStorageInputChannelId` currently relies only on the input channels' indexes, which can be determined before the input channels are created). Given that input channels are initialized after the SingleInputGate is created, we should also move the initialization of the tiered storage services to after the SingleInputGate's construction.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]

2023-12-21 Thread via GitHub


yunfengzhou-hub commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1433747654


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerClient.java:
##
@@ -52,33 +55,39 @@ public class TieredStorageConsumerClient {
 Map>>
 currentConsumerAgentAndSegmentIds = new HashMap<>();
 
+private final List tieredStorageConsumerSpecs;
+
 public TieredStorageConsumerClient(
 List tierFactories,
 List tieredStorageConsumerSpecs,
 TieredStorageNettyService nettyService) {
 this.tierFactories = tierFactories;
 this.nettyService = nettyService;
 this.tierConsumerAgents = 
createTierConsumerAgents(tieredStorageConsumerSpecs);
+this.tieredStorageConsumerSpecs = tieredStorageConsumerSpecs;
 }
 
 public void start() {
 for (TierConsumerAgent tierConsumerAgent : tierConsumerAgents) {
 tierConsumerAgent.start();
+for (TieredStorageConsumerSpec spec : tieredStorageConsumerSpecs) {
+tierConsumerAgent.notifyRequiredSegmentId(

Review Comment:
   In `SingleInputGate#requestPartitions`, a subpartition view is created in 
`internalRequestPartitions()`, which will be invoked before 
`TieredStorageConsumerClient#start`. Thus it can be guaranteed that the 
subpartition view has been created before the first `notifyRequiredSegmentId` 
is invoked.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]

2023-12-21 Thread via GitHub


yunfengzhou-hub commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1433743541


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionRequestClient.java:
##
@@ -62,9 +62,11 @@ void requestSubpartition(
  * Notifies the id of segment required from one remote input channel.
  *
  * @param inputChannel The remote input channel who requires segment.
+ * @param subpartitionIndex The id of the corresponding subpartition.

Review Comment:
   According to offline discussions, we would use id in the following situations
   - Tiered Hybrid Shuffle
   - Parameters with type `TieredStorageSubpartitionId`
   - Used as keys in maps
   
   And use index or idx in the following situations
   - Other shuffle types and the common modules shared among shuffles
   - Parameters with type `int`
   - Used as indexes in arrays



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]

2023-12-21 Thread via GitHub


yunfengzhou-hub commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1433740732


##
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java:
##
@@ -1736,7 +1738,8 @@ private RemoteInputChannel 
createRemoteInputChannel(SingleInputGate inputGate) {
 private RemoteInputChannel createRemoteInputChannel(
 SingleInputGate inputGate, int consumedSubpartitionIndex, int 
initialCredits) {
 return InputChannelBuilder.newBuilder()
-.setConsumedSubpartitionIndex(consumedSubpartitionIndex)
+.setSubpartitionIndexSet(

Review Comment:
   I agree that it might be better not to add more work to the already large pull request. We may create another PR for this migration later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]

2023-12-21 Thread via GitHub


yunfengzhou-hub commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1433735764


##
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java:
##
@@ -100,7 +102,12 @@ void testCancelPartitionRequest() throws Exception {
 Channel ch = connect(serverAndClient);
 
 // Request for non-existing input channel => results in cancel 
request

Review Comment:
   This test class mainly verifies cases when the input channel does not exist, 
as shown in `CreditBasedPartitionRequestClientHandler`
   ```java
   RemoteInputChannel inputChannel = inputChannels.get(announcement.receiverId);
   if (inputChannel == null || inputChannel.isReleased()) {
   cancelRequestFor(announcement.receiverId);
   return;
   }
   ```
   Thus the comment here has been corrected.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]

2023-12-21 Thread via GitHub


yunfengzhou-hub commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1433736474


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionIndexRange.java:
##
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.executiongraph.IndexRange;
+
+import java.util.Iterator;
+
+/**
+ * A {@link ResultSubpartitionIndexSet} represented as a range of indexes. The 
range is inclusive.
+ */
+public class ResultSubpartitionIndexRange extends IndexRange implements 
ResultSubpartitionIndexSet {

Review Comment:
   The extension will be removed according to offline discussions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]

2023-12-20 Thread via GitHub


yunfengzhou-hub commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1433659493


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java:
##
@@ -533,7 +533,7 @@ public void close() {
 }
 
 @Override
-public ResultSubpartitionView createSubpartitionView(
+protected ResultSubpartitionView createSubpartitionView(

Review Comment:
   This is because the method is only used in internal implementations of `ResultPartition`. The public interface has been modified to expose an `IndexSet` parameter instead of an `int`.
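   In other words, the split looks roughly like the following; the signatures are taken from the quoted diffs, but the abstract wrapper class is only a sketch and not the actual `ResultPartition`.
   ```java
   import java.io.IOException;

   import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
   import org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet;
   import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;

   abstract class ResultPartitionSketch {

       // Public entry point: callers pass a set of subpartition indexes.
       public abstract ResultSubpartitionView createSubpartitionView(
               ResultSubpartitionIndexSet indexSet, BufferAvailabilityListener availabilityListener)
               throws IOException;

       // Internal hook: each subclass still creates a view for a single concrete index.
       protected abstract ResultSubpartitionView createSubpartitionView(
               int index, BufferAvailabilityListener availabilityListener) throws IOException;
   }
   ```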



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]

2023-12-20 Thread via GitHub


yunfengzhou-hub commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1433656381


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionIndexSet.java:
##
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import java.io.Serializable;
+
+/** A collection of subpartition indexes. */
+public interface ResultSubpartitionIndexSet extends Iterable<Integer>, Serializable {

Review Comment:
   I agree that we should reduce unnecessary inheritance. I'll define a method here instead.
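   Concretely, the alternative would expose iteration through a method such as `values()` (which the quoted `ResultPartition` diff already calls) instead of extending `Iterable`; a minimal sketch under that assumption:
   ```java
   import java.io.Serializable;
   import java.util.ArrayList;
   import java.util.List;

   /** Sketch: expose iteration via a method rather than via "extends Iterable<Integer>". */
   interface IndexSetSketch extends Serializable {
       /** Returns the subpartition indexes contained in this set. */
       Iterable<Integer> values();
   }

   /** An inclusive range-backed implementation, mirroring ResultSubpartitionIndexRange. */
   class IndexRangeSketch implements IndexSetSketch {

       private final int startIndex;
       private final int endIndex; // inclusive

       IndexRangeSketch(int startIndex, int endIndex) {
           this.startIndex = startIndex;
           this.endIndex = endIndex;
       }

       @Override
       public Iterable<Integer> values() {
           List<Integer> result = new ArrayList<>();
           for (int i = startIndex; i <= endIndex; i++) {
               result.add(i);
           }
           return result;
       }
   }
   ```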



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]

2023-12-20 Thread via GitHub


TanYuxin-tyx commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1432220667


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java:
##
@@ -304,7 +306,15 @@ enum DataType {
 RECOVERY_COMPLETION(false, true, true, false, false),
 
 /** {@link #END_OF_SEGMENT} indicates that a segment is finished in a 
subpartition. */
-END_OF_SEGMENT(false, true, false, false, false);
+END_OF_SEGMENT(false, true, false, false, false),
+
+/**
+ * {@link #END_OF_DATA} indicates that there will be no more data 
buffer in a subpartition.
+ */
+END_OF_DATA(false, true, false, false, false),

Review Comment:
   We already had `EndOfData` and `EndOfPartitionEvent` events, and why not 
reuse them here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]

2023-12-19 Thread via GitHub


TanYuxin-tyx commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1431327842


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionIndexSet.java:
##
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import java.io.Serializable;
+
+/** A collection of subpartition indexes. */
+public interface ResultSubpartitionIndexSet extends Iterable, 
Serializable {

Review Comment:
   Do we need the interface to extend `Iterable`? Would it be more flexible to 
define a method in the interface instead?
   Note this is not a strong comment, because the current approach can also work, 
but I think it is worth discussing carefully for a better subsequent implementation.



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionIndexRange.java:
##
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.executiongraph.IndexRange;
+
+import java.util.Iterator;
+
+/**
+ * A {@link ResultSubpartitionIndexSet} represented as a range of indexes. The 
range is inclusive.
+ */
+public class ResultSubpartitionIndexRange extends IndexRange implements 
ResultSubpartitionIndexSet {

Review Comment:
   Is this class extending `IndexRange` for compatibility, or are there other 
considerations?



##
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionViewTest.java:
##
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.util.TestBufferFactory;
+
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link UnionResultSubpartitionView}. */
+public class UnionResultSubpartitionViewTest {

Review Comment:
   Remove the `public` here.



##
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionViewTest.java:
##
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding 

Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]

2023-12-18 Thread via GitHub


yunfengzhou-hub commented on PR #23927:
URL: https://github.com/apache/flink/pull/23927#issuecomment-1862278523

   Hi @TanYuxin-tyx, could you please take a look at this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]

2023-12-13 Thread via GitHub


flinkbot commented on PR #23927:
URL: https://github.com/apache/flink/pull/23927#issuecomment-1855302232

   
   ## CI report:
   
   * 85ba4659f937d58dd510aa63009c3939bc085300 UNKNOWN
   
   
   Bot commands
   The @flinkbot bot supports the following commands:
   
   - `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]

2023-12-13 Thread via GitHub


yunfengzhou-hub opened a new pull request, #23927:
URL: https://github.com/apache/flink/pull/23927

   ## What is the purpose of the change
   
   This pull request adds support for consuming multiple subpartitions in a 
single input channel.
   
   
   ## Brief change log
   
   - Separate the notions of subpartition and channel in terms of naming, signatures and comments.
   - Union the output from multiple subpartitions with one ResultSubpartitionView and TierConsumerAgent (a conceptual sketch of this union idea follows this list).
   - Control the partial record split logic when writing buffers into a subpartition to avoid potential deadlocks.
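
   To make the union bullet concrete, below is a conceptual, self-contained sketch of draining several per-subpartition queues behind a single reader. The types are hypothetical and intentionally simplified; they do not mirror the ResultSubpartitionView, TierConsumerAgent or UnionResultSubpartitionView code in this PR.

// Conceptual sketch only: strings stand in for buffers, and plain queues stand in
// for subpartition views; nothing here is the real Flink API.
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

public class UnionReaderSketch {

    /** A single reader that drains several subpartition queues in round-robin order. */
    static final class UnionReader {
        private final List<Queue<String>> subpartitionQueues;
        private int nextQueue = 0;

        UnionReader(List<Queue<String>> subpartitionQueues) {
            this.subpartitionQueues = subpartitionQueues;
        }

        /** Returns the next available buffer from any subpartition, or null if all are empty. */
        String pollNext() {
            for (int i = 0; i < subpartitionQueues.size(); i++) {
                int index = (nextQueue + i) % subpartitionQueues.size();
                String buffer = subpartitionQueues.get(index).poll();
                if (buffer != null) {
                    nextQueue = (index + 1) % subpartitionQueues.size();
                    return buffer;
                }
            }
            return null;
        }
    }

    public static void main(String[] args) {
        Queue<String> sub0 = new ArrayDeque<>(List.of("sub0-buf0", "sub0-buf1"));
        Queue<String> sub1 = new ArrayDeque<>(List.of("sub1-buf0"));

        UnionReader reader = new UnionReader(List.of(sub0, sub1));
        for (String buffer = reader.pollNext(); buffer != null; buffer = reader.pollNext()) {
            System.out.println(buffer); // prints sub0-buf0, sub1-buf0, sub0-buf1
        }
    }
}

   The round-robin polling above only illustrates the unioning behaviour; it says nothing about how the PR actually schedules reads or propagates availability.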
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   - Extended HybridShuffleITCase to cover cases where the feature proposed in this PR comes into effect.
   - Added unit tests for newly introduced options and classes.
   - Manually verified situations that are not covered by e2e tests, such as when tiered hybrid shuffle uses the sort buffer.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: yes
 - The runtime per-record code paths (performance sensitive): yes
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? no need to document
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org