Repository: beam
Updated Branches:
  refs/heads/master 0064fb37a -> 01408c864


Fix tests that passed invalid input to DynamicDestinations


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/12c277f3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/12c277f3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/12c277f3

Branch: refs/heads/master
Commit: 12c277f31b2b8a295e0a41cab3290943a3eff7cd
Parents: efe2dc1
Author: Kenneth Knowles <k...@google.com>
Authored: Fri Jul 21 20:32:20 2017 -0700
Committer: Kenneth Knowles <k...@google.com>
Committed: Mon Jul 24 18:53:25 2017 -0700

----------------------------------------------------------------------
 .../construction/PTransformMatchersTest.java    | 25 +++++++++++++++++-
 .../direct/WriteWithShardingFactoryTest.java    | 27 +++++++++++++++++++-
 .../runners/dataflow/DataflowRunnerTest.java    | 24 ++++++++++++++++-
 .../beam/sdk/io/DynamicFileDestinations.java    |  6 ++++-
 4 files changed, 78 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/12c277f3/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index 99d3dd1..316645b 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -27,6 +27,7 @@ import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableMap;
 import java.io.Serializable;
 import java.util.Collections;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
@@ -62,7 +63,9 @@ import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
@@ -547,7 +550,8 @@ public class PTransformMatchersTest implements Serializable {
     WriteFiles<Integer, Void, Integer> write =
         WriteFiles.to(
             new FileBasedSink<Integer, Void>(
-                StaticValueProvider.of(outputDirectory), DynamicFileDestinations.constant(null)) {
+                StaticValueProvider.of(outputDirectory),
+                DynamicFileDestinations.constant(new FakeFilenamePolicy())) {
               @Override
               public WriteOperation<Integer, Void> createWriteOperation() {
                 return null;
@@ -580,4 +584,23 @@ public class PTransformMatchersTest implements Serializable {
         write,
         p);
   }
+
+  private static class FakeFilenamePolicy extends FilenamePolicy {
+    @Override
+    public ResourceId windowedFilename(
+        int shardNumber,
+        int numShards,
+        BoundedWindow window,
+        PaneInfo paneInfo,
+        FileBasedSink.OutputFileHints outputFileHints) {
+      throw new UnsupportedOperationException("should not be called");
+    }
+
+    @Nullable
+    @Override
+    public ResourceId unwindowedFilename(
+        int shardNumber, int numShards, FileBasedSink.OutputFileHints outputFileHints) {
+      throw new UnsupportedOperationException("should not be called");
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/12c277f3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
index 546a181..6dd069c 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
@@ -36,6 +36,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
+import javax.annotation.Nullable;
 import org.apache.beam.runners.direct.WriteWithShardingFactory.CalculateShardsFn;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
@@ -55,7 +56,9 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnTester;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.SerializableFunctions;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PCollectionViews;
@@ -141,7 +144,8 @@ public class WriteWithShardingFactoryTest implements Serializable {
     PTransform<PCollection<Object>, PDone> original =
         WriteFiles.to(
             new FileBasedSink<Object, Void>(
-                StaticValueProvider.of(outputDirectory), DynamicFileDestinations.constant(null)) {
+                StaticValueProvider.of(outputDirectory),
+                DynamicFileDestinations.constant(new FakeFilenamePolicy())) {
               @Override
               public WriteOperation<Object, Void> createWriteOperation() {
                 throw new IllegalArgumentException("Should not be used");
@@ -234,4 +238,25 @@ public class WriteWithShardingFactoryTest implements Serializable {
     List<Integer> shards = fnTester.processBundle((long) count);
     assertThat(shards, containsInAnyOrder(13));
   }
+
+  private static class FakeFilenamePolicy extends FileBasedSink.FilenamePolicy {
+    @Override
+    public ResourceId windowedFilename(
+        int shardNumber,
+        int numShards,
+        BoundedWindow window,
+        PaneInfo paneInfo,
+        FileBasedSink.OutputFileHints outputFileHints) {
+      throw new IllegalArgumentException("Should not be used");
+    }
+
+    @Nullable
+    @Override
+    public ResourceId unwindowedFilename(
+        int shardNumber,
+        int numShards,
+        FileBasedSink.OutputFileHints outputFileHints) {
+      throw new IllegalArgumentException("Should not be used");
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/12c277f3/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 94985f8..7556a28 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -64,6 +64,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
+import javax.annotation.Nullable;
 import org.apache.beam.runners.dataflow.DataflowRunner.StreamingShardedWriteFactory;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
@@ -81,6 +82,7 @@ import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.io.WriteFiles;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -102,6 +104,8 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunctions;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.GcsUtil;
@@ -1293,7 +1297,25 @@ public class DataflowRunnerTest implements Serializable {
     TestSink(String tmpFolder) {
       super(
           StaticValueProvider.of(FileSystems.matchNewResource(tmpFolder, true)),
-          DynamicFileDestinations.constant(null));
+          DynamicFileDestinations.constant(
+              new FilenamePolicy() {
+                @Override
+                public ResourceId windowedFilename(
+                    int shardNumber,
+                    int numShards,
+                    BoundedWindow window,
+                    PaneInfo paneInfo,
+                    OutputFileHints outputFileHints) {
+                  throw new UnsupportedOperationException("should not be called");
+                }
+
+                @Nullable
+                @Override
+                public ResourceId unwindowedFilename(
+                    int shardNumber, int numShards, OutputFileHints outputFileHints) {
+                  throw new UnsupportedOperationException("should not be called");
+                }
+              }));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/12c277f3/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
index e7ef0f6..d05a01a7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
@@ -18,6 +18,9 @@
 
 package org.apache.beam.sdk.io;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
@@ -34,7 +37,7 @@ public class DynamicFileDestinations {
     private final FilenamePolicy filenamePolicy;
 
     public ConstantFilenamePolicy(FilenamePolicy filenamePolicy) {
-      this.filenamePolicy = filenamePolicy;
+      this.filenamePolicy = checkNotNull(filenamePolicy);
     }
 
     @Override
@@ -59,6 +62,7 @@ public class DynamicFileDestinations {
 
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
+      checkState(filenamePolicy != null);
       filenamePolicy.populateDisplayData(builder);
     }
   }

Reply via email to