This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new b8c8bf3 [BEAM-10120] Add dynamic timer support to portable Flink. new 73731ec Merge pull request #13783 from [BEAM-10120] Add dynamic timer support to portable Flink b8c8bf3 is described below commit b8c8bf3644b449fc009ebe0a2a07cb155bba0989 Author: Boyuan Zhang <boyu...@google.com> AuthorDate: Wed Jan 20 19:09:38 2021 -0800 [BEAM-10120] Add dynamic timer support to portable Flink. --- runners/flink/job-server/flink_job_server.gradle | 1 - .../translation/wrappers/streaming/DoFnOperator.java | 19 ++++++++++++++++--- .../streaming/ExecutableStageDoFnOperator.java | 7 +++---- .../streaming/ExecutableStageDoFnOperatorTest.java | 4 +++- .../fnexecution/control/TimerReceiverFactory.java | 1 + .../translation/PipelineTranslatorUtils.java | 4 ++-- .../runners/portability/flink_runner_test.py | 3 --- 7 files changed, 25 insertions(+), 14 deletions(-) diff --git a/runners/flink/job-server/flink_job_server.gradle b/runners/flink/job-server/flink_job_server.gradle index 130dd97..aa68411 100644 --- a/runners/flink/job-server/flink_job_server.gradle +++ b/runners/flink/job-server/flink_job_server.gradle @@ -163,7 +163,6 @@ def portableValidatesRunnerTask(String name, Boolean streaming, Boolean checkpoi excludeCategories 'org.apache.beam.sdk.testing.UsesSetState' excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState' excludeCategories 'org.apache.beam.sdk.testing.UsesStrictTimerOrdering' - excludeCategories 'org.apache.beam.sdk.testing.UsesTimerMap' excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration' excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer' excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState' diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 9c1e437..b463db26 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -1412,6 +1412,10 @@ public class DoFnOperator<InputT, OutputT> return timer.getOutputTimestamp().isBefore(timer.getTimestamp()); } + private String constructTimerId(String timerFamilyId, String timerId) { + return timerFamilyId + "+" + timerId; + } + @Override public void setTimer( StateNamespace namespace, @@ -1437,7 +1441,10 @@ public class DoFnOperator<InputT, OutputT> timer.getTimerId(), timer.getTimestamp().getMillis(), timer.getOutputTimestamp().getMillis()); - String contextTimerId = getContextTimerId(timer.getTimerId(), timer.getNamespace()); + String contextTimerId = + getContextTimerId( + constructTimerId(timer.getTimerFamilyId(), timer.getTimerId()), + timer.getNamespace()); @Nullable final TimerData oldTimer = pendingTimersById.get(contextTimerId); if (!timer.equals(oldTimer)) { // Only one timer can exist at a time for a given timer id and context. @@ -1500,7 +1507,10 @@ public class DoFnOperator<InputT, OutputT> */ void onFiredOrDeletedTimer(TimerData timer) { try { - pendingTimersById.remove(getContextTimerId(timer.getTimerId(), timer.getNamespace())); + pendingTimersById.remove( + getContextTimerId( + constructTimerId(timer.getTimerFamilyId(), timer.getTimerId()), + timer.getNamespace())); if (timer.getDomain() == TimeDomain.EVENT_TIME || StateAndTimerBundleCheckpointHandler.isSdfTimer(timer.getTimerId())) { if (timerUsesOutputTimestamp(timer)) { @@ -1532,7 +1542,10 @@ public class DoFnOperator<InputT, OutputT> @Override @Deprecated public void deleteTimer(TimerData timer) { - deleteTimer(timer.getNamespace(), timer.getTimerId(), timer.getDomain()); + deleteTimer( + timer.getNamespace(), + constructTimerId(timer.getTimerFamilyId(), timer.getTimerId()), + timer.getDomain()); } void deleteTimerInternal(TimerData timer) { diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java index d479a3c..42949c0 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java @@ -522,8 +522,7 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I try (Locker locker = Locker.locked(stateBackendLock)) { getKeyedStateBackend().setCurrentKey(encodedKey); if (timerElement.getClearBit()) { - timerInternals.deleteTimer( - timerData.getNamespace(), timerData.getTimerId(), timerData.getDomain()); + timerInternals.deleteTimer(timerData); } else { timerInternals.setTimer(timerData); if (!timerData.getTimerId().equals(GC_TIMER_ID)) { @@ -973,7 +972,7 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I processElement(stateValue); } else { KV<String, String> transformAndTimerFamilyId = - TimerReceiverFactory.decodeTimerDataTimerId(timerId); + TimerReceiverFactory.decodeTimerDataTimerId(timerFamilyId); LOG.debug( "timer callback: {} {} {} {} {}", transformAndTimerFamilyId.getKey(), @@ -990,7 +989,7 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I Timer<?> timerValue = Timer.of( timerKey, - "", + timerId, Collections.singletonList(window), timestamp, outputTimestamp, diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java index 336b414..49039a4 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java @@ -510,8 +510,8 @@ public class ExecutableStageDoFnOperatorTest { timestamp, PaneInfo.NO_FIRING), TimerInternals.TimerData.of( - TimerReceiverFactory.encodeToTimerDataTimerId("transform", timerId), "", + TimerReceiverFactory.encodeToTimerDataTimerId("transform", timerId), StateNamespaces.window(IntervalWindow.getCoder(), intervalWindow), timestamp, timestamp, @@ -799,6 +799,7 @@ public class ExecutableStageDoFnOperatorTest { // user timer that fires after the end of the window and after state cleanup TimerInternals.TimerData userTimer = TimerInternals.TimerData.of( + "", TimerReceiverFactory.encodeToTimerDataTimerId( timerInputKey.getKey(), timerInputKey.getValue()), stateNamespace, @@ -834,6 +835,7 @@ public class ExecutableStageDoFnOperatorTest { // Cleanup timer are rescheduled if a new timer is created during the bundle TimerInternals.TimerData userTimer2 = TimerInternals.TimerData.of( + "", TimerReceiverFactory.encodeToTimerDataTimerId( timerInputKey.getKey(), timerInputKey.getValue()), stateNamespace, diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/TimerReceiverFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/TimerReceiverFactory.java index b71cd3c..e46ef2a 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/TimerReceiverFactory.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/TimerReceiverFactory.java @@ -89,6 +89,7 @@ public class TimerReceiverFactory { StateNamespace namespace = StateNamespaces.window(windowCoder, (BoundedWindow) window); TimerInternals.TimerData timerData = TimerInternals.TimerData.of( + timer.getDynamicTimerTag(), encodeToTimerDataTimerId(timerSpec.transformId(), timerSpec.timerId()), namespace, timer.getClearBit() ? BoundedWindow.TIMESTAMP_MAX_VALUE : timer.getFireTimestamp(), diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java index 0059cc7..61f21dd 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java @@ -151,13 +151,13 @@ public final class PipelineTranslatorUtils { Timer<?> timerValue = Timer.of( currentTimerKey, - "", + timer.getTimerId(), Collections.singletonList(window), timestamp, outputTimestamp, PaneInfo.NO_FIRING); KV<String, String> transformAndTimerId = - TimerReceiverFactory.decodeTimerDataTimerId(timer.getTimerId()); + TimerReceiverFactory.decodeTimerDataTimerId(timer.getTimerFamilyId()); FnDataReceiver<Timer> fnTimerReceiver = timerReceivers.get(transformAndTimerId); Preconditions.checkNotNull( fnTimerReceiver, "No FnDataReceiver found for %s", transformAndTimerId); diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py b/sdks/python/apache_beam/runners/portability/flink_runner_test.py index a5f6ac3..b3c3d52 100644 --- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py @@ -395,9 +395,6 @@ class FlinkRunnerTest(portable_runner_test.PortableRunnerTest): def test_register_finalizations(self): raise unittest.SkipTest("BEAM-11021") - def test_pardo_dynamic_timer(self): - raise unittest.SkipTest("BEAM-10120") - # Inherits all other tests.