This is an automated email from the ASF dual-hosted git repository. robertwb pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 6b3ce095b0a Reduce the maximum size of input splits in Flink to better distribute work (#28045) 6b3ce095b0a is described below commit 6b3ce095b0a12a8385eacdb1748fb523247850c1 Author: Julien Tournay <boudhe...@gmail.com> AuthorDate: Fri Sep 1 22:48:25 2023 +0200 Reduce the maximum size of input splits in Flink to better distribute work (#28045) --- .../beam/runners/flink/FlinkPipelineOptions.java | 7 +++++++ .../translation/wrappers/SourceInputFormat.java | 22 +++++++++++++++++++++- 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java index 2f32ab2c2ea..1e01514fe8b 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java @@ -313,6 +313,13 @@ public interface FlinkPipelineOptions void setFlinkConfDir(String confDir); + @Description( + "Set the maximum size of input split when data is read from a filesystem. 
0 implies no max size.") + @Default.Long(0) + Long getFileInputSplitMaxSizeMB(); + + void setFileInputSplitMaxSizeMB(Long fileInputSplitMaxSizeMB); + static FlinkPipelineOptions defaults() { return PipelineOptionsFactory.as(FlinkPipelineOptions.class); } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java index 7e1835eac72..a1b8bced7a1 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java @@ -20,9 +20,11 @@ package org.apache.beam.runners.flink.translation.wrappers; import java.io.IOException; import java.util.List; import org.apache.beam.runners.core.construction.SerializablePipelineOptions; +import org.apache.beam.runners.flink.FlinkPipelineOptions; import org.apache.beam.runners.flink.metrics.FlinkMetricContainer; import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil; import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.FileBasedSource; import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; @@ -108,12 +110,30 @@ public class SourceInputFormat<T> extends RichInputFormat<WindowedValue<T>, Sour return null; } + private long getDesiredSizeBytes(int numSplits) throws Exception { + long totalSize = initialSource.getEstimatedSizeBytes(options); + long defaultSplitSize = totalSize / numSplits; + long maxSplitSize = 0; + if (options != null) { + maxSplitSize = options.as(FlinkPipelineOptions.class).getFileInputSplitMaxSizeMB(); + } + if (initialSource instanceof FileBasedSource && maxSplitSize > 0) { + // Most of the time parallelism is < number of files in source. 
+ // Each file becomes a unique split which commonly creates skew. + // This limits the size of splits to reduce skew. + return Math.min(defaultSplitSize, maxSplitSize * 1024 * 1024); + } else { + return defaultSplitSize; + } + } + @Override @SuppressWarnings("unchecked") public SourceInputSplit<T>[] createInputSplits(int numSplits) throws IOException { try { - long desiredSizeBytes = initialSource.getEstimatedSizeBytes(options) / numSplits; + long desiredSizeBytes = getDesiredSizeBytes(numSplits); List<? extends Source<T>> shards = initialSource.split(desiredSizeBytes, options); + int numShards = shards.size(); SourceInputSplit<T>[] sourceInputSplits = new SourceInputSplit[numShards]; for (int i = 0; i < numShards; i++) {