johnyangk commented on a change in pull request #178: [NEMO-317] Optimize triggering logic in GroupByKeyAndWindowDoFnTransform URL: https://github.com/apache/incubator-nemo/pull/178#discussion_r241645005
########## File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java ########## @@ -212,65 +213,59 @@ public void onWatermark(final Watermark inputWatermark) { @Override protected void beforeClose() { // Finish any pending windows by advancing the input watermark to infinity. - processElementsAndTriggerTimers(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()), - BoundedWindow.TIMESTAMP_MAX_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE); + inputWatermark = new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()); + processElementsAndTriggerTimers(BoundedWindow.TIMESTAMP_MAX_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE); } /** - * Trigger times for current key. + * Trigger times if next timers exist. * When triggering, it emits the windowed data to downstream operators. - * @param key key - * @param watermark watermark * @param processingTime processing time * @param synchronizedTime synchronized time */ - private void triggerTimers(final K key, - final Watermark watermark, - final Instant processingTime, - final Instant synchronizedTime) { - final InMemoryTimerInternals timerInternals = (InMemoryTimerInternals) - inMemoryTimerInternalsFactory.timerInternalsForKey(key); - try { - timerInternals.advanceInputWatermark(new Instant(watermark.getTimestamp())); - timerInternals.advanceProcessingTime(processingTime); - timerInternals.advanceSynchronizedProcessingTime(synchronizedTime); - } catch (final Exception e) { - throw new RuntimeException(e); - } + private int triggerTimers(final Instant processingTime, + final Instant synchronizedTime) { + final ContextForTimer<K> context = inMemoryTimerInternalsFactory.context; + context.setCurrentInputWatermarkTime(new Instant(inputWatermark.getTimestamp())); Review comment: Maybe this lets us do without the `inputWatermark` field variable? 
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services