Repository: beam Updated Branches: refs/heads/master 0064fb37a -> 01408c864
Fix tests that passed invalid input to DynamicDestinations Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/12c277f3 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/12c277f3 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/12c277f3 Branch: refs/heads/master Commit: 12c277f31b2b8a295e0a41cab3290943a3eff7cd Parents: efe2dc1 Author: Kenneth Knowles <k...@google.com> Authored: Fri Jul 21 20:32:20 2017 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Mon Jul 24 18:53:25 2017 -0700 ---------------------------------------------------------------------- .../construction/PTransformMatchersTest.java | 25 +++++++++++++++++- .../direct/WriteWithShardingFactoryTest.java | 27 +++++++++++++++++++- .../runners/dataflow/DataflowRunnerTest.java | 24 ++++++++++++++++- .../beam/sdk/io/DynamicFileDestinations.java | 6 ++++- 4 files changed, 78 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/12c277f3/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java index 99d3dd1..316645b 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java @@ -27,6 +27,7 @@ import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableMap; import java.io.Serializable; import java.util.Collections; +import javax.annotation.Nullable; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; @@ -62,7 +63,9 @@ import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.View.CreatePCollectionView; import org.apache.beam.sdk.transforms.ViewFn; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; @@ -547,7 +550,8 @@ public class PTransformMatchersTest implements Serializable { WriteFiles<Integer, Void, Integer> write = WriteFiles.to( new FileBasedSink<Integer, Void>( - StaticValueProvider.of(outputDirectory), DynamicFileDestinations.constant(null)) { + StaticValueProvider.of(outputDirectory), + DynamicFileDestinations.constant(new FakeFilenamePolicy())) { @Override public WriteOperation<Integer, Void> createWriteOperation() { return null; @@ -580,4 +584,23 @@ public class PTransformMatchersTest implements Serializable { write, p); } + + private static class FakeFilenamePolicy extends FilenamePolicy { + @Override + public ResourceId windowedFilename( + int shardNumber, + int numShards, + BoundedWindow window, + PaneInfo paneInfo, + FileBasedSink.OutputFileHints outputFileHints) { + throw new UnsupportedOperationException("should not be called"); + } + + @Nullable + @Override + public ResourceId unwindowedFilename( + int shardNumber, int numShards, FileBasedSink.OutputFileHints outputFileHints) { + throw new UnsupportedOperationException("should not be called"); + } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/12c277f3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java index 546a181..6dd069c 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java @@ -36,6 +36,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.UUID; +import javax.annotation.Nullable; import org.apache.beam.runners.direct.WriteWithShardingFactory.CalculateShardsFn; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.coders.VoidCoder; @@ -55,7 +56,9 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFnTester; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.SerializableFunctions; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PCollectionViews; @@ -141,7 +144,8 @@ public class WriteWithShardingFactoryTest implements Serializable { PTransform<PCollection<Object>, PDone> original = WriteFiles.to( new FileBasedSink<Object, Void>( - StaticValueProvider.of(outputDirectory), DynamicFileDestinations.constant(null)) { + StaticValueProvider.of(outputDirectory), + DynamicFileDestinations.constant(new FakeFilenamePolicy())) { @Override public WriteOperation<Object, Void> createWriteOperation() { throw new IllegalArgumentException("Should not be used"); @@ -234,4 +238,25 @@ public class WriteWithShardingFactoryTest implements Serializable { List<Integer> shards = fnTester.processBundle((long) count); assertThat(shards, containsInAnyOrder(13)); } + + private static class FakeFilenamePolicy extends FileBasedSink.FilenamePolicy { + @Override + public ResourceId windowedFilename( + int shardNumber, + int numShards, + BoundedWindow window, + PaneInfo paneInfo, + FileBasedSink.OutputFileHints outputFileHints) { + throw new IllegalArgumentException("Should not be used"); + } + + @Nullable + @Override + public ResourceId unwindowedFilename( + int shardNumber, + int numShards, + FileBasedSink.OutputFileHints outputFileHints) { + throw new IllegalArgumentException("Should not be used"); + } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/12c277f3/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index 94985f8..7556a28 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -64,6 +64,7 @@ import java.util.LinkedList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; +import javax.annotation.Nullable; import org.apache.beam.runners.dataflow.DataflowRunner.StreamingShardedWriteFactory; import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; @@ -81,6 +82,7 @@ import org.apache.beam.sdk.io.FileBasedSink; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.WriteFiles; +import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -102,6 +104,8 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunctions; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Sessions; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.GcsUtil; @@ -1293,7 +1297,25 @@ public class DataflowRunnerTest implements Serializable { TestSink(String tmpFolder) { super( StaticValueProvider.of(FileSystems.matchNewResource(tmpFolder, true)), - DynamicFileDestinations.constant(null)); + DynamicFileDestinations.constant( + new FilenamePolicy() { + @Override + public ResourceId windowedFilename( + int shardNumber, + int numShards, + BoundedWindow window, + PaneInfo paneInfo, + OutputFileHints outputFileHints) { + throw new UnsupportedOperationException("should not be called"); + } + + @Nullable + @Override + public ResourceId unwindowedFilename( + int shardNumber, int numShards, OutputFileHints outputFileHints) { + throw new UnsupportedOperationException("should not be called"); + } + })); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/12c277f3/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java index e7ef0f6..d05a01a7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java @@ -18,6 +18,9 @@ package org.apache.beam.sdk.io; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params; @@ -34,7 +37,7 @@ public class DynamicFileDestinations { private final FilenamePolicy filenamePolicy; public ConstantFilenamePolicy(FilenamePolicy filenamePolicy) { - this.filenamePolicy = filenamePolicy; + this.filenamePolicy = checkNotNull(filenamePolicy); } @Override @@ -59,6 +62,7 @@ public class DynamicFileDestinations { @Override public void populateDisplayData(DisplayData.Builder builder) { + checkState(filenamePolicy != null); filenamePolicy.populateDisplayData(builder); } }