[2/3] incubator-beam git commit: Move InMemoryTimerInternals to runners-core

2016-12-20 Thread kenn
Move InMemoryTimerInternals to runners-core


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/445c1205
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/445c1205
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/445c1205

Branch: refs/heads/master
Commit: 445c120510948fb23e6d35b502da1e5a4f0ffdfb
Parents: 22e25a4
Author: Kenneth Knowles 
Authored: Thu Dec 15 20:45:56 2016 -0800
Committer: Kenneth Knowles 
Committed: Tue Dec 20 11:21:52 2016 -0800

--
 .../GroupAlsoByWindowsViaOutputBufferDoFn.java  |   1 -
 .../runners/core/InMemoryTimerInternals.java| 273 ++
 .../core/InMemoryTimerInternalsTest.java| 155 +++
 .../beam/runners/core/ReduceFnTester.java   |   1 -
 .../beam/runners/core/SplittableParDoTest.java  |  16 +-
 .../triggers/TriggerStateMachineTester.java |   2 +-
 .../translation/SparkGroupAlsoByWindowFn.java   |   2 +-
 .../apache/beam/sdk/transforms/DoFnTester.java  |  36 ---
 .../sdk/util/state/InMemoryTimerInternals.java  | 275 ---
 .../util/state/InMemoryTimerInternalsTest.java  | 153 ---
 10 files changed, 443 insertions(+), 471 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/445c1205/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
--
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
index 9189191..efcd771 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
@@ -27,7 +27,6 @@ import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.InMemoryTimerInternals;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateInternalsFactory;
 import org.joda.time.Instant;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/445c1205/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
--
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
new file mode 100644
index 000..5fcd088
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.base.MoreObjects;
+import java.util.HashSet;
+import java.util.PriorityQueue;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowTracing;
+import org.apache.beam.sdk.util.state.StateNamespace;
+import org.joda.time.Instant;
+
+/** {@link TimerInternals} with all watermarks and processing clock simulated 
in-memory. */
+public class InMemoryTimerInternals implements TimerInternals {
+
+  /** At most one timer per timestamp is kept. */
+  private Set existingTimers = new HashSet<>();
+
+  /** Pending input watermark timers, in timestamp order. */
+  private PriorityQueue watermarkTimers 

[2/3] incubator-beam git commit: Move InMemoryTimerInternals to runners-core

2016-12-16 Thread kenn
Move InMemoryTimerInternals to runners-core


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ec0bf7b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ec0bf7b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ec0bf7b4

Branch: refs/heads/master
Commit: ec0bf7b4023ff75f4ec6723d2e77ed507eb57c51
Parents: 5587e1c
Author: Kenneth Knowles 
Authored: Thu Dec 15 20:45:56 2016 -0800
Committer: Kenneth Knowles 
Committed: Thu Dec 15 22:42:24 2016 -0800

--
 .../GroupAlsoByWindowsViaOutputBufferDoFn.java  |   1 -
 .../runners/core/InMemoryTimerInternals.java| 276 +++
 .../core/InMemoryTimerInternalsTest.java| 155 +++
 .../beam/runners/core/ReduceFnTester.java   |   1 -
 .../beam/runners/core/SplittableParDoTest.java  |  16 +-
 .../triggers/TriggerStateMachineTester.java |   2 +-
 .../translation/SparkGroupAlsoByWindowFn.java   |   2 +-
 .../apache/beam/sdk/transforms/DoFnTester.java  |  36 ---
 .../sdk/util/state/InMemoryTimerInternals.java  | 275 --
 .../util/state/InMemoryTimerInternalsTest.java  | 153 --
 10 files changed, 446 insertions(+), 471 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ec0bf7b4/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
--
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
index 9189191..efcd771 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
@@ -27,7 +27,6 @@ import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.InMemoryTimerInternals;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateInternalsFactory;
 import org.joda.time.Instant;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ec0bf7b4/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
--
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
new file mode 100644
index 000..b22fcb3
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.base.MoreObjects;
+import java.util.HashSet;
+import java.util.PriorityQueue;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowTracing;
+import org.apache.beam.sdk.util.state.StateNamespace;
+import org.joda.time.Instant;
+
+/**
+ * Simulates the firing of timers and progression of input and output 
watermarks for a single
+ * computation and key in a Windmill-like streaming environment.
+ */
+public class InMemoryTimerInternals implements TimerInternals {
+
+  /** At most one timer per timestamp is kept. */
+  private Set existingTimers = new HashSet<>();
+
+  /** Pending input