This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 2e0b192efbae72fe5c0ef61abfc7b4d1b3bb75f5
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
AuthorDate: Fri Jan 5 14:16:27 2018 +0100

    [BEAM-2140] Ignore event-time timers in SplittableDoFnOperator
    
    SplittableDoFnOperator is only used for executing ProcessFn, which does
    not use event-time timers. However, StatefulDoFnRunner does use
    event-time timers for state cleanup so this change makes sure that they
    don't end up being forwarded to the ProcessFn.
---
 .../runners/flink/translation/wrappers/streaming/DoFnOperator.java  | 1 -
 .../translation/wrappers/streaming/SplittableDoFnOperator.java      | 6 ++++++
 2 files changed, 6 insertions(+), 1 deletion(-)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 688a7cb..dd2f9c4 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -38,7 +38,6 @@ import java.util.concurrent.ScheduledFuture;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
-import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn;
 import org.apache.beam.runners.core.NullSideInputReader;
 import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.SideInputHandler;
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
index 1a418a0..44be5f3 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
@@ -37,6 +37,7 @@ import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.TimerInternalsFactory;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -142,6 +143,11 @@ public class SplittableDoFnOperator<
 
   @Override
   public void fireTimer(InternalTimer<?, TimerInternals.TimerData> timer) {
+    if (timer.getNamespace().getDomain().equals(TimeDomain.EVENT_TIME)) {
+      // ignore this, it can only be a state cleanup timer from StatefulDoFnRunner and ProcessFn
+      // does its own state cleanup and should never set event-time timers.
+      return;
+    }
     doFnRunner.processElement(
         WindowedValue.valueInGlobalWindow(
             KeyedWorkItems.timersWorkItem(

-- 
To stop receiving notification emails like this one, please contact
aljos...@apache.org.

Reply via email to