[ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165554&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165554
 ]

ASF GitHub Bot logged work on BEAM-4681:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Nov/18 18:05
            Start Date: 13/Nov/18 18:05
    Worklog Time Spent: 10m 
      Work Description: tweise commented on a change in pull request #6981: 
[BEAM-4681] Add support for portable timers in Flink streaming mode 
URL: https://github.com/apache/beam/pull/6981#discussion_r233159888
 
 

 ##########
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
 ##########
 @@ -304,16 +342,141 @@ protected void addSideInputValue(StreamRecord<RawUnionValue> streamRecord) {
   @Override
   protected DoFnRunner<InputT, OutputT> createWrappingDoFnRunner(
       DoFnRunner<InputT, OutputT> wrappedRunner) {
-    return new SdkHarnessDoFnRunner();
+    sdkHarnessRunner =
+        new SdkHarnessDoFnRunner<>(
+            executableStage.getInputPCollection().getId(),
+            stageBundleFactory,
+            stateRequestHandler,
+            progressHandler,
+            outputManager,
+            outputMap,
+            executableStage.getTimers(),
+            (Coder<BoundedWindow>) windowingStrategy.getWindowFn().windowCoder(),
+            (WindowedValue<InputT> key, TimerInternals.TimerData timerData) -> {
+              try {
+                synchronized (getKeyedStateBackend()) {
+                  setCurrentKey(keySelector.getKey(key));
+                  timerInternals.setTimer(timerData);
+                }
+              } catch (Exception e) {
+                throw new RuntimeException("Couldn't set key", e);
+              }
+            },
+            () -> {
+              synchronized (getKeyedStateBackend()) {
+                ByteBuffer encodedKey = (ByteBuffer) getKeyedStateBackend().getCurrentKey();
+                @SuppressWarnings("ByteBufferBackingArray")
+                ByteArrayInputStream byteStream = new ByteArrayInputStream(encodedKey.array());
+                try {
+                  return keyCoder.decode(byteStream);
+                } catch (IOException e) {
+                  throw new RuntimeException(
+                      String.format(
+                          Locale.ENGLISH, "Failed to decode encoded key: %s", encodedKey));
+                }
+              }
+            });
+    return sdkHarnessRunner;
   }
 
-  private class SdkHarnessDoFnRunner implements DoFnRunner<InputT, OutputT> {
+  @Override
+  public void processWatermark(Watermark mark) throws Exception {
+    // Due to the asynchronous communication with the SDK harness,
+    // a bundle might still be in progress and not all items have
+    // yet been received from the SDK harness. If we just set this
+    // watermark as the new output watermark, we could violate the
+    // order of the records, i.e. pending items in the SDK harness
+    // could become "late" although they were "on time".
+    //
+    // We can solve this problem using one of the following options:
+    //
+    // 1) Finish the current bundle and emit this watermark as the
+    //    new output watermark. Finishing the bundle ensures that
+    //    all the items have been processed by the SDK harness and
+    //    received by the outputQueue (see below), where they will
+    //    have been emitted to the output stream.
+    //
+    // 2) Put a hold on the output watermark for as long as the current
+    //    bundle has not been finished. We have to remember to manually
+    //    finish the bundle in case we receive the final watermark.
+    //    To avoid latency, we should process this watermark again as
+    //    soon as the current bundle is finished.
+    //
+    // Approach 1) is the easiest, yet 2) gives better throughput due
+    // to the bundle not getting cut on every watermark. So we have
+    // implemented 2) below.
+    //
+    if (sdkHarnessRunner.isBundleInProgress()) {
+      if (mark.getTimestamp() >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+        invokeFinishBundle();
+      } else {
+        // It is not safe to advance the output watermark yet, so add a hold on the current
+        // output watermark.
+        setPushedBackWatermark(Math.min(currentOutputWatermark, getPushbackWatermarkHold()));
+        sdkHarnessRunner.setBundleFinishedCallback(
+            () -> {
+              try {
+                processWatermark(mark);
+              } catch (Exception e) {
+                throw new RuntimeException(
+                    "Failed to process pushed back watermark after finished bundle.", e);
+              }
+            });
+      }
+    }
+    super.processWatermark(mark);
+  }
+
+  private static class SdkHarnessDoFnRunner<InputT, OutputT>
 
 Review comment:
   I would agree with that if `SdkHarnessDoFnRunner` was a component with a 
prospect of being used independently. Instead it is an implementation detail of 
the operator that is tightly coupled. Ideally the class wouldn't even exist. I 
would probably go the opposite way and just delegate all methods to the 
operator, which would remove the clutter and keep the class super small. It 
could even be an anonymous class.  Again, this is just for the sake of discussion 
and not actionable for this PR.
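
To make the delegation idea above concrete, here is a minimal sketch, not code from the PR. `SimpleRunner` and `SketchOperator` are hypothetical stand-ins for Beam's `DoFnRunner` and the Flink operator; the point is only that the runner can be an anonymous class that forwards every call to the operator, which owns all state.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a runner interface (not Beam's DoFnRunner).
interface SimpleRunner<T> {
  void startBundle();

  void processElement(T element);

  void finishBundle();
}

// Hypothetical operator that owns all state and logic.
class SketchOperator<T> {
  // Records calls so the forwarding can be observed.
  final List<String> log = new ArrayList<>();
  private boolean bundleInProgress;

  // The runner is an anonymous class that only forwards to the operator.
  final SimpleRunner<T> runner =
      new SimpleRunner<T>() {
        @Override
        public void startBundle() {
          operatorStartBundle();
        }

        @Override
        public void processElement(T element) {
          operatorProcessElement(element);
        }

        @Override
        public void finishBundle() {
          operatorFinishBundle();
        }
      };

  boolean isBundleInProgress() {
    return bundleInProgress;
  }

  private void operatorStartBundle() {
    bundleInProgress = true;
    log.add("start");
  }

  private void operatorProcessElement(T element) {
    log.add("process:" + element);
  }

  private void operatorFinishBundle() {
    bundleInProgress = false;
    log.add("finish");
  }
}
```

With this shape the runner holds no fields of its own, so questions such as whether a bundle is in progress are answered by the operator directly.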

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 165554)
    Time Spent: 13h 10m  (was: 13h)

> Integrate support for timers using the portability APIs into Flink
> ------------------------------------------------------------------
>
>                 Key: BEAM-4681
>                 URL: https://issues.apache.org/jira/browse/BEAM-4681
>             Project: Beam
>          Issue Type: Sub-task
>          Components: runner-flink
>            Reporter: Luke Cwik
>            Assignee: Maximilian Michels
>            Priority: Major
>              Labels: portability, portability-flink
>          Time Spent: 13h 10m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
