johnyangk commented on a change in pull request #178: [NEMO-317] Optimize triggering logic in GroupByKeyAndWindowDoFnTransform URL: https://github.com/apache/incubator-nemo/pull/178#discussion_r241645416
########## File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java ########## @@ -212,65 +213,59 @@ public void onWatermark(final Watermark inputWatermark) { @Override protected void beforeClose() { // Finish any pending windows by advancing the input watermark to infinity. - processElementsAndTriggerTimers(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()), - BoundedWindow.TIMESTAMP_MAX_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE); + inputWatermark = new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()); + processElementsAndTriggerTimers(BoundedWindow.TIMESTAMP_MAX_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE); } /** - * Trigger times for current key. + * Trigger times if next timers exist. * When triggering, it emits the windowed data to downstream operators. - * @param key key - * @param watermark watermark * @param processingTime processing time * @param synchronizedTime synchronized time */ - private void triggerTimers(final K key, - final Watermark watermark, - final Instant processingTime, - final Instant synchronizedTime) { - final InMemoryTimerInternals timerInternals = (InMemoryTimerInternals) - inMemoryTimerInternalsFactory.timerInternalsForKey(key); - try { - timerInternals.advanceInputWatermark(new Instant(watermark.getTimestamp())); - timerInternals.advanceProcessingTime(processingTime); - timerInternals.advanceSynchronizedProcessingTime(synchronizedTime); - } catch (final Exception e) { - throw new RuntimeException(e); - } + private int triggerTimers(final Instant processingTime, + final Instant synchronizedTime) { + final ContextForTimer<K> context = inMemoryTimerInternalsFactory.context; + context.setCurrentInputWatermarkTime(new Instant(inputWatermark.getTimestamp())); + context.setCurrentProcessingTime(processingTime); + context.setCurrentSynchronizedProcessingTime(synchronizedTime); - final List<TimerInternals.TimerData> timerDataList = getEligibleTimers(timerInternals); + // get next eligible timers + final List<Pair<K, TimerInternals.TimerData>> timers = getEligibleTimers(); - if (!timerDataList.isEmpty()) { + for (final Pair<K, TimerInternals.TimerData> timer : timers) { // Trigger timers and emit windowed data final KeyedWorkItem<K, InputT> timerWorkItem = - KeyedWorkItems.timersWorkItem(key, timerDataList); + KeyedWorkItems.timersWorkItem(timer.left(), Collections.singletonList(timer.right())); Review comment: Minor question: Would it be worthwhile to group timers by keys first and then invoke `processElement` to minimize the number of invocations? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services