This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
commit 2e0b192efbae72fe5c0ef61abfc7b4d1b3bb75f5 Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> AuthorDate: Fri Jan 5 14:16:27 2018 +0100 [BEAM-2140] Ignore event-time timers in SplittableDoFnOperator SplittableDoFnOperator is only used for executing ProcessFn, which does not use event-time timers. However, StatefulDoFnRunner does use event-time timers for state cleanup so this change makes sure that they don't end up being forwarded to the ProcessFn. --- .../runners/flink/translation/wrappers/streaming/DoFnOperator.java | 1 - .../translation/wrappers/streaming/SplittableDoFnOperator.java | 6 ++++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 688a7cb..dd2f9c4 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -38,7 +38,6 @@ import java.util.concurrent.ScheduledFuture; import javax.annotation.Nullable; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; -import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn; import org.apache.beam.runners.core.NullSideInputReader; import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; import org.apache.beam.runners.core.SideInputHandler; diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java index 1a418a0..44be5f3 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java +++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java @@ -37,6 +37,7 @@ import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.core.TimerInternalsFactory; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -142,6 +143,11 @@ public class SplittableDoFnOperator< @Override public void fireTimer(InternalTimer<?, TimerInternals.TimerData> timer) { + if (timer.getNamespace().getDomain().equals(TimeDomain.EVENT_TIME)) { + // ignore this, it can only be a state cleanup timer from StatefulDoFnRunner; ProcessFn + // does its own state cleanup and should never set event-time timers. + return; + } doFnRunner.processElement( WindowedValue.valueInGlobalWindow( KeyedWorkItems.timersWorkItem( -- To stop receiving notification emails like this one, please contact aljos...@apache.org.