pltbkd commented on PR #28305:
URL: https://github.com/apache/flink/pull/28305#issuecomment-4627839836

   > My AI says:
   > 
   > 1. Race Condition in AsyncWaitOperator.outputCompletedElement()
   > 
   > if (!downstreamAvailabilityProvider.isAvailable()) { 
downstreamAvailabilityProvider.getAvailableFuture().thenRun(...) }
   > 
   > Between checking isAvailable() and getting the future, state can change. 
Should capture future first, then check isDone().
   > 
   > 2. Potential Memory Leak
   >    Deferred callbacks via thenRun() may accumulate if mailbox rejects 
execution. Silent failure in catch block doesn't clean up pending state.
   > 3. Performance Concern
   >    CompositeAvailabilityProvider.getAvailableFuture() creates new combined 
future on every call by iterating all providers - expensive for long chains.
   > 
   > 📝 Code Quality Issues Inconsistent indentation in deferred callback 
Missing comprehensive JavaDoc with usage examples Magic numbers in tests 
(capacity=2) without explanation No null checks on availabilityHelper 
initialization 🔄 Complexity Nested callback chain (thenRun → 
mailboxExecutor.execute → outputCompletedElement) creates hard-to-debug control 
flow. Consider state machine or explicit deferred queue.
   > 
   > ⚙️ Compatibility & Side Effects Works with existing nodes: ✅ Yes - opt-in 
via instanceof checks
   > 
   > Side effects requiring documentation:
   > 
   > Performance: Additional availability checks on hot path (every element) 
Latency: Elements delayed when downstream unavailable Memory: Deferred 
callbacks accumulate during prolonged unavailability Checkpoint timing: More 
in-flight data may be captured Non-deterministic timing: Deferred emissions 
maintain order but timing varies
   > 
   > Migration concerns: Custom backpressure implementations may conflict 
Monitoring/metrics need updates for deferred emissions Queue size tuning may 
need adjustment
   > 
   > Missing Documentation User guide: When/why soft backpressure activates 
Operator implementation guide for SupportsSoftBackpressure Performance tuning 
guide Migration impact guide Troubleshooting deferred emissions
   
   I'd like to clarify some points, some of which may be because AI reviewers 
sometimes miss certain Flink internal mechanisms, like mailbox.
   
   1. In this case, outputCompletedElement is always called in the mailbox, so 
there's no race condition issue: if downstream is available, the status won't 
change; if the downstream is not available at that time, another check will be 
done in the deferred mail, and status changes won't be an issue.
   2. A `SupportsSoftBackpressure` operator only needs to respect its direct 
downstream availability, but doesn't need its downstreams'. The complexity 
doesn't increase along with the chain length but the forks, so in the majority 
of cases an operator has only one downstream, and the composite falls back to a 
single availability, which has an optimized path.
   3. The inflight requests are managed by the element queue, including the 
completed but not-yet-emitted results. So the memory leak will not happen. The 
deferred callbacks are bounded by the queue capacity and are idempotent — 
redundant mails simply re-check and no-op.
   4. The `SupportsSoftBackpressure` is marked @Internal, we don't expect 
external users to implement the interface. So I suppose the comment and example 
of AsyncWaitOperator is clear enough and no user-visible documentations should 
be added.
   5. The deferred entries remain in the inflight queue and are checkpointed 
along with other inflight requests — just like completed-but-waiting entries in 
ordered mode. Only the original input elements are persisted; on recovery they 
are replayed through asyncInvoke regardless of prior completion state. This 
preserves exactly-once semantics.
   
   IMO the only real concern is the performance issue, that adding an 
availability check on the hot path, given that most cases don't need to build a 
real composite availability. However, for async-fusion operators the bottleneck 
is probably on the outside, so the cost is relatively low. On the other hand, 
with the soft backpressure supported, not only unaligned checkpoint, but also 
system notifications including canceling, failing over, checkpoint 
notifications, etc., will be guaranteed executed in time, without the fear of 
being blocked by downstream processing. I think the cost is worth paid. What do 
you think?
   
   Thanks again for the review. Could you please take another manual or AI 
review with the explanations?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to