johnyangk commented on a change in pull request #178: [NEMO-317] Optimize
triggering logic in GroupByKeyAndWindowDoFnTransform
URL: https://github.com/apache/incubator-nemo/pull/178#discussion_r241645005
##########
File path:
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
##########
@@ -212,65 +213,59 @@ public void onWatermark(final Watermark inputWatermark) {
@Override
protected void beforeClose() {
// Finish any pending windows by advancing the input watermark to infinity.
- processElementsAndTriggerTimers(new
Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()),
- BoundedWindow.TIMESTAMP_MAX_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE);
+ inputWatermark = new
Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
+ processElementsAndTriggerTimers(BoundedWindow.TIMESTAMP_MAX_VALUE,
BoundedWindow.TIMESTAMP_MAX_VALUE);
}
/**
- * Trigger times for current key.
+ * Trigger times if next timers exist.
* When triggering, it emits the windowed data to downstream operators.
- * @param key key
- * @param watermark watermark
* @param processingTime processing time
* @param synchronizedTime synchronized time
*/
- private void triggerTimers(final K key,
- final Watermark watermark,
- final Instant processingTime,
- final Instant synchronizedTime) {
- final InMemoryTimerInternals timerInternals = (InMemoryTimerInternals)
- inMemoryTimerInternalsFactory.timerInternalsForKey(key);
- try {
- timerInternals.advanceInputWatermark(new
Instant(watermark.getTimestamp()));
- timerInternals.advanceProcessingTime(processingTime);
- timerInternals.advanceSynchronizedProcessingTime(synchronizedTime);
- } catch (final Exception e) {
- throw new RuntimeException(e);
- }
+ private int triggerTimers(final Instant processingTime,
+ final Instant synchronizedTime) {
+ final ContextForTimer<K> context = inMemoryTimerInternalsFactory.context;
+ context.setCurrentInputWatermarkTime(new
Instant(inputWatermark.getTimestamp()));
Review comment:
Maybe this lets us do without the `inputWatermark` field variable?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services