This is an automated email from the ASF dual-hosted git repository. pabloem pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new e096daf [BEAM-6721] Set numShards dynamically for TextIO.write() new 84e24ea Merge pull request #15500 from [BEAM-6721] Set numShards dynamically for TextIO.write() e096daf is described below commit e096daf9747a4837d8c054dcb384cf8d5c48023c Author: Minbo Bae <baemi...@google.com> AuthorDate: Fri Sep 10 01:08:04 2021 -0700 [BEAM-6721] Set numShards dynamically for TextIO.write() --- .../main/java/org/apache/beam/sdk/io/TextIO.java | 25 ++++++++++++++++------ .../java/org/apache/beam/sdk/io/WriteFiles.java | 3 ++- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index baae960..8f8a699 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -268,7 +268,6 @@ public class TextIO { .setDelimiter(new char[] {'\n'}) .setWritableByteChannelFactory(FileBasedSink.CompressionType.UNCOMPRESSED) .setWindowedWrites(false) - .setNumShards(0) .setNoSpilling(false) .build(); } @@ -623,7 +622,7 @@ public class TextIO { abstract @Nullable String getFooter(); /** Requested number of shards. 0 for automatic. */ - abstract int getNumShards(); + abstract @Nullable ValueProvider<Integer> getNumShards(); /** The shard template of each file written, combined with prefix and suffix. */ abstract @Nullable String getShardTemplate(); @@ -689,7 +688,8 @@ public class TextIO { abstract Builder<UserT, DestinationT> setFormatFunction( @Nullable SerializableFunction<UserT, String> formatFunction); - abstract Builder<UserT, DestinationT> setNumShards(int numShards); + abstract Builder<UserT, DestinationT> setNumShards( + @Nullable ValueProvider<Integer> numShards); abstract Builder<UserT, DestinationT> setWindowedWrites(boolean windowedWrites); @@ -846,6 +846,14 @@ public class TextIO { */ public TypedWrite<UserT, DestinationT> withNumShards(int numShards) { checkArgument(numShards >= 0); + return withNumShards(StaticValueProvider.of(numShards)); + } + + /** + * Like {@link #withNumShards(int)}. Specifying {@code null} means runner-determined sharding. + */ + public TypedWrite<UserT, DestinationT> withNumShards( + @Nullable ValueProvider<Integer> numShards) { return toBuilder().setNumShards(numShards).build(); } @@ -1002,7 +1010,7 @@ public class TextIO { getHeader(), getFooter(), getWritableByteChannelFactory())); - if (getNumShards() > 0) { + if (getNumShards() != null) { write = write.withNumShards(getNumShards()); } if (getWindowedWrites()) { @@ -1020,8 +1028,8 @@ public class TextIO { resolveDynamicDestinations().populateDisplayData(builder); builder - .addIfNotDefault( - DisplayData.item("numShards", getNumShards()).withLabel("Maximum Output Shards"), 0) + .addIfNotNull( + DisplayData.item("numShards", getNumShards()).withLabel("Maximum Output Shards")) .addIfNotNull( DisplayData.item("tempDirectory", getTempDirectory()) .withLabel("Directory for temporary files")) @@ -1139,6 +1147,11 @@ public class TextIO { return new Write(inner.withNumShards(numShards)); } + /** See {@link TypedWrite#withNumShards(ValueProvider)}. */ + public Write withNumShards(@Nullable ValueProvider<Integer> numShards) { + return new Write(inner.withNumShards(numShards)); + } + /** See {@link TypedWrite#withoutSharding()}. */ public Write withoutSharding() { return new Write(inner.withoutSharding()); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java index 9dabb40..7afac9a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java @@ -884,7 +884,8 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT> shardCount = context.sideInput(numShardsView); } else { checkNotNull(getNumShardsProvider()); - shardCount = getNumShardsProvider().get(); + shardCount = + checkNotNull(getNumShardsProvider().get(), "Must have non-null number of shards."); } checkArgument( shardCount > 0,