Move TimerInternals to runners/core-java

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b086d2fd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b086d2fd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b086d2fd

Branch: refs/heads/master
Commit: b086d2fdf1dba2b0e4a3362bd3ee1a92bf21c56a
Parents: 92b33bc
Author: Kenneth Knowles <k...@google.com>
Authored: Thu Jan 26 21:17:18 2017 -0800
Committer: Kenneth Knowles <k...@google.com>
Committed: Mon Feb 6 09:26:56 2017 -0800

----------------------------------------------------------------------
 .../operators/ApexGroupByKeyOperator.java       |   2 +-
 .../apex/translation/utils/NoOpStepContext.java |   2 +-
 .../beam/runners/core/BaseExecutionContext.java |   1 -
 .../beam/runners/core/ExecutionContext.java     |   1 -
 .../GroupAlsoByWindowViaOutputBufferDoFn.java   |   1 -
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java |   1 -
 .../GroupAlsoByWindowViaWindowSetNewDoFn.java   |   2 -
 .../runners/core/InMemoryTimerInternals.java    |   1 -
 .../apache/beam/runners/core/KeyedWorkItem.java |   2 +-
 .../beam/runners/core/KeyedWorkItemCoder.java   |   4 +-
 .../beam/runners/core/KeyedWorkItems.java       |   2 +-
 .../core/LateDataDroppingDoFnRunner.java        |   1 -
 .../beam/runners/core/PaneInfoTracker.java      |   1 -
 .../runners/core/ReduceFnContextFactory.java    |   3 +-
 .../beam/runners/core/ReduceFnRunner.java       |   3 +-
 .../beam/runners/core/SimpleDoFnRunner.java     |   1 -
 .../beam/runners/core/SimpleOldDoFnRunner.java  |   1 -
 .../beam/runners/core/SplittableParDo.java      |   1 -
 .../beam/runners/core/TimerInternals.java       | 288 +++++++++++++++++++
 .../runners/core/TimerInternalsFactory.java     |   1 -
 .../apache/beam/runners/core/WatermarkHold.java |   1 -
 .../beam/runners/core/WindowingInternals.java   |   1 -
 .../core/GroupAlsoByWindowsProperties.java      |   1 -
 .../core/InMemoryTimerInternalsTest.java        |   2 +-
 .../runners/core/KeyedWorkItemCoderTest.java    |   2 +-
 .../core/LateDataDroppingDoFnRunnerTest.java    |   1 -
 .../core/PushbackSideInputDoFnRunnerTest.java   |   2 +-
 .../beam/runners/core/ReduceFnTester.java       |   3 +-
 .../beam/runners/core/SimpleDoFnRunnerTest.java |   3 +-
 .../beam/runners/core/SplittableParDoTest.java  |   1 -
 .../beam/runners/core/TimerInternalsTest.java   | 107 +++++++
 .../triggers/TriggerStateMachineTester.java     |   2 +-
 .../runners/direct/DirectExecutionContext.java  |   2 +-
 .../beam/runners/direct/DirectRunner.java       |   2 +-
 .../runners/direct/DirectTimerInternals.java    |   2 +-
 ...ecycleManagerRemovingTransformEvaluator.java |   2 +-
 .../beam/runners/direct/EvaluationContext.java  |   2 +-
 .../direct/ExecutorServiceParallelExecutor.java |   2 +-
 .../GroupAlsoByWindowEvaluatorFactory.java      |   2 +-
 .../beam/runners/direct/ParDoEvaluator.java     |   2 +-
 ...littableProcessElementsEvaluatorFactory.java |   2 +-
 .../direct/StatefulParDoEvaluatorFactory.java   |   2 +-
 .../beam/runners/direct/WatermarkManager.java   |   4 +-
 .../direct/DirectTimerInternalsTest.java        |   2 +-
 ...leManagerRemovingTransformEvaluatorTest.java |   2 +-
 .../runners/direct/EvaluationContextTest.java   |   2 +-
 .../runners/direct/WatermarkManagerTest.java    |   2 +-
 .../functions/FlinkProcessContextBase.java      |   2 +-
 .../wrappers/streaming/DoFnOperator.java        |   2 +-
 .../streaming/SingletonKeyedWorkItem.java       |   2 +-
 .../wrappers/streaming/WindowDoFnOperator.java  |   4 +-
 .../translation/SparkGroupAlsoByWindowFn.java   |   2 +-
 .../spark/translation/SparkProcessContext.java  |   2 +-
 .../apache/beam/sdk/util/TimerInternals.java    | 286 ------------------
 .../sdk/transforms/join/UnionCoderTest.java     |   7 -
 .../beam/sdk/util/TimerInternalsTest.java       | 106 -------
 .../beam/fn/harness/fake/FakeStepContext.java   |   2 +-
 57 files changed, 433 insertions(+), 459 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
