kennknowles commented on a change in pull request #12155:
URL: https://github.com/apache/beam/pull/12155#discussion_r450454617

##########
File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
##########
@@ -1110,17 +1109,7 @@ public void updateWatermarks(
     pendingUpdates.offer(
         PendingWatermarkUpdate.create(
             executable, completed, timerUpdate, unprocessedInputs, outputs, earliestHold));
-    tryApplyPendingUpdates();
-  }
-
-  private void tryApplyPendingUpdates() {
-    if (refreshLock.tryLock()) {
-      try {
-        applyNUpdates(MAX_INCREMENTAL_UPDATES);
-      } finally {
-        refreshLock.unlock();
-      }
-    }

Review comment:
   Updates can only move watermarks forward, so I think this is just a rate-limiting effect. This code is from #1287, and I believe it is correct. Whenever a bundle is completed, it can allow the watermark to move forward, but not past other holds. In this way, adding updates does not need to share a lock with applying and removing updates.

##########
File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java
##########
@@ -70,6 +71,8 @@ public static ExecutionDriver create(
   private final Map<AppliedPTransform<?, ?, ?>, ConcurrentLinkedQueue<CommittedBundle<?>>>
       pendingRootBundles;
   private final Queue<WorkUpdate> pendingWork = new ConcurrentLinkedQueue<>();
+  private final Map<AppliedPTransform<?, ?, ?>, Collection<CommittedBundle<?>>> inflightBundles =
+      new ConcurrentHashMap<>();

Review comment:
   The watermark should be held by all timestamps in the bundle until the bundle is completed and committed. Can you move your GitHub review comment into a code comment that explains this field?

##########
File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
##########
@@ -128,19 +108,6 @@ public void cleanup() throws Exception {
     final DoFn<KV<K, InputT>, OutputT> doFn = application.getTransform().getDoFn();
     final DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
-    // If the DoFn is stateful, schedule state clearing.
-    // It is semantically correct to schedule any number of redundant clear tasks; the
-    // cache is used to limit the number of tasks to avoid performance degradation.
-    if (signature.stateDeclarations().size() > 0) {
-      for (final WindowedValue<?> element : inputBundle.getElements()) {
-        for (final BoundedWindow window : element.getWindows()) {
-          cleanupRegistry.get(
-              AppliedPTransformOutputKeyAndWindow.create(
-                  application, (StructuralKey<K>) inputBundle.getKey(), window));
-        }
-      }
-    }
-

Review comment:
   Please do make it a separate commit: it will make more sense when reading the code history, and a single commit can be rolled back on its own. In my opinion it is fine to have it in the same PR.

   I agree that this seems redundant with https://github.com/apache/beam/blob/c3d1e5d71ada0097a48d235cb717c61f63b5eb80/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java#L113

   I think this code predates `StatefulDoFnRunner`, so perhaps it was missed when the code was ported to use `StatefulDoFnRunner`. (I could be wrong; I did not check the git history.)
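As a rough illustration of the first comment (on `WatermarkManager`), here is a simplified Java model; it is not Beam's actual `WatermarkManager`. Producers `offer` updates to a concurrent queue without taking a lock, and whichever thread wins `tryLock()` applies a bounded batch. Because each applied update can only move the watermark forward, and never past the hold, the batch size and the thread that applies it change only how quickly the final value is reached, not the value itself. The class name `MonotonicWatermark`, the plain `long` timestamps, the fixed `hold`, the value 10 for `MAX_INCREMENTAL_UPDATES`, and the `applyAllPendingUpdates()` drain are all assumptions of this sketch.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified model of the pattern under review; not Beam's WatermarkManager.
final class MonotonicWatermark {
  // Stand-in for MAX_INCREMENTAL_UPDATES; the value 10 is an assumption of this sketch.
  static final int MAX_INCREMENTAL_UPDATES = 10;

  private final Queue<Long> pendingUpdates = new ConcurrentLinkedQueue<>();
  private final ReentrantLock refreshLock = new ReentrantLock();
  // Assumed fixed hold (e.g. from other in-flight work); the watermark never passes it.
  private final long hold;
  // Only read or written while refreshLock is held.
  private long watermark = Long.MIN_VALUE;

  MonotonicWatermark(long hold) {
    this.hold = hold;
  }

  // Adding an update never blocks: it is only an offer to a lock-free queue.
  void updateWatermark(long proposed) {
    pendingUpdates.offer(proposed);
    tryApplyPendingUpdates();
  }

  // Applies a bounded batch if no other thread is already applying; otherwise returns at once.
  void tryApplyPendingUpdates() {
    if (refreshLock.tryLock()) {
      try {
        applyNUpdates(MAX_INCREMENTAL_UPDATES);
      } finally {
        refreshLock.unlock();
      }
    }
  }

  // Blocking drain, so updates left behind by a lost tryLock() race are eventually applied.
  void applyAllPendingUpdates() {
    refreshLock.lock();
    try {
      applyNUpdates(Integer.MAX_VALUE);
    } finally {
      refreshLock.unlock();
    }
  }

  private void applyNUpdates(int n) {
    for (int i = 0; i < n; i++) {
      Long proposed = pendingUpdates.poll();
      if (proposed == null) {
        return; // queue is empty
      }
      // Monotonic: never move backwards and never advance past the hold, so the order
      // and batch size in which updates are applied do not change the final value.
      watermark = Math.max(watermark, Math.min(proposed, hold));
    }
  }

  long get() {
    refreshLock.lock();
    try {
      return watermark;
    } finally {
      refreshLock.unlock();
    }
  }
}
```

In this sketch, an update whose producer loses the `tryLock()` race simply waits in the queue, so something (here, `applyAllPendingUpdates()`) must eventually drain it; the sketch does not model how the direct runner schedules that work.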
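The second comment asks for a code comment explaining `inflightBundles`. A minimal sketch of what such a documented field could express follows, under simplified assumptions: a `String` stands in for `AppliedPTransform`, the hypothetical `SketchBundle` stands in for `CommittedBundle` and carries only element timestamps, and `InflightHolds`, `started`, `committed`, `hold`, and `heldWatermark` are illustrative names, not Beam APIs. The hold for a transform is the earliest timestamp among its in-flight bundles, and it is released only when a bundle is committed.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for CommittedBundle: only element timestamps (as longs) are modeled.
final class SketchBundle {
  private final List<Long> timestamps;

  SketchBundle(List<Long> timestamps) {
    this.timestamps = List.copyOf(timestamps);
  }

  // Earliest element timestamp in the bundle, or Long.MAX_VALUE for an empty bundle.
  long minTimestamp() {
    long min = Long.MAX_VALUE;
    for (long ts : timestamps) {
      min = Math.min(min, ts);
    }
    return min;
  }
}

// Hypothetical model of a per-transform watermark hold; not Beam's QuiescenceDriver.
final class InflightHolds {
  /**
   * Bundles that have been handed to a transform's evaluator but not yet completed and
   * committed, keyed by the consuming transform (a String stands in for AppliedPTransform).
   * Every element timestamp in these bundles holds that transform's watermark until the
   * bundle is committed, so the watermark cannot pass data that is still being processed.
   */
  private final Map<String, Collection<SketchBundle>> inflightBundles = new ConcurrentHashMap<>();

  void started(String transform, SketchBundle bundle) {
    inflightBundles.computeIfAbsent(transform, k -> new ConcurrentLinkedQueue<>()).add(bundle);
  }

  void committed(String transform, SketchBundle bundle) {
    Collection<SketchBundle> bundles = inflightBundles.get(transform);
    if (bundles != null) {
      bundles.remove(bundle); // identity equality: SketchBundle does not override equals
    }
  }

  // Earliest timestamp among the transform's in-flight bundles, or Long.MAX_VALUE if none.
  long hold(String transform) {
    long earliest = Long.MAX_VALUE;
    for (SketchBundle bundle : inflightBundles.getOrDefault(transform, List.of())) {
      earliest = Math.min(earliest, bundle.minTimestamp());
    }
    return earliest;
  }

  // The watermark the transform may report: its input watermark, held back by in-flight work.
  long heldWatermark(String transform, long inputWatermark) {
    return Math.min(inputWatermark, hold(transform));
  }
}
```

Because the hold is recomputed from whatever is still in the map, committing a bundle is the only thing that releases it, and bundles can commit in any order; tracking a single running minimum instead would need extra bookkeeping when the earliest bundle commits first.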