taegeonum commented on a change in pull request #159: [NEMO-216,251,259] 
Support side inputs and windowing
URL: https://github.com/apache/incubator-nemo/pull/159#discussion_r235247665
 
 

 ##########
 File path: 
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
 ##########
 @@ -132,25 +150,46 @@ public final DoFn getDoFn() {
    * Checks whether the bundle is finished or not.
    * Starts the bundle if it is done.
    */
-  protected final void checkAndInvokeBundle() {
+  final void checkAndInvokeBundle() {
     if (bundleFinished) {
       bundleFinished = false;
-      doFnRunner.startBundle();
+      if (pushBackRunner == null) {
+        doFnRunner.startBundle();
+      } else {
+        pushBackRunner.startBundle();
+      }
       prevBundleStartTime = System.currentTimeMillis();
       currBundleCount = 0;
     }
     currBundleCount += 1;
   }
 
-
   /**
    * Checks whether it is time to finish the bundle and finish it.
    */
-  protected final void checkAndFinishBundle() {
+  final void checkAndFinishBundle() {
     if (!bundleFinished) {
       if (currBundleCount >= bundleSize || System.currentTimeMillis() - 
prevBundleStartTime >= bundleMillis) {
         bundleFinished = true;
+        if (pushBackRunner == null) {
+          doFnRunner.finishBundle();
+        } else {
+          pushBackRunner.finishBundle();
+        }
+      }
+    }
+  }
+
+  /**
+   * Finish bundle without checking for conditions.
+   */
+  final void forceFinishBundle() {
 
 Review comment:
   This is only used for PushBackRunner.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to