This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new c395c84 Merge pull request #11725: [BEAM-10015] Fix output timestamp on dataflow runner c395c84 is described below commit c395c84943fbd4875d2021f0cbf744e982c9fe31 Author: reuvenlax <re...@google.com> AuthorDate: Fri May 15 23:39:53 2020 -0700 Merge pull request #11725: [BEAM-10015] Fix output timestamp on dataflow runner --- .../operators/ApexTimerInternalsTest.java | 6 +- .../beam/runners/core/ReduceFnContextFactory.java | 4 +- .../core/SplittableParDoViaKeyedWorkItems.java | 3 +- .../apache/beam/runners/core/TimerInternals.java | 32 +---- .../runners/core/InMemoryTimerInternalsTest.java | 38 ++++-- .../beam/runners/core/KeyedWorkItemCoderTest.java | 6 +- .../apache/beam/runners/core/ReduceFnTester.java | 4 +- .../beam/runners/core/SimpleDoFnRunnerTest.java | 1 + .../SimplePushbackSideInputDoFnRunnerTest.java | 1 + .../beam/runners/core/TimerInternalsTest.java | 37 ++++-- .../core/triggers/TriggerStateMachineTester.java | 4 +- .../runners/direct/DirectTimerInternalsTest.java | 26 +++- ...cycleManagerRemovingTransformEvaluatorTest.java | 7 +- .../beam/runners/direct/EvaluationContextTest.java | 3 +- .../beam/runners/direct/WatermarkManagerTest.java | 148 ++++++++++++++++----- .../streaming/ExecutableStageDoFnOperatorTest.java | 2 + .../dataflow/worker/WindmillTimerInternals.java | 137 ++++++++++++++----- .../worker/StreamingGroupAlsoByWindowFnsTest.java | 1 + ...eamingKeyedWorkItemSideInputDoFnRunnerTest.java | 1 + .../worker/StreamingModeExecutionContextTest.java | 6 +- .../worker/StreamingSideInputFetcherTest.java | 1 + .../dataflow/worker/UserParDoFnFactoryTest.java | 2 + .../dataflow/worker/WindmillKeyedWorkItemTest.java | 3 +- .../worker/WindmillTimerInternalsTest.java | 48 ++++--- .../apache/beam/runners/samza/runtime/DoFnOp.java | 1 + .../beam/runners/samza/runtime/KeyedTimerData.java | 2 +- .../samza/runtime/SamzaTimerInternalsFactory.java | 13 +- 
.../runners/samza/runtime/KeyedTimerDataTest.java | 2 +- .../runtime/SamzaTimerInternalsFactoryTest.java | 25 ++-- .../java/org/apache/beam/sdk/transforms/DoFn.java | 4 +- .../org/apache/beam/sdk/transforms/ParDoTest.java | 38 ++++++ 31 files changed, 439 insertions(+), 167 deletions(-) diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java index 180d2fe..f4e8d10 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java @@ -128,7 +128,11 @@ public class ApexTimerInternalsTest { TimerDataCoderV2 timerDataCoder = TimerDataCoderV2.of(GlobalWindow.Coder.INSTANCE); TimerData timerData = TimerData.of( - "arbitrary-id", StateNamespaces.global(), new Instant(0), TimeDomain.EVENT_TIME); + "arbitrary-id", + StateNamespaces.global(), + new Instant(0), + new Instant(0), + TimeDomain.EVENT_TIME); String key = "key"; ApexTimerInternals<String> timerInternals = new ApexTimerInternals<>(timerDataCoder); timerInternals.setContext(key, StringUtf8Coder.of(), Instant.now(), null); diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java index ac6185b..8bdcc87 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java @@ -132,12 +132,12 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> { @Override public void setTimer(Instant timestamp, TimeDomain timeDomain) { - timerInternals.setTimer(TimerData.of(namespace, timestamp, timeDomain)); + 
timerInternals.setTimer(TimerData.of(namespace, timestamp, timestamp, timeDomain)); } @Override public void deleteTimer(Instant timestamp, TimeDomain timeDomain) { - timerInternals.deleteTimer(TimerData.of(namespace, timestamp, timeDomain)); + timerInternals.deleteTimer(TimerData.of(namespace, timestamp, timestamp, timeDomain)); } @Override diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java index 01612f0..3ef996f 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java @@ -545,7 +545,8 @@ public class SplittableParDoViaKeyedWorkItems { holdState.add(futureOutputWatermark); // Set a timer to continue processing this element. timerInternals.setTimer( - TimerInternals.TimerData.of(stateNamespace, wakeupTime, TimeDomain.PROCESSING_TIME)); + TimerInternals.TimerData.of( + stateNamespace, wakeupTime, wakeupTime, TimeDomain.PROCESSING_TIME)); } private DoFnInvoker.ArgumentProvider<InputT, OutputT> wrapContextAsStartBundle( diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java index 5ea3fb7..413884d 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java @@ -214,27 +214,6 @@ public interface TimerInternals { } /** - * Construct a {@link TimerData} for the given parameters, where the timer ID is automatically - * generated. Construct a {@link TimerData} for the given parameters except for {@code - * outputTimestamp}. {@code outputTimestamp} is set to timer {@code timestamp}. 
- */ - public static TimerData of( - String timerId, StateNamespace namespace, Instant timestamp, TimeDomain domain) { - return new AutoValue_TimerInternals_TimerData( - timerId, "", namespace, timestamp, timestamp, domain); - } - - public static TimerData of( - String timerId, - String timerFamilyId, - StateNamespace namespace, - Instant timestamp, - TimeDomain domain) { - return new AutoValue_TimerInternals_TimerData( - timerId, timerFamilyId, namespace, timestamp, timestamp, domain); - } - - /** * Construct a {@link TimerData} for the given parameters except for timer ID. Timer ID is * deterministically generated from the {@code timestamp} and {@code domain}. */ @@ -250,15 +229,6 @@ public interface TimerInternals { } /** - * Construct a {@link TimerData} for the given parameters, where the timer ID is - * deterministically generated from the {@code timestamp} and {@code domain}. Also, output - * timestamp is set to the timer timestamp by default. - */ - public static TimerData of(StateNamespace namespace, Instant timestamp, TimeDomain domain) { - return of(namespace, timestamp, timestamp, domain); - } - - /** * {@inheritDoc}. * * <p>Used for sorting {@link TimerData} by timestamp. 
Furthermore, we compare timers by all the @@ -364,7 +334,7 @@ public interface TimerInternals { StateNamespaces.fromString(STRING_CODER.decode(inStream), windowCoder); Instant timestamp = INSTANT_CODER.decode(inStream); TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream)); - return TimerData.of(timerId, namespace, timestamp, domain); + return TimerData.of(timerId, namespace, timestamp, timestamp, domain); } @Override diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java index 3ab7932..bc997c3 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java @@ -39,8 +39,10 @@ public class InMemoryTimerInternalsTest { @Test public void testFiringEventTimers() throws Exception { InMemoryTimerInternals underTest = new InMemoryTimerInternals(); - TimerData eventTimer1 = TimerData.of(ID1, NS1, new Instant(19), TimeDomain.EVENT_TIME); - TimerData eventTimer2 = TimerData.of(ID2, NS1, new Instant(29), TimeDomain.EVENT_TIME); + TimerData eventTimer1 = + TimerData.of(ID1, NS1, new Instant(19), new Instant(19), TimeDomain.EVENT_TIME); + TimerData eventTimer2 = + TimerData.of(ID2, NS1, new Instant(29), new Instant(29), TimeDomain.EVENT_TIME); underTest.setTimer(eventTimer1); underTest.setTimer(eventTimer2); @@ -79,7 +81,7 @@ public class InMemoryTimerInternalsTest { underTest.advanceInputWatermark(laterTimestamp.plus(1L)); assertThat( underTest.removeNextEventTimer(), - equalTo(TimerData.of(ID1, "", NS1, laterTimestamp, TimeDomain.EVENT_TIME))); + equalTo(TimerData.of(ID1, "", NS1, laterTimestamp, laterTimestamp, TimeDomain.EVENT_TIME))); } @Test @@ -107,8 +109,10 @@ public class InMemoryTimerInternalsTest { @Test public void testFiringProcessingTimeTimers() throws Exception { 
InMemoryTimerInternals underTest = new InMemoryTimerInternals(); - TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME); - TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME); + TimerData processingTime1 = + TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.PROCESSING_TIME); + TimerData processingTime2 = + TimerData.of(NS1, new Instant(29), new Instant(29), TimeDomain.PROCESSING_TIME); underTest.setTimer(processingTime1); underTest.setTimer(processingTime2); @@ -136,14 +140,20 @@ public class InMemoryTimerInternalsTest { @Test public void testTimerOrdering() throws Exception { InMemoryTimerInternals underTest = new InMemoryTimerInternals(); - TimerData eventTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.EVENT_TIME); - TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME); + TimerData eventTime1 = + TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.EVENT_TIME); + TimerData processingTime1 = + TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.PROCESSING_TIME); TimerData synchronizedProcessingTime1 = - TimerData.of(NS1, new Instant(19), TimeDomain.SYNCHRONIZED_PROCESSING_TIME); - TimerData eventTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.EVENT_TIME); - TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME); + TimerData.of( + NS1, new Instant(19), new Instant(19), TimeDomain.SYNCHRONIZED_PROCESSING_TIME); + TimerData eventTime2 = + TimerData.of(NS1, new Instant(29), new Instant(29), TimeDomain.EVENT_TIME); + TimerData processingTime2 = + TimerData.of(NS1, new Instant(29), new Instant(29), TimeDomain.PROCESSING_TIME); TimerData synchronizedProcessingTime2 = - TimerData.of(NS1, new Instant(29), TimeDomain.SYNCHRONIZED_PROCESSING_TIME); + TimerData.of( + NS1, new Instant(29), new Instant(29), TimeDomain.SYNCHRONIZED_PROCESSING_TIME); underTest.setTimer(processingTime1); 
underTest.setTimer(eventTime1); @@ -176,8 +186,10 @@ public class InMemoryTimerInternalsTest { @Test public void testDeduplicate() throws Exception { InMemoryTimerInternals underTest = new InMemoryTimerInternals(); - TimerData eventTime = TimerData.of(NS1, new Instant(19), TimeDomain.EVENT_TIME); - TimerData processingTime = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME); + TimerData eventTime = + TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.EVENT_TIME); + TimerData processingTime = + TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.PROCESSING_TIME); underTest.setTimer(eventTime); underTest.setTimer(eventTime); underTest.setTimer(processingTime); diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java index 1aebb03..5de5749 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java @@ -43,7 +43,11 @@ public class KeyedWorkItemCoderTest { public void testEncodeDecodeEqual() throws Exception { Iterable<TimerData> timers = ImmutableList.of( - TimerData.of(StateNamespaces.global(), new Instant(500L), TimeDomain.EVENT_TIME)); + TimerData.of( + StateNamespaces.global(), + new Instant(500L), + new Instant(500L), + TimeDomain.EVENT_TIME)); Iterable<WindowedValue<Integer>> elements = ImmutableList.of( WindowedValue.valueInGlobalWindow(1), diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java index f316d07..0e2e9a3 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java @@ -570,7 +570,8 @@ public class 
ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner(); ArrayList<TimerData> timers = new ArrayList<>(1); timers.add( - TimerData.of(StateNamespaces.window(windowFn.windowCoder(), window), timestamp, domain)); + TimerData.of( + StateNamespaces.window(windowFn.windowCoder(), window), timestamp, timestamp, domain)); runner.onTimers(timers); runner.persist(); } @@ -583,6 +584,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { TimerData.of( StateNamespaces.window(windowFn.windowCoder(), window), timer.getTimestamp(), + timer.getTimestamp(), timer.getValue())); } runner.onTimers(timerData); diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java index a1c3d72..9daf372 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java @@ -441,6 +441,7 @@ public class SimpleDoFnRunnerTest { TimerData.of( DoFnWithTimers.TIMER_ID, StateNamespaces.window(windowCoder, (W) context.window()), + context.fireTimestamp(), context.timestamp(), context.timeDomain())); } diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java index f9d4296..a0a6102 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java @@ -318,6 +318,7 @@ public class SimplePushbackSideInputDoFnRunnerTest { timerId, StateNamespaces.window(IntervalWindow.getCoder(), window), timestamp, + timestamp, TimeDomain.EVENT_TIME))); } diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java index ab2978f..c6a36bd 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java @@ -42,7 +42,11 @@ public class TimerInternalsTest { CoderProperties.coderDecodeEncodeEqual( TimerDataCoderV2.of(GlobalWindow.Coder.INSTANCE), TimerData.of( - "arbitrary-id", StateNamespaces.global(), new Instant(0), TimeDomain.EVENT_TIME)); + "arbitrary-id", + StateNamespaces.global(), + new Instant(0), + new Instant(0), + TimeDomain.EVENT_TIME)); Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder(); CoderProperties.coderDecodeEncodeEqual( @@ -52,6 +56,7 @@ public class TimerInternalsTest { StateNamespaces.window( windowCoder, new IntervalWindow(new Instant(0), new Instant(100))), new Instant(99), + new Instant(99), TimeDomain.PROCESSING_TIME)); } @@ -64,10 +69,12 @@ public class TimerInternalsTest { public void testCompareEqual() { Instant timestamp = new Instant(100); StateNamespace namespace = StateNamespaces.global(); - TimerData timer = TimerData.of("id", namespace, timestamp, TimeDomain.EVENT_TIME); + TimerData timer = TimerData.of("id", namespace, timestamp, timestamp, TimeDomain.EVENT_TIME); assertThat( - timer, comparesEqualTo(TimerData.of("id", namespace, timestamp, TimeDomain.EVENT_TIME))); + timer, + comparesEqualTo( + TimerData.of("id", namespace, timestamp, timestamp, TimeDomain.EVENT_TIME))); } @Test @@ -76,8 +83,10 @@ public class TimerInternalsTest { Instant secondTimestamp = new Instant(200); StateNamespace namespace = StateNamespaces.global(); - TimerData firstTimer = TimerData.of(namespace, firstTimestamp, TimeDomain.EVENT_TIME); - TimerData secondTimer = TimerData.of(namespace, secondTimestamp, TimeDomain.EVENT_TIME); + TimerData firstTimer = + 
TimerData.of(namespace, firstTimestamp, firstTimestamp, TimeDomain.EVENT_TIME); + TimerData secondTimer = + TimerData.of(namespace, secondTimestamp, secondTimestamp, TimeDomain.EVENT_TIME); assertThat(firstTimer, lessThan(secondTimer)); } @@ -87,10 +96,10 @@ public class TimerInternalsTest { Instant timestamp = new Instant(100); StateNamespace namespace = StateNamespaces.global(); - TimerData eventTimer = TimerData.of(namespace, timestamp, TimeDomain.EVENT_TIME); - TimerData procTimer = TimerData.of(namespace, timestamp, TimeDomain.PROCESSING_TIME); + TimerData eventTimer = TimerData.of(namespace, timestamp, timestamp, TimeDomain.EVENT_TIME); + TimerData procTimer = TimerData.of(namespace, timestamp, timestamp, TimeDomain.PROCESSING_TIME); TimerData synchronizedProcTimer = - TimerData.of(namespace, timestamp, TimeDomain.SYNCHRONIZED_PROCESSING_TIME); + TimerData.of(namespace, timestamp, timestamp, TimeDomain.SYNCHRONIZED_PROCESSING_TIME); assertThat(eventTimer, lessThan(procTimer)); assertThat(eventTimer, lessThan(synchronizedProcTimer)); @@ -107,8 +116,10 @@ public class TimerInternalsTest { StateNamespace firstWindowNs = StateNamespaces.window(windowCoder, firstWindow); StateNamespace secondWindowNs = StateNamespaces.window(windowCoder, secondWindow); - TimerData secondEventTime = TimerData.of(firstWindowNs, timestamp, TimeDomain.EVENT_TIME); - TimerData thirdEventTime = TimerData.of(secondWindowNs, timestamp, TimeDomain.EVENT_TIME); + TimerData secondEventTime = + TimerData.of(firstWindowNs, timestamp, timestamp, TimeDomain.EVENT_TIME); + TimerData thirdEventTime = + TimerData.of(secondWindowNs, timestamp, timestamp, TimeDomain.EVENT_TIME); assertThat(secondEventTime, lessThan(thirdEventTime)); } @@ -118,8 +129,10 @@ public class TimerInternalsTest { Instant timestamp = new Instant(100); StateNamespace namespace = StateNamespaces.global(); - TimerData id0Timer = TimerData.of("id0", namespace, timestamp, TimeDomain.EVENT_TIME); - TimerData id1Timer = 
TimerData.of("id1", namespace, timestamp, TimeDomain.EVENT_TIME); + TimerData id0Timer = + TimerData.of("id0", namespace, timestamp, timestamp, TimeDomain.EVENT_TIME); + TimerData id1Timer = + TimerData.of("id1", namespace, timestamp, timestamp, TimeDomain.EVENT_TIME); assertThat(id0Timer, lessThan(id1Timer)); } diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java index 21f2d13..bd59f87 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java @@ -413,12 +413,12 @@ public class TriggerStateMachineTester<InputT, W extends BoundedWindow> { @Override public void setTimer(Instant timestamp, TimeDomain timeDomain) { - timerInternals.setTimer(TimerData.of(namespace, timestamp, timeDomain)); + timerInternals.setTimer(TimerData.of(namespace, timestamp, timestamp, timeDomain)); } @Override public void deleteTimer(Instant timestamp, TimeDomain timeDomain) { - timerInternals.deleteTimer(TimerData.of(namespace, timestamp, timeDomain)); + timerInternals.deleteTimer(TimerData.of(namespace, timestamp, timestamp, timeDomain)); } @Override diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java index dfec1f1..5bd0d39 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java @@ -62,13 +62,22 @@ public class DirectTimerInternalsTest { @Test public void setTimerAddsToBuilder() { TimerData eventTimer = - TimerData.of(StateNamespaces.global(), new Instant(20145L), 
TimeDomain.EVENT_TIME); + TimerData.of( + StateNamespaces.global(), + new Instant(20145L), + new Instant(20145L), + TimeDomain.EVENT_TIME); TimerData processingTimer = - TimerData.of(StateNamespaces.global(), new Instant(125555555L), TimeDomain.PROCESSING_TIME); + TimerData.of( + StateNamespaces.global(), + new Instant(125555555L), + new Instant(125555555L), + TimeDomain.PROCESSING_TIME); TimerData synchronizedProcessingTimer = TimerData.of( StateNamespaces.global(), new Instant(98745632189L), + new Instant(98745632189L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME); internals.setTimer(eventTimer); internals.setTimer(processingTimer); @@ -82,13 +91,22 @@ public class DirectTimerInternalsTest { @Test public void deleteTimerDeletesOnBuilder() { TimerData eventTimer = - TimerData.of(StateNamespaces.global(), new Instant(20145L), TimeDomain.EVENT_TIME); + TimerData.of( + StateNamespaces.global(), + new Instant(20145L), + new Instant(20145L), + TimeDomain.EVENT_TIME); TimerData processingTimer = - TimerData.of(StateNamespaces.global(), new Instant(125555555L), TimeDomain.PROCESSING_TIME); + TimerData.of( + StateNamespaces.global(), + new Instant(125555555L), + new Instant(125555555L), + TimeDomain.PROCESSING_TIME); TimerData synchronizedProcessingTimer = TimerData.of( StateNamespaces.global(), new Instant(98745632189L), + new Instant(98745632189L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME); internals.deleteTimer(eventTimer); internals.deleteTimer(processingTimer); diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java index 3f10123..3e7871b 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java +++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java @@ -104,7 +104,12 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest { try { evaluator.onTimer( - TimerData.of("foo", StateNamespaces.global(), new Instant(0), TimeDomain.EVENT_TIME), + TimerData.of( + "foo", + StateNamespaces.global(), + new Instant(0), + new Instant(0), + TimeDomain.EVENT_TIME), "", GlobalWindow.INSTANCE); } catch (Exception e) { diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java index b2262af..055b48e 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java @@ -316,7 +316,8 @@ public class EvaluationContextTest { StructuralKey<?> key = StructuralKey.of("foo".length(), VarIntCoder.of()); TimerData toFire = - TimerData.of(StateNamespaces.global(), new Instant(100L), TimeDomain.EVENT_TIME); + TimerData.of( + StateNamespaces.global(), new Instant(100L), new Instant(100L), TimeDomain.EVENT_TIME); TransformResult<?> timerResult = StepTransformResult.withoutHold(downstreamProducer) .withState(CopyOnAccessInMemoryStateInternals.withUnderlying(key, null)) diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java index eef43c7..6515e22 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java @@ -990,9 +990,17 @@ public class WatermarkManagerTest implements Serializable { StructuralKey<String> key = StructuralKey.of("key", StringUtf8Coder.of()); 
CommittedBundle<Integer> filteredBundle = multiWindowedBundle(filtered, 2, 8); TimerData pastTimer = - TimerData.of(StateNamespaces.global(), new Instant(250L), TimeDomain.PROCESSING_TIME); + TimerData.of( + StateNamespaces.global(), + new Instant(250L), + new Instant(250L), + TimeDomain.PROCESSING_TIME); TimerData futureTimer = - TimerData.of(StateNamespaces.global(), new Instant(4096L), TimeDomain.PROCESSING_TIME); + TimerData.of( + StateNamespaces.global(), + new Instant(4096L), + new Instant(4096L), + TimeDomain.PROCESSING_TIME); TimerUpdate timers = TimerUpdate.builder(key).setTimer(pastTimer).setTimer(futureTimer).build(); manager.updateWatermarks( createdBundle, @@ -1125,7 +1133,8 @@ public class WatermarkManagerTest implements Serializable { CommittedBundle<Integer> filteredBundle = multiWindowedBundle(filtered, 2, 4); Instant upstreamHold = new Instant(2048L); TimerData upstreamProcessingTimer = - TimerData.of(StateNamespaces.global(), upstreamHold, TimeDomain.PROCESSING_TIME); + TimerData.of( + StateNamespaces.global(), upstreamHold, upstreamHold, TimeDomain.PROCESSING_TIME); manager.updateWatermarks( created, TimerUpdate.builder(StructuralKey.of("key", StringUtf8Coder.of())) @@ -1214,11 +1223,20 @@ public class WatermarkManagerTest implements Serializable { manager.refreshAll(); TimerData earliestTimer = - TimerData.of(StateNamespaces.global(), new Instant(1000), TimeDomain.EVENT_TIME); + TimerData.of( + StateNamespaces.global(), new Instant(1000), new Instant(1000), TimeDomain.EVENT_TIME); TimerData middleTimer = - TimerData.of(StateNamespaces.global(), new Instant(5000L), TimeDomain.EVENT_TIME); + TimerData.of( + StateNamespaces.global(), + new Instant(5000L), + new Instant(5000L), + TimeDomain.EVENT_TIME); TimerData lastTimer = - TimerData.of(StateNamespaces.global(), new Instant(10000L), TimeDomain.EVENT_TIME); + TimerData.of( + StateNamespaces.global(), + new Instant(10000L), + new Instant(10000L), + TimeDomain.EVENT_TIME); StructuralKey<byte[]> key 
= StructuralKey.of(new byte[] {1, 4, 9}, ByteArrayCoder.of()); TimerUpdate update = TimerUpdate.builder(key) @@ -1290,11 +1308,23 @@ public class WatermarkManagerTest implements Serializable { new Instant(1500L)); TimerData earliestTimer = - TimerData.of(StateNamespaces.global(), new Instant(999L), TimeDomain.PROCESSING_TIME); + TimerData.of( + StateNamespaces.global(), + new Instant(999L), + new Instant(999L), + TimeDomain.PROCESSING_TIME); TimerData middleTimer = - TimerData.of(StateNamespaces.global(), new Instant(5000L), TimeDomain.PROCESSING_TIME); + TimerData.of( + StateNamespaces.global(), + new Instant(5000L), + new Instant(5000L), + TimeDomain.PROCESSING_TIME); TimerData lastTimer = - TimerData.of(StateNamespaces.global(), new Instant(10000L), TimeDomain.PROCESSING_TIME); + TimerData.of( + StateNamespaces.global(), + new Instant(10000L), + new Instant(10000L), + TimeDomain.PROCESSING_TIME); StructuralKey<?> key = StructuralKey.of(-12L, VarLongCoder.of()); TimerUpdate update = TimerUpdate.builder(key) @@ -1367,13 +1397,22 @@ public class WatermarkManagerTest implements Serializable { TimerData earliestTimer = TimerData.of( - StateNamespaces.global(), new Instant(999L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME); + StateNamespaces.global(), + new Instant(999L), + new Instant(999L), + TimeDomain.SYNCHRONIZED_PROCESSING_TIME); TimerData middleTimer = TimerData.of( - StateNamespaces.global(), new Instant(5000L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME); + StateNamespaces.global(), + new Instant(5000L), + new Instant(5000L), + TimeDomain.SYNCHRONIZED_PROCESSING_TIME); TimerData lastTimer = TimerData.of( - StateNamespaces.global(), new Instant(10000L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME); + StateNamespaces.global(), + new Instant(10000L), + new Instant(10000L), + TimeDomain.SYNCHRONIZED_PROCESSING_TIME); StructuralKey<byte[]> key = StructuralKey.of(new byte[] {2, -2, 22}, ByteArrayCoder.of()); TimerUpdate update = TimerUpdate.builder(key) @@ -1436,11 +1475,19 
@@ public class WatermarkManagerTest implements Serializable { TimerData initialTimer = TimerData.of( - timerId, StateNamespaces.global(), new Instant(5000L), TimeDomain.PROCESSING_TIME); + timerId, + StateNamespaces.global(), + new Instant(5000L), + new Instant(5000L), + TimeDomain.PROCESSING_TIME); TimerData overridingTimer = TimerData.of( - timerId, StateNamespaces.global(), new Instant(10000L), TimeDomain.PROCESSING_TIME); + timerId, + StateNamespaces.global(), + new Instant(10000L), + new Instant(10000L), + TimeDomain.PROCESSING_TIME); TimerUpdate initialUpdate = TimerUpdate.builder(key).setTimer(initialTimer).build(); TimerUpdate overridingUpdate = TimerUpdate.builder(key).setTimer(overridingTimer).build(); @@ -1483,9 +1530,19 @@ public class WatermarkManagerTest implements Serializable { StructuralKey<?> key = StructuralKey.of(-12L, VarLongCoder.of()); TimerData initialTimer = - TimerData.of(timerId, StateNamespaces.global(), new Instant(1000L), TimeDomain.EVENT_TIME); + TimerData.of( + timerId, + StateNamespaces.global(), + new Instant(1000L), + new Instant(1000L), + TimeDomain.EVENT_TIME); TimerData overridingTimer = - TimerData.of(timerId, StateNamespaces.global(), new Instant(2000L), TimeDomain.EVENT_TIME); + TimerData.of( + timerId, + StateNamespaces.global(), + new Instant(2000L), + new Instant(2000L), + TimeDomain.EVENT_TIME); TimerUpdate initialUpdate = TimerUpdate.builder(key).setTimer(initialTimer).build(); TimerUpdate overridingUpdate = TimerUpdate.builder(key).setTimer(overridingTimer).build(); @@ -1543,9 +1600,19 @@ public class WatermarkManagerTest implements Serializable { // Apply a timer update StructuralKey<String> key = StructuralKey.of("key", StringUtf8Coder.of()); TimerData timer1 = - TimerData.of("a", StateNamespaces.global(), new Instant(100), TimeDomain.EVENT_TIME); + TimerData.of( + "a", + StateNamespaces.global(), + new Instant(100), + new Instant(100), + TimeDomain.EVENT_TIME); TimerData timer2 = - TimerData.of("a", 
StateNamespaces.global(), new Instant(200), TimeDomain.EVENT_TIME); + TimerData.of( + "a", + StateNamespaces.global(), + new Instant(200), + new Instant(200), + TimeDomain.EVENT_TIME); underTest.updateTimers(TimerUpdate.builder(key).setTimer(timer1).setTimer(timer2).build()); // Only the last timer update should be observable @@ -1575,14 +1642,27 @@ public class WatermarkManagerTest implements Serializable { @Test public void timerUpdateBuilderBuildAddsAllAddedTimers() { - TimerData set = TimerData.of(StateNamespaces.global(), new Instant(10L), TimeDomain.EVENT_TIME); + TimerData set = + TimerData.of( + StateNamespaces.global(), new Instant(10L), new Instant(10L), TimeDomain.EVENT_TIME); TimerData deleted = - TimerData.of(StateNamespaces.global(), new Instant(24L), TimeDomain.PROCESSING_TIME); + TimerData.of( + StateNamespaces.global(), + new Instant(24L), + new Instant(24L), + TimeDomain.PROCESSING_TIME); TimerData completedOne = TimerData.of( - StateNamespaces.global(), new Instant(1024L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME); + StateNamespaces.global(), + new Instant(1024L), + new Instant(1024L), + TimeDomain.SYNCHRONIZED_PROCESSING_TIME); TimerData completedTwo = - TimerData.of(StateNamespaces.global(), new Instant(2048L), TimeDomain.EVENT_TIME); + TimerData.of( + StateNamespaces.global(), + new Instant(2048L), + new Instant(2048L), + TimeDomain.EVENT_TIME); TimerUpdate update = TimerUpdate.builder(StructuralKey.of("foo", StringUtf8Coder.of())) @@ -1599,7 +1679,8 @@ public class WatermarkManagerTest implements Serializable { @Test public void timerUpdateBuilderWithSetAtEndOfTime() { Instant timerStamp = BoundedWindow.TIMESTAMP_MAX_VALUE; - TimerData tooFar = TimerData.of(StateNamespaces.global(), timerStamp, TimeDomain.EVENT_TIME); + TimerData tooFar = + TimerData.of(StateNamespaces.global(), timerStamp, timerStamp, TimeDomain.EVENT_TIME); TimerUpdateBuilder builder = TimerUpdate.builder(StructuralKey.empty()); thrown.expect(IllegalArgumentException.class); 
@@ -1610,7 +1691,8 @@ public class WatermarkManagerTest implements Serializable { @Test public void timerUpdateBuilderWithSetPastEndOfTime() { Instant timerStamp = BoundedWindow.TIMESTAMP_MAX_VALUE.plus(Duration.standardMinutes(2)); - TimerData tooFar = TimerData.of(StateNamespaces.global(), timerStamp, TimeDomain.EVENT_TIME); + TimerData tooFar = + TimerData.of(StateNamespaces.global(), timerStamp, timerStamp, TimeDomain.EVENT_TIME); TimerUpdateBuilder builder = TimerUpdate.builder(StructuralKey.empty()); thrown.expect(IllegalArgumentException.class); @@ -1621,7 +1703,8 @@ public class WatermarkManagerTest implements Serializable { @Test public void timerUpdateBuilderWithSetThenDeleteHasOnlyDeleted() { TimerUpdateBuilder builder = TimerUpdate.builder(null); - TimerData timer = TimerData.of(StateNamespaces.global(), Instant.now(), TimeDomain.EVENT_TIME); + Instant now = Instant.now(); + TimerData timer = TimerData.of(StateNamespaces.global(), now, now, TimeDomain.EVENT_TIME); TimerUpdate built = builder.setTimer(timer).deletedTimer(timer).build(); @@ -1632,7 +1715,8 @@ public class WatermarkManagerTest implements Serializable { @Test public void timerUpdateBuilderWithDeleteThenSetHasOnlySet() { TimerUpdateBuilder builder = TimerUpdate.builder(null); - TimerData timer = TimerData.of(StateNamespaces.global(), Instant.now(), TimeDomain.EVENT_TIME); + Instant now = Instant.now(); + TimerData timer = TimerData.of(StateNamespaces.global(), now, now, TimeDomain.EVENT_TIME); TimerUpdate built = builder.deletedTimer(timer).setTimer(timer).build(); @@ -1643,7 +1727,8 @@ public class WatermarkManagerTest implements Serializable { @Test public void timerUpdateBuilderWithSetAfterBuildNotAddedToBuilt() { TimerUpdateBuilder builder = TimerUpdate.builder(null); - TimerData timer = TimerData.of(StateNamespaces.global(), Instant.now(), TimeDomain.EVENT_TIME); + Instant now = Instant.now(); + TimerData timer = TimerData.of(StateNamespaces.global(), now, now, TimeDomain.EVENT_TIME); 
TimerUpdate built = builder.build(); builder.setTimer(timer); @@ -1655,7 +1740,8 @@ public class WatermarkManagerTest implements Serializable { @Test public void timerUpdateBuilderWithDeleteAfterBuildNotAddedToBuilt() { TimerUpdateBuilder builder = TimerUpdate.builder(null); - TimerData timer = TimerData.of(StateNamespaces.global(), Instant.now(), TimeDomain.EVENT_TIME); + Instant now = Instant.now(); + TimerData timer = TimerData.of(StateNamespaces.global(), now, now, TimeDomain.EVENT_TIME); TimerUpdate built = builder.build(); builder.deletedTimer(timer); @@ -1667,7 +1753,8 @@ public class WatermarkManagerTest implements Serializable { @Test public void timerUpdateBuilderWithCompletedAfterBuildNotAddedToBuilt() { TimerUpdateBuilder builder = TimerUpdate.builder(null); - TimerData timer = TimerData.of(StateNamespaces.global(), Instant.now(), TimeDomain.EVENT_TIME); + Instant now = Instant.now(); + TimerData timer = TimerData.of(StateNamespaces.global(), now, now, TimeDomain.EVENT_TIME); TimerUpdate built = builder.build(); builder.withCompletedTimers(ImmutableList.of(timer)); @@ -1679,7 +1766,8 @@ public class WatermarkManagerTest implements Serializable { @Test public void timerUpdateWithCompletedTimersNotAddedToExisting() { TimerUpdateBuilder builder = TimerUpdate.builder(null); - TimerData timer = TimerData.of(StateNamespaces.global(), Instant.now(), TimeDomain.EVENT_TIME); + Instant now = Instant.now(); + TimerData timer = TimerData.of(StateNamespaces.global(), now, now, TimeDomain.EVENT_TIME); TimerUpdate built = builder.build(); assertThat(built.getCompletedTimers(), emptyIterable()); diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java index 932fac8..efea477 100644 --- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java @@ -759,6 +759,7 @@ public class ExecutableStageDoFnOperatorTest { timerInputKey.getKey(), timerInputKey.getValue()), stateNamespace, window.maxTimestamp(), + window.maxTimestamp(), TimeDomain.EVENT_TIME); timerInternals.setTimer(userTimer); @@ -793,6 +794,7 @@ public class ExecutableStageDoFnOperatorTest { timerInputKey.getKey(), timerInputKey.getValue()), stateNamespace, window.maxTimestamp(), + window.maxTimestamp(), TimeDomain.EVENT_TIME); operator.setTimer( Timer.of( diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java index 5d92d2a..c8416c4 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java @@ -20,6 +20,8 @@ package org.apache.beam.runners.dataflow.worker; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; +import java.io.IOException; +import java.nio.charset.StandardCharsets; import javax.annotation.Nullable; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaces; @@ -29,6 +31,9 @@ import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.state.TimeDomain; import 
org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.ExposedByteArrayInputStream; +import org.apache.beam.sdk.util.ExposedByteArrayOutputStream; +import org.apache.beam.sdk.util.VarInt; import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashBasedTable; @@ -278,27 +283,77 @@ class WindmillTimerInternals implements TimerInternals { // - the GlobalWindow is currently encoded in zero bytes, so it becomes "//" // - the Global StateNamespace is different, and becomes "/" // - the id is totally arbitrary; currently unescaped though that could change - String tag = timer.getTag().toStringUtf8(); + + ByteString tag = timer.getTag(); checkArgument( - timer.getTag().startsWith(prefix.byteString()), + tag.startsWith(prefix.byteString()), "Expected timer tag %s to start with prefix %s", tag, prefix.byteString()); + + Instant timestamp = WindmillTimeUtils.windmillToHarnessTimestamp(timer.getTimestamp()); + + // Parse the namespace. int namespaceStart = prefix.byteString().size(); // drop the prefix, leave the begin slash - int namespaceEnd = tag.indexOf('+', namespaceStart); // keep the end slash, drop the + - String namespaceString = tag.substring(namespaceStart, namespaceEnd); - String timerIdPlusTimerFamilyId = tag.substring(namespaceEnd + 1); // timerId+timerFamilyId - int timerIdEnd = timerIdPlusTimerFamilyId.indexOf('+'); // end of timerId - // if no '+' found then timerFamilyId is empty string else they have a '+' separator - String familyId = timerIdEnd == -1 ? "" : timerIdPlusTimerFamilyId.substring(timerIdEnd + 1); - String id = - timerIdEnd == -1 - ? 
timerIdPlusTimerFamilyId - : timerIdPlusTimerFamilyId.substring(0, timerIdEnd); + int namespaceEnd = namespaceStart; + while (namespaceEnd < tag.size() && tag.byteAt(namespaceEnd) != '+') { + namespaceEnd++; + } + String namespaceString = tag.substring(namespaceStart, namespaceEnd).toStringUtf8(); + + // Parse the timer id. + int timerIdStart = namespaceEnd + 1; + int timerIdEnd = timerIdStart; + while (timerIdEnd < tag.size() && tag.byteAt(timerIdEnd) != '+') { + timerIdEnd++; + } + String timerId = tag.substring(timerIdStart, timerIdEnd).toStringUtf8(); + + // Parse the timer family. + int timerFamilyStart = timerIdEnd + 1; + int timerFamilyEnd = timerFamilyStart; + while (timerFamilyEnd < tag.size() && tag.byteAt(timerFamilyEnd) != '+') { + timerFamilyEnd++; + } + // For backwards compatibility, handle the case where the timer family isn't present. + String timerFamily = + (timerFamilyStart < tag.size()) + ? tag.substring(timerFamilyStart, timerFamilyEnd).toStringUtf8() + : ""; + + // Parse the output timestamp. + int outputTimestampStart = timerFamilyEnd + 1; + int outputTimestampEnd = outputTimestampStart; + while (outputTimestampEnd < tag.size() && tag.byteAt(outputTimestampEnd) != '+') { + outputTimestampEnd++; + } + + // For backwards compatibility, handle the case where the output timestamp isn't present. 
+ Instant outputTimestamp = timestamp; + if ((outputTimestampStart < tag.size())) { + try { + outputTimestamp = + new Instant( + VarInt.decodeLong( + tag.substring(outputTimestampStart, outputTimestampEnd).newInput())); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + StateNamespace namespace = StateNamespaces.fromString(namespaceString, windowCoder); - Instant timestamp = WindmillTimeUtils.windmillToHarnessTimestamp(timer.getTimestamp()); + return TimerData.of( + timerId, + timerFamily, + namespace, + timestamp, + outputTimestamp, + timerTypeToTimeDomain(timer.getType())); + } - return TimerData.of(id, familyId, namespace, timestamp, timerTypeToTimeDomain(timer.getType())); + private static boolean useNewTimerTagEncoding(TimerData timerData) { + return !timerData.getTimerFamilyId().isEmpty() + || !timerData.getOutputTimestamp().equals(timerData.getTimestamp()); } /** @@ -309,27 +364,41 @@ class WindmillTimerInternals implements TimerInternals { */ public static ByteString timerTag(WindmillNamespacePrefix prefix, TimerData timerData) { String tagString; - // Timers without timerFamily would have timerFamily would be an empty string - if ("".equals(timerData.getTimerFamilyId())) { - tagString = - new StringBuilder() - .append(prefix.byteString().toStringUtf8()) // this never ends with a slash - .append(timerData.getNamespace().stringKey()) // this must begin and end with a slash - .append('+') - .append(timerData.getTimerId()) // this is arbitrary; currently unescaped - .toString(); - } else { - tagString = - new StringBuilder() - .append(prefix.byteString().toStringUtf8()) // this never ends with a slash - .append(timerData.getNamespace().stringKey()) // this must begin and end with a slash - .append('+') - .append(timerData.getTimerId()) // this is arbitrary; currently unescaped - .append('+') - .append(timerData.getTimerFamilyId()) - .toString(); + ExposedByteArrayOutputStream out = new ExposedByteArrayOutputStream(); + try { + if 
(useNewTimerTagEncoding(timerData)) { + tagString = + new StringBuilder() + .append(prefix.byteString().toStringUtf8()) // this never ends with a slash + .append( + timerData.getNamespace().stringKey()) // this must begin and end with a slash + .append('+') + .append(timerData.getTimerId()) // this is arbitrary; currently unescaped + .append('+') + .append(timerData.getTimerFamilyId()) + .toString(); + out.write(tagString.getBytes(StandardCharsets.UTF_8)); + // Only encode the extra 9 bytes if the output timestamp is different from the timestamp. + if (!timerData.getOutputTimestamp().equals(timerData.getTimestamp())) { + out.write('+'); + VarInt.encode(timerData.getOutputTimestamp().getMillis(), out); + } + } else { + // Timers without a timerFamily have an empty string as their timerFamily + tagString = + new StringBuilder() + .append(prefix.byteString().toStringUtf8()) // this never ends with a slash + .append( + timerData.getNamespace().stringKey()) // this must begin and end with a slash + .append('+') + .append(timerData.getTimerId()) // this is arbitrary; currently unescaped + .toString(); + out.write(tagString.getBytes(StandardCharsets.UTF_8)); + } + return ByteString.readFrom(new ExposedByteArrayInputStream(out.toByteArray())); + } catch (IOException e) { + throw new RuntimeException(e); } - return ByteString.copyFromUtf8(tagString); } /** diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java index bf9e875..0d5a883 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java @@ -149,6 +149,7 @@ public 
class StreamingGroupAlsoByWindowFnsTest { TimerData.of( namespace, timestamp, + timestamp, type == Windmill.Timer.Type.WATERMARK ? TimeDomain.EVENT_TIME : TimeDomain.PROCESSING_TIME))) diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunnerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunnerTest.java index 790a7d5..5d6f1e7 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunnerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunnerTest.java @@ -164,6 +164,7 @@ public class StreamingKeyedWorkItemSideInputDoFnRunnerTest { return TimerData.of( StateNamespaces.window(IntervalWindow.getCoder(), window), timestamp, + timestamp, type == Windmill.Timer.Type.WATERMARK ? 
TimeDomain.EVENT_TIME : TimeDomain.PROCESSING_TIME); } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java index ff957c1..d5071d7 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java @@ -143,7 +143,11 @@ public class StreamingModeExecutionContextTest { TimerInternals timerInternals = stepContext.timerInternals(); timerInternals.setTimer( - TimerData.of(new StateNamespaceForTest("key"), new Instant(5000), TimeDomain.EVENT_TIME)); + TimerData.of( + new StateNamespaceForTest("key"), + new Instant(5000), + new Instant(5000), + TimeDomain.EVENT_TIME)); executionContext.flushState(); Windmill.Timer timer = outputBuilder.buildPartial().getOutputTimers(0); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcherTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcherTest.java index c1a9945..c9a4227 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcherTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcherTest.java @@ -203,6 +203,7 @@ public class StreamingSideInputFetcherTest { return TimerData.of( StateNamespaces.window(IntervalWindow.getCoder(), createWindow(timestamp)), new Instant(timestamp), + new Instant(timestamp), TimeDomain.EVENT_TIME); } diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java index 5e688f3..45889df 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java @@ -426,6 +426,7 @@ public class UserParDoFnFactoryTest { SimpleParDoFn.CLEANUP_TIMER_ID, firstWindowNamespace, firstWindow.maxTimestamp().plus(1L), + firstWindow.maxTimestamp().plus(1L), TimeDomain.EVENT_TIME)) .thenReturn(null); @@ -441,6 +442,7 @@ public class UserParDoFnFactoryTest { SimpleParDoFn.CLEANUP_TIMER_ID, secondWindowNamespace, secondWindow.maxTimestamp().plus(1L), + secondWindow.maxTimestamp().plus(1L), TimeDomain.EVENT_TIME)) .thenReturn(null); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java index de62567..40d4c74 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java @@ -166,6 +166,7 @@ public class WindmillKeyedWorkItemTest { TimerData.of( ns, new Instant(timestamp), + new Instant(timestamp), WindmillTimerInternals.timerTypeToTimeDomain(type)))) .setTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(new Instant(timestamp))) .setType(type) @@ -174,7 +175,7 @@ public class WindmillKeyedWorkItemTest { } private static TimerData makeTimer(StateNamespace ns, long timestamp, TimeDomain 
domain) { - return TimerData.of(ns, new Instant(timestamp), domain); + return TimerData.of(ns, new Instant(timestamp), new Instant(timestamp), domain); } @Test diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternalsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternalsTest.java index d0f0f8a..3c1f6bd 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternalsTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternalsTest.java @@ -79,25 +79,43 @@ public class WindmillTimerInternalsTest { for (TimeDomain timeDomain : TimeDomain.values()) { for (WindmillNamespacePrefix prefix : WindmillNamespacePrefix.values()) { for (Instant timestamp : TEST_TIMESTAMPS) { - TimerData anonymousTimerData = TimerData.of(namespace, timestamp, timeDomain); - - assertThat( - WindmillTimerInternals.windmillTimerToTimerData( - prefix, - WindmillTimerInternals.timerDataToWindmillTimer( - stateFamily, prefix, anonymousTimerData), - coder), - equalTo(anonymousTimerData)); - - for (String timerId : TEST_TIMER_IDS) { - TimerData timerData = TimerData.of(timerId, namespace, timestamp, timeDomain); + List<TimerData> anonymousTimers = + ImmutableList.of( + TimerData.of(namespace, timestamp, timestamp, timeDomain), + TimerData.of(namespace, timestamp, timestamp.minus(1), timeDomain)); + for (TimerData timer : anonymousTimers) { assertThat( WindmillTimerInternals.windmillTimerToTimerData( prefix, - WindmillTimerInternals.timerDataToWindmillTimer( - stateFamily, prefix, timerData), + WindmillTimerInternals.timerDataToWindmillTimer(stateFamily, prefix, timer), coder), - equalTo(timerData)); + equalTo(timer)); + } + + for (String timerId : TEST_TIMER_IDS) { + List<TimerData> timers = + ImmutableList.of( + 
TimerData.of(timerId, namespace, timestamp, timestamp, timeDomain), + TimerData.of( + timerId, "family", namespace, timestamp, timestamp, timeDomain), + TimerData.of(timerId, namespace, timestamp, timestamp.minus(1), timeDomain), + TimerData.of( + timerId, + "family", + namespace, + timestamp, + timestamp.minus(1), + timeDomain)); + + for (TimerData timer : timers) { + assertThat( + WindmillTimerInternals.windmillTimerToTimerData( + prefix, + WindmillTimerInternals.timerDataToWindmillTimer( + stateFamily, prefix, timer), + coder), + equalTo(timer)); + } } } } diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java index b9d6216..4dd4441 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java @@ -277,6 +277,7 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> { bundleCheckTimerId, StateNamespaces.global(), nextBundleCheckTime, + nextBundleCheckTime, TimeDomain.PROCESSING_TIME); bundleTimerScheduler.schedule( new KeyedTimerData<>(new byte[0], null, timerData), nextBundleCheckTime.getMillis()); diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedTimerData.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedTimerData.java index a6214be..6a51967 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedTimerData.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedTimerData.java @@ -160,7 +160,7 @@ public class KeyedTimerData<K> implements Comparable<KeyedTimerData<K>> { final StateNamespace namespace = StateNamespaces.fromString(STRING_CODER.decode(inStream), windowCoder); final TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream)); - final TimerData timer = TimerData.of(timerId, namespace, 
timestamp, domain); + final TimerData timer = TimerData.of(timerId, namespace, timestamp, timestamp, domain); byte[] keyBytes = null; K key = null; diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java index 22c2ac9..e5ba2d3 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java @@ -218,6 +218,7 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> { timerData.getTimerId(), timerData.getNamespace(), new Instant(lastTimestamp), + new Instant(lastTimestamp), timerData.getDomain()); deleteTimer(lastTimerData, false); } @@ -244,12 +245,14 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> { @Override public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) { - deleteTimer(TimerData.of(timerId, namespace, Instant.now(), timeDomain)); + Instant now = Instant.now(); + deleteTimer(TimerData.of(timerId, namespace, now, now, timeDomain)); } @Override public void deleteTimer(StateNamespace namespace, String timerId, String timerFamilyId) { - deleteTimer(TimerData.of(timerId, namespace, Instant.now(), TimeDomain.EVENT_TIME)); + Instant now = Instant.now(); + deleteTimer(TimerData.of(timerId, namespace, now, now, TimeDomain.EVENT_TIME)); } @Override @@ -455,7 +458,11 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> { keyBytes, timerKey.key, TimerInternals.TimerData.of( - timerKey.timerId, timerKey.stateNamespace, new Instant(timestamp), domain)); + timerKey.timerId, + timerKey.stateNamespace, + new Instant(timestamp), + new Instant(timestamp), + domain)); } private TimerKey(K key, StateNamespace stateNamespace, String timerId) { diff --git 
a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java index e521774..c16b4ce 100644 --- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java +++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java @@ -36,7 +36,7 @@ public class KeyedTimerDataTest { public void testCoder() throws Exception { final TimerInternals.TimerData td = TimerInternals.TimerData.of( - "timer", StateNamespaces.global(), new Instant(), TimeDomain.EVENT_TIME); + "timer", StateNamespaces.global(), new Instant(), new Instant(), TimeDomain.EVENT_TIME); final String key = "timer-key"; final ByteArrayOutputStream baos = new ByteArrayOutputStream(); diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java index 0f536c3..d0d6126 100644 --- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java +++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java @@ -144,11 +144,13 @@ public class SamzaTimerInternalsFactoryTest { final StateNamespace nameSpace = StateNamespaces.global(); final TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey("testKey"); final TimerInternals.TimerData timer1 = - TimerInternals.TimerData.of("timer1", nameSpace, new Instant(10), TimeDomain.EVENT_TIME); + TimerInternals.TimerData.of( + "timer1", nameSpace, new Instant(10), new Instant(10), TimeDomain.EVENT_TIME); timerInternals.setTimer(timer1); final TimerInternals.TimerData timer2 = - TimerInternals.TimerData.of("timer2", nameSpace, new Instant(100), TimeDomain.EVENT_TIME); + TimerInternals.TimerData.of( + "timer2", nameSpace, new Instant(100), new 
Instant(100), TimeDomain.EVENT_TIME); timerInternals.setTimer(timer2); timerInternalsFactory.setInputWatermark(new Instant(5)); @@ -181,11 +183,13 @@ public class SamzaTimerInternalsFactoryTest { final StateNamespace nameSpace = StateNamespaces.global(); final TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey(key); final TimerInternals.TimerData timer1 = - TimerInternals.TimerData.of("timer1", nameSpace, new Instant(10), TimeDomain.EVENT_TIME); + TimerInternals.TimerData.of( + "timer1", nameSpace, new Instant(10), new Instant(10), TimeDomain.EVENT_TIME); timerInternals.setTimer(timer1); final TimerInternals.TimerData timer2 = - TimerInternals.TimerData.of("timer2", nameSpace, new Instant(100), TimeDomain.EVENT_TIME); + TimerInternals.TimerData.of( + "timer2", nameSpace, new Instant(100), new Instant(100), TimeDomain.EVENT_TIME); timerInternals.setTimer(timer2); store.close(); @@ -226,12 +230,12 @@ public class SamzaTimerInternalsFactoryTest { final TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey("testKey"); final TimerInternals.TimerData timer1 = TimerInternals.TimerData.of( - "timer1", nameSpace, new Instant(10), TimeDomain.PROCESSING_TIME); + "timer1", nameSpace, new Instant(10), new Instant(10), TimeDomain.PROCESSING_TIME); timerInternals.setTimer(timer1); final TimerInternals.TimerData timer2 = TimerInternals.TimerData.of( - "timer2", nameSpace, new Instant(100), TimeDomain.PROCESSING_TIME); + "timer2", nameSpace, new Instant(100), new Instant(100), TimeDomain.PROCESSING_TIME); timerInternals.setTimer(timer2); assertEquals(2, timerRegistry.timers.size()); @@ -267,16 +271,19 @@ public class SamzaTimerInternalsFactoryTest { final StateNamespace nameSpace = StateNamespaces.global(); final TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey("testKey"); final TimerInternals.TimerData timer1 = - TimerInternals.TimerData.of("timerId", nameSpace, new Instant(10), TimeDomain.EVENT_TIME); + 
TimerInternals.TimerData.of( + "timerId", nameSpace, new Instant(10), new Instant(10), TimeDomain.EVENT_TIME); timerInternals.setTimer(timer1); // this timer should override the first timer final TimerInternals.TimerData timer2 = - TimerInternals.TimerData.of("timerId", nameSpace, new Instant(100), TimeDomain.EVENT_TIME); + TimerInternals.TimerData.of( + "timerId", nameSpace, new Instant(100), new Instant(100), TimeDomain.EVENT_TIME); timerInternals.setTimer(timer2); final TimerInternals.TimerData timer3 = - TimerInternals.TimerData.of("timerId2", nameSpace, new Instant(200), TimeDomain.EVENT_TIME); + TimerInternals.TimerData.of( + "timerId2", nameSpace, new Instant(200), new Instant(200), TimeDomain.EVENT_TIME); timerInternals.setTimer(timer3); // this timer shouldn't override since it has a different id diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index c4df77b..c673cbd 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -270,10 +270,10 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD @Experimental(Kind.TIMERS) public abstract class OnTimerContext extends WindowedContext { - /** Returns the timestamp of the current timer. */ + /** Returns the output timestamp of the current timer. */ public abstract Instant timestamp(); - /** Returns the output timestamp of the current timer. */ + /** Returns the firing timestamp of the current timer. */ public abstract Instant fireTimestamp(); /** Returns the window in which the timer is firing. 
*/ diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index c71e963..bfae241 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -3541,6 +3541,44 @@ public class ParDoTest implements Serializable { UsesTimersInParDo.class, DataflowPortabilityApiUnsupported.class }) + public void testOutputTimestampDefault() throws Exception { + final String timerId = "foo"; + DoFn<KV<String, Long>, Long> fn1 = + new DoFn<KV<String, Long>, Long>() { + + @TimerId(timerId) + private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @ProcessElement + public void processElement( + @TimerId(timerId) Timer timer, @Timestamp Instant timestamp) { + timer + .withOutputTimestamp(timestamp.plus(Duration.millis(5))) + .set(timestamp.plus(Duration.millis(10))); + } + + @OnTimer(timerId) + public void onTimer(@Timestamp Instant timestamp, OutputReceiver<Long> o) { + o.output(timestamp.getMillis()); + } + }; + + PCollection<Long> output = + pipeline + .apply(Create.timestamped(TimestampedValue.of(KV.of("hello", 1L), new Instant(3)))) + .setIsBoundedInternal(IsBounded.UNBOUNDED) + .apply("first", ParDo.of(fn1)); + + PAssert.that(output).containsInAnyOrder(new Instant(8).getMillis()); // result output + pipeline.run(); + } + + @Test + @Category({ + ValidatesRunner.class, + UsesTimersInParDo.class, + DataflowPortabilityApiUnsupported.class + }) public void testOutOfBoundsEventTimeTimerHold() throws Exception { final String timerId = "foo";