Hi,

createIndexRequest是否不是静态的,scala的话可以在object中声明该方法。
Lambda中访问非静态方法,并且外部类不是可序列化的,可能会导致lambda无法被序列化。

Best,
Jiabao


On 2023/12/12 07:53:53 李世钰 wrote:
> val result: ElasticsearchSink[String] = new Elasticsearch7SinkBuilder[String]
>   // This instructs the sink to emit after every element, otherwise they would
>   // be buffered
>   .setBulkFlushMaxActions(1)
>   .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
>   .setEmitter(
>     (element: String, context: SinkWriter.Context, indexer: RequestIndexer) 
> =>
>       indexer.add(createIndexRequest(element)))
>   .build()
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: The elasticsearch emitter must be serializable. 
> 
> Caused by: java.lang.IllegalStateException: The elasticsearch emitter must be 
> serializable.
>         at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
>         at 
> org.apache.flink.connector.elasticsearch.sink.ElasticsearchSinkBuilderBase.setEmitter(ElasticsearchSinkBuilderBase.java:77)
>         at 
> org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder.setEmitter(Elasticsearch7SinkBuilder.java:63)

回复