[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-25 Thread cjolif
Github user cjolif commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r205051308
  
--- Diff: flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java ---
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.bulk.BackoffPolicy;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.common.unit.TimeValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 6 and later versions.
+ */
+public class Elasticsearch6ApiCallBridge implements ElasticsearchApiCallBridge {
+
+   private static final long serialVersionUID = -5222683870097809633L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch6ApiCallBridge.class);
+
+   /**
+    * User-provided HTTP Host.
+    */
+   private final List<HttpHost> httpHosts;
+
+   Elasticsearch6ApiCallBridge(List<HttpHost> httpHosts) {
+       Preconditions.checkArgument(httpHosts != null && !httpHosts.isEmpty());
+       this.httpHosts = httpHosts;
+   }
+
+   @Override
+   public RestHighLevelClient createClient(Map<String, String> clientConfig) {
+       RestHighLevelClient rhlClient =
--- End diff --

It might have been good to support:
- a context path / path prefix in addition to the host
- login/password for protected Elasticsearch instances

It's OK not to do it, as long as the user can add it by subclassing. To make subclassing easier, maybe there should be two methods.

Keep the public `createClient` method that returns the `RHLClient`, and add a protected method `createRestClientBuilder` which returns the `RestClientBuilder`. This way, one can just override the protected method and let the public one handle the actual `RHLClient` instantiation from the `RestClientBuilder` created by the protected method.
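A minimal sketch of the two-method split suggested above. To keep it self-contained, `FakeRestClientBuilder` and `FakeHighLevelClient` are hypothetical stand-ins for Elasticsearch's `RestClientBuilder` and `RestHighLevelClient`, hosts are plain `"host:port"` strings instead of `HttpHost`, and `SimpleCallBridge` / `PrefixedAuthCallBridge` are illustrative names, not classes from this PR. With the real low-level client, the path prefix and credentials would presumably be configured via `RestClientBuilder#setPathPrefix` and an HTTP client config callback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Elasticsearch's RestClientBuilder: it only records settings.
class FakeRestClientBuilder {
    final List<String> hosts;
    String pathPrefix;
    String username;
    String password;

    FakeRestClientBuilder(List<String> hosts) {
        this.hosts = new ArrayList<>(hosts);
    }

    FakeRestClientBuilder setPathPrefix(String pathPrefix) {
        this.pathPrefix = pathPrefix;
        return this;
    }

    FakeRestClientBuilder setCredentials(String username, String password) {
        this.username = username;
        this.password = password;
        return this;
    }
}

// Hypothetical stand-in for RestHighLevelClient, created from a configured builder.
class FakeHighLevelClient implements AutoCloseable {
    final FakeRestClientBuilder settings;

    FakeHighLevelClient(FakeRestClientBuilder settings) {
        this.settings = settings;
    }

    @Override
    public void close() {
        // nothing to release in this sketch
    }
}

// Base bridge: the public method owns client instantiation, the protected hook owns the builder.
class SimpleCallBridge {
    protected final List<String> httpHosts;

    SimpleCallBridge(List<String> httpHosts) {
        this.httpHosts = httpHosts;
    }

    public FakeHighLevelClient createClient(Map<String, String> clientConfig) {
        return new FakeHighLevelClient(createRestClientBuilder(clientConfig));
    }

    protected FakeRestClientBuilder createRestClientBuilder(Map<String, String> clientConfig) {
        return new FakeRestClientBuilder(httpHosts);
    }
}

// Subclass adding a path prefix and credentials by overriding only the protected hook.
class PrefixedAuthCallBridge extends SimpleCallBridge {
    private final String pathPrefix;
    private final String username;
    private final String password;

    PrefixedAuthCallBridge(List<String> httpHosts, String pathPrefix, String username, String password) {
        super(httpHosts);
        this.pathPrefix = pathPrefix;
        this.username = username;
        this.password = password;
    }

    @Override
    protected FakeRestClientBuilder createRestClientBuilder(Map<String, String> clientConfig) {
        return super.createRestClientBuilder(clientConfig)
            .setPathPrefix(pathPrefix)
            .setCredentials(username, password);
    }
}
```

A subclass never touches `createClient`, so every variant goes through the same client-instantiation code and only differs in how the builder is configured.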


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r205037835
  
--- Diff: flink-connectors/flink-connector-elasticsearch6/src/test/resources/log4j-test.properties ---
@@ -0,0 +1,27 @@

+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target=System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
--- End diff --

Removed.


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r205037795
  
--- Diff: flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/examples/ElasticsearchSinkExample.java ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6.examples;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that
+ * you have a cluster named "elasticsearch" running or change the name of cluster in the config map.
+ */
+public class ElasticsearchSinkExample {
--- End diff --

Actually, we decided to move examples out of the test code. Removing this.


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r205031956
  
--- Diff: flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSinkITCase;
+
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.network.NetworkModule;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.node.InternalSettingsPreparer;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.transport.Netty4Plugin;
+
+import java.io.File;
+import java.util.Collections;
+
+/**
+ * Implementation of {@link EmbeddedElasticsearchNodeEnvironment} for Elasticsearch 6.
+ * Will be dynamically loaded in {@link ElasticsearchSinkITCase} for integration tests.
+ */
+public class EmbeddedElasticsearchNodeEnvironmentImpl implements EmbeddedElasticsearchNodeEnvironment {
+
+   private Node node;
+
+   @Override
+   public void start(File tmpDataFolder, String clusterName) throws Exception {
+       if (node == null) {
+           Settings settings = Settings.builder()
+               .put("cluster.name", clusterName)
+               .put("http.enabled", false)
+               .put("path.home", tmpDataFolder.getParent())
+               .put("path.data", tmpDataFolder.getAbsolutePath())
+               .put(NetworkModule.HTTP_TYPE_KEY, Netty4Plugin.NETTY_HTTP_TRANSPORT_NAME)
--- End diff --

You're right, removed.


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r205029480
  
--- Diff: flink-end-to-end-tests/run-nightly-tests.sh ---
@@ -96,6 +96,7 @@ run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR
 run_test "Elasticsearch (v1.7.1) sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 1 https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.1.tar.gz"
 run_test "Elasticsearch (v2.3.5) sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 2 https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz"
 run_test "Elasticsearch (v5.1.2) sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 5 https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.1.2.tar.gz"
+run_test "Elasticsearch (v6.3.1) sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 6 https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.1.tar.gz"
--- End diff --

Added a loop to wait until the Elasticsearch node is really running.
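The actual loop lives in the bash test script and is not reproduced here. As a rough Java analogue of the same idea (poll until the node answers, give up after a bounded number of attempts), a sketch might look like this; `ReadinessWait`, its methods, and the attempt and sleep values are hypothetical, and `http://localhost:9200` (Elasticsearch's default HTTP port) is only an assumed endpoint.

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.util.function.BooleanSupplier;

class ReadinessWait {

    /**
     * Calls isReady up to maxAttempts times, sleeping sleepMillis between failed checks.
     * Returns true as soon as a check succeeds, false if all attempts fail or the thread is interrupted.
     */
    static boolean waitUntilReady(BooleanSupplier isReady, int maxAttempts, long sleepMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (isReady.getAsBoolean()) {
                return true;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false;
    }

    /** Example probe: treats the node as running once its HTTP endpoint answers 200 OK. */
    static boolean respondsWith200(String endpoint) {
        try {
            HttpURLConnection conn = (HttpURLConnection) URI.create(endpoint).toURL().openConnection();
            conn.setConnectTimeout(1000);
            conn.setReadTimeout(1000);
            try {
                return conn.getResponseCode() == HttpURLConnection.HTTP_OK;
            } finally {
                conn.disconnect();
            }
        } catch (IOException | IllegalArgumentException e) {
            // Connection refused, timeout, or malformed endpoint: not ready yet.
            return false;
        }
    }
}
```

A call such as `ReadinessWait.waitUntilReady(() -> ReadinessWait.respondsWith200("http://localhost:9200"), 30, 1000)` would then gate the rest of the test. Bounding the attempts makes the test fail with a clear result instead of hanging when the node never comes up.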


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204756208
  
--- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java ---
@@ -39,23 +38,25 @@
  * exactly one instance of the call bridge, and state cleanup is performed when the sink is closed.
  */
 @Internal
-public interface ElasticsearchApiCallBridge extends Serializable {
+public abstract class ElasticsearchApiCallBridge implements Serializable {
 
    /**
-    * Creates an Elasticsearch {@link Client}.
+    * Creates an Elasticsearch client implementing {@link AutoCloseable}.
     *
     * @param clientConfig The configuration to use when constructing the client.
     * @return The created client.
     */
-   Client createClient(Map<String, String> clientConfig);
+   public abstract AutoCloseable createClient(Map<String, String> clientConfig);
+
+   public abstract BulkProcessor.Builder createBulkProcessorBuilder(AutoCloseable client, BulkProcessor.Listener listener);
--- End diff --

No docs here?


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204999073
  
--- Diff: flink-end-to-end-tests/run-nightly-tests.sh ---
@@ -96,6 +96,7 @@ run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR
 run_test "Elasticsearch (v1.7.1) sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 1 https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.1.tar.gz"
 run_test "Elasticsearch (v2.3.5) sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 2 https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz"
 run_test "Elasticsearch (v5.1.2) sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 5 https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.1.2.tar.gz"
+run_test "Elasticsearch (v6.3.1) sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 6 https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.1.tar.gz"
--- End diff --

The test is not runnable on my machine.

```
Elasticsearch node is not running.
grep: /Users/twalthr/flink/flink/build-target/log/*.out: No such file or directory

[FAIL] './test-scripts/test_streaming_elasticsearch.sh 6 https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.1.tar.gz' failed after 0 minutes and 18 seconds! Test exited with exit code 1
```

The test exits before Elasticsearch has actually started. Killing it also does not work: an Elasticsearch process is still running afterwards.


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204991878
  
--- Diff: flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSinkITCase;
+
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.network.NetworkModule;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.node.InternalSettingsPreparer;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.transport.Netty4Plugin;
+
+import java.io.File;
+import java.util.Collections;
+
+/**
+ * Implementation of {@link EmbeddedElasticsearchNodeEnvironment} for Elasticsearch 6.
+ * Will be dynamically loaded in {@link ElasticsearchSinkITCase} for integration tests.
+ */
+public class EmbeddedElasticsearchNodeEnvironmentImpl implements EmbeddedElasticsearchNodeEnvironment {
+
+   private Node node;
+
+   @Override
+   public void start(File tmpDataFolder, String clusterName) throws Exception {
+       if (node == null) {
+           Settings settings = Settings.builder()
+               .put("cluster.name", clusterName)
+               .put("http.enabled", false)
+               .put("path.home", tmpDataFolder.getParent())
+               .put("path.data", tmpDataFolder.getAbsolutePath())
+               .put(NetworkModule.HTTP_TYPE_KEY, Netty4Plugin.NETTY_HTTP_TRANSPORT_NAME)
--- End diff --

Are these values still valid? I thought we were no longer relying on Netty with the REST client?


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204758828
  
--- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java ---
@@ -176,7 +175,7 @@ public void setDelayMillis(long delayMillis) {
private AtomicLong numPendingRequests = new AtomicLong(0);
 
/** Elasticsearch client created using the call bridge. */
-   private transient Client client;
+   private transient AutoCloseable client;
--- End diff --

Same here. Why not parameterize the class and be type-safe?
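A minimal sketch of what parameterizing could look like: the base class carries the client type `C`, so the client field is typed and callers need no casts from `AutoCloseable`. `TypedCallBridge`, `TypedSinkBase`, `DemoClient`, and `DemoSink` are hypothetical, simplified names, not Flink classes.

```java
import java.util.Map;

// Hypothetical, simplified call bridge that is generic in the client type C.
interface TypedCallBridge<C extends AutoCloseable> {
    C createClient(Map<String, String> clientConfig);
}

// Hypothetical sink base: T is the element type (unused here), C the concrete client type.
abstract class TypedSinkBase<T, C extends AutoCloseable> {

    private final TypedCallBridge<C> callBridge;

    // Typed field instead of a raw AutoCloseable: no casts needed when using the client.
    private C client;

    TypedSinkBase(TypedCallBridge<C> callBridge) {
        this.callBridge = callBridge;
    }

    void open(Map<String, String> clientConfig) {
        client = callBridge.createClient(clientConfig);
    }

    C getClient() {
        return client;
    }
}

// Hypothetical client type standing in for RestHighLevelClient.
class DemoClient implements AutoCloseable {
    final String clusterName;

    DemoClient(String clusterName) {
        this.clusterName = clusterName;
    }

    @Override
    public void close() {
        // nothing to release in this sketch
    }
}

// A version-specific sink binds C exactly once.
class DemoSink<T> extends TypedSinkBase<T, DemoClient> {
    DemoSink() {
        super(config -> new DemoClient(config.getOrDefault("cluster.name", "elasticsearch")));
    }
}
```

For Elasticsearch 6, the version-specific sink would bind `C` to `RestHighLevelClient`, and the compiler would then check every use of the client instead of relying on casts.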


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204758434
  
--- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java ---
@@ -64,13 +65,15 @@
     * @param builder the {@link BulkProcessor.Builder} to configure.
     * @param flushBackoffPolicy user-provided backoff retry settings ({@code null} if the user disabled backoff retries).
     */
-   void configureBulkProcessorBackoff(
+   public abstract void configureBulkProcessorBackoff(
        BulkProcessor.Builder builder,
        @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy);
 
    /**
     * Perform any necessary state cleanup.
     */
-   void cleanup();
+   public void cleanup() {
--- End diff --

Use Java 8 default methods and let this class stay an interface?
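A sketch of the suggestion, assuming Java 8: the bridge stays an interface and only `cleanup()` gets a default no-op body, so stateless implementations need not override it. All names here (`DefaultingCallBridge`, `DummyClient`, `StatelessBridge`, `TrackingBridge`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical client type used only in this sketch.
class DummyClient implements AutoCloseable {
    boolean closed;

    @Override
    public void close() {
        closed = true;
    }
}

// The bridge stays an interface; a Java 8 default method supplies the common no-op.
interface DefaultingCallBridge<C extends AutoCloseable> {

    C createClient(Map<String, String> clientConfig);

    /**
     * Perform any necessary state cleanup. Most bridges hold no state, so the default does nothing.
     */
    default void cleanup() {
        // intentionally empty
    }
}

// Stateless implementation: inherits the default cleanup().
class StatelessBridge implements DefaultingCallBridge<DummyClient> {
    @Override
    public DummyClient createClient(Map<String, String> clientConfig) {
        return new DummyClient();
    }
}

// Stateful implementation: overrides cleanup() to release what it created.
class TrackingBridge implements DefaultingCallBridge<DummyClient> {
    private final List<DummyClient> created = new ArrayList<>();

    @Override
    public DummyClient createClient(Map<String, String> clientConfig) {
        DummyClient client = new DummyClient();
        created.add(client);
        return client;
    }

    @Override
    public void cleanup() {
        for (DummyClient client : created) {
            client.close();
        }
        created.clear();
    }

    int trackedClients() {
        return created.size();
    }
}
```

One trade-off: an interface cannot declare instance fields, so state shared by every bridge would still call for an abstract class. If there is no such state, the interface keeps implementations free to extend another class.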


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204993234
  
--- Diff: flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
+import org.apache.flink.streaming.connectors.elasticsearch.testutils.SourceSinkDataTestKit;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.RestHighLevelClient;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * IT cases for the {@link ElasticsearchSink}.
+ */
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
+
+   /**
+    * Tests that the Elasticsearch sink works properly using a {@link RestHighLevelClient}.
+    */
+   public void runTransportClientTest() throws Exception {
+       final String index = "transport-client-test-index";
+
+       final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+       DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction());
+
+       Map<String, String> userConfig = new HashMap<>();
+       // This instructs the sink to emit after every element, otherwise they would be buffered
+       userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+       source.addSink(createElasticsearchSinkForEmbeddedNode(userConfig,
+           new SourceSinkDataTestKit.TestElasticsearchSinkFunction(index)));
+
+       env.execute("Elasticsearch RestHighLevelClient Test");
+
+       // verify the results
+       Client client = embeddedNodeEnv.getClient();
+       SourceSinkDataTestKit.verifyProducedSinkData(client, index);
+
+       client.close();
+   }
+
+   /**
+    * Tests that the Elasticsearch sink fails eagerly if the provided list of transport addresses is {@code null}.
+    */
+   public void runNullTransportClientTest() throws Exception {
+       try {
+           Map<String, String> userConfig = new HashMap<>();
+           userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+           createElasticsearchSink6(userConfig, null, new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
+       } catch (IllegalArgumentException expectedException) {
+           // test passes
+           return;
+       }
+
+       fail();
+   }
+
+   /**
+    * Tests that the Elasticsearch sink fails eagerly if the provided list of transport addresses is empty.
+    */
+   public void runEmptyTransportClientTest() throws Exception {
+       try {
+           Map<String, String> userConfig = new HashMap<>();
+           userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+           createElasticsearchSink6(userConfig,
+               Collect

[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204994308
  
--- Diff: flink-connectors/flink-connector-elasticsearch6/src/test/resources/log4j-test.properties ---
@@ -0,0 +1,27 @@

+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target=System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
--- End diff --

Still necessary?


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204990734
  
--- Diff: flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.client.RestHighLevelClient;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Elasticsearch 6.x sink that requests multiple {@link ActionRequest ActionRequests}
+ * against a cluster for each incoming element.
+ *
+ * <p>The sink internally uses a {@link RestHighLevelClient} to communicate with an Elasticsearch cluster.
+ * The sink will fail if no cluster can be connected to using the provided transport addresses passed to the constructor.
+ *
+ * <p>The {@link Map} passed to the constructor is used to create the {@code TransportClient}. The config keys can be found
+ * in the <a href="https://www.elastic.io">Elasticsearch documentation</a>. An important setting is {@code cluster.name},
+ * which should be set to the name of the cluster that the sink should emit to.
+ *
+ * <p>Internally, the sink will use a {@link BulkProcessor} to send {@link ActionRequest ActionRequests}.
+ * This will buffer elements before sending a request to the cluster. The behaviour of the
+ * {@code BulkProcessor} can be configured using these config keys:
+ * <ul>
+ *   <li> {@code bulk.flush.max.actions}: Maximum amount of elements to buffer
+ *   <li> {@code bulk.flush.max.size.mb}: Maximum amount of data (in megabytes) to buffer
+ *   <li> {@code bulk.flush.interval.ms}: Interval at which to flush data regardless of the other two
+ *   settings in milliseconds
+ * </ul>
+ *
+ * <p>You also have to provide an {@link ElasticsearchSinkFunction}. This is used to create multiple
+ * {@link ActionRequest ActionRequests} for each incoming element. See the class level documentation of
+ * {@link ElasticsearchSinkFunction} for an example.
+ *
+ * @param <T> Type of the elements handled by this sink
+ */
+public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
--- End diff --

Add `@PublicEvolving`


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204775927
  
--- Diff: flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.client.RestHighLevelClient;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Elasticsearch 6.x sink that requests multiple {@link ActionRequest ActionRequests}
+ * against a cluster for each incoming element.
+ *
+ * <p>The sink internally uses a {@link RestHighLevelClient} to communicate with an Elasticsearch cluster.
+ * The sink will fail if no cluster can be connected to using the provided transport addresses passed to the constructor.
+ *
+ * <p>The {@link Map} passed to the constructor is used to create the {@code TransportClient}. The config keys can be found
--- End diff --

Update docs here.


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204992515
  
--- Diff: flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
+import org.apache.flink.streaming.connectors.elasticsearch.testutils.SourceSinkDataTestKit;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.RestHighLevelClient;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * IT cases for the {@link ElasticsearchSink}.
+ */
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
+
+   /**
+* Tests that the Elasticsearch sink works properly using a {@link RestHighLevelClient}.
+*/
+   public void runTransportClientTest() throws Exception {
--- End diff --

There are no `@Test` annotations, so these tests are never run. We should also rename the methods to `testXXX`, as we usually do. The superclass method names should be updated as well, since `runTransportClientTest` is no longer accurate.


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204991006
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.client.RestHighLevelClient;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Elasticsearch 6.x sink that requests multiple {@link ActionRequest 
ActionRequests}
+ * against a cluster for each incoming element.
+ *
+ * The sink internally uses a {@link RestHighLevelClient} to 
communicate with an Elasticsearch cluster.
+ * The sink will fail if no cluster can be connected to using the provided 
transport addresses passed to the constructor.
+ *
+ * The {@link Map} passed to the constructor is used to create the 
{@code TransportClient}. The config keys can be found
+ * in the https://www.elastic.io";>Elasticsearch 
documentation. An important setting is {@code cluster.name},
+ * which should be set to the name of the cluster that the sink should 
emit to.
+ *
+ * Internally, the sink will use a {@link BulkProcessor} to send {@link 
ActionRequest ActionRequests}.
+ * This will buffer elements before sending a request to the cluster. The 
behaviour of the
+ * {@code BulkProcessor} can be configured using these config keys:
+ * 
+ *{@code bulk.flush.max.actions}: Maximum amount of elements to 
buffer
+ *{@code bulk.flush.max.size.mb}: Maximum amount of data (in 
megabytes) to buffer
+ *{@code bulk.flush.interval.ms}: Interval at which to flush data 
regardless of the other two
+ *   settings in milliseconds
+ * 
+ *
+ * You also have to provide an {@link ElasticsearchSinkFunction}. This 
is used to create multiple
+ * {@link ActionRequest ActionRequests} for each incoming element. See the 
class level documentation of
+ * {@link ElasticsearchSinkFunction} for an example.
+ *
+ * @param  Type of the elements handled by this sink
+ */
+public class ElasticsearchSink extends ElasticsearchSinkBase {
+
+   private static final long serialVersionUID = 1L;
+
+   /**
+* Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link RestHighLevelClient}.
+*
+* @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
+* @param httpHosts The list of {@HttpHost} to which the {@link RestHighLevelClient} connects to.
+*/
+   public ElasticsearchSink(Map<String, String> userConfig, List<HttpHost> httpHosts, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+
+   this(userConfig, httpHosts, elasticsearchSinkFunction, new NoOpFailureHandler());
+   }
+
+   /**
+* Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link RestHighLevelClient}.
+*
+* @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
+* @param failureHandler This is used to handle failed {@link ActionRequest}
+* @param httpHosts The list of {@HttpHost} to which the {@link RestHighLevelClient} connects to.
--- End diff --

Fix two invalid Javadocs.


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204993809
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/examples/ElasticsearchSinkExample.java
 ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6.examples;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that
+ * you have a cluster named "elasticsearch" running or change the name of cluster in the config map.
+ */
+public class ElasticsearchSinkExample {
--- End diff --

We should add tests for our examples.
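One way to make an example testable without a running cluster is to move the element-to-document mapping out of the anonymous `ElasticsearchSinkFunction` into a plain static method and assert on that directly. A minimal sketch, assuming the example indexes each element as a document with a single `data` field, as the Scala docs snippet in this PR does; `ExampleDocuments` and `createDocument` are hypothetical names, and the Flink and Elasticsearch wiring is left out so the sketch compiles against the JDK alone.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper holding the cluster-independent part of an Elasticsearch example.
 * The sink function would presumably call createDocument(element) and hand the result to
 * Requests.indexRequest().source(...); that wiring is intentionally omitted here.
 */
final class ExampleDocuments {

    private ExampleDocuments() {
    }

    /** Builds the JSON source map for one incoming element. */
    static Map<String, String> createDocument(String element) {
        if (element == null) {
            throw new IllegalArgumentException("element must not be null");
        }
        Map<String, String> json = new HashMap<>();
        json.put("data", element);
        return json;
    }
}
```

A unit test for the example can then check that `createDocument("message #1")` yields exactly `data -> message #1`, while an IT case like the one under review covers the actual round trip to Elasticsearch.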


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204991076
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.DocWriteRequest;
--- End diff --

Remove unused import.


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204757871
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
 ---
@@ -39,23 +38,25 @@
  * exactly one instance of the call bridge, and state cleanup is performed when the sink is closed.
  */
 @Internal
-public interface ElasticsearchApiCallBridge extends Serializable {
+public abstract class ElasticsearchApiCallBridge implements Serializable {
--- End diff --

Parameterize the class instead of using `AutoCloseable` as a stand-in for the client, which implements this interface. This avoids manual casting in subclasses.
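Roughly what the suggested parameterization could look like, as a minimal sketch: the bridge takes the client type as a type parameter bounded by `AutoCloseable`, so a subclass's `createClient()` returns the concrete client while shared code can still close it generically. `CallBridge`, `FakeRestClient` and `FakeRestCallBridge` are hypothetical stand-ins, not Flink's `ElasticsearchApiCallBridge` or Elasticsearch's `RestHighLevelClient`.

```java
/**
 * Hypothetical, simplified call bridge parameterized by its client type C.
 * Because createClient() returns C, version-specific code gets the concrete
 * client back without casts; shared code only relies on the AutoCloseable bound.
 */
abstract class CallBridge<C extends AutoCloseable> {

    /** Creates the version-specific client. */
    abstract C createClient();

    /** Version-independent cleanup. */
    void cleanup(C client) throws Exception {
        if (client != null) {
            client.close();
        }
    }
}

/** Stand-in for a version-specific client such as RestHighLevelClient. */
final class FakeRestClient implements AutoCloseable {

    boolean closed;

    /** A method that only exists on the concrete client type. */
    String clusterName() {
        return "fake-cluster";
    }

    @Override
    public void close() {
        closed = true;
    }
}

/** Version-specific bridge: the type argument fixes what createClient() returns. */
final class FakeRestCallBridge extends CallBridge<FakeRestClient> {

    @Override
    FakeRestClient createClient() {
        return new FakeRestClient();
    }
}
```

With this shape, `new FakeRestCallBridge().createClient().clusterName()` compiles without a cast, whereas a bridge typed against plain `AutoCloseable` would first have to cast to the concrete client.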


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204750192
  
--- Diff: docs/dev/connectors/elasticsearch.md ---
@@ -138,6 +143,31 @@ input.addSink(new ElasticsearchSink<>(config, transportAddresses, new Elasticsea
 }
 }));{% endhighlight %}
 
+
+{% highlight java %}
+DataStream<String> input = ...;
+
+List<HttpHost> httpHost = new ArrayList<>();
+httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
+httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));
+
+input.addSink(new ElasticsearchSink<>(httpHosts, new ElasticsearchSinkFunction<String>() {
--- End diff --

Could you add an example for the user config as well, to stay in sync with the examples of the other versions? The following paragraph mentions:

> Especially important is the `cluster.name` parameter

Btw, could you also add imports to your examples? I just started doing this with my code examples to make it easier for people to find the classes used (see [here](https://ci.apache.org/projects/flink/flink-docs-master/dev/java_lambdas.html)).
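A minimal sketch of the kind of user-config example being asked for, with its imports spelled out. It assumes the 6.x sink still reads the `bulk.flush.*` keys listed in its class Javadoc from the `Map<String, String>` passed to the constructor; `cluster.name` is deliberately left out on the assumption that it only matters for the `TransportClient`-based versions. `SinkConfigExample` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical holder for the user config passed to the Elasticsearch 6.x sink. */
final class SinkConfigExample {

    private SinkConfigExample() {
    }

    static Map<String, String> userConfig() {
        Map<String, String> config = new HashMap<>();
        // Emit after every element instead of buffering; convenient for demos and tests,
        // but it effectively disables bulk batching.
        config.put("bulk.flush.max.actions", "1");
        // Maximum amount of buffered data, in megabytes, before a flush.
        config.put("bulk.flush.max.size.mb", "5");
        // Flush interval in milliseconds, regardless of the two settings above.
        config.put("bulk.flush.interval.ms", "1000");
        // cluster.name is not set here (assumed to be TransportClient-only).
        return config;
    }
}
```

The map would then go in as the first argument of the three-argument constructor under review in `ElasticsearchSink.java`, e.g. `new ElasticsearchSink<>(SinkConfigExample.userConfig(), httpHosts, sinkFunction)`, with `httpHosts` and `sinkFunction` built as in the docs snippet.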


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204752713
  
--- Diff: docs/dev/connectors/elasticsearch.md ---
@@ -190,9 +220,30 @@ input.addSink(new ElasticsearchSink(config, transportAddresses, new Elasticsearc
 }))
 {% endhighlight %}
 
+
+{% highlight scala %}
+val input: DataStream[String] = ...
+
+val httpHosts = new java.util.ArrayList[HttpHost]
+httpHosts.add(new HttpHost("127.0.0.1", 9300, "http"))
+httpHosts.add(new HttpHost("10.2.3.1", 9300, "http"))
+
+input.addSink(new ElasticsearchSink(httpHosts, new ElasticsearchSinkFunction[String] {
+  def createIndexRequest(element: String): IndexRequest = {
+val json = new java.util.HashMap[String, String]
+json.put("data", element)
+
+return Requests.indexRequest()
+.index("my-index")
+.type("my-type")
+.source(json)
+  }
+}))
+{% endhighlight %}
+
 
 
-Note how a `Map` of `String`s is used to configure the `ElasticsearchSink`.
+Note how `TransportClient` based version use a `Map` of `String`s is used to configure the `ElasticsearchSink`.
--- End diff --

Remove "is used to"?


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-23 Thread tzulitai
GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/6391

[FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6.x PR

## What is the purpose of the change

This PR finalizes the Elasticsearch 6.x connector PR and adds an end-to-end test for it.

## Brief change log

- Cherry-picked @cjolif's Elasticsearch 5.3+ compatibility and 6.x implementation changes
- Added an end-to-end test for ES 6.x

## Verifying this change

Run the new end-to-end test for ES 6.x.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-9885

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6391.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6391






---