This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e096daf  [BEAM-6721] Set numShards dynamically for TextIO.write()
     new 84e24ea  Merge pull request #15500 from [BEAM-6721] Set numShards dynamically for TextIO.write()
e096daf is described below

commit e096daf9747a4837d8c054dcb384cf8d5c48023c
Author: Minbo Bae <baemi...@google.com>
AuthorDate: Fri Sep 10 01:08:04 2021 -0700

    [BEAM-6721] Set numShards dynamically for TextIO.write()
---
 .../main/java/org/apache/beam/sdk/io/TextIO.java   | 25 ++++++++++++++++------
 .../java/org/apache/beam/sdk/io/WriteFiles.java    |  3 ++-
 2 files changed, 21 insertions(+), 7 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index baae960..8f8a699 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -268,7 +268,6 @@ public class TextIO {
         .setDelimiter(new char[] {'\n'})
         .setWritableByteChannelFactory(FileBasedSink.CompressionType.UNCOMPRESSED)
         .setWindowedWrites(false)
-        .setNumShards(0)
         .setNoSpilling(false)
         .build();
   }
@@ -623,7 +622,7 @@ public class TextIO {
     abstract @Nullable String getFooter();
 
     /** Requested number of shards. 0 for automatic. */
-    abstract int getNumShards();
+    abstract @Nullable ValueProvider<Integer> getNumShards();
 
     /** The shard template of each file written, combined with prefix and suffix. */
     abstract @Nullable String getShardTemplate();
@@ -689,7 +688,8 @@ public class TextIO {
       abstract Builder<UserT, DestinationT> setFormatFunction(
           @Nullable SerializableFunction<UserT, String> formatFunction);
 
-      abstract Builder<UserT, DestinationT> setNumShards(int numShards);
+      abstract Builder<UserT, DestinationT> setNumShards(
+          @Nullable ValueProvider<Integer> numShards);
 
       abstract Builder<UserT, DestinationT> setWindowedWrites(boolean windowedWrites);
 
@@ -846,6 +846,14 @@ public class TextIO {
      */
     public TypedWrite<UserT, DestinationT> withNumShards(int numShards) {
       checkArgument(numShards >= 0);
+      return withNumShards(StaticValueProvider.of(numShards));
+    }
+
+    /**
+     * Like {@link #withNumShards(int)}. Specifying {@code null} means runner-determined sharding.
+     */
+    public TypedWrite<UserT, DestinationT> withNumShards(
+        @Nullable ValueProvider<Integer> numShards) {
       return toBuilder().setNumShards(numShards).build();
     }
 
@@ -1002,7 +1010,7 @@ public class TextIO {
                   getHeader(),
                   getFooter(),
                   getWritableByteChannelFactory()));
-      if (getNumShards() > 0) {
+      if (getNumShards() != null) {
         write = write.withNumShards(getNumShards());
       }
       if (getWindowedWrites()) {
@@ -1020,8 +1028,8 @@ public class TextIO {
 
       resolveDynamicDestinations().populateDisplayData(builder);
       builder
-          .addIfNotDefault(
-              DisplayData.item("numShards", getNumShards()).withLabel("Maximum Output Shards"), 0)
+          .addIfNotNull(
+              DisplayData.item("numShards", getNumShards()).withLabel("Maximum Output Shards"))
           .addIfNotNull(
               DisplayData.item("tempDirectory", getTempDirectory())
                   .withLabel("Directory for temporary files"))
@@ -1139,6 +1147,11 @@ public class TextIO {
       return new Write(inner.withNumShards(numShards));
     }
 
+    /** See {@link TypedWrite#withNumShards(ValueProvider)}. */
+    public Write withNumShards(@Nullable ValueProvider<Integer> numShards) {
+      return new Write(inner.withNumShards(numShards));
+    }
+
     /** See {@link TypedWrite#withoutSharding()}. */
     public Write withoutSharding() {
       return new Write(inner.withoutSharding());
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 9dabb40..7afac9a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -884,7 +884,8 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
         shardCount = context.sideInput(numShardsView);
       } else {
         checkNotNull(getNumShardsProvider());
-        shardCount = getNumShardsProvider().get();
+        shardCount =
+            checkNotNull(getNumShardsProvider().get(), "Must have non-null number of shards.");
       }
       checkArgument(
           shardCount > 0,

Reply via email to