johnyangk commented on a change in pull request #178: [NEMO-317] Optimize triggering logic in GroupByKeyAndWindowDoFnTransform
URL: https://github.com/apache/incubator-nemo/pull/178#discussion_r241645005
 
 

 ##########
 File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
 ##########
 @@ -212,65 +213,59 @@ public void onWatermark(final Watermark inputWatermark) {
   @Override
   protected void beforeClose() {
     // Finish any pending windows by advancing the input watermark to infinity.
-    processElementsAndTriggerTimers(new 
Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()),
-      BoundedWindow.TIMESTAMP_MAX_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE);
+    inputWatermark = new 
Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
+    processElementsAndTriggerTimers(BoundedWindow.TIMESTAMP_MAX_VALUE, 
BoundedWindow.TIMESTAMP_MAX_VALUE);
   }
 
   /**
-   * Trigger times for current key.
+   * Trigger times if next timers exist.
    * When triggering, it emits the windowed data to downstream operators.
-   * @param key key
-   * @param watermark watermark
    * @param processingTime processing time
    * @param synchronizedTime synchronized time
    */
-  private void triggerTimers(final K key,
-                             final Watermark watermark,
-                             final Instant processingTime,
-                             final Instant synchronizedTime) {
-    final InMemoryTimerInternals timerInternals = (InMemoryTimerInternals)
-      inMemoryTimerInternalsFactory.timerInternalsForKey(key);
-    try {
-      timerInternals.advanceInputWatermark(new 
Instant(watermark.getTimestamp()));
-      timerInternals.advanceProcessingTime(processingTime);
-      timerInternals.advanceSynchronizedProcessingTime(synchronizedTime);
-    } catch (final Exception e) {
-      throw new RuntimeException(e);
-    }
+  private int triggerTimers(final Instant processingTime,
+                            final Instant synchronizedTime) {
+    final ContextForTimer<K> context = inMemoryTimerInternalsFactory.context;
+    context.setCurrentInputWatermarkTime(new 
Instant(inputWatermark.getTimestamp()));
 
 Review comment:
   Maybe setting the current input watermark on the context here lets us do without the `inputWatermark` field variable?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to