The Elasticsearch sink is for the DataStream API. It looks like you are using
SQL and tables, so you can use the connector to write the table result to
Elasticsearch directly. You only need the sink if you want to convert the
table to a stream first.
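
As a rough sketch of the connector route (not tested against your job), here
is how it could look on Flink 1.10 with Scala. The table name "KafkaSource",
the columns user_id and cnt, the index name, and the host are all
placeholders I made up, so substitute your own. I am also assuming the Kafka
source table is already registered, and that upsert mode is what you want
since the query aggregates. bulkFlushMaxActions(1) is only there so that
every row is flushed immediately while debugging.

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{Elasticsearch, Json, Schema}

object UpsertToElasticsearch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // Assumption: a table named "KafkaSource" with a user_id column has
    // already been registered (hypothetical names, not from your code).
    val result = tableEnv.sqlQuery(
      "SELECT user_id, COUNT(*) AS cnt FROM KafkaSource GROUP BY user_id")

    // Register an Elasticsearch 7 table through the connector descriptor.
    tableEnv
      .connect(
        new Elasticsearch()
          .version("7")
          .host("localhost", 9200, "http") // placeholder host
          .index("user_counts")            // placeholder index
          .documentType("_doc")
          .bulkFlushMaxActions(1))         // flush every row (debugging only)
      .withFormat(new Json())
      .withSchema(
        new Schema()
          .field("user_id", DataTypes.STRING())
          .field("cnt", DataTypes.BIGINT()))
      .inUpsertMode() // the GROUP BY result is updated, not append-only
      .createTemporaryTable("EsSink")

    // Write the query result into the registered table and start the job.
    result.insertInto("EsSink")
    tableEnv.execute("kafka-to-elasticsearch")
  }
}
```

With this approach there is no toRetractStream() step and no separate
ElasticsearchSink; the table result goes to the index through insertInto().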

On Tue, 3 Mar 2020 at 16:25, Castro, Fernando C. <fernando.cas...@leidos.com>
wrote:

> Hello folks! I’m new to Flink and data streaming in general, just initial
> FYI ;)
>
>
>
> I’m currently doing this successfully:
>
> 1 - streaming data from Kafka in Flink
>
> 2 - aggregating the data with Flink’s sqlQuery API
>
> 3 - outputting the result of #2 into STDOUT via toRetractStream()
>
>
>
> My objective is to change #3 so I’m upserting into an Elasticsearch index
> (see
> https://stackoverflow.com/questions/60512064/flink-is-not-adding-any-data-to-elasticsearch-but-no-errors
> for my complete code)
>
>
>
> I’ve been using the template for the Elasticsearch connector
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#elasticsearch-connector
>
> tableEnvironment
>   .connect(...)
>   .withFormat(...)
>   .withSchema(...)
>   .inAppendMode()
>   .createTemporaryTable("MyTable")
>
>
>
> But I’m confused from seeing some old examples online. Should I be using
> the Elasticsearch Sink (
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/elasticsearch.html#elasticsearch-sink)
> instead? Or both?
>
>
>
> I’m having trouble with the current implementation where no data is
> outputting to Elasticsearch, but no error is being displayed in Flink (job
> status is RUNNING).
>
>
>
> Hoping somebody could clarify what I’m missing? Thank you in advance!
>
>
>
> Note: Running on Scala 2.11, Elasticsearch 7, Flink 1.10
>
