Hi,

I'm afraid you cannot write to different indices using the Table API Elasticsearch connector. Now I see why you wanted to go through the DataStream API.

What you could do to transform a Row to JSON is to use org.apache.flink.formats.json.JsonRowSerializationSchema from flink-json. You just need to get the schema from the final Table of your Table API part. Your code could look like this:

    TypeInformation<Row> schema = table.getSchema().toRowType();
    SerializationSchema<Row> serializationSchema = new JsonRowSerializationSchema(schema);

    public IndexRequest createIndexRequest(Row element) {
        byte[] document = serializationSchema.serialize(element);
        ...
        return new IndexRequest(index, docType)
                .source(document, contentType);
    }
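If it helps, here is a rough, untested sketch of how the complete ElasticsearchSinkFunction could look. It assumes your final table has the schema from your sink definition below (sid, ip, family, total_hits, tumbleStart, tumbleEnd), so tumbleStart sits at position 4; the class name and the "test-ddMMyy"/"user" naming just mirror your example:

    import java.sql.Timestamp;
    import java.text.SimpleDateFormat;

    import org.apache.flink.api.common.functions.RuntimeContext;
    import org.apache.flink.api.common.serialization.SerializationSchema;
    import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
    import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
    import org.apache.flink.types.Row;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.client.Requests;
    import org.elasticsearch.common.xcontent.XContentType;

    public class DailyIndexSinkFunction implements ElasticsearchSinkFunction<Row> {

        // Position of tumbleStart in the Row:
        // (sid, ip, family, total_hits, tumbleStart, tumbleEnd) -> index 4.
        private static final int TUMBLE_START_POS = 4;

        private final SerializationSchema<Row> serializationSchema;

        public DailyIndexSinkFunction(SerializationSchema<Row> serializationSchema) {
            this.serializationSchema = serializationSchema;
        }

        @Override
        public void process(Row element, RuntimeContext ctx, RequestIndexer indexer) {
            // Derive the daily index from the event-time window start, so a
            // record that arrives late still lands in the index of the day it
            // belongs to, not the day it happened to be processed.
            Timestamp windowStart = (Timestamp) element.getField(TUMBLE_START_POS);
            String index = "test-" + new SimpleDateFormat("ddMMyy").format(windowStart);

            IndexRequest request = Requests.indexRequest()
                    .index(index)
                    .type("user")
                    .source(serializationSchema.serialize(element), XContentType.JSON);
            indexer.add(request);
        }
    }

Wiring it up would then look roughly like this (ElasticsearchSink.Builder comes from flink-connector-elasticsearch6, HttpHost from the Apache HTTP client that ships with the ES REST client):

    TypeInformation<Row> schema = table.getSchema().toRowType();
    DataStream<Row> rows = tableEnv.toAppendStream(table, Row.class);

    ElasticsearchSink.Builder<Row> builder = new ElasticsearchSink.Builder<>(
            Collections.singletonList(new HttpHost("localhost", 9200, "http")),
            new DailyIndexSinkFunction(new JsonRowSerializationSchema(schema)));
    rows.addSink(builder.build());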
Best,

Dawid

On 10/01/2019 10:34, Ramya Ramamurthy wrote:
> Hi,
>
> Sorry, I am a beginner here. I am not really sure how to build the dynamic
> indices here, i.e. the .index("test-ddmmyy") kind of indices.
> I have set the watermark for my Kafka table source, but I am not sure how
> this works on the ElasticSearch sink.
>
> Pasted my sample code below:
>
> tableEnv.connect(new Kafka()
>         .version("0.11")
>         .topic(params.getRequired("write-topic"))
>         .property("bootstrap.servers", "localhost:9092")
>         .sinkPartitionerRoundRobin())
>     .withSchema(new Schema()
>         .field("sid", Types.STRING())
>         .field("ip", Types.STRING())
>         .field("family", Types.STRING())
>         .field("total_hits", Types.LONG())
>         .field("tumbleStart", Types.SQL_TIMESTAMP())
>         .field("tumbleEnd", Types.SQL_TIMESTAMP()))
>     .withFormat(new Json().deriveSchema())
>     .inAppendMode()
>     .registerTableSink("sinkTopic");
>
> new Elasticsearch()
>     .version("6")
>     .host("localhost", 9200, "http")
>     .index("test")  ---- How to pass dynamic indices here, based on the
>                          packet received by the table sink?
>     .documentType("user")
>     .failureHandlerRetryRejected()
>     .failureHandlerIgnore()
>     .bulkFlushMaxSize("20 mb")
>     .bulkFlushInterval(100000L)
>     .bulkFlushBackoffMaxRetries(3)
>     .connectionMaxRetryTimeout(3)
>     .connectionPathPrefix("/v1")
>
> Thanks again!
>
> On Thu, Jan 10, 2019 at 2:55 PM miki haiat <miko5...@gmail.com> wrote:
>
>> You can use Flink to manipulate the data by using
>> TimeCharacteristic.EventTime [1] and setting a watermark.
>> Then, if there is a lag or some other issue, the data will be inserted
>> into the correct indices in Elasticsearch.
>> A more specific way to implement it with Kafka is described in [2].
>>
>> 1. https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamps_watermarks.html#assigning-timestamps
>> 2. https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
>>
>> On Thu, Jan 10, 2019 at 11:10 AM Ramya Ramamurthy <hair...@gmail.com>
>> wrote:
>>
>>> Hi Dawid,
>>>
>>> Thanks for the quick reply.
>>> I did try that, but I am not sure how to push into rolling indices.
>>> For example, I maintain daily indices on ES. Based on the event time,
>>> I would like to route the packets to the appropriate indices. If there
>>> was some lag in the source Kafka and I receive yesterday's data
>>> [say at 00:05 or so], I am not sure which index to pick.
>>> Is there a way around this?
>>>
>>> Regards,
>>> ~Ramya.
>>>
>>> On Thu, Jan 10, 2019 at 2:04 PM Dawid Wysakowicz <dwysakow...@apache.org>
>>> wrote:
>>>
>>>> Hi Ramya,
>>>>
>>>> Have you tried writing to ES directly from the Table API?
>>>> You can check the ES connector for the Table API here:
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/connect.html#elasticsearch-connector
>>>>
>>>> Best,
>>>>
>>>> Dawid
>>>>
>>>> On 10/01/2019 09:21, Ramya Ramamurthy wrote:
>>>>> Hi,
>>>>>
>>>>> I am learning Flink. With Flink 1.7.1, I am trying to read from Kafka
>>>>> and insert into Elasticsearch. I have a Kafka connector converting the
>>>>> data to a Flink table. In order to insert into Elasticsearch, I have
>>>>> converted this table to a datastream, in order to be able to use the
>>>>> ElasticsearchSink. But the Rows returned by the stream have lost their
>>>>> schema. How do I convert them to JSON before calling the Elasticsearch
>>>>> sink connector? Any help or suggestions would be appreciated.
>>>>>
>>>>> Thanks.
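PS: On the event-time route from Miki's links in the quoted thread above: if you assign timestamps and watermarks on the Kafka consumer itself, late records are still windowed by their event time, and a sink function like the sketch above then routes them to the correct daily index. A rough, untested sketch (the Event POJO, EventDeserializationSchema, getEventTimeMillis(), the topic name, and the 5-minute bound are all placeholders for your own types and settings):

    import java.util.Properties;

    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

    Properties kafkaProps = new Properties();
    kafkaProps.setProperty("bootstrap.servers", "localhost:9092");

    FlinkKafkaConsumer011<Event> consumer =
            new FlinkKafkaConsumer011<>("read-topic", new EventDeserializationSchema(), kafkaProps);

    // Emit watermarks that tolerate up to 5 minutes of out-of-order data;
    // anything arriving later than that is treated as late by the windows.
    consumer.assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.minutes(5)) {
                @Override
                public long extractTimestamp(Event event) {
                    return event.getEventTimeMillis();
                }
            });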