Move user-facing timer-related classes out of util
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ca41af8f Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ca41af8f Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ca41af8f Branch: refs/heads/master Commit: ca41af8fe4711ab4a81c2a33746a64e64fb0ca37 Parents: ac01ec7 Author: Kenneth Knowles <k...@google.com> Authored: Tue May 2 12:46:46 2017 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Thu May 4 16:06:55 2017 -0700 ---------------------------------------------------------------------- .../operators/ApexGroupByKeyOperator.java | 2 +- .../operators/ApexTimerInternals.java | 2 +- .../operators/ApexTimerInternalsTest.java | 2 +- .../runners/core/construction/Triggers.java | 2 +- .../construction/PTransformMatchersTest.java | 8 +- .../apache/beam/runners/core/DoFnAdapters.java | 2 +- .../apache/beam/runners/core/DoFnRunner.java | 2 +- .../runners/core/InMemoryTimerInternals.java | 2 +- .../core/LateDataDroppingDoFnRunner.java | 2 +- ...eBoundedSplittableProcessElementInvoker.java | 2 +- .../beam/runners/core/ProcessFnRunner.java | 2 +- .../core/PushbackSideInputDoFnRunner.java | 2 +- .../org/apache/beam/runners/core/ReduceFn.java | 2 +- .../runners/core/ReduceFnContextFactory.java | 4 +- .../beam/runners/core/ReduceFnRunner.java | 2 +- .../beam/runners/core/SimpleDoFnRunner.java | 6 +- .../beam/runners/core/SimpleOldDoFnRunner.java | 2 +- .../core/SimplePushbackSideInputDoFnRunner.java | 2 +- .../beam/runners/core/SplittableParDo.java | 2 +- .../beam/runners/core/StatefulDoFnRunner.java | 2 +- .../beam/runners/core/TimerInternals.java | 2 +- .../AfterDelayFromFirstElementStateMachine.java | 2 +- .../AfterProcessingTimeStateMachine.java | 2 +- ...rSynchronizedProcessingTimeStateMachine.java | 2 +- .../triggers/AfterWatermarkStateMachine.java | 2 +- .../triggers/DefaultTriggerStateMachine.java | 2 +- .../core/triggers/TriggerStateMachine.java | 2 +- .../TriggerStateMachineContextFactory.java | 4 +- .../triggers/TriggerStateMachineRunner.java | 2 +- .../core/InMemoryTimerInternalsTest.java | 2 +- .../runners/core/KeyedWorkItemCoderTest.java | 2 +- .../beam/runners/core/ReduceFnRunnerTest.java | 2 +- .../beam/runners/core/ReduceFnTester.java | 2 +- .../beam/runners/core/SimpleDoFnRunnerTest.java | 8 +- .../SimplePushbackSideInputDoFnRunnerTest.java | 2 +- .../beam/runners/core/TimerInternalsTest.java | 2 +- .../triggers/TriggerStateMachineTester.java | 4 +- .../core/triggers/TriggerStateMachinesTest.java | 2 +- .../runners/direct/DirectTimerInternals.java | 2 +- .../beam/runners/direct/WatermarkManager.java | 2 +- .../direct/DirectTimerInternalsTest.java | 2 +- ...leManagerRemovingTransformEvaluatorTest.java | 2 +- .../runners/direct/EvaluationContextTest.java | 2 +- .../runners/direct/WatermarkManagerTest.java | 2 +- .../metrics/DoFnRunnerWithMetricsUpdate.java | 2 +- .../wrappers/streaming/DoFnOperator.java | 2 +- .../flink/streaming/DoFnOperatorTest.java | 8 +- .../spark/stateful/SparkTimerInternals.java | 2 +- .../translation/DoFnRunnerWithMetrics.java | 2 +- .../apache/beam/sdk/options/ValueProviders.java | 1 - .../org/apache/beam/sdk/state/TimeDomain.java | 43 +++++++++++ .../java/org/apache/beam/sdk/state/Timer.java | 78 ++++++++++++++++++++ .../org/apache/beam/sdk/state/TimerSpec.java | 30 ++++++++ .../org/apache/beam/sdk/state/TimerSpecs.java | 41 ++++++++++ .../java/org/apache/beam/sdk/state/Timers.java | 58 +++++++++++++++ .../beam/sdk/testing/TestPipelineOptions.java | 2 - .../org/apache/beam/sdk/transforms/DoFn.java | 6 +- .../apache/beam/sdk/transforms/DoFnTester.java | 2 +- .../beam/sdk/transforms/GroupIntoBatches.java | 8 +- .../org/apache/beam/sdk/transforms/Regex.java | 1 - .../reflect/ByteBuddyDoFnInvokerFactory.java | 2 +- .../sdk/transforms/reflect/DoFnInvoker.java | 2 +- .../sdk/transforms/reflect/DoFnSignature.java | 4 +- .../sdk/transforms/reflect/DoFnSignatures.java | 4 +- .../transforms/windowing/AfterWatermark.java | 2 +- .../org/apache/beam/sdk/util/ClassPath.java | 1 + .../org/apache/beam/sdk/util/TimeDomain.java | 43 ----------- .../java/org/apache/beam/sdk/util/Timer.java | 78 -------------------- .../org/apache/beam/sdk/util/TimerSpec.java | 30 -------- .../org/apache/beam/sdk/util/TimerSpecs.java | 41 ---------- .../java/org/apache/beam/sdk/util/Timers.java | 58 --------------- .../sdk/io/DrunkWritableByteChannelFactory.java | 1 - .../org/apache/beam/sdk/io/TFRecordIOTest.java | 2 - .../apache/beam/sdk/metrics/GaugeCellTest.java | 1 + .../beam/sdk/options/PipelineOptionsTest.java | 1 - .../apache/beam/sdk/transforms/ParDoTest.java | 8 +- .../apache/beam/sdk/transforms/RegexTest.java | 2 - .../transforms/reflect/DoFnInvokersTest.java | 8 +- .../transforms/reflect/DoFnSignaturesTest.java | 8 +- .../transforms/reflect/OnTimerInvokersTest.java | 6 +- .../testhelper/DoFnInvokersTestHelper.java | 6 +- 81 files changed, 348 insertions(+), 356 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java index 4fdb600..6131bdf 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java @@ -45,11 +45,11 @@ import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachines; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.NullSideInputReader; -import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java index 15ccbee..1eb224c 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java @@ -35,8 +35,8 @@ import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.sdk.util.TimeDomain; import org.joda.time.Instant; /** http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java index ee142e2..7b52223 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java @@ -32,8 +32,8 @@ import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.core.TimerInternals.TimerDataCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.util.TimeDomain; import org.joda.time.Instant; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Triggers.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Triggers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Triggers.java index 81f738d..5e73571 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Triggers.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Triggers.java @@ -25,6 +25,7 @@ import java.lang.reflect.Method; import java.util.List; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.common.runner.v1.RunnerApi; +import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.AfterAll; import org.apache.beam.sdk.transforms.windowing.AfterEach; import org.apache.beam.sdk.transforms.windowing.AfterFirst; @@ -43,7 +44,6 @@ import org.apache.beam.sdk.transforms.windowing.TimestampTransform; import org.apache.beam.sdk.transforms.windowing.Trigger; import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; import org.apache.beam.sdk.util.ReshuffleTrigger; -import org.apache.beam.sdk.util.TimeDomain; import org.joda.time.Duration; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java index 278c12b..ba935a4 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java @@ -39,6 +39,10 @@ import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.runners.PTransformMatcher; import org.apache.beam.sdk.state.StateSpec; import org.apache.beam.sdk.state.StateSpecs; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.state.TimerSpec; +import org.apache.beam.sdk.state.TimerSpecs; import org.apache.beam.sdk.state.ValueState; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; @@ -57,10 +61,6 @@ import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.PCollectionViews; -import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.Timer; -import org.apache.beam.sdk.util.TimerSpec; -import org.apache.beam.sdk.util.TimerSpecs; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java index 849400f..af59a40 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java @@ -21,6 +21,7 @@ import java.io.IOException; import org.apache.beam.runners.core.OldDoFn.Context; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.state.State; +import org.apache.beam.sdk.state.Timer; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.FinishBundleContext; import org.apache.beam.sdk.transforms.DoFn.OnTimerContext; @@ -34,7 +35,6 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.Timer; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.joda.time.Duration; http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java index b29adcc..30648f6 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java @@ -17,9 +17,9 @@ */ package org.apache.beam.runners.core; +import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.WindowedValue; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java index f8aaa09..e68bb24 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java @@ -27,8 +27,8 @@ import com.google.common.collect.Table; import java.util.NavigableSet; import java.util.TreeSet; import javax.annotation.Nullable; +import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.WindowTracing; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java index d6ba8f5..872ee3e 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java @@ -23,8 +23,8 @@ import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java index 8e80a69..16bdfa3 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java @@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.state.State; +import org.apache.beam.sdk.state.Timer; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.FinishBundleContext; import org.apache.beam.sdk.transforms.DoFn.StartBundleContext; @@ -36,7 +37,6 @@ import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.SideInputReader; -import org.apache.beam.sdk.util.Timer; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java index 3ae3f50..7cbf0d2 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java @@ -25,10 +25,10 @@ import java.util.Collection; import java.util.Collections; import org.apache.beam.runners.core.StateNamespaces.WindowNamespace; import org.apache.beam.runners.core.TimerInternals.TimerData; +import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; -import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollectionView; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java index bab1dc7..8f21086 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java @@ -17,9 +17,9 @@ */ package org.apache.beam.runners.core; +import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.WindowedValue; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFn.java index bcc47a8..8e2ff73 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFn.java @@ -19,9 +19,9 @@ package org.apache.beam.runners.core; import java.io.Serializable; import org.apache.beam.sdk.state.ReadableState; +import org.apache.beam.sdk.state.Timers; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.Timers; import org.apache.beam.sdk.util.WindowingStrategy; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java index cb1a159..550b9b9 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java @@ -31,11 +31,11 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.state.State; import org.apache.beam.sdk.state.StateContext; import org.apache.beam.sdk.state.StateContexts; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.state.Timers; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.SideInputReader; -import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.Timers; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollectionView; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java index d3dc067..a949edd 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java @@ -45,6 +45,7 @@ import org.apache.beam.runners.core.triggers.TriggerStateMachineRunner; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; @@ -54,7 +55,6 @@ import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.SideInputReader; -import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 200a43b..2567bba 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -34,6 +34,9 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.state.State; import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.state.TimerSpec; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.FinishBundleContext; import org.apache.beam.sdk.transforms.DoFn.OnTimerContext; @@ -51,9 +54,6 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.SystemDoFnInternal; -import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.Timer; -import org.apache.beam.sdk.util.TimerSpec; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java index 4c3149a..7454dc9 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java @@ -29,6 +29,7 @@ import org.apache.beam.runners.core.DoFnRunners.OutputManager; import org.apache.beam.runners.core.ExecutionContext.StepContext; import org.apache.beam.runners.core.OldDoFn.RequiresWindowAccess; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; @@ -36,7 +37,6 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.SystemDoFnInternal; -import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java index 50d301b..36a04d9 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java @@ -23,9 +23,9 @@ import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.Set; +import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; -import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollectionView; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java index 1fda7d9..74ffeb6 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java @@ -29,6 +29,7 @@ import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.state.ValueState; import org.apache.beam.sdk.state.WatermarkHoldState; import org.apache.beam.sdk.transforms.DoFn; @@ -46,7 +47,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; -import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java index 0a7701a..3466945 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java @@ -23,13 +23,13 @@ import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.state.State; import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.reflect.DoFnSignature; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn; import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java index 375cdf9..21fe430 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java @@ -30,9 +30,9 @@ import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.InstantCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.TimeDomain; import org.joda.time.Instant; /** http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java index b972985..8d8d0de 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java @@ -32,10 +32,10 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.InstantCoder; import org.apache.beam.sdk.state.CombiningState; import org.apache.beam.sdk.state.GroupingState; +import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.Combine.Holder; import org.apache.beam.sdk.transforms.Min; import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.util.TimeDomain; import org.joda.time.Duration; import org.joda.time.Instant; import org.joda.time.format.PeriodFormat; http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterProcessingTimeStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterProcessingTimeStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterProcessingTimeStateMachine.java index eaf5613..e813d33 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterProcessingTimeStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterProcessingTimeStateMachine.java @@ -21,8 +21,8 @@ import java.util.List; import java.util.Objects; import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.util.TimeDomain; import org.joda.time.Instant; /** http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachine.java index 07fab22..e1cd897 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachine.java @@ -21,8 +21,8 @@ import com.google.common.base.Objects; import java.util.Collections; import java.util.List; import javax.annotation.Nullable; +import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.util.TimeDomain; import org.joda.time.Instant; // This should not really have the superclass https://issues.apache.org/jira/browse/BEAM-1486 http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java index 1b117d2..c9eee15 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java @@ -24,7 +24,7 @@ import java.util.Objects; import javax.annotation.Nullable; import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine; import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.state.TimeDomain; /** * {@code AfterWatermark} triggers fire based on progress of the system watermark. This time is a http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/DefaultTriggerStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/DefaultTriggerStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/DefaultTriggerStateMachine.java index be4dd68..4ab9fc9 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/DefaultTriggerStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/DefaultTriggerStateMachine.java @@ -18,7 +18,7 @@ package org.apache.beam.runners.core.triggers; import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.state.TimeDomain; /** * A trigger that is equivalent to {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}. See http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachine.java index d622ac9..6a2cf0c 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachine.java @@ -24,10 +24,10 @@ import java.util.Objects; import javax.annotation.Nullable; import org.apache.beam.runners.core.MergingStateAccessor; import org.apache.beam.runners.core.StateAccessor; +import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.TimeDomain; import org.joda.time.Instant; /** http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java index 84b0453..811f30c 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java @@ -35,10 +35,10 @@ import org.apache.beam.runners.core.triggers.TriggerStateMachine.MergingTriggerI import org.apache.beam.runners.core.triggers.TriggerStateMachine.TriggerInfo; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.state.State; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.state.Timers; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.Timers; import org.joda.time.Instant; /** http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java index 324d44d..88ea6ef 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java @@ -29,9 +29,9 @@ import org.apache.beam.runners.core.StateAccessor; import org.apache.beam.runners.core.StateTag; import org.apache.beam.runners.core.StateTags; import org.apache.beam.sdk.coders.BitSetCoder; +import org.apache.beam.sdk.state.Timers; import org.apache.beam.sdk.state.ValueState; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.Timers; import org.joda.time.Instant; /** http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java index 0226260..7049421 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java @@ -22,7 +22,7 @@ import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertThat; import org.apache.beam.runners.core.TimerInternals.TimerData; -import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.state.TimeDomain; import org.joda.time.Instant; import org.junit.Test; import org.junit.runner.RunWith; http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java index 4d59634..4057c25 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java @@ -21,9 +21,9 @@ import com.google.common.collect.ImmutableList; import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.testing.CoderProperties; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.WindowedValue; import org.joda.time.Instant; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java index 7b91151..2b400a4 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java @@ -45,6 +45,7 @@ import org.apache.beam.sdk.metrics.MetricsContainer; import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext; import org.apache.beam.sdk.transforms.CombineWithContext.Context; import org.apache.beam.sdk.transforms.Sum; @@ -70,7 +71,6 @@ import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; import org.apache.beam.sdk.util.SideInputReader; -import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java index 573855f..eea30e9 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java @@ -51,6 +51,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -64,7 +65,6 @@ import org.apache.beam.sdk.util.AppliedCombineFn; import org.apache.beam.sdk.util.NullSideInputReader; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.SideInputReader; -import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java index b8fc64e..d793584 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java @@ -33,16 +33,16 @@ import org.apache.beam.runners.core.BaseExecutionContext.StepContext; import org.apache.beam.runners.core.DoFnRunners.OutputManager; import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.state.TimerSpec; +import org.apache.beam.sdk.state.TimerSpecs; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.NullSideInputReader; -import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.Timer; -import org.apache.beam.sdk.util.TimerSpec; -import org.apache.beam.sdk.util.TimerSpecs; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java index ba3f926..dabc4f0 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java @@ -29,6 +29,7 @@ import com.google.common.collect.ImmutableList; import java.util.ArrayList; import java.util.List; import org.apache.beam.runners.core.TimerInternals.TimerData; +import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.Sum; @@ -39,7 +40,6 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.IdentitySideInputWindowFn; import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; -import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java index 768c974..af270d9 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java @@ -25,10 +25,10 @@ import static org.junit.Assert.assertThat; import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.core.TimerInternals.TimerDataCoder; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.testing.CoderProperties; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.util.TimeDomain; import org.joda.time.Instant; import org.junit.Test; import org.junit.runner.RunWith; http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java index 1bdcff2..51f467a 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java @@ -44,12 +44,12 @@ import org.apache.beam.runners.core.StateNamespaces.WindowAndTriggerNamespace; import org.apache.beam.runners.core.StateNamespaces.WindowNamespace; import org.apache.beam.runners.core.TestInMemoryStateInternals; import org.apache.beam.runners.core.TimerInternals.TimerData; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.state.Timers; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.Timers; import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java index 7ff3478..5158f50 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java @@ -23,7 +23,7 @@ import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertThat; import org.apache.beam.sdk.common.runner.v1.RunnerApi; -import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.state.TimeDomain; import org.joda.time.Duration; import org.junit.Test; import org.junit.runner.RunWith; http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java index 67d568d..a099368 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java @@ -23,7 +23,7 @@ import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate.TimerUpdateBuilder; import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks; -import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.state.TimeDomain; import org.joda.time.Instant; /** http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java index b576e00..21ba734 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java @@ -55,10 +55,10 @@ import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java index c8ac695..f80515f 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java @@ -28,7 +28,7 @@ import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate.TimerUpdateBuilder; import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks; import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.state.TimeDomain; import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Before; http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java index 1ac4d6d..d98e51d 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java @@ -29,10 +29,10 @@ import static org.mockito.Mockito.verify; import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.TimerInternals.TimerData; +import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.WindowedValue; import org.hamcrest.Matchers; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java index 3eff11a..63864f0 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java @@ -44,6 +44,7 @@ import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.state.BagState; +import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Create; @@ -54,7 +55,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; import org.apache.beam.sdk.util.SideInputReader; -import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java index e6506a8..e1e6ab5 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java @@ -43,6 +43,7 @@ import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Create; @@ -55,7 +56,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java index 29a1a52..dae91fe 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java @@ -21,8 +21,8 @@ import java.io.Closeable; import java.io.IOException; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.sdk.metrics.MetricsEnvironment; +import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.WindowedValue; import org.apache.flink.api.common.functions.RuntimeContext; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index e44ad57..5287b85 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -58,6 +58,7 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.state.KeyGro import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.state.BagState; +import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.join.RawUnionValue; import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; @@ -65,7 +66,6 @@ import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.NullSideInputReader; import org.apache.beam.sdk.util.SideInputReader; -import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollectionView; http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java index 2a51be6..80dfa07 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java @@ -41,6 +41,10 @@ import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.state.StateSpec; import org.apache.beam.sdk.state.StateSpecs; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.state.TimerSpec; +import org.apache.beam.sdk.state.TimerSpecs; import org.apache.beam.sdk.state.ValueState; import org.apache.beam.sdk.testing.PCollectionViewTesting; import org.apache.beam.sdk.transforms.DoFn; @@ -49,10 +53,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.Timer; -import org.apache.beam.sdk.util.TimerSpec; -import org.apache.beam.sdk.util.TimerSpecs; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java index 646e269..107915f 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java @@ -32,8 +32,8 @@ import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.spark.coders.CoderHelpers; import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.SparkWatermarks; +import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.TimeDomain; import org.apache.spark.broadcast.Broadcast; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java index fa9a9c2..d74b253 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java @@ -24,8 +24,8 @@ import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.spark.metrics.SparkMetricsContainer; import org.apache.beam.sdk.metrics.MetricsContainer; import org.apache.beam.sdk.metrics.MetricsEnvironment; +import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.WindowedValue; import org.apache.spark.Accumulator; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java index e2355ee..1cc46fe 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java @@ -21,7 +21,6 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; - import java.io.IOException; import java.util.Map; import org.apache.beam.sdk.util.common.ReflectHelpers; http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/sdks/java/core/src/main/java/org/apache/beam/sdk/state/TimeDomain.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/TimeDomain.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/TimeDomain.java new file mode 100644 index 0000000..e814915 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/TimeDomain.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.state; + +/** + * {@code TimeDomain} specifies whether an operation is based on + * timestamps of elements or current "real-world" time as reported while processing. + */ +public enum TimeDomain { + /** + * The {@code EVENT_TIME} domain corresponds to the timestamps on the elements. Time advances + * on the system watermark advances. + */ + EVENT_TIME, + + /** + * The {@code PROCESSING_TIME} domain corresponds to the current to the current (system) time. + * This is advanced during execution of the pipeline. + */ + PROCESSING_TIME, + + /** + * Same as the {@code PROCESSING_TIME} domain, except it won't fire a timer set for time + * {@code T} until all timers from earlier stages set for a time earlier than {@code T} have + * fired. + */ + SYNCHRONIZED_PROCESSING_TIME +} http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java new file mode 100644 index 0000000..9458906 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.state; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.transforms.DoFn; +import org.joda.time.Duration; +import org.joda.time.Instant; + +/** + * A timer for a specified time domain that can be set to register the desire for further processing + * at particular time in its specified time domain. + * + * <p>See {@link TimeDomain} for details on the time domains available. + * + * <p>In a {@link DoFn}, a {@link Timer} is specified by a {@link TimerSpec} annotated with {@link + * DoFn.TimerId}. + * + * <p>An implementation of {@link Timer} is implicitly scoped - it may be scoped to a key and + * window, or a key, window, and trigger, etc. + * + * <p>A timer exists in one of two states: set or unset. A timer can be set only for a single time + * per scope. + * + * <p>Timer callbacks are not guaranteed to be called immediately according to the local view of the + * {@link TimeDomain}, but will be called at some time after the requested time, in timestamp + * order. + */ +@Experimental(Experimental.Kind.TIMERS) +public interface Timer { + /** + * Sets or resets the time in the timer's {@link TimeDomain} at which it should fire. If the timer + * was already set, resets it to the new requested time. + * + * <p>For {@link TimeDomain#PROCESSING_TIME}, the behavior is be unpredictable, since processing + * time timers are ignored after a window has expired. Instead, it is recommended to use + * {@link #setRelative()}. + */ + void set(Instant absoluteTime); + + /** + * Unsets this timer. It is permitted to {@code cancel()} whether or not the timer was actually + * set. + */ + void cancel(); + + /** + * Sets the timer relative to the current time, according to any offset and alignment specified. + * Using {@link #offset(Duration)} and {@link #align(Duration)}. + */ + void setRelative(); + + /** + * Set the align offset. + */ + Timer offset(Duration offset); + + /** + * Aligns a timestamp to the next boundary of {@code period}. + */ + Timer align(Duration period); + +} http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/sdks/java/core/src/main/java/org/apache/beam/sdk/state/TimerSpec.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/TimerSpec.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/TimerSpec.java new file mode 100644 index 0000000..f08f43e --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/TimerSpec.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.state; + +import java.io.Serializable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; + +/** + * A specification for a {@link Timer}. This includes its {@link TimeDomain}. + */ +@Experimental(Kind.TIMERS) +public interface TimerSpec extends Serializable { + TimeDomain getTimeDomain(); +} http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/sdks/java/core/src/main/java/org/apache/beam/sdk/state/TimerSpecs.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/TimerSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/TimerSpecs.java new file mode 100644 index 0000000..9efac69 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/TimerSpecs.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.state; + +import com.google.auto.value.AutoValue; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; + +/** + * Static methods for working with {@link TimerSpec}. + */ +@Experimental(Kind.TIMERS) +public class TimerSpecs { + + public static TimerSpec timer(TimeDomain timeDomain) { + return new org.apache.beam.sdk.util.AutoValue_TimerSpecs_SimpleTimerSpec(timeDomain); + } + + /** + * A straightforward POJO {@link TimerSpec}. Package-level access for AutoValue. + */ + @AutoValue + abstract static class SimpleTimerSpec implements TimerSpec { + public abstract TimeDomain getTimeDomain(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timers.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timers.java new file mode 100644 index 0000000..dcdaf00 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timers.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.state; + +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.joda.time.Instant; + +/** + * Interface for interacting with time. + */ +@Experimental(Experimental.Kind.TIMERS) +public interface Timers { + /** + * Sets a timer to fire when the event time watermark, the current processing time, or + * the synchronized processing time watermark surpasses a given timestamp. + * + * <p>See {@link TimeDomain} for details on the time domains available. + * + * <p>Timers are not guaranteed to fire immediately, but will be delivered at some time + * afterwards. + * + * <p>An implementation of {@link Timers} implicitly scopes timers that are set - they may + * be scoped to a key and window, or a key, window, and trigger, etc. + * + * @param timestamp the time at which the timer should be delivered + * @param timeDomain the domain that the {@code timestamp} applies to + */ + void setTimer(Instant timestamp, TimeDomain timeDomain); + + /** Removes the timer set in this context for the {@code timestmap} and {@code timeDomain}. */ + void deleteTimer(Instant timestamp, TimeDomain timeDomain); + + /** Returns the current processing time. */ + Instant currentProcessingTime(); + + /** Returns the current synchronized processing time or {@code null} if unknown. */ + @Nullable + Instant currentSynchronizedProcessingTime(); + + /** Returns the current event time. */ + Instant currentEventTime(); +} http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java index e1ad47b..206bc1f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java @@ -18,9 +18,7 @@ package org.apache.beam.sdk.testing; import com.fasterxml.jackson.annotation.JsonIgnore; - import javax.annotation.Nullable; - import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.DefaultValueFactory; http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index c858936..befba1d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -29,6 +29,9 @@ import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.state.State; import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.state.TimerSpec; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker; @@ -36,9 +39,6 @@ import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.Timer; -import org.apache.beam.sdk.util.TimerSpec; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index ead2569..8a03f3c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -33,6 +33,7 @@ import java.util.Map; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.state.Timer; import org.apache.beam.sdk.transforms.DoFn.OnTimerContext; import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; @@ -43,7 +44,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.SerializableUtils; -import org.apache.beam.sdk.util.Timer; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollectionView; http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java index 6828979..b023363 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java @@ -27,12 +27,12 @@ import org.apache.beam.sdk.state.BagState; import org.apache.beam.sdk.state.CombiningState; import org.apache.beam.sdk.state.StateSpec; import org.apache.beam.sdk.state.StateSpecs; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.state.TimerSpec; +import org.apache.beam.sdk.state.TimerSpecs; import org.apache.beam.sdk.state.ValueState; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.Timer; -import org.apache.beam.sdk.util.TimerSpec; -import org.apache.beam.sdk.util.TimerSpecs; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.joda.time.Duration; http://git-wip-us.apache.org/repos/asf/beam/blob/ca41af8f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java index 690d321..8913e74 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java @@ -21,7 +21,6 @@ import java.util.ArrayList; import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; - import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection;