[ https://issues.apache.org/jira/browse/FLINK-12086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16808282#comment-16808282 ]
Guowei Ma edited comment on FLINK-12086 at 4/3/19 1:30 AM: ----------------------------------------------------------- I think there might be another way to access user object other than change the API. {code:java} package dev.codeflush; import org.apache.flink.streaming.api.functions.async.AsyncFunction; import org.apache.flink.streaming.api.functions.async.ResultFuture; import java.util.Collections; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; public class SomeAsyncFunction implements AsyncFunction<Integer, String> { private static final long serialVersionUID = 1L; private HashMap<ResultFuture<String>, Future<String>> libFutures = new HashMap(); @Override public void asyncInvoke(Integer input, ResultFuture<String> resultFuture) throws Exception { Future<String> future = null; // submit something in a library thread-pool libFutures.put(resultFuture, future); CompletableFuture.runAsync(() -> { try { resultFuture.complete(Collections.singleton(future.get())); libFutures.remove(resultFuture); } catch (ExecutionException e) { // handle this } catch (InterruptedException e) { // handle that } }); return future; } @Override public void timeout(Integer input, ResultFuture<String> resultFuture) throws Exception { Future<String> future = libFutures.remove(resultFuture); if (future != null) { future.cancel(true); } resultFuture.complete(Collections.emptySet()); } } {code} was (Author: maguowei): I think there might be another way to access user object other than change the API. 
{code:java} package dev.codeflush; import org.apache.flink.streaming.api.functions.async.AsyncFunction; import org.apache.flink.streaming.api.functions.async.ResultFuture; import java.util.Collections; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; public class SomeAsyncFunction implements AsyncFunction<Integer, String> { private static final long serialVersionUID = 1L; private HashMap<ResultFuture<String>, Future<String>> libFutures = new HashMap(); @Override public void asyncInvoke(Integer input, ResultFuture<String> resultFuture) throws Exception { Future<String> future = null; // submit something in a library thread-pool libFutures.put(resultFuture, future); CompletableFuture.runAsync(() -> { try { resultFuture.complete(Collections.singleton(future.get())); libFutures.remove(resultFuture); } catch (ExecutionException e) { // handle this } catch (InterruptedException e) { // handle that } }); return future; } @Override public void timeout(Integer input, ResultFuture<String> resultFuture) throws Exception { Future<String> future = libFutures.remove(resultFuture); if (future != null) { future.cancel(true); } resultFuture.complete(Collections.emptySet()); } } {code} > AsyncFunction - Add access to a user defined Object for cleanup on timeout > -------------------------------------------------------------------------- > > Key: FLINK-12086 > URL: https://issues.apache.org/jira/browse/FLINK-12086 > Project: Flink > Issue Type: Improvement > Components: API / DataStream > Reporter: Felix Wollschläger > Priority: Major > > When executing async-requests it would be nice to have access to a user > defined object to perform cleanup when the process times out. 
> For example, when executing Cassandra-Queries I'm using the driver's > threadpool to submit Statements, which returns a > com.datastax.driver.core.ResultSetFuture ( > [https://docs.datastax.com/en/drivers/java/2.1/com/datastax/driver/core/ResultSetFuture.html] > ). When I run into a timeout I could cancel the Future because waiting for > it to complete is unnecessary in that case. > > The API could be extended to something like this: > > Adding a Type-Parameter to the AsyncFunction Interface: > {code:java} > AsyncFunction<IN, OUT, T>{code} > Updating the asyncInvoke-Method to return the user-defined object: > {code:java} > T asyncInvoke(IN input, ResultFuture<OUT> future) throws Exception;{code} > Updating the timeout-Method to accept the user-defined object: > {code:java} > void timeout(IN input, T obj, ResultFuture<OUT> resultFuture) throws > Exception{code} > > An example Implementation could look like this: > {code:java} > package dev.codeflush; > import org.apache.flink.streaming.api.functions.async.AsyncFunction; > import org.apache.flink.streaming.api.functions.async.ResultFuture; > import java.util.Collections; > import java.util.concurrent.CompletableFuture; > import java.util.concurrent.ExecutionException; > import java.util.concurrent.Future; > public class SomeAsyncFunction implements AsyncFunction<Integer, String, > Future<String>> { > private static final long serialVersionUID = 1L; > > @Override > public Future<String> asyncInvoke(Integer input, ResultFuture<String> > resultFuture) throws Exception { > Future<String> future = null; // submit something in a library > thread-pool > CompletableFuture.runAsync(() -> { > try { > resultFuture.complete(Collections.singleton(future.get())); > } catch (ExecutionException e) { > // handle this > } catch (InterruptedException e) { > // handle that > } > }); > > return future; > } > @Override > public void timeout(Integer input, Future<String> future, > ResultFuture<String> resultFuture) throws Exception { > 
future.cancel(true); > resultFuture.complete(Collections.emptySet()); > } > } > {code} > As it currently is, submitted tasks in the asyncInvoke-Method will use > resources (Threads, IO) even if the application is no longer in a state where > it could do something meaningful with the result. -- This message was sent by Atlassian JIRA (v7.6.3#76005)