This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 25fdf5490db301b33157c99fe830d3497b6d0bdd Author: Yangze Guo <[email protected]> AuthorDate: Mon Jun 28 11:07:52 2021 +0800 [FLINK-21925][core] Introduce fine-grained.shuffle-mode.all-blocking to avoid resource deadlock in batch jobs that apply fine-grained resource management This closes #16307 --- .../apache/flink/configuration/ClusterOptions.java | 8 ++++++++ .../api/environment/StreamExecutionEnvironment.java | 21 +++++++++++++++++++++ .../flink/streaming/api/graph/StreamGraph.java | 5 +++++ 3 files changed, 34 insertions(+) diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java index 1bb007b..667dc76 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java @@ -123,6 +123,14 @@ public class ClusterOptions { .withDescription( "Defines whether the cluster uses fine-grained resource management."); + @Documentation.ExcludeFromDocumentation + public static final ConfigOption<Boolean> FINE_GRAINED_SHUFFLE_MODE_ALL_BLOCKING = + ConfigOptions.key("fine-grained.shuffle-mode.all-blocking") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to convert all PIPELINE edges to BLOCKING when apply fine-grained resource management in batch jobs."); + public static JobManagerOptions.SchedulerType getSchedulerType(Configuration configuration) { if (isAdaptiveSchedulerEnabled(configuration) || isReactiveModeEnabled(configuration)) { return JobManagerOptions.SchedulerType.Adaptive; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index ba5d107..9670b39 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -52,10 +52,12 @@ import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.ClusterOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.ExecutionOptions; +import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.configuration.ReadableConfig; @@ -91,6 +93,7 @@ import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource; import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit; +import org.apache.flink.streaming.api.graph.GlobalDataExchangeMode; import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.graph.StreamGraphGenerator; import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator; @@ -2099,6 +2102,24 @@ public class StreamExecutionEnvironment { @Internal public StreamGraph getStreamGraph(String jobName, boolean clearTransformations) { StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate(); + + // There might be a resource deadlock when applying fine-grained resource management in + // batch jobs with PIPELINE edges. Users need to trigger the + // fine-grained.shuffle-mode.all-blocking to convert all edges to BLOCKING before we fix + // that issue. + if (configuration.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.BATCH + && streamGraph.hasFineGrainedResource()) { + if (configuration.get(ClusterOptions.FINE_GRAINED_SHUFFLE_MODE_ALL_BLOCKING)) { + streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_BLOCKING); + } else { + throw new IllegalConfigurationException( + "At the moment, fine-grained resource management requires batch workloads to " + + "be executed with types of all edges being BLOCKING. To do that, you need to configure '" + + ClusterOptions.FINE_GRAINED_SHUFFLE_MODE_ALL_BLOCKING.key() + + "' to 'true'. Notice that this may affect the performance. See FLINK-20865 for more details."); + } + } + if (clearTransformations) { this.transformations.clear(); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index 35f8b45..e3f284f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -253,6 +253,11 @@ public class StreamGraph implements Pipeline { return Optional.ofNullable(slotSharingGroupResources.get(groupId)); } + public boolean hasFineGrainedResource() { + return slotSharingGroupResources.values().stream() + .anyMatch(resourceProfile -> !resourceProfile.equals(ResourceProfile.UNKNOWN)); + } + /** * Set whether to put all vertices into the same slot sharing group by default. *
