[ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=167267&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167267
 ]

ASF GitHub Bot logged work on BEAM-4681:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/Nov/18 22:13
            Start Date: 18/Nov/18 22:13
    Worklog Time Spent: 10m 
      Work Description: tweise commented on a change in pull request #7008: 
[BEAM-4681] Add support for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#discussion_r234463725
 
 

 ##########
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
 ##########
 @@ -166,40 +180,126 @@ private StateRequestHandler getStateRequestHandler(
   public void mapPartition(
       Iterable<WindowedValue<InputT>> iterable, Collector<RawUnionValue> 
collector)
       throws Exception {
-    processElements(iterable, collector);
+
+    ReceiverFactory receiverFactory = new ReceiverFactory(collector, 
outputMap);
+    try (RemoteBundle bundle =
+        stageBundleFactory.getBundle(receiverFactory, stateRequestHandler, 
progressHandler)) {
+      processElements(iterable, bundle);
+    }
   }
 
-  /** For stateful processing via a GroupReduceFunction. */
+  /** For stateful and timer processing via a GroupReduceFunction. */
   @Override
   public void reduce(Iterable<WindowedValue<InputT>> iterable, 
Collector<RawUnionValue> collector)
       throws Exception {
-    bagUserStateHandlerFactory.resetForNewKey();
-    processElements(iterable, collector);
+
+    // Need to discard the old key's state
+    if (bagUserStateHandlerFactory != null) {
+      bagUserStateHandlerFactory.resetForNewKey();
+    }
+
+    // Used with Batch, we know that all the data is available for this key. 
We can't use the
+    // timer manager from the context because it doesn't exist. So we create 
one and advance
+    // time to the end after processing all elements.
+    final InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
+    timerInternals.advanceProcessingTime(Instant.now());
+    timerInternals.advanceSynchronizedProcessingTime(Instant.now());
+
+    ReceiverFactory receiverFactory =
+        new ReceiverFactory(
+            collector,
+            outputMap,
+            new TimerReceiverFactory(
+                stageBundleFactory,
+                executableStage.getTimers(),
+                
stageBundleFactory.getProcessBundleDescriptor().getTimerSpecs(),
+                (WindowedValue timerElement, TimerInternals.TimerData 
timerData) -> {
+                  currentTimerKey = (((KV) timerElement.getValue()).getKey());
+                  timerInternals.setTimer(timerData);
+                },
+                windowCoder));
+
+    try (RemoteBundle bundle =
+        stageBundleFactory.getBundle(receiverFactory, stateRequestHandler, 
progressHandler)) {
+      processElements(iterable, bundle);
+    }
+
+    // Finish any pending windows by advancing the input watermark to infinity.
+    timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
+    // Finally, advance the processing time to infinity to fire any timers.
+    timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+    
timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    try (RemoteBundle bundle =
+        stageBundleFactory.getBundle(receiverFactory, stateRequestHandler, 
progressHandler)) {
+
+      fireEligibleTimers(
+          timerInternals,
+          (String timerId, WindowedValue timerValue) -> {
+            FnDataReceiver<WindowedValue<?>> fnTimerReceiver =
+                bundle.getInputReceivers().get(timerId);
+            Preconditions.checkNotNull(fnTimerReceiver, "No FnDataReceiver 
found for %s", timerId);
+            try {
+              fnTimerReceiver.accept(timerValue);
+            } catch (Exception e) {
+              throw new RuntimeException(
+                  String.format(Locale.ENGLISH, "Failed to process timer: %s", 
timerValue));
+            }
+          });
+    }
   }
 
-  private void processElements(
-      Iterable<WindowedValue<InputT>> iterable, Collector<RawUnionValue> 
collector)
+  private void processElements(Iterable<WindowedValue<InputT>> iterable, 
RemoteBundle bundle)
       throws Exception {
-    checkState(
-        runtimeContext == getRuntimeContext(),
-        "RuntimeContext changed from under us. State handler invalid.");
-    checkState(
-        stageBundleFactory != null, "%s not yet prepared", 
StageBundleFactory.class.getName());
-    checkState(
-        stateRequestHandler != null, "%s not yet prepared", 
StateRequestHandler.class.getName());
+    Preconditions.checkArgument(bundle != null, "RemoteBundle must not be 
null");
+
+    String inputPCollectionId = executableStage.getInputPCollection().getId();
+    FnDataReceiver<WindowedValue<?>> mainReceiver =
+        Preconditions.checkNotNull(
+            bundle.getInputReceivers().get(inputPCollectionId),
+            "Main input receiver for %s could not be initialized",
+            inputPCollectionId);
+    for (WindowedValue<InputT> input : iterable) {
+      mainReceiver.accept(input);
+    }
+  }
 
-    try (RemoteBundle bundle =
-        stageBundleFactory.getBundle(
-            new ReceiverFactory(collector, outputMap), stateRequestHandler, 
progressHandler)) {
-      // TODO(BEAM-4681): Add support to Flink to support portable timers.
-      FnDataReceiver<WindowedValue<?>> receiver =
-          Iterables.getOnlyElement(bundle.getInputReceivers().values());
-      for (WindowedValue<InputT> input : iterable) {
-        receiver.accept(input);
+  private void fireEligibleTimers(
 
 Review comment:
   Add a method comment stating what this does (it fires all timers until no more 
are scheduled). Would `drainTimers` be a more appropriate name?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 167267)
    Time Spent: 22h  (was: 21h 50m)

> Integrate support for timers using the portability APIs into Flink
> ------------------------------------------------------------------
>
>                 Key: BEAM-4681
>                 URL: https://issues.apache.org/jira/browse/BEAM-4681
>             Project: Beam
>          Issue Type: Sub-task
>          Components: runner-flink
>            Reporter: Luke Cwik
>            Assignee: Maximilian Michels
>            Priority: Major
>              Labels: portability, portability-flink
>             Fix For: 2.9.0
>
>          Time Spent: 22h
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to