This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c4beb3a  [FLINK-10269] [connectors] Fix Elasticsearch 6 UpdateRequest 
binary incompatibility
c4beb3a is described below

commit c4beb3aefa806d1b14ed4d388177935578203bf0
Author: Timo Walther <twal...@apache.org>
AuthorDate: Wed Sep 12 12:21:34 2018 +0200

    [FLINK-10269] [connectors] Fix Elasticsearch 6 UpdateRequest binary incompatibility
    
    This commit fixes the binary incompatibility for UpdateRequests in Elasticsearch 6.
    The incompatibility exists between the base module (which is compiled against a
    very old ES version) and the current Elasticsearch version. The fix lets the API
    call bridge also provide a version-specific RequestIndexer.
    
    This closes #6682.
---
 .../elasticsearch/ElasticsearchApiCallBridge.java  | 14 ++++
 .../elasticsearch/ElasticsearchSinkBase.java       |  4 +-
 .../PreElasticsearch6BulkProcessorIndexer.java     | 84 +++++++++++++++++++++
 .../Elasticsearch6ApiCallBridge.java               | 13 ++++
 .../Elasticsearch6BulkProcessorIndexer.java        | 85 ++++++++++++++++++++++
 .../streaming/tests/Elasticsearch1SinkExample.java | 42 ++++++++---
 .../streaming/tests/Elasticsearch2SinkExample.java | 43 ++++++++---
 .../streaming/tests/Elasticsearch5SinkExample.java | 42 ++++++++---
 .../streaming/tests/Elasticsearch6SinkExample.java | 35 +++++++--
 .../test-scripts/test_streaming_elasticsearch.sh   |  3 +-
 10 files changed, 319 insertions(+), 46 deletions(-)

diff --git 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
index f1dcc83..d3b774c 100644
--- 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
+++ 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
@@ -28,6 +28,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * An {@link ElasticsearchApiCallBridge} is used to bridge incompatible 
Elasticsearch Java API calls across different versions.
@@ -80,6 +81,19 @@ public interface ElasticsearchApiCallBridge<C extends 
AutoCloseable> extends Ser
                @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy 
