Hi,

如果是访问ES的话,Flink里面自带ES的connector,你可以直接使用,或者参考源码,source和sink接口都有对应的方法


资源是否在一个线程里面,这个取决与你代码逻辑,如果在不同的线程或者进程的话,设计上,就不要用同一个EsClientHolder,各个不同阶段各自去new和close对象,


Thanks


在 2022-07-12 12:35:31,"Bruce Zu" <zu.bruce.ch...@gmail.com> 写道:
> Flink team好,
> 我有一个很一般的问题,关于如何释放 Flink Job 中某个对象持有的资源。
>
> 我是 Flink 的新用户。我搜索了很多,但没有找到相关文件。但我确信有一个标准的方法来解决它。
>
>我的Flink 应用程序中需要访问 Elasticsearch 服务器。我们使用从
>org.elasticsearch.client.RestHighLevelClient 扩展而来的类 EsClient 来完成查询工作,
>一旦不再使用它就需要调用它的`close`方法来释放资源。
>
>所以我需要找到合适的地方来确保资源总是可以被释放,即使在调用的某个地方发生了一些异常
>
>我现在能想到的是使用 `ThreadLocal` 将生成的 EsClient 对象保留在main class的 main 方法的开头,并且
>在 main 方法结束时释放资源。
>
>类似这样的伪代码:
>```java
>公共类 EsClientHolder {
>  private static final ThreadLocal<EsClient> local = new
>InheritableThreadLocal<>();
>
>  public static final void createAndSetEsClient(EsClient esClient){
>    local.set(esClient);
>  }
>
>  private static final createAndSetEsClientBy(EsClientConfig
>esClientConfig){
>    EsClient instance = new EsClient(esClientConfig);
>    createAndSetEsClient(instance)  ;
>  }
>
>   private static final   EsClient get() {
>    EsClient c = local.get();
>    if(c == null){
>      throw new RuntimeException("确保在使用前创建并设置 EsClient 实例");
>    }
>    return c;
>  }
>
>    private static final  close()抛出 IOException {
>    EsClient o = local.get();
>    if(o!= null){
>      o.close();
>    }
>  }
>
>// 在 Fink 应用程序代码中的用法
>   public class main class {
>    public static void main(String[] args) throws IOException {
>      try {
>        property prop = null;
>        EsClientConfig configuration = getEsClientConfig(prop);
>        EsClientHolder.createAndSetEsClientBy(config);
>       // …
>       SomeClass.method1();
>       other classes.method2();
>       // ...
>      } at last {
>        EsClientHolder.close();
>      }
>    }
>  }
>
>class SomeClass{
>   public void. method 1(){
>        // 1. Use EsClient in any calling method of any other class:
>        EsClient esClient = EsClientHolder.get();
>       // …
>   }
>}
>class other class {
>  public void method 2() {
>      // 2. Use EsClient in any calling method of any forked child thread
>        new thread (
>                () -> {
>                  EsClient client = EsClientHolder.get();
>                  // …
>                })
>            . start();
>         // …
>  }
>}
>
>```
>
>我知道 TaskManager 是一个 Java JVM 进程,而 Task 是由一个 java Thread 执行的。
>
>但我不知道 Flink 如何创建作业图,以及 Flink 最终如何将任务分配给线程以及这些线程之间的关系。
>
>比如 Flink 把 SomeClass 的 method1 和 OtherClass 的 method2 分配到一个和运行 MainClass
>的线程不一样的线程,
>那么运行method1和mehod2的线程就没有办法拿到EsClient了。
>这里我假设 MainClass 中的 main 方法将在一个线程中执行。如果不是,比如将 set() 和 close() 拆分为在不同的线程中运行,则就
>没有办法释放资源。
>
>谢谢!

回复