Move TimerInternals to runners/core-java
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b086d2fd Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b086d2fd Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b086d2fd Branch: refs/heads/master Commit: b086d2fdf1dba2b0e4a3362bd3ee1a92bf21c56a Parents: 92b33bc Author: Kenneth Knowles <k...@google.com> Authored: Thu Jan 26 21:17:18 2017 -0800 Committer: Kenneth Knowles <k...@google.com> Committed: Mon Feb 6 09:26:56 2017 -0800 ---------------------------------------------------------------------- .../operators/ApexGroupByKeyOperator.java | 2 +- .../apex/translation/utils/NoOpStepContext.java | 2 +- .../beam/runners/core/BaseExecutionContext.java | 1 - .../beam/runners/core/ExecutionContext.java | 1 - .../GroupAlsoByWindowViaOutputBufferDoFn.java | 1 - .../core/GroupAlsoByWindowViaWindowSetDoFn.java | 1 - .../GroupAlsoByWindowViaWindowSetNewDoFn.java | 2 - .../runners/core/InMemoryTimerInternals.java | 1 - .../apache/beam/runners/core/KeyedWorkItem.java | 2 +- .../beam/runners/core/KeyedWorkItemCoder.java | 4 +- .../beam/runners/core/KeyedWorkItems.java | 2 +- .../core/LateDataDroppingDoFnRunner.java | 1 - .../beam/runners/core/PaneInfoTracker.java | 1 - .../runners/core/ReduceFnContextFactory.java | 3 +- .../beam/runners/core/ReduceFnRunner.java | 3 +- .../beam/runners/core/SimpleDoFnRunner.java | 1 - .../beam/runners/core/SimpleOldDoFnRunner.java | 1 - .../beam/runners/core/SplittableParDo.java | 1 - .../beam/runners/core/TimerInternals.java | 288 +++++++++++++++++++ .../runners/core/TimerInternalsFactory.java | 1 - .../apache/beam/runners/core/WatermarkHold.java | 1 - .../beam/runners/core/WindowingInternals.java | 1 - .../core/GroupAlsoByWindowsProperties.java | 1 - .../core/InMemoryTimerInternalsTest.java | 2 +- .../runners/core/KeyedWorkItemCoderTest.java | 2 +- .../core/LateDataDroppingDoFnRunnerTest.java | 1 - .../core/PushbackSideInputDoFnRunnerTest.java | 2 +- .../beam/runners/core/ReduceFnTester.java | 3 +- .../beam/runners/core/SimpleDoFnRunnerTest.java | 3 +- .../beam/runners/core/SplittableParDoTest.java | 1 - .../beam/runners/core/TimerInternalsTest.java | 107 +++++++ .../triggers/TriggerStateMachineTester.java | 2 +- .../runners/direct/DirectExecutionContext.java | 2 +- .../beam/runners/direct/DirectRunner.java | 2 +- .../runners/direct/DirectTimerInternals.java | 2 +- ...ecycleManagerRemovingTransformEvaluator.java | 2 +- .../beam/runners/direct/EvaluationContext.java | 2 +- .../direct/ExecutorServiceParallelExecutor.java | 2 +- .../GroupAlsoByWindowEvaluatorFactory.java | 2 +- .../beam/runners/direct/ParDoEvaluator.java | 2 +- ...littableProcessElementsEvaluatorFactory.java | 2 +- .../direct/StatefulParDoEvaluatorFactory.java | 2 +- .../beam/runners/direct/WatermarkManager.java | 4 +- .../direct/DirectTimerInternalsTest.java | 2 +- ...leManagerRemovingTransformEvaluatorTest.java | 2 +- .../runners/direct/EvaluationContextTest.java | 2 +- .../runners/direct/WatermarkManagerTest.java | 2 +- .../functions/FlinkProcessContextBase.java | 2 +- .../wrappers/streaming/DoFnOperator.java | 2 +- .../streaming/SingletonKeyedWorkItem.java | 2 +- .../wrappers/streaming/WindowDoFnOperator.java | 4 +- .../translation/SparkGroupAlsoByWindowFn.java | 2 +- .../spark/translation/SparkProcessContext.java | 2 +- .../apache/beam/sdk/util/TimerInternals.java | 286 ------------------ .../sdk/transforms/join/UnionCoderTest.java | 7 - .../beam/sdk/util/TimerInternalsTest.java | 106 ------- .../beam/fn/harness/fake/FakeStepContext.java | 2 +- 57 files changed, 433 insertions(+), 459 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java index 6322796..7891b34 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java @@ -47,6 +47,7 @@ import org.apache.beam.runners.core.KeyedWorkItems; import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.runners.core.StateInternalsFactory; import org.apache.beam.runners.core.SystemReduceFn; +import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.core.WindowingInternals; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; @@ -59,7 +60,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.StateInternals; http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java index f169ae6..feae46e 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java @@ -20,9 +20,9 @@ package org.apache.beam.runners.apex.translation.utils; import java.io.IOException; import java.io.Serializable; import org.apache.beam.runners.core.ExecutionContext; +import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.values.TupleTag; http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java index 7b674dc..eec913c 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java @@ -24,7 +24,6 @@ import java.util.HashMap; import java.util.Map; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.values.TupleTag; http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java index f67aff4..f6bcc3d 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.util.Collection; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.values.TupleTag; http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java index 57981ad..4cde7da 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java @@ -23,7 +23,6 @@ import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachines; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.SystemDoFnInternal; -import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.StateInternals; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java index 9ccefda..49b010e 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java @@ -23,7 +23,6 @@ import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.SystemDoFnInternal; -import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.values.KV; http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java index 3786b48..b4310ec 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java @@ -27,11 +27,9 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.SystemDoFnInternal; -import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.StateInternals; -import org.apache.beam.sdk.util.state.TimerInternalsFactory; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TupleTag; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java index 2c3d78a..bbe1a24 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java @@ -29,7 +29,6 @@ import java.util.TreeSet; import javax.annotation.Nullable; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.state.StateNamespace; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItem.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItem.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItem.java index c75fc25..e825e43 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItem.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItem.java @@ -17,7 +17,7 @@ */ package org.apache.beam.runners.core; -import org.apache.beam.sdk.util.TimerInternals.TimerData; +import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.sdk.util.WindowedValue; /** http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java index dfd6a8d..7a144a6 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java @@ -26,14 +26,14 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.List; +import org.apache.beam.runners.core.TimerInternals.TimerData; +import org.apache.beam.runners.core.TimerInternals.TimerDataCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.StandardCoder; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.PropertyNames; -import org.apache.beam.sdk.util.TimerInternals.TimerData; -import org.apache.beam.sdk.util.TimerInternals.TimerDataCoder; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItems.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItems.java index 94c3bb6..5e379d8 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItems.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItems.java @@ -21,7 +21,7 @@ import com.google.common.base.MoreObjects; import com.google.common.collect.Iterables; import java.util.Collections; import java.util.Objects; -import org.apache.beam.sdk.util.TimerInternals.TimerData; +import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.sdk.util.WindowedValue; /** http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java index 9436ccf..4d41527 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java @@ -24,7 +24,6 @@ import com.google.common.collect.Iterables; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java index 69a4cfd..58e51aa 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java @@ -24,7 +24,6 @@ import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder; import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; -import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.state.ReadableState; import org.apache.beam.sdk.util.state.StateAccessor; http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java index eae1a8b..6f8715e 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java @@ -24,14 +24,13 @@ import com.google.common.collect.ImmutableMap; import java.util.Collection; import java.util.Map; import javax.annotation.Nullable; +import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.TimerInternals; -import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.Timers; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.MergingStateAccessor; http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java index 96e76b7..50b1192 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java @@ -37,6 +37,7 @@ import javax.annotation.Nullable; import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly; import org.apache.beam.runners.core.ReduceFnContextFactory.OnTriggerCallbacks; import org.apache.beam.runners.core.ReduceFnContextFactory.StateStyle; +import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachineContextFactory; import org.apache.beam.runners.core.triggers.TriggerStateMachineRunner; @@ -52,8 +53,6 @@ import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.TimerInternals; -import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 588e31d..9b1b852 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -50,7 +50,6 @@ import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.Timer; -import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.TimerSpec; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java index 9f80bca..6b2fbb2 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java @@ -39,7 +39,6 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java index 7368b2f..30ee7cb 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java @@ -49,7 +49,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.StateInternals; http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java new file mode 100644 index 0000000..a50a622 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.auto.value.AutoValue; +import com.google.common.collect.ComparisonChain; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.InstantCoder; +import org.apache.beam.sdk.coders.StandardCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.PropertyNames; +import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.state.StateNamespace; +import org.apache.beam.sdk.util.state.StateNamespaces; +import org.joda.time.Instant; + +/** + * Encapsulate interaction with time within the execution environment. + * + * <p>This class allows setting and deleting timers, and also retrieving an + * estimate of the current time. + */ +public interface TimerInternals { + + /** + * Sets a timer to be fired when the current time in the specified time domain reaches the + * target timestamp. + * + * <p>The combination of {@code namespace} and {@code timerId} uniquely identify a timer. + * + * <p>If a timer is set and then set again before it fires, later settings should clear the prior + * setting. + * + * <p>It is an error to set a timer for two different time domains. + */ + void setTimer(StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain); + + /** + * @deprecated use {@link #setTimer(StateNamespace, String, Instant, TimeDomain)}. + */ + @Deprecated + void setTimer(TimerData timerData); + + /** + * Deletes the given timer. + * + * <p>A timer's ID is enforced to be unique in validation of a {@link DoFn}, but runners + * often manage timers for different time domains in very different ways, thus the + * {@link TimeDomain} is a required parameter. + */ + void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain); + + /** + * @deprecated use {@link #deleteTimer(StateNamespace, String, TimeDomain)}. + */ + @Deprecated + void deleteTimer(StateNamespace namespace, String timerId); + + /** + * @deprecated use {@link #deleteTimer(StateNamespace, String, TimeDomain)}. + */ + @Deprecated + void deleteTimer(TimerData timerKey); + + /** + * Returns the current timestamp in the {@link TimeDomain#PROCESSING_TIME} time domain. + */ + Instant currentProcessingTime(); + + /** + * Returns the current timestamp in the {@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME} time + * domain or {@code null} if unknown. + */ + @Nullable + Instant currentSynchronizedProcessingTime(); + + /** + * Return the current, local input watermark timestamp for this computation + * in the {@link TimeDomain#EVENT_TIME} time domain. + * + * <p>This value: + * <ol> + * <li>Is never {@literal null}, but may be {@link BoundedWindow#TIMESTAMP_MIN_VALUE}. + * <li>Is monotonically increasing. + * <li>May differ between workers due to network and other delays. + * <li>Will never be ahead of the global input watermark for this computation. But it + * may be arbitrarily behind the global input watermark. + * <li>Any element with a timestamp before the local input watermark can be considered + * 'locally late' and be subject to special processing or be dropped entirely. + * </ol> + * + * <p>Note that because the local input watermark can be behind the global input watermark, + * it is possible for an element to be considered locally on-time even though it is + * globally late. + */ + Instant currentInputWatermarkTime(); + + /** + * Return the current, local output watermark timestamp for this computation + * in the {@link TimeDomain#EVENT_TIME} time domain. Return {@code null} if unknown. + * + * <p>This value: + * <ol> + * <li>Is monotonically increasing. + * <li>Will never be ahead of {@link #currentInputWatermarkTime} as returned above. + * <li>May differ between workers due to network and other delays. + * <li>However will never be behind the global input watermark for any following computation. + * </ol> + * + * <p>In pictures: + * <pre>{@code + * | | | | | + * | | D | C | B | A + * | | | | | + * GIWM <= GOWM <= LOWM <= LIWM <= GIWM + * (next stage) + * -------------------------------------------------> event time + * }</pre> + * + * <p>where + * + * <ul> + * <li> LOWM = local output water mark. + * <li> GOWM = global output water mark. + * <li> GIWM = global input water mark. + * <li> LIWM = local input water mark. + * <li> A = A globally on-time element. + * <li> B = A globally late, but locally on-time element. + * <li> C = A locally late element which may still contribute to the timestamp of a pane. + * <li> D = A locally late element which cannot contribute to the timestamp of a pane. + * </ul> + * + * <p>Note that if a computation emits an element which is not before the current output watermark + * then that element will always appear locally on-time in all following computations. However, + * it is possible for an element emitted before the current output watermark to appear locally + * on-time in a following computation. Thus we must be careful to never assume locally late data + * viewed on the output of a computation remains locally late on the input of a following + * computation. + */ + @Nullable + Instant currentOutputWatermarkTime(); + + /** + * Data about a timer as represented within {@link TimerInternals}. + */ + @AutoValue + abstract class TimerData implements Comparable<TimerData> { + + public abstract String getTimerId(); + + public abstract StateNamespace getNamespace(); + + public abstract Instant getTimestamp(); + + public abstract TimeDomain getDomain(); + + /** + * Construct a {@link TimerData} for the given parameters, where the timer ID is automatically + * generated. + */ + public static TimerData of( + String timerId, StateNamespace namespace, Instant timestamp, TimeDomain domain) { + return new AutoValue_TimerInternals_TimerData(timerId, namespace, timestamp, domain); + } + + /** + * Construct a {@link TimerData} for the given parameters, where the timer ID is + * deterministically generated from the {@code timestamp} and {@code domain}. + */ + public static TimerData of(StateNamespace namespace, Instant timestamp, TimeDomain domain) { + String timerId = + new StringBuilder() + .append(domain.ordinal()) + .append(':') + .append(timestamp.getMillis()) + .toString(); + return of(timerId, namespace, timestamp, domain); + } + + /** + * {@inheritDoc}. + * + * <p>The ordering of {@link TimerData} that are not in the same namespace or domain is + * arbitrary. + */ + @Override + public int compareTo(TimerData that) { + if (this.equals(that)) { + return 0; + } + ComparisonChain chain = + ComparisonChain.start() + .compare(this.getTimestamp(), that.getTimestamp()) + .compare(this.getDomain(), that.getDomain()); + if (chain.result() == 0 && !this.getNamespace().equals(that.getNamespace())) { + // Obtaining the stringKey may be expensive; only do so if required + chain = chain.compare(getNamespace().stringKey(), that.getNamespace().stringKey()); + } + return chain.result(); + } + } + + /** + * A {@link Coder} for {@link TimerData}. + */ + class TimerDataCoder extends StandardCoder<TimerData> { + private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of(); + private static final InstantCoder INSTANT_CODER = InstantCoder.of(); + private final Coder<? extends BoundedWindow> windowCoder; + + public static TimerDataCoder of(Coder<? extends BoundedWindow> windowCoder) { + return new TimerDataCoder(windowCoder); + } + + @SuppressWarnings("unchecked") + @JsonCreator + public static TimerDataCoder of( + @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) + List<Coder<?>> components) { + checkArgument(components.size() == 1, "Expecting 1 components, got %s", components.size()); + return of((Coder<? extends BoundedWindow>) components.get(0)); + } + + private TimerDataCoder(Coder<? extends BoundedWindow> windowCoder) { + this.windowCoder = windowCoder; + } + + @Override + public void encode(TimerData timer, OutputStream outStream, Context context) + throws CoderException, IOException { + Context nestedContext = context.nested(); + STRING_CODER.encode(timer.getTimerId(), outStream, nestedContext); + STRING_CODER.encode(timer.getNamespace().stringKey(), outStream, nestedContext); + INSTANT_CODER.encode(timer.getTimestamp(), outStream, nestedContext); + STRING_CODER.encode(timer.getDomain().name(), outStream, context); + } + + @Override + public TimerData decode(InputStream inStream, Context context) + throws CoderException, IOException { + Context nestedContext = context.nested(); + String timerId = STRING_CODER.decode(inStream, nestedContext); + StateNamespace namespace = + StateNamespaces.fromString(STRING_CODER.decode(inStream, nestedContext), windowCoder); + Instant timestamp = INSTANT_CODER.decode(inStream, nestedContext); + TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream, context)); + return TimerData.of(timerId, namespace, timestamp, domain); + } + + @Override + public List<? extends Coder<?>> getCoderArguments() { + return Arrays.asList(windowCoder); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + verifyDeterministic("window coder must be deterministic", windowCoder); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternalsFactory.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternalsFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternalsFactory.java index e129aed..79d71b7 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternalsFactory.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternalsFactory.java @@ -20,7 +20,6 @@ package org.apache.beam.runners.core; import java.io.Serializable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; -import org.apache.beam.sdk.util.TimerInternals; /** * A factory for providing {@link TimerInternals} for a particular key. http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java index 5e5f44d..a7968db 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java @@ -27,7 +27,6 @@ import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; -import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.MergingStateAccessor; http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java index b8425b7..c033765 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java @@ -21,7 +21,6 @@ import java.util.Collection; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java index 8f5c1ef..423a674 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java @@ -47,7 +47,6 @@ import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Sessions; import org.apache.beam.sdk.transforms.windowing.SlidingWindows; -import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.StateInternals; http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java index e711285..a060d48 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java @@ -21,8 +21,8 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertThat; +import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.util.state.StateNamespaceForTest; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java index 56a6f6b..3e8edbd 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java @@ -18,12 +18,12 @@ package org.apache.beam.runners.core; import com.google.common.collect.ImmutableList; +import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.testing.CoderProperties; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.state.StateNamespaces; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java index efe2044..57ef8f0 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java @@ -31,7 +31,6 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.joda.time.Duration; http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java index 251c7c2..f3327c2 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java @@ -28,6 +28,7 @@ import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableList; import java.util.ArrayList; import java.util.List; +import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.Sum; @@ -39,7 +40,6 @@ import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.IdentitySideInputWindowFn; import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.state.StateNamespaces; import org.apache.beam.sdk.values.PCollection; http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java index 4f4baac..61707ad 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java @@ -37,6 +37,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import javax.annotation.Nullable; +import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachineRunner; @@ -66,8 +67,6 @@ import org.apache.beam.sdk.util.NullSideInputReader; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.TimerInternals; -import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java index 8ae09cb..8deda5f 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java @@ -27,6 +27,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import org.apache.beam.runners.core.BaseExecutionContext.StepContext; +import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -36,8 +37,6 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.NullSideInputReader; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.Timer; -import org.apache.beam.sdk.util.TimerInternals; -import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.TimerSpec; import org.apache.beam.sdk.util.TimerSpecs; import org.apache.beam.sdk.util.UserCodeException; http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java index 427e2f4..4de07d1 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java @@ -52,7 +52,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.SideInputReader; -import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.values.PCollection; http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java new file mode 100644 index 0000000..a0ef505 --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import static org.hamcrest.Matchers.comparesEqualTo; +import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertThat; + +import org.apache.beam.runners.core.TimerInternals.TimerData; +import org.apache.beam.runners.core.TimerInternals.TimerDataCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.testing.CoderProperties; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.state.StateNamespace; +import org.apache.beam.sdk.util.state.StateNamespaces; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Unit tests for {@link TimerInternals}. + */ +@RunWith(JUnit4.class) +public class TimerInternalsTest { + + @Test + public void testTimerDataCoder() throws Exception { + CoderProperties.coderDecodeEncodeEqual( + TimerDataCoder.of(GlobalWindow.Coder.INSTANCE), + TimerData.of( + "arbitrary-id", StateNamespaces.global(), new Instant(0), TimeDomain.EVENT_TIME)); + + Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder(); + CoderProperties.coderDecodeEncodeEqual( + TimerDataCoder.of(windowCoder), + TimerData.of( + "another-id", + StateNamespaces.window( + windowCoder, new IntervalWindow(new Instant(0), new Instant(100))), + new Instant(99), TimeDomain.PROCESSING_TIME)); + } + + @Test + public void testCoderIsSerializableWithWellKnownCoderType() { + CoderProperties.coderSerializable(TimerDataCoder.of(GlobalWindow.Coder.INSTANCE)); + } + + @Test + public void testCompareTo() { + Instant firstTimestamp = new Instant(100); + Instant secondTimestamp = new Instant(200); + IntervalWindow firstWindow = new IntervalWindow(new Instant(0), firstTimestamp); + IntervalWindow secondWindow = new IntervalWindow(firstTimestamp, secondTimestamp); + Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder(); + + StateNamespace firstWindowNs = StateNamespaces.window(windowCoder, firstWindow); + StateNamespace secondWindowNs = StateNamespaces.window(windowCoder, secondWindow); + + TimerData firstEventTime = TimerData.of(firstWindowNs, firstTimestamp, TimeDomain.EVENT_TIME); + TimerData secondEventTime = TimerData.of(firstWindowNs, secondTimestamp, TimeDomain.EVENT_TIME); + TimerData thirdEventTime = TimerData.of(secondWindowNs, secondTimestamp, TimeDomain.EVENT_TIME); + + TimerData firstProcTime = + TimerData.of(firstWindowNs, firstTimestamp, TimeDomain.PROCESSING_TIME); + TimerData secondProcTime = + TimerData.of(firstWindowNs, secondTimestamp, TimeDomain.PROCESSING_TIME); + TimerData thirdProcTime = + TimerData.of(secondWindowNs, secondTimestamp, TimeDomain.PROCESSING_TIME); + + assertThat(firstEventTime, + comparesEqualTo(TimerData.of(firstWindowNs, firstTimestamp, TimeDomain.EVENT_TIME))); + assertThat(firstEventTime, lessThan(secondEventTime)); + assertThat(secondEventTime, lessThan(thirdEventTime)); + assertThat(firstEventTime, lessThan(thirdEventTime)); + + assertThat(secondProcTime, + comparesEqualTo(TimerData.of(firstWindowNs, secondTimestamp, TimeDomain.PROCESSING_TIME))); + assertThat(firstProcTime, lessThan(secondProcTime)); + assertThat(secondProcTime, lessThan(thirdProcTime)); + assertThat(firstProcTime, lessThan(thirdProcTime)); + + assertThat(firstEventTime, not(comparesEqualTo(firstProcTime))); + assertThat(firstProcTime, + not(comparesEqualTo(TimerData.of(firstWindowNs, + firstTimestamp, + TimeDomain.SYNCHRONIZED_PROCESSING_TIME)))); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java index c00cc48..5148ae6 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java @@ -38,12 +38,12 @@ import org.apache.beam.runners.core.InMemoryTimerInternals; import org.apache.beam.runners.core.MergingActiveWindowSet; import org.apache.beam.runners.core.NonMergingActiveWindowSet; import org.apache.beam.runners.core.TestInMemoryStateInternals; +import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.Timers; import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.WindowedValue; http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java index 8250cf1..1108f0d 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java @@ -19,10 +19,10 @@ package org.apache.beam.runners.direct; import org.apache.beam.runners.core.BaseExecutionContext; import org.apache.beam.runners.core.ExecutionContext; +import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks; -import org.apache.beam.sdk.util.TimerInternals; /** * Execution Context for the {@link DirectRunner}. http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index 5793b00..40ef60e 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.Set; import javax.annotation.Nullable; import org.apache.beam.runners.core.SplittableParDo; +import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult; import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory; import org.apache.beam.runners.direct.ViewEvaluatorFactory.ViewOverrideFactory; @@ -53,7 +54,6 @@ import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View.CreatePCollectionView; -import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java index 80e0721..233c3b8 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java @@ -18,11 +18,11 @@ package org.apache.beam.runners.direct; import javax.annotation.Nullable; +import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate.TimerUpdateBuilder; import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks; import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.state.StateNamespace; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java index 226e499..9bcd569 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java @@ -18,8 +18,8 @@ package org.apache.beam.runners.direct; +import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.WindowedValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java index 0e89a67..69752fa 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java @@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import javax.annotation.Nullable; import org.apache.beam.runners.core.ExecutionContext; +import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.direct.CommittedResult.OutputType; import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; @@ -46,7 +47,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.Trigger; import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; import org.apache.beam.sdk.util.SideInputReader; -import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollection; http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java index 20d562f..935104a 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java @@ -44,12 +44,12 @@ import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItems; +import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.WatermarkManager.FiredTimers; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java index b97729a..ecf4ecd 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java @@ -32,6 +32,7 @@ import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.OutputWindowedValue; import org.apache.beam.runners.core.ReduceFnRunner; import org.apache.beam.runners.core.SystemReduceFn; +import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachines; import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; @@ -46,7 +47,6 @@ import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.SideInputReader; -import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java index 48f0f8d..328d139 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java @@ -26,13 +26,13 @@ import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.DoFnRunners.OutputManager; import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; +import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; -import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java index 7ed4690..85a8d92 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java @@ -26,13 +26,13 @@ import org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElement import org.apache.beam.runners.core.OutputWindowedValue; import org.apache.beam.runners.core.SplittableParDo; import org.apache.beam.runners.core.StateInternalsFactory; +import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.core.TimerInternalsFactory; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.values.PCollection; http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java index cce97a6..8dd1657 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java @@ -28,6 +28,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItems; +import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.ParDoMultiOverrideFactory.StatefulParDo; @@ -39,7 +40,6 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.StateDeclaration; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.StateNamespace; http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java index ae15285..a653858 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java @@ -50,14 +50,14 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; +import org.apache.beam.runners.core.TimerInternals; +import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.TimerInternals; -import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TaggedPValue; http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java index 51cfeed..4d48489 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java @@ -22,12 +22,12 @@ import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertThat; import static org.mockito.Mockito.when; +import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate.TimerUpdateBuilder; import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.state.StateNamespaces; import org.joda.time.Duration; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java index a9d51e8..26b4ef1 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java @@ -27,11 +27,11 @@ import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.state.StateNamespaces; import org.hamcrest.Matchers; http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java index ad6e32d..6360f34 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java @@ -31,6 +31,7 @@ import java.util.Collection; import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.DirectRunner.PCollectionViewWriter; @@ -54,7 +55,6 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.BagState; http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java index abc8a28..ee1e5bf 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java @@ -33,6 +33,7 @@ import java.util.Collections; import java.util.EnumSet; import java.util.Map; import javax.annotation.Nullable; +import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.direct.CommittedResult.OutputType; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; @@ -56,7 +57,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.state.StateNamespaces; import org.apache.beam.sdk.values.KV; http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java index cedad38..a656d4a 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java @@ -25,6 +25,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.Map; import org.apache.beam.runners.core.OldDoFn; +import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.core.WindowingInternals; import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper; import org.apache.beam.sdk.options.PipelineOptions; @@ -32,7 +33,6 @@ import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.StateInternals; http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index baf3841..9fd83c6 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -36,6 +36,7 @@ import org.apache.beam.runners.core.ExecutionContext; import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn; import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; import org.apache.beam.runners.core.SideInputHandler; +import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper; import org.apache.beam.sdk.coders.Coder; @@ -51,7 +52,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.NullSideInputReader; import org.apache.beam.sdk.util.SideInputReader; -import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.StateInternals; http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java index b53658e..b85efef 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java @@ -19,7 +19,7 @@ package org.apache.beam.runners.flink.translation.wrappers.streaming; import java.util.Collections; import org.apache.beam.runners.core.KeyedWorkItem; -import org.apache.beam.sdk.util.TimerInternals; +import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; /** http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java index 04b5f47..aa3429e 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java @@ -44,18 +44,18 @@ import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItems; import org.apache.beam.runners.core.StateInternalsFactory; import org.apache.beam.runners.core.SystemReduceFn; +import org.apache.beam.runners.core.TimerInternals; +import org.apache.beam.runners.core.TimerInternalsFactory; import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateNamespace; -import org.apache.beam.sdk.util.state.TimerInternalsFactory; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java index 6d7529b..7452a11 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java @@ -29,6 +29,7 @@ import org.apache.beam.runners.core.OutputWindowedValue; import org.apache.beam.runners.core.ReduceFnRunner; import org.apache.beam.runners.core.StateInternalsFactory; import org.apache.beam.runners.core.SystemReduceFn; +import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachines; import org.apache.beam.runners.spark.aggregators.NamedAggregators; @@ -37,7 +38,6 @@ import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.SideInputReader; -import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.StateInternals; http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java index 60c9d4d..486bc16 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java @@ -25,11 +25,11 @@ import java.util.Iterator; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners.OutputManager; import org.apache.beam.runners.core.ExecutionContext.StepContext; +import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.values.TupleTag;