Hi,
ResultIndexRequestBuilder is a non-static inner class. This means it has a 
pointer to the enclosing instance. If you make it a static inner class your 
code should work.

Best,
Aljoscha
> On 28. Apr 2017, at 04:57, Vijay Srinivasaraghavan <vijikar...@yahoo.com> 
> wrote:
> 
> Hello,
> 
> I am seeing below error when I try to use ElasticsearchSink. It complains 
> about serialization and looks like it is leading to "IndexRequestBuilder" 
> implementation. I have tried the suggestion as mentioned in 
> http://stackoverflow.com/questions/33246864/elasticsearch-sink-seralizability 
> <http://stackoverflow.com/questions/33246864/elasticsearch-sink-seralizability>
>  (changed from anonymous class to concrete class) but it did not help. 
> However, when I call "ElasticsearchSink<>(config, transports, null)" by 
> passing "null" for "IndexRequestBuilder" then I don't see the serialization 
> error. This suggests the problem could be with the IndexRequestBuilder 
> implementation but I am not able to move further.
> 
> Could someone please let me know what's the right way to use 
> ElasticsearchSink() API? 
> 
> Build Details
> Flink 1.2.0
> Elastic Search 5.3.0
> 
> 
> Error Message
> 
> 
> org.apache.flink.api.common.InvalidProgramException: The implementation of 
> the RichSinkFunction is not serializable. The object probably contains or 
> references non serializable fields.
>         at 
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
>         at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1539)
>         at 
> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:161)
>         at 
> org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1076)
> 
> Code Snippet
> 
> ```
>       private ElasticsearchSink<Result>  sinkToElasticSearch(AppConfiguration 
> appConfiguration) throws Exception {
> 
>               String host = 
> appConfiguration.getPipeline().getElasticSearch().getHost();
>               int port = 
> appConfiguration.getPipeline().getElasticSearch().getPort();
>               String cluster = 
> appConfiguration.getPipeline().getElasticSearch().getCluster();
> 
>               Map<String, String> config = new HashMap<>();
>               config.put("bulk.flush.max.actions", "1");
>               config.put("cluster.name", cluster);
> 
>               List<TransportAddress> transports = new ArrayList<>();
>               transports.add(new InetSocketTransportAddress(host, port));
> 
>               return new ElasticsearchSink<>(config, transports, new 
> ResultIndexRequestBuilder(appConfiguration));
>       }
> 
>       public class ResultIndexRequestBuilder implements 
> IndexRequestBuilder<Result>, Serializable {
> 
>               private String index;
>               private String type;
>               //private transient Gson gson = new Gson();
> 
>               public ResultIndexRequestBuilder() {}
> 
>               public ResultIndexRequestBuilder(AppConfiguration 
> appConfiguration) {
>                       index = 
> appConfiguration.getPipeline().getElasticSearch().getIndex();
>                       type = 
> appConfiguration.getPipeline().getElasticSearch().getType();
>               }
> 
>               @Override
>               public IndexRequest createIndexRequest(Result result, 
> RuntimeContext ctx) {
>                       Gson gson = new Gson();
>                       String resultAsJson = gson.toJson(result);
>                       System.out.println(resultAsJson);
>                       Map<String, String> jsonMap = new HashMap<>();
>                       jsonMap.put("data", resultAsJson);
> 
>                       return Requests.indexRequest()
>                                       .index(index)
>                                       .type(type)
>                                       .source(jsonMap);
>               }
> ```
> 
> Regards
> Vijay

Reply via email to