yuchengxin commented on code in PR #28310:
URL: https://github.com/apache/flink/pull/28310#discussion_r3506102389
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/AsyncLookupJoinRunner.java:
##########
@@ -144,6 +149,50 @@ public TableFunctionResultFuture<RowData>
createFetcherResultFuture(Configuratio
return resultFuture;
}
+ @Override
+ public void timeout(RowData input, ResultFuture<RowData> resultFuture)
throws Exception {
+ // Find and discard the in-flight future bound to this input row so
that any late
+ // completion from the underlying fetcher is ignored. The generated
fetcher's own
+ // timeout method (rendered when the user UDF provides one) decides
how to complete
+ // the new outResultFuture below; if no user timeout method is
present, the default
+ // AsyncFunction.timeout raises a TimeoutException as before.
+ //
+ // Reference equality on leftRow is intentional: AsyncWaitOperator
passes the same
+ // RowData instance to both asyncInvoke and timeout for a given record
(the operator
+ // already deep-copies under object reuse).
+ JoinedRowResultFuture currentFuture = null;
+ for (JoinedRowResultFuture f : allResultFutures) {
+ if (f.leftRow == input) {
+ currentFuture = f;
+ break;
+ }
+ }
+ if (currentFuture == null || !currentFuture.inuse.compareAndSet(true,
false)) {
+ // current future is already completed and reused
+ return;
+ }
+ allResultFutures.remove(currentFuture);
+ currentFuture.close();
+
+ // Route through join pipeline via new JoinedRowResultFuture
+ JoinedRowResultFuture outResultFuture =
+ new JoinedRowResultFuture(
+ resultFutureBuffer,
+ createFetcherResultFuture(new Configuration()),
+ fetcherConverter,
+ isLeftOuterJoin,
+ rightRowSerializer.getArity());
+ outResultFuture.inuse.set(true);
+ outResultFuture.reset(input, resultFuture);
+ allResultFutures.add(outResultFuture);
Review Comment:
I fixed this by making the replacement future recycle on both the normal and
exceptional completion paths.
In `AsyncLookupJoinRunner`, the timeout replacement future is now returned
to `resultFutureBuffer` from a shared recycle path, so we no longer leak buffer
entries on exceptional timeout completion. I also added regression coverage to
ensure repeated timeout exceptions do not exhaust the buffer.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]