[ 
https://issues.apache.org/jira/browse/FLINK-7386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16691378#comment-16691378
 ] 

ASF GitHub Bot commented on FLINK-7386:
---------------------------------------

zentol closed pull request #4675: [FLINK-7386] FIx Elasticsearch 5 connector is 
not compatible with Elasticsearch 5.2+ client
URL: https://github.com/apache/flink/pull/4675
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/connectors/elasticsearch.md 
b/docs/dev/connectors/elasticsearch.md
index 3fba7f01fea..46d84695f6d 100644
--- a/docs/dev/connectors/elasticsearch.md
+++ b/docs/dev/connectors/elasticsearch.md
@@ -53,7 +53,12 @@ of the Elasticsearch installation:
     <tr>
         <td>flink-connector-elasticsearch5{{ site.scala_version_suffix }}</td>
         <td>1.3.0</td>
-        <td>5.x</td>
+        <td>5.2 and previous versions</td>
+    </tr>
+    <tr>
+        <td>flink-connector-elasticsearch5.3{{ site.scala_version_suffix 
}}</td>
+        <td>1.3.0</td>
+        <td>5.3 and later versions</td>
     </tr>
   </tbody>
 </table>
diff --git 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
index ce98dfba1b9..0fac543e446 100644
--- 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
+++ 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
@@ -26,6 +26,7 @@
 
 import java.io.Serializable;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * An {@link ElasticsearchApiCallBridge} is used to bridge incompatible 
Elasticsearch Java API calls across different versions.
@@ -36,7 +37,7 @@
  * is allowed, the call bridge will hold reference to the created embedded 
node. Each instance of the sink will hold
  * exactly one instance of the call bridge, and state cleanup is performed 
when the sink is closed.
  */
-public interface ElasticsearchApiCallBridge extends Serializable {
+public abstract class ElasticsearchApiCallBridge implements Serializable {
 
        /**
         * Creates an Elasticsearch {@link Client}.
@@ -44,7 +45,7 @@
         * @param clientConfig The configuration to use when constructing the 
client.
         * @return The created client.
         */
-       Client createClient(Map<String, String> clientConfig);
+       public abstract Client createClient(Map<String, String> clientConfig);
 
        /**
         * Extracts the cause of failure of a bulk item action.
@@ -52,7 +53,7 @@
         * @param bulkItemResponse the bulk item response to extract cause of 
failure
         * @return the extracted {@link Throwable} from the response ({@code 
null} is the response is successful).
         */
-       @Nullable Throwable 
extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse);
+       public abstract @Nullable Throwable 
extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse);
 
        /**
         * Set backoff-related configurations on the provided {@link 
BulkProcessor.Builder}.
@@ -61,13 +62,30 @@
         * @param builder the {@link BulkProcessor.Builder} to configure.
         * @param flushBackoffPolicy user-provided backoff retry settings 
({@code null} if the user disabled backoff retries).
         */
-       void configureBulkProcessorBackoff(
+       public abstract void configureBulkProcessorBackoff(
                BulkProcessor.Builder builder,
                @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy 
flushBackoffPolicy);
 
+       /**
+        * Creates an RequestIndexer instance.
+        *
+        * @param bulkProcessor The instance of BulkProcessor
+        * @param flushOnCheckpoint If true, the producer will wait until all 
outstanding action requests have been sent to Elasticsearch.
+        * @param numPendingRequests Number of pending action requests not yet 
acknowledged by Elasticsearch.
+        * @return The created RequestIndexer.
+        */
+       public RequestIndexer createRequestIndex(
+               BulkProcessor bulkProcessor,
+               boolean flushOnCheckpoint,
+               AtomicLong numPendingRequests) {
+               return new BulkProcessorIndexer(bulkProcessor, 
flushOnCheckpoint, numPendingRequests);
+       }
+
        /**
         * Perform any necessary state cleanup.
         */
-       void cleanup();
+       public void cleanup() {
+               // nothing to cleanup
+       }
 
 }
