[ 
https://issues.apache.org/jira/browse/FLINK-12086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16808282#comment-16808282
 ] 

Guowei Ma edited comment on FLINK-12086 at 4/3/19 1:30 AM:
-----------------------------------------------------------

I think there might be another way to access user object other than change the 
API.
{code:java}
 package dev.codeflush;

import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class SomeAsyncFunction implements AsyncFunction<Integer, String> {

    private static final long serialVersionUID = 1L;
    
    private HashMap<ResultFuture<String>, Future<String>> libFutures = new 
HashMap();

    @Override
    public void asyncInvoke(Integer input, ResultFuture<String> resultFuture) 
throws Exception {
        Future<String> future = null; // submit something in a library 
thread-pool 
        libFutures.put(resultFuture, future);

        CompletableFuture.runAsync(() -> {
            try {

                resultFuture.complete(Collections.singleton(future.get()));
                libFutures.remove(resultFuture);
            } catch (ExecutionException e) {
                // handle this
            } catch (InterruptedException e) {
                // handle that
            }
        });
        
        return future;
    }

    @Override
    public void timeout(Integer input, ResultFuture<String> resultFuture) 
throws Exception {
        Future<String> future = libFutures.remove(resultFuture);
        if (future != null) {
                 future.cancel(true);
        }
        resultFuture.complete(Collections.emptySet());
    }
}
{code}
 

 


was (Author: maguowei):
I think there might be another way to access user object other than change the 
API.
{code:java}
 package dev.codeflush;

import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class SomeAsyncFunction implements AsyncFunction<Integer, String> {

    private static final long serialVersionUID = 1L;
    
    private HashMap<ResultFuture<String>, Future<String>> libFutures = new 
HashMap();

    @Override
    public void asyncInvoke(Integer input, ResultFuture<String> resultFuture) 
throws Exception {
        Future<String> future = null; // submit something in a library 
thread-pool 
        libFutures.put(resultFuture, future);

        CompletableFuture.runAsync(() -> {
            try {

                resultFuture.complete(Collections.singleton(future.get()));
                libFutures.remove(resultFuture);
            } catch (ExecutionException e) {
                // handle this
            } catch (InterruptedException e) {
                // handle that
            }
        });
        
        return future;
    }

    @Override
    public void timeout(Integer input, ResultFuture<String> resultFuture) 
throws Exception {
        Future<String> future = libFutures.remove(resultFuture);
        if (future != null) {
                 future.cancel(true);
        }
        resultFuture.complete(Collections.emptySet());
    }
}
{code}
 

 

> AsyncFunction - Add access to a user defined Object for cleanup on timeout
> --------------------------------------------------------------------------
>
>                 Key: FLINK-12086
>                 URL: https://issues.apache.org/jira/browse/FLINK-12086
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>            Reporter: Felix Wollschläger
>            Priority: Major
>
> When executing async-requests it would be nice to have access to a user 
> defined object to perform cleanup when the process times out.
> For example, when executing Cassandra-Queries I'm using the drivers 
> threadpool to submit Statements, which returns a 
> com.datastax.driver.core.ResultSetFutre ( 
> [https://docs.datastax.com/en/drivers/java/2.1/com/datastax/driver/core/ResultSetFuture.html]
>  ). When I run into a timeout I could cancel the Future because waiting for 
> it to complete is unnecessary in that case.
>  
> The API could be extendend to something like this:
>  
> Adding an Type-Parameter to the AsnyFunction Interface:
> {code:java}
> AsyncFunction<IN, OUT, T>{code}
> Updating the asnyInvoke-Method to return the user-defined object:
> {code:java}
> T asyncInvoke(IN input, ResultFuture<OUT> future) throws Exception;{code}
> Updating the timeout-Method to accept the user-defined object:
> {code:java}
> void timeout(IN input, T obj, ResultFuture<OUT> resultFuture) throws 
> Exception{code}
>  
> An example Implementation could look like this:
> {code:java}
> package dev.codeflush;
> import org.apache.flink.streaming.api.functions.async.AsyncFunction;
> import org.apache.flink.streaming.api.functions.async.ResultFuture;
> import java.util.Collections;
> import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.ExecutionException;
> import java.util.concurrent.Future;
> public class SomeAsyncFunction implements AsyncFunction<Integer, String, 
> Future<String>> {
>     private static final long serialVersionUID = 1L;
>     
>     @Override
>     public Future<String> asyncInvoke(Integer input, ResultFuture<String> 
> resultFuture) throws Exception {
>         Future<String> future = null; // submit something in a library 
> thread-pool 
>         CompletableFuture.runAsync(() -> {
>             try {
>                 resultFuture.complete(Collections.singleton(future.get()));
>             } catch (ExecutionException e) {
>                 // handle this
>             } catch (InterruptedException e) {
>                 // handle that
>             }
>         });
>         
>         return future;
>     }
>     @Override
>     public void timeout(Integer input, Future<String> future, 
> ResultFuture<String> resultFuture) throws Exception {
>         future.cancel(true);
>         resultFuture.complete(Collections.emptySet());
>     }
> }
> {code}
> As it currently is, submitted tasks in the asnyInvoke-Method will use 
> resources (Threads, IO) even if the application is no longer in a state where 
> it could do something meaningful with the result.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to