pltbkd commented on PR #28305:
URL: https://github.com/apache/flink/pull/28305#issuecomment-4627839836
> My AI says:
>
> 1. Race Condition in AsyncWaitOperator.outputCompletedElement()
>
>     if (!downstreamAvailabilityProvider.isAvailable()) {
>         downstreamAvailabilityProvider.getAvailableFuture().thenRun(...) }
>
> Between checking isAvailable() and getting the future, state can change.
> Should capture future first, then check isDone().
>
> 2. Potential Memory Leak
> Deferred callbacks via thenRun() may accumulate if mailbox rejects
> execution. Silent failure in catch block doesn't clean up pending state.
>
> 3. Performance Concern
> CompositeAvailabilityProvider.getAvailableFuture() creates new combined
> future on every call by iterating all providers - expensive for long chains.
>
> 📝 Code Quality Issues
> - Inconsistent indentation in deferred callback
> - Missing comprehensive JavaDoc with usage examples
> - Magic numbers in tests (capacity=2) without explanation
> - No null checks on availabilityHelper initialization
>
> 🔄 Complexity
> Nested callback chain (thenRun → mailboxExecutor.execute →
> outputCompletedElement) creates hard-to-debug control flow. Consider state
> machine or explicit deferred queue.
>
> ⚙️ Compatibility & Side Effects
> Works with existing nodes: ✅ Yes - opt-in via instanceof checks
>
> Side effects requiring documentation:
>
> - Performance: Additional availability checks on hot path (every element)
> - Latency: Elements delayed when downstream unavailable
> - Memory: Deferred callbacks accumulate during prolonged unavailability
> - Checkpoint timing: More in-flight data may be captured
> - Non-deterministic timing: Deferred emissions maintain order but timing
>   varies
>
> Migration concerns:
> - Custom backpressure implementations may conflict
> - Monitoring/metrics need updates for deferred emissions
> - Queue size tuning may need adjustment
>
> Missing Documentation
> - User guide: When/why soft backpressure activates
> - Operator implementation guide for SupportsSoftBackpressure
> - Performance tuning guide
> - Migration impact guide
> - Troubleshooting deferred emissions

I'd like to clarify a few points. Some of the comments above probably arise
because AI reviewers sometimes miss Flink-internal mechanisms such as the
mailbox.
1. outputCompletedElement is always called in the mailbox, so there is no
race condition: if the downstream is available at the time of the check, the
status won't change; if it is not available, the deferred mail performs the
check again, so a status change in between is harmless.
2. A `SupportsSoftBackpressure` operator only needs to respect the
availability of its direct downstream, not that of operators further down the
chain. The cost therefore grows with the number of forks, not with the chain
length. In the majority of cases an operator has only one downstream, and the
composite falls back to a single availability, which has an optimized path.
3. The in-flight requests, including results that are completed but not yet
emitted, are managed by the element queue, so no memory leak can occur. The
deferred callbacks are bounded by the queue capacity and are idempotent:
redundant mails simply re-check and no-op.
4. `SupportsSoftBackpressure` is marked @Internal, and we don't expect
external users to implement the interface. So I think the comments and the
AsyncWaitOperator example are clear enough, and no user-facing documentation
needs to be added.
5. The deferred entries remain in the in-flight queue and are checkpointed
along with the other in-flight requests, just like completed-but-waiting
entries in ordered mode. Only the original input elements are persisted; on
recovery they are replayed through asyncInvoke regardless of prior completion
state. This preserves exactly-once semantics.
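
To make points 1 and 3 more concrete, here is a toy model of the pattern. It
is not the code in this PR: the class, its fields, and the single-element
emit are all hypothetical stand-ins, and the "mailbox" is just a queue drained
on the calling thread. It only illustrates why a deferred mail that repeats
the availability check tolerates status changes, and why redundant mails are
harmless.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

// Toy model only: every name here is hypothetical, none of this is Flink code.
class MailboxRecheckSketch {
    // Stand-in for the mailbox; mails are run one at a time by drainMailbox().
    final Queue<Runnable> mailbox = new ConcurrentLinkedQueue<>();
    // Stand-in for downstream availability: a completed future means "available".
    CompletableFuture<Void> available = new CompletableFuture<>();
    // Stand-in for the element queue holding completed-but-not-emitted results.
    final Queue<String> completed = new ArrayDeque<>();
    final List<String> emitted = new ArrayList<>();

    // Assumed to be called only from the single mailbox thread.
    void outputCompletedElement() {
        if (completed.isEmpty()) {
            return; // redundant mail: nothing left to emit, so it is a no-op
        }
        if (!available.isDone()) {
            // Defer: once the downstream becomes available, enqueue a mail that
            // runs this method again and therefore repeats the check.
            available.thenRun(() -> mailbox.add(this::outputCompletedElement));
            return;
        }
        emitted.add(completed.poll());
    }

    void drainMailbox() {
        Runnable mail;
        while ((mail = mailbox.poll()) != null) {
            mail.run();
        }
    }

    public static void main(String[] args) {
        MailboxRecheckSketch op = new MailboxRecheckSketch();
        op.completed.add("a");
        op.completed.add("b");
        // Three deferrals for two elements while the downstream is unavailable.
        op.outputCompletedElement();
        op.outputCompletedElement();
        op.outputCompletedElement();

        // The downstream becomes available, then busy again before the mails run.
        CompletableFuture<Void> old = op.available;
        op.available = new CompletableFuture<>();
        old.complete(null);
        op.drainMailbox(); // each mail re-checks, sees "unavailable", defers again

        op.available.complete(null);
        op.drainMailbox(); // two mails emit, the third finds nothing and no-ops
        System.out.println(op.emitted);
    }
}
```

In this model the number of pending callbacks can never exceed the number of
calls made while elements were waiting, and a stale mail costs one check.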
IMO the only real concern is performance, namely the added availability check
on the hot path, since most cases don't need to build a real composite
availability. However, for async-fusion operators the bottleneck is probably
outside the operator, so the cost is relatively low. On the other hand, with
soft backpressure supported, not only unaligned checkpoints but also system
actions such as cancellation, failover, and checkpoint notifications are
guaranteed to execute in time, without the risk of being blocked by downstream
processing. I think the cost is worth paying. What do you think?
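
As a rough illustration of why the composite is cheap in the common case
(point 2 above), the fallback could look like the sketch below. This is an
assumption about the shape of the optimization, not the actual
CompositeAvailabilityProvider code; `combine` and the class name are made up,
and plain CompletableFuture stands in for Flink's availability futures.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: combines the availability of direct downstreams only.
class CompositeAvailabilitySketch {
    static CompletableFuture<?> combine(List<? extends CompletableFuture<?>> direct) {
        if (direct.isEmpty()) {
            return CompletableFuture.completedFuture(null); // nothing to wait for
        }
        if (direct.size() == 1) {
            return direct.get(0); // fast path: reuse the single future, no allocation
        }
        // Fork case: available only once every direct downstream is available.
        return CompletableFuture.allOf(direct.toArray(new CompletableFuture<?>[0]));
    }

    public static void main(String[] args) {
        CompletableFuture<Void> a = new CompletableFuture<>();
        CompletableFuture<Void> b = new CompletableFuture<>();

        // Single downstream: the very same future is handed back.
        System.out.println(combine(Collections.singletonList(a)) == a);

        // Two downstreams (a fork): done only after both are done.
        CompletableFuture<?> both = combine(Arrays.asList(a, b));
        a.complete(null);
        System.out.println(both.isDone());
        b.complete(null);
        System.out.println(both.isDone());
    }
}
```

The work here depends on the number of direct downstreams (forks), not on how
long the chain behind each of them is.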
Thanks again for the review. Could you please take another look, manual or
AI, with these explanations in mind?