This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 11631cb5956 [FLINK-34145][connector/filesystem] support dynamic source parallelism inference in batch jobs 11631cb5956 is described below commit 11631cb59568df60d40933fb13c8433062ed9290 Author: sunxia <xingbe...@gmail.com> AuthorDate: Wed Jan 24 14:26:03 2024 +0800 [FLINK-34145][connector/filesystem] support dynamic source parallelism inference in batch jobs This closes #24186. --- .../connector/file/src/AbstractFileSource.java | 6 ++- .../flink/connector/file/src/FileSource.java | 25 +++++++++++- .../file/src/FileSourceTextLinesITCase.java | 46 +++++++++++++++++++++- 3 files changed, 73 insertions(+), 4 deletions(-) diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java index 6dbed75747b..f4fb463e10e 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java @@ -69,7 +69,7 @@ public abstract class AbstractFileSource<T, SplitT extends FileSourceSplit> private static final long serialVersionUID = 1L; - private final Path[] inputPaths; + final Path[] inputPaths; private final FileEnumerator.Provider enumeratorFactory; @@ -100,6 +100,10 @@ public abstract class AbstractFileSource<T, SplitT extends FileSourceSplit> // Getters // ------------------------------------------------------------------------ + FileEnumerator.Provider getEnumeratorFactory() { + return enumeratorFactory; + } + public FileSplitAssigner.Provider getAssignerFactory() { return assignerFactory; } diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSource.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSource.java index da76f790627..7d3f545fc02 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSource.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSource.java @@ -19,6 +19,7 @@ package org.apache.flink.connector.file.src; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.connector.source.DynamicParallelismInference; import org.apache.flink.connector.file.src.assigners.FileSplitAssigner; import org.apache.flink.connector.file.src.assigners.LocalityAwareSplitAssigner; import org.apache.flink.connector.file.src.enumerate.BlockSplittingRecursiveEnumerator; @@ -32,10 +33,13 @@ import org.apache.flink.connector.file.src.reader.StreamFormat; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.FlinkRuntimeException; import javax.annotation.Nullable; +import java.io.IOException; import java.time.Duration; +import java.util.Collection; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -93,7 +97,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * @param <T> The type of the events/records produced by this source. */ @PublicEvolving -public final class FileSource<T> extends AbstractFileSource<T, FileSourceSplit> { +public final class FileSource<T> extends AbstractFileSource<T, FileSourceSplit> + implements DynamicParallelismInference { private static final long serialVersionUID = 1L; @@ -141,6 +146,24 @@ public final class FileSource<T> extends AbstractFileSource<T, FileSourceSplit> return FileSourceSplitSerializer.INSTANCE; } + @Override + public int inferParallelism(Context dynamicParallelismContext) { + FileEnumerator fileEnumerator = getEnumeratorFactory().create(); + + Collection<FileSourceSplit> splits; + try { + splits = + fileEnumerator.enumerateSplits( + inputPaths, + dynamicParallelismContext.getParallelismInferenceUpperBound()); + } catch (IOException e) { + throw new FlinkRuntimeException("Could not enumerate file splits", e); + } + + return Math.min( + splits.size(), dynamicParallelismContext.getParallelismInferenceUpperBound()); + } + // ------------------------------------------------------------------------ // Entry-point Factory Methods // ------------------------------------------------------------------------ diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java index 01cbd8aa9c2..08d53f21426 100644 --- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java +++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java @@ -19,10 +19,15 @@ package org.apache.flink.connector.file.src; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.configuration.BatchExecutionOptions; +import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.file.src.reader.TextLineInputFormat; import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl; import org.apache.flink.runtime.minicluster.MiniCluster; import org.apache.flink.runtime.minicluster.RpcServiceSharing; @@ -66,6 +71,8 @@ class FileSourceTextLinesITCase { private static final int PARALLELISM = 4; + private static final int SOURCE_PARALLELISM_UPPER_BOUND = 8; + @TempDir private static java.nio.file.Path tmpDir; @RegisterExtension @@ -108,9 +115,25 @@ class FileSourceTextLinesITCase { miniCluster -> testBoundedTextFileSource(tmpTestDir, FailoverType.JM, miniCluster)); } + @Test + void testBoundedTextFileSourceWithDynamicParallelismInference( + @TempDir java.nio.file.Path tmpTestDir, @InjectMiniCluster MiniCluster miniCluster) + throws Exception { + testBoundedTextFileSource(tmpTestDir, FailoverType.NONE, miniCluster, true); + } + private void testBoundedTextFileSource( java.nio.file.Path tmpTestDir, FailoverType failoverType, MiniCluster miniCluster) throws Exception { + testBoundedTextFileSource(tmpTestDir, failoverType, miniCluster, false); + } + + private void testBoundedTextFileSource( + java.nio.file.Path tmpTestDir, + FailoverType failoverType, + MiniCluster miniCluster, + boolean batchMode) + throws Exception { final File testDir = tmpTestDir.toFile(); // our main test data @@ -126,11 +149,16 @@ class FileSourceTextLinesITCase { .build(); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(PARALLELISM); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); + env.setParallelism(PARALLELISM); + + if (batchMode) { + env.setRuntimeMode(RuntimeExecutionMode.BATCH); + } final DataStream<String> stream = - env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source"); + env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source") + .setMaxParallelism(PARALLELISM * 2); final DataStream<String> streamFailingInTheMiddleOfReading = RecordCounterToFail.wrapWithFailureAfter(stream, LINES.length / 2); @@ -149,6 +177,9 @@ class FileSourceTextLinesITCase { } verifyResult(result); + if (batchMode) { + verifySourceParallelism(miniCluster.getExecutionGraph(jobId).get()); + } } /** @@ -253,11 +284,16 @@ class FileSourceTextLinesITCase { } private static MiniClusterResourceConfiguration createMiniClusterConfiguration() { + Configuration configuration = new Configuration(); + configuration.set( + BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM, + SOURCE_PARALLELISM_UPPER_BOUND); return new MiniClusterResourceConfiguration.Builder() .setNumberTaskManagers(1) .setNumberSlotsPerTaskManager(PARALLELISM) .setRpcServiceSharing(RpcServiceSharing.DEDICATED) .withHaLeadershipControl() + .setConfiguration(configuration) .build(); } @@ -320,6 +356,12 @@ class FileSourceTextLinesITCase { assertThat(actual).isEqualTo(expected); } + private static void verifySourceParallelism(AccessExecutionGraph executionGraph) { + AccessExecutionJobVertex sourceVertex = + executionGraph.getVerticesTopologically().iterator().next(); + assertThat(sourceVertex.getParallelism()).isEqualTo(FILE_PATHS.length); + } + // ------------------------------------------------------------------------ // test data // ------------------------------------------------------------------------