flushBackoffPolicy);
 
        /**
+        * Creates a {@link RequestIndexer} for the given {@link BulkProcessor} that is binary compatible with the Elasticsearch version of this bridge.
+        */
+       default RequestIndexer createBulkProcessorIndexer(
+                       BulkProcessor bulkProcessor,
+                       boolean flushOnCheckpoint,
+                       AtomicLong numPendingRequestsRef) {
+               return new PreElasticsearch6BulkProcessorIndexer(
+                       bulkProcessor,
+                       flushOnCheckpoint,
+                       numPendingRequestsRef);
+       }
+
+       /**
         * Perform any necessary state cleanup.
         */
        default void cleanup() {
diff --git 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
index 7dac06c..4d0c002 100644
--- 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
+++ 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
@@ -164,7 +164,7 @@ public abstract class ElasticsearchSinkBase<T, C extends 
AutoCloseable> extends
        private boolean flushOnCheckpoint = true;
 
        /** Provided to the user via the {@link ElasticsearchSinkFunction} to 
add {@link ActionRequest ActionRequests}. */
-       private transient BulkProcessorIndexer requestIndexer;
+       private transient RequestIndexer requestIndexer;
 
        // 
------------------------------------------------------------------------
        //  Internals for the Flink Elasticsearch Sink
@@ -295,7 +295,7 @@ public abstract class ElasticsearchSinkBase<T, C extends 
AutoCloseable> extends
        public void open(Configuration parameters) throws Exception {
                client = callBridge.createClient(userConfig);
                bulkProcessor = buildBulkProcessor(new BulkProcessorListener());
-               requestIndexer = new BulkProcessorIndexer(bulkProcessor, 
flushOnCheckpoint, numPendingRequests);
+               requestIndexer = 
callBridge.createBulkProcessorIndexer(bulkProcessor, flushOnCheckpoint, 
numPendingRequests);
        }
 
        @Override
diff --git 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/PreElasticsearch6BulkProcessorIndexer.java
 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/PreElasticsearch6BulkProcessorIndexer.java
new file mode 100644
index 0000000..85f4b9a
--- /dev/null
+++ 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/PreElasticsearch6BulkProcessorIndexer.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.annotation.Internal;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Implementation of a {@link RequestIndexer}, using a {@link BulkProcessor}.
+ * {@link ActionRequest ActionRequests} will be buffered before sending a bulk 
request to the Elasticsearch cluster.
+ *
+ * @deprecated This class is not binary compatible with Elasticsearch 6+ versions
+ *             (i.e. the {@link #add(UpdateRequest...)} method), because this module
+ *             is compiled against a very old Elasticsearch version.
+ */
+@Deprecated
+@Internal
+class PreElasticsearch6BulkProcessorIndexer implements RequestIndexer {
+
+       private final BulkProcessor bulkProcessor;
+       private final boolean flushOnCheckpoint;
+       private final AtomicLong numPendingRequestsRef;
+
+       PreElasticsearch6BulkProcessorIndexer(BulkProcessor bulkProcessor, 
boolean flushOnCheckpoint, AtomicLong numPendingRequestsRef) {
+               this.bulkProcessor = checkNotNull(bulkProcessor);
+               this.flushOnCheckpoint = flushOnCheckpoint;
+               this.numPendingRequestsRef = 
checkNotNull(numPendingRequestsRef);
+       }
+
+       @Override
+       public void add(DeleteRequest... deleteRequests) {
+               for (DeleteRequest deleteRequest : deleteRequests) {
+                       if (flushOnCheckpoint) {
+                               numPendingRequestsRef.getAndIncrement();
+                       }
+                       this.bulkProcessor.add(deleteRequest);
+               }
+       }
+
+       @Override
+       public void add(IndexRequest... indexRequests) {
+               for (IndexRequest indexRequest : indexRequests) {
+                       if (flushOnCheckpoint) {
+                               numPendingRequestsRef.getAndIncrement();
+                       }
+                       this.bulkProcessor.add(indexRequest);
+               }
+       }
+
+       @Override
+       public void add(UpdateRequest... updateRequests) {
+               for (UpdateRequest updateRequest : updateRequests) {
+                       if (flushOnCheckpoint) {
+                               numPendingRequestsRef.getAndIncrement();
+                       }
+                       this.bulkProcessor.add(updateRequest);
+               }
+       }
+}
diff --git 
a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java
 
b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java
index 03bf9c0..782cbbc 100644
--- 
a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java
+++ 
b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.elasticsearch6;
 import org.apache.flink.annotation.Internal;
 import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
 import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.http.HttpHost;
@@ -38,6 +39,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 6 
and later versions.
@@ -126,4 +128,15 @@ public class Elasticsearch6ApiCallBridge implements 
ElasticsearchApiCallBridge<R
 
                builder.setBackoffPolicy(backoffPolicy);
        }
+
+       @Override
+       public RequestIndexer createBulkProcessorIndexer(
+                       BulkProcessor bulkProcessor,
+                       boolean flushOnCheckpoint,
+                       AtomicLong numPendingRequestsRef) {
+               return new Elasticsearch6BulkProcessorIndexer(
+                       bulkProcessor,
+                       flushOnCheckpoint,
+                       numPendingRequestsRef);
+       }
 }
diff --git 
a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6BulkProcessorIndexer.java
 
b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6BulkProcessorIndexer.java
new file mode 100644
index 0000000..af3c5b1
--- /dev/null
+++ 
b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6BulkProcessorIndexer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Implementation of a {@link RequestIndexer}, using a {@link BulkProcessor}.
+ * {@link ActionRequest ActionRequests} will be buffered before sending a bulk 
request to the Elasticsearch cluster.
+ *
+ * <p>Note: This class is binary compatible with Elasticsearch 6.
+ */
+@Internal
+class Elasticsearch6BulkProcessorIndexer implements RequestIndexer {
+
+       private final BulkProcessor bulkProcessor;
+       private final boolean flushOnCheckpoint;
+       private final AtomicLong numPendingRequestsRef;
+
+       Elasticsearch6BulkProcessorIndexer(
+                       BulkProcessor bulkProcessor,
+                       boolean flushOnCheckpoint,
+                       AtomicLong numPendingRequestsRef) {
+               this.bulkProcessor = checkNotNull(bulkProcessor);
+               this.flushOnCheckpoint = flushOnCheckpoint;
+               this.numPendingRequestsRef = 
checkNotNull(numPendingRequestsRef);
+       }
+
+       @Override
+       public void add(DeleteRequest... deleteRequests) {
+               for (DeleteRequest deleteRequest : deleteRequests) {
+                       if (flushOnCheckpoint) {
+                               numPendingRequestsRef.getAndIncrement();
+                       }
+                       this.bulkProcessor.add(deleteRequest);
+               }
+       }
+
+       @Override
+       public void add(IndexRequest... indexRequests) {
+               for (IndexRequest indexRequest : indexRequests) {
+                       if (flushOnCheckpoint) {
+                               numPendingRequestsRef.getAndIncrement();
+                       }
+                       this.bulkProcessor.add(indexRequest);
+               }
+       }
+
+       @Override
+       public void add(UpdateRequest... updateRequests) {
+               for (UpdateRequest updateRequest : updateRequests) {
+                       if (flushOnCheckpoint) {
+                               numPendingRequestsRef.getAndIncrement();
+                       }
+                       this.bulkProcessor.add(updateRequest);
+               }
+       }
+}
diff --git 
a/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java
 
b/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java
index 18fa05a..21c53ed 100644
--- 
a/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java
+++ 
b/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java
@@ -17,16 +17,18 @@
 
 package org.apache.flink.streaming.tests;
 
-import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
-import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
 import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.util.Collector;
 
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.client.Requests;
 import org.elasticsearch.common.transport.InetSocketTransportAddress;
 import org.elasticsearch.common.transport.TransportAddress;
@@ -56,11 +58,14 @@ public class Elasticsearch1SinkExample {
                env.getConfig().disableSysoutLogging();
                env.enableCheckpointing(5000);
 
-               DataStream<String> source = env.generateSequence(0, 
parameterTool.getInt("numRecords") - 1)
-                       .map(new MapFunction<Long, String>() {
+               DataStream<Tuple2<String, String>> source = 
env.generateSequence(0, parameterTool.getInt("numRecords") - 1)
+                       .flatMap(new FlatMapFunction<Long, Tuple2<String, 
String>>() {
                                @Override
-                               public String map(Long value) throws Exception {
-                                       return "message # " + value;
+                               public void flatMap(Long value, 
Collector<Tuple2<String, String>> out) {
+                                       final String key = 
String.valueOf(value);
+                                       final String message = "message #" + 
value;
+                                       out.collect(Tuple2.of(key, message + 
"update #1"));
+                                       out.collect(Tuple2.of(key, message + 
"update #2"));
                                }
                        });
 
@@ -72,12 +77,13 @@ public class Elasticsearch1SinkExample {
                List<TransportAddress> transports = new ArrayList<>();
                transports.add(new 
InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
 
-               source.addSink(new ElasticsearchSink<>(userConfig, transports, 
new ElasticsearchSinkFunction<String>() {
-                       @Override
-                       public void process(String element, RuntimeContext ctx, 
RequestIndexer indexer) {
-                               indexer.add(createIndexRequest(element, 
parameterTool));
-                       }
-               }));
+               source.addSink(new ElasticsearchSink<>(
+                       userConfig,
+                       transports,
+                       (Tuple2<String, String> element, RuntimeContext ctx, 
RequestIndexer indexer) -> {
+                               indexer.add(createIndexRequest(element.f1, 
parameterTool));
+                               indexer.add(createUpdateRequest(element, 
parameterTool));
+                       }));
 
                env.execute("Elasticsearch1.x end to end sink test example");
        }
@@ -92,4 +98,16 @@ public class Elasticsearch1SinkExample {
                        .id(element)
                        .source(json);
        }
+
+       private static UpdateRequest createUpdateRequest(Tuple2<String, String> 
element, ParameterTool parameterTool) {
+               Map<String, Object> json = new HashMap<>();
+               json.put("data", element.f1);
+
+               return new UpdateRequest(
+                               parameterTool.getRequired("index"),
+                               parameterTool.getRequired("type"),
+                               element.f0)
+                       .doc(json)
+                       .upsert(json);
+       }
 }
diff --git 
a/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java
 
b/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java
index f7532b1a..f8f390e 100644
--- 
a/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java
+++ 
b/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java
@@ -17,15 +17,18 @@
 
 package org.apache.flink.streaming.tests;
 
-import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
 import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink;
+import org.apache.flink.util.Collector;
 
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.client.Requests;
 
 import java.net.InetAddress;
@@ -54,11 +57,14 @@ public class Elasticsearch2SinkExample {
                env.getConfig().disableSysoutLogging();
                env.enableCheckpointing(5000);
 
-               DataStream<String> source = env.generateSequence(0, 
parameterTool.getInt("numRecords") - 1)
-                       .map(new MapFunction<Long, String>() {
+               DataStream<Tuple2<String, String>> source = 
env.generateSequence(0, parameterTool.getInt("numRecords") - 1)
+                       .flatMap(new FlatMapFunction<Long, Tuple2<String, 
String>>() {
                                @Override
-                               public String map(Long value) throws Exception {
-                                       return "message #" + value;
+                               public void flatMap(Long value, 
Collector<Tuple2<String, String>> out) {
+                                       final String key = 
String.valueOf(value);
+                                       final String message = "message #" + 
value;
+                                       out.collect(Tuple2.of(key, message + 
"update #1"));
+                                       out.collect(Tuple2.of(key, message + 
"update #2"));
                                }
                        });
 
@@ -70,12 +76,13 @@ public class Elasticsearch2SinkExample {
                List<InetSocketAddress> transports = new ArrayList<>();
                transports.add(new 
InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
 
-               source.addSink(new ElasticsearchSink<>(userConfig, transports, 
new ElasticsearchSinkFunction<String>(){
-                       @Override
-                       public void process(String element, RuntimeContext ctx, 
org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer indexer) {
-                               indexer.add(createIndexRequest(element, 
parameterTool));
-                       }
-               }));
+               source.addSink(new ElasticsearchSink<>(
+                       userConfig,
+                       transports,
+                       (Tuple2<String, String> element, RuntimeContext ctx, 
RequestIndexer indexer) -> {
+                               indexer.add(createIndexRequest(element.f1, 
parameterTool));
+                               indexer.add(createUpdateRequest(element, 
parameterTool));
+                       }));
 
                env.execute("Elasticsearch2.x end to end sink test example");
        }
@@ -90,4 +97,16 @@ public class Elasticsearch2SinkExample {
                        .id(element)
                        .source(json);
        }
+
+       private static UpdateRequest createUpdateRequest(Tuple2<String, String> 
element, ParameterTool parameterTool) {
+               Map<String, Object> json = new HashMap<>();
+               json.put("data", element.f1);
+
+               return new UpdateRequest(
+                               parameterTool.getRequired("index"),
+                               parameterTool.getRequired("type"),
+                               element.f0)
+                       .doc(json)
+                       .upsert(json);
+       }
 }
diff --git 
a/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java
 
b/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java
index 39808f6..893d366 100644
--- 
a/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java
+++ 
b/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java
@@ -17,16 +17,18 @@
 
 package org.apache.flink.streaming.tests;
 
-import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
 import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
 import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
+import org.apache.flink.util.Collector;
 
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.client.Requests;
 
 import java.net.InetAddress;
@@ -55,11 +57,14 @@ public class Elasticsearch5SinkExample {
                env.getConfig().disableSysoutLogging();
                env.enableCheckpointing(5000);
 
-               DataStream<String> source = env.generateSequence(0, 
parameterTool.getInt("numRecords") - 1)
-                       .map(new MapFunction<Long, String>() {
+               DataStream<Tuple2<String, String>> source = 
env.generateSequence(0, parameterTool.getInt("numRecords") - 1)
+                       .flatMap(new FlatMapFunction<Long, Tuple2<String, 
String>>() {
                                @Override
-                               public String map(Long value) throws Exception {
-                                       return "message #" + value;
+                               public void flatMap(Long value, 
Collector<Tuple2<String, String>> out) {
+                                       final String key = 
String.valueOf(value);
+                                       final String message = "message #" + 
value;
+                                       out.collect(Tuple2.of(key, message + 
"update #1"));
+                                       out.collect(Tuple2.of(key, message + 
"update #2"));
                                }
                        });
 
@@ -71,12 +76,13 @@ public class Elasticsearch5SinkExample {
                List<InetSocketAddress> transports = new ArrayList<>();
                transports.add(new 
InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
 
-               source.addSink(new ElasticsearchSink<>(userConfig, transports, 
new ElasticsearchSinkFunction<String>() {
-                       @Override
-                       public void process(String element, RuntimeContext ctx, 
RequestIndexer indexer) {
-                               indexer.add(createIndexRequest(element, 
parameterTool));
-                       }
-               }));
+               source.addSink(new ElasticsearchSink<>(
+                       userConfig,
+                       transports,
+                       (Tuple2<String, String> element, RuntimeContext ctx, 
RequestIndexer indexer) -> {
+                               indexer.add(createIndexRequest(element.f1, 
parameterTool));
+                               indexer.add(createUpdateRequest(element, 
parameterTool));
+                       }));
 
                env.execute("Elasticsearch5.x end to end sink test example");
        }
@@ -91,4 +97,16 @@ public class Elasticsearch5SinkExample {
                        .id(element)
                        .source(json);
        }
+
+       private static UpdateRequest createUpdateRequest(Tuple2<String, String> 
element, ParameterTool parameterTool) {
+               Map<String, Object> json = new HashMap<>();
+               json.put("data", element.f1);
+
+               return new UpdateRequest(
+                               parameterTool.getRequired("index"),
+                               parameterTool.getRequired("type"),
+                               element.f0)
+                       .doc(json)
+                       .upsert(json);
+       }
 }
diff --git 
a/flink-end-to-end-tests/flink-elasticsearch6-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExample.java
 
b/flink-end-to-end-tests/flink-elasticsearch6-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExample.java
index dedcbb2..e813c29 100644
--- 
a/flink-end-to-end-tests/flink-elasticsearch6-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExample.java
+++ 
b/flink-end-to-end-tests/flink-elasticsearch6-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExample.java
@@ -17,16 +17,19 @@
 
 package org.apache.flink.streaming.tests;
 
-import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
 import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
+import org.apache.flink.util.Collector;
 
 import org.apache.http.HttpHost;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.client.Requests;
 
 import java.util.ArrayList;
@@ -53,20 +56,26 @@ public class Elasticsearch6SinkExample {
                env.getConfig().disableSysoutLogging();
                env.enableCheckpointing(5000);
 
-               DataStream<String> source = env.generateSequence(0, 
parameterTool.getInt("numRecords") - 1)
-                       .map(new MapFunction<Long, String>() {
+               DataStream<Tuple2<String, String>> source = 
env.generateSequence(0, parameterTool.getInt("numRecords") - 1)
+                       .flatMap(new FlatMapFunction<Long, Tuple2<String, 
String>>() {
                                @Override
-                               public String map(Long value) throws Exception {
-                                       return "message #" + value;
+                               public void flatMap(Long value, 
Collector<Tuple2<String, String>> out) {
+                                       final String key = 
String.valueOf(value);
+                                       final String message = "message #" + 
value;
+                                       out.collect(Tuple2.of(key, message + 
"update #1"));
+                                       out.collect(Tuple2.of(key, message + 
"update #2"));
                                }
                        });
 
                List<HttpHost> httpHosts = new ArrayList<>();
                httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
 
-               ElasticsearchSink.Builder<String> esSinkBuilder = new 
ElasticsearchSink.Builder<>(
+               ElasticsearchSink.Builder<Tuple2<String, String>> esSinkBuilder 
= new ElasticsearchSink.Builder<>(
                        httpHosts,
-                       (String element, RuntimeContext ctx, RequestIndexer 
indexer) -> indexer.add(createIndexRequest(element, parameterTool)));
+                       (Tuple2<String, String> element, RuntimeContext ctx, 
RequestIndexer indexer) -> {
+                               indexer.add(createIndexRequest(element.f1, 
parameterTool));
+                               indexer.add(createUpdateRequest(element, 
parameterTool));
+                       });
 
                // this instructs the sink to emit after every element, 
otherwise they would be buffered
                esSinkBuilder.setBulkFlushMaxActions(1);
@@ -86,4 +95,16 @@ public class Elasticsearch6SinkExample {
                        .id(element)
                        .source(json);
        }
+
+       private static UpdateRequest createUpdateRequest(Tuple2<String, String> 
element, ParameterTool parameterTool) {
+               Map<String, Object> json = new HashMap<>();
+               json.put("data", element.f1);
+
+               return new UpdateRequest(
+                               parameterTool.getRequired("index"),
+                               parameterTool.getRequired("type"),
+                               element.f0)
+                       .doc(json)
+                       .upsert(json);
+       }
 }
diff --git 
a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh 
b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh
index c8cd2db..800c4e2 100755
--- a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh
@@ -45,4 +45,5 @@ $FLINK_DIR/bin/flink run -p 1 $TEST_ES_JAR \
   --index index \
   --type type
 
-verify_result 20 index
+# 40 index requests and 20 final update requests
+verify_result 60 index

Reply via email to