[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15855662#comment-15855662 ]
ASF GitHub Bot commented on FLINK-4988:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3112#discussion_r99783050

    --- Diff: docs/dev/connectors/elasticsearch.md ---
    @@ -23,158 +23,291 @@ specific language governing permissions and limitations
     under the License.
     -->
     
    -This connector provides a Sink that can write to an
    -[Elasticsearch](https://elastic.co/) Index. To use this connector, add the
    -following dependency to your project:
    -
    -{% highlight xml %}
    -<dependency>
    -  <groupId>org.apache.flink</groupId>
    -  <artifactId>flink-connector-elasticsearch{{ site.scala_version_suffix }}</artifactId>
    -  <version>{{site.version }}</version>
    -</dependency>
    -{% endhighlight %}
    +This connector provides sinks that can request document actions to an
    +[Elasticsearch](https://elastic.co/) Index. To use this connector, add one
    +of the following dependencies to your project, depending on the version
    +of the Elasticsearch installation:
    +
    +<table class="table table-bordered">
    +  <thead>
    +    <tr>
    +      <th class="text-left">Maven Dependency</th>
    +      <th class="text-left">Supported since</th>
    +      <th class="text-left">Elasticsearch version</th>
    +    </tr>
    +  </thead>
    +  <tbody>
    +    <tr>
    +      <td>flink-connector-elasticsearch{{ site.scala_version_suffix }}</td>
    +      <td>1.0.0</td>
    +      <td>1.x</td>
    +    </tr>
    +    <tr>
    +      <td>flink-connector-elasticsearch2{{ site.scala_version_suffix }}</td>
    +      <td>1.0.0</td>
    +      <td>2.x</td>
    +    </tr>
    +    <tr>
    +      <td>flink-connector-elasticsearch5{{ site.scala_version_suffix }}</td>
    +      <td>1.2.0</td>
    +      <td>5.x</td>
    +    </tr>
    +  </tbody>
    +</table>
     
     Note that the streaming connectors are currently not part of the binary
    -distribution. See
    -[here]({{site.baseurl}}/dev/linking.html)
    -for information about how to package the program with the libraries for
    -cluster execution.
    +distribution. See [here]({{site.baseurl}}/dev/linking.html) for information
    +about how to package the program with the libraries for cluster execution.
     
     #### Installing Elasticsearch
     
     Instructions for setting up an Elasticsearch cluster can be found
     [here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
     Make sure to set and remember a cluster name. This must be set when
    -creating a Sink for writing to your cluster
    +creating an `ElasticsearchSink` for requesting document actions against your cluster.
     
     #### Elasticsearch Sink
    -The connector provides a Sink that can send data to an Elasticsearch Index.
    -
    -The sink can use two different methods for communicating with Elasticsearch:
    -
    -1. An embedded Node
    -2. The TransportClient
     
    -See [here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/client.html)
    -for information about the differences between the two modes.
    +The `ElasticsearchSink` uses a `TransportClient` to communicate with an
    +Elasticsearch cluster.
     
    -This code shows how to create a sink that uses an embedded Node for
    -communication:
    +The example below shows how to configure and create a sink:
     
     <div class="codetabs" markdown="1">
    -<div data-lang="java" markdown="1">
    +<div data-lang="java, Elasticsearch 1.x" markdown="1">
     {% highlight java %}
     DataStream<String> input = ...;
     
    -Map<String, String> config = Maps.newHashMap();
    +Map<String, String> config = new HashMap<>();
    +config.put("cluster.name", "my-cluster-name")
     // This instructs the sink to emit after every element, otherwise they would be buffered
     config.put("bulk.flush.max.actions", "1");
    -config.put("cluster.name", "my-cluster-name");
     
    -input.addSink(new ElasticsearchSink<>(config, new IndexRequestBuilder<String>() {
    -    @Override
    -    public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
    -        Map<String, Object> json = new HashMap<>();
    -        json.put("data", element);
    +List<TransportAddress> transportAddresses = new ArrayList<String>();
    +transportAddresses.add(new InetSocketTransportAddress("127.0.0.1", 9300));
    +transportAddresses.add(new InetSocketTransportAddress("10.2.3.1", 9300));
     
    +input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
    +    public IndexRequest createIndexRequest(String element) {
    +        Map<String, String> json = new HashMap<>();
    +        json.put("data", element);
    +
             return Requests.indexRequest()
                     .index("my-index")
                     .type("my-type")
                     .source(json);
         }
    +
    +    @Override
    +    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
    +        indexer.add(createIndexRequest(element));
    +    }
     }));
     {% endhighlight %}
     </div>
    -<div data-lang="scala" markdown="1">
    +<div data-lang="java, Elasticsearch 2.x / 5.x" markdown="1">
    +{% highlight java %}
    +DataStream<String> input = ...;
    +
    +Map<String, String> config = new HashMap<>();
    +config.put("cluster.name", "my-cluster-name")
    --- End diff --

    ; missing


> Elasticsearch 5.x support
> -------------------------
>
>                 Key: FLINK-4988
>                 URL: https://issues.apache.org/jira/browse/FLINK-4988
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming Connectors
>            Reporter: Mike Dias
>
> Elasticsearch 5.x was released:
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)