index 6322796..7891b34 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -47,6 +47,7 @@ import org.apache.beam.runners.core.KeyedWorkItems;
 import org.apache.beam.runners.core.OldDoFn;
 import org.apache.beam.runners.core.StateInternalsFactory;
 import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.WindowingInternals;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
@@ -59,7 +60,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.StateInternals;

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
index f169ae6..feae46e 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
@@ -20,9 +20,9 @@ package org.apache.beam.runners.apex.translation.utils;
 import java.io.IOException;
 import java.io.Serializable;
 import org.apache.beam.runners.core.ExecutionContext;
+import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.TupleTag;

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
index 7b674dc..eec913c 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
@@ -24,7 +24,6 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.TupleTag;

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
index f67aff4..f6bcc3d 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.util.Collection;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.TupleTag;

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
index 57981ad..4cde7da 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
@@ -23,7 +23,6 @@ import 
org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachines;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
-import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.joda.time.Instant;

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
index 9ccefda..49b010e 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -23,7 +23,6 @@ import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
-import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.KV;

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
index 3786b48..b4310ec 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
@@ -27,11 +27,9 @@ import 
org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
-import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.util.state.TimerInternalsFactory;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TupleTag;
 import org.joda.time.Instant;

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
index 2c3d78a..bbe1a24 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
@@ -29,7 +29,6 @@ import java.util.TreeSet;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowTracing;
 import org.apache.beam.sdk.util.state.StateNamespace;
 import org.joda.time.Instant;

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItem.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItem.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItem.java
index c75fc25..e825e43 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItem.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItem.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.runners.core;
 
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.WindowedValue;
 
 /**

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
index dfd6a8d..7a144a6 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
@@ -26,14 +26,14 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.List;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.core.TimerInternals.TimerDataCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.StandardCoder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.PropertyNames;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
-import org.apache.beam.sdk.util.TimerInternals.TimerDataCoder;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItems.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItems.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItems.java
index 94c3bb6..5e379d8 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItems.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItems.java
@@ -21,7 +21,7 @@ import com.google.common.base.MoreObjects;
 import com.google.common.collect.Iterables;
 import java.util.Collections;
 import java.util.Objects;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.WindowedValue;
 
 /**

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
index 9436ccf..4d41527 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
@@ -24,7 +24,6 @@ import com.google.common.collect.Iterables;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowTracing;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
index 69a4cfd..58e51aa 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
@@ -24,7 +24,6 @@ import 
org.apache.beam.sdk.transforms.windowing.AfterWatermark;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
-import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowTracing;
 import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.StateAccessor;

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
index eae1a8b..6f8715e 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
@@ -24,14 +24,13 @@ import com.google.common.collect.ImmutableMap;
 import java.util.Collection;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.TimerInternals;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.Timers;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.MergingStateAccessor;

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index 96e76b7..50b1192 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -37,6 +37,7 @@ import javax.annotation.Nullable;
 import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
 import org.apache.beam.runners.core.ReduceFnContextFactory.OnTriggerCallbacks;
 import org.apache.beam.runners.core.ReduceFnContextFactory.StateStyle;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachineContextFactory;
 import org.apache.beam.runners.core.triggers.TriggerStateMachineRunner;
@@ -52,8 +53,6 @@ import 
org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.TimerInternals;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.WindowTracing;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 588e31d..9b1b852 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -50,7 +50,6 @@ import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.Timer;
-import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.TimerSpec;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
index 9f80bca..6b2fbb2 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
@@ -39,7 +39,6 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index 7368b2f..30ee7cb 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -49,7 +49,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.StateInternals;

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
new file mode 100644
index 0000000..a50a622
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.ComparisonChain;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.state.StateNamespace;
+import org.apache.beam.sdk.util.state.StateNamespaces;
+import org.joda.time.Instant;
+
+/**
+ * Encapsulate interaction with time within the execution environment.
+ *
+ * <p>This class allows setting and deleting timers, and also retrieving an
+ * estimate of the current time.
+ */
+public interface TimerInternals {
+
+  /**
+   * Sets a timer to be fired when the current time in the specified time 
domain reaches the
+   * target timestamp.
+   *
+   * <p>The combination of {@code namespace} and {@code timerId} uniquely 
identify a timer.
+   *
+   * <p>If a timer is set and then set again before it fires, later settings 
should clear the prior
+   * setting.
+   *
+   * <p>It is an error to set a timer for two different time domains.
+   */
+  void setTimer(StateNamespace namespace, String timerId, Instant target, 
TimeDomain timeDomain);
+
+  /**
+   * @deprecated use {@link #setTimer(StateNamespace, String, Instant, 
TimeDomain)}.
+   */
+  @Deprecated
+  void setTimer(TimerData timerData);
+
+  /**
+   * Deletes the given timer.
+   *
+   * <p>A timer's ID is enforced to be unique in validation of a {@link DoFn}, 
but runners
+   * often manage timers for different time domains in very different ways, 
thus the
+   * {@link TimeDomain} is a required parameter.
+   */
+  void deleteTimer(StateNamespace namespace, String timerId, TimeDomain 
timeDomain);
+
+  /**
+   * @deprecated use {@link #deleteTimer(StateNamespace, String, TimeDomain)}.
+   */
+  @Deprecated
+  void deleteTimer(StateNamespace namespace, String timerId);
+
+  /**
+   * @deprecated use {@link #deleteTimer(StateNamespace, String, TimeDomain)}.
+   */
+  @Deprecated
+  void deleteTimer(TimerData timerKey);
+
+  /**
+   * Returns the current timestamp in the {@link TimeDomain#PROCESSING_TIME} 
time domain.
+   */
+  Instant currentProcessingTime();
+
+  /**
+   * Returns the current timestamp in the {@link 
TimeDomain#SYNCHRONIZED_PROCESSING_TIME} time
+   * domain or {@code null} if unknown.
+   */
+  @Nullable
+  Instant currentSynchronizedProcessingTime();
+
+  /**
+   * Return the current, local input watermark timestamp for this computation
+   * in the {@link TimeDomain#EVENT_TIME} time domain.
+   *
+   * <p>This value:
+   * <ol>
+   * <li>Is never {@literal null}, but may be {@link 
BoundedWindow#TIMESTAMP_MIN_VALUE}.
+   * <li>Is monotonically increasing.
+   * <li>May differ between workers due to network and other delays.
+   * <li>Will never be ahead of the global input watermark for this 
computation. But it
+   * may be arbitrarily behind the global input watermark.
+   * <li>Any element with a timestamp before the local input watermark can be 
considered
+   * 'locally late' and be subject to special processing or be dropped 
entirely.
+   * </ol>
+   *
+   * <p>Note that because the local input watermark can be behind the global 
input watermark,
+   * it is possible for an element to be considered locally on-time even 
though it is
+   * globally late.
+   */
+  Instant currentInputWatermarkTime();
+
+  /**
+   * Return the current, local output watermark timestamp for this computation
+   * in the {@link TimeDomain#EVENT_TIME} time domain. Return {@code null} if unknown.
+   *
+   * <p>This value:
+   * <ol>
+   * <li>Is monotonically increasing.
+   * <li>Will never be ahead of {@link #currentInputWatermarkTime} as returned above.
+   * <li>May differ between workers due to network and other delays.
+   * <li>However will never be behind the global input watermark for any following computation.
+   * </ol>
+   *
+   * <p>In pictures:
+   * <pre>{@code
+   *  |              |       |       |       |
+   *  |              |   D   |   C   |   B   |   A
+   *  |              |       |       |       |
+   * GIWM     <=    GOWM <= LOWM <= LIWM <= GIWM
+   * (next stage)
+   * -------------------------------------------------> event time
+   * }</pre>
+   *
+   * <p>where
+   *
+   * <ul>
+   * <li> LOWM = local output watermark.
+   * <li> GOWM = global output watermark.
+   * <li> GIWM = global input watermark.
+   * <li> LIWM = local input watermark.
+   * <li> A = A globally on-time element.
+   * <li> B = A globally late, but locally on-time element.
+   * <li> C = A locally late element which may still contribute to the timestamp of a pane.
+   * <li> D = A locally late element which cannot contribute to the timestamp of a pane.
+   * </ul>
+   *
+   * <p>Note that if a computation emits an element which is not before the current output watermark
+   * then that element will always appear locally on-time in all following computations. However,
+   * it is possible for an element emitted before the current output watermark to appear locally
+   * on-time in a following computation. Thus we must be careful to never assume locally late data
+   * viewed on the output of a computation remains locally late on the input of a following
+   * computation.
+   */
+  @Nullable
+  Instant currentOutputWatermarkTime();
+
+  /**
+   * Data about a timer as represented within {@link TimerInternals}.
+   *
+   * <p>A timer is described by four properties: its ID, the {@link StateNamespace} it belongs to,
+   * the timestamp at which it is set, and the {@link TimeDomain} of that timestamp. Being an
+   * {@code @AutoValue} class, equality and hashing take all four properties into account.
+   */
+  @AutoValue
+  abstract class TimerData implements Comparable<TimerData> {
+
+    /** Returns the ID of this timer. */
+    public abstract String getTimerId();
+
+    /** Returns the namespace this timer is set in. */
+    public abstract StateNamespace getNamespace();
+
+    /** Returns the timestamp of this timer, interpreted in {@link #getDomain()}. */
+    public abstract Instant getTimestamp();
+
+    /** Returns the time domain of this timer. */
+    public abstract TimeDomain getDomain();
+
+    /**
+     * Construct a {@link TimerData} for the given parameters, using the explicitly provided
+     * {@code timerId}.
+     */
+    public static TimerData of(
+        String timerId, StateNamespace namespace, Instant timestamp, TimeDomain domain) {
+      return new AutoValue_TimerInternals_TimerData(timerId, namespace, timestamp, domain);
+    }
+
+    /**
+     * Construct a {@link TimerData} for the given parameters, where the timer ID is
+     * deterministically generated from the {@code timestamp} and {@code domain}.
+     *
+     * <p>The generated ID has the form {@code <domain ordinal>:<timestamp millis>}; the namespace
+     * is not part of it.
+     */
+    public static TimerData of(StateNamespace namespace, Instant timestamp, TimeDomain domain) {
+      String timerId =
+          new StringBuilder()
+              .append(domain.ordinal())
+              .append(':')
+              .append(timestamp.getMillis())
+              .toString();
+      return of(timerId, namespace, timestamp, domain);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Timers are ordered by timestamp, then by domain, then by timer ID, and finally by the
+     * string key of their namespace. The timer ID takes part in the comparison so that two timers
+     * that differ only in their ID do not compare as equal, which keeps this ordering in line with
+     * {@code equals} when timers are stored in sorted collections.
+     *
+     * <p>The ordering of {@link TimerData} that are not in the same namespace or domain is
+     * arbitrary.
+     */
+    @Override
+    public int compareTo(TimerData that) {
+      if (this.equals(that)) {
+        return 0;
+      }
+      ComparisonChain chain =
+          ComparisonChain.start()
+              .compare(this.getTimestamp(), that.getTimestamp())
+              .compare(this.getDomain(), that.getDomain())
+              .compare(this.getTimerId(), that.getTimerId());
+      if (chain.result() == 0 && !this.getNamespace().equals(that.getNamespace())) {
+        // Obtaining the stringKey may be expensive; only do so if required
+        chain = chain.compare(getNamespace().stringKey(), that.getNamespace().stringKey());
+      }
+      return chain.result();
+    }
+  }
+
+  /**
+   * A {@link Coder} for {@link TimerData}.
+   *
+   * <p>A timer is written as four consecutive fields, in this order: the timer ID (as a string),
+   * the {@code stringKey()} of its namespace (as a string), its timestamp (via
+   * {@link InstantCoder}), and the {@code name()} of its time domain (as a string). The first
+   * three fields are encoded in the nested context; the last one is encoded in the context handed
+   * in by the caller. Decoding reads the fields back in the same order with the same contexts and
+   * rebuilds the namespace from its string form using the window coder.
+   *
+   * <p>The window coder is the only component of this coder, and determinism of this coder is
+   * checked by delegating to it.
+   */
+  class TimerDataCoder extends StandardCoder<TimerData> {
+    private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();
+    private static final InstantCoder INSTANT_CODER = InstantCoder.of();
+    private final Coder<? extends BoundedWindow> windowCoder;
+
+    public static TimerDataCoder of(Coder<? extends BoundedWindow> 
windowCoder) {
+      return new TimerDataCoder(windowCoder);
+    }
+
+    @SuppressWarnings("unchecked")
+    @JsonCreator
+    public static TimerDataCoder of(
+        @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
+        List<Coder<?>> components) {
+      checkArgument(components.size() == 1, "Expecting 1 components, got %s", 
components.size());
+      return of((Coder<? extends BoundedWindow>) components.get(0));
+    }
+
+    private TimerDataCoder(Coder<? extends BoundedWindow> windowCoder) {
+      this.windowCoder = windowCoder;
+    }
+
+    @Override
+    public void encode(TimerData timer, OutputStream outStream, Context 
context)
+        throws CoderException, IOException {
+      Context nestedContext = context.nested();
+      STRING_CODER.encode(timer.getTimerId(), outStream, nestedContext);
+      STRING_CODER.encode(timer.getNamespace().stringKey(), outStream, 
nestedContext);
+      INSTANT_CODER.encode(timer.getTimestamp(), outStream, nestedContext);
+      STRING_CODER.encode(timer.getDomain().name(), outStream, context);
+    }
+
+    @Override
+    public TimerData decode(InputStream inStream, Context context)
+        throws CoderException, IOException {
+      Context nestedContext = context.nested();
+      String timerId = STRING_CODER.decode(inStream, nestedContext);
+      StateNamespace namespace =
+          StateNamespaces.fromString(STRING_CODER.decode(inStream, 
nestedContext), windowCoder);
+      Instant timestamp = INSTANT_CODER.decode(inStream, nestedContext);
+      TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream, 
context));
+      return TimerData.of(timerId, namespace, timestamp, domain);
+    }
+
+    @Override
+    public List<? extends Coder<?>> getCoderArguments() {
+      return Arrays.asList(windowCoder);
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+      verifyDeterministic("window coder must be deterministic", windowCoder);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternalsFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternalsFactory.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternalsFactory.java
index e129aed..79d71b7 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternalsFactory.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternalsFactory.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.core;
 import java.io.Serializable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
-import org.apache.beam.sdk.util.TimerInternals;
 
 /**
  * A factory for providing {@link TimerInternals} for a particular key.

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
index 5e5f44d..a7968db 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
@@ -27,7 +27,6 @@ import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
-import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowTracing;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.MergingStateAccessor;

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java
index b8425b7..c033765 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java
@@ -21,7 +21,6 @@ import java.util.Collection;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
index 8f5c1ef..423a674 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
@@ -47,7 +47,6 @@ import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
-import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.StateInternals;

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java
index e711285..a060d48 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java
@@ -21,8 +21,8 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertThat;
 
+import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.state.StateNamespace;
 import org.apache.beam.sdk.util.state.StateNamespaceForTest;
 import org.joda.time.Instant;

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java
index 56a6f6b..3e8edbd 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java
@@ -18,12 +18,12 @@
 package org.apache.beam.runners.core;
 
 import com.google.common.collect.ImmutableList;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.state.StateNamespaces;
 import org.joda.time.Instant;

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
index efe2044..57ef8f0 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
@@ -31,7 +31,6 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.joda.time.Duration;

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
index 251c7c2..f3327c2 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.when;
 import com.google.common.collect.ImmutableList;
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.Sum;
@@ -39,7 +40,6 @@ import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.IdentitySideInputWindowFn;
 import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
 import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.state.StateNamespaces;
 import org.apache.beam.sdk.values.PCollection;

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index 4f4baac..61707ad 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -37,6 +37,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachineRunner;
@@ -66,8 +67,6 @@ import org.apache.beam.sdk.util.NullSideInputReader;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.TimerInternals;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.WindowTracing;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
index 8ae09cb..8deda5f 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import org.apache.beam.runners.core.BaseExecutionContext.StepContext;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -36,8 +37,6 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.NullSideInputReader;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.Timer;
-import org.apache.beam.sdk.util.TimerInternals;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.TimerSpec;
 import org.apache.beam.sdk.util.TimerSpecs;
 import org.apache.beam.sdk.util.UserCodeException;

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
index 427e2f4..4de07d1 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
@@ -52,7 +52,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.SideInputReader;
-import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.PCollection;

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java
new file mode 100644
index 0000000..a0ef505
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static org.hamcrest.Matchers.comparesEqualTo;
+import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.core.TimerInternals.TimerDataCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.state.StateNamespace;
+import org.apache.beam.sdk.util.state.StateNamespaces;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for the {@link TimerData} value type and its {@link TimerDataCoder}, both nested in
+ * {@link TimerInternals}.
+ */
+@RunWith(JUnit4.class)
+public class TimerInternalsTest {
+
+  /** Round-trips one timer in the global namespace and one in a window namespace. */
+  @Test
+  public void testTimerDataCoder() throws Exception {
+    TimerData globalTimer =
+        TimerData.of(
+            "arbitrary-id", StateNamespaces.global(), new Instant(0), TimeDomain.EVENT_TIME);
+    CoderProperties.coderDecodeEncodeEqual(
+        TimerDataCoder.of(GlobalWindow.Coder.INSTANCE), globalTimer);
+
+    Coder<IntervalWindow> intervalCoder = IntervalWindow.getCoder();
+    IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
+    TimerData windowedTimer =
+        TimerData.of(
+            "another-id",
+            StateNamespaces.window(intervalCoder, window),
+            new Instant(99),
+            TimeDomain.PROCESSING_TIME);
+    CoderProperties.coderDecodeEncodeEqual(TimerDataCoder.of(intervalCoder), windowedTimer);
+  }
+
+  /** The coder over a well-known window coder must be serializable. */
+  @Test
+  public void testCoderIsSerializableWithWellKnownCoderType() {
+    TimerDataCoder coder = TimerDataCoder.of(GlobalWindow.Coder.INSTANCE);
+    CoderProperties.coderSerializable(coder);
+  }
+
+  /** Checks the ordering of timers across timestamps, namespaces and time domains. */
+  @Test
+  public void testCompareTo() {
+    Instant earlier = new Instant(100);
+    Instant later = new Instant(200);
+    Coder<IntervalWindow> intervalCoder = IntervalWindow.getCoder();
+
+    // Two adjacent windows: [0, 100) and [100, 200).
+    StateNamespace nsA =
+        StateNamespaces.window(intervalCoder, new IntervalWindow(new Instant(0), earlier));
+    StateNamespace nsB =
+        StateNamespaces.window(intervalCoder, new IntervalWindow(earlier, later));
+
+    TimerData eventEarlyA = TimerData.of(nsA, earlier, TimeDomain.EVENT_TIME);
+    TimerData eventLateA = TimerData.of(nsA, later, TimeDomain.EVENT_TIME);
+    TimerData eventLateB = TimerData.of(nsB, later, TimeDomain.EVENT_TIME);
+
+    TimerData procEarlyA = TimerData.of(nsA, earlier, TimeDomain.PROCESSING_TIME);
+    TimerData procLateA = TimerData.of(nsA, later, TimeDomain.PROCESSING_TIME);
+    TimerData procLateB = TimerData.of(nsB, later, TimeDomain.PROCESSING_TIME);
+
+    // Event-time timers: an identical copy compares equal, the rest are strictly ordered.
+    TimerData eventEarlyACopy = TimerData.of(nsA, earlier, TimeDomain.EVENT_TIME);
+    assertThat(eventEarlyA, comparesEqualTo(eventEarlyACopy));
+    assertThat(eventEarlyA, lessThan(eventLateA));
+    assertThat(eventLateA, lessThan(eventLateB));
+    assertThat(eventEarlyA, lessThan(eventLateB));
+
+    // Processing-time timers behave the same way.
+    TimerData procLateACopy = TimerData.of(nsA, later, TimeDomain.PROCESSING_TIME);
+    assertThat(procLateA, comparesEqualTo(procLateACopy));
+    assertThat(procEarlyA, lessThan(procLateA));
+    assertThat(procLateA, lessThan(procLateB));
+    assertThat(procEarlyA, lessThan(procLateB));
+
+    // Timers that differ only in their time domain never compare equal.
+    TimerData syncProcEarlyA =
+        TimerData.of(nsA, earlier, TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+    assertThat(eventEarlyA, not(comparesEqualTo(procEarlyA)));
+    assertThat(procEarlyA, not(comparesEqualTo(syncProcEarlyA)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
index c00cc48..5148ae6 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
@@ -38,12 +38,12 @@ import org.apache.beam.runners.core.InMemoryTimerInternals;
 import org.apache.beam.runners.core.MergingActiveWindowSet;
 import org.apache.beam.runners.core.NonMergingActiveWindowSet;
 import org.apache.beam.runners.core.TestInMemoryStateInternals;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.Timers;
 import org.apache.beam.sdk.util.WindowTracing;
 import org.apache.beam.sdk.util.WindowedValue;

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
index 8250cf1..1108f0d 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
@@ -19,10 +19,10 @@ package org.apache.beam.runners.direct;
 
 import org.apache.beam.runners.core.BaseExecutionContext;
 import org.apache.beam.runners.core.ExecutionContext;
+import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
 import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
 import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks;
-import org.apache.beam.sdk.util.TimerInternals;
 
 /**
  * Execution Context for the {@link DirectRunner}.

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 5793b00..40ef60e 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -31,6 +31,7 @@ import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.SplittableParDo;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
 import 
org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory;
 import org.apache.beam.runners.direct.ViewEvaluatorFactory.ViewOverrideFactory;
@@ -53,7 +54,6 @@ import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
index 80e0721..233c3b8 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
@@ -18,11 +18,11 @@
 package org.apache.beam.runners.direct;
 
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
 import 
org.apache.beam.runners.direct.WatermarkManager.TimerUpdate.TimerUpdateBuilder;
 import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks;
 import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.state.StateNamespace;
 import org.joda.time.Instant;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
index 226e499..9bcd569 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
@@ -18,8 +18,8 @@
 
 package org.apache.beam.runners.direct;
 
+import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index 0e89a67..69752fa 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.ExecutionContext;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.direct.CommittedResult.OutputType;
 import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
@@ -46,7 +47,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.Trigger;
 import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
 import org.apache.beam.sdk.util.SideInputReader;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollection;

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 20d562f..935104a 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -44,12 +44,12 @@ import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItems;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.WatermarkManager.FiredTimers;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index b97729a..ecf4ecd 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -32,6 +32,7 @@ import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.OutputWindowedValue;
 import org.apache.beam.runners.core.ReduceFnRunner;
 import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachines;
 import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
@@ -46,7 +47,6 @@ import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.SideInputReader;
-import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowTracing;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index 48f0f8d..328d139 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -26,13 +26,13 @@ import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.DoFnRunners.OutputManager;
 import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
index 7ed4690..85a8d92 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -26,13 +26,13 @@ import org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElement
 import org.apache.beam.runners.core.OutputWindowedValue;
 import org.apache.beam.runners.core.SplittableParDo;
 import org.apache.beam.runners.core.StateInternalsFactory;
+import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.TimerInternalsFactory;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.PCollection;

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
index cce97a6..8dd1657 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItems;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.ParDoMultiOverrideFactory.StatefulParDo;
@@ -39,7 +40,6 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.StateDeclaration;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.StateNamespace;

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index ae15285..a653858 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -50,14 +50,14 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.TimerInternals;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.state.StateNamespace;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TaggedPValue;

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java
index 51cfeed..4d48489 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java
@@ -22,12 +22,12 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.when;
 
+import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
 import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate.TimerUpdateBuilder;
 import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.state.StateNamespaces;
 import org.joda.time.Duration;
 import org.joda.time.Instant;

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
index a9d51e8..26b4ef1 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
@@ -27,11 +27,11 @@ import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 
+import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.state.StateNamespaces;
 import org.hamcrest.Matchers;

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index ad6e32d..6360f34 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -31,6 +31,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.DirectRunner.PCollectionViewWriter;
@@ -54,7 +55,6 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.BagState;

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
index abc8a28..ee1e5bf 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
@@ -33,6 +33,7 @@ import java.util.Collections;
 import java.util.EnumSet;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.direct.CommittedResult.OutputType;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
@@ -56,7 +57,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.state.StateNamespaces;
 import org.apache.beam.sdk.values.KV;

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
index cedad38..a656d4a 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
@@ -25,6 +25,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.Map;
 import org.apache.beam.runners.core.OldDoFn;
+import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.WindowingInternals;
 import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -32,7 +33,6 @@ import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.StateInternals;

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index baf3841..9fd83c6 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -36,6 +36,7 @@ import org.apache.beam.runners.core.ExecutionContext;
 import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn;
 import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.SideInputHandler;
+import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
 import org.apache.beam.sdk.coders.Coder;
@@ -51,7 +52,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.NullSideInputReader;
 import org.apache.beam.sdk.util.SideInputReader;
-import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.StateInternals;

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java
index b53658e..b85efef 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java
@@ -19,7 +19,7 @@ package org.apache.beam.runners.flink.translation.wrappers.streaming;
 
 import java.util.Collections;
 import org.apache.beam.runners.core.KeyedWorkItem;
-import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
 
 /**

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index 04b5f47..aa3429e 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -44,18 +44,18 @@ import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItems;
 import org.apache.beam.runners.core.StateInternalsFactory;
 import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.TimerInternalsFactory;
 import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateNamespace;
-import org.apache.beam.sdk.util.state.TimerInternalsFactory;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
index 6d7529b..7452a11 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
@@ -29,6 +29,7 @@ import org.apache.beam.runners.core.OutputWindowedValue;
 import org.apache.beam.runners.core.ReduceFnRunner;
 import org.apache.beam.runners.core.StateInternalsFactory;
 import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachines;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
@@ -37,7 +38,6 @@ import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.SideInputReader;
-import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.StateInternals;

http://git-wip-us.apache.org/repos/asf/beam/blob/b086d2fd/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
index 60c9d4d..486bc16 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
@@ -25,11 +25,11 @@ import java.util.Iterator;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners.OutputManager;
 import org.apache.beam.runners.core.ExecutionContext.StepContext;
+import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.TupleTag;

Reply via email to