Remove use of OldDoFn from some DirectRunner tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3d086857 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3d086857 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3d086857 Branch: refs/heads/apex-runner Commit: 3d086857de87734b087076dad3eca92f625bb417 Parents: 4051357 Author: Kenneth Knowles <k...@google.com> Authored: Mon Oct 24 16:09:13 2016 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Tue Oct 25 13:12:17 2016 -0700 ---------------------------------------------------------------------- .../ConsumerTrackingPipelineVisitorTest.java | 32 +++---- .../beam/runners/direct/DirectRunnerTest.java | 40 +++++---- .../ImmutabilityCheckingBundleFactoryTest.java | 8 +- .../ImmutabilityEnforcementFactoryTest.java | 8 +- .../direct/KeyedPValueTrackingVisitorTest.java | 8 +- .../beam/runners/direct/ParDoEvaluatorTest.java | 8 +- .../direct/ParDoMultiEvaluatorFactoryTest.java | 87 +++++++++--------- .../direct/ParDoSingleEvaluatorFactoryTest.java | 94 +++++++++----------- .../runners/direct/WatermarkManagerTest.java | 8 +- 9 files changed, 139 insertions(+), 154 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d086857/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java index 1c9b5a6..e8f2a7e 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java @@ -26,8 +26,8 @@ import org.apache.beam.sdk.io.CountingInput; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; @@ -60,9 +60,9 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable { p.apply("listCreate", Create.of("foo", "bar")) .apply( ParDo.of( - new OldDoFn<String, String>() { - @Override - public void processElement(OldDoFn<String, String>.ProcessContext c) + new DoFn<String, String>() { + @ProcessElement + public void processElement(DoFn<String, String>.ProcessContext c) throws Exception { c.output(Integer.toString(c.element().length())); } @@ -107,9 +107,9 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable { PCollection<String> transformed = created.apply( ParDo.of( - new OldDoFn<String, String>() { - @Override - public void processElement(OldDoFn<String, String>.ProcessContext c) + new DoFn<String, String>() { + @ProcessElement + public void processElement(DoFn<String, String>.ProcessContext c) throws Exception { c.output(Integer.toString(c.element().length())); } @@ -138,9 +138,9 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable { PCollection<String> transformed = created.apply( ParDo.of( - new OldDoFn<String, String>() { - @Override - public void processElement(OldDoFn<String, String>.ProcessContext c) + new DoFn<String, String>() { + @ProcessElement + public void processElement(DoFn<String, String>.ProcessContext c) throws Exception { c.output(Integer.toString(c.element().length())); } @@ -155,9 +155,9 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable { p.apply(Create.of("1", "2", "3")) .apply( ParDo.of( - new OldDoFn<String, String>() { - @Override - public void processElement(OldDoFn<String, String>.ProcessContext c) + new DoFn<String, String>() { + @ProcessElement + public void processElement(DoFn<String, String>.ProcessContext c) throws Exception { c.output(Integer.toString(c.element().length())); } @@ -180,9 +180,9 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable { PCollection<String> transformed = created.apply( ParDo.of( - new OldDoFn<String, String>() { - @Override - public void processElement(OldDoFn<String, String>.ProcessContext c) + new DoFn<String, String>() { + @ProcessElement + public void processElement(DoFn<String, String>.ProcessContext c) throws Exception { c.output(Integer.toString(c.element().length())); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d086857/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java index 4027d25..34a5469 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java @@ -59,7 +59,6 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.SimpleFunction; @@ -221,8 +220,8 @@ public class DirectRunnerTest implements Serializable { @Test public void transformDisplayDataExceptionShouldFail() { - OldDoFn<Integer, Integer> brokenDoFn = new OldDoFn<Integer, Integer>() { - @Override + DoFn<Integer, Integer> brokenDoFn = new DoFn<Integer, Integer>() { + @ProcessElement public void processElement(ProcessContext c) throws Exception {} @Override @@ -242,7 +241,7 @@ public class DirectRunnerTest implements Serializable { } /** - * Tests that a {@link OldDoFn} that mutates an output with a good equals() fails in the + * Tests that a {@link DoFn} that mutates an output with a good equals() fails in the * {@link DirectRunner}. */ @Test @@ -251,8 +250,9 @@ public class DirectRunnerTest implements Serializable { pipeline .apply(Create.of(42)) - .apply(ParDo.of(new OldDoFn<Integer, List<Integer>>() { - @Override public void processElement(ProcessContext c) { + .apply(ParDo.of(new DoFn<Integer, List<Integer>>() { + @ProcessElement + public void processElement(ProcessContext c) { List<Integer> outputList = Arrays.asList(1, 2, 3, 4); c.output(outputList); outputList.set(0, 37); @@ -267,7 +267,7 @@ public class DirectRunnerTest implements Serializable { } /** - * Tests that a {@link OldDoFn} that mutates an output with a good equals() fails in the + * Tests that a {@link DoFn} that mutates an output with a good equals() fails in the * {@link DirectRunner}. */ @Test @@ -276,8 +276,9 @@ public class DirectRunnerTest implements Serializable { pipeline .apply(Create.of(42)) - .apply(ParDo.of(new OldDoFn<Integer, List<Integer>>() { - @Override public void processElement(ProcessContext c) { + .apply(ParDo.of(new DoFn<Integer, List<Integer>>() { + @ProcessElement + public void processElement(ProcessContext c) { List<Integer> outputList = Arrays.asList(1, 2, 3, 4); c.output(outputList); outputList.set(0, 37); @@ -291,7 +292,7 @@ public class DirectRunnerTest implements Serializable { } /** - * Tests that a {@link OldDoFn} that mutates an output with a bad equals() still fails + * Tests that a {@link DoFn} that mutates an output with a bad equals() still fails * in the {@link DirectRunner}. */ @Test @@ -300,8 +301,9 @@ public class DirectRunnerTest implements Serializable { pipeline .apply(Create.of(42)) - .apply(ParDo.of(new OldDoFn<Integer, byte[]>() { - @Override public void processElement(ProcessContext c) { + .apply(ParDo.of(new DoFn<Integer, byte[]>() { + @ProcessElement + public void processElement(ProcessContext c) { byte[] outputArray = new byte[]{0x1, 0x2, 0x3}; c.output(outputArray); outputArray[0] = 0xa; @@ -316,7 +318,7 @@ public class DirectRunnerTest implements Serializable { } /** - * Tests that a {@link OldDoFn} that mutates its input with a good equals() fails in the + * Tests that a {@link DoFn} that mutates its input with a good equals() fails in the * {@link DirectRunner}. */ @Test @@ -326,8 +328,9 @@ public class DirectRunnerTest implements Serializable { pipeline .apply(Create.of(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6)) .withCoder(ListCoder.of(VarIntCoder.of()))) - .apply(ParDo.of(new OldDoFn<List<Integer>, Integer>() { - @Override public void processElement(ProcessContext c) { + .apply(ParDo.of(new DoFn<List<Integer>, Integer>() { + @ProcessElement + public void processElement(ProcessContext c) { List<Integer> inputList = c.element(); inputList.set(0, 37); c.output(12); @@ -341,7 +344,7 @@ public class DirectRunnerTest implements Serializable { } /** - * Tests that a {@link OldDoFn} that mutates an input with a bad equals() still fails + * Tests that a {@link DoFn} that mutates an input with a bad equals() still fails * in the {@link DirectRunner}. */ @Test @@ -350,8 +353,9 @@ public class DirectRunnerTest implements Serializable { pipeline .apply(Create.of(new byte[]{0x1, 0x2, 0x3}, new byte[]{0x4, 0x5, 0x6})) - .apply(ParDo.of(new OldDoFn<byte[], Integer>() { - @Override public void processElement(ProcessContext c) { + .apply(ParDo.of(new DoFn<byte[], Integer>() { + @ProcessElement + public void processElement(ProcessContext c) { byte[] inputArray = c.element(); inputArray[0] = 0xa; c.output(13); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d086857/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java index d445944..ea44125 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java @@ -26,7 +26,7 @@ import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; @@ -179,9 +179,9 @@ public class ImmutabilityCheckingBundleFactoryTest { intermediate.commit(Instant.now()); } - private static class IdentityDoFn<T> extends OldDoFn<T, T> { - @Override - public void processElement(OldDoFn<T, T>.ProcessContext c) throws Exception { + private static class IdentityDoFn<T> extends DoFn<T, T> { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { c.output(c.element()); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d086857/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java index 812d7d5..a7277fe 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java @@ -24,7 +24,7 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.util.IllegalMutationException; import org.apache.beam.sdk.util.WindowedValue; @@ -57,9 +57,9 @@ public class ImmutabilityEnforcementFactoryTest implements Serializable { p.apply(Create.of("foo".getBytes(), "spamhameggs".getBytes())) .apply( ParDo.of( - new OldDoFn<byte[], byte[]>() { - @Override - public void processElement(OldDoFn<byte[], byte[]>.ProcessContext c) + new DoFn<byte[], byte[]>() { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { c.element()[0] = 'b'; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d086857/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java index ee6b2b4..cf65936 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java @@ -31,9 +31,9 @@ import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.Keys; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; @@ -177,9 +177,9 @@ public class KeyedPValueTrackingVisitorTest { } } - private static class IdentityFn<K> extends OldDoFn<K, K> { - @Override - public void processElement(OldDoFn<K, K>.ProcessContext c) throws Exception { + private static class IdentityFn<K> extends DoFn<K, K> { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { c.output(c.element()); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d086857/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java index 1a742f0..6d00aa1 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java @@ -37,7 +37,7 @@ import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -168,7 +168,7 @@ public class ParDoEvaluatorTest { ImmutableMap.<TupleTag<?>, PCollection<?>>of(mainOutputTag, output)); } - private static class RecorderFn extends OldDoFn<Integer, Integer> { + private static class RecorderFn extends DoFn<Integer, Integer> { private Collection<Integer> processed; private final PCollectionView<Integer> view; @@ -177,8 +177,8 @@ public class ParDoEvaluatorTest { this.view = view; } - @Override - public void processElement(OldDoFn<Integer, Integer>.ProcessContext c) throws Exception { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { processed.add(c.element()); c.output(c.element() + c.sideInput(view)); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d086857/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java index 8b0070b..cc83323 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java @@ -32,7 +32,7 @@ import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.ParDo.BoundMulti; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -41,11 +41,16 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.Timer; import org.apache.beam.sdk.util.TimerInternals.TimerData; +import org.apache.beam.sdk.util.TimerSpec; +import org.apache.beam.sdk.util.TimerSpecs; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.state.BagState; import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.util.state.StateNamespaces; +import org.apache.beam.sdk.util.state.StateSpec; +import org.apache.beam.sdk.util.state.StateSpecs; import org.apache.beam.sdk.util.state.StateTag; import org.apache.beam.sdk.util.state.StateTags; import org.apache.beam.sdk.util.state.WatermarkHoldState; @@ -81,8 +86,8 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable { BoundMulti<String, KV<String, Integer>> pardo = ParDo.of( - new OldDoFn<String, KV<String, Integer>>() { - @Override + new DoFn<String, KV<String, Integer>>() { + @ProcessElement public void processElement(ProcessContext c) { c.output(KV.<String, Integer>of(c.element(), c.element().length())); c.sideOutput(elementTag, c.element()); @@ -170,8 +175,8 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable { BoundMulti<String, KV<String, Integer>> pardo = ParDo.of( - new OldDoFn<String, KV<String, Integer>>() { - @Override + new DoFn<String, KV<String, Integer>>() { + @ProcessElement public void processElement(ProcessContext c) { c.output(KV.<String, Integer>of(c.element(), c.element().length())); c.sideOutput(elementTag, c.element()); @@ -258,20 +263,17 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable { StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE); BoundMulti<String, KV<String, Integer>> pardo = ParDo.of( - new OldDoFn<String, KV<String, Integer>>() { - @Override - public void processElement(ProcessContext c) { - c.windowingInternals() - .stateInternals() - .state(StateNamespaces.global(), watermarkTag) - .add(new Instant(20202L + c.element().length())); - c.windowingInternals() - .stateInternals() - .state( - StateNamespaces.window( - GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE), - bagTag) - .add(c.element()); + new DoFn<String, KV<String, Integer>>() { + private static final String STATE_ID = "my-state-id"; + + @StateId(STATE_ID) + private final StateSpec<Object, BagState<String>> bagSpec = + StateSpecs.bag(StringUtf8Coder.of()); + + @ProcessElement + public void processElement( + ProcessContext c, @StateId(STATE_ID) BagState<String> bagState) { + bagState.add(c.element()); } }) .withOutputTags(mainOutputTag, TupleTagList.of(elementTag)); @@ -362,34 +364,25 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable { BoundMulti<String, KV<String, Integer>> pardo = ParDo.of( - new OldDoFn<String, KV<String, Integer>>() { - @Override - public void processElement(ProcessContext c) { - c.windowingInternals().stateInternals(); - c.windowingInternals() - .timerInternals() - .setTimer( - TimerData.of( - StateNamespaces.window( - IntervalWindow.getCoder(), - new IntervalWindow( - new Instant(0).plus(Duration.standardMinutes(5)), - new Instant(1) - .plus(Duration.standardMinutes(5)) - .plus(Duration.standardHours(1)))), - new Instant(54541L), - TimeDomain.EVENT_TIME)); - c.windowingInternals() - .timerInternals() - .deleteTimer( - TimerData.of( - StateNamespaces.window( - IntervalWindow.getCoder(), - new IntervalWindow( - new Instant(0), - new Instant(0).plus(Duration.standardHours(1)))), - new Instant(3400000), - TimeDomain.SYNCHRONIZED_PROCESSING_TIME)); + new DoFn<String, KV<String, Integer>>() { + private static final String EVENT_TIME_TIMER = "event-time-timer"; + private static final String SYNC_PROC_TIME_TIMER = "sync-proc-time-timer"; + + @TimerId(EVENT_TIME_TIMER) + TimerSpec myTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @TimerId(SYNC_PROC_TIME_TIMER) + TimerSpec syncProcTimerSpec = + TimerSpecs.timer(TimeDomain.SYNCHRONIZED_PROCESSING_TIME); + + @ProcessElement + public void processElement( + ProcessContext c, + @TimerId(EVENT_TIME_TIMER) Timer eventTimeTimer, + @TimerId(SYNC_PROC_TIME_TIMER) Timer syncProcTimeTimer) { + + eventTimeTimer.setForNowPlus(Duration.standardMinutes(5)); + syncProcTimeTimer.cancel(); } }) .withOutputTags(mainOutputTag, TupleTagList.of(elementTag)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d086857/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java index e562b28..d22643a 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java @@ -32,22 +32,25 @@ import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.Timer; import org.apache.beam.sdk.util.TimerInternals.TimerData; +import org.apache.beam.sdk.util.TimerSpec; +import org.apache.beam.sdk.util.TimerSpecs; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.state.BagState; import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.util.state.StateNamespaces; +import org.apache.beam.sdk.util.state.StateSpec; +import org.apache.beam.sdk.util.state.StateSpecs; import org.apache.beam.sdk.util.state.StateTag; import org.apache.beam.sdk.util.state.StateTags; -import org.apache.beam.sdk.util.state.WatermarkHoldState; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TupleTag; @@ -74,8 +77,8 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable { PCollection<Integer> collection = input.apply( ParDo.of( - new OldDoFn<String, Integer>() { - @Override + new DoFn<String, Integer>() { + @ProcessElement public void processElement(ProcessContext c) { c.output(c.element().length()); } @@ -128,8 +131,8 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable { PCollection<Integer> collection = input.apply( ParDo.of( - new OldDoFn<String, Integer>() { - @Override + new DoFn<String, Integer>() { + @ProcessElement public void processElement(ProcessContext c) { c.sideOutput(sideOutputTag, c.element().length()); } @@ -178,26 +181,22 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable { PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam")); - final StateTag<Object, WatermarkHoldState<BoundedWindow>> watermarkTag = - StateTags.watermarkStateInternal("myId", OutputTimeFns.outputAtEarliestInputTimestamp()); final StateTag<Object, BagState<String>> bagTag = StateTags.bag("myBag", StringUtf8Coder.of()); final StateNamespace windowNs = StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE); ParDo.Bound<String, KV<String, Integer>> pardo = ParDo.of( - new OldDoFn<String, KV<String, Integer>>() { - @Override - public void processElement(ProcessContext c) { - c.windowingInternals() - .stateInternals() - .state(StateNamespaces.global(), watermarkTag) - .add(new Instant(124443L - c.element().length())); - c.windowingInternals() - .stateInternals() - .state( - StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE), - bagTag) - .add(c.element()); + new DoFn<String, KV<String, Integer>>() { + private static final String STATE_ID = "my-state-id"; + + @StateId(STATE_ID) + private final StateSpec<Object, BagState<String>> bagSpec = + StateSpecs.bag(StringUtf8Coder.of()); + + @ProcessElement + public void processElement( + ProcessContext c, @StateId(STATE_ID) BagState<String> bagState) { + bagState.add(c.element()); } }); PCollection<KV<String, Integer>> mainOutput = input.apply(pardo); @@ -237,9 +236,6 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable { assertThat(result.getWatermarkHold(), equalTo(new Instant(124438L))); assertThat(result.getState(), not(nullValue())); assertThat( - result.getState().state(StateNamespaces.global(), watermarkTag).read(), - equalTo(new Instant(124438L))); - assertThat( result.getState().state(windowNs, bagTag).read(), containsInAnyOrder("foo", "bara", "bazam")); } @@ -255,6 +251,8 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable { PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam")); + // TODO: this timer data is absolute, but the new API only support relative settings. + // It will require adjustments when @Ignore is removed final TimerData addedTimer = TimerData.of( StateNamespaces.window( @@ -276,34 +274,24 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable { ParDo.Bound<String, KV<String, Integer>> pardo = ParDo.of( - new OldDoFn<String, KV<String, Integer>>() { - @Override - public void processElement(ProcessContext c) { - c.windowingInternals().stateInternals(); - c.windowingInternals() - .timerInternals() - .setTimer( - TimerData.of( - StateNamespaces.window( - IntervalWindow.getCoder(), - new IntervalWindow( - new Instant(0).plus(Duration.standardMinutes(5)), - new Instant(1) - .plus(Duration.standardMinutes(5)) - .plus(Duration.standardHours(1)))), - new Instant(54541L), - TimeDomain.EVENT_TIME)); - c.windowingInternals() - .timerInternals() - .deleteTimer( - TimerData.of( - StateNamespaces.window( - IntervalWindow.getCoder(), - new IntervalWindow( - new Instant(0), - new Instant(0).plus(Duration.standardHours(1)))), - new Instant(3400000), - TimeDomain.SYNCHRONIZED_PROCESSING_TIME)); + new DoFn<String, KV<String, Integer>>() { + private static final String EVENT_TIME_TIMER = "event-time-timer"; + private static final String SYNC_PROC_TIME_TIMER = "sync-proc-time-timer"; + + @TimerId(EVENT_TIME_TIMER) + TimerSpec myTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @TimerId(SYNC_PROC_TIME_TIMER) + TimerSpec syncProcTimerSpec = + TimerSpecs.timer(TimeDomain.SYNCHRONIZED_PROCESSING_TIME); + + @ProcessElement + public void processElement( + ProcessContext c, + @TimerId(EVENT_TIME_TIMER) Timer eventTimeTimer, + @TimerId(SYNC_PROC_TIME_TIMER) Timer syncProcTimeTimer) { + eventTimeTimer.setForNowPlus(Duration.standardMinutes(5)); + syncProcTimeTimer.cancel(); } }); PCollection<KV<String, Integer>> mainOutput = input.apply(pardo); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d086857/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java index 042abab..1954005 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java @@ -47,9 +47,9 @@ import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Filter; import org.apache.beam.sdk.transforms.Flatten; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -101,9 +101,9 @@ public class WatermarkManagerTest implements Serializable { createdInts = p.apply("createdInts", Create.of(1, 2, 3)); filtered = createdInts.apply("filtered", Filter.greaterThan(1)); - filteredTimesTwo = filtered.apply("timesTwo", ParDo.of(new OldDoFn<Integer, Integer>() { - @Override - public void processElement(OldDoFn<Integer, Integer>.ProcessContext c) throws Exception { + filteredTimesTwo = filtered.apply("timesTwo", ParDo.of(new DoFn<Integer, Integer>() { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { c.output(c.element() * 2); } }));