Repository: beam Updated Branches: refs/heads/master e980ae921 -> 1c6861f22
[BEAM-2410] Remove TransportClient from ElasticSearchIO to decouple IO and ES server versions Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7caea7a8 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7caea7a8 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7caea7a8 Branch: refs/heads/master Commit: 7caea7a845eff072a647baf69b9b004db4523652 Parents: e980ae9 Author: Etienne Chauchot <echauc...@gmail.com> Authored: Mon Jun 5 16:21:58 2017 +0200 Committer: Jean-Baptiste Onofré <jbono...@apache.org> Committed: Fri Jun 9 07:31:06 2017 +0200 ---------------------------------------------------------------------- .../sdk/io/common/IOTestPipelineOptions.java | 6 +- .../sdk/io/elasticsearch/ElasticsearchIO.java | 4 +- .../elasticsearch/ElasticSearchIOTestUtils.java | 81 +++++++++++--------- .../sdk/io/elasticsearch/ElasticsearchIOIT.java | 14 ++-- .../io/elasticsearch/ElasticsearchIOTest.java | 36 +++++---- .../elasticsearch/ElasticsearchTestDataSet.java | 37 ++++----- 6 files changed, 87 insertions(+), 91 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/7caea7a8/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java index 387fd22..25ab929 100644 --- a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java +++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java @@ -71,11 +71,7 @@ public interface IOTestPipelineOptions extends TestPipelineOptions { Integer getElasticsearchHttpPort(); void setElasticsearchHttpPort(Integer value); - @Description("Tcp port for elasticsearch server") - @Default.Integer(9300) - Integer getElasticsearchTcpPort(); - void setElasticsearchTcpPort(Integer value); - + /* Cassandra */ @Description("Host for Cassandra server (host name/ip address)") @Default.String("cassandra-host") String getCassandraHost(); http://git-wip-us.apache.org/repos/asf/beam/blob/7caea7a8/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java index f6ceef2..e3965dc 100644 --- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java +++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java @@ -139,7 +139,7 @@ public class ElasticsearchIO { private static final ObjectMapper mapper = new ObjectMapper(); - private static JsonNode parseResponse(Response response) throws IOException { + static JsonNode parseResponse(Response response) throws IOException { return mapper.readValue(response.getEntity().getContent(), JsonNode.class); } @@ -264,7 +264,7 @@ public class ElasticsearchIO { builder.addIfNotNull(DisplayData.item("username", getUsername())); } - private RestClient createClient() throws MalformedURLException { + RestClient createClient() throws MalformedURLException { HttpHost[] hosts = new HttpHost[getAddresses().size()]; int i = 0; for (String address : getAddresses()) { http://git-wip-us.apache.org/repos/asf/beam/blob/7caea7a8/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java index b0d161f..203963d 100644 --- a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java +++ b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java @@ -17,19 +17,17 @@ */ package org.apache.beam.sdk.io.elasticsearch; +import com.fasterxml.jackson.databind.JsonNode; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; -import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; -import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; -import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest; -import org.elasticsearch.action.bulk.BulkRequestBuilder; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.client.IndicesAdminClient; -import org.elasticsearch.client.Requests; -import org.elasticsearch.index.IndexNotFoundException; +import org.apache.http.HttpEntity; +import org.apache.http.entity.ContentType; +import org.apache.http.message.BasicHeader; +import org.apache.http.nio.entity.NStringEntity; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.RestClient; /** Test utilities to use with {@link ElasticsearchIO}. */ class ElasticSearchIOTestUtils { @@ -41,57 +39,68 @@ class ElasticSearchIOTestUtils { } /** Deletes the given index synchronously. */ - static void deleteIndex(String index, Client client) throws Exception { - IndicesAdminClient indices = client.admin().indices(); - IndicesExistsResponse indicesExistsResponse = - indices.exists(new IndicesExistsRequest(index)).get(); - if (indicesExistsResponse.isExists()) { - indices.prepareClose(index).get(); - indices.delete(Requests.deleteIndexRequest(index)).get(); + static void deleteIndex(String index, RestClient restClient) throws IOException { + try { + restClient.performRequest("DELETE", String.format("/%s", index), new BasicHeader("", "")); + } catch (IOException e) { + // it is fine to ignore this expression as deleteIndex occurs in @before, + // so when the first tests is run, the index does not exist yet + if (!e.getMessage().contains("index_not_found_exception")){ + throw e; + } } } /** Inserts the given number of test documents into Elasticsearch. */ - static void insertTestDocuments(String index, String type, long numDocs, Client client) - throws Exception { - final BulkRequestBuilder bulkRequestBuilder = client.prepareBulk().setRefresh(true); + static void insertTestDocuments(String index, String type, long numDocs, RestClient restClient) + throws IOException { List<String> data = ElasticSearchIOTestUtils.createDocuments( numDocs, ElasticSearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS); + StringBuilder bulkRequest = new StringBuilder(); for (String document : data) { - bulkRequestBuilder.add(client.prepareIndex(index, type, null).setSource(document)); + bulkRequest.append(String.format("{ \"index\" : {} }%n%s%n", document)); } - final BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet(); - if (bulkResponse.hasFailures()) { + String endPoint = String.format("/%s/%s/_bulk", index, type); + HttpEntity requestBody = + new NStringEntity(bulkRequest.toString(), ContentType.APPLICATION_JSON); + Response response = restClient.performRequest("POST", endPoint, + Collections.singletonMap("refresh", "true"), requestBody, + new BasicHeader("", "")); + JsonNode searchResult = ElasticsearchIO.parseResponse(response); + boolean errors = searchResult.path("errors").asBoolean(); + if (errors){ throw new IOException( - String.format( - "Cannot insert test documents in index %s : %s", - index, bulkResponse.buildFailureMessage())); + String.format("Failed to insert test documents in index %s", index)); } } /** - * Forces an upgrade of the given index to make recently inserted documents available for search. + * Forces a refresh of the given index to make recently inserted documents available for search. * * @return The number of docs in the index */ - static long upgradeIndexAndGetCurrentNumDocs(String index, String type, Client client) { + static long refreshIndexAndGetCurrentNumDocs(String index, String type, RestClient restClient) + throws IOException { + long result = 0; try { - client.admin().indices().upgrade(new UpgradeRequest(index)).actionGet(); - SearchResponse response = - client.prepareSearch(index).setTypes(type).execute().actionGet(5000); - return response.getHits().getTotalHits(); + String endPoint = String.format("/%s/_refresh", index); + restClient.performRequest("POST", endPoint, new BasicHeader("", "")); + + endPoint = String.format("/%s/%s/_search", index, type); + Response response = restClient.performRequest("GET", endPoint, new BasicHeader("", "")); + JsonNode searchResult = ElasticsearchIO.parseResponse(response); + result = searchResult.path("hits").path("total").asLong(); + } catch (IOException e) { // it is fine to ignore bellow exceptions because in testWriteWithBatchSize* sometimes, // we call upgrade before any doc have been written // (when there are fewer docs processed than batchSize). // In that cases index/type has not been created (created upon first doc insertion) - } catch (IndexNotFoundException e) { - } catch (java.lang.IllegalArgumentException e) { - if (!e.getMessage().contains("No search type")) { + if (!e.getMessage().contains("index_not_found_exception")){ throw e; } } - return 0; + return result; } /** http://git-wip-us.apache.org/repos/asf/beam/blob/7caea7a8/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java index 2d6393a..7c37e87 100644 --- a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java +++ b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java @@ -32,7 +32,7 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.values.PCollection; -import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.client.RestClient; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Rule; @@ -57,7 +57,7 @@ import org.slf4j.LoggerFactory; */ public class ElasticsearchIOIT { private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchIOIT.class); - private static TransportClient client; + private static RestClient restClient; private static IOTestPipelineOptions options; private static ElasticsearchIO.ConnectionConfiguration readConnectionConfiguration; @Rule public TestPipeline pipeline = TestPipeline.create(); @@ -66,16 +66,16 @@ public class ElasticsearchIOIT { public static void beforeClass() throws Exception { PipelineOptionsFactory.register(IOTestPipelineOptions.class); options = TestPipeline.testingPipelineOptions().as(IOTestPipelineOptions.class); - client = ElasticsearchTestDataSet.getClient(options); readConnectionConfiguration = ElasticsearchTestDataSet.getConnectionConfiguration( options, ElasticsearchTestDataSet.ReadOrWrite.READ); + restClient = readConnectionConfiguration.createClient(); } @AfterClass public static void afterClass() throws Exception { - ElasticsearchTestDataSet.deleteIndex(client, ElasticsearchTestDataSet.ReadOrWrite.WRITE); - client.close(); + ElasticsearchTestDataSet.deleteIndex(restClient, ElasticsearchTestDataSet.ReadOrWrite.WRITE); + restClient.close(); } @Test @@ -128,8 +128,8 @@ public class ElasticsearchIOIT { pipeline.run(); long currentNumDocs = - ElasticSearchIOTestUtils.upgradeIndexAndGetCurrentNumDocs( - ElasticsearchTestDataSet.ES_INDEX, ElasticsearchTestDataSet.ES_TYPE, client); + ElasticSearchIOTestUtils.refreshIndexAndGetCurrentNumDocs( + ElasticsearchTestDataSet.ES_INDEX, ElasticsearchTestDataSet.ES_TYPE, restClient); assertEquals(ElasticsearchTestDataSet.NUM_DOCS, currentNumDocs); } http://git-wip-us.apache.org/repos/asf/beam/blob/7caea7a8/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java index 260af79..b349a29 100644 --- a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java +++ b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java @@ -39,11 +39,11 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFnTester; import org.apache.beam.sdk.values.PCollection; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.RestClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.node.Node; -import org.elasticsearch.node.NodeBuilder; import org.hamcrest.CustomMatcher; import org.junit.AfterClass; import org.junit.Before; @@ -74,9 +74,10 @@ public class ElasticsearchIOTest implements Serializable { private static final long BATCH_SIZE_BYTES = 2048L; private static Node node; + private static RestClient restClient; private static ElasticsearchIO.ConnectionConfiguration connectionConfiguration; - @ClassRule public static TemporaryFolder folder = new TemporaryFolder(); + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); @Rule public TestPipeline pipeline = TestPipeline.create(); @@ -91,8 +92,8 @@ public class ElasticsearchIOTest implements Serializable { .put("cluster.name", "beam") .put("http.enabled", "true") .put("node.data", "true") - .put("path.data", folder.getRoot().getPath()) - .put("path.home", folder.getRoot().getPath()) + .put("path.data", TEMPORARY_FOLDER.getRoot().getPath()) + .put("path.home", TEMPORARY_FOLDER.getRoot().getPath()) .put("node.name", "beam") .put("network.host", ES_IP) .put("http.port", esHttpPort) @@ -100,27 +101,29 @@ public class ElasticsearchIOTest implements Serializable { // had problems with some jdk, embedded ES was too slow for bulk insertion, // and queue of 50 was full. No pb with real ES instance (cf testWrite integration test) .put("threadpool.bulk.queue_size", 100); - node = NodeBuilder.nodeBuilder().settings(settingsBuilder).build(); + node = new Node(settingsBuilder.build()); LOG.info("Elasticsearch node created"); node.start(); connectionConfiguration = ElasticsearchIO.ConnectionConfiguration.create( new String[] {"http://" + ES_IP + ":" + esHttpPort}, ES_INDEX, ES_TYPE); + restClient = connectionConfiguration.createClient(); } @AfterClass - public static void afterClass() { + public static void afterClass() throws IOException{ + restClient.close(); node.close(); } @Before public void before() throws Exception { - ElasticSearchIOTestUtils.deleteIndex(ES_INDEX, node.client()); + ElasticSearchIOTestUtils.deleteIndex(ES_INDEX, restClient); } @Test public void testSizes() throws Exception { - ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, node.client()); + ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, restClient); PipelineOptions options = PipelineOptionsFactory.create(); ElasticsearchIO.Read read = ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration); @@ -134,7 +137,7 @@ public class ElasticsearchIOTest implements Serializable { @Test public void testRead() throws Exception { - ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, node.client()); + ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, restClient); PCollection<String> output = pipeline.apply( @@ -150,7 +153,7 @@ public class ElasticsearchIOTest implements Serializable { @Test public void testReadWithQuery() throws Exception { - ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, node.client()); + ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, restClient); String query = "{\n" @@ -185,7 +188,7 @@ public class ElasticsearchIOTest implements Serializable { pipeline.run(); long currentNumDocs = - ElasticSearchIOTestUtils.upgradeIndexAndGetCurrentNumDocs(ES_INDEX, ES_TYPE, node.client()); + ElasticSearchIOTestUtils.refreshIndexAndGetCurrentNumDocs(ES_INDEX, ES_TYPE, restClient); assertEquals(NUM_DOCS, currentNumDocs); QueryBuilder queryBuilder = QueryBuilders.queryStringQuery("Einstein").field("scientist"); @@ -258,9 +261,8 @@ public class ElasticsearchIOTest implements Serializable { if ((numDocsProcessed % 100) == 0) { // force the index to upgrade after inserting for the inserted docs // to be searchable immediately - long currentNumDocs = - ElasticSearchIOTestUtils.upgradeIndexAndGetCurrentNumDocs( - ES_INDEX, ES_TYPE, node.client()); + long currentNumDocs = ElasticSearchIOTestUtils + .refreshIndexAndGetCurrentNumDocs(ES_INDEX, ES_TYPE, restClient); if ((numDocsProcessed % BATCH_SIZE) == 0) { /* bundle end */ assertEquals( @@ -304,8 +306,8 @@ public class ElasticsearchIOTest implements Serializable { // force the index to upgrade after inserting for the inserted docs // to be searchable immediately long currentNumDocs = - ElasticSearchIOTestUtils.upgradeIndexAndGetCurrentNumDocs( - ES_INDEX, ES_TYPE, node.client()); + ElasticSearchIOTestUtils.refreshIndexAndGetCurrentNumDocs( + ES_INDEX, ES_TYPE, restClient); if (sizeProcessed / BATCH_SIZE_BYTES > batchInserted) { /* bundle end */ assertThat( @@ -327,7 +329,7 @@ public class ElasticsearchIOTest implements Serializable { @Test public void testSplit() throws Exception { - ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, node.client()); + ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, restClient); PipelineOptions options = PipelineOptionsFactory.create(); ElasticsearchIO.Read read = ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration); http://git-wip-us.apache.org/repos/asf/beam/blob/7caea7a8/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java index 3a9aae6..2a2dbe9 100644 --- a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java +++ b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java @@ -17,13 +17,11 @@ */ package org.apache.beam.sdk.io.elasticsearch; -import static java.net.InetAddress.getByName; import java.io.IOException; import org.apache.beam.sdk.io.common.IOTestPipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.elasticsearch.client.transport.TransportClient; -import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.client.RestClient; /** * Manipulates test data used by the {@link ElasticsearchIO} @@ -51,7 +49,6 @@ public class ElasticsearchTestDataSet { * -Dexec.mainClass=org.apache.beam.sdk.io.elasticsearch.ElasticsearchTestDataSet \ * -Dexec.args="--elasticsearchServer=1.2.3.4 \ * --elasticsearchHttpPort=9200 \ - * --elasticsearchTcpPort=9300" \ * -Dexec.classpathScope=test * </pre> * @@ -62,29 +59,20 @@ public class ElasticsearchTestDataSet { PipelineOptionsFactory.register(IOTestPipelineOptions.class); IOTestPipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(IOTestPipelineOptions.class); - - createAndPopulateIndex(getClient(options), ReadOrWrite.READ); + createAndPopulateReadIndex(options); } - private static void createAndPopulateIndex(TransportClient client, ReadOrWrite rOw) - throws Exception { + private static void createAndPopulateReadIndex(IOTestPipelineOptions options) throws Exception { + RestClient restClient = getConnectionConfiguration(options, ReadOrWrite.READ).createClient(); // automatically creates the index and insert docs - ElasticSearchIOTestUtils.insertTestDocuments( - (rOw == ReadOrWrite.READ) ? ES_INDEX : writeIndex, ES_TYPE, NUM_DOCS, client); - } - - public static TransportClient getClient(IOTestPipelineOptions options) throws Exception { - TransportClient client = - TransportClient.builder() - .build() - .addTransportAddress( - new InetSocketTransportAddress( - getByName(options.getElasticsearchServer()), - options.getElasticsearchTcpPort())); - return client; + try { + ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, restClient); + } finally { + restClient.close(); + } } - public static ElasticsearchIO.ConnectionConfiguration getConnectionConfiguration( + static ElasticsearchIO.ConnectionConfiguration getConnectionConfiguration( IOTestPipelineOptions options, ReadOrWrite rOw) throws IOException { ElasticsearchIO.ConnectionConfiguration connectionConfiguration = ElasticsearchIO.ConnectionConfiguration.create( @@ -99,8 +87,9 @@ public class ElasticsearchTestDataSet { return connectionConfiguration; } - public static void deleteIndex(TransportClient client, ReadOrWrite rOw) throws Exception { - ElasticSearchIOTestUtils.deleteIndex((rOw == ReadOrWrite.READ) ? ES_INDEX : writeIndex, client); + static void deleteIndex(RestClient restClient, ReadOrWrite rOw) throws Exception { + ElasticSearchIOTestUtils + .deleteIndex((rOw == ReadOrWrite.READ) ? ES_INDEX : writeIndex, restClient); } /** Enum that tells whether we use the index for reading or for writing. */