[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=164362&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-164362 ]
ASF GitHub Bot logged work on BEAM-4681:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Nov/18 14:27
            Start Date: 09/Nov/18 14:27
    Worklog Time Spent: 10m
      Work Description: mxm commented on a change in pull request #6981: [BEAM-4681] Add support for portable timers in Flink streaming mode
URL: https://github.com/apache/beam/pull/6981#discussion_r232267191

 ##########
 File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
 ##########
 @@ -304,16 +342,141 @@ protected void addSideInputValue(StreamRecord<RawUnionValue> streamRecord) {
   @Override
   protected DoFnRunner<InputT, OutputT> createWrappingDoFnRunner(
       DoFnRunner<InputT, OutputT> wrappedRunner) {
-    return new SdkHarnessDoFnRunner();
+    sdkHarnessRunner =
+        new SdkHarnessDoFnRunner<>(
+            executableStage.getInputPCollection().getId(),
+            stageBundleFactory,
+            stateRequestHandler,
+            progressHandler,
+            outputManager,
+            outputMap,
+            executableStage.getTimers(),
+            (Coder<BoundedWindow>) windowingStrategy.getWindowFn().windowCoder(),
+            (WindowedValue<InputT> key, TimerInternals.TimerData timerData) -> {
+              try {
+                synchronized (getKeyedStateBackend()) {
+                  setCurrentKey(keySelector.getKey(key));
+                  timerInternals.setTimer(timerData);
+                }
+              } catch (Exception e) {
+                throw new RuntimeException("Couldn't set key", e);
+              }
+            },
+            () -> {
+              synchronized (getKeyedStateBackend()) {
+                ByteBuffer encodedKey = (ByteBuffer) getKeyedStateBackend().getCurrentKey();
+                @SuppressWarnings("ByteBufferBackingArray")
+                ByteArrayInputStream byteStream = new ByteArrayInputStream(encodedKey.array());
+                try {
+                  return keyCoder.decode(byteStream);
+                } catch (IOException e) {
+                  throw new RuntimeException(
+                      String.format(
+                          Locale.ENGLISH, "Failed to decode encoded key: %s", encodedKey));
+                }
+              }
+            });
+    return sdkHarnessRunner;
   }
 
-  private class SdkHarnessDoFnRunner implements DoFnRunner<InputT, OutputT> {
+  @Override
+  public void processWatermark(Watermark mark) throws Exception {
+    // Due to the asynchronous communication with the SDK harness,
+    // a bundle might still be in progress and not all items have
+    // yet been received from the SDK harness. If we just set this
+    // watermark as the new output watermark, we could violate the
+    // order of the records, i.e. pending items in the SDK harness
+    // could become "late" although they were "on time".
+    //
+    // We can solve this problem using one of the following options:
+    //
+    // 1) Finish the current bundle and emit this watermark as the
+    //    new output watermark. Finishing the bundle ensures that
+    //    all the items have been processed by the SDK harness and
+    //    received by the outputQueue (see below), where they will
+    //    have been emitted to the output stream.
+    //
+    // 2) Put a hold on the output watermark for as long as the current
+    //    bundle has not been finished. We have to remember to manually
+    //    finish the bundle in case we receive the final watermark.
+    //    To avoid latency, we should process this watermark again as
+    //    soon as the current bundle is finished.
+    //
+    // Approach 1) is the easiest, yet 2) gives better throughput because
+    // 1) would cut the bundle on every watermark. So we have
+    // implemented 2) below.
+    //
+    if (sdkHarnessRunner.isBundleInProgress()) {
+      if (mark.getTimestamp() >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+        invokeFinishBundle();
+      } else {
+        // It is not safe to advance the output watermark yet, so add a hold on the current
+        // output watermark.
+        setPushedBackWatermark(Math.min(currentOutputWatermark, getPushbackWatermarkHold()));
+        sdkHarnessRunner.setBundleFinishedCallback(
+            () -> {
+              try {
+                processWatermark(mark);
+              } catch (Exception e) {
+                throw new RuntimeException(
+                    "Failed to process pushed back watermark after finished bundle.", e);
+              }
+            });
+      }
+    }
+    super.processWatermark(mark);
+  }
+
+  private static class SdkHarnessDoFnRunner<InputT, OutputT>

 Review comment:
 I wanted to isolate the harness runner from the rest of the operator and provide only the required dependencies. Yes, it is a bit of clutter, but it also makes it easier to understand what this class does and what it has access to.

Issue Time Tracking
-------------------

    Worklog Id:     (was: 164362)
    Time Spent: 2h 50m  (was: 2h 40m)

> Integrate support for timers using the portability APIs into Flink
> ------------------------------------------------------------------
>
>                 Key: BEAM-4681
>                 URL: https://issues.apache.org/jira/browse/BEAM-4681
>             Project: Beam
>          Issue Type: Sub-task
>          Components: runner-flink
>            Reporter: Luke Cwik
>            Assignee: Maximilian Michels
>            Priority: Major
>              Labels: portability, portability-flink
>          Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.
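The hold-and-retry idea behind option 2) in the processWatermark diff can be illustrated in isolation. The following is a minimal sketch, not Beam or Flink code: the class WatermarkHoldSketch, its methods, and the use of Long.MAX_VALUE as a stand-in for BoundedWindow.TIMESTAMP_MAX_VALUE are hypothetical. It also simplifies the real operator, which still forwards a watermark capped by the hold; here the watermark is simply withheld until the bundle finishes and is then re-processed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of option 2): not the Beam/Flink operator.
class WatermarkHoldSketch {
  // Stand-in for BoundedWindow.TIMESTAMP_MAX_VALUE (assumption for illustration).
  static final long MAX_TIMESTAMP = Long.MAX_VALUE;

  private boolean bundleInProgress = false;
  private Runnable bundleFinishedCallback = null;
  private long outputWatermark = Long.MIN_VALUE;
  // Records every watermark forwarded downstream, for inspection.
  final List<Long> emittedWatermarks = new ArrayList<>();

  void startBundle() {
    bundleInProgress = true;
  }

  // Called once the (asynchronous) SDK harness has returned all results.
  void finishBundle() {
    bundleInProgress = false;
    Runnable callback = bundleFinishedCallback;
    bundleFinishedCallback = null;
    if (callback != null) {
      // Re-process the watermark that was held back during the bundle.
      callback.run();
    }
  }

  void processWatermark(long mark) {
    if (bundleInProgress) {
      if (mark >= MAX_TIMESTAMP) {
        // Final watermark: there is no later chance to flush, so finish the bundle now.
        finishBundle();
      } else {
        // Hold the watermark; a newer watermark replaces an older pending one.
        bundleFinishedCallback = () -> processWatermark(mark);
        return;
      }
    }
    // Watermarks only move forward.
    if (mark > outputWatermark) {
      outputWatermark = mark;
      emittedWatermarks.add(mark);
    }
  }

  public static void main(String[] args) {
    WatermarkHoldSketch op = new WatermarkHoldSketch();
    op.processWatermark(10); // no bundle in progress: forwarded immediately
    op.startBundle();
    op.processWatermark(20); // held back
    op.processWatermark(30); // held back, replaces the pending 20
    System.out.println(op.emittedWatermarks); // prints [10]
    op.finishBundle(); // re-processes 30
    System.out.println(op.emittedWatermarks); // prints [10, 30]
  }
}
```

Because watermarks are monotonic, keeping only the most recent pending callback is enough: re-processing the latest held-back watermark subsumes all earlier ones.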