Hi Ramya,

I think the problem is that you access the serializationSchema from the
closure of the ElasticsearchSinkFunction. Try creating an
ElasticsearchSinkFunction that receives the serializationSchema in its
constructor. If that is not the problem, could you share the full stack
trace of the error?
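
A minimal sketch of what I mean, assuming a DataStream<Row> and the
flink-connector-elasticsearch6 dependency (the class and field names below
are only placeholders):

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.types.Row;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;

public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<Row> {

    private final String index;
    private final String docType;
    // Held as a field, so it is serialized together with the sink function
    // instead of being captured from an enclosing, possibly non-serializable, closure.
    private final SerializationSchema<Row> serializationSchema;

    public RowElasticsearchSinkFunction(
            String index,
            String docType,
            SerializationSchema<Row> serializationSchema) {
        this.index = index;
        this.docType = docType;
        this.serializationSchema = serializationSchema;
    }

    @Override
    public void process(Row element, RuntimeContext ctx, RequestIndexer indexer) {
        // Serialize the Row to a JSON document and index it.
        byte[] document = serializationSchema.serialize(element);
        IndexRequest request = Requests.indexRequest()
                .index(index)
                .type(docType)
                .source(document, XContentType.JSON);
        indexer.add(request);
    }
}

You would then pass an instance of this class to the ElasticsearchSink.Builder
instead of an anonymous inner class or lambda, so that everything the function
needs is a field of the function itself and can be serialized with it.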

Best,

Dawid

On 16/01/2019 11:24, Ramya Ramamurthy wrote:
> Hi Dawid,
>
> Thanks for your response. I was able to make it work, but only in my IDE.
> When I deployed the same code to my cluster, I am getting the below error:
>
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot
> instantiate user function
>
> I am not sure what is different about the environment. Is there anything
> you would suggest to resolve this issue?
> Attaching my sample code for the same.
>
> Regards,
> ~Ramya.
>
> On Thu, Jan 10, 2019 at 8:46 PM Dawid Wysakowicz
> <dwysakow...@apache.org> wrote:
>
>     Hi,
>
>     I'm afraid you cannot write to different indices using the Table
>     API Elasticsearch connector. Now I know why you wanted to go
>     through the DataStream API.
>
>     What you could do to transform from Row to JSON is to use
>     org.apache.flink.formats.json.JsonRowSerializationSchema from
>     flink-json. You just need to get the schema from the final Table
>     of your table API part. Your code could look like this:
>
>
>     TypeInformation<Row> schema = table.getSchema().toRowType();
>
>     SerializationSchema<Row> serializationSchema = new
>     JsonRowSerializationSchema(schema);
>
>     public IndexRequest createIndexRequest(Row element) {
>
>         byte[] document = serializationSchema.serialize(element);
>
>         ...
>
>         return new IndexRequest(index, docType)
>                     .source(document, contentType);
>
>     }
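>
>     (Assuming you are using a StreamTableEnvironment, this is roughly how
>     you would get the DataStream<Row> to attach the Elasticsearch sink to;
>     just a sketch:)
>
>     DataStream<Row> rowStream = tableEnv.toAppendStream(table, schema);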
>
>     Best,
>
>     Dawid
>
>     On 10/01/2019 10:34, Ramya Ramamurthy wrote:
>>     Hi,
>>
>>     Sorry, I am a beginner here. I am not really sure how to set up the dynamic
>>     indices here, i.e. the .index(test-ddmmyy) kind of indices.
>>     I have set the watermark for my Kafka table source, but I am not sure how
>>     this works on the ElasticSearch sink.
>>
>>     Pasted my sample code below:
>>
>>     tableEnv.connect(new Kafka()
>>           .version("0.11")
>>           .topic(params.getRequired("write-topic"))
>>           .property("bootstrap.servers", "localhost:9092")
>>           .sinkPartitionerRoundRobin())
>>           .withSchema(new Schema()
>>                 .field("sid", Types.STRING())
>>                 .field("ip", Types.STRING())
>>                 .field("family", Types.STRING())
>>                 .field("total_hits", Types.LONG())
>>                 .field("tumbleStart", Types.SQL_TIMESTAMP())
>>                 .field("tumbleEnd", Types.SQL_TIMESTAMP())
>>           )
>>           .withFormat(new Json().deriveSchema())
>>           .inAppendMode()
>>           .registerTableSink("sinkTopic");
>>
>>
>>     new Elasticsearch()
>>     .version("6")
>>     .host("localhost", 9200, "http")
>>     .index("test")  ---- How to pass dynamic indices here, based on the
>>     packet received from the table sink.
>>     .documentType("user")
>>     .failureHandlerRetryRejected()
>>     .failureHandlerIgnore()
>>
>>     .bulkFlushMaxSize("20 mb")
>>     .bulkFlushInterval(100000L)
>>     .bulkFlushBackoffMaxRetries(3)
>>
>>     .connectionMaxRetryTimeout(3)
>>     .connectionPathPrefix("/v1")
>>
>>
>>     Thanks again !!
>>
>>
>>     On Thu, Jan 10, 2019 at 2:55 PM miki haiat <miko5...@gmail.com> wrote:
>>
>>>     You can use Flink to handle this by using
>>>     TimeCharacteristic.EventTime[1] and setting watermarks.
>>>     Then, even if you have a lag or some other issue, the data will be
>>>     inserted into the correct indices in Elasticsearch.
>>>     A more specific way to implement it with Kafka is described in [2];
>>>     there is also a small sketch after the links below.
>>>
>>>     1. https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamps_watermarks.html#assigning-timestamps
>>>     2. https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
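>>>
>>>     For example (just a sketch, assuming Kafka 0.11 and a placeholder event
>>>     type MyEvent that carries its own event-time field; all names here are
>>>     made up):
>>>
>>>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>
>>>     FlinkKafkaConsumer011<MyEvent> consumer = new FlinkKafkaConsumer011<>(
>>>             "my-topic", new MyEventDeserializationSchema(), properties);
>>>
>>>     // Extract timestamps from the events and allow 10 seconds of lateness.
>>>     consumer.assignTimestampsAndWatermarks(
>>>             new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
>>>                 @Override
>>>                 public long extractTimestamp(MyEvent event) {
>>>                     return event.getEventTime();
>>>                 }
>>>             });
>>>
>>>     DataStream<MyEvent> stream = env.addSource(consumer);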
>>>
>>>
>>>     On Thu, Jan 10, 2019 at 11:10 AM Ramya Ramamurthy <hair...@gmail.com>
>>>     wrote:
>>>
>>>>     Hi Dawid,
>>>>
>>>>     Thanks for the quick reply.
>>>>     I did try that. I am not sure how to push into rolling indices here.
>>>>     For example, I would maintain daily indices on ES. Based on the event
>>>>     time, I would like to classify the packets into the appropriate indices.
>>>>     If there was some lag in the source Kafka, and I receive yesterday's
>>>>     data [say maybe at 00:05 or something], I am not sure how to pack the
>>>>     indices here.
>>>>     Is there a way to work around this?
>>>>
>>>>     Regards,
>>>>     ~Ramya.
>>>>
>>>>     On Thu, Jan 10, 2019 at 2:04 PM Dawid Wysakowicz <dwysakow...@apache.org>
>>>>     wrote:
>>>>
>>>>>     Hi Ramya,
>>>>>
>>>>>     Have you tried writing to ES directly from the Table API? You can
>>>>>     check the ES connector for the Table API here:
>>>>>
>>>>>     https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/connect.html#elasticsearch-connector
>>>>>     Best,
>>>>>
>>>>>     Dawid
>>>>>
>>>>>     On 10/01/2019 09:21, Ramya Ramamurthy wrote:
>>>>>>     Hi,
>>>>>>
>>>>>>     I am learning Flink. With Flink 1.7.1, I am trying to read from Kafka
>>>>>>     and insert into ElasticSearch. I have a Kafka connector converting the
>>>>>>     data into a Flink table. In order to insert into Elasticsearch, I have
>>>>>>     converted this table to a datastream, in order to be able to use the
>>>>>>     ElasticsearchSink. But the Rows returned by the stream have lost the
>>>>>>     schema. How do I convert them to JSON before calling the Elasticsearch
>>>>>>     sink connector? Any help or suggestions would be appreciated.
>>>>>>
>>>>>>     Thanks.
>>>>>>
