[2/3] incubator-beam git commit: Move InMemoryTimerInternals to runners-core
Move InMemoryTimerInternals to runners-core Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/445c1205 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/445c1205 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/445c1205 Branch: refs/heads/master Commit: 445c120510948fb23e6d35b502da1e5a4f0ffdfb Parents: 22e25a4 Author: Kenneth KnowlesAuthored: Thu Dec 15 20:45:56 2016 -0800 Committer: Kenneth Knowles Committed: Tue Dec 20 11:21:52 2016 -0800 -- .../GroupAlsoByWindowsViaOutputBufferDoFn.java | 1 - .../runners/core/InMemoryTimerInternals.java| 273 ++ .../core/InMemoryTimerInternalsTest.java| 155 +++ .../beam/runners/core/ReduceFnTester.java | 1 - .../beam/runners/core/SplittableParDoTest.java | 16 +- .../triggers/TriggerStateMachineTester.java | 2 +- .../translation/SparkGroupAlsoByWindowFn.java | 2 +- .../apache/beam/sdk/transforms/DoFnTester.java | 36 --- .../sdk/util/state/InMemoryTimerInternals.java | 275 --- .../util/state/InMemoryTimerInternalsTest.java | 153 --- 10 files changed, 443 insertions(+), 471 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/445c1205/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java -- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java index 9189191..efcd771 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java @@ -27,7 +27,6 @@ import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.state.InMemoryTimerInternals; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/445c1205/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java -- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java new file mode 100644 index 000..5fcd088 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.base.MoreObjects; +import java.util.HashSet; +import java.util.PriorityQueue; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.TimerInternals; +import org.apache.beam.sdk.util.WindowTracing; +import org.apache.beam.sdk.util.state.StateNamespace; +import org.joda.time.Instant; + +/** {@link TimerInternals} with all watermarks and processing clock simulated in-memory. */ +public class InMemoryTimerInternals implements TimerInternals { + + /** At most one timer per timestamp is kept. */ + private Set existingTimers = new HashSet<>(); + + /** Pending input watermark timers, in timestamp order. */ + private PriorityQueue watermarkTimers
[2/3] incubator-beam git commit: Move InMemoryTimerInternals to runners-core
Move InMemoryTimerInternals to runners-core Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ec0bf7b4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ec0bf7b4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ec0bf7b4 Branch: refs/heads/master Commit: ec0bf7b4023ff75f4ec6723d2e77ed507eb57c51 Parents: 5587e1c Author: Kenneth KnowlesAuthored: Thu Dec 15 20:45:56 2016 -0800 Committer: Kenneth Knowles Committed: Thu Dec 15 22:42:24 2016 -0800 -- .../GroupAlsoByWindowsViaOutputBufferDoFn.java | 1 - .../runners/core/InMemoryTimerInternals.java| 276 +++ .../core/InMemoryTimerInternalsTest.java| 155 +++ .../beam/runners/core/ReduceFnTester.java | 1 - .../beam/runners/core/SplittableParDoTest.java | 16 +- .../triggers/TriggerStateMachineTester.java | 2 +- .../translation/SparkGroupAlsoByWindowFn.java | 2 +- .../apache/beam/sdk/transforms/DoFnTester.java | 36 --- .../sdk/util/state/InMemoryTimerInternals.java | 275 -- .../util/state/InMemoryTimerInternalsTest.java | 153 -- 10 files changed, 446 insertions(+), 471 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ec0bf7b4/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java -- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java index 9189191..efcd771 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java @@ -27,7 +27,6 @@ import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.state.InMemoryTimerInternals; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ec0bf7b4/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java -- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java new file mode 100644 index 000..b22fcb3 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.base.MoreObjects; +import java.util.HashSet; +import java.util.PriorityQueue; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.TimerInternals; +import org.apache.beam.sdk.util.WindowTracing; +import org.apache.beam.sdk.util.state.StateNamespace; +import org.joda.time.Instant; + +/** + * Simulates the firing of timers and progression of input and output watermarks for a single + * computation and key in a Windmill-like streaming environment. + */ +public class InMemoryTimerInternals implements TimerInternals { + + /** At most one timer per timestamp is kept. */ + private Set existingTimers = new HashSet<>(); + + /** Pending input