[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165554&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165554 ]
ASF GitHub Bot logged work on BEAM-4681:
----------------------------------------

Author: ASF GitHub Bot
Created on: 13/Nov/18 18:05
Start Date: 13/Nov/18 18:05
Worklog Time Spent: 10m
Work Description: tweise commented on a change in pull request #6981: [BEAM-4681] Add support for portable timers in Flink streaming mode
URL: https://github.com/apache/beam/pull/6981#discussion_r233159888

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
##########
@@ -304,16 +342,141 @@ protected void addSideInputValue(StreamRecord<RawUnionValue> streamRecord) {
   @Override
   protected DoFnRunner<InputT, OutputT> createWrappingDoFnRunner(
       DoFnRunner<InputT, OutputT> wrappedRunner) {
-    return new SdkHarnessDoFnRunner();
+    sdkHarnessRunner =
+        new SdkHarnessDoFnRunner<>(
+            executableStage.getInputPCollection().getId(),
+            stageBundleFactory,
+            stateRequestHandler,
+            progressHandler,
+            outputManager,
+            outputMap,
+            executableStage.getTimers(),
+            (Coder<BoundedWindow>) windowingStrategy.getWindowFn().windowCoder(),
+            (WindowedValue<InputT> key, TimerInternals.TimerData timerData) -> {
+              try {
+                synchronized (getKeyedStateBackend()) {
+                  setCurrentKey(keySelector.getKey(key));
+                  timerInternals.setTimer(timerData);
+                }
+              } catch (Exception e) {
+                throw new RuntimeException("Couldn't set key", e);
+              }
+            },
+            () -> {
+              synchronized (getKeyedStateBackend()) {
+                ByteBuffer encodedKey = (ByteBuffer) getKeyedStateBackend().getCurrentKey();
+                @SuppressWarnings("ByteBufferBackingArray")
+                ByteArrayInputStream byteStream = new ByteArrayInputStream(encodedKey.array());
+                try {
+                  return keyCoder.decode(byteStream);
+                } catch (IOException e) {
+                  throw new RuntimeException(
+                      String.format(
+                          Locale.ENGLISH, "Failed to decode encoded key: %s", encodedKey));
+                }
+              }
+            });
+    return sdkHarnessRunner;
   }

-  private class SdkHarnessDoFnRunner implements DoFnRunner<InputT, OutputT> {
+  @Override
+  public void processWatermark(Watermark mark) throws Exception {
+    // Due to the asynchronous communication with the SDK harness,
+    // a bundle might still be in progress and not all items have
+    // yet been received from the SDK harness. If we just set this
+    // watermark as the new output watermark, we could violate the
+    // order of the records, i.e. pending items in the SDK harness
+    // could become "late" although they were "on time".
+    //
+    // We can solve this problem using one of the following options:
+    //
+    // 1) Finish the current bundle and emit this watermark as the
+    //    new output watermark. Finishing the bundle ensures that
+    //    all the items have been processed by the SDK harness and
+    //    received by the outputQueue (see below), where they will
+    //    have been emitted to the output stream.
+    //
+    // 2) Put a hold on the output watermark for as long as the current
+    //    bundle has not been finished. We have to remember to manually
+    //    finish the bundle in case we receive the final watermark.
+    //    To avoid latency, we should process this watermark again as
+    //    soon as the current bundle is finished.
+    //
+    // Approach 1) is the easiest, yet 2) gives better throughput
+    // because 1) cuts the bundle on every watermark. So we have
+    // implemented 2) below.
+    //
+    if (sdkHarnessRunner.isBundleInProgress()) {
+      if (mark.getTimestamp() >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+        invokeFinishBundle();
+      } else {
+        // It is not safe to advance the output watermark yet, so add a hold on the current
+        // output watermark.
+        setPushedBackWatermark(Math.min(currentOutputWatermark, getPushbackWatermarkHold()));
+        sdkHarnessRunner.setBundleFinishedCallback(
+            () -> {
+              try {
+                processWatermark(mark);
+              } catch (Exception e) {
+                throw new RuntimeException(
+                    "Failed to process pushed back watermark after finished bundle.", e);
+              }
+            });
+      }
+    }
+    super.processWatermark(mark);
+  }
+
+  private static class SdkHarnessDoFnRunner<InputT, OutputT>

Review comment:
I would agree with that if `SdkHarnessDoFnRunner` were a component with a prospect of being used independently. Instead, it is an implementation detail that is tightly coupled to the operator. Ideally the class wouldn't even exist. I would probably go the opposite way and just delegate all methods to the operator, which would remove the clutter and keep the class super small. It could even be an anonymous class. Again, this is just for the sake of discussion and not actionable for this PR.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

Worklog Id: (was: 165554)
Time Spent: 13h 10m (was: 13h)

> Integrate support for timers using the portability APIs into Flink
> ------------------------------------------------------------------
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
> Issue Type: Sub-task
> Components: runner-flink
> Reporter: Luke Cwik
> Assignee: Maximilian Michels
> Priority: Major
> Labels: portability, portability-flink
> Time Spent: 13h 10m
> Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)