Repository: beam
Updated Branches:
  refs/heads/master e980ae921 -> 1c6861f22


[BEAM-2410] Remove TransportClient from ElasticSearchIO to decouple IO and ES 
server versions


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7caea7a8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7caea7a8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7caea7a8

Branch: refs/heads/master
Commit: 7caea7a845eff072a647baf69b9b004db4523652
Parents: e980ae9
Author: Etienne Chauchot <echauc...@gmail.com>
Authored: Mon Jun 5 16:21:58 2017 +0200
Committer: Jean-Baptiste Onofré <jbono...@apache.org>
Committed: Fri Jun 9 07:31:06 2017 +0200

----------------------------------------------------------------------
 .../sdk/io/common/IOTestPipelineOptions.java    |  6 +-
 .../sdk/io/elasticsearch/ElasticsearchIO.java   |  4 +-
 .../elasticsearch/ElasticSearchIOTestUtils.java | 81 +++++++++++---------
 .../sdk/io/elasticsearch/ElasticsearchIOIT.java | 14 ++--
 .../io/elasticsearch/ElasticsearchIOTest.java   | 36 +++++----
 .../elasticsearch/ElasticsearchTestDataSet.java | 37 ++++-----
 6 files changed, 87 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7caea7a8/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
 
b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
index 387fd22..25ab929 100644
--- 
a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
+++ 
b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
@@ -71,11 +71,7 @@ public interface IOTestPipelineOptions extends 
TestPipelineOptions {
   Integer getElasticsearchHttpPort();
   void setElasticsearchHttpPort(Integer value);
 
-  @Description("Tcp port for elasticsearch server")
-  @Default.Integer(9300)
-  Integer getElasticsearchTcpPort();
-  void setElasticsearchTcpPort(Integer value);
-
+  /* Cassandra */
   @Description("Host for Cassandra server (host name/ip address)")
   @Default.String("cassandra-host")
   String getCassandraHost();

http://git-wip-us.apache.org/repos/asf/beam/blob/7caea7a8/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
 
b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index f6ceef2..e3965dc 100644
--- 
a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ 
b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -139,7 +139,7 @@ public class ElasticsearchIO {
 
   private static final ObjectMapper mapper = new ObjectMapper();
 
-  private static JsonNode parseResponse(Response response) throws IOException {
+  static JsonNode parseResponse(Response response) throws IOException {
     return mapper.readValue(response.getEntity().getContent(), JsonNode.class);
   }
 
@@ -264,7 +264,7 @@ public class ElasticsearchIO {
       builder.addIfNotNull(DisplayData.item("username", getUsername()));
     }
 
-    private RestClient createClient() throws MalformedURLException {
+    RestClient createClient() throws MalformedURLException {
       HttpHost[] hosts = new HttpHost[getAddresses().size()];
       int i = 0;
       for (String address : getAddresses()) {

http://git-wip-us.apache.org/repos/asf/beam/blob/7caea7a8/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
 
b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
index b0d161f..203963d 100644
--- 
a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
+++ 
b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
@@ -17,19 +17,17 @@
  */
 package org.apache.beam.sdk.io.elasticsearch;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
-import 
org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
-import 
org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
-import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.IndicesAdminClient;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.index.IndexNotFoundException;
+import org.apache.http.HttpEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.message.BasicHeader;
+import org.apache.http.nio.entity.NStringEntity;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
 
 /** Test utilities to use with {@link ElasticsearchIO}. */
 class ElasticSearchIOTestUtils {
@@ -41,57 +39,68 @@ class ElasticSearchIOTestUtils {
   }
 
   /** Deletes the given index synchronously. */
-  static void deleteIndex(String index, Client client) throws Exception {
-    IndicesAdminClient indices = client.admin().indices();
-    IndicesExistsResponse indicesExistsResponse =
-        indices.exists(new IndicesExistsRequest(index)).get();
-    if (indicesExistsResponse.isExists()) {
-      indices.prepareClose(index).get();
-      indices.delete(Requests.deleteIndexRequest(index)).get();
+  static void deleteIndex(String index, RestClient restClient) throws 
IOException {
+    try {
+      restClient.performRequest("DELETE", String.format("/%s", index), new 
BasicHeader("", ""));
+    } catch (IOException e) {
+      // it is fine to ignore this expression as deleteIndex occurs in @before,
+      // so when the first tests is run, the index does not exist yet
+      if (!e.getMessage().contains("index_not_found_exception")){
+        throw e;
+      }
     }
   }
 
   /** Inserts the given number of test documents into Elasticsearch. */
-  static void insertTestDocuments(String index, String type, long numDocs, 
Client client)
-      throws Exception {
-    final BulkRequestBuilder bulkRequestBuilder = 
client.prepareBulk().setRefresh(true);
+  static void insertTestDocuments(String index, String type, long numDocs, 
RestClient restClient)
+      throws IOException {
     List<String> data =
         ElasticSearchIOTestUtils.createDocuments(
             numDocs, 
ElasticSearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+    StringBuilder bulkRequest = new StringBuilder();
     for (String document : data) {
-      bulkRequestBuilder.add(client.prepareIndex(index, type, 
null).setSource(document));
+      bulkRequest.append(String.format("{ \"index\" : {} }%n%s%n", document));
     }
-    final BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();
-    if (bulkResponse.hasFailures()) {
+    String endPoint = String.format("/%s/%s/_bulk", index, type);
+    HttpEntity requestBody =
+        new NStringEntity(bulkRequest.toString(), 
ContentType.APPLICATION_JSON);
+    Response response = restClient.performRequest("POST", endPoint,
+        Collections.singletonMap("refresh", "true"), requestBody,
+        new BasicHeader("", ""));
+    JsonNode searchResult = ElasticsearchIO.parseResponse(response);
+    boolean errors = searchResult.path("errors").asBoolean();
+    if (errors){
       throw new IOException(
-          String.format(
-              "Cannot insert test documents in index %s : %s",
-              index, bulkResponse.buildFailureMessage()));
+          String.format("Failed to insert test documents in index %s", index));
     }
   }
 
   /**
-   * Forces an upgrade of the given index to make recently inserted documents 
available for search.
+   * Forces a refresh of the given index to make recently inserted documents 
available for search.
    *
    * @return The number of docs in the index
    */
-  static long upgradeIndexAndGetCurrentNumDocs(String index, String type, 
Client client) {
+  static long refreshIndexAndGetCurrentNumDocs(String index, String type, 
RestClient restClient)
+      throws IOException {
+    long result = 0;
     try {
-      client.admin().indices().upgrade(new UpgradeRequest(index)).actionGet();
-      SearchResponse response =
-          client.prepareSearch(index).setTypes(type).execute().actionGet(5000);
-      return response.getHits().getTotalHits();
+      String endPoint = String.format("/%s/_refresh", index);
+      restClient.performRequest("POST", endPoint, new BasicHeader("", ""));
+
+      endPoint = String.format("/%s/%s/_search", index, type);
+      Response response = restClient.performRequest("GET", endPoint, new 
BasicHeader("", ""));
+      JsonNode searchResult = ElasticsearchIO.parseResponse(response);
+      result = searchResult.path("hits").path("total").asLong();
+    } catch (IOException e) {
       // it is fine to ignore bellow exceptions because in 
testWriteWithBatchSize* sometimes,
       // we call upgrade before any doc have been written
       // (when there are fewer docs processed than batchSize).
       // In that cases index/type has not been created (created upon first doc 
insertion)
-    } catch (IndexNotFoundException e) {
-    } catch (java.lang.IllegalArgumentException e) {
-      if (!e.getMessage().contains("No search type")) {
+      if (!e.getMessage().contains("index_not_found_exception")){
         throw e;
       }
     }
-    return 0;
+    return result;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/7caea7a8/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
 
b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
index 2d6393a..7c37e87 100644
--- 
a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
+++ 
b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
@@ -32,7 +32,7 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.PCollection;
-import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.client.RestClient;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -57,7 +57,7 @@ import org.slf4j.LoggerFactory;
  */
 public class ElasticsearchIOIT {
   private static final Logger LOG = 
LoggerFactory.getLogger(ElasticsearchIOIT.class);
-  private static TransportClient client;
+  private static  RestClient restClient;
   private static IOTestPipelineOptions options;
   private static ElasticsearchIO.ConnectionConfiguration 
readConnectionConfiguration;
   @Rule public TestPipeline pipeline = TestPipeline.create();
@@ -66,16 +66,16 @@ public class ElasticsearchIOIT {
   public static void beforeClass() throws Exception {
     PipelineOptionsFactory.register(IOTestPipelineOptions.class);
     options = 
TestPipeline.testingPipelineOptions().as(IOTestPipelineOptions.class);
-    client = ElasticsearchTestDataSet.getClient(options);
     readConnectionConfiguration =
         ElasticsearchTestDataSet.getConnectionConfiguration(
             options, ElasticsearchTestDataSet.ReadOrWrite.READ);
+    restClient = readConnectionConfiguration.createClient();
   }
 
   @AfterClass
   public static void afterClass() throws Exception {
-    ElasticsearchTestDataSet.deleteIndex(client, 
ElasticsearchTestDataSet.ReadOrWrite.WRITE);
-    client.close();
+    ElasticsearchTestDataSet.deleteIndex(restClient, 
ElasticsearchTestDataSet.ReadOrWrite.WRITE);
+    restClient.close();
   }
 
   @Test
@@ -128,8 +128,8 @@ public class ElasticsearchIOIT {
     pipeline.run();
 
     long currentNumDocs =
-        ElasticSearchIOTestUtils.upgradeIndexAndGetCurrentNumDocs(
-            ElasticsearchTestDataSet.ES_INDEX, 
ElasticsearchTestDataSet.ES_TYPE, client);
+        ElasticSearchIOTestUtils.refreshIndexAndGetCurrentNumDocs(
+            ElasticsearchTestDataSet.ES_INDEX, 
ElasticsearchTestDataSet.ES_TYPE, restClient);
     assertEquals(ElasticsearchTestDataSet.NUM_DOCS, currentNumDocs);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7caea7a8/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
 
b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index 260af79..b349a29 100644
--- 
a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ 
b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -39,11 +39,11 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFnTester;
 import org.apache.beam.sdk.values.PCollection;
 import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.RestClient;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.node.Node;
-import org.elasticsearch.node.NodeBuilder;
 import org.hamcrest.CustomMatcher;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -74,9 +74,10 @@ public class ElasticsearchIOTest implements Serializable {
   private static final long BATCH_SIZE_BYTES = 2048L;
 
   private static Node node;
+  private static RestClient restClient;
   private static ElasticsearchIO.ConnectionConfiguration 
connectionConfiguration;
 
-  @ClassRule public static TemporaryFolder folder = new TemporaryFolder();
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
   @Rule
   public TestPipeline pipeline = TestPipeline.create();
 
@@ -91,8 +92,8 @@ public class ElasticsearchIOTest implements Serializable {
             .put("cluster.name", "beam")
             .put("http.enabled", "true")
             .put("node.data", "true")
-            .put("path.data", folder.getRoot().getPath())
-            .put("path.home", folder.getRoot().getPath())
+            .put("path.data", TEMPORARY_FOLDER.getRoot().getPath())
+            .put("path.home", TEMPORARY_FOLDER.getRoot().getPath())
             .put("node.name", "beam")
             .put("network.host", ES_IP)
             .put("http.port", esHttpPort)
@@ -100,27 +101,29 @@ public class ElasticsearchIOTest implements Serializable {
             // had problems with some jdk, embedded ES was too slow for bulk 
insertion,
             // and queue of 50 was full. No pb with real ES instance (cf 
testWrite integration test)
             .put("threadpool.bulk.queue_size", 100);
-    node = NodeBuilder.nodeBuilder().settings(settingsBuilder).build();
+    node = new Node(settingsBuilder.build());
     LOG.info("Elasticsearch node created");
     node.start();
     connectionConfiguration =
       ElasticsearchIO.ConnectionConfiguration.create(
         new String[] {"http://"; + ES_IP + ":" + esHttpPort}, ES_INDEX, 
ES_TYPE);
+    restClient = connectionConfiguration.createClient();
   }
 
   @AfterClass
-  public static void afterClass() {
+  public static void afterClass() throws IOException{
+    restClient.close();
     node.close();
   }
 
   @Before
   public void before() throws Exception {
-    ElasticSearchIOTestUtils.deleteIndex(ES_INDEX, node.client());
+    ElasticSearchIOTestUtils.deleteIndex(ES_INDEX, restClient);
   }
 
   @Test
   public void testSizes() throws Exception {
-    ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, 
node.client());
+    ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, 
restClient);
     PipelineOptions options = PipelineOptionsFactory.create();
     ElasticsearchIO.Read read =
         
ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration);
@@ -134,7 +137,7 @@ public class ElasticsearchIOTest implements Serializable {
 
   @Test
   public void testRead() throws Exception {
-    ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, 
node.client());
+    ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, 
restClient);
 
     PCollection<String> output =
         pipeline.apply(
@@ -150,7 +153,7 @@ public class ElasticsearchIOTest implements Serializable {
 
   @Test
   public void testReadWithQuery() throws Exception {
-    ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, 
node.client());
+    ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, 
restClient);
 
     String query =
         "{\n"
@@ -185,7 +188,7 @@ public class ElasticsearchIOTest implements Serializable {
     pipeline.run();
 
     long currentNumDocs =
-        ElasticSearchIOTestUtils.upgradeIndexAndGetCurrentNumDocs(ES_INDEX, 
ES_TYPE, node.client());
+        ElasticSearchIOTestUtils.refreshIndexAndGetCurrentNumDocs(ES_INDEX, 
ES_TYPE, restClient);
     assertEquals(NUM_DOCS, currentNumDocs);
 
     QueryBuilder queryBuilder = 
QueryBuilders.queryStringQuery("Einstein").field("scientist");
@@ -258,9 +261,8 @@ public class ElasticsearchIOTest implements Serializable {
       if ((numDocsProcessed % 100) == 0) {
         // force the index to upgrade after inserting for the inserted docs
         // to be searchable immediately
-        long currentNumDocs =
-            ElasticSearchIOTestUtils.upgradeIndexAndGetCurrentNumDocs(
-                ES_INDEX, ES_TYPE, node.client());
+        long currentNumDocs = ElasticSearchIOTestUtils
+            .refreshIndexAndGetCurrentNumDocs(ES_INDEX, ES_TYPE, restClient);
         if ((numDocsProcessed % BATCH_SIZE) == 0) {
           /* bundle end */
           assertEquals(
@@ -304,8 +306,8 @@ public class ElasticsearchIOTest implements Serializable {
         // force the index to upgrade after inserting for the inserted docs
         // to be searchable immediately
         long currentNumDocs =
-            ElasticSearchIOTestUtils.upgradeIndexAndGetCurrentNumDocs(
-                ES_INDEX, ES_TYPE, node.client());
+            ElasticSearchIOTestUtils.refreshIndexAndGetCurrentNumDocs(
+                ES_INDEX, ES_TYPE, restClient);
         if (sizeProcessed / BATCH_SIZE_BYTES > batchInserted) {
           /* bundle end */
           assertThat(
@@ -327,7 +329,7 @@ public class ElasticsearchIOTest implements Serializable {
 
   @Test
   public void testSplit() throws Exception {
-    ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, 
node.client());
+    ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, 
restClient);
     PipelineOptions options = PipelineOptionsFactory.create();
     ElasticsearchIO.Read read =
         
ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration);

http://git-wip-us.apache.org/repos/asf/beam/blob/7caea7a8/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java
 
b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java
index 3a9aae6..2a2dbe9 100644
--- 
a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java
+++ 
b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java
@@ -17,13 +17,11 @@
  */
 package org.apache.beam.sdk.io.elasticsearch;
 
-import static java.net.InetAddress.getByName;
 
 import java.io.IOException;
 import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.client.RestClient;
 
 /**
  * Manipulates test data used by the {@link ElasticsearchIO}
@@ -51,7 +49,6 @@ public class ElasticsearchTestDataSet {
    * 
-Dexec.mainClass=org.apache.beam.sdk.io.elasticsearch.ElasticsearchTestDataSet \
    *   -Dexec.args="--elasticsearchServer=1.2.3.4 \
    *  --elasticsearchHttpPort=9200 \
-   *  --elasticsearchTcpPort=9300" \
    *   -Dexec.classpathScope=test
    *   </pre>
    *
@@ -62,29 +59,20 @@ public class ElasticsearchTestDataSet {
     PipelineOptionsFactory.register(IOTestPipelineOptions.class);
     IOTestPipelineOptions options =
         PipelineOptionsFactory.fromArgs(args).as(IOTestPipelineOptions.class);
-
-    createAndPopulateIndex(getClient(options), ReadOrWrite.READ);
+    createAndPopulateReadIndex(options);
   }
 
-  private static void createAndPopulateIndex(TransportClient client, 
ReadOrWrite rOw)
-      throws Exception {
+  private static void createAndPopulateReadIndex(IOTestPipelineOptions 
options) throws Exception {
+    RestClient restClient =  getConnectionConfiguration(options, 
ReadOrWrite.READ).createClient();
     // automatically creates the index and insert docs
-    ElasticSearchIOTestUtils.insertTestDocuments(
-        (rOw == ReadOrWrite.READ) ? ES_INDEX : writeIndex, ES_TYPE, NUM_DOCS, 
client);
-  }
-
-  public static TransportClient getClient(IOTestPipelineOptions options) 
throws Exception {
-    TransportClient client =
-        TransportClient.builder()
-            .build()
-            .addTransportAddress(
-                new InetSocketTransportAddress(
-                    getByName(options.getElasticsearchServer()),
-                    options.getElasticsearchTcpPort()));
-    return client;
+    try {
+      ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, 
NUM_DOCS, restClient);
+    } finally {
+      restClient.close();
+    }
   }
 
-  public static ElasticsearchIO.ConnectionConfiguration 
getConnectionConfiguration(
+  static ElasticsearchIO.ConnectionConfiguration getConnectionConfiguration(
       IOTestPipelineOptions options, ReadOrWrite rOw) throws IOException {
     ElasticsearchIO.ConnectionConfiguration connectionConfiguration =
         ElasticsearchIO.ConnectionConfiguration.create(
@@ -99,8 +87,9 @@ public class ElasticsearchTestDataSet {
     return connectionConfiguration;
   }
 
-  public static void deleteIndex(TransportClient client, ReadOrWrite rOw) 
throws Exception {
-    ElasticSearchIOTestUtils.deleteIndex((rOw == ReadOrWrite.READ) ? ES_INDEX 
: writeIndex, client);
+  static void deleteIndex(RestClient restClient, ReadOrWrite rOw) throws 
Exception {
+    ElasticSearchIOTestUtils
+        .deleteIndex((rOw == ReadOrWrite.READ) ? ES_INDEX : writeIndex, 
restClient);
   }
 
   /** Enum that tells whether we use the index for reading or for writing. */

Reply via email to