AlanConfluent commented on code in PR #26567:
URL: https://github.com/apache/flink/pull/26567#discussion_r2216591596


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/correlate/async/DelegatingAsyncTableResultFuture.java:
##########
@@ -29,27 +33,46 @@
  * is used as a bridge between {@link 
org.apache.flink.table.functions.AsyncTableFunction} and
  * {@link org.apache.flink.streaming.api.functions.async.AsyncFunction}.
  */
-public class DelegatingResultFuture<OUT> implements 
BiConsumer<Collection<OUT>, Throwable> {
+public class DelegatingAsyncTableResultFuture implements 
BiConsumer<Collection<Object>, Throwable> {
 
-    private final ResultFuture<OUT> delegatedResultFuture;
-    private final CompletableFuture<Collection<OUT>> completableFuture;
+    private final ResultFuture<Object> delegatedResultFuture;
+    private final boolean needsWrapping;
+    private final boolean isInternalResultType;
 
-    public DelegatingResultFuture(ResultFuture<OUT> delegatedResultFuture) {
+    private final CompletableFuture<Collection<Object>> completableFuture;
+
+    public DelegatingAsyncTableResultFuture(
+            ResultFuture<Object> delegatedResultFuture,
+            boolean needsWrapping,
+            boolean isInternalResultType) {
         this.delegatedResultFuture = delegatedResultFuture;
+        this.needsWrapping = needsWrapping;
+        this.isInternalResultType = isInternalResultType;
         this.completableFuture = new CompletableFuture<>();
         this.completableFuture.whenComplete(this);
     }
 
     @Override
-    public void accept(Collection<OUT> outs, Throwable throwable) {
+    public void accept(Collection<Object> outs, Throwable throwable) {
         if (throwable != null) {
             delegatedResultFuture.completeExceptionally(throwable);
         } else {
+            if (needsWrapping) {

Review Comment:
   I was debating how to do this nicely.  I could generate this class or even a 
Runnable to pass in.  What I did, which is equivalent roughly to the latter, is 
that in the constructor I cache a wrapFunction, which points to either no wrap, 
wrapInternal, or wrapExternal, so it has no branching on the critical path.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to