Hi Dawid, thanks for the quick reply. I did try that, but I am not sure how to write to rolling indices with it. For example, I maintain daily indices in ES, and I would like to route each packet to the appropriate index based on its event time. If there is some lag in the source Kafka topic and I receive yesterday's data late (say at 00:05), I am not sure how to route it to the correct index. Is there a way around this?
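To make the requirement concrete, here is a minimal sketch of the routing rule described above: the index name is derived from the record's event time rather than from the arrival time, so a record stamped 23:58 yesterday still maps to yesterday's index even if it arrives at 00:05 today. The class name `DailyIndexNames`, the method `indexFor`, the `packets` prefix, the `yyyy.MM.dd` suffix, and the choice of UTC are all assumptions for illustration, not something prescribed by Flink or Elasticsearch. With the DataStream Elasticsearch sink, a helper like this could presumably be called from the `ElasticsearchSinkFunction` when building each `IndexRequest`.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: maps an event timestamp to a daily index name.
final class DailyIndexNames {

    // Assumed naming scheme "<prefix>-yyyy.MM.dd", evaluated in UTC.
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyy.MM.dd").withZone(ZoneOffset.UTC);

    private DailyIndexNames() {
    }

    // The index depends only on the event time carried by the record,
    // never on the wall-clock time at which the record is processed.
    static String indexFor(String prefix, long eventTimeMillis) {
        return prefix + "-" + DAY.format(Instant.ofEpochMilli(eventTimeMillis));
    }
}
```

For example, `DailyIndexNames.indexFor("packets", ts)` with `ts` taken from an event stamped 2019-01-09T23:58:00Z yields the index for 9 January, regardless of when the record is actually processed.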
Regards,
~Ramya.

On Thu, Jan 10, 2019 at 2:04 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

> Hi Ramya,
>
> Have you tried writing to ES directly from the Table API? You can check the
> ES connector for the Table API here:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/connect.html#elasticsearch-connector
>
> Best,
>
> Dawid
>
> On 10/01/2019 09:21, Ramya Ramamurthy wrote:
> > Hi,
> >
> > I am learning Flink. With Flink 1.7.1, I am trying to read from Kafka and
> > insert into Elasticsearch. I have a Kafka connector that converts the data
> > to a Flink table. In order to insert into Elasticsearch, I have converted
> > this table to a DataStream so that I can use the ElasticsearchSink.
> > But the Rows returned by the stream have lost the schema. How do I convert
> > them to JSON before calling the Elasticsearch sink connector? Any help or
> > suggestions would be appreciated.
> >
> > Thanks.