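Hi,

Is createIndexRequest perhaps a non-static method? In Scala, you can declare it in an object instead. If the lambda calls a non-static method and the enclosing class is not serializable, the lambda captures a reference to that enclosing instance and may fail to serialize.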
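A minimal sketch of the difference, without Flink on the classpath. It uses plain Java serialization as a stand-in for the check behind "The elasticsearch emitter must be serializable" (that equivalence is an assumption). All class and object names below are hypothetical, and createIndexRequest returns a String here only to keep the example self-contained.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for the job class; deliberately NOT Serializable.
class JobWithInstanceMethod {
  def createIndexRequest(element: String): String = s"index:$element"

  // The lambda calls an instance method, so it captures `this`.
  def emitter: String => String = element => createIndexRequest(element)
}

// Hypothetical helper object; its methods need no enclosing instance.
object IndexRequests {
  def createIndexRequest(element: String): String = s"index:$element"
}

class JobWithObjectMethod {
  // The lambda only refers to the object, so nothing from this class is captured.
  def emitter: String => String = element => IndexRequests.createIndexRequest(element)
}

object SerializationCheck {
  // Returns true if `value` can be written with Java serialization.
  def isSerializable(value: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(value) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

With these definitions, SerializationCheck.isSerializable(new JobWithInstanceMethod().emitter) should return false, while the same call on new JobWithObjectMethod().emitter should return true. Making the enclosing class extend Serializable would also let the first lambda serialize, but moving the method into an object avoids shipping the whole enclosing instance with the emitter.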
Best,
Jiabao

On 2023/12/12 07:53:53 李世钰 wrote:
> val result: ElasticsearchSink[String] = new Elasticsearch7SinkBuilder[String]
>   // This instructs the sink to emit after every element, otherwise they would
>   // be buffered
>   .setBulkFlushMaxActions(1)
>   .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
>   .setEmitter(
>     (element: String, context: SinkWriter.Context, indexer: RequestIndexer) =>
>       indexer.add(createIndexRequest(element)))
>   .build()
>
> org.apache.flink.client.program.ProgramInvocationException: The main method
> caused an error: The elasticsearch emitter must be serializable.
>
> Caused by: java.lang.IllegalStateException: The elasticsearch emitter must be
> serializable.
>     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
>     at org.apache.flink.connector.elasticsearch.sink.ElasticsearchSinkBuilderBase.setEmitter(ElasticsearchSinkBuilderBase.java:77)
>     at org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder.setEmitter(Elasticsearch7SinkBuilder.java:63)