Hi,

I'm afraid you cannot write to different indices using the table API
ElasticSearch connector. Now I know why you wanted to go through
datastream API.

What you could do to transform from Row to JSON is to use
org.apache.flink.formats.json.JsonRowSerializationSchema from
flink-json. You just need to get the schema from the final Table of your
table API part. Your code could like this:


TypeInformation<Row> schema = table.getSchema().toRowType();

SerializationSchema<Row> serializationSchema = new
JsonRowSerializationSchema(schema);

|public IndexRequest createIndexRequest(Row element) {|

    byte[] document = serializationSchema.serialize(row)

    ...

    return new IndexRequest(index, docType)
                .source(document, contentType)

 }

Best,

Dawid

On 10/01/2019 10:34, Ramya Ramamurthy wrote:
> Hi,
>
> Sorry I am a beginner here. I am not really sure how to pack the dynamic
> indices here.
> the .index(test-ddmmyy) kind of indices here.
> I have set the watermark for my kafka table source, but not sure how this
> works on the the ElasticSearch Sink.
>
> Pasted my sample code below:
>
> tableEnv.connect(new Kafka()
>       .version("0.11")
>       .topic(params.getRequired("write-topic"))
>       .property("bootstrap.servers", "localhost:9092")
>       .sinkPartitionerRoundRobin())
>       .withSchema(new Schema()
>             .field("sid", Types.STRING())
>             .field("ip", Types.STRING())
>             .field("family", Types.STRING())
>             .field("total_hits", Types.LONG())
>             .field("tumbleStart", Types.SQL_TIMESTAMP())
>             .field("tumbleEnd", Types.SQL_TIMESTAMP())
>       )
>       .withFormat(new Json().deriveSchema())
>       .inAppendMode()
>       .registerTableSink("sinkTopic");
>
>
> new Elasticsearch()
> .version("6")
> .host("localhost", 9200, "http")
> .index("test")  ---- How to pass dynamic indices here, based on the
> packet received from the table sink.
> .documentType("user")
> .failureHandlerRetryRejected()
> .failureHandlerIgnore()
>
> .bulkFlushMaxSize("20 mb")
> .bulkFlushInterval(100000L)
> .bulkFlushBackoffMaxRetries(3)
>
> .connectionMaxRetryTimeout(3)
> .connectionPathPrefix("/v1")
>
>
> Thanks again !!
>
>
> On Thu, Jan 10, 2019 at 2:55 PM miki haiat <miko5...@gmail.com> wrote:
>
>> You can use flink  to manipulate the data by using
>> TimeCharacteristic.EventTime[1] and set Watermark.
>> Then if you have a lag or other issue  the data will be insert to the
>> correct Indexes in elastic.
>> More specific way to implement it with kafka[2]
>>
>>
>>
>>
>> 1.
>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamps_watermarks.html#assigning-timestamps
>> 2.
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
>>
>>
>> On Thu, Jan 10, 2019 at 11:10 AM Ramya Ramamurthy <hair...@gmail.com>
>> wrote:
>>
>>> Hi David,
>>>
>>> thanks for the quick reply.
>>> I did try that. I am not sure how to push into rolling indices here.
>>> For example,  i would maintain daily indices on ES. Based on the event
>>> time, i would like to classify the packets to appropriate indices. If
>> there
>>> was some lag in the source kafka, and i get to receive yesterday's data
>>> [say maybe at 00:05 or something], Not sure how to pack the indices here.
>>> Is there a way to come around this ??
>>>
>>> Regards,
>>> ~Ramya.
>>>
>>> On Thu, Jan 10, 2019 at 2:04 PM Dawid Wysakowicz <dwysakow...@apache.org
>>>
>>> wrote:
>>>
>>>> Hi Ramya,
>>>>
>>>> Have you tried writing to ES directly from table API? You can check the
>>>> ES connector for table API here:
>>>>
>>>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/connect.html#elasticsearch-connector
>>>> Best,
>>>>
>>>> Dawid
>>>>
>>>> On 10/01/2019 09:21, Ramya Ramamurthy wrote:
>>>>> Hi,
>>>>>
>>>>> I am learning to Flink. With Flink 1.7.1, trying to read from Kafka
>> and
>>>>> insert to ElasticSearch. I have a kafka connector convert the data
>> to a
>>>>> Flink table. In order to insert into Elasticsearch, I have converted
>>> this
>>>>> table to a datastream, in order to be able to use the
>>> ElasticSearchSink.
>>>>> But the Row returned by the streams, have lost the schema. How do i
>>>> convert
>>>>> this to JSON before calling the Elasticsearch sink connector. Any
>> help
>>> or
>>>>> suggestions would be appreciated.
>>>>>
>>>>> Thanks.
>>>>>
>>>>

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to