[ 
https://issues.apache.org/jira/browse/BEAM-5107?focusedWorklogId=141645&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-141645
 ]

ASF GitHub Bot logged work on BEAM-5107:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 06/Sep/18 07:13
            Start Date: 06/Sep/18 07:13
    Worklog Time Spent: 10m 
      Work Description: echauchot closed pull request #6211: [BEAM-5107] 
Support ES-6.x for ElasticsearchIO
URL: https://github.com/apache/beam/pull/6211
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
index 258121874d3..9eccb7f0982 100644
--- 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
+++ 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
@@ -17,17 +17,10 @@
  */
 package org.apache.beam.sdk.io.elasticsearch;
 
-import static 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.BoundedElasticsearchSource;
 import static 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;
-import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Read;
-import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
-import static org.junit.Assert.assertEquals;
 
-import java.util.List;
-import org.apache.beam.sdk.io.BoundedSource;
 import 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOITCommon.ElasticsearchPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.elasticsearch.client.RestClient;
 import org.junit.AfterClass;
@@ -93,26 +86,7 @@ public static void afterClass() throws Exception {
 
   @Test
   public void testSplitsVolume() throws Exception {
-    Read read = 
ElasticsearchIO.read().withConnectionConfiguration(readConnectionConfiguration);
-    BoundedElasticsearchSource initialSource =
-        new BoundedElasticsearchSource(read, null, null, null);
-    // desiredBundleSize is ignored because in ES 2.x there is no way to split 
shards. So we get
-    // as many bundles as ES shards and bundle size is shard size
-    long desiredBundleSizeBytes = 0;
-    List<? extends BoundedSource<String>> splits =
-        initialSource.split(desiredBundleSizeBytes, options);
-    SourceTestUtils.assertSourcesEqualReferenceSource(initialSource, splits, 
options);
-    // this is the number of ES shards
-    // (By default, each index in Elasticsearch is allocated 5 primary shards)
-    long expectedNumSplits = 5;
-    assertEquals(expectedNumSplits, splits.size());
-    int nonEmptySplits = 0;
-    for (BoundedSource<String> subSource : splits) {
-      if (readFromSource(subSource, options).size() > 0) {
-        nonEmptySplits += 1;
-      }
-    }
-    assertEquals(expectedNumSplits, nonEmptySplits);
+    elasticsearchIOTestCommon.testSplit(0);
   }
 
   @Test
diff --git 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index 9920dde53ba..52609e54d40 100644
--- 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -17,26 +17,13 @@
  */
 package org.apache.beam.sdk.io.elasticsearch;
 
-import static 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.BoundedElasticsearchSource;
 import static 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;
-import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Read;
-import static 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ACCEPTABLE_EMPTY_SPLITS_PERCENTAGE;
 import static 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ES_TYPE;
-import static 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.NUM_DOCS_UTESTS;
 import static 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.getEsIndex;
-import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
-import static org.hamcrest.Matchers.lessThan;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
 
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.ServerSocket;
-import java.util.List;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.common.settings.Settings;
@@ -173,32 +160,7 @@ public void testWriteWithMaxBatchSizeBytes() throws 
Exception {
 
   @Test
   public void testSplit() throws Exception {
-    ElasticSearchIOTestUtils.insertTestDocuments(
-        connectionConfiguration, NUM_DOCS_UTESTS, restClient);
-    PipelineOptions options = PipelineOptionsFactory.create();
-    Read read = 
ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration);
-    BoundedElasticsearchSource initialSource =
-        new BoundedElasticsearchSource(read, null, null, null);
-    //desiredBundleSize is ignored because in ES 2.x there is no way to split 
shards. So we get
-    // as many bundles as ES shards and bundle size is shard size
-    int desiredBundleSizeBytes = 0;
-    List<? extends BoundedSource<String>> splits =
-        initialSource.split(desiredBundleSizeBytes, options);
-    SourceTestUtils.assertSourcesEqualReferenceSource(initialSource, splits, 
options);
-    //this is the number of ES shards
-    // (By default, each index in Elasticsearch is allocated 5 primary shards)
-    int expectedNumSources = 5;
-    assertEquals("Wrong number of splits", expectedNumSources, splits.size());
-    int emptySplits = 0;
-    for (BoundedSource<String> subSource : splits) {
-      if (readFromSource(subSource, options).isEmpty()) {
-        emptySplits += 1;
-      }
-    }
-    assertThat(
-        "There are too many empty splits, parallelism is sub-optimal",
-        emptySplits,
-        lessThan((int) (ACCEPTABLE_EMPTY_SPLITS_PERCENTAGE * splits.size())));
+    elasticsearchIOTestCommon.testSplit(0);
   }
 
   @Test
@@ -216,7 +178,7 @@ public void testWriteWithIndexFn() throws Exception {
   @Test
   public void testWriteWithTypeFn() throws Exception {
     elasticsearchIOTestCommon.setPipeline(pipeline);
-    elasticsearchIOTestCommon.testWriteWithTypeFn();
+    elasticsearchIOTestCommon.testWriteWithTypeFn2x5x();
   }
 
   @Test
diff --git 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/contrib/create_elk_container.sh
 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/contrib/create_elk_container.sh
index 48f6064cd07..715cb6d42a2 100755
--- 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/contrib/create_elk_container.sh
+++ 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/contrib/create_elk_container.sh
@@ -18,7 +18,7 @@
 #
 
################################################################################
 
-#Create an ELK (Elasticsearch Logstash Kibana) container for ES v2.4 and 
compatible Logstash and Kibana versions,
+#Create an ELK (Elasticsearch Logstash Kibana) container for ES v5.4.3 and 
compatible Logstash and Kibana versions,
 #bind then on host ports, allow shell access to container and mount current 
directory on /home/$USER inside the container
 
-docker create -p 5601:5601 -p 9200:9200 -p 5044:5044 -p 5000:5000 -p 9300:9300 
-it -v $(pwd):/home/$USER/ --name elk-2.4  sebp/elk:es240_l240_k460
+docker create -p 5601:5601 -p 9200:9200 -p 5044:5044 -p 5000:5000 -p 9300:9300 
-it -v $(pwd):/home/$USER/ --name elk-5.4.3  sebp/elk:543
diff --git 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
index b38b1696141..c3eb9d11ad4 100644
--- 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
+++ 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
@@ -17,17 +17,10 @@
  */
 package org.apache.beam.sdk.io.elasticsearch;
 
-import static 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.BoundedElasticsearchSource;
 import static 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;
-import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Read;
-import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
-import static org.junit.Assert.assertEquals;
 
-import java.util.List;
-import org.apache.beam.sdk.io.BoundedSource;
 import 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOITCommon.ElasticsearchPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.elasticsearch.client.RestClient;
 import org.junit.AfterClass;
@@ -93,24 +86,7 @@ public static void afterClass() throws Exception {
 
   @Test
   public void testSplitsVolume() throws Exception {
-    Read read = 
ElasticsearchIO.read().withConnectionConfiguration(readConnectionConfiguration);
-    BoundedElasticsearchSource initialSource =
-        new BoundedElasticsearchSource(read, null, null, null);
-    int desiredBundleSizeBytes = 10000;
-    List<? extends BoundedSource<String>> splits =
-        initialSource.split(desiredBundleSizeBytes, options);
-    SourceTestUtils.assertSourcesEqualReferenceSource(initialSource, splits, 
options);
-    long indexSize = 
BoundedElasticsearchSource.estimateIndexSize(readConnectionConfiguration);
-    float expectedNumSourcesFloat = (float) indexSize / desiredBundleSizeBytes;
-    int expectedNumSources = (int) Math.ceil(expectedNumSourcesFloat);
-    assertEquals(expectedNumSources, splits.size());
-    int nonEmptySplits = 0;
-    for (BoundedSource<String> subSource : splits) {
-      if (readFromSource(subSource, options).size() > 0) {
-        nonEmptySplits += 1;
-      }
-    }
-    assertEquals("Wrong number of empty splits", expectedNumSources, 
nonEmptySplits);
+    elasticsearchIOTestCommon.testSplit(10_000);
   }
 
   @Test
diff --git 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index b453b9f4740..2414f8f44da 100644
--- 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -17,15 +17,9 @@
  */
 package org.apache.beam.sdk.io.elasticsearch;
 
-import static 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.BoundedElasticsearchSource;
 import static 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;
-import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Read;
-import static 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ACCEPTABLE_EMPTY_SPLITS_PERCENTAGE;
 import static 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ES_TYPE;
-import static 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.NUM_DOCS_UTESTS;
 import static 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.getEsIndex;
-import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
-import static org.hamcrest.Matchers.lessThan;
 
 import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
 import java.io.IOException;
@@ -33,11 +27,6 @@
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.List;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.plugins.Plugin;
@@ -160,33 +149,10 @@ public void testWriteWithMaxBatchSizeBytes() throws 
Exception {
 
   @Test
   public void testSplit() throws Exception {
-    //need to create the index using the helper method (not create it at first 
insertion)
+    // need to create the index using the helper method (not create it at 
first insertion)
     // for the indexSettings() to be run
     createIndex(getEsIndex());
-    ElasticSearchIOTestUtils.insertTestDocuments(
-        connectionConfiguration, NUM_DOCS_UTESTS, getRestClient());
-    PipelineOptions options = PipelineOptionsFactory.create();
-    Read read = 
ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration);
-    BoundedElasticsearchSource initialSource =
-        new BoundedElasticsearchSource(read, null, null, null);
-    int desiredBundleSizeBytes = 2000;
-    List<? extends BoundedSource<String>> splits =
-        initialSource.split(desiredBundleSizeBytes, options);
-    SourceTestUtils.assertSourcesEqualReferenceSource(initialSource, splits, 
options);
-    long indexSize = 
BoundedElasticsearchSource.estimateIndexSize(connectionConfiguration);
-    float expectedNumSourcesFloat = (float) indexSize / desiredBundleSizeBytes;
-    int expectedNumSources = (int) Math.ceil(expectedNumSourcesFloat);
-    assertEquals("Wrong number of splits", expectedNumSources, splits.size());
-    int emptySplits = 0;
-    for (BoundedSource<String> subSource : splits) {
-      if (readFromSource(subSource, options).isEmpty()) {
-        emptySplits += 1;
-      }
-    }
-    assertThat(
-        "There are too many empty splits, parallelism is sub-optimal",
-        emptySplits,
-        lessThan((int) (ACCEPTABLE_EMPTY_SPLITS_PERCENTAGE * splits.size())));
+    elasticsearchIOTestCommon.testSplit(2_000);
   }
 
   @Test
@@ -204,7 +170,7 @@ public void testWriteWithIndexFn() throws Exception {
   @Test
   public void testWriteWithTypeFn() throws Exception {
     elasticsearchIOTestCommon.setPipeline(pipeline);
-    elasticsearchIOTestCommon.testWriteWithTypeFn();
+    elasticsearchIOTestCommon.testWriteWithTypeFn2x5x();
   }
 
   @Test
diff --git 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/build.gradle 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/build.gradle
new file mode 100644
index 00000000000..9675e4bf458
--- /dev/null
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/build.gradle
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+apply plugin: org.apache.beam.gradle.BeamModulePlugin
+applyJavaNature()
+provideIntegrationTestingDependencies()
+enableJavaPerformanceTesting()
+
+description = "Apache Beam :: SDKs :: Java :: IO :: Elasticsearch-Tests :: 6.x"
+ext.summary = "Tests of ElasticsearchIO on Elasticsearch 6.x"
+
+test {
+  // needed for ESIntegTestCase
+  systemProperty "tests.security.manager", "false"
+}
+
+def jna_version = "4.1.0"
+def log4j_version = "2.6.2"
+def elastic_search_version = "6.4.0"
+
+configurations.all {
+  resolutionStrategy {
+    // Make sure the log4j versions for api and core match instead of taking 
the default
+    // Gradle rule of using the latest.
+    force "org.apache.logging.log4j:log4j-api:$log4j_version"
+    force "org.apache.logging.log4j:log4j-core:$log4j_version"
+  }
+}
+
+dependencies {
+  testCompile project(path: ":beam-sdks-java-io-elasticsearch-tests-common", 
configuration: "shadowTest")
+  testCompile "org.elasticsearch.test:framework:$elastic_search_version"
+  testCompile 
"org.elasticsearch.plugin:transport-netty4-client:$elastic_search_version"
+  testCompile 
"com.carrotsearch.randomizedtesting:randomizedtesting-runner:2.5.2"
+  testCompile "org.elasticsearch:elasticsearch:$elastic_search_version"
+
+  testCompile project(path: ":beam-sdks-java-core", configuration: "shadow")
+  testCompile project(path: ":beam-sdks-java-io-elasticsearch", configuration: 
"shadow")
+  testCompile project(path: ":beam-sdks-java-io-common", configuration: 
"shadowTest")
+  testCompile project(path: ":beam-runners-direct-java", configuration: 
"shadow")
+  testCompile "org.apache.logging.log4j:log4j-core:$log4j_version"
+  testCompile "org.apache.logging.log4j:log4j-api:$log4j_version"
+  testCompile library.java.slf4j_api
+  testCompile "net.java.dev.jna:jna:$jna_version"
+  testCompile library.java.hamcrest_core
+  testCompile library.java.hamcrest_library
+  testCompile library.java.slf4j_jdk14
+  testCompile library.java.commons_io_1x
+  testCompile library.java.junit
+  testCompile 
"org.elasticsearch.client:elasticsearch-rest-client:$elastic_search_version"
+}
diff --git 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/contrib/create_elk_container.sh
 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/contrib/create_elk_container.sh
new file mode 100755
index 00000000000..f29ad7060b5
--- /dev/null
+++ 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/contrib/create_elk_container.sh
@@ -0,0 +1,24 @@
+#!/bin/sh
+################################################################################
+#
+#    Licensed to the Apache Software Foundation (ASF) under one or more
+#    contributor license agreements.  See the NOTICE file distributed with
+#    this work for additional information regarding copyright ownership.
+#    The ASF licenses this file to You under the Apache License, Version 2.0
+#    (the "License"); you may not use this file except in compliance with
+#    the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+#
+################################################################################
+
+#Create an ELK (Elasticsearch Logstash Kibana) container for ES v6.4.0 and 
compatible Logstash and Kibana versions,
+#bind them on host ports, allow shell access to container and mount current 
directory on /home/$USER inside the container
+
+docker create -p 5601:5601 -p 9200:9200 -p 5044:5044 -p 5000:5000 -p 9300:9300 
-it -v $(pwd):/home/$USER/ --name elk-6.4.0  sebp/elk:640
diff --git 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
new file mode 100644
index 00000000000..6621d7be89e
--- /dev/null
+++ 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.elasticsearch;
+
+import static 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;
+
+import 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOITCommon.ElasticsearchPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.elasticsearch.client.RestClient;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * A test of {@link ElasticsearchIO} on an independent Elasticsearch v6.x 
instance.
+ *
+ * <p>This test requires a running instance of Elasticsearch, and the test 
dataset must exist in the
+ * database. See {@link ElasticsearchIOITCommon} for instructions to achieve 
this.
+ *
+ * <p>You can run this test by doing the following from the beam parent module 
directory with the
+ * correct server IP:
+ *
+ * <pre>
+ *  ./gradlew integrationTest -p 
sdks/java/io/elasticsearch-tests/elasticsearch-tests-6
+ *  -DintegrationTestPipelineOptions='[
+ *  "--elasticsearchServer=1.2.3.4",
+ *  "--elasticsearchHttpPort=9200"]'
+ *  --tests org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOIT
+ *  -DintegrationTestRunner=direct
+ * </pre>
+ *
+ * <p>It is likely that you will need to configure 
<code>thread_pool.bulk.queue_size: 250</code> (or
+ * higher) in the backend Elasticsearch server for this test to run.
+ */
+public class ElasticsearchIOIT {
+  private static RestClient restClient;
+  private static ElasticsearchPipelineOptions options;
+  private static ConnectionConfiguration readConnectionConfiguration;
+  private static ConnectionConfiguration writeConnectionConfiguration;
+  private static ConnectionConfiguration updateConnectionConfiguration;
+  private static ElasticsearchIOTestCommon elasticsearchIOTestCommon;
+
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    PipelineOptionsFactory.register(ElasticsearchPipelineOptions.class);
+    options = 
TestPipeline.testingPipelineOptions().as(ElasticsearchPipelineOptions.class);
+    readConnectionConfiguration =
+        ElasticsearchIOITCommon.getConnectionConfiguration(
+            options, ElasticsearchIOITCommon.IndexMode.READ);
+    writeConnectionConfiguration =
+        ElasticsearchIOITCommon.getConnectionConfiguration(
+            options, ElasticsearchIOITCommon.IndexMode.WRITE);
+    updateConnectionConfiguration =
+        ElasticsearchIOITCommon.getConnectionConfiguration(
+            options, ElasticsearchIOITCommon.IndexMode.WRITE_PARTIAL);
+    restClient = readConnectionConfiguration.createClient();
+    elasticsearchIOTestCommon =
+        new ElasticsearchIOTestCommon(readConnectionConfiguration, restClient, 
true);
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    ElasticSearchIOTestUtils.deleteIndex(writeConnectionConfiguration, 
restClient);
+    ElasticSearchIOTestUtils.deleteIndex(updateConnectionConfiguration, 
restClient);
+    restClient.close();
+  }
+
+  @Test
+  public void testSplitsVolume() throws Exception {
+    elasticsearchIOTestCommon.testSplit(10_000);
+  }
+
+  @Test
+  public void testReadVolume() throws Exception {
+    elasticsearchIOTestCommon.setPipeline(pipeline);
+    elasticsearchIOTestCommon.testRead();
+  }
+
+  @Test
+  public void testWriteVolume() throws Exception {
+    // cannot share elasticsearchIOTestCommon because tests run in parallel.
+    ElasticsearchIOTestCommon elasticsearchIOTestCommonWrite =
+        new ElasticsearchIOTestCommon(writeConnectionConfiguration, 
restClient, true);
+    elasticsearchIOTestCommonWrite.setPipeline(pipeline);
+    elasticsearchIOTestCommonWrite.testWrite();
+  }
+
+  @Test
+  public void testSizesVolume() throws Exception {
+    elasticsearchIOTestCommon.testSizes();
+  }
+
+  /**
+   * This test verifies volume loading of Elasticsearch using explicit 
document IDs and routed to an
+   * index named the same as the scientist, and type which is based on the 
modulo 2 of the scientist
+   * name. The goal of this IT is to help observe and verify that the overhead 
of adding the
+   * functions to parse the document and extract the ID is acceptable.
+   */
+  @Test
+  public void testWriteWithFullAddressingVolume() throws Exception {
+    // cannot share elasticsearchIOTestCommon because tests run in parallel.
+    ElasticsearchIOTestCommon elasticsearchIOTestCommonWrite =
+        new ElasticsearchIOTestCommon(writeConnectionConfiguration, 
restClient, true);
+    elasticsearchIOTestCommonWrite.setPipeline(pipeline);
+    elasticsearchIOTestCommonWrite.testWriteWithFullAddressing();
+  }
+
+  /**
+   * This test verifies volume partial updates of Elasticsearch. The test 
dataset index is cloned
+   * and then a new field is added to each document using a partial update. 
The test then asserts
+   * the updates were applied.
+   */
+  @Test
+  public void testWritePartialUpdate() throws Exception {
+    ElasticSearchIOTestUtils.copyIndex(
+        restClient,
+        readConnectionConfiguration.getIndex(),
+        updateConnectionConfiguration.getIndex());
+    // cannot share elasticsearchIOTestCommon because tests run in parallel.
+    ElasticsearchIOTestCommon elasticsearchIOTestCommonUpdate =
+        new ElasticsearchIOTestCommon(updateConnectionConfiguration, 
restClient, true);
+    elasticsearchIOTestCommonUpdate.setPipeline(pipeline);
+    elasticsearchIOTestCommonUpdate.testWritePartialUpdate();
+  }
+}
diff --git 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
new file mode 100644
index 00000000000..12b2c5af3c7
--- /dev/null
+++ 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.elasticsearch;
+
+import static 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;
+import static 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ES_TYPE;
+import static 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.getEsIndex;
+
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.transport.Netty4Plugin;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/*
+Cannot use @RunWith(JUnit4.class) with ESIntegTestCase
+Cannot have @BeforeClass @AfterClass with ESIntegTestCase
+*/
+
+/** Tests for {@link ElasticsearchIO} version 6. */
+@ThreadLeakScope(ThreadLeakScope.Scope.NONE)
+public class ElasticsearchIOTest extends ESIntegTestCase implements 
Serializable {
+
+  private ElasticsearchIOTestCommon elasticsearchIOTestCommon;
+  private ConnectionConfiguration connectionConfiguration;
+
+  private String[] fillAddresses() {
+    ArrayList<String> result = new ArrayList<>();
+    for (InetSocketAddress address : cluster().httpAddresses()) {
+      result.add(String.format("http://%s:%s", address.getHostString(), 
address.getPort()));
+    }
+    return result.toArray(new String[result.size()]);
+  }
+
+  @Override
+  protected Settings nodeSettings(int nodeOrdinal) {
+    System.setProperty("es.set.netty.runtime.available.processors", "false");
+    return Settings.builder()
+        .put(super.nodeSettings(nodeOrdinal))
+        .put("http.enabled", "true")
+        // had problems with some jdk, embedded ES was too slow for bulk 
insertion,
+        // and queue of 50 was full. No pb with real ES instance (cf testWrite 
integration test)
+        .put("thread_pool.bulk.queue_size", 400)
+        .build();
+  }
+
+  @Override
+  public Settings indexSettings() {
+    return Settings.builder()
+        .put(super.indexSettings())
+        //useful to have updated sizes for getEstimatedSize
+        .put("index.store.stats_refresh_interval", 0)
+        .build();
+  }
+
+  @Override
+  protected Collection<Class<? extends Plugin>> nodePlugins() {
+    ArrayList<Class<? extends Plugin>> plugins = new ArrayList<>();
+    plugins.add(Netty4Plugin.class);
+    return plugins;
+  }
+
+  @Before
+  public void setup() {
+    if (connectionConfiguration == null) {
+      connectionConfiguration =
+          ConnectionConfiguration.create(fillAddresses(), getEsIndex(), 
ES_TYPE);
+      elasticsearchIOTestCommon =
+          new ElasticsearchIOTestCommon(connectionConfiguration, 
getRestClient(), false);
+    }
+  }
+
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+
+  @Test
+  public void testSizes() throws Exception {
+    // need to create the index using the helper method (not create it at 
first insertion)
+    // for the indexSettings() to be run
+    createIndex(getEsIndex());
+    elasticsearchIOTestCommon.testSizes();
+  }
+
+  @Test
+  public void testRead() throws Exception {
+    // need to create the index using the helper method (not create it at 
first insertion)
+    // for the indexSettings() to be run
+    createIndex(getEsIndex());
+    elasticsearchIOTestCommon.setPipeline(pipeline);
+    elasticsearchIOTestCommon.testRead();
+  }
+
+  @Test
+  public void testReadWithQuery() throws Exception {
+    // need to create the index using the helper method (not create it at 
first insertion)
+    // for the indexSettings() to be run
+    createIndex(getEsIndex());
+    elasticsearchIOTestCommon.setPipeline(pipeline);
+    elasticsearchIOTestCommon.testReadWithQuery();
+  }
+
+  @Test
+  public void testWrite() throws Exception {
+    elasticsearchIOTestCommon.setPipeline(pipeline);
+    elasticsearchIOTestCommon.testWrite();
+  }
+
+  @Rule public ExpectedException expectedException = ExpectedException.none();
+
+  @Test
+  public void testWriteWithErrors() throws Exception {
+    elasticsearchIOTestCommon.setExpectedException(expectedException);
+    elasticsearchIOTestCommon.testWriteWithErrors();
+  }
+
+  @Test
+  public void testWriteWithMaxBatchSize() throws Exception {
+    elasticsearchIOTestCommon.testWriteWithMaxBatchSize();
+  }
+
+  @Test
+  public void testWriteWithMaxBatchSizeBytes() throws Exception {
+    elasticsearchIOTestCommon.testWriteWithMaxBatchSizeBytes();
+  }
+
+  @Test
+  public void testSplit() throws Exception {
+    // need to create the index using the helper method (not create it at 
first insertion)
+    // for the indexSettings() to be run
+    createIndex(getEsIndex());
+    elasticsearchIOTestCommon.testSplit(2_000);
+  }
+
+  @Test
+  public void testWriteWithIdFn() throws Exception {
+    elasticsearchIOTestCommon.setPipeline(pipeline);
+    elasticsearchIOTestCommon.testWriteWithIdFn();
+  }
+
+  @Test
+  public void testWriteWithIndexFn() throws Exception {
+    elasticsearchIOTestCommon.setPipeline(pipeline);
+    elasticsearchIOTestCommon.testWriteWithIndexFn();
+  }
+
+  @Test
+  public void testWriteFullAddressing() throws Exception {
+    elasticsearchIOTestCommon.setPipeline(pipeline);
+    elasticsearchIOTestCommon.testWriteWithFullAddressing();
+  }
+
+  @Test
+  public void testWritePartialUpdate() throws Exception {
+    elasticsearchIOTestCommon.setPipeline(pipeline);
+    elasticsearchIOTestCommon.testWritePartialUpdate();
+  }
+
+  @Test
+  public void testReadWithMetadata() throws Exception {
+    elasticsearchIOTestCommon.setPipeline(pipeline);
+    elasticsearchIOTestCommon.testReadWithMetadata();
+  }
+
+  @Test
+  public void testDefaultRetryPredicate() throws IOException {
+    elasticsearchIOTestCommon.testDefaultRetryPredicate(getRestClient());
+  }
+
+  @Test
+  public void testWriteRetry() throws Throwable {
+    elasticsearchIOTestCommon.setExpectedException(expectedException);
+    elasticsearchIOTestCommon.setPipeline(pipeline);
+    elasticsearchIOTestCommon.testWriteRetry();
+  }
+}
diff --git 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/elasticsearch/bootstrap/JarHell.java
 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/elasticsearch/bootstrap/JarHell.java
