Hi, flink experts!
I sinked my data ( PB objects ) to elasticsearch. I dont know whether the sinked data is correct or incorrect. The codes like following, Could you help me check it please ? Im not familar with ES. Now, I want to install a kibana to view my data. But I dont know the below codes is correct or incorrect. I ran the flink program. it does not give me an error. I just want to confirm. // sink the filtered data to ElasticSearch clickStreamFiltered.addSink(new ElasticsearchSink[ActivityInfo](configElasticSearch, transportAddress, new ElasticsearchSinkFunction[ActivityInfo] { def createIndexRequest(element: ActivityInfo): IndexRequest = { val json = new java.util.HashMap[String, ActivityInfo] json.put("data", element) Requests.indexRequest().index("filter-index-s").`type`("my-type").source(json) } override def process(activityInfo: ActivityInfo, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = { requestIndexer.add(createIndexRequest(activityInfo)) } })) Thanks mingleizhang