http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java index c317182..fa95477 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java @@ -26,9 +26,9 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.flink.api.common.functions.RuntimeContext; /**
http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java index 9f000e0..6517bf2 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java @@ -40,10 +40,10 @@ import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.flink.api.common.functions.RichGroupReduceFunction; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.configuration.Configuration; http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java index 7ee2f69..942bf42 100644 --- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java @@ -34,8 +34,8 @@ import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java index 4aacb4a..fb4c678 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java @@ -31,8 +31,8 @@ import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.flink.util.Collector; import org.joda.time.Instant; 
http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 5287b85..1844d6d 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -67,9 +67,9 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.NullSideInputReader; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java index 1887a99..7d54cfa 100644 --- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java @@ -41,9 +41,9 @@ import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.flink.streaming.api.operators.InternalTimer; import org.joda.time.Duration; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java index 3899303..bf64ede 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java @@ -36,10 +36,10 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; +import 
org.apache.beam.sdk.values.WindowingStrategy; import org.apache.flink.streaming.api.operators.InternalTimer; /** http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java index 7519dbf..8382a2d 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java @@ -53,9 +53,9 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.commons.lang3.SerializationUtils; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeHint; http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java index 80dfa07..79bc0e0 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java @@ -54,10 +54,10 @@ import 
org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java index ffd56c9..572b005 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java @@ -24,8 +24,8 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.WindowingStrategy; /** * A primitive {@link PTransform} that implements the {@link Window#into(WindowFn)} http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java 
---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java index 119c9c9..13ae9ee 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java @@ -38,13 +38,13 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.WindowingStrategy; import org.joda.time.Instant; /** http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java index b85b315..1ff8a3f 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java @@ -78,7 +78,6 @@ import 
org.apache.beam.sdk.util.PCollectionViews; import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; @@ -87,6 +86,7 @@ import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.sdk.values.WindowingStrategy; /** * Dataflow batch overrides for {@link CreatePCollectionView}, specialized for different view types. http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index e727433..92f9c98 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -91,7 +91,6 @@ import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.AppliedCombineFn; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.common.ReflectHelpers; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -100,6 +99,7 @@ import 
org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 1a806b9..d5e650e 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -123,7 +123,6 @@ import org.apache.beam.sdk.util.ReleaseInfo; import org.apache.beam.sdk.util.Reshuffle; import org.apache.beam.sdk.util.ValueWithRecordId; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -133,6 +132,7 @@ import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; import org.joda.time.DateTimeUtils; import org.joda.time.DateTimeZone; import org.joda.time.format.DateTimeFormat; http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReshuffleOverrideFactory.java ---------------------------------------------------------------------- diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReshuffleOverrideFactory.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReshuffleOverrideFactory.java index aa9d9f8..d33fdfe 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReshuffleOverrideFactory.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReshuffleOverrideFactory.java @@ -31,9 +31,9 @@ import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.IdentityWindowFn; import org.apache.beam.sdk.util.Reshuffle; import org.apache.beam.sdk.util.ReshuffleTrigger; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.WindowingStrategy; import org.joda.time.Duration; /** http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java index 55c62ae..bd2742f 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java @@ -21,9 +21,9 @@ import java.io.Serializable; import java.util.Map; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; /** * Wrapper 
class holding the necessary information to serialize a {@link DoFn}. http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 986ed5c..48cb79f 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -98,7 +98,6 @@ import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.GcsPathValidator; import org.apache.beam.sdk.util.GcsUtil; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.gcsfs.GcsPath; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -106,6 +105,7 @@ import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.sdk.values.WindowingStrategy; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index 5aebf29..3d47726 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -86,10 +86,10 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.util.GcsUtil; import org.apache.beam.sdk.util.NoopPathValidator; import org.apache.beam.sdk.util.ReleaseInfo; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.gcsfs.GcsPath; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.beam.sdk.values.WindowingStrategy; import org.hamcrest.Description; import org.hamcrest.Matchers; import org.hamcrest.TypeSafeMatcher; http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java index c9c7806..fc2ff3d 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java @@ -33,11 +33,11 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.Sessions; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.NoopPathValidator; -import org.apache.beam.sdk.util.WindowingStrategy; 
import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.WindowingStrategy; import org.joda.time.Duration; import org.junit.Before; import org.junit.Rule; http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java index 4558683..f0dbd61 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java @@ -29,12 +29,12 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.InvalidWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.NoopPathValidator; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.WindowingStrategy; import org.hamcrest.Matchers; import org.joda.time.Duration; import org.junit.Before; http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java ---------------------------------------------------------------------- diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java index f2e0bb3..fdcea99 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java @@ -31,10 +31,10 @@ import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.beam.sdk.values.WindowingStrategy; import org.joda.time.Duration; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java index 4a2851d..9bc8760 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java @@ -50,9 +50,9 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; import 
org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.spark.Partitioner; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java index 6a67cce..d19094d 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java @@ -26,8 +26,8 @@ import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.spark.HashPartitioner; import org.apache.spark.Partitioner; import org.apache.spark.api.java.JavaPairRDD; http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java index 410b7de..9bfd2fa 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java +++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java @@ -33,9 +33,9 @@ import org.apache.beam.runners.spark.util.SideInputBroadcast; import org.apache.beam.runners.spark.util.SparkSideInputReader; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.spark.Accumulator; import org.apache.spark.api.java.function.PairFlatMapFunction; import scala.Tuple2; http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java index 7d06d6b..df633b0 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java @@ -37,10 +37,10 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; /** http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java 
---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java index 7d026c6..d0e9038 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java @@ -33,9 +33,9 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java index 9ee52de..a70885b 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java @@ -37,9 +37,9 @@ import org.apache.beam.runners.spark.aggregators.NamedAggregators; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; -import 
org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.spark.Accumulator; import org.apache.spark.api.java.function.FlatMapFunction; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java index 58db8e4..7ac8e7d 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java @@ -33,9 +33,9 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java index 30b51e6..0ecfa75 100644 --- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java @@ -20,8 +20,8 @@ package org.apache.beam.runners.spark.translation; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.WindowingStrategy; /** * Get RDD storage level for the input PCollection (mostly used for testing purpose). http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java index d249e78..77d2c0e 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java @@ -57,12 +57,12 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.CombineFnUtil; import org.apache.beam.sdk.util.Reshuffle; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.spark.Accumulator; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; 
http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java index 7b6f9ed..f462e60 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java @@ -38,10 +38,10 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function; http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java index 9af4af2..08f0e17 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java +++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java @@ -74,12 +74,12 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.CombineFnUtil; import org.apache.beam.sdk.util.Reshuffle; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.spark.Accumulator; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java index 7580da7..299f5ba 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java @@ -19,10 +19,10 @@ package org.apache.beam.runners.spark.util; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.WindowingStrategy; /** * A {@link PTransform} wrapping another transform. 
http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java index d6e1a94..e876e12 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java @@ -27,10 +27,10 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; /** http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java index 0e269a2..dbd9b86 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java @@ -23,10 +23,10 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.NameUtils; import org.apache.beam.sdk.util.SerializableUtils; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import 
org.apache.beam.sdk.values.PCollection.IsBounded; +import org.apache.beam.sdk.values.WindowingStrategy; import org.joda.time.Duration; /** http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/sdks/java/core/src/main/java/org/apache/beam/sdk/state/TimerSpecs.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/TimerSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/TimerSpecs.java index 9efac69..df42428 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/TimerSpecs.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/TimerSpecs.java @@ -28,7 +28,7 @@ import org.apache.beam.sdk.annotations.Experimental.Kind; public class TimerSpecs { public static TimerSpec timer(TimeDomain timeDomain) { - return new org.apache.beam.sdk.util.AutoValue_TimerSpecs_SimpleTimerSpec(timeDomain); + return new AutoValue_TimerSpecs_SimpleTimerSpec(timeDomain); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java index 85b8c5f..b5d7db5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java @@ -65,7 +65,6 @@ import org.apache.beam.sdk.transforms.windowing.Trigger; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -73,6 +72,7 @@ import 
org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.ValueInSingleWindow; +import org.apache.beam.sdk.values.WindowingStrategy; import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java index a648767..446c8a5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java @@ -29,11 +29,11 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.beam.sdk.values.WindowingStrategy; import org.joda.time.Duration; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java index 9d0e97a..1be948f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java +++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java @@ -59,7 +59,6 @@ import org.apache.beam.sdk.util.NameUtils; import org.apache.beam.sdk.util.NameUtils.NameOverride; import org.apache.beam.sdk.util.PCollectionViews; import org.apache.beam.sdk.util.SerializableUtils; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; @@ -69,6 +68,7 @@ import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.WindowingStrategy; /** * {@code PTransform}s for combining {@code PCollection} elements http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java index 7b282b5..25d9c05 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java @@ -21,10 +21,10 @@ import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableLikeCoder; import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.WindowingStrategy; /** * {@code Flatten<T>} takes multiple {@code PCollection<T>}s bundled 
http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java index 6a89c5f..7516b25 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java @@ -28,10 +28,10 @@ import org.apache.beam.sdk.transforms.windowing.InvalidWindows; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; +import org.apache.beam.sdk.values.WindowingStrategy; /** * {@code GroupByKey<K, V>} takes a {@code PCollection<KV<K, V>>}, http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java index d010d1f..2777191 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java @@ -30,10 +30,10 @@ import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.display.DisplayData; -import 
org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode; import org.joda.time.Duration; /** http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AppliedCombineFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AppliedCombineFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AppliedCombineFn.java index b16aadc..71ebba5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AppliedCombineFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AppliedCombineFn.java @@ -25,6 +25,7 @@ import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn; import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.WindowingStrategy; /** * A {@link GlobalCombineFn} with a fixed accumulator coder. 
This is created from a http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java index d5f2181..a61e3a6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java @@ -29,6 +29,7 @@ import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.WindowingStrategy; import org.joda.time.Instant; /** http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java index f2052ac..a07bc5e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java @@ -46,6 +46,7 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PValueBase; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; /** * Implementations of {@link PCollectionView} shared across the SDK. 
http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java index 706e039..887f011 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java @@ -27,6 +27,7 @@ import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.beam.sdk.values.WindowingStrategy; import org.joda.time.Duration; /** http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java deleted file mode 100644 index 14f818a..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java +++ /dev/null @@ -1,283 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.MoreObjects; -import java.io.Serializable; -import java.util.Objects; -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindows; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; -import org.apache.beam.sdk.transforms.windowing.Trigger; -import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.joda.time.Duration; - -/** - * A {@code WindowingStrategy} describes the windowing behavior for a specific collection of values. - * It has both a {@link WindowFn} describing how elements are assigned to windows and a - * {@link Trigger} that controls when output is produced for each window. - * - * @param <T> type of elements being windowed - * @param <W> {@link BoundedWindow} subclass used to represent the - * windows used by this {@code WindowingStrategy} - */ -public class WindowingStrategy<T, W extends BoundedWindow> implements Serializable { - - /** - * The accumulation modes that can be used with windowing. 
- */ - public enum AccumulationMode { - DISCARDING_FIRED_PANES, - ACCUMULATING_FIRED_PANES - } - - private static final Duration DEFAULT_ALLOWED_LATENESS = Duration.ZERO; - private static final WindowingStrategy<Object, GlobalWindow> DEFAULT = of(new GlobalWindows()); - - private final WindowFn<T, W> windowFn; - private final Trigger trigger; - private final AccumulationMode mode; - private final Duration allowedLateness; - private final ClosingBehavior closingBehavior; - private final TimestampCombiner timestampCombiner; - private final boolean triggerSpecified; - private final boolean modeSpecified; - private final boolean allowedLatenessSpecified; - private final boolean timestampCombinerSpecified; - - private WindowingStrategy( - WindowFn<T, W> windowFn, - Trigger trigger, boolean triggerSpecified, - AccumulationMode mode, boolean modeSpecified, - Duration allowedLateness, boolean allowedLatenessSpecified, - TimestampCombiner timestampCombiner, boolean timestampCombinerSpecified, - ClosingBehavior closingBehavior) { - this.windowFn = windowFn; - this.trigger = trigger; - this.triggerSpecified = triggerSpecified; - this.mode = mode; - this.modeSpecified = modeSpecified; - this.allowedLateness = allowedLateness; - this.allowedLatenessSpecified = allowedLatenessSpecified; - this.closingBehavior = closingBehavior; - this.timestampCombiner = timestampCombiner; - this.timestampCombinerSpecified = timestampCombinerSpecified; - } - - /** - * Return a fully specified, default windowing strategy. 
- */ - public static WindowingStrategy<Object, GlobalWindow> globalDefault() { - return DEFAULT; - } - - public static <T, W extends BoundedWindow> WindowingStrategy<T, W> of( - WindowFn<T, W> windowFn) { - return new WindowingStrategy<>(windowFn, - DefaultTrigger.of(), false, - AccumulationMode.DISCARDING_FIRED_PANES, false, - DEFAULT_ALLOWED_LATENESS, false, - TimestampCombiner.END_OF_WINDOW, false, - ClosingBehavior.FIRE_IF_NON_EMPTY); - } - - public WindowFn<T, W> getWindowFn() { - return windowFn; - } - - public Trigger getTrigger() { - return trigger; - } - - public boolean isTriggerSpecified() { - return triggerSpecified; - } - - public Duration getAllowedLateness() { - return allowedLateness; - } - - public boolean isAllowedLatenessSpecified() { - return allowedLatenessSpecified; - } - - public AccumulationMode getMode() { - return mode; - } - - public boolean isModeSpecified() { - return modeSpecified; - } - - public ClosingBehavior getClosingBehavior() { - return closingBehavior; - } - - public TimestampCombiner getTimestampCombiner() { - return timestampCombiner; - } - - public boolean isTimestampCombinerSpecified() { - return timestampCombinerSpecified; - } - - /** - * Returns a {@link WindowingStrategy} identical to {@code this} but with the trigger set to - * {@code wildcardTrigger}. - */ - public WindowingStrategy<T, W> withTrigger(Trigger trigger) { - return new WindowingStrategy<T, W>( - windowFn, - trigger, true, - mode, modeSpecified, - allowedLateness, allowedLatenessSpecified, - timestampCombiner, timestampCombinerSpecified, - closingBehavior); - } - - /** - * Returns a {@link WindowingStrategy} identical to {@code this} but with the accumulation mode - * set to {@code mode}. 
- */ - public WindowingStrategy<T, W> withMode(AccumulationMode mode) { - return new WindowingStrategy<T, W>( - windowFn, - trigger, triggerSpecified, - mode, true, - allowedLateness, allowedLatenessSpecified, - timestampCombiner, timestampCombinerSpecified, - closingBehavior); - } - - /** - * Returns a {@link WindowingStrategy} identical to {@code this} but with the window function - * set to {@code wildcardWindowFn}. - */ - public WindowingStrategy<T, W> withWindowFn(WindowFn<?, ?> wildcardWindowFn) { - @SuppressWarnings("unchecked") - WindowFn<T, W> typedWindowFn = (WindowFn<T, W>) wildcardWindowFn; - - return new WindowingStrategy<T, W>( - typedWindowFn, - trigger, triggerSpecified, - mode, modeSpecified, - allowedLateness, allowedLatenessSpecified, - timestampCombiner, timestampCombinerSpecified, - closingBehavior); - } - - /** - * Returns a {@link WindowingStrategy} identical to {@code this} but with the allowed lateness - * set to {@code allowedLateness}. - */ - public WindowingStrategy<T, W> withAllowedLateness(Duration allowedLateness) { - return new WindowingStrategy<T, W>( - windowFn, - trigger, triggerSpecified, - mode, modeSpecified, - allowedLateness, true, - timestampCombiner, timestampCombinerSpecified, - closingBehavior); - } - - public WindowingStrategy<T, W> withClosingBehavior(ClosingBehavior closingBehavior) { - return new WindowingStrategy<T, W>( - windowFn, - trigger, triggerSpecified, - mode, modeSpecified, - allowedLateness, allowedLatenessSpecified, - timestampCombiner, timestampCombinerSpecified, - closingBehavior); - } - - @Experimental(Experimental.Kind.OUTPUT_TIME) - public WindowingStrategy<T, W> withTimestampCombiner(TimestampCombiner timestampCombiner) { - - return new WindowingStrategy<T, W>( - windowFn, - trigger, triggerSpecified, - mode, modeSpecified, - allowedLateness, allowedLatenessSpecified, - timestampCombiner, true, - closingBehavior); - } - - @Override - public String toString() { - return 
MoreObjects.toStringHelper(this) - .add("windowFn", windowFn) - .add("allowedLateness", allowedLateness) - .add("trigger", trigger) - .add("accumulationMode", mode) - .add("timestampCombiner", timestampCombiner) - .toString(); - } - - @Override - public boolean equals(Object object) { - if (!(object instanceof WindowingStrategy)) { - return false; - } - WindowingStrategy<?, ?> other = (WindowingStrategy<?, ?>) object; - return isTriggerSpecified() == other.isTriggerSpecified() - && isAllowedLatenessSpecified() == other.isAllowedLatenessSpecified() - && isModeSpecified() == other.isModeSpecified() - && isTimestampCombinerSpecified() == other.isTimestampCombinerSpecified() - && getMode().equals(other.getMode()) - && getAllowedLateness().equals(other.getAllowedLateness()) - && getClosingBehavior().equals(other.getClosingBehavior()) - && getTrigger().equals(other.getTrigger()) - && getTimestampCombiner().equals(other.getTimestampCombiner()) - && getWindowFn().equals(other.getWindowFn()); - } - - @Override - public int hashCode() { - return Objects.hash( - triggerSpecified, - allowedLatenessSpecified, - modeSpecified, - timestampCombinerSpecified, - mode, - allowedLateness, - closingBehavior, - trigger, - timestampCombiner, - windowFn); - } - - /** - * Fixes all the defaults so that equals can be used to check that two strategies are the same, - * regardless of the state of "defaulted-ness". 
- */ - @VisibleForTesting - public WindowingStrategy<T, W> fixDefaults() { - return new WindowingStrategy<>( - windowFn, - trigger, true, - mode, true, - allowedLateness, true, - timestampCombiner, true, - closingBehavior); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java index 1622322..1095fb8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java @@ -37,7 +37,6 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.WindowingStrategy; /** * A {@link PCollection PCollection<T>} is an immutable collection of values of type http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java index 5027df6..d1bb6d7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java @@ -25,7 +25,6 @@ import java.util.Objects; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.util.WindowingStrategy; import 
org.apache.beam.sdk.values.PCollection.IsBounded; /** http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java index 71efc09..f89041a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java @@ -28,7 +28,6 @@ import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.ViewFn; import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; /** * A {@link PCollectionView PCollectionView<T>} is an immutable view of a {@link PCollection} http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java new file mode 100644 index 0000000..8a773e2 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.values; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.MoreObjects; +import java.io.Serializable; +import java.util.Objects; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.apache.beam.sdk.transforms.windowing.Trigger; +import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.joda.time.Duration; + +/** + * A {@code WindowingStrategy} describes the windowing behavior for a specific collection of values. + * It has both a {@link WindowFn} describing how elements are assigned to windows and a + * {@link Trigger} that controls when output is produced for each window. + * + * @param <T> type of elements being windowed + * @param <W> {@link BoundedWindow} subclass used to represent the + * windows used by this {@code WindowingStrategy} + */ +public class WindowingStrategy<T, W extends BoundedWindow> implements Serializable { + + /** + * The accumulation modes that can be used with windowing. 
+ */ + public enum AccumulationMode { + DISCARDING_FIRED_PANES, + ACCUMULATING_FIRED_PANES + } + + private static final Duration DEFAULT_ALLOWED_LATENESS = Duration.ZERO; + private static final WindowingStrategy<Object, GlobalWindow> DEFAULT = of(new GlobalWindows()); + + private final WindowFn<T, W> windowFn; + private final Trigger trigger; + private final AccumulationMode mode; + private final Duration allowedLateness; + private final ClosingBehavior closingBehavior; + private final TimestampCombiner timestampCombiner; + private final boolean triggerSpecified; + private final boolean modeSpecified; + private final boolean allowedLatenessSpecified; + private final boolean timestampCombinerSpecified; + + private WindowingStrategy( + WindowFn<T, W> windowFn, + Trigger trigger, boolean triggerSpecified, + AccumulationMode mode, boolean modeSpecified, + Duration allowedLateness, boolean allowedLatenessSpecified, + TimestampCombiner timestampCombiner, boolean timestampCombinerSpecified, + ClosingBehavior closingBehavior) { + this.windowFn = windowFn; + this.trigger = trigger; + this.triggerSpecified = triggerSpecified; + this.mode = mode; + this.modeSpecified = modeSpecified; + this.allowedLateness = allowedLateness; + this.allowedLatenessSpecified = allowedLatenessSpecified; + this.closingBehavior = closingBehavior; + this.timestampCombiner = timestampCombiner; + this.timestampCombinerSpecified = timestampCombinerSpecified; + } + + /** + * Return a fully specified, default windowing strategy. 
+ */ + public static WindowingStrategy<Object, GlobalWindow> globalDefault() { + return DEFAULT; + } + + public static <T, W extends BoundedWindow> WindowingStrategy<T, W> of( + WindowFn<T, W> windowFn) { + return new WindowingStrategy<>(windowFn, + DefaultTrigger.of(), false, + AccumulationMode.DISCARDING_FIRED_PANES, false, + DEFAULT_ALLOWED_LATENESS, false, + TimestampCombiner.END_OF_WINDOW, false, + ClosingBehavior.FIRE_IF_NON_EMPTY); + } + + public WindowFn<T, W> getWindowFn() { + return windowFn; + } + + public Trigger getTrigger() { + return trigger; + } + + public boolean isTriggerSpecified() { + return triggerSpecified; + } + + public Duration getAllowedLateness() { + return allowedLateness; + } + + public boolean isAllowedLatenessSpecified() { + return allowedLatenessSpecified; + } + + public AccumulationMode getMode() { + return mode; + } + + public boolean isModeSpecified() { + return modeSpecified; + } + + public ClosingBehavior getClosingBehavior() { + return closingBehavior; + } + + public TimestampCombiner getTimestampCombiner() { + return timestampCombiner; + } + + public boolean isTimestampCombinerSpecified() { + return timestampCombinerSpecified; + } + + /** + * Returns a {@link WindowingStrategy} identical to {@code this} but with the trigger set to + * {@code trigger}. + */ + public WindowingStrategy<T, W> withTrigger(Trigger trigger) { + return new WindowingStrategy<T, W>( + windowFn, + trigger, true, + mode, modeSpecified, + allowedLateness, allowedLatenessSpecified, + timestampCombiner, timestampCombinerSpecified, + closingBehavior); + } + + /** + * Returns a {@link WindowingStrategy} identical to {@code this} but with the accumulation mode + * set to {@code mode}. 
+ */ + public WindowingStrategy<T, W> withMode(AccumulationMode mode) { + return new WindowingStrategy<T, W>( + windowFn, + trigger, triggerSpecified, + mode, true, + allowedLateness, allowedLatenessSpecified, + timestampCombiner, timestampCombinerSpecified, + closingBehavior); + } + + /** + * Returns a {@link WindowingStrategy} identical to {@code this} but with the window function + * set to {@code wildcardWindowFn}. + */ + public WindowingStrategy<T, W> withWindowFn(WindowFn<?, ?> wildcardWindowFn) { + @SuppressWarnings("unchecked") + WindowFn<T, W> typedWindowFn = (WindowFn<T, W>) wildcardWindowFn; + + return new WindowingStrategy<T, W>( + typedWindowFn, + trigger, triggerSpecified, + mode, modeSpecified, + allowedLateness, allowedLatenessSpecified, + timestampCombiner, timestampCombinerSpecified, + closingBehavior); + } + + /** + * Returns a {@link WindowingStrategy} identical to {@code this} but with the allowed lateness + * set to {@code allowedLateness}. + */ + public WindowingStrategy<T, W> withAllowedLateness(Duration allowedLateness) { + return new WindowingStrategy<T, W>( + windowFn, + trigger, triggerSpecified, + mode, modeSpecified, + allowedLateness, true, + timestampCombiner, timestampCombinerSpecified, + closingBehavior); + } + + public WindowingStrategy<T, W> withClosingBehavior(ClosingBehavior closingBehavior) { + return new WindowingStrategy<T, W>( + windowFn, + trigger, triggerSpecified, + mode, modeSpecified, + allowedLateness, allowedLatenessSpecified, + timestampCombiner, timestampCombinerSpecified, + closingBehavior); + } + + @Experimental(Experimental.Kind.OUTPUT_TIME) + public WindowingStrategy<T, W> withTimestampCombiner(TimestampCombiner timestampCombiner) { + + return new WindowingStrategy<T, W>( + windowFn, + trigger, triggerSpecified, + mode, modeSpecified, + allowedLateness, allowedLatenessSpecified, + timestampCombiner, true, + closingBehavior); + } + + @Override + public String toString() { + return 
MoreObjects.toStringHelper(this) + .add("windowFn", windowFn) + .add("allowedLateness", allowedLateness) + .add("trigger", trigger) + .add("accumulationMode", mode) + .add("timestampCombiner", timestampCombiner) + .toString(); + } + + @Override + public boolean equals(Object object) { + if (!(object instanceof WindowingStrategy)) { + return false; + } + WindowingStrategy<?, ?> other = (WindowingStrategy<?, ?>) object; + return isTriggerSpecified() == other.isTriggerSpecified() + && isAllowedLatenessSpecified() == other.isAllowedLatenessSpecified() + && isModeSpecified() == other.isModeSpecified() + && isTimestampCombinerSpecified() == other.isTimestampCombinerSpecified() + && getMode().equals(other.getMode()) + && getAllowedLateness().equals(other.getAllowedLateness()) + && getClosingBehavior().equals(other.getClosingBehavior()) + && getTrigger().equals(other.getTrigger()) + && getTimestampCombiner().equals(other.getTimestampCombiner()) + && getWindowFn().equals(other.getWindowFn()); + } + + @Override + public int hashCode() { + return Objects.hash( + triggerSpecified, + allowedLatenessSpecified, + modeSpecified, + timestampCombinerSpecified, + mode, + allowedLateness, + closingBehavior, + trigger, + timestampCombiner, + windowFn); + } + + /** + * Fixes all the defaults so that equals can be used to check that two strategies are the same, + * regardless of the state of "defaulted-ness". 
+ */ + @VisibleForTesting + public WindowingStrategy<T, W> fixDefaults() { + return new WindowingStrategy<>( + windowFn, + trigger, true, + mode, true, + allowedLateness, true, + timestampCombiner, true, + closingBehavior); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java index e495758..125e159 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java @@ -44,7 +44,6 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.ParDo.MultiOutput; import org.apache.beam.sdk.transforms.ParDo.SingleOutput; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; @@ -57,6 +56,7 @@ import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TaggedPValue; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.sdk.values.WindowingStrategy; import org.hamcrest.Matchers; import org.junit.Before; import org.junit.Rule; http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java index 
6c3aba2..e7b680a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java @@ -40,11 +40,11 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.Sample; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.WindowingStrategy; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java index 5f71cab..adf27f8 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java @@ -35,11 +35,11 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PValueBase; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; import org.joda.time.Duration; import 
org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java index a980f87..f74d673 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java @@ -35,11 +35,11 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.util.PCollectionViews; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.beam.sdk.values.WindowingStrategy; import org.hamcrest.Matchers; import org.joda.time.Duration; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java index 0556199..50e9c1d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java @@ -55,12 +55,12 @@ import org.apache.beam.sdk.transforms.windowing.Sessions; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import 
org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.Reshuffle; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.WindowingStrategy; import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Assert; http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java index b3fa2c6..e72c540 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java @@ -55,13 +55,13 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.InvalidWindows; import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.WindowingStrategy; import org.hamcrest.Matchers; import org.joda.time.Duration; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/b61e5bb2/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java 
---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java index 534e230..92f6a9c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java @@ -52,11 +52,11 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; -import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode; import org.hamcrest.Matchers; import org.joda.time.Duration; import org.joda.time.Instant;