new file mode 100644
index 00000000000..be74371dc2c
--- /dev/null
+++ 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/elasticsearch/bootstrap/JarHell.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.elasticsearch.bootstrap;
+
+import java.util.function.Consumer;
+
+/**
+ * We need a real Elasticsearch instance to properly test the IO (split, slice 
API, scroll API,
+ * ...). Starting at ES 5, to have Elasticsearch embedded, we are forced to 
use Elasticsearch test
+ * framework. But this framework checks for class duplicates in classpath and 
it cannot be
+ * deactivated. When the class duplication comes from a dependency, then it 
cannot be avoided.
+ * Elasticsearch community does not provide a way of deactivating the jar hell 
test, so skip it by
+ * making this hack. In this case duplicate class is class:
+ * org.apache.maven.surefire.report.SafeThrowable jar1: surefire-api-2.20.jar 
jar2:
+ * surefire-junit47-2.20.jar
+ */
+class JarHell {
+
+  @SuppressWarnings("EmptyMethod")
+  public static void checkJarHell(Consumer<String> output) {}
+}
diff --git 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/build.gradle 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/build.gradle
index bf4fac8d5ac..941c526296d 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/build.gradle
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/build.gradle
@@ -45,5 +45,5 @@ dependencies {
   testCompile library.java.slf4j_jdk14
   testCompile library.java.commons_io_1x
   testCompile library.java.junit
-  testCompile "org.elasticsearch.client:elasticsearch-rest-client:5.6.3"
+  testCompile "org.elasticsearch.client:elasticsearch-rest-client:6.4.0"
 }
