Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]
reswqa merged PR #23927: URL: https://github.com/apache/flink/pull/23927 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]
yunfengzhou-hub commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1452075751

## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionIndexSet.java: ##
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.executiongraph.IndexRange;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+import java.util.Iterator;
+
+/** A collection of subpartition indexes. */
+public class ResultSubpartitionIndexSet extends IndexRange {

Review Comment:
For now the implementation of `IndexRange` and this class are the same, so I used extension to avoid introducing duplicated code, and plan to remove the `extends IndexRange` once their implementations diverge.
Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]
reswqa commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1452042917

## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/BufferAccumulator.java: ##
@@ -65,4 +63,11 @@ void receive(
 * Close the accumulator. This will flush all the remaining data and release all the resources.
 */
 void close();
+
+/** Represents an operation that accepts three input arguments and returns no result. */
+@FunctionalInterface
+interface TriConsumer {

Review Comment:
Why do we need this one instead of `org.apache.flink.util.function.TriConsumer`?
Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]
reswqa commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1452038225

## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionIndexSet.java: ##
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.executiongraph.IndexRange;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+import java.util.Iterator;
+
+/** A collection of subpartition indexes. */
+public class ResultSubpartitionIndexSet extends IndexRange {

Review Comment:
In that case, extending from `IndexRange` sounds a bit unreasonable. I assume a range always means contiguous values.
Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]
yunfengzhou-hub commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1452012771

## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java: ##
@@ -69,10 +69,10 @@ default void notifyPriorityEvent(int priorityBufferNumber) {}
 * Get the availability and backlog of the view. The availability represents if the view is
 * ready to get buffer from it. The backlog represents the number of available data buffers.
 *
- * @param numCreditsAvailable the available credits for this {@link ResultSubpartitionView}.
+ * @param isCreditAvailable the availability of credits for this {@link ResultSubpartitionView}.
 * @return availability and backlog.
 */
-AvailabilityWithBacklog getAvailabilityAndBacklog(int numCreditsAvailable);
+AvailabilityWithBacklog getAvailabilityAndBacklog(boolean isCreditAvailable);

Review Comment:
This is a must-have change, because otherwise `UnionResultSubpartitionView` would need to determine how to distribute credits to child views, which is unnecessary.
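The argument above can be illustrated with a minimal sketch. It assumes a simplified, hypothetical model: `CreditFlagSketch`, `ChildView`, `Availability`, and `dataOnlyChild` are invented for illustration and are not Flink's classes. Because each child only needs to know whether any credit exists, a union view can pass the same flag to every child and combine the answers, without deciding how to split a credit count.

```java
import java.util.List;

/** Hypothetical, simplified model; these types are not Flink's actual classes. */
final class CreditFlagSketch {

    /** Availability plus backlog, loosely modeled on the pair named in the diff. */
    static final class Availability {
        final boolean available;
        final int backlog;

        Availability(boolean available, int backlog) {
            this.available = available;
            this.backlog = backlog;
        }
    }

    /** A child view that only needs to know whether any credit exists. */
    interface ChildView {
        Availability getAvailabilityAndBacklog(boolean isCreditAvailable);
    }

    /** Assumption: a child holding only data buffers, which are readable only with credit. */
    static ChildView dataOnlyChild(int buffers) {
        return isCreditAvailable ->
                new Availability(isCreditAvailable && buffers > 0, buffers);
    }

    /** The union forwards the same flag to every child; no credit splitting is needed. */
    static Availability union(List<ChildView> children, boolean isCreditAvailable) {
        boolean available = false;
        int backlog = 0;
        for (ChildView child : children) {
            Availability a = child.getAvailabilityAndBacklog(isCreditAvailable);
            available |= a.available;
            backlog += a.backlog;
        }
        return new Availability(available, backlog);
    }

    public static void main(String[] args) {
        List<ChildView> children = List.of(dataOnlyChild(2), dataOnlyChild(3));
        Availability a = union(children, true);
        System.out.println(a.available + " " + a.backlog);
    }
}
```

With an `int` parameter, the `union` method in this sketch would instead have to choose how many credits each child sees, which is the extra decision the comment calls unnecessary.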
Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]
yunfengzhou-hub commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1452009484

## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionIndexSet.java: ##
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.executiongraph.IndexRange;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+import java.util.Iterator;
+
+/** A collection of subpartition indexes. */
+public class ResultSubpartitionIndexSet extends IndexRange {

Review Comment:
In the future we may need to dynamically assign subpartitions to an input channel at runtime, in which case the indexes of the subpartitions may not be adjacent to each other. The IndexSet naming offers an abstraction that is general enough for these future uses.
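The naming rationale above can be sketched as follows. This is a hypothetical illustration, not the PR's code: `IndexSetSketch`, `IndexSet`, `range`, `of`, and `toList` are assumed names. It shows a set-style abstraction with one contiguous implementation (the current case) and one non-contiguous implementation (the possible future case).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical illustration; names and shapes are assumptions, not the PR's code. */
final class IndexSetSketch {

    /** The general abstraction: any collection of subpartition indexes. */
    interface IndexSet {
        Iterable<Integer> values();

        boolean contains(int index);
    }

    /** The current case: a contiguous range, inclusive on both ends. */
    static IndexSet range(int start, int end) {
        if (start > end) {
            throw new IllegalArgumentException("start must not exceed end");
        }
        return new IndexSet() {
            @Override
            public Iterable<Integer> values() {
                List<Integer> result = new ArrayList<>();
                for (int i = start; i <= end; i++) {
                    result.add(i);
                }
                return result;
            }

            @Override
            public boolean contains(int index) {
                return index >= start && index <= end;
            }
        };
    }

    /** A possible future case: indexes that are not adjacent to each other. */
    static IndexSet of(int... indexes) {
        TreeSet<Integer> sorted = new TreeSet<>();
        for (int i : indexes) {
            sorted.add(i);
        }
        return new IndexSet() {
            @Override
            public Iterable<Integer> values() {
                return sorted;
            }

            @Override
            public boolean contains(int index) {
                return sorted.contains(index);
            }
        };
    }

    /** Copies the indexes of a set into a list, in iteration order. */
    static List<Integer> toList(IndexSet set) {
        List<Integer> result = new ArrayList<>();
        for (int i : set.values()) {
            result.add(i);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(toList(range(2, 4)));
        System.out.println(toList(of(5, 1, 3)));
    }
}
```

Callers written against `IndexSet` in this sketch do not change when a non-contiguous implementation appears, which is the kind of generality the comment argues for.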
Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]
reswqa commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1450161095

## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java: ##
@@ -291,13 +291,18 @@ public void releaseAllResources() throws IOException {
 }
 @Override
-public void notifyDataAvailable() {
+public void notifyDataAvailable(ResultSubpartitionView view) {
 requestQueue.notifyReaderNonEmpty(this);
 }
 @Override
 public void notifyPriorityEvent(int prioritySequenceNumber) {
-notifyDataAvailable();
+notifyDataAvailable(this.subpartitionView);
+}
+
+@VisibleForTesting
+public void notifyDataAvailable() {
+notifyDataAvailable(subpartitionView);

Review Comment:
`subpartitionView` -> `this.subpartitionView`, to be consistent with `notifyPriorityEvent`.

## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java: ##
@@ -69,10 +69,10 @@ default void notifyPriorityEvent(int priorityBufferNumber) {}
 * Get the availability and backlog of the view. The availability represents if the view is
 * ready to get buffer from it. The backlog represents the number of available data buffers.
 *
- * @param numCreditsAvailable the available credits for this {@link ResultSubpartitionView}.
+ * @param isCreditAvailable the availability of credits for this {@link ResultSubpartitionView}.
 * @return availability and backlog.
 */
-AvailabilityWithBacklog getAvailabilityAndBacklog(int numCreditsAvailable);
+AvailabilityWithBacklog getAvailabilityAndBacklog(boolean isCreditAvailable);

Review Comment:
Is this commit a must-have change, or just a refactor?

## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionIndexSet.java: ##
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.executiongraph.IndexRange;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+import java.util.Iterator;
+
+/** A collection of subpartition indexes. */
+public class ResultSubpartitionIndexSet extends IndexRange {

Review Comment:
I wonder what the difference is between `IndexRange` and `IndexSet`. From my side, it's better to align with the name of the parent class.

## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java: ##
@@ -291,13 +291,18 @@ public void releaseAllResources() throws IOException {
 }
 @Override
-public void notifyDataAvailable() {
+public void notifyDataAvailable(ResultSubpartitionView view) {
 requestQueue.notifyReaderNonEmpty(this);
 }
 @Override
 public void notifyPriorityEvent(int prioritySequenceNumber) {
-notifyDataAvailable();
+notifyDataAvailable(this.subpartitionView);
+}
+
+@VisibleForTesting

Review Comment:
Why is this method `@VisibleForTesting`?
Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]
yunfengzhou-hub commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1450088659

## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java: ##
@@ -229,6 +245,14 @@ ResultSubpartitionView.AvailabilityWithBacklog hasBuffersAvailable() {
 return subpartitionView.getAvailabilityAndBacklog(Integer.MAX_VALUE);
 }
+@Override
+public int peekNextBufferSubpartitionId() throws IOException {

Review Comment:
Tests have been added to CreditBasedSequenceNumberingViewReaderTest, UnionResultSubpartitionViewTest and NettyConnectionReaderTest.
Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]
yunfengzhou-hub commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1450086954

## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java: ##
@@ -301,6 +302,24 @@ public void setMetricGroup(TaskIOMetricGroup metrics) {
 partitionId.getPartitionId(), resultPartitionBytes);
 }
+@Override
+public ResultSubpartitionView createSubpartitionView(
+ResultSubpartitionIndexSet indexSet, BufferAvailabilityListener availabilityListener)
+throws IOException {
+// The ability to support multiple indexes is to be provided in subsequent commits of
+// the corresponding pull request. As the function is about to be supported uniformly with
+// one set of code, they will be placed in a common method shared by all shuffle
+// implementations, and that will be this method.
+Iterator iterator = indexSet.values().iterator();
+int index = iterator.next();
+Preconditions.checkState(!iterator.hasNext());
+return createSubpartitionView(index, availabilityListener);
+}
+
+/** Returns a reader for the subpartition with the given index. */

Review Comment:
A more complete javadoc has been added. As the doc mentions UnionResultSubpartitionView, it is completed in the commit that introduced this class.
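The quoted `createSubpartitionView` overload accepts an index set but, at this commit, still requires it to hold exactly one index. A minimal sketch of that unwrap-and-check step follows, using hypothetical names (`SingleIndexSketch`, `onlyIndex`) and plain JDK exceptions standing in for Flink's `Preconditions`. The explicit empty-set check is an added assumption; the quoted diff calls `iterator.next()` directly.

```java
import java.util.Iterator;
import java.util.List;

/** Hypothetical helper; it mirrors the shape of the quoted diff, not Flink's actual code. */
final class SingleIndexSketch {

    /** Returns the sole index in the collection, or fails if there is not exactly one. */
    static int onlyIndex(Iterable<Integer> indexes) {
        Iterator<Integer> iterator = indexes.iterator();
        if (!iterator.hasNext()) {
            // Assumption: reject empty input explicitly instead of letting next() throw.
            throw new IllegalStateException("expected exactly one index, got none");
        }
        int index = iterator.next();
        if (iterator.hasNext()) {
            // Plays the role of Preconditions.checkState(!iterator.hasNext()) in the diff.
            throw new IllegalStateException("expected exactly one index, got several");
        }
        return index;
    }

    public static void main(String[] args) {
        System.out.println(onlyIndex(List.of(7)));
    }
}
```

Once multi-index support lands, a check like this would presumably be replaced by logic that builds one view per index, as the code comment in the diff anticipates.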
Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]
reswqa commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1446953814

## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java: ##
@@ -58,6 +59,12 @@ class CreditBasedSequenceNumberingViewReader
 private final int initialCredit;
+/**
+ * Cache of the index of the only subpartition if the underlining {@link ResultSubpartitionView}
+ * only consumes one subpartition.
+ */
+private int subpartitionId;

Review Comment:
We do need some explanation of the default value `-1`.

## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java: ##
@@ -301,6 +302,24 @@ public void setMetricGroup(TaskIOMetricGroup metrics) {
 partitionId.getPartitionId(), resultPartitionBytes);
 }
+@Override
+public ResultSubpartitionView createSubpartitionView(
+ResultSubpartitionIndexSet indexSet, BufferAvailabilityListener availabilityListener)
+throws IOException {
+// The ability to support multiple indexes is to be provided in subsequent commits of
+// the corresponding pull request. As the function is about to be supported uniformly with
+// one set of code, they will be placed in a common method shared by all shuffle
+// implementations, and that will be this method.
+Iterator iterator = indexSet.values().iterator();
+int index = iterator.next();
+Preconditions.checkState(!iterator.hasNext());
+return createSubpartitionView(index, availabilityListener);
+}
+
+/** Returns a reader for the subpartition with the given index. */

Review Comment:
We need a full Javadoc to explain the differences and connections between this method and the previous ones.

## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java: ##
@@ -229,6 +245,14 @@ ResultSubpartitionView.AvailabilityWithBacklog hasBuffersAvailable() {
 return subpartitionView.getAvailabilityAndBacklog(Integer.MAX_VALUE);
 }
+@Override
+public int peekNextBufferSubpartitionId() throws IOException {

Review Comment:
Do we have tests that cover methods like this one introduced in this commit?

## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerClient.java: ##
@@ -100,6 +104,13 @@ public Optional getNextBuffer(
 }
 Buffer bufferData = buffer.get();
 if (bufferData.getDataType() == Buffer.DataType.END_OF_SEGMENT) {
+EndOfSegmentEvent event =
+(EndOfSegmentEvent)
+EventSerializer.fromSerializedEvent(
+bufferData.getNioBufferReadable(), getClass().getClassLoader());
+Preconditions.checkState(
+subpartitionId.equals(
+new TieredStorageSubpartitionId(event.getSubpartitionId())));

Review Comment:
Is this deserialization only for a sanity check?
Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]
yunfengzhou-hub commented on PR #23927:
URL: https://github.com/apache/flink/pull/23927#issuecomment-1882465840

Hi @reswqa, could you please take a look at this PR?
Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]
xintongsong commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1436062729

## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionIndexRange.java: ##
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.executiongraph.IndexRange;
+
+import java.util.Iterator;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A {@link ResultSubpartitionIndexSet} represented as a range of indexes. The range is inclusive.
+ */
+public class ResultSubpartitionIndexRange implements ResultSubpartitionIndexSet {

Review Comment:
How is this different from `org.apache.flink.runtime.executiongraph`? Can we extract a common abstraction from these two?

## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerClient.java: ##
@@ -52,33 +55,39 @@ public class TieredStorageConsumerClient {
 Map>> currentConsumerAgentAndSegmentIds = new HashMap<>();
+private final List tieredStorageConsumerSpecs;
+
 public TieredStorageConsumerClient(
 List tierFactories,
 List tieredStorageConsumerSpecs,
 TieredStorageNettyService nettyService) {
 this.tierFactories = tierFactories;
 this.nettyService = nettyService;
 this.tierConsumerAgents = createTierConsumerAgents(tieredStorageConsumerSpecs);
+this.tieredStorageConsumerSpecs = tieredStorageConsumerSpecs;
 }
 public void start() {
 for (TierConsumerAgent tierConsumerAgent : tierConsumerAgents) {
 tierConsumerAgent.start();
+for (TieredStorageConsumerSpec spec : tieredStorageConsumerSpecs) {
+tierConsumerAgent.notifyRequiredSegmentId(
+spec.getPartitionId(), spec.getSubpartitionId(), 0);
+}

Review Comment:
Why notify for all sub-partitions at start?

## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionReader.java: ##
@@ -19,18 +19,26 @@ package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent;
 import java.util.Optional;
 /** {@link NettyConnectionReader} is used by {@link TierConsumerAgent} to read buffer from netty. */
 public interface NettyConnectionReader {
+/**
+ * Notify the upstream the id of required segment that should be sent to netty connection.
+ *
+ * @param subpartitionId The id of the corresponding subpartition.
+ * @param segmentId The id of required segment.
+ */
+void notifyRequiredSegmentId(TieredStorageSubpartitionId subpartitionId, int segmentId);

Review Comment:
Could you remind me why we need to separate the notify from readBuffer? It feels like this notify interface is used for multiple purposes.

## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionIndexSet.java: ##
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing
Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]
TanYuxin-tyx commented on PR #23927:
URL: https://github.com/apache/flink/pull/23927#issuecomment-1869187119

@yunfengzhou-hub Thanks for the update. I have no more comments on the change.
Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]
yunfengzhou-hub commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1435935387

## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java: ##
@@ -1736,7 +1738,8 @@ private RemoteInputChannel createRemoteInputChannel(SingleInputGate inputGate) {
 private RemoteInputChannel createRemoteInputChannel(
 SingleInputGate inputGate, int consumedSubpartitionIndex, int initialCredits) {
 return InputChannelBuilder.newBuilder()
-.setConsumedSubpartitionIndex(consumedSubpartitionIndex)
+.setSubpartitionIndexSet(

Review Comment:
Tests have been migrated to JUnit5 according to the following ticket: https://issues.apache.org/jira/browse/FLINK-32850
Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]
TanYuxin-tyx commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1435920016

## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriterImpl.java: ##
@@ -30,24 +30,33 @@ public class NettyConnectionWriterImpl implements NettyConnectionWriter {
 private final NettyConnectionId connectionId;
-private final BufferAvailabilityListener availabilityListener;
+private Runnable availabilityListener;
+public NettyConnectionWriterImpl(NettyPayloadManager nettyPayloadManager) {
+this.nettyPayloadManager = nettyPayloadManager;
+this.connectionId = NettyConnectionId.newId();
+}
+
+@VisibleForTesting

Review Comment:
Do we need this `VisibleForTesting`? Could we test the logic through the existing public/protected methods?
Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]
yunfengzhou-hub commented on code in PR #23927: URL: https://github.com/apache/flink/pull/23927#discussion_r1433793487 ## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionViewTest.java: ## @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.util.TestBufferFactory; + +import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link UnionResultSubpartitionView}. 
*/ +public class UnionResultSubpartitionViewTest { + +private UnionResultSubpartitionView view; + +private List buffers0; + +private ResultSubpartitionView view0; + +private List buffers1; + +private ResultSubpartitionView view1; + +@BeforeEach +void before() { +view = new UnionResultSubpartitionView((ResultSubpartitionView x) -> {}); + +buffers0 = +Arrays.asList( +TestBufferFactory.createBuffer(10), +TestBufferFactory.createBuffer(10), +TestBufferFactory.createBuffer(10, Buffer.DataType.EVENT_BUFFER)); +view0 = new TestingResultSubpartitionView(view, buffers0); +view.notifyViewCreated(0, view0); + +buffers1 = +Arrays.asList( +TestBufferFactory.createBuffer(10), +TestBufferFactory.createBuffer(10), +TestBufferFactory.createBuffer(10, Buffer.DataType.EVENT_BUFFER)); +view1 = new TestingResultSubpartitionView(view, buffers1); +view.notifyViewCreated(1, view1); +} + +@Test +void testGetNextBuffer() throws IOException { +assertThat(view.getNextBuffer()).isNull(); +view0.notifyDataAvailable(); +ResultSubpartition.BufferAndBacklog bufferAndBacklog = view.getNextBuffer(); +assertThat(bufferAndBacklog.buffer()).isEqualTo(buffers0.get(0)); + assertThat(bufferAndBacklog.buffersInBacklog()).isEqualTo(buffers0.size() - 1); + +view1.notifyDataAvailable(); +assertThat(view.getNextBuffer().buffer()).isEqualTo(buffers0.get(1)); + +List buffers = new ArrayList<>(); +while (view.getAvailabilityAndBacklog(true).isAvailable()) { +buffers.add(view.getNextBuffer().buffer()); +} +assertThat(buffers) +.hasSize(buffers0.size() + buffers1.size() - 2) +.containsSubsequence(buffers0.subList(2, buffers0.size())) +.containsSubsequence(buffers1); +} + +@Test +void testGetAvailabilityAndBacklog() throws IOException { +view0.notifyDataAvailable(); +view1.notifyDataAvailable(); + +ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog1 = +view.getAvailabilityAndBacklog(false); +assertThat(availabilityAndBacklog1.getBacklog()).isPositive(); 
+assertThat(availabilityAndBacklog1.isAvailable()).isEqualTo(false); +ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog2 = +view.getAvailabilityAndBacklog(true); +assertThat(availabilityAndBacklog2.getBacklog()).isPositive(); +assertThat(availabilityAndBacklog2.isAvailable()).isEqualTo(true); + +for (int i = 1; i < buffers0.size() + buffers1.size(); i++) { +view.getNextBuffer(); +} + +ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog3 = +view.getAvailabilityAndBacklog(false); +assertThat(availabilityAndBacklog3.getBacklog()).isEqualTo(0); +assertThat(availabilityAndBacklog3.isAvailable()).isEqualTo(true); +ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog4 = +view.getAvailabilityAndBacklog(true); +
Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]
yunfengzhou-hub commented on code in PR #23927: URL: https://github.com/apache/flink/pull/23927#discussion_r1433794402 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java: ## @@ -304,7 +306,15 @@ enum DataType { RECOVERY_COMPLETION(false, true, true, false, false), /** {@link #END_OF_SEGMENT} indicates that a segment is finished in a subpartition. */ -END_OF_SEGMENT(false, true, false, false, false); +END_OF_SEGMENT(false, true, false, false, false), + +/** + * {@link #END_OF_DATA} indicates that there will be no more data buffer in a subpartition. + */ +END_OF_DATA(false, true, false, false, false), Review Comment: EndOfData and EndOfPartitionEvent are RuntimeEvent subclasses, while here we need Buffer.DataType enum values. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]
yunfengzhou-hub commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1433753103

## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SubpartitionSelector.java: ##
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+/**
+ * {@link SubpartitionSelector} helps to choose from multiple available subpartitions when their
+ * output buffers should union into one stream.
+ *
+ * @param <T> data type that could be used to represent a subpartition.
+ */
+public interface SubpartitionSelector<T> {
+    /**
+     * Marks a subpartition as having data available.
+     *
+     * @return true if this selector did not already contain the subpartition.
+     */
+    boolean notifyDataAvailable(T subpartition);
+
+    /** Returns the next subpartition to consume data. */
+    T getNextSubpartitionToConsume();
+
+    /**
+     * Records the status of the last consumption attempt on the subpartition returned by the last
+     * invocation of {@link #getNextSubpartitionToConsume()}.
+     *
+     * This method must be invoked every time a subpartition acquired from this class is
+     * consumed.
+     *
+     * @param isDataAvailable whether the consumption returned a valid data.
+     * @param isPartialRecord whether the returned data contains partial record. Ignored if there
+     *     was no data available.
+     */
+    void markLastConsumptionStatus(boolean isDataAvailable, boolean isPartialRecord);
+
+    /**
+     * Whether the invoker can get a different subpartition in the next invocation of {@link
+     * #getNextSubpartitionToConsume()}.
+     */
+    boolean isMoreSubpartitionSwitchable();

Review Comment:
   `isMoreSubpartitionSwitchable` = `hasMoreSubpartitionToConsume` && the last buffer does not contain a partial record.
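To make the contract quoted above concrete, here is a minimal round-robin sketch. It is an illustration only, not the implementation in the PR: the class name `RoundRobinSelectorSketch` and its fields are hypothetical, and the only things taken from the thread are the four method names and the rule that switching is allowed only when another subpartition has data and the last buffer did not end in a partial record.

```java
import java.util.Iterator;
import java.util.LinkedHashSet;

/**
 * Hypothetical round-robin selector following the four-method contract quoted above.
 * This is a sketch under assumptions, not Flink's actual selector.
 */
class RoundRobinSelectorSketch<T> {
    // Insertion-ordered set: acts as a queue that ignores duplicate notifications.
    private final LinkedHashSet<T> queue = new LinkedHashSet<>();
    private T lastReturned;
    private boolean lastWasPartialRecord;

    /** Returns true if the subpartition was not already queued. */
    boolean notifyDataAvailable(T subpartition) {
        return queue.add(subpartition);
    }

    /** Returns the next subpartition to read, or null if none has data. */
    T getNextSubpartitionToConsume() {
        if (lastWasPartialRecord) {
            // A record is split across buffers: keep reading the same subpartition.
            return lastReturned;
        }
        Iterator<T> it = queue.iterator();
        if (!it.hasNext()) {
            return null;
        }
        lastReturned = it.next();
        it.remove();
        return lastReturned;
    }

    /** Must be called after every consumption attempt on the last returned subpartition. */
    void markLastConsumptionStatus(boolean isDataAvailable, boolean isPartialRecord) {
        if (lastReturned == null) {
            return;
        }
        if (isDataAvailable) {
            queue.add(lastReturned); // re-queue at the tail (no-op if already queued)
        } else {
            queue.remove(lastReturned);
        }
        // The partial-record flag is ignored when no data was returned.
        lastWasPartialRecord = isDataAvailable && isPartialRecord;
    }

    /** True only if another subpartition is queued and no record is half-read. */
    boolean isMoreSubpartitionSwitchable() {
        return !lastWasPartialRecord && !queue.isEmpty();
    }
}
```

In this sketch a caller would loop on `getNextSubpartitionToConsume()`, read one buffer, and report the outcome through `markLastConsumptionStatus` before asking again; pinning the selector to one subpartition while a record is incomplete keeps records from different subpartitions from interleaving inside the unioned stream.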
Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]
yunfengzhou-hub commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1433752308

## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java: ##
@@ -281,15 +271,9 @@ public SingleInputGate(
         this.bufferDebloater = bufferDebloater;
         this.throughputCalculator = checkNotNull(throughputCalculator);

-        this.tieredStorageConsumerClient = tieredStorageConsumerClient;
-        this.tieredStorageConsumerSpecs = tieredStorageConsumerSpecs;
-        if (enabledTieredStorage()) {
-            this.availabilityNotifier = new AvailabilityNotifierImpl();
-            setupTieredStorageNettyService(nettyService, tieredStorageConsumerSpecs);
-            tieredStorageConsumerClient.registerAvailabilityNotifier(availabilityNotifier);
-        } else {
-            this.availabilityNotifier = null;
-        }
+        this.tieredStorageConsumerClient = null;

Review Comment:
   In this pull request we added `TieredStorageInputChannelId` to `TieredStorageConsumerSpec`, which in theory can only be acquired after the input channel is created (although for now `TieredStorageInputChannelId` relies only on the input channels' indexes, which can be determined before the input channels are created). Given that the input channels are initialized after `SingleInputGate` is created, we should also move the initialization of the tiered storage services to after `SingleInputGate`'s construction.
Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]
yunfengzhou-hub commented on code in PR #23927: URL: https://github.com/apache/flink/pull/23927#discussion_r1433747654 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerClient.java: ## @@ -52,33 +55,39 @@ public class TieredStorageConsumerClient { Map>> currentConsumerAgentAndSegmentIds = new HashMap<>(); +private final List tieredStorageConsumerSpecs; + public TieredStorageConsumerClient( List tierFactories, List tieredStorageConsumerSpecs, TieredStorageNettyService nettyService) { this.tierFactories = tierFactories; this.nettyService = nettyService; this.tierConsumerAgents = createTierConsumerAgents(tieredStorageConsumerSpecs); +this.tieredStorageConsumerSpecs = tieredStorageConsumerSpecs; } public void start() { for (TierConsumerAgent tierConsumerAgent : tierConsumerAgents) { tierConsumerAgent.start(); +for (TieredStorageConsumerSpec spec : tieredStorageConsumerSpecs) { +tierConsumerAgent.notifyRequiredSegmentId( Review Comment: In `SingleInputGate#requestPartitions`, a subpartition view is created in `internalRequestPartitions()`, which will be invoked before `TieredStorageConsumerClient#start`. Thus it can be guaranteed that the subpartition view has been created before the first `notifyRequiredSegmentId` is invoked. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]
yunfengzhou-hub commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1433743541

## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionRequestClient.java: ##
@@ -62,9 +62,11 @@ void requestSubpartition(
      * Notifies the id of segment required from one remote input channel.
      *
      * @param inputChannel The remote input channel who requires segment.
+     * @param subpartitionIndex The id of the corresponding subpartition.

Review Comment:
   According to offline discussions, we would use `id` in the following situations:
   - Tiered Hybrid Shuffle
   - Parameters with type `TieredStorageSubpartitionId`
   - Used as keys in maps

   And use `index` or `idx` in the following situations:
   - Other shuffle types and the common modules shared among shuffles
   - Parameters with type `int`
   - Used as indexes in arrays
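As a rough illustration of the naming rule described above, the sketch below puts the two styles side by side. Every name in it is hypothetical (`SubpartitionIdSketch` merely stands in for a typed id such as `TieredStorageSubpartitionId`, and `NamingConventionSketch` is not a Flink class); it only shows "id" used for a typed map key and "index" used for a plain `int` into an array.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical typed id, standing in for something like TieredStorageSubpartitionId. */
final class SubpartitionIdSketch {
    private final int value;

    SubpartitionIdSketch(int value) {
        this.value = value;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof SubpartitionIdSketch && ((SubpartitionIdSketch) o).value == value;
    }

    @Override
    public int hashCode() {
        return Integer.hashCode(value);
    }
}

/** Hypothetical holder contrasting the "id" and "index" naming styles. */
final class NamingConventionSketch {
    // "id": a typed value used as a map key.
    private final Map<SubpartitionIdSketch, Integer> requiredSegmentIds = new HashMap<>();

    // "index": a plain int used to address an array slot.
    private final int[] backlogs;

    NamingConventionSketch(int numSubpartitions) {
        this.backlogs = new int[numSubpartitions];
    }

    void setRequiredSegmentId(SubpartitionIdSketch subpartitionId, int segmentId) {
        requiredSegmentIds.put(subpartitionId, segmentId);
    }

    Integer getRequiredSegmentId(SubpartitionIdSketch subpartitionId) {
        return requiredSegmentIds.get(subpartitionId);
    }

    void setBacklog(int subpartitionIndex, int backlog) {
        backlogs[subpartitionIndex] = backlog;
    }

    int getBacklog(int subpartitionIndex) {
        return backlogs[subpartitionIndex];
    }
}
```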
Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]
yunfengzhou-hub commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1433740732

## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java: ##
@@ -1736,7 +1738,8 @@ private RemoteInputChannel createRemoteInputChannel(SingleInputGate inputGate) {
     private RemoteInputChannel createRemoteInputChannel(
             SingleInputGate inputGate, int consumedSubpartitionIndex, int initialCredits) {
         return InputChannelBuilder.newBuilder()
-                .setConsumedSubpartitionIndex(consumedSubpartitionIndex)
+                .setSubpartitionIndexSet(

Review Comment:
   I agree that it might be better not to add more work to the already large pull request. We may create another PR for this migration later.
Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]
yunfengzhou-hub commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1433735764

## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java: ##
@@ -100,7 +102,12 @@ void testCancelPartitionRequest() throws Exception {
             Channel ch = connect(serverAndClient);

             // Request for non-existing input channel => results in cancel request

Review Comment:
   This test class mainly verifies cases when the input channel does not exist, as shown in `CreditBasedPartitionRequestClientHandler`:

   ```java
   RemoteInputChannel inputChannel = inputChannels.get(announcement.receiverId);
   if (inputChannel == null || inputChannel.isReleased()) {
       cancelRequestFor(announcement.receiverId);
       return;
   }
   ```

   Thus the comment here is still correct.
Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]
yunfengzhou-hub commented on code in PR #23927: URL: https://github.com/apache/flink/pull/23927#discussion_r1433736474 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionIndexRange.java: ## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.runtime.executiongraph.IndexRange; + +import java.util.Iterator; + +/** + * A {@link ResultSubpartitionIndexSet} represented as a range of indexes. The range is inclusive. + */ +public class ResultSubpartitionIndexRange extends IndexRange implements ResultSubpartitionIndexSet { Review Comment: The extension will be removed according to offline discussions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]
yunfengzhou-hub commented on code in PR #23927: URL: https://github.com/apache/flink/pull/23927#discussion_r1433659493 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java: ## @@ -533,7 +533,7 @@ public void close() { } @Override -public ResultSubpartitionView createSubpartitionView( +protected ResultSubpartitionView createSubpartitionView( Review Comment: This is because the method is only used in internal implementations of ResultPartition. The public interface has been modified to expose IndexSet parameter instead of int. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]
yunfengzhou-hub commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1433656381

## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionIndexSet.java: ##
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import java.io.Serializable;
+
+/** A collection of subpartition indexes. */
+public interface ResultSubpartitionIndexSet extends Iterable, Serializable {

Review Comment:
   I agree that we should reduce unnecessary inheritance. I'll define a method here.
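One way to read the suggestion discussed here is sketched below: an inclusive index range that exposes its indexes through an explicit method rather than by extending `Iterable`. This is only a guess at the shape of such a design, not the code that was merged; the class name `IndexRangeSketch` and the method names `size`, `contains`, and `values` are all assumptions made for illustration.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

/**
 * Hypothetical inclusive index range that offers iteration through an explicit method
 * instead of inheriting from Iterable. Not Flink's ResultSubpartitionIndexSet.
 */
final class IndexRangeSketch {
    private final int startIndex;
    private final int endIndex; // inclusive

    IndexRangeSketch(int startIndex, int endIndex) {
        if (startIndex < 0 || endIndex < startIndex) {
            throw new IllegalArgumentException(
                    "Invalid range [" + startIndex + ", " + endIndex + "]");
        }
        this.startIndex = startIndex;
        this.endIndex = endIndex;
    }

    int size() {
        return endIndex - startIndex + 1;
    }

    boolean contains(int index) {
        return index >= startIndex && index <= endIndex;
    }

    /** Explicit accessor: callers iterate over range.values(), not over the range itself. */
    Iterable<Integer> values() {
        return () ->
                new Iterator<Integer>() {
                    private int next = startIndex;

                    @Override
                    public boolean hasNext() {
                        return next <= endIndex;
                    }

                    @Override
                    public Integer next() {
                        if (!hasNext()) {
                            throw new NoSuchElementException();
                        }
                        return next++;
                    }
                };
    }
}
```

Keeping iteration behind a method leaves the type free to change its internal representation later (for example to a non-contiguous set) without committing every caller to the `Iterable` contract.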
Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]
TanYuxin-tyx commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1432220667

## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java: ##
@@ -304,7 +306,15 @@ enum DataType {
         RECOVERY_COMPLETION(false, true, true, false, false),

         /** {@link #END_OF_SEGMENT} indicates that a segment is finished in a subpartition. */
-        END_OF_SEGMENT(false, true, false, false, false);
+        END_OF_SEGMENT(false, true, false, false, false),
+
+        /**
+         * {@link #END_OF_DATA} indicates that there will be no more data buffer in a subpartition.
+         */
+        END_OF_DATA(false, true, false, false, false),

Review Comment:
   We already have the `EndOfData` and `EndOfPartitionEvent` events. Why not reuse them here?
Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]
TanYuxin-tyx commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1431327842

## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionIndexSet.java: ##
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import java.io.Serializable;
+
+/** A collection of subpartition indexes. */
+public interface ResultSubpartitionIndexSet extends Iterable, Serializable {

Review Comment:
   Do we need the interface to extend `Iterable`? Would it be more flexible to define a method in the interface instead? Note that this is not a strong comment, because the current approach also works, but I think it is worth discussing carefully for the benefit of subsequent implementations.

## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionIndexRange.java: ##
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.executiongraph.IndexRange;
+
+import java.util.Iterator;
+
+/**
+ * A {@link ResultSubpartitionIndexSet} represented as a range of indexes. The range is inclusive.
+ */
+public class ResultSubpartitionIndexRange extends IndexRange implements ResultSubpartitionIndexSet {

Review Comment:
   Is this extended because of compatibility or other considerations?

## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionViewTest.java: ##
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.util.TestBufferFactory;
+
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link UnionResultSubpartitionView}. */
+public class UnionResultSubpartitionViewTest {

Review Comment:
   Remove the `public` here.

## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionViewTest.java: ##
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding
Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]
yunfengzhou-hub commented on PR #23927: URL: https://github.com/apache/flink/pull/23927#issuecomment-1862278523 Hi @TanYuxin-tyx could you please take a look at this PR?
Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]
flinkbot commented on PR #23927:
URL: https://github.com/apache/flink/pull/23927#issuecomment-1855302232

## CI report:

* 85ba4659f937d58dd510aa63009c3939bc085300 UNKNOWN

Bot commands
The @flinkbot bot supports the following commands:
- `@flinkbot run azure` re-run the last Azure build
[PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]
yunfengzhou-hub opened a new pull request, #23927:
URL: https://github.com/apache/flink/pull/23927

## What is the purpose of the change

This pull request adds support for consuming multiple subpartitions in a single input channel.

## Brief change log

- Separate the notions of subpartition and channel in naming, signatures, and comments.
- Union the output from multiple subpartitions with one ResultSubpartitionView and TierConsumerAgent.
- Control the partial record split logic when writing buffers into a subpartition to avoid potential deadlocks.

## Verifying this change

This change added tests and can be verified as follows:

- Extended HybridShuffleITCase to cover cases when the feature proposed in this PR comes into effect.
- Added unit tests for newly introduced options and classes.
- Manually verified situations that are not covered by e2e tests, like when tiered hybrid shuffle uses sort buffer.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: yes
- The runtime per-record code paths (performance sensitive): yes
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? no need to document