diff --git 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
index c49d726454a..1afbad2bbb6 100644
--- 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
+++ 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
@@ -152,7 +152,7 @@ public void setDelayMillis(long delayMillis) {
        private boolean flushOnCheckpoint = true;
 
        /** Provided to the user via the {@link ElasticsearchSinkFunction} to 
add {@link ActionRequest ActionRequests}. */
-       private transient BulkProcessorIndexer requestIndexer;
+       private transient RequestIndexer requestIndexer;
 
        // 
------------------------------------------------------------------------
        //  Internals for the Flink Elasticsearch Sink
@@ -280,7 +280,7 @@ public void disableFlushOnCheckpoint() {
        public void open(Configuration parameters) throws Exception {
                client = callBridge.createClient(userConfig);
                bulkProcessor = buildBulkProcessor(new BulkProcessorListener());
-               requestIndexer = new BulkProcessorIndexer(bulkProcessor, 
flushOnCheckpoint, numPendingRequests);
+               requestIndexer = callBridge.createRequestIndex(bulkProcessor, 
flushOnCheckpoint, numPendingRequests);
        }
 
        @Override
diff --git 
a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
 
b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
index 5e5978569cb..bb7d322c7f8 100644
--- 
a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
+++ 
b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
@@ -510,7 +510,7 @@ public Object answer(InvocationOnMock invocationOnMock) 
throws Throwable {
                }
        }
 
-       private static class DummyElasticsearchApiCallBridge implements 
ElasticsearchApiCallBridge {
+       private static class DummyElasticsearchApiCallBridge extends 
ElasticsearchApiCallBridge {
 
                private static final long serialVersionUID = 
-4272760730959041699L;
 
diff --git 
a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
 
b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
index 5659ee651e8..e4a35f4627e 100644
--- 
a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
+++ 
b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
@@ -40,7 +40,7 @@
 /**
  * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 1.x.
  */
-public class Elasticsearch1ApiCallBridge implements ElasticsearchApiCallBridge 
{
+public class Elasticsearch1ApiCallBridge extends ElasticsearchApiCallBridge {
 
        private static final long serialVersionUID = -2632363720584123682L;
 
diff --git 
a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
 
b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
index 66b676c4fca..3a54f5c0387 100644
--- 
a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
+++ 
b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
@@ -42,7 +42,7 @@
 /**
  * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 2.x.
  */
-public class Elasticsearch2ApiCallBridge implements ElasticsearchApiCallBridge 
{
+public class Elasticsearch2ApiCallBridge extends ElasticsearchApiCallBridge {
 
        private static final long serialVersionUID = 2638252694744361079L;
 
diff --git a/flink-connectors/flink-connector-elasticsearch5.3/pom.xml 
b/flink-connectors/flink-connector-elasticsearch5.3/pom.xml
new file mode 100644
index 00000000000..3f0a0ca31db
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch5.3/pom.xml
@@ -0,0 +1,162 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+                       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-connectors</artifactId>
+               <version>1.4-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       
<artifactId>flink-connector-elasticsearch5.3_${scala.binary.version}</artifactId>
+       <name>flink-connector-elasticsearch5.3</name>
+
+       <packaging>jar</packaging>
+
+       <!-- Allow users to pass custom connector versions -->
+       <properties>
+               <elasticsearch.version>5.3.0</elasticsearch.version>
+       </properties>
+
+       <dependencies>
+
+               <!-- core dependencies -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-connector-elasticsearch-base_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <exclusions>
+                               <!-- Elasticsearch Java Client has been moved 
to a different module in 5.x -->
+                               <exclusion>
+                                       <groupId>org.elasticsearch</groupId>
+                                       <artifactId>elasticsearch</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+               <!-- Dependency for Elasticsearch 5.x Java Client -->
+               <dependency>
+                       <groupId>org.elasticsearch.client</groupId>
+                       <artifactId>transport</artifactId>
+                       <version>${elasticsearch.version}</version>
+               </dependency>
+
+               <!--
+                       Elasticsearch 5.x uses Log4j2 and no longer detects 
logging implementations, making
+                       Log4j2 a strict dependency. The following is added so 
that the Log4j2 API in
+                       Elasticsearch 5.x is routed to SLF4J. This way, user 
projects can remain flexible
+                       in the logging implementation preferred.
+               -->
+
+               <dependency>
+                       <groupId>org.apache.logging.log4j</groupId>
+                       <artifactId>log4j-to-slf4j</artifactId>
+                       <version>2.7</version>
+               </dependency>
+
+               <!-- test dependencies -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-connector-elasticsearch-base_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <exclusions>
+                               <exclusion>
+                                       <groupId>org.elasticsearch</groupId>
+                                       <artifactId>elasticsearch</artifactId>
+                               </exclusion>
+                       </exclusions>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <!--
+                       Including Log4j2 dependencies for tests is required for 
the
+                       embedded Elasticsearch nodes used in tests to run 
correctly.
+               -->
+
+               <dependency>
+                       <groupId>org.apache.logging.log4j</groupId>
+                       <artifactId>log4j-api</artifactId>
+                       <version>2.7</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.logging.log4j</groupId>
+                       <artifactId>log4j-core</artifactId>
+                       <version>2.7</version>
+                       <scope>test</scope>
+               </dependency>
+
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <!--
+                               For the tests, we need to exclude the Log4j2 to 
slf4j adapter dependency
+                               and let Elasticsearch directly use Log4j2, 
otherwise the embedded Elasticsearch node
+                               used in tests will fail to work.
+
+                               In other words, the connector jar is routing 
Elasticsearch 5.x's Log4j2 API's to SLF4J,
+                               but for the test builds, we still stick to 
directly using Log4j2.
+                       -->
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-surefire-plugin</artifactId>
+                               <version>2.12.2</version>
+                               <configuration>
+                                       <classpathDependencyExcludes>
+                                               
<classpathDependencyExclude>org.apache.logging.log4j:log4j-to-slf4j</classpathDependencyExclude>
+                                       </classpathDependencyExcludes>
+                               </configuration>
+                       </plugin>
+               </plugins>
+       </build>
+
+</project>
diff --git 
a/flink-connectors/flink-connector-elasticsearch5.3/src/main/java/org/apache/flink/streaming/connectors/elasticsearch53/BulkProcessorIndexer.java
 
b/flink-connectors/flink-connector-elasticsearch5.3/src/main/java/org/apache/flink/streaming/connectors/elasticsearch53/BulkProcessorIndexer.java
new file mode 100644
index 00000000000..e0fd699af55
--- /dev/null
+++ 
b/flink-connectors/flink-connector-elasticsearch5.3/src/main/java/org/apache/flink/streaming/connectors/elasticsearch53/BulkProcessorIndexer.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch53;
+
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Implementation of a {@link RequestIndexer}, using a {@link BulkProcessor}.
+ * {@link ActionRequest ActionRequests} will be converted to {@link 
DocWriteRequest}
+ * and will be buffered before sending a bulk request to the Elasticsearch 
cluster.
+ */
+public class BulkProcessorIndexer implements RequestIndexer {
+
+       private final BulkProcessor bulkProcessor;
+       private final boolean flushOnCheckpoint;
+       private final AtomicLong numPendingRequestsRef;
+
+       public BulkProcessorIndexer(BulkProcessor bulkProcessor,
+                                                               boolean 
flushOnCheckpoint,
+                                                               AtomicLong 
numPendingRequests) {
+               this.bulkProcessor = bulkProcessor;
+               this.flushOnCheckpoint = flushOnCheckpoint;
+               this.numPendingRequestsRef = numPendingRequests;
+       }
+
+       @Override
+       public void add(ActionRequest... actionRequests) {
+               for (ActionRequest actionRequest : actionRequests) {
+                       if (flushOnCheckpoint) {
+                               numPendingRequestsRef.getAndIncrement();
+                       }
+                       this.bulkProcessor.add((DocWriteRequest) actionRequest);
+               }
+       }
+}
diff --git 
a/flink-connectors/flink-connector-elasticsearch5.3/src/main/java/org/apache/flink/streaming/connectors/elasticsearch53/Elasticsearch53ApiCallBridge.java
 
b/flink-connectors/flink-connector-elasticsearch5.3/src/main/java/org/apache/flink/streaming/connectors/elasticsearch53/Elasticsearch53ApiCallBridge.java
new file mode 100644
index 00000000000..10c742e9353
--- /dev/null
+++ 
b/flink-connectors/flink-connector-elasticsearch5.3/src/main/java/org/apache/flink/streaming/connectors/elasticsearch53/Elasticsearch53ApiCallBridge.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch53;
+
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import 
org.apache.flink.streaming.connectors.elasticsearch.util.ElasticsearchUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.elasticsearch.action.bulk.BackoffPolicy;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.network.NetworkModule;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.transport.Netty3Plugin;
+import org.elasticsearch.transport.client.PreBuiltTransportClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 5.3 
and later versions.
+ */
+public class Elasticsearch53ApiCallBridge extends ElasticsearchApiCallBridge {
+
+       private static final long serialVersionUID = -5222683870097809633L;
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(Elasticsearch53ApiCallBridge.class);
+
+       /**
+        * User-provided transport addresses.
+        *
+        * <p>We are using {@link InetSocketAddress} because {@link 
TransportAddress} is not serializable in Elasticsearch 5.x.
+        */
+       private final List<InetSocketAddress> transportAddresses;
+
+       Elasticsearch53ApiCallBridge(List<InetSocketAddress> 
transportAddresses) {
+               Preconditions.checkArgument(transportAddresses != null && 
!transportAddresses.isEmpty());
+               this.transportAddresses = transportAddresses;
+       }
+
+       @Override
+       public Client createClient(Map<String, String> clientConfig) {
+               Settings settings = Settings.builder().put(clientConfig)
+                       .put(NetworkModule.HTTP_TYPE_KEY, 
Netty3Plugin.NETTY_HTTP_TRANSPORT_NAME)
+                       .put(NetworkModule.TRANSPORT_TYPE_KEY, 
Netty3Plugin.NETTY_TRANSPORT_NAME)
+                       .build();
+
+               TransportClient transportClient = new 
PreBuiltTransportClient(settings);
+               for (TransportAddress transport : 
ElasticsearchUtils.convertInetSocketAddresses(transportAddresses)) {
+                       transportClient.addTransportAddress(transport);
+               }
+
+               // verify that we actually are connected to a cluster
+               if (transportClient.connectedNodes().isEmpty()) {
+                       throw new RuntimeException("Elasticsearch client is not 
connected to any Elasticsearch nodes!");
+               }
+
+               if (LOG.isInfoEnabled()) {
+                       LOG.info("Created Elasticsearch TransportClient with 
connected nodes {}", transportClient.connectedNodes());
+               }
+
+               return transportClient;
+       }
+
+       @Override
+       public Throwable 
extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
+               if (!bulkItemResponse.isFailed()) {
+                       return null;
+               } else {
+                       return bulkItemResponse.getFailure().getCause();
+               }
+       }
+
+       @Override
+       public void configureBulkProcessorBackoff(
+               BulkProcessor.Builder builder,
+               @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy 
flushBackoffPolicy) {
+
+               BackoffPolicy backoffPolicy;
+               if (flushBackoffPolicy != null) {
+                       switch (flushBackoffPolicy.getBackoffType()) {
+                               case CONSTANT:
+                                       backoffPolicy = 
BackoffPolicy.constantBackoff(
+                                               new 
TimeValue(flushBackoffPolicy.getDelayMillis()),
+                                               
flushBackoffPolicy.getMaxRetryCount());
+                                       break;
+                               case EXPONENTIAL:
+                               default:
+                                       backoffPolicy = 
BackoffPolicy.exponentialBackoff(
+                                               new 
TimeValue(flushBackoffPolicy.getDelayMillis()),
+                                               
flushBackoffPolicy.getMaxRetryCount());
+                       }
+               } else {
+                       backoffPolicy = BackoffPolicy.noBackoff();
+               }
+
+               builder.setBackoffPolicy(backoffPolicy);
+       }
+
+       public RequestIndexer createRequestIndex(
+               BulkProcessor bulkProcessor,
+               boolean flushOnCheckpoint,
+               AtomicLong numPendingRequests) {
+               return new BulkProcessorIndexer(bulkProcessor, 
flushOnCheckpoint, numPendingRequests);
+       }
+}
diff --git 
a/flink-connectors/flink-connector-elasticsearch5.3/src/main/java/org/apache/flink/streaming/connectors/elasticsearch53/ElasticsearchSink.java
 
b/flink-connectors/flink-connector-elasticsearch5.3/src/main/java/org/apache/flink/streaming/connectors/elasticsearch53/ElasticsearchSink.java
new file mode 100644
index 00000000000..4e7aa99b382
--- /dev/null
+++ 
b/flink-connectors/flink-connector-elasticsearch5.3/src/main/java/org/apache/flink/streaming/connectors/elasticsearch53/ElasticsearchSink.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch53;
+
+import 
org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import 
org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.client.transport.TransportClient;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Elasticsearch 5.x sink that requests multiple {@link ActionRequest 
ActionRequests}
+ * against a cluster for each incoming element.
+ *
+ * <p>The sink internally uses a {@link TransportClient} to communicate with 
an Elasticsearch cluster.
+ * The sink will fail if no cluster can be connected to using the provided 
transport addresses passed to the constructor.
+ *
+ * <p>The {@link Map} passed to the constructor is used to create the {@code 
TransportClient}. The config keys can be found
+ * in the <a href="https://www.elastic.io">Elasticsearch documentation</a>. An 
important setting is {@code cluster.name},
+ * which should be set to the name of the cluster that the sink should emit to.
+ *
+ * <p>Internally, the sink will use a {@link BulkProcessor} to send {@link 
ActionRequest ActionRequests}.
+ * This will buffer elements before sending a request to the cluster. The 
behaviour of the
+ * {@code BulkProcessor} can be configured using these config keys:
+ * <ul>
+ *   <li> {@code bulk.flush.max.actions}: Maximum amount of elements to buffer
+ *   <li> {@code bulk.flush.max.size.mb}: Maximum amount of data (in 
megabytes) to buffer
+ *   <li> {@code bulk.flush.interval.ms}: Interval at which to flush data 
regardless of the other two
+ *   settings in milliseconds
+ * </ul>
+ *
+ * <p>Note that the Elasticsearch 5.3 and later versions will convert {@link 
ActionRequest ActionRequest} to
+ * {@link DocWriteRequest DocWriteRequest} in {@link BulkProcessorIndexer}.
+ *
+ * <p>You also have to provide an {@link ElasticsearchSinkFunction}. This is 
used to create multiple
+ * {@link ActionRequest ActionRequests} for each incoming element. See the 
class level documentation of
+ * {@link ElasticsearchSinkFunction} for an example.
+ *
+ * @param <T> Type of the elements handled by this sink
+ */
+public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
+
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * Creates a new {@code ElasticsearchSink} that connects to the cluster 
using a {@link TransportClient}.
+        *
+        * @param userConfig The map of user settings that are used when 
constructing the {@link TransportClient} and {@link BulkProcessor}
+        * @param transportAddresses The addresses of Elasticsearch nodes to 
which to connect using a {@link TransportClient}
+        * @param elasticsearchSinkFunction This is used to generate multiple 
{@link ActionRequest} from the incoming element
+        */
+       public ElasticsearchSink(
+               Map<String, String> userConfig,
+               List<InetSocketAddress> transportAddresses,
+               ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+
+               this(userConfig, transportAddresses, elasticsearchSinkFunction, 
new NoOpFailureHandler());
+       }
+
+       /**
+        * Creates a new {@code ElasticsearchSink} that connects to the cluster 
using a {@link TransportClient}.
+        *
+        * @param userConfig The map of user settings that are used when 
constructing the {@link TransportClient} and {@link BulkProcessor}
+        * @param transportAddresses The addresses of Elasticsearch nodes to 
which to connect using a {@link TransportClient}
+        * @param elasticsearchSinkFunction This is used to generate multiple 
{@link ActionRequest} from the incoming element
+        * @param failureHandler This is used to handle failed {@link 
ActionRequest}
+        */
+       public ElasticsearchSink(
+               Map<String, String> userConfig,
+               List<InetSocketAddress> transportAddresses,
+               ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
+               ActionRequestFailureHandler failureHandler) {
+
+               super(new Elasticsearch53ApiCallBridge(transportAddresses), 
userConfig, elasticsearchSinkFunction, failureHandler);
+       }
+}
diff --git 
a/flink-connectors/flink-connector-elasticsearch5.3/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
 
b/flink-connectors/flink-connector-elasticsearch5.3/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
new file mode 100644
index 00000000000..5a30ef81d2e
--- /dev/null
+++ 
b/flink-connectors/flink-connector-elasticsearch5.3/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import 
org.apache.flink.streaming.connectors.elasticsearch53.ElasticsearchSinkITCase;
+
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.network.NetworkModule;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.node.InternalSettingsPreparer;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.transport.Netty3Plugin;
+
+import java.io.File;
+import java.util.Collections;
+
+/**
+ * Implementation of {@link EmbeddedElasticsearchNodeEnvironment} for 
Elasticsearch 5.3.
+ * Will be dynamically loaded in {@link ElasticsearchSinkITCase} for 
integration tests.
+ */
+public class EmbeddedElasticsearchNodeEnvironmentImpl implements 
EmbeddedElasticsearchNodeEnvironment {
+
+       private Node node;
+
+       @Override
+       public void start(File tmpDataFolder, String clusterName) throws 
Exception {
+               if (node == null) {
+                       Settings settings = Settings.builder()
+                               .put("cluster.name", clusterName)
+                               .put("http.enabled", false)
+                               .put("path.home", tmpDataFolder.getParent())
+                               .put("path.data", 
tmpDataFolder.getAbsolutePath())
+                               .put(NetworkModule.HTTP_TYPE_KEY, 
Netty3Plugin.NETTY_HTTP_TRANSPORT_NAME)
+                               .put(NetworkModule.TRANSPORT_TYPE_KEY, 
Netty3Plugin.NETTY_TRANSPORT_NAME)
+                               .build();
+
+                       node = new PluginNode(settings);
+                       node.start();
+               }
+       }
+
+       @Override
+       public void close() throws Exception {
+               if (node != null && !node.isClosed()) {
+                       node.close();
+                       node = null;
+               }
+       }
+
+       @Override
+       public Client getClient() {
+               if (node != null && !node.isClosed()) {
+                       return node.client();
+               } else {
+                       return null;
+               }
+       }
+
+       private static class PluginNode extends Node {
+               public PluginNode(Settings settings) {
+                       
super(InternalSettingsPreparer.prepareEnvironment(settings, null), 
Collections.<Class<? extends Plugin>>singletonList(Netty3Plugin.class));
+               }
+       }
+
+}
diff --git 
a/flink-connectors/flink-connector-elasticsearch5.3/src/test/java/org/apache/flink/streaming/connectors/elasticsearch53/ElasticsearchSinkITCase.java
 
b/flink-connectors/flink-connector-elasticsearch5.3/src/test/java/org/apache/flink/streaming/connectors/elasticsearch53/ElasticsearchSinkITCase.java
new file mode 100644
index 00000000000..a92e5f1223d
--- /dev/null
+++ 
b/flink-connectors/flink-connector-elasticsearch5.3/src/test/java/org/apache/flink/streaming/connectors/elasticsearch53/ElasticsearchSinkITCase.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch53;
+
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
+
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * IT cases for the {@link ElasticsearchSink}.
+ */
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
+
+       @Test
+       public void testTransportClient() throws Exception {
+               runTransportClientTest();
+       }
+
+       @Test
+       public void testNullTransportClient() throws Exception {
+               runNullTransportClientTest();
+       }
+
+       @Test
+       public void testEmptyTransportClient() throws Exception {
+               runEmptyTransportClientTest();
+       }
+
+       @Test
+       public void testTransportClientFails() throws Exception {
+               runTransportClientFailsTest();
+       }
+
+       @Override
+       protected <T> ElasticsearchSinkBase<T> 
createElasticsearchSink(Map<String, String> userConfig,
+                                                                               
                                                List<InetSocketAddress> 
transportAddresses,
+                                                                               
                                                ElasticsearchSinkFunction<T> 
elasticsearchSinkFunction) {
+               return new ElasticsearchSink<>(userConfig, transportAddresses, 
elasticsearchSinkFunction);
+       }
+
+       @Override
+       protected <T> ElasticsearchSinkBase<T> 
createElasticsearchSinkForEmbeddedNode(
+               Map<String, String> userConfig, ElasticsearchSinkFunction<T> 
elasticsearchSinkFunction) throws Exception {
+
+               List<InetSocketAddress> transports = new ArrayList<>();
+               transports.add(new 
InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
+
+               return new ElasticsearchSink<>(userConfig, transports, 
elasticsearchSinkFunction);
+       }
+
+}
diff --git 
a/flink-connectors/flink-connector-elasticsearch5.3/src/test/java/org/apache/flink/streaming/connectors/elasticsearch53/examples/ElasticsearchSinkExample.java
 
b/flink-connectors/flink-connector-elasticsearch5.3/src/test/java/org/apache/flink/streaming/connectors/elasticsearch53/examples/ElasticsearchSinkExample.java
new file mode 100644
index 00000000000..5169e20b436
--- /dev/null
+++ 
b/flink-connectors/flink-connector-elasticsearch5.3/src/test/java/org/apache/flink/streaming/connectors/elasticsearch53/examples/ElasticsearchSinkExample.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch53.examples;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.streaming.connectors.elasticsearch53.ElasticsearchSink;
+
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This example shows how to use the Elasticsearch Sink. Before running it you 
must ensure that
+ * you have a cluster named "elasticsearch" running or change the name of 
cluster in the config map.
+ */
+public class ElasticsearchSinkExample {
+
+       public static void main(String[] args) throws Exception {
+
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               DataStream<String> source = env.generateSequence(0, 20).map(new 
MapFunction<Long, String>() {
+                       @Override
+                       public String map(Long value) throws Exception {
+                               return "message #" + value;
+                       }
+               });
+
+               Map<String, String> userConfig = new HashMap<>();
+               userConfig.put("cluster.name", "elasticsearch");
+               // This instructs the sink to emit after every element, 
otherwise they would be buffered
+               
userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+               List<InetSocketAddress> transports = new ArrayList<>();
+               transports.add(new 
InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
+
+               source.addSink(new ElasticsearchSink<>(userConfig, transports, 
new ElasticsearchSinkFunction<String>() {
+                       @Override
+                       public void process(String element, RuntimeContext ctx, 
RequestIndexer indexer) {
+                               indexer.add(createIndexRequest(element));
+                       }
+               }));
+
+               env.execute("Elasticsearch Sink Example");
+       }
+
+       private static IndexRequest createIndexRequest(String element) {
+               Map<String, Object> json = new HashMap<>();
+               json.put("data", element);
+
+               return Requests.indexRequest()
+                       .index("my-index")
+                       .type("my-type")
+                       .id(element)
+                       .source(json);
+       }
+}
diff --git 
a/flink-connectors/flink-connector-elasticsearch5.3/src/test/resources/log4j-test.properties
 
b/flink-connectors/flink-connector-elasticsearch5.3/src/test/resources/log4j-test.properties
new file mode 100644
index 00000000000..20551848eea
--- /dev/null
+++ 
b/flink-connectors/flink-connector-elasticsearch5.3/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target=System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
diff --git 
a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
 
b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
index ffb572df839..e1f37d519dd 100644
--- 
a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
+++ 
b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
@@ -45,7 +45,7 @@
 /**
  * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 5.x.
  */
-public class Elasticsearch5ApiCallBridge implements ElasticsearchApiCallBridge 
{
+public class Elasticsearch5ApiCallBridge extends ElasticsearchApiCallBridge {
 
        private static final long serialVersionUID = -5222683870097809633L;
 
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
index bc3f82f686c..901117759b6 100644
--- a/flink-connectors/pom.xml
+++ b/flink-connectors/pom.xml
@@ -49,6 +49,7 @@ under the License.
                <module>flink-connector-elasticsearch</module>
                <module>flink-connector-elasticsearch2</module>
                <module>flink-connector-elasticsearch5</module>
+               <module>flink-connector-elasticsearch5.3</module>
                <module>flink-connector-rabbitmq</module>
                <module>flink-connector-twitter</module>
                <module>flink-connector-nifi</module>
diff --git a/tools/travis_mvn_watchdog.sh b/tools/travis_mvn_watchdog.sh
index a3798456221..f3f12bd7f9c 100755
--- a/tools/travis_mvn_watchdog.sh
+++ b/tools/travis_mvn_watchdog.sh
@@ -80,6 +80,7 @@ flink-connectors/flink-connector-cassandra,\
 flink-connectors/flink-connector-elasticsearch,\
 flink-connectors/flink-connector-elasticsearch2,\
 flink-connectors/flink-connector-elasticsearch5,\
+flink-connectors/flink-connector-elasticsearch5.3,\
 flink-connectors/flink-connector-elasticsearch-base,\
 flink-connectors/flink-connector-filesystem,\
 flink-connectors/flink-connector-kafka-0.8,\


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink Elasticsearch 5 connector is not compatible with Elasticsearch 5.2+ 
> client
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-7386
>                 URL: https://issues.apache.org/jira/browse/FLINK-7386
>             Project: Flink
>          Issue Type: Improvement
>          Components: ElasticSearch Connector
>            Reporter: Dawid Wysakowicz
>            Assignee: Fang Yong
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.6.0
>
>
> In the Elasticsearch 5.2.0 client, the class {{BulkProcessor}} was refactored and 
> no longer has the method {{add(ActionRequest)}}.
> For more info see: https://github.com/elastic/elasticsearch/pull/20109



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to