diff --git 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
index 5a8ad788226..345805e22a0 100644
--- 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
+++ 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
@@ -27,7 +27,9 @@
 import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Read;
 import static 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.RetryConfiguration.DEFAULT_RETRY_PREDICATE;
 import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Write;
+import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.lessThan;
 import static org.hamcrest.core.Is.isA;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -43,11 +45,13 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import org.apache.beam.sdk.io.BoundedSource;
 import 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.RetryConfiguration.DefaultRetryPredicate;
 import 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.RetryConfiguration.RetryPredicate;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
@@ -120,6 +124,43 @@ void setExpectedException(ExpectedException 
expectedException) {
     this.expectedException = expectedException;
   }
 
+  void testSplit(final int desiredBundleSizeBytes) throws Exception {
+    if (!useAsITests) {
+      ElasticSearchIOTestUtils.insertTestDocuments(connectionConfiguration, 
numDocs, restClient);
+    }
+    PipelineOptions options = PipelineOptionsFactory.create();
+    Read read = 
ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration);
+    BoundedElasticsearchSource initialSource =
+        new BoundedElasticsearchSource(read, null, null, null);
+    List<? extends BoundedSource<String>> splits =
+        initialSource.split(desiredBundleSizeBytes, options);
+    SourceTestUtils.assertSourcesEqualReferenceSource(initialSource, splits, 
options);
+    long indexSize = 
BoundedElasticsearchSource.estimateIndexSize(connectionConfiguration);
+
+    int expectedNumSources;
+    if (desiredBundleSizeBytes == 0) {
+      // desiredBundleSize is ignored because in ES 2.x there is no way to 
split shards.
+      // 5 is the number of ES shards
+      // (By default, each index in Elasticsearch is allocated 5 primary 
shards)
+      expectedNumSources = 5;
+    } else {
+      float expectedNumSourcesFloat = (float) indexSize / 
desiredBundleSizeBytes;
+      expectedNumSources = (int) Math.ceil(expectedNumSourcesFloat);
+    }
+    assertEquals("Wrong number of splits", expectedNumSources, splits.size());
+
+    int emptySplits = 0;
+    for (BoundedSource<String> subSource : splits) {
+      if (readFromSource(subSource, options).isEmpty()) {
+        emptySplits += 1;
+      }
+    }
+    assertThat(
+        "There are too many empty splits, parallelism is sub-optimal",
+        emptySplits,
+        lessThan((int) (ACCEPTABLE_EMPTY_SPLITS_PERCENTAGE * splits.size())));
+  }
+
   void testSizes() throws Exception {
     if (!useAsITests) {
       ElasticSearchIOTestUtils.insertTestDocuments(connectionConfiguration, 
numDocs, restClient);
@@ -162,8 +203,7 @@ void testReadWithQuery() throws Exception {
             + "  \"query\": {\n"
             + "  \"match\" : {\n"
             + "    \"scientist\" : {\n"
-            + "      \"query\" : \"Einstein\",\n"
-            + "      \"type\" : \"boolean\"\n"
+            + "      \"query\" : \"Einstein\"\n"
             + "    }\n"
             + "  }\n"
             + "  }\n"
@@ -418,8 +458,11 @@ public String apply(JsonNode input) {
    * Tests that documents are dynamically routed to different types and not 
the type that is given
    * in the configuration. Documents should be routed to a type of type_0 
or type_1 using a
    * modulo approach of the explicit id.
+   *
+   * <p>This test does not work with ES 6 because ES 6 does not allow an 
index to have more than 1
+   * mapping type.
    */
-  void testWriteWithTypeFn() throws Exception {
+  void testWriteWithTypeFn2x5x() throws Exception {
     // defensive coding: this test requires an even number of docs
     long adjustedNumDocs = (numDocs & 1) == 0 ? numDocs : numDocs + 1;
 
diff --git a/sdks/java/io/elasticsearch/build.gradle 
b/sdks/java/io/elasticsearch/build.gradle
index b6a06c882a8..8654dbdedf7 100644
--- a/sdks/java/io/elasticsearch/build.gradle
+++ b/sdks/java/io/elasticsearch/build.gradle
@@ -27,7 +27,7 @@ dependencies {
   shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
   shadow library.java.jackson_databind
   shadow library.java.jackson_annotations
-  shadow "org.elasticsearch.client:elasticsearch-rest-client:5.6.3"
+  shadow "org.elasticsearch.client:elasticsearch-rest-client:6.4.0"
   shadow "org.apache.httpcomponents:httpasyncclient:4.1.4"
   shadow "org.apache.httpcomponents:httpcore-nio:4.4.10"
   shadow "org.apache.httpcomponents:httpcore:4.4.10"
diff --git 
a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
 
b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index 30c93d1782a..9d8eaf268ab 100644
--- 
a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ 
b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -133,7 +133,9 @@
  *
  * <p>Optionally, you can provide {@link 
ElasticsearchIO.Write.FieldValueExtractFn} using {@code
  * withIndexFn()} or {@code withTypeFn()} to enable per-document routing to 
the target Elasticsearch
- * index and type.
+ * index (all versions) and type (version &lt; 6). Support for type routing 
was removed in
+ * Elasticsearch 6 (see
+ * 
https://www.elastic.co/blog/index-type-parent-child-join-now-future-in-elasticsearch)
  *
  * <p>When {withUsePartialUpdate()} is enabled, the input document must 
contain an id field and
  * {@code withIdFn()} must be used to allow its extraction by the 
ElasticsearchIO.
@@ -186,7 +188,7 @@ static void checkForErrors(Response response, int 
backendVersion) throws IOExcep
         String errorRootName = "";
         if (backendVersion == 2) {
           errorRootName = "create";
-        } else if (backendVersion == 5) {
+        } else if (backendVersion == 5 || backendVersion == 6) {
           errorRootName = "index";
         }
         JsonNode errorRoot = item.path(errorRootName);
@@ -277,6 +279,8 @@ public static ConnectionConfiguration create(String[] 
addresses, String index, S
      * If Elasticsearch authentication is enabled, provide the username.
      *
      * @param username the username used to authenticate to Elasticsearch
+     * @return a {@link ConnectionConfiguration} that describes a connection 
configuration to
+     *     Elasticsearch.
      */
     public ConnectionConfiguration withUsername(String username) {
       checkArgument(username != null, "username can not be null");
@@ -288,6 +292,8 @@ public ConnectionConfiguration withUsername(String 
username) {
      * If Elasticsearch authentication is enabled, provide the password.
      *
      * @param password the password used to authenticate to Elasticsearch
+     * @return a {@link ConnectionConfiguration} that describes a connection 
configuration to
+     *     Elasticsearch.
      */
     public ConnectionConfiguration withPassword(String password) {
       checkArgument(password != null, "password can not be null");
@@ -300,6 +306,8 @@ public ConnectionConfiguration withPassword(String 
password) {
      * containing the client key.
      *
      * @param keystorePath the location of the keystore containing the client 
key.
+     * @return a {@link ConnectionConfiguration} that describes a connection 
configuration to
+     *     Elasticsearch.
      */
     public ConnectionConfiguration withKeystorePath(String keystorePath) {
       checkArgument(keystorePath != null, "keystorePath can not be null");
@@ -312,6 +320,8 @@ public ConnectionConfiguration withKeystorePath(String 
keystorePath) {
      * to open the client keystore.
      *
      * @param keystorePassword the password of the client keystore.
+     * @return a {@link ConnectionConfiguration} that describes a connection 
configuration to
+     *     Elasticsearch.
      */
     public ConnectionConfiguration withKeystorePassword(String 
keystorePassword) {
       checkArgument(keystorePassword != null, "keystorePassword can not be 
null");
@@ -402,7 +412,13 @@ RestClient createClient() throws IOException {
       abstract Read build();
     }
 
-    /** Provide the Elasticsearch connection configuration object. */
+    /**
+     * Provide the Elasticsearch connection configuration object.
+     *
+     * @param connectionConfiguration a {@link ConnectionConfiguration} 
that describes a connection
+     *     configuration to Elasticsearch.
+     * @return a {@link PTransform} reading data from Elasticsearch.
+     */
     public Read withConnectionConfiguration(ConnectionConfiguration 
connectionConfiguration) {
       checkArgument(connectionConfiguration != null, "connectionConfiguration 
can not be null");
       return 
builder().setConnectionConfiguration(connectionConfiguration).build();
@@ -414,6 +430,7 @@ public Read 
withConnectionConfiguration(ConnectionConfiguration connectionConfig
      * @param query the query. See <a
      *     
href="https://www.elastic.co/guide/en/elasticsearch/reference/2.4/query-dsl.html";>Query
      *     DSL</a>
+     * @return a {@link PTransform} reading data from Elasticsearch.
      */
     public Read withQuery(String query) {
       checkArgument(query != null, "query can not be null");
@@ -423,6 +440,8 @@ public Read withQuery(String query) {
 
     /**
      * Include metadata in result json documents. Document source will be 
under json node _source.
+     *
+     * @return a {@link PTransform} reading data from Elasticsearch.
      */
     public Read withMetadata() {
       return builder().setWithMetadata(true).build();
@@ -432,6 +451,9 @@ public Read withMetadata() {
      * Provide a scroll keepalive. See <a
      * 
href="https://www.elastic.co/guide/en/elasticsearch/reference/2.4/search-request-scroll.html";>scroll
      * API</a> Default is "5m". Change this only if you get "No search context 
found" errors.
+     *
+     * @param scrollKeepalive keepalive duration of the scroll
+     * @return a {@link PTransform} reading data from Elasticsearch.
      */
     public Read withScrollKeepalive(String scrollKeepalive) {
       checkArgument(scrollKeepalive != null, "scrollKeepalive can not be 
null");
@@ -447,6 +469,7 @@ public Read withScrollKeepalive(String scrollKeepalive) {
      * batchSize
      *
      * @param batchSize number of documents read in each scroll read
+     * @return a {@link PTransform} reading data from Elasticsearch.
      */
     public Read withBatchSize(long batchSize) {
       checkArgument(
@@ -522,9 +545,9 @@ private BoundedElasticsearchSource(
       List<BoundedElasticsearchSource> sources = new ArrayList<>();
       if (backendVersion == 2) {
         // 1. We split per shard :
-        // unfortunately, Elasticsearch 2. x doesn 't provide a way to do 
parallel reads on a single
+        // unfortunately, Elasticsearch 2.x doesn't provide a way to do 
parallel reads on a single
         // shard.So we do not use desiredBundleSize because we cannot split 
shards.
-        // With the slice API in ES 5.0 we will be able to use 
desiredBundleSize.
+        // With the slice API in ES 5.x+ we will be able to use 
desiredBundleSize.
         // Basically we will just ask the slice API to return data
         // in nbBundles = estimatedSize / desiredBundleSize chunks.
         // So each beam source will read around desiredBundleSize volume of 
data.
@@ -540,11 +563,11 @@ private BoundedElasticsearchSource(
           sources.add(new BoundedElasticsearchSource(spec, shardId, null, 
null, backendVersion));
         }
         checkArgument(!sources.isEmpty(), "No shard found");
-      } else if (backendVersion == 5) {
+      } else if (backendVersion == 5 || backendVersion == 6) {
         long indexSize = 
BoundedElasticsearchSource.estimateIndexSize(connectionConfiguration);
         float nbBundlesFloat = (float) indexSize / desiredBundleSizeBytes;
         int nbBundles = (int) Math.ceil(nbBundlesFloat);
-        //ES slice api imposes that the number of slices is <= 1024 even if it 
can be overloaded
+        // ES slice api imposes that the number of slices is <= 1024 even if 
it can be overloaded
         if (nbBundles > 1024) {
           nbBundles = 1024;
         }
@@ -573,7 +596,7 @@ static long estimateIndexSize(ConnectionConfiguration 
connectionConfiguration)
       // as Elasticsearch 2.x doesn't support any way to do parallel read 
inside a shard
       // the estimated size bytes is not really used in the split into bundles.
       // However, we implement this method anyway as the runners can use it.
-      // NB: Elasticsearch 5.x now provides the slice API.
+      // NB: Elasticsearch 5.x+ now provides the slice API.
       // 
(https://www.elastic.co/guide/en/elasticsearch/reference/5.0/search-request-scroll.html
       // #sliced-scroll)
       JsonNode statsJson = getStats(connectionConfiguration, false);
@@ -640,7 +663,9 @@ public boolean start() throws IOException {
       if (query == null) {
         query = "{\"query\": { \"match_all\": {} }}";
       }
-      if (source.backendVersion == 5 && source.numSlices != null && 
source.numSlices > 1) {
+      if ((source.backendVersion == 5 || source.backendVersion == 6)
+          && source.numSlices != null
+          && source.numSlices > 1) {
         //if there is more than one slice, add the slice to the user query
         String sliceQuery =
             String.format("\"slice\": {\"id\": %s,\"max\": %s}", 
source.sliceId, source.numSlices);
@@ -1240,10 +1265,10 @@ static int getBackendVersion(ConnectionConfiguration 
connectionConfiguration) {
       int backendVersion =
           
Integer.parseInt(jsonNode.path("version").path("number").asText().substring(0, 
1));
       checkArgument(
-          (backendVersion == 2 || backendVersion == 5),
+          (backendVersion == 2 || backendVersion == 5 || backendVersion == 6),
           "The Elasticsearch version to connect to is %s.x. "
               + "This version of the ElasticsearchIO is only compatible with "
-              + "Elasticsearch v5.x and v2.x",
+              + "Elasticsearch v6.x, v5.x and v2.x",
           backendVersion);
       return backendVersion;
 
diff --git a/sdks/java/javadoc/build.gradle b/sdks/java/javadoc/build.gradle
index 4f73e23ee13..61565b0a5fe 100644
--- a/sdks/java/javadoc/build.gradle
+++ b/sdks/java/javadoc/build.gradle
@@ -54,6 +54,7 @@ def exportedJavadocProjects = [
   ':beam-sdks-java-io-elasticsearch',
   ':beam-sdks-java-io-elasticsearch-tests-2',
   ':beam-sdks-java-io-elasticsearch-tests-5',
+  ':beam-sdks-java-io-elasticsearch-tests-6',
   ':beam-sdks-java-io-elasticsearch-tests-common',
   ':beam-sdks-java-io-google-cloud-platform',
   ':beam-sdks-java-io-hadoop-common',
diff --git a/settings.gradle b/settings.gradle
index 89e68951303..3ccdd4418c7 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -116,6 +116,8 @@ include "beam-sdks-java-io-elasticsearch-tests-2"
 project(":beam-sdks-java-io-elasticsearch-tests-2").dir = 
file("sdks/java/io/elasticsearch-tests/elasticsearch-tests-2")
 include "beam-sdks-java-io-elasticsearch-tests-5"
 project(":beam-sdks-java-io-elasticsearch-tests-5").dir = 
file("sdks/java/io/elasticsearch-tests/elasticsearch-tests-5")
+include "beam-sdks-java-io-elasticsearch-tests-6"
+project(":beam-sdks-java-io-elasticsearch-tests-6").dir = 
file("sdks/java/io/elasticsearch-tests/elasticsearch-tests-6")
 include "beam-sdks-java-io-elasticsearch-tests-common"
 project(":beam-sdks-java-io-elasticsearch-tests-common").dir = 
file("sdks/java/io/elasticsearch-tests/elasticsearch-tests-common")
 include "beam-sdks-java-io-file-based-io-tests"


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 141645)
    Time Spent: 11h 50m  (was: 11h 40m)

> Support ES 6.x for ElasticsearchIO
> ----------------------------------
>
>                 Key: BEAM-5107
>                 URL: https://issues.apache.org/jira/browse/BEAM-5107
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-elasticsearch
>            Reporter: Dat Tran
>            Assignee: Etienne Chauchot
>            Priority: Major
>          Time Spent: 11h 50m
>  Remaining Estimate: 0h
>
> Elasticsearch has released 6.3.2 but ElasticsearchIO only supports 2x-5.x.
> We should support ES 6.x for ElasticsearchIO.
> https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html
> https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to