[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=167267&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167267 ]
ASF GitHub Bot logged work on BEAM-4681: ---------------------------------------- Author: ASF GitHub Bot Created on: 18/Nov/18 22:13 Start Date: 18/Nov/18 22:13 Worklog Time Spent: 10m Work Description: tweise commented on a change in pull request #7008: [BEAM-4681] Add support for portable timers in Flink batch mode URL: https://github.com/apache/beam/pull/7008#discussion_r234463725 ########## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java ########## @@ -166,40 +180,126 @@ private StateRequestHandler getStateRequestHandler( public void mapPartition( Iterable<WindowedValue<InputT>> iterable, Collector<RawUnionValue> collector) throws Exception { - processElements(iterable, collector); + + ReceiverFactory receiverFactory = new ReceiverFactory(collector, outputMap); + try (RemoteBundle bundle = + stageBundleFactory.getBundle(receiverFactory, stateRequestHandler, progressHandler)) { + processElements(iterable, bundle); + } } - /** For stateful processing via a GroupReduceFunction. */ + /** For stateful and timer processing via a GroupReduceFunction. */ @Override public void reduce(Iterable<WindowedValue<InputT>> iterable, Collector<RawUnionValue> collector) throws Exception { - bagUserStateHandlerFactory.resetForNewKey(); - processElements(iterable, collector); + + // Need to discard the old key's state + if (bagUserStateHandlerFactory != null) { + bagUserStateHandlerFactory.resetForNewKey(); + } + + // Used with Batch, we know that all the data is available for this key. We can't use the + // timer manager from the context because it doesn't exist. So we create one and advance + // time to the end after processing all elements. + final InMemoryTimerInternals timerInternals = new InMemoryTimerInternals(); + timerInternals.advanceProcessingTime(Instant.now()); + timerInternals.advanceSynchronizedProcessingTime(Instant.now()); + + ReceiverFactory receiverFactory = + new ReceiverFactory( + collector, + outputMap, + new TimerReceiverFactory( + stageBundleFactory, + executableStage.getTimers(), + stageBundleFactory.getProcessBundleDescriptor().getTimerSpecs(), + (WindowedValue timerElement, TimerInternals.TimerData timerData) -> { + currentTimerKey = (((KV) timerElement.getValue()).getKey()); + timerInternals.setTimer(timerData); + }, + windowCoder)); + + try (RemoteBundle bundle = + stageBundleFactory.getBundle(receiverFactory, stateRequestHandler, progressHandler)) { + processElements(iterable, bundle); + } + + // Finish any pending windows by advancing the input watermark to infinity. + timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE); + // Finally, advance the processing time to infinity to fire any timers. + timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE); + timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE); + + try (RemoteBundle bundle = + stageBundleFactory.getBundle(receiverFactory, stateRequestHandler, progressHandler)) { + + fireEligibleTimers( + timerInternals, + (String timerId, WindowedValue timerValue) -> { + FnDataReceiver<WindowedValue<?>> fnTimerReceiver = + bundle.getInputReceivers().get(timerId); + Preconditions.checkNotNull(fnTimerReceiver, "No FnDataReceiver found for %s", timerId); + try { + fnTimerReceiver.accept(timerValue); + } catch (Exception e) { + throw new RuntimeException( + String.format(Locale.ENGLISH, "Failed to process timer: %s", timerValue)); + } + }); + } } - private void processElements( - Iterable<WindowedValue<InputT>> iterable, Collector<RawUnionValue> collector) + private void processElements(Iterable<WindowedValue<InputT>> iterable, RemoteBundle bundle) throws Exception { - checkState( - runtimeContext == getRuntimeContext(), - "RuntimeContext changed from under us. State handler invalid."); - checkState( - stageBundleFactory != null, "%s not yet prepared", StageBundleFactory.class.getName()); - checkState( - stateRequestHandler != null, "%s not yet prepared", StateRequestHandler.class.getName()); + Preconditions.checkArgument(bundle != null, "RemoteBundle must not be null"); + + String inputPCollectionId = executableStage.getInputPCollection().getId(); + FnDataReceiver<WindowedValue<?>> mainReceiver = + Preconditions.checkNotNull( + bundle.getInputReceivers().get(inputPCollectionId), + "Main input receiver for %s could not be initialized", + inputPCollectionId); + for (WindowedValue<InputT> input : iterable) { + mainReceiver.accept(input); + } + } - try (RemoteBundle bundle = - stageBundleFactory.getBundle( - new ReceiverFactory(collector, outputMap), stateRequestHandler, progressHandler)) { - // TODO(BEAM-4681): Add support to Flink to support portable timers. - FnDataReceiver<WindowedValue<?>> receiver = - Iterables.getOnlyElement(bundle.getInputReceivers().values()); - for (WindowedValue<InputT> input : iterable) { - receiver.accept(input); + private void fireEligibleTimers( Review comment: Add method comment stating what this does (firing all timers until no more are scheduled). Would `drainTimers` be more appropriate? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 167267) Time Spent: 22h (was: 21h 50m) > Integrate support for timers using the portability APIs into Flink > ------------------------------------------------------------------ > > Key: BEAM-4681 > URL: https://issues.apache.org/jira/browse/BEAM-4681 > Project: Beam > Issue Type: Sub-task > Components: runner-flink > Reporter: Luke Cwik > Assignee: Maximilian Michels > Priority: Major > Labels: portability, portability-flink > Fix For: 2.9.0 > > Time Spent: 22h > Remaining Estimate: 0h > > Consider using the code produced in BEAM-4658 to support timers. -- This message was sent by Atlassian JIRA (v7.6.3#76005)