kennknowles commented on a change in pull request #12155:
URL: https://github.com/apache/beam/pull/12155#discussion_r450454617



##########
File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
##########
@@ -1110,17 +1109,7 @@ public void updateWatermarks(
     pendingUpdates.offer(
         PendingWatermarkUpdate.create(
             executable, completed, timerUpdate, unprocessedInputs, outputs, earliestHold));
-    tryApplyPendingUpdates();
-  }
-
-  private void tryApplyPendingUpdates() {
-    if (refreshLock.tryLock()) {
-      try {
-        applyNUpdates(MAX_INCREMENTAL_UPDATES);
-      } finally {
-        refreshLock.unlock();
-      }
-    }

Review comment:
       Updates can only move watermarks forward. I think this is just a 
rate-limiting effect. This is from #1287. I believe it is correct. Whenever a 
bundle is completed, it can allow the watermark to move forward, but not past 
other holds. In this way, adding updates does not need to share a lock with 
applying and removing updates.
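
   A minimal sketch of the pattern described above, under simplifying assumptions: each pending update is reduced to a candidate watermark value, and all class and method names here are hypothetical, not the actual `WatermarkManager` code. Producers enqueue onto a concurrent queue without taking the refresh lock; only the thread that applies updates takes it, and applying an update can never move the watermark backward.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration only; not the real WatermarkManager.
public class PendingUpdateSketch {
  // Lock-free queue: adding an update never contends with applying updates.
  private final Queue<Long> pendingUpdates = new ConcurrentLinkedQueue<>();
  // Guards only the watermark state that applying an update mutates.
  private final ReentrantLock refreshLock = new ReentrantLock();
  private long watermark = Long.MIN_VALUE;

  // Called by worker threads when a bundle completes; takes no lock.
  public void offerUpdate(long candidateWatermark) {
    pendingUpdates.offer(candidateWatermark);
  }

  // Drains at most maxUpdates pending updates under the refresh lock.
  public void applyPending(int maxUpdates) {
    refreshLock.lock();
    try {
      for (int i = 0; i < maxUpdates; i++) {
        Long next = pendingUpdates.poll();
        if (next == null) {
          return; // queue is empty
        }
        // Monotonic: an update can only move the watermark forward.
        watermark = Math.max(watermark, next);
      }
    } finally {
      refreshLock.unlock();
    }
  }

  public long currentWatermark() {
    refreshLock.lock();
    try {
      return watermark;
    } finally {
      refreshLock.unlock();
    }
  }
}
```

   In this sketch, skipping or delaying a call to `applyPending` only delays progress; the queued updates are still applied by the next refresh, which is why an eager apply after each offer acts as rate limiting rather than as a correctness requirement.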

##########
File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java
##########
@@ -70,6 +71,8 @@ public static ExecutionDriver create(
   private final Map<AppliedPTransform<?, ?, ?>, ConcurrentLinkedQueue<CommittedBundle<?>>>
       pendingRootBundles;
   private final Queue<WorkUpdate> pendingWork = new ConcurrentLinkedQueue<>();
+  private final Map<AppliedPTransform<?, ?, ?>, Collection<CommittedBundle<?>>> inflightBundles =
+      new ConcurrentHashMap<>();

Review comment:
       The watermark should be held by all timestamps in the bundle, until the 
bundle is completed and committed.
   
   Can you move your GitHub review comment into a code comment that explains 
this field?
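
   One way to picture the hold described above is the sketch below. It assumes that each bundle is represented only by its element timestamps and that transforms are identified by a plain string; the class and method names are hypothetical and do not come from `QuiescenceDriver`. The hold for a transform is the minimum timestamp over all of its in-flight bundles, and it is released only when a bundle is completed.

```java
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical illustration only; not the real QuiescenceDriver.
public class InflightHoldSketch {
  // Transform name -> element timestamps of each bundle still being processed.
  private final Map<String, Collection<long[]>> inflightBundles = new ConcurrentHashMap<>();

  // Record a bundle when its processing starts.
  public void start(String transform, long[] bundleTimestamps) {
    inflightBundles
        .computeIfAbsent(transform, k -> new ConcurrentLinkedQueue<>())
        .add(bundleTimestamps);
  }

  // Drop the bundle once it is completed and committed.
  // Arrays compare by identity, so the same array instance must be passed.
  public void complete(String transform, long[] bundleTimestamps) {
    Collection<long[]> bundles = inflightBundles.get(transform);
    if (bundles != null) {
      bundles.remove(bundleTimestamps);
    }
  }

  // The watermark may not pass any timestamp of an in-flight bundle.
  // Returns Long.MAX_VALUE when nothing is in flight (no hold).
  public long hold(String transform) {
    long min = Long.MAX_VALUE;
    Collection<long[]> bundles = inflightBundles.get(transform);
    if (bundles == null) {
      return min;
    }
    for (long[] bundle : bundles) {
      for (long timestamp : bundle) {
        min = Math.min(min, timestamp);
      }
    }
    return min;
  }
}
```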

##########
File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
##########
@@ -128,19 +108,6 @@ public void cleanup() throws Exception {
     final DoFn<KV<K, InputT>, OutputT> doFn = application.getTransform().getDoFn();
     final DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
 
-    // If the DoFn is stateful, schedule state clearing.
-    // It is semantically correct to schedule any number of redundant clear tasks; the
-    // cache is used to limit the number of tasks to avoid performance degradation.
-    if (signature.stateDeclarations().size() > 0) {
-      for (final WindowedValue<?> element : inputBundle.getElements()) {
-        for (final BoundedWindow window : element.getWindows()) {
-          cleanupRegistry.get(
-              AppliedPTransformOutputKeyAndWindow.create(
-                  application, (StructuralKey<K>) inputBundle.getKey(), window));
-        }
-      }
-    }
-

Review comment:
       Please do make it a separate commit, because the code history will make 
more sense to readers that way. Also, single commits can be rolled back. It is 
fine to have it in the same PR, in my opinion.
   
   I agree that this seems redundant with 
https://github.com/apache/beam/blob/c3d1e5d71ada0097a48d235cb717c61f63b5eb80/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java#L113
   
   I think this code predates `StatefulDoFnRunner`, so perhaps this was missed 
when it was ported to use `StatefulDoFnRunner`. (I could be wrong; I did not 
check the git history.)





