This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new ba4b1829558 [FLINK-29975][runtime] Mark HYBRID_FULL result partition as re-consumable. ba4b1829558 is described below commit ba4b182955867fedfa9891bf0bf430e92eeab41a Author: Weijie Guo <res...@163.com> AuthorDate: Thu Nov 10 17:03:40 2022 +0800 [FLINK-29975][runtime] Mark HYBRID_FULL result partition as re-consumable. HYBRID_FULL result partitions became re-consumable since FLINK-28889. This closes #21284 --- .../failover/flip1/RestartPipelinedRegionFailoverStrategy.java | 10 ++++------ .../runtime/io/network/partition/ResultPartitionType.java | 10 ++++------ .../streaming/api/transformations/StreamExchangeMode.java | 4 ++-- 3 files changed, 10 insertions(+), 14 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java index fba91f495e1..08ab5b2ba66 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java @@ -293,8 +293,8 @@ public class RestartPipelinedRegionFailoverStrategy implements FailoverStrategy && resultPartitionAvailabilityChecker.isAvailable(resultPartitionID) // If the result partition is available in the partition tracker and does not // fail, it will be available if it can be re-consumption, and it may also be - // available for PIPELINED_APPROXIMATE and HYBRID_FULL type. - && isResultPartitionCanBeConsumedRepeatedly(resultPartitionID); + // available for PIPELINED_APPROXIMATE type. 
+ && isResultPartitionIsReConsumableOrPipelinedApproximate(resultPartitionID); } public void markResultPartitionFailed(IntermediateResultPartitionID resultPartitionID) { @@ -306,14 +306,12 @@ public class RestartPipelinedRegionFailoverStrategy implements FailoverStrategy failedPartitions.remove(resultPartitionID); } - private boolean isResultPartitionCanBeConsumedRepeatedly( + private boolean isResultPartitionIsReConsumableOrPipelinedApproximate( IntermediateResultPartitionID resultPartitionID) { ResultPartitionType resultPartitionType = resultPartitionTypeRetriever.apply(resultPartitionID); return resultPartitionType.isReconsumable() - || resultPartitionType == ResultPartitionType.PIPELINED_APPROXIMATE - // TODO support re-consumable for HYBRID_FULL resultPartitionType. - || resultPartitionType == ResultPartitionType.HYBRID_FULL; + || resultPartitionType == ResultPartitionType.PIPELINED_APPROXIMATE; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java index 1cbdcf5bc19..29489f654dd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java @@ -91,16 +91,14 @@ public enum ResultPartitionType { * * <p>Hybrid partitions can be consumed any time, whether fully produced or not. * - * <p>HYBRID_FULL partitions can be consumed repeatedly, but it does not support concurrent - * consumption. So re-consumable is false, but double calculation can be avoided during + * <p>HYBRID_FULL partitions is re-consumable, so double calculation can be avoided during * failover. */ - // TODO support re-consumable for HYBRID_FULL resultPartitionType. 
- HYBRID_FULL(false, false, false, ConsumingConstraint.CAN_BE_PIPELINED, ReleaseBy.SCHEDULER), + HYBRID_FULL(true, false, false, ConsumingConstraint.CAN_BE_PIPELINED, ReleaseBy.SCHEDULER), /** - * HYBRID_SELECTIVE partitions are similar to {@link #HYBRID_FULL} partitions, but it cannot be - * consumed repeatedly. + * HYBRID_SELECTIVE partitions are similar to {@link #HYBRID_FULL} partitions, but it is not + * re-consumable. */ HYBRID_SELECTIVE( false, false, false, ConsumingConstraint.CAN_BE_PIPELINED, ReleaseBy.SCHEDULER); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamExchangeMode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamExchangeMode.java index c78683031fb..f7270c05b44 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamExchangeMode.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamExchangeMode.java @@ -39,14 +39,14 @@ public enum StreamExchangeMode { /** * The consumer can start consuming data anytime as long as the producer has started producing. * - * <p>This exchange mode can be consumed repeatedly. + * <p>This exchange mode is re-consumable. */ HYBRID_FULL, /** * The consumer can start consuming data anytime as long as the producer has started producing. * - * <p>This exchange mode can not be consumed repeatedly. + * <p>This exchange mode is not re-consumable. */ HYBRID_SELECTIVE,