[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15856129#comment-15856129
 ] 

ASF GitHub Bot commented on FLINK-4988:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3112#discussion_r99837460
  
    --- Diff: docs/dev/connectors/elasticsearch.md ---
    @@ -23,158 +23,291 @@ specific language governing permissions and 
limitations
     under the License.
     -->
     
    -This connector provides a Sink that can write to an
    -[Elasticsearch](https://elastic.co/) Index. To use this connector, add the
    -following dependency to your project:
    -
    -{% highlight xml %}
    -<dependency>
    -  <groupId>org.apache.flink</groupId>
    -  <artifactId>flink-connector-elasticsearch{{ site.scala_version_suffix 
}}</artifactId>
    -  <version>{{site.version }}</version>
    -</dependency>
    -{% endhighlight %}
    +This connector provides sinks that can request document actions to an
    +[Elasticsearch](https://elastic.co/) Index. To use this connector, add one
    +of the following dependencies to your project, depending on the version
    +of the Elasticsearch installation:
    +
    +<table class="table table-bordered">
    +  <thead>
    +    <tr>
    +      <th class="text-left">Maven Dependency</th>
    +      <th class="text-left">Supported since</th>
    +      <th class="text-left">Elasticsearch version</th>
    +    </tr>
    +  </thead>
    +  <tbody>
    +    <tr>
    +        <td>flink-connector-elasticsearch{{ site.scala_version_suffix 
}}</td>
    +        <td>1.0.0</td>
    +        <td>1.x</td>
    +    </tr>
    +    <tr>
    +        <td>flink-connector-elasticsearch2{{ site.scala_version_suffix 
}}</td>
    +        <td>1.0.0</td>
    +        <td>2.x</td>
    +    </tr>
    +    <tr>
    +        <td>flink-connector-elasticsearch5{{ site.scala_version_suffix 
}}</td>
    +        <td>1.2.0</td>
    +        <td>5.x</td>
    +    </tr>
    +  </tbody>
    +</table>
     
     Note that the streaming connectors are currently not part of the binary
    -distribution. See
    -[here]({{site.baseurl}}/dev/linking.html)
    -for information about how to package the program with the libraries for
    -cluster execution.
    +distribution. See [here]({{site.baseurl}}/dev/linking.html) for information
    +about how to package the program with the libraries for cluster execution.
     
     #### Installing Elasticsearch
     
     Instructions for setting up an Elasticsearch cluster can be found
     
[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
     Make sure to set and remember a cluster name. This must be set when
    -creating a Sink for writing to your cluster
    +creating an `ElasticsearchSink` for requesting document actions against 
your cluster.
     
     #### Elasticsearch Sink
    -The connector provides a Sink that can send data to an Elasticsearch Index.
    -
    -The sink can use two different methods for communicating with 
Elasticsearch:
    -
    -1. An embedded Node
    -2. The TransportClient
     
    -See 
[here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/client.html)
    -for information about the differences between the two modes.
    +The `ElasticsearchSink` uses a `TransportClient` to communicate with an
    +Elasticsearch cluster.
     
    -This code shows how to create a sink that uses an embedded Node for
    -communication:
    +The example below shows how to configure and create a sink:
     
     <div class="codetabs" markdown="1">
    -<div data-lang="java" markdown="1">
    +<div data-lang="java, Elasticsearch 1.x" markdown="1">
     {% highlight java %}
     DataStream<String> input = ...;
     
    -Map<String, String> config = Maps.newHashMap();
    +Map<String, String> config = new HashMap<>();
    +config.put("cluster.name", "my-cluster-name")
     // This instructs the sink to emit after every element, otherwise they 
would be buffered
     config.put("bulk.flush.max.actions", "1");
    -config.put("cluster.name", "my-cluster-name");
     
    -input.addSink(new ElasticsearchSink<>(config, new 
IndexRequestBuilder<String>() {
    -    @Override
    -    public IndexRequest createIndexRequest(String element, RuntimeContext 
ctx) {
    -        Map<String, Object> json = new HashMap<>();
    -        json.put("data", element);
    +List<TransportAddress> transportAddresses = new ArrayList<String>();
    +transportAddresses.add(new InetSocketTransportAddress("127.0.0.1", 9300));
    +transportAddresses.add(new InetSocketTransportAddress("10.2.3.1", 9300));
     
    +input.addSink(new ElasticsearchSink<>(config, transportAddresses, new 
ElasticsearchSinkFunction<String>() {
    +    public IndexRequest createIndexRequest(String element) {
    +        Map<String, String> json = new HashMap<>();
    +        json.put("data", element);
    +    
             return Requests.indexRequest()
                     .index("my-index")
                     .type("my-type")
                     .source(json);
         }
    +    
    +    @Override
    +    public void process(String element, RuntimeContext ctx, RequestIndexer 
indexer) {
    +        indexer.add(createIndexRequest(element));
    +    }
     }));
     {% endhighlight %}
     </div>
    -<div data-lang="scala" markdown="1">
    +<div data-lang="java, Elasticsearch 2.x / 5.x" markdown="1">
    +{% highlight java %}
    +DataStream<String> input = ...;
    +
    +Map<String, String> config = new HashMap<>();
    +config.put("cluster.name", "my-cluster-name")
    +// This instructs the sink to emit after every element, otherwise they 
would be buffered
    +config.put("bulk.flush.max.actions", "1");
    +
    +List<InetSocketAddress> transportAddresses = new ArrayList<>();
    +transportAddresses.add(new 
InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
    +transportAddresses.add(new 
InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));
    +
    +input.addSink(new ElasticsearchSink<>(config, transportAddresses, new 
ElasticsearchSinkFunction<String>() {
    +    public IndexRequest createIndexRequest(String element) {
    +        Map<String, String> json = new HashMap<>();
    +        json.put("data", element);
    +    
    +        return Requests.indexRequest()
    +                .index("my-index")
    +                .type("my-type")
    +                .source(json);
    +    }
    +    
    +    @Override
    +    public void process(String element, RuntimeContext ctx, RequestIndexer 
indexer) {
    +        indexer.add(createIndexRequest(element));
    +    }
    +}));{% endhighlight %}
    +</div>
    +<div data-lang="scala, Elasticsearch 1.x" markdown="1">
     {% highlight scala %}
     val input: DataStream[String] = ...
     
    -val config = new util.HashMap[String, String]
    +val config = new java.util.HashMap[String, String]
    +config.put("cluster.name", "my-cluster-name")
    --- End diff --
    
    This is scala code, so the semicolon is ignored ;-) .


> Elasticsearch 5.x support
> -------------------------
>
>                 Key: FLINK-4988
>                 URL: https://issues.apache.org/jira/browse/FLINK-4988
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming Connectors
>            Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to