This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 5b600da86a21e2c4339261d73fa1c2588cb3ab8d
Author: Eugene Kirpichov <kirpic...@google.com>
AuthorDate: Fri Nov 17 12:38:15 2017 -0800

    Converts WriteFiles to AutoValue
---
 .../core/construction/WriteFilesTranslation.java   |   5 +-
 .../construction/WriteFilesTranslationTest.java    |  12 +-
 .../java/org/apache/beam/sdk/io/FileBasedSink.java |   4 +-
 .../java/org/apache/beam/sdk/io/WriteFiles.java    | 187 +++++++++------------
 .../org/apache/beam/sdk/io/WriteFilesTest.java     |  22 +--
 5 files changed, 104 insertions(+), 126 deletions(-)

diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
index d0b2182..a6dd55c 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
@@ -85,12 +85,13 @@ public class WriteFilesTranslation {
 
           @Override
           public boolean isWindowedWrites() {
-            return transform.isWindowedWrites();
+            return transform.getWindowedWrites();
           }
 
           @Override
           public boolean isRunnerDeterminedSharding() {
-            return transform.getNumShards() == null && transform.getSharding() 
== null;
+            return transform.getNumShardsProvider() == null
+                && transform.getComputeNumShards() == null;
           }
         },
         components);
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
index ccb366e..2d45681 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
@@ -80,9 +80,11 @@ public class WriteFilesTranslationTest {
 
       assertThat(
           payload.getRunnerDeterminedSharding(),
-          equalTo(writeFiles.getNumShards() == null && 
writeFiles.getSharding() == null));
+          equalTo(
+              writeFiles.getNumShardsProvider() == null
+                  && writeFiles.getComputeNumShards() == null));
 
-      assertThat(payload.getWindowedWrites(), 
equalTo(writeFiles.isWindowedWrites()));
+      assertThat(payload.getWindowedWrites(), 
equalTo(writeFiles.getWindowedWrites()));
 
       assertThat(
           (FileBasedSink<String, Void, String>)
@@ -102,11 +104,13 @@ public class WriteFilesTranslationTest {
 
       assertThat(
           WriteFilesTranslation.isRunnerDeterminedSharding(appliedPTransform),
-          equalTo(writeFiles.getNumShards() == null && 
writeFiles.getSharding() == null));
+          equalTo(
+              writeFiles.getNumShardsProvider() == null
+                  && writeFiles.getComputeNumShards() == null));
 
       assertThat(
           WriteFilesTranslation.isWindowedWrites(appliedPTransform),
-          equalTo(writeFiles.isWindowedWrites()));
+          equalTo(writeFiles.getWindowedWrites()));
       assertThat(
           WriteFilesTranslation.<String, Void, 
String>getSink(appliedPTransform),
           equalTo(writeFiles.getSink()));
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 5bc84be..20d2a27 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -451,8 +451,8 @@ public abstract class FileBasedSink<UserT, DestinationT, 
OutputT>
    * written,
    *
    * <ol>
