不是的哈,那个方法本身还是同步调用的。就是需要你自己保证逻辑的异步。

Jacob <17691150...@163.com> 于2021年6月8日周二 上午9:31写道:
>
> @nobleyd
> 谢谢大神指导,前两天休息没看邮件,才回复,抱歉
>
> 我后面把代码大概改成如下样子,checkpoint时间确实得到了改善,job运行几天正常
>
> 我提供的线程池在asyncInvoke方法内部跑,这样是不是不合适呀,asyncInvoke方法本身是不是就是封装好的异步方法,就不用单独启线程池了吧?直接在asyncInvoke方法内部写处理逻辑就好。
>
> public class AsyncProcessFunction extends RichAsyncFunction<Map&lt;String,
> String>, List<JSONObject>> {
>
>      private transient ExecutorService executorpool;
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>       executorpool= Executors.newFixedThreadPool(80);
>     }
>
>     @Override
>     public void asyncInvoke(Map<String, String> message,
> ResultFuture<List&lt;JSONObject>> resultFuture){
>         executorpool.submit(()->{
>                   // 处理逻辑
>                       ..............
>               resultFuture.complete(Collections.singletonList(...));
>         });
>     }
> }
>
>
>
> -----
> Thanks!
> Jacob
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/

Reply via email to