taegeonum commented on a change in pull request #135: [NEMO-230] Emit collected data when receiving watermark in GroupByKeyAndWindowTransform
URL: https://github.com/apache/incubator-nemo/pull/135#discussion_r229185324
 
 

 ##########
 File path: 
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
 ##########
 @@ -108,33 +138,56 @@ public void onWatermark(final Watermark watermark) {
    */
   @Override
   protected void beforeClose() {
-    final InMemoryTimerInternals timerInternals = 
timerInternalsFactory.timerInternals;
-    try {
-      // Finish any pending windows by advancing the input watermark to 
infinity.
-      timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
+    // Finish any pending windows by advancing the input watermark to infinity.
+    processElementsAndTriggerTimers(new 
Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()),
+      BoundedWindow.TIMESTAMP_MAX_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE);
+  }
 
-      // Finally, advance the processing time to infinity to fire any timers.
-      timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
-      
timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+  /**
+   * Trigger times for current key.
+   * When triggering, it emits the windowed data to downstream operators.
+   * @param key key
+   * @param watermark watermark
+   * @param processingTime processing time
+   * @param synchronizedTime synchronized time
+   */
+  private void triggerTimers(final K key,
+                             final Watermark watermark,
+                             final Instant processingTime,
+                             final Instant synchronizedTime) {
+    final InMemoryTimerInternals timerInternals = (InMemoryTimerInternals)
+      
stateAndTimerInternalsFactory.inMemoryTimerInternalsFactory.timerInternalsForKey(key);
+    try {
+      timerInternals.advanceInputWatermark(new 
Instant(watermark.getTimestamp()));
+      timerInternals.advanceProcessingTime(processingTime);
+      timerInternals.advanceSynchronizedProcessingTime(synchronizedTime);
     } catch (final Exception e) {
       throw new RuntimeException(e);
     }
 
-    if (keyToValues.isEmpty()) {
-      LOG.warn("Beam GroupByKeyAndWindowDoFnTransform received no data!");
-    } else {
-      // timer
-      final Iterable<TimerInternals.TimerData> timerData = 
getTimers(timerInternals);
+    final List<TimerInternals.TimerData> timerDataList = 
getTimers(timerInternals);
 
-      keyToValues.entrySet().stream().forEach(entry -> {
-        // The GroupAlsoByWindowViaWindowSetNewDoFn requires KeyedWorkItem,
-        // so we convert the KV to KeyedWorkItem
-        final KeyedWorkItem<K, InputT> keyedWorkItem =
-          KeyedWorkItems.workItem(entry.getKey(), timerData, entry.getValue());
-        
getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(keyedWorkItem));
-      });
-      keyToValues.clear();
+    if (timerDataList.isEmpty()) {
+      return;
+    }
+
+    // Trigger timers and emit windowed data
+    final KeyedWorkItem<K, InputT> timerWorkItem =
+      KeyedWorkItems.timersWorkItem(key, timerDataList);
+    
getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(timerWorkItem));
 
 Review comment:
   Done 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to