-   *   <li>{@link WriteOperation#finalizeDestination} is given a list of the 
temporary files containing the
-   *       output bundles.
+   *   <li>{@link WriteOperation#finalizeDestination} is given a list of the 
temporary files
+   *       containing the output bundles.
    *   <li>During finalize, these temporary files are copied to final output 
locations and named
    *       according to a file naming template.
    *   <li>Finally, any temporary files that were created during the write are 
removed.
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 0a538b1..d6c5788 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -22,6 +22,7 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
+import com.google.auto.value.AutoValue;
 import com.google.common.base.Objects;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableList;
@@ -108,7 +109,8 @@ import org.slf4j.LoggerFactory;
  * <pre>{@code p.apply(WriteFiles.to(new MySink(...)).withNumShards(3));}</pre>
  */
 @Experimental(Experimental.Kind.SOURCE_SINK)
-public class WriteFiles<UserT, DestinationT, OutputT>
+@AutoValue
+public abstract class WriteFiles<UserT, DestinationT, OutputT>
     extends PTransform<PCollection<UserT>, WriteFilesResult<DestinationT>> {
   private static final Logger LOG = LoggerFactory.getLogger(WriteFiles.class);
 
@@ -125,19 +127,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
   private static final int SPILLED_RECORD_SHARDING_FACTOR = 10;
 
   static final int UNKNOWN_SHARDNUM = -1;
-  private FileBasedSink<UserT, DestinationT, OutputT> sink;
   private @Nullable WriteOperation<DestinationT, OutputT> writeOperation;
-  // This allows the number of shards to be dynamically computed based on the 
input
-  // PCollection.
-  private final @Nullable PTransform<PCollection<UserT>, 
PCollectionView<Integer>> computeNumShards;
-  // We don't use a side input for static sharding, as we want this value to 
be updatable
-  // when a pipeline is updated.
-  private final @Nullable ValueProvider<Integer> numShardsProvider;
-  private final boolean windowedWrites;
-  private int maxNumWritersPerBundle;
-  // This is the set of side inputs used by this transform. This is usually 
populated by the users's
-  // DynamicDestinations object.
-  private final List<PCollectionView<?>> sideInputs;
 
   /**
    * Creates a {@link WriteFiles} transform that writes to the given {@link 
FileBasedSink}, letting
@@ -146,57 +136,59 @@ public class WriteFiles<UserT, DestinationT, OutputT>
   public static <UserT, DestinationT, OutputT> WriteFiles<UserT, DestinationT, 
OutputT> to(
       FileBasedSink<UserT, DestinationT, OutputT> sink) {
     checkArgument(sink != null, "sink can not be null");
-    return new WriteFiles<>(
-        sink,
-        null /* runner-determined sharding */,
-        null,
-        false,
-        DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE,
-        sink.getDynamicDestinations().getSideInputs());
+    return new AutoValue_WriteFiles.Builder<UserT, DestinationT, OutputT>()
+        .setSink(sink)
+        .setComputeNumShards(null)
+        .setNumShardsProvider(null)
+        .setWindowedWrites(false)
+        .setMaxNumWritersPerBundle(DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE)
+        .setSideInputs(sink.getDynamicDestinations().getSideInputs())
+        .build();
   }
 
-  private WriteFiles(
-      FileBasedSink<UserT, DestinationT, OutputT> sink,
-      @Nullable PTransform<PCollection<UserT>, PCollectionView<Integer>> 
computeNumShards,
-      @Nullable ValueProvider<Integer> numShardsProvider,
-      boolean windowedWrites,
-      int maxNumWritersPerBundle,
-      List<PCollectionView<?>> sideInputs) {
-    this.sink = sink;
-    this.computeNumShards = computeNumShards;
-    this.numShardsProvider = numShardsProvider;
-    this.windowedWrites = windowedWrites;
-    this.maxNumWritersPerBundle = maxNumWritersPerBundle;
-    this.sideInputs = sideInputs;
-  }
+  public abstract FileBasedSink<UserT, DestinationT, OutputT> getSink();
 
-  @Override
-  public Map<TupleTag<?>, PValue> getAdditionalInputs() {
-    return PCollectionViews.toAdditionalInputs(sideInputs);
-  }
+  @Nullable
+  public abstract PTransform<PCollection<UserT>, PCollectionView<Integer>> 
getComputeNumShards();
 
-  /** Returns the {@link FileBasedSink} associated with this PTransform. */
-  public FileBasedSink<UserT, DestinationT, OutputT> getSink() {
-    return sink;
-  }
+  // We don't use a side input for static sharding, as we want this value to 
be updatable
+  // when a pipeline is updated.
+  @Nullable
+  public abstract ValueProvider<Integer> getNumShardsProvider();
 
-  /** Returns whether or not to perform windowed writes. */
-  public boolean isWindowedWrites() {
-    return windowedWrites;
-  }
+  public abstract boolean getWindowedWrites();
 
-  /**
-   * Gets the {@link PTransform} that will be used to determine sharding. This 
can be either a
-   * static number of shards (as following a call to {@link 
#withNumShards(int)}), dynamic (by
-   * {@link #withSharding(PTransform)}), or runner-determined (by {@link
-   * #withRunnerDeterminedSharding()}.
-   */
-  public @Nullable PTransform<PCollection<UserT>, PCollectionView<Integer>> 
getSharding() {
-    return computeNumShards;
+  abstract int getMaxNumWritersPerBundle();
+
+  abstract List<PCollectionView<?>> getSideInputs();
+
+  abstract Builder<UserT, DestinationT, OutputT> toBuilder();
+
+  @AutoValue.Builder
+  abstract static class Builder<UserT, DestinationT, OutputT> {
+    abstract Builder<UserT, DestinationT, OutputT> setSink(
+        FileBasedSink<UserT, DestinationT, OutputT> sink);
+
+    abstract Builder<UserT, DestinationT, OutputT> setComputeNumShards(
+        PTransform<PCollection<UserT>, PCollectionView<Integer>> 
computeNumShards);
+
+    abstract Builder<UserT, DestinationT, OutputT> setNumShardsProvider(
+        ValueProvider<Integer> numShardsProvider);
+
+    abstract Builder<UserT, DestinationT, OutputT> setWindowedWrites(boolean 
windowedWrites);
+
+    abstract Builder<UserT, DestinationT, OutputT> setMaxNumWritersPerBundle(
+        int maxNumWritersPerBundle);
+
+    abstract Builder<UserT, DestinationT, OutputT> setSideInputs(
+        List<PCollectionView<?>> sideInputs);
+
+    abstract WriteFiles<UserT, DestinationT, OutputT> build();
   }
 
-  public @Nullable ValueProvider<Integer> getNumShards() {
-    return numShardsProvider;
+  @Override
+  public Map<TupleTag<?>, PValue> getAdditionalInputs() {
+    return PCollectionViews.toAdditionalInputs(getSideInputs());
   }
 
   /**
@@ -225,36 +217,18 @@ public class WriteFiles<UserT, DestinationT, OutputT>
    */
   public WriteFiles<UserT, DestinationT, OutputT> withNumShards(
       ValueProvider<Integer> numShardsProvider) {
-    return new WriteFiles<>(
-        sink,
-        computeNumShards,
-        numShardsProvider,
-        windowedWrites,
-        maxNumWritersPerBundle,
-        sideInputs);
+    return toBuilder().setNumShardsProvider(numShardsProvider).build();
   }
 
   /** Set the maximum number of writers created in a bundle before spilling to 
shuffle. */
   public WriteFiles<UserT, DestinationT, OutputT> withMaxNumWritersPerBundle(
       int maxNumWritersPerBundle) {
-    return new WriteFiles<>(
-        sink,
-        computeNumShards,
-        numShardsProvider,
-        windowedWrites,
-        maxNumWritersPerBundle,
-        sideInputs);
+    return 
toBuilder().setMaxNumWritersPerBundle(maxNumWritersPerBundle).build();
   }
 
   public WriteFiles<UserT, DestinationT, OutputT> withSideInputs(
       List<PCollectionView<?>> sideInputs) {
-    return new WriteFiles<>(
-        sink,
-        computeNumShards,
-        numShardsProvider,
-        windowedWrites,
-        maxNumWritersPerBundle,
-        sideInputs);
+    return toBuilder().setSideInputs(sideInputs).build();
   }
 
   /**
@@ -268,8 +242,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
       PTransform<PCollection<UserT>, PCollectionView<Integer>> sharding) {
     checkArgument(
         sharding != null, "sharding can not be null. Use 
withRunnerDeterminedSharding() instead.");
-    return new WriteFiles<>(
-        sink, sharding, null, windowedWrites, maxNumWritersPerBundle, 
sideInputs);
+    return toBuilder().setComputeNumShards(sharding).build();
   }
 
   /**
@@ -277,7 +250,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
    * runner-determined sharding.
    */
   public WriteFiles<UserT, DestinationT, OutputT> 
withRunnerDeterminedSharding() {
-    return new WriteFiles<>(sink, null, null, windowedWrites, 
maxNumWritersPerBundle, sideInputs);
+    return 
toBuilder().setComputeNumShards(null).setNumShardsProvider(null).build();
   }
 
   /**
@@ -293,35 +266,34 @@ public class WriteFiles<UserT, DestinationT, OutputT>
    * <p>This option can only be used if {@link #withNumShards(int)} is also 
set to a positive value.
    */
   public WriteFiles<UserT, DestinationT, OutputT> withWindowedWrites() {
-    return new WriteFiles<>(
-        sink, computeNumShards, numShardsProvider, true, 
maxNumWritersPerBundle, sideInputs);
+    return toBuilder().setWindowedWrites(true).build();
   }
 
   @Override
   public void validate(PipelineOptions options) {
-    sink.validate(options);
+    getSink().validate(options);
   }
 
   @Override
   public WriteFilesResult<DestinationT> expand(PCollection<UserT> input) {
     if (input.isBounded() == IsBounded.UNBOUNDED) {
       checkArgument(
-          windowedWrites,
+          getWindowedWrites(),
           "Must use windowed writes when applying %s to an unbounded 
PCollection",
           WriteFiles.class.getSimpleName());
     }
-    if (windowedWrites) {
+    if (getWindowedWrites()) {
       // The reason for this is https://issues.apache.org/jira/browse/BEAM-1438
       // and similar behavior in other runners.
       checkArgument(
-          computeNumShards != null || numShardsProvider != null,
+          getComputeNumShards() != null || getNumShardsProvider() != null,
           "When using windowed writes, must specify number of output shards 
explicitly",
           WriteFiles.class.getSimpleName());
     }
-    this.writeOperation = sink.createWriteOperation();
-    this.writeOperation.setWindowedWrites(windowedWrites);
+    this.writeOperation = getSink().createWriteOperation();
+    this.writeOperation.setWindowedWrites(getWindowedWrites());
 
-    if (!windowedWrites) {
+    if (!getWindowedWrites()) {
       // Re-window the data into the global window and remove any existing 
triggers.
       input =
           input.apply(
@@ -347,10 +319,10 @@ public class WriteFiles<UserT, DestinationT, OutputT>
         FileResultCoder.of(windowCoder, destinationCoder);
 
     PCollectionView<Integer> numShardsView =
-        (computeNumShards == null) ? null : input.apply(computeNumShards);
+        (getComputeNumShards() == null) ? null : 
input.apply(getComputeNumShards());
 
     PCollection<FileResult<DestinationT>> tempFileResults =
-        (computeNumShards == null && numShardsProvider == null)
+        (getComputeNumShards() == null && getNumShardsProvider() == null)
             ? input.apply(
                 "WriteUnshardedBundlesToTempFiles",
                 new WriteUnshardedBundlesToTempFiles(destinationCoder, 
fileResultCoder))
@@ -370,13 +342,14 @@ public class WriteFiles<UserT, DestinationT, OutputT>
   public void populateDisplayData(DisplayData.Builder builder) {
     super.populateDisplayData(builder);
     builder
-        .add(DisplayData.item("sink", sink.getClass()).withLabel("WriteFiles 
Sink"))
-        .include("sink", sink);
-    if (getSharding() != null) {
-      builder.include("sharding", getSharding());
+        .add(DisplayData.item("sink", 
getSink().getClass()).withLabel("WriteFiles Sink"))
+        .include("sink", getSink());
+    if (getComputeNumShards() != null) {
+      builder.include("sharding", getComputeNumShards());
     } else {
       builder.addIfNotNull(
-          DisplayData.item("numShards", getNumShards()).withLabel("Fixed 
Number of Shards"));
+          DisplayData.item("numShards", getNumShardsProvider())
+              .withLabel("Fixed Number of Shards"));
     }
   }
 
@@ -395,7 +368,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
 
     @Override
     public PCollection<Iterable<ResultT>> expand(PCollection<ResultT> input) {
-      if (windowedWrites) {
+      if (getWindowedWrites()) {
         // Reshuffle the results to make them stable against retries.
         // Use a single void key to maximize size of bundles for finalization.
         return input
@@ -437,7 +410,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
               ParDo.of(
                       new WriteUnshardedTempFilesWithSpillingFn(
                           spilledRecordsTag, destinationCoder))
-                  .withSideInputs(sideInputs)
+                  .withSideInputs(getSideInputs())
                   .withOutputTags(writtenRecordsTag, 
TupleTagList.of(spilledRecordsTag)));
       PCollection<FileResult<DestinationT>> writtenBundleFiles =
           writeTuple.get(writtenRecordsTag).setCoder(fileResultCoder);
@@ -456,7 +429,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
               .apply("GroupSpilled", GroupByKey.<ShardedKey<Integer>, 
UserT>create())
               .apply(
                   "WriteSpilled",
-                  ParDo.of(new 
WriteShardsIntoTempFilesFn()).withSideInputs(sideInputs))
+                  ParDo.of(new 
WriteShardsIntoTempFilesFn()).withSideInputs(getSideInputs()))
               .setCoder(fileResultCoder)
               .apply(
                   "DropShardNum",
@@ -514,7 +487,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
       WriterKey<DestinationT> key = new WriterKey<>(window, c.pane(), 
destination);
       Writer<DestinationT, OutputT> writer = writers.get(key);
       if (writer == null) {
-        if (writers.size() <= maxNumWritersPerBundle) {
+        if (writers.size() <= getMaxNumWritersPerBundle()) {
           String uuid = UUID.randomUUID().toString();
           LOG.info(
               "Opening writer {} for window {} pane {} destination {}",
@@ -656,7 +629,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
           .apply("GroupIntoShards", GroupByKey.<ShardedKey<Integer>, 
UserT>create())
           .apply(
               "WriteShardsIntoTempFiles",
-              ParDo.of(new 
WriteShardsIntoTempFilesFn()).withSideInputs(sideInputs))
+              ParDo.of(new 
WriteShardsIntoTempFilesFn()).withSideInputs(getSideInputs()))
           .setCoder(fileResultCoder);
     }
   }
@@ -680,8 +653,8 @@ public class WriteFiles<UserT, DestinationT, OutputT>
       if (numShardsView != null) {
         shardCount = context.sideInput(numShardsView);
       } else {
-        checkNotNull(numShardsProvider);
-        shardCount = numShardsProvider.get();
+        checkNotNull(getNumShardsProvider());
+        shardCount = getNumShardsProvider().get();
       }
       checkArgument(
           shardCount > 0,
@@ -771,7 +744,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     public WriteFilesResult<DestinationT> expand(
         PCollection<Iterable<FileResult<DestinationT>>> input) {
 
-      List<PCollectionView<?>> finalizeSideInputs = 
Lists.newArrayList(sideInputs);
+      List<PCollectionView<?>> finalizeSideInputs = 
Lists.newArrayList(getSideInputs());
       if (numShardsView != null) {
         finalizeSideInputs.add(numShardsView);
       }
@@ -794,10 +767,10 @@ public class WriteFiles<UserT, DestinationT, OutputT>
         @Nullable Integer fixedNumShards;
         if (numShardsView != null) {
           fixedNumShards = c.sideInput(numShardsView);
-        } else if (numShardsProvider != null) {
-          fixedNumShards = numShardsProvider.get();
+        } else if (getNumShardsProvider() != null) {
+          fixedNumShards = getNumShardsProvider().get();
         } else {
-          checkState(!windowedWrites, "Windowed write should have set fixed 
sharding");
+          checkState(!getWindowedWrites(), "Windowed write should have set 
fixed sharding");
           fixedNumShards = null;
         }
         List<FileResult<DestinationT>> fileResults = 
Lists.newArrayList(c.element());
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
index 40ae0ea..b68cbf9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
@@ -329,21 +329,21 @@ public class WriteFilesTest {
     WriteFiles<String, ?, String> write = WriteFiles.to(sink).withNumShards(3);
     assertThat((SimpleSink<Void>) write.getSink(), is(sink));
     PTransform<PCollection<String>, PCollectionView<Integer>> originalSharding 
=
-        write.getSharding();
+        write.getComputeNumShards();
 
-    assertThat(write.getSharding(), is(nullValue()));
-    assertThat(write.getNumShards(), instanceOf(StaticValueProvider.class));
-    assertThat(write.getNumShards().get(), equalTo(3));
-    assertThat(write.getSharding(), equalTo(originalSharding));
+    assertThat(write.getComputeNumShards(), is(nullValue()));
+    assertThat(write.getNumShardsProvider(), 
instanceOf(StaticValueProvider.class));
+    assertThat(write.getNumShardsProvider().get(), equalTo(3));
+    assertThat(write.getComputeNumShards(), equalTo(originalSharding));
 
     WriteFiles<String, ?, ?> write2 = write.withSharding(SHARDING_TRANSFORM);
     assertThat((SimpleSink<Void>) write2.getSink(), is(sink));
-    assertThat(write2.getSharding(), equalTo(SHARDING_TRANSFORM));
+    assertThat(write2.getComputeNumShards(), equalTo(SHARDING_TRANSFORM));
     // original unchanged
 
     WriteFiles<String, ?, ?> writeUnsharded = 
write2.withRunnerDeterminedSharding();
-    assertThat(writeUnsharded.getSharding(), nullValue());
-    assertThat(write.getSharding(), equalTo(originalSharding));
+    assertThat(writeUnsharded.getComputeNumShards(), nullValue());
+    assertThat(write.getComputeNumShards(), equalTo(originalSharding));
   }
 
   @Test
@@ -669,10 +669,10 @@ public class WriteFilesTest {
     p.run();
 
     Optional<Integer> numShards =
-        (write.getNumShards() != null && !write.isWindowedWrites())
-            ? Optional.of(write.getNumShards().get())
+        (write.getNumShardsProvider() != null && !write.getWindowedWrites())
+            ? Optional.of(write.getNumShardsProvider().get())
             : Optional.<Integer>absent();
-    checkFileContents(baseName, inputs, numShards, !write.isWindowedWrites());
+    checkFileContents(baseName, inputs, numShards, !write.getWindowedWrites());
   }
 
   static void checkFileContents(

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" <commits@beam.apache.org>.

Reply via email to