This is an automated email from the ASF dual-hosted git repository. rzo1 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/storm.git
commit c15a1563f0eaa1a3762ec5eb9b0922766cd643a2 Author: Richard Zowalla <richard.zowa...@hs-heilbronn.de> AuthorDate: Wed Nov 29 14:50:14 2023 +0100 STORM-4005 - ElasticSearch 7.17.13 --- DEPENDENCY-LICENSES | 70 +++++++----- examples/storm-elasticsearch-examples/pom.xml | 8 -- .../storm/elasticsearch/bolt/EsIndexTopology.java | 9 +- .../storm/elasticsearch/common/EsTestUtil.java | 122 --------------------- .../elasticsearch/trident/TridentEsTopology.java | 10 +- external/storm-elasticsearch/pom.xml | 32 ++++-- .../storm/elasticsearch/bolt/EsIndexBolt.java | 8 +- .../storm/elasticsearch/bolt/EsLookupBolt.java | 6 +- .../storm/elasticsearch/bolt/EsPercolateBolt.java | 10 +- .../storm/elasticsearch/common/EsConfig.java | 22 ++-- .../common/StormElasticSearchClient.java | 10 +- .../storm/elasticsearch/trident/EsState.java | 7 +- .../bolt/AbstractEsBoltIntegrationTest.java | 10 +- .../storm/elasticsearch/bolt/EsIndexBoltTest.java | 41 +++++-- .../bolt/EsLookupBoltIntegrationTest.java | 13 ++- .../storm/elasticsearch/bolt/EsLookupBoltTest.java | 6 +- .../elasticsearch/bolt/EsPercolateBoltTest.java | 20 +++- .../storm/elasticsearch/common/EsTestUtil.java | 66 ++++------- .../storm/elasticsearch/trident/EsStateTest.java | 3 +- pom.xml | 2 +- 20 files changed, 207 insertions(+), 268 deletions(-) diff --git a/DEPENDENCY-LICENSES b/DEPENDENCY-LICENSES index 87892a602..57fb93e51 100644 --- a/DEPENDENCY-LICENSES +++ b/DEPENDENCY-LICENSES @@ -31,6 +31,10 @@ List of third-party dependencies grouped by their license type. * Maven Plugin Tools Java Annotations (org.apache.maven.plugin-tools:maven-plugin-annotations:3.8.1 - https://maven.apache.org/plugin-tools/maven-plugin-annotations) * snappy-java (org.xerial.snappy:snappy-java:1.1.10.4 - https://github.com/xerial/snappy-java) + Apache-2.0, LGPL-2.1-or-later + + * Java Native Access (net.java.dev.jna:jna:5.10.0 - https://github.com/java-native-access/jna) + Apache 2.0 License * Apache MINA Core (org.apache.mina:mina-core:2.2.2 - https://mina.apache.org/mina-core/) @@ -125,7 +129,7 @@ List of third-party dependencies grouped by their license type. * Apache HttpAsyncClient (org.apache.httpcomponents:httpasyncclient:4.1.5 - http://hc.apache.org/httpcomponents-asyncclient) * Apache HttpClient (org.apache.httpcomponents:httpclient:4.5.14 - http://hc.apache.org/httpcomponents-client-ga) * Apache HttpCore (org.apache.httpcomponents:httpcore:4.4.16 - http://hc.apache.org/httpcomponents-core-ga) - * Apache HttpCore NIO (org.apache.httpcomponents:httpcore-nio:4.4.5 - http://hc.apache.org/httpcomponents-core-ga) + * Apache HttpCore NIO (org.apache.httpcomponents:httpcore-nio:4.4.15 - http://hc.apache.org/httpcomponents-core-ga) * Apache Ivy (org.apache.ivy:ivy:2.4.0 - http://ant.apache.org/ivy/) * Apache Kafka (org.apache.kafka:kafka-clients:0.11.0.3 - http://kafka.apache.org) * Apache Maven Artifact Transfer (org.apache.maven.shared:maven-artifact-transfer:0.9.1 - https://maven.apache.org/shared/maven-artifact-transfer/) @@ -176,7 +180,7 @@ List of third-party dependencies grouped by their license type. * Commons Logging (commons-logging:commons-logging:1.1.3 - http://commons.apache.org/proper/commons-logging/) * Commons Math (org.apache.commons:commons-math3:3.1.1 - http://commons.apache.org/math/) * Commons Pool (commons-pool:commons-pool:1.5.4 - http://commons.apache.org/pool/) - * Compress-LZF (com.ning:compress-lzf:1.0.2 - http://github.com/ning/compress) + * compiler (com.github.spullara.mustache.java:compiler:0.9.6 - http://github.com/spullara/mustache.java) * Curator Client (org.apache.curator:curator-client:5.5.0 - https://curator.apache.org/curator-client) * Curator Framework (org.apache.curator:curator-framework:5.5.0 - https://curator.apache.org/curator-framework) * Curator Recipes (org.apache.curator:curator-recipes:5.5.0 - https://curator.apache.org/curator-recipes) @@ -198,8 +202,6 @@ List of third-party dependencies grouped by their license type. * Dropwizard Utility Classes (io.dropwizard:dropwizard-util:1.3.29 - http://www.dropwizard.io/1.3.29/dropwizard-util) * Dropwizard Validation Support (io.dropwizard:dropwizard-validation:1.3.29 - http://www.dropwizard.io/1.3.29/dropwizard-validation) * Ehcache (org.ehcache:ehcache:3.3.1 - http://ehcache.org) - * Elasticsearch: Core (org.elasticsearch:elasticsearch:2.4.4 - http://nexus.sonatype.org/oss-repository-hosting.html/parent/elasticsearch) - * Elasticsearch SecureSM (org.elasticsearch:securesm:1.0 - http://nexus.sonatype.org/oss-repository-hosting.html/securesm) * error-prone annotations (com.google.errorprone:error_prone_annotations:2.21.1 - https://errorprone.info/error_prone_annotations) * Esri Geometry API for Java (com.esri.geometry:esri-geometry-api:2.0.0 - https://github.com/Esri/geometry-api-java) * fastutil (it.unimi.dsi:fastutil:6.5.6 - http://fasutil.dsi.unimi.it/) @@ -250,8 +252,8 @@ List of third-party dependencies grouped by their license type. * Hive Storage API (org.apache.hive:hive-storage-api:2.7.0 - https://www.apache.org/hive-storage-api/) * Hive Upgrade Acid (org.apache.hive:hive-upgrade-acid:3.1.3 - https://www.apache.org/hive-upgrade-acid/) * Hive Vector-Code-Gen Utilities (org.apache.hive:hive-vector-code-gen:3.1.3 - https://hive.apache.org/hive-vector-code-gen) - * HPPC Collections (com.carrotsearch:hppc:0.7.1 - http://labs.carrotsearch.com/hppc.html/hppc) * HPPC Collections (com.carrotsearch:hppc:0.7.2 - http://labs.carrotsearch.com/hppc.html/hppc) + * HPPC Collections (com.carrotsearch:hppc:0.8.1 - http://labs.carrotsearch.com/hppc.html/hppc) * htrace-core (org.apache.htrace:htrace-core:3.2.0-incubating - http://incubator.apache.org/projects/htrace.html) * j2html (com.j2html:j2html:1.6.0 - http://j2html.com) * J2ObjC Annotations (com.google.j2objc:j2objc-annotations:2.8 - https://github.com/google/j2objc/) @@ -306,21 +308,21 @@ List of third-party dependencies grouped by their license type. * Kerby Util (org.apache.kerby:kerby-util:1.0.1 - http://directory.apache.org/kerby/kerby-common/kerby-util) * Kerby XDR Project (org.apache.kerby:kerby-xdr:1.0.1 - http://directory.apache.org/kerby/kerby-common/kerby-xdr) * Logging (commons-logging:commons-logging:1.0.3 - http://jakarta.apache.org/commons/logging/) - * Lucene Common Analyzers (org.apache.lucene:lucene-analyzers-common:5.5.2 - http://lucene.apache.org/lucene-parent/lucene-analyzers-common) - * Lucene Core (org.apache.lucene:lucene-core:5.5.2 - http://lucene.apache.org/lucene-parent/lucene-core) - * Lucene Grouping (org.apache.lucene:lucene-grouping:5.5.2 - http://lucene.apache.org/lucene-parent/lucene-grouping) - * Lucene Highlighter (org.apache.lucene:lucene-highlighter:5.5.2 - http://lucene.apache.org/lucene-parent/lucene-highlighter) - * Lucene Join (org.apache.lucene:lucene-join:5.5.2 - http://lucene.apache.org/lucene-parent/lucene-join) - * Lucene Memory (org.apache.lucene:lucene-backward-codecs:5.5.2 - http://lucene.apache.org/lucene-parent/lucene-backward-codecs) - * Lucene Memory (org.apache.lucene:lucene-memory:5.5.2 - http://lucene.apache.org/lucene-parent/lucene-memory) - * Lucene Miscellaneous (org.apache.lucene:lucene-misc:5.5.2 - http://lucene.apache.org/lucene-parent/lucene-misc) - * Lucene Queries (org.apache.lucene:lucene-queries:5.5.2 - http://lucene.apache.org/lucene-parent/lucene-queries) - * Lucene QueryParsers (org.apache.lucene:lucene-queryparser:5.5.2 - http://lucene.apache.org/lucene-parent/lucene-queryparser) - * Lucene Sandbox (org.apache.lucene:lucene-sandbox:5.5.2 - http://lucene.apache.org/lucene-parent/lucene-sandbox) - * Lucene Spatial (org.apache.lucene:lucene-spatial:5.5.2 - http://lucene.apache.org/lucene-parent/lucene-spatial) - * Lucene Spatial 3D (org.apache.lucene:lucene-spatial3d:5.5.2 - http://lucene.apache.org/lucene-parent/lucene-spatial3d) - * Lucene Suggest (org.apache.lucene:lucene-suggest:5.5.2 - http://lucene.apache.org/lucene-parent/lucene-suggest) + * Lucene Common Analyzers (org.apache.lucene:lucene-analyzers-common:8.11.1 - https://lucene.apache.org/lucene-parent/lucene-analyzers-common) + * Lucene Core (org.apache.lucene:lucene-core:8.11.1 - https://lucene.apache.org/lucene-parent/lucene-core) + * Lucene Grouping (org.apache.lucene:lucene-grouping:8.11.1 - https://lucene.apache.org/lucene-parent/lucene-grouping) + * Lucene Highlighter (org.apache.lucene:lucene-highlighter:8.11.1 - https://lucene.apache.org/lucene-parent/lucene-highlighter) + * Lucene Join (org.apache.lucene:lucene-join:8.11.1 - https://lucene.apache.org/lucene-parent/lucene-join) + * Lucene Memory (org.apache.lucene:lucene-backward-codecs:8.11.1 - https://lucene.apache.org/lucene-parent/lucene-backward-codecs) + * Lucene Memory (org.apache.lucene:lucene-memory:8.11.1 - https://lucene.apache.org/lucene-parent/lucene-memory) + * Lucene Miscellaneous (org.apache.lucene:lucene-misc:8.11.1 - https://lucene.apache.org/lucene-parent/lucene-misc) + * Lucene Queries (org.apache.lucene:lucene-queries:8.11.1 - https://lucene.apache.org/lucene-parent/lucene-queries) + * Lucene QueryParsers (org.apache.lucene:lucene-queryparser:8.11.1 - https://lucene.apache.org/lucene-parent/lucene-queryparser) + * Lucene Sandbox (org.apache.lucene:lucene-sandbox:8.11.1 - https://lucene.apache.org/lucene-parent/lucene-sandbox) + * Lucene Spatial 3D (org.apache.lucene:lucene-spatial3d:8.11.1 - https://lucene.apache.org/lucene-parent/lucene-spatial3d) + * Lucene Suggest (org.apache.lucene:lucene-suggest:8.11.1 - https://lucene.apache.org/lucene-parent/lucene-suggest) * LZ4 and xxHash (net.jpountz.lz4:lz4:1.3.0 - https://github.com/jpountz/lz4-java) + * LZ4 and xxHash (org.lz4:lz4-java:1.8.0 - https://github.com/lz4/lz4-java) * Maven Aether Provider (org.apache.maven:maven-aether-provider:3.0 - http://maven.apache.org/maven-aether-provider/) * Maven Artifact (org.apache.maven:maven-artifact:3.0 - http://maven.apache.org/maven-artifact/) * Maven Artifact (org.apache.maven:maven-artifact:3.6.0 - https://maven.apache.org/ref/3.6.0/maven-artifact/) @@ -419,13 +421,12 @@ List of third-party dependencies grouped by their license type. * Plexus Interpolation API (org.codehaus.plexus:plexus-interpolation:1.25 - http://codehaus-plexus.github.io/plexus-interpolation/) * Plexus Security Dispatcher Component (org.sonatype.plexus:plexus-sec-dispatcher:1.3 - http://spice.sonatype.org/plexus-sec-dispatcher) * Plexus Security Dispatcher Component (org.sonatype.plexus:plexus-sec-dispatcher:1.4 - http://spice.sonatype.org/plexus-sec-dispatcher) - * rest (org.elasticsearch.client:rest:5.2.2 - https://github.com/elastic/elasticsearch) + * rest (org.elasticsearch.client:elasticsearch-rest-client:7.17.13 - https://github.com/elastic/elasticsearch.git) * sigar (org.fusesource:sigar:1.6.4 - http://fusesource.com/sigar/) * Sisu - Guice (org.sonatype.sisu:sisu-guice:2.1.7 - http://forge.sonatype.com/sisu-guice/) * Sisu - Inject (JSR330 bean support) (org.sonatype.sisu:sisu-inject-bean:1.4.2 - http://sisu.sonatype.org/sisu-inject/guice-bean/sisu-inject-bean/) * Sisu - Inject (Plexus bean support) (org.sonatype.sisu:sisu-inject-plexus:1.4.2 - http://sisu.sonatype.org/sisu-inject/guice-bean/guice-plexus/sisu-inject-plexus/) * SnakeYAML (org.yaml:snakeyaml:2.0 - https://bitbucket.org/snakeyaml/snakeyaml) - * Spatial4J (com.spatial4j:spatial4j:0.5 - http://nexus.sonatype.org/oss-repository-hosting.html/spatial4j) * Spring AOP (org.springframework:spring-aop:5.3.27 - https://github.com/spring-projects/spring-framework) * Spring Beans (org.springframework:spring-beans:5.3.27 - https://github.com/spring-projects/spring-framework) * Spring Commons Logging Bridge (org.springframework:spring-jcl:5.3.27 - https://github.com/spring-projects/spring-framework) @@ -436,7 +437,7 @@ List of third-party dependencies grouped by their license type. * Spring Messaging (org.springframework:spring-messaging:5.3.27 - https://github.com/spring-projects/spring-framework) * Spring Transaction (org.springframework:spring-tx:5.3.27 - https://github.com/spring-projects/spring-framework) * StAX API (stax:stax-api:1.0.1 - http://stax.codehaus.org/) - * T-Digest (com.tdunning:t-digest:3.0 - https://github.com/tdunning/t-digest) + * T-Digest (com.tdunning:t-digest:3.2 - https://github.com/tdunning/t-digest) * Tephra API (co.cask.tephra:tephra-api:0.6.0 - https://github.com/caskdata/tephra/tephra-api) * Tephra Core (co.cask.tephra:tephra-core:0.6.0 - https://github.com/caskdata/tephra/tephra-core) * Tephra HBase 1.0 Compatibility (co.cask.tephra:tephra-hbase-compat-1.0:0.6.0 - https://github.com/caskdata/tephra/tephra-hbase-compat-1.0) @@ -547,10 +548,6 @@ List of third-party dependencies grouped by their license type. * JLine Bundle (org.jline:jline:3.9.0 - http://nexus.sonatype.org/oss-repository-hosting.html/jline-parent/jline) * Stax2 API (org.codehaus.woodstox:stax2-api:4.2.1 - http://github.com/FasterXML/stax2-api) - CC0 1.0 Universal - - * JSR166e (com.twitter:jsr166e:1.1.0 - http://github.com/twitter/jsr166e) - CDDL/GPLv2+CE * JavaBeans Activation Framework API jar (javax.activation:javax.activation-api:1.2.0 - http://java.net/all/javax.activation-api/) @@ -637,6 +634,26 @@ List of third-party dependencies grouped by their license type. * jms (jakarta.jms:jakarta.jms-api:2.0.2 - https://projects.eclipse.org/projects/ee4j.jms) + Elastic License 2.0 + + * rest-high-level (org.elasticsearch.client:elasticsearch-rest-high-level-client:7.17.13 - https://github.com/elastic/elasticsearch.git) + + Elastic License 2.0, Server Side Public License, v 1 + + * aggs-matrix-stats (org.elasticsearch.plugin:aggs-matrix-stats-client:7.17.13 - https://github.com/elastic/elasticsearch.git) + * elasticsearch-cli (org.elasticsearch:elasticsearch-cli:7.17.13 - https://github.com/elastic/elasticsearch.git) + * elasticsearch-core (org.elasticsearch:elasticsearch-core:7.17.13 - https://github.com/elastic/elasticsearch.git) + * elasticsearch-geo (org.elasticsearch:elasticsearch-geo:7.17.13 - https://github.com/elastic/elasticsearch.git) + * elasticsearch-lz4 (org.elasticsearch:elasticsearch-lz4:7.17.13 - https://github.com/elastic/elasticsearch.git) + * elasticsearch-plugin-classloader (org.elasticsearch:elasticsearch-plugin-classloader:7.17.13 - https://github.com/elastic/elasticsearch.git) + * elasticsearch-secure-sm (org.elasticsearch:elasticsearch-secure-sm:7.17.13 - https://github.com/elastic/elasticsearch.git) + * elasticsearch-x-content (org.elasticsearch:elasticsearch-x-content:7.17.13 - https://github.com/elastic/elasticsearch.git) + * lang-mustache (org.elasticsearch.plugin:lang-mustache-client:7.17.13 - https://github.com/elastic/elasticsearch.git) + * mapper-extras (org.elasticsearch.plugin:mapper-extras-client:7.17.13 - https://github.com/elastic/elasticsearch.git) + * parent-join (org.elasticsearch.plugin:parent-join-client:7.17.13 - https://github.com/elastic/elasticsearch.git) + * rank-eval (org.elasticsearch.plugin:rank-eval-client:7.17.13 - https://github.com/elastic/elasticsearch.git) + * server (org.elasticsearch:elasticsearch:7.17.13 - https://github.com/elastic/elasticsearch.git) + MIT License * argparse4j (net.sourceforge.argparse4j:argparse4j:0.8.1 - http://argparse4j.github.io) @@ -644,6 +661,7 @@ List of third-party dependencies grouped by their license type. * JCodings (org.jruby.jcodings:jcodings:1.0.55 - http://nexus.sonatype.org/oss-repository-hosting.html/jcodings) * Jedis (redis.clients:jedis:2.9.0 - https://github.com/xetorthio/jedis) * Joni (org.jruby.joni:joni:2.1.31 - http://nexus.sonatype.org/oss-repository-hosting.html/joni) + * JOpt Simple (net.sf.jopt-simple:jopt-simple:5.0.2 - http://pholser.github.io/jopt-simple) * JUL to SLF4J bridge (org.slf4j:jul-to-slf4j:1.7.36 - http://www.slf4j.org) * Microsoft JDBC Driver for SQL Server (com.microsoft.sqlserver:mssql-jdbc:6.2.1.jre7 - https://github.com/Microsoft/mssql-jdbc) * SLF4J API Module (org.slf4j:slf4j-api:1.7.36 - http://www.slf4j.org) diff --git a/examples/storm-elasticsearch-examples/pom.xml b/examples/storm-elasticsearch-examples/pom.xml index ee864aa15..f16429bb3 100644 --- a/examples/storm-elasticsearch-examples/pom.xml +++ b/examples/storm-elasticsearch-examples/pom.xml @@ -26,9 +26,6 @@ </parent> <artifactId>storm-elasticsearch-examples</artifactId> - <properties> - <elasticsearch.test.version>2.4.4</elasticsearch.test.version> - </properties> <dependencies> <dependency> @@ -42,11 +39,6 @@ <artifactId>storm-elasticsearch</artifactId> <version>${project.version}</version> </dependency> - <dependency> - <groupId>org.elasticsearch</groupId> - <artifactId>elasticsearch</artifactId> - <version>${elasticsearch.test.version}</version> - </dependency> </dependencies> <build> diff --git a/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java index 1c9e09f1c..3c3e8565a 100644 --- a/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java +++ b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java @@ -24,10 +24,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.storm.Config; import org.apache.storm.StormSubmitter; -import org.apache.storm.elasticsearch.common.EsConfig; -import org.apache.storm.elasticsearch.common.EsConstants; -import org.apache.storm.elasticsearch.common.EsTestUtil; -import org.apache.storm.elasticsearch.common.EsTupleMapper; +import org.apache.storm.elasticsearch.common.*; import org.apache.storm.generated.AlreadyAliveException; import org.apache.storm.generated.AuthorizationException; import org.apache.storm.generated.InvalidTopologyException; @@ -76,13 +73,11 @@ public final class EsIndexTopology { TopologyBuilder builder = new TopologyBuilder(); UserDataSpout spout = new UserDataSpout(); builder.setSpout(SPOUT_ID, spout, 1); - EsTupleMapper tupleMapper = EsTestUtil.generateDefaultTupleMapper(); + EsTupleMapper tupleMapper =new DefaultEsTupleMapper(); EsConfig esConfig = new EsConfig("http://localhost:9300"); builder.setBolt(BOLT_ID, new EsIndexBolt(esConfig, tupleMapper), 1) .shuffleGrouping(SPOUT_ID); - EsTestUtil.startEsNode(); - EsTestUtil.waitForSeconds(EsConstants.WAIT_DEFAULT_SECS); StormSubmitter.submitTopology(TOPOLOGY_NAME, config, builder.createTopology()); diff --git a/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/common/EsTestUtil.java b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/common/EsTestUtil.java deleted file mode 100644 index e96f6e897..000000000 --- a/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/common/EsTestUtil.java +++ /dev/null @@ -1,122 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.elasticsearch.common; - -import java.util.HashMap; -import java.util.concurrent.TimeUnit; - -import org.apache.storm.Config; -import org.apache.storm.task.GeneralTopologyContext; -import org.apache.storm.topology.TopologyBuilder; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.tuple.TupleImpl; -import org.apache.storm.tuple.Values; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.node.Node; -import org.elasticsearch.node.NodeBuilder; - -/** - * ElasticSearch example utilities. - */ -public final class EsTestUtil { - - /** - * Generates a test tuple. - * @param source the source of the tuple - * @param index the index of the tuple - * @param type the type of the tuple - * @param id the id of the tuple - * @return the generated tuple - */ - public static Tuple generateTestTuple(final String source, - final String index, - final String type, - final String id) { - TopologyBuilder builder = new TopologyBuilder(); - GeneralTopologyContext topologyContext = new GeneralTopologyContext( - builder.createTopology(), - new Config(), - new HashMap<>(), - new HashMap<>(), - new HashMap<>(), - "") { - @Override - public Fields getComponentOutputFields(final String componentId, - final String streamId) { - return new Fields("source", "index", "type", "id"); - } - }; - return new TupleImpl(topologyContext, - new Values(source, index, type, id), - source, - 1, - ""); - } - - /** - * Generates a new tuple mapper. - * @return the generated mapper - */ - public static EsTupleMapper generateDefaultTupleMapper() { - return new DefaultEsTupleMapper(); - } - - /** - * Starts an ElasticSearch node. - * @return the started node. - */ - public static Node startEsNode() { - Node node = NodeBuilder.nodeBuilder().data(true).settings( - Settings.settingsBuilder() - .put(ClusterName.SETTING, EsConstants.CLUSTER_NAME) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) - .put(EsExecutors.PROCESSORS, 1) - .put("http.enabled", true) - .put("index.percolator.map_unmapped_fields_as_string", - true) - .put("index.store.type", "mmapfs") - .put("path.home", "./data") - ).build(); - node.start(); - return node; - } - - /** - * Waits for specified seconds and ignores {@link InterruptedException}. - * @param seconds the seconds to wait - */ - public static void waitForSeconds(final int seconds) { - try { - Thread.sleep(TimeUnit.SECONDS.toMillis(5)); - } catch (InterruptedException ex) { - //expected - } - } - - /** - * Utility constructor. - */ - private EsTestUtil() { - } -} diff --git a/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java index c9126656e..311c816b7 100644 --- a/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java +++ b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java @@ -26,10 +26,7 @@ import java.util.UUID; import org.apache.storm.Config; import org.apache.storm.StormSubmitter; -import org.apache.storm.elasticsearch.common.EsConfig; -import org.apache.storm.elasticsearch.common.EsConstants; -import org.apache.storm.elasticsearch.common.EsTestUtil; -import org.apache.storm.elasticsearch.common.EsTupleMapper; +import org.apache.storm.elasticsearch.common.*; import org.apache.storm.generated.AlreadyAliveException; import org.apache.storm.generated.AuthorizationException; import org.apache.storm.generated.InvalidTopologyException; @@ -68,16 +65,13 @@ public final class TridentEsTopology { Stream stream = topology.newStream("spout", spout); EsConfig esConfig = new EsConfig("http://localhost:9300"); Fields esFields = new Fields("index", "type", "source"); - EsTupleMapper tupleMapper = EsTestUtil.generateDefaultTupleMapper(); + EsTupleMapper tupleMapper = new DefaultEsTupleMapper(); StateFactory factory = new EsStateFactory(esConfig, tupleMapper); TridentState state = stream.partitionPersist(factory, esFields, new EsUpdater(), new Fields()); - EsTestUtil.startEsNode(); - EsTestUtil.waitForSeconds(EsConstants.WAIT_DEFAULT_SECS); - StormSubmitter.submitTopology(TOPOLOGY_NAME, new Config(), topology.build()); diff --git a/external/storm-elasticsearch/pom.xml b/external/storm-elasticsearch/pom.xml index 0a72b30fc..ba68ee7b8 100644 --- a/external/storm-elasticsearch/pom.xml +++ b/external/storm-elasticsearch/pom.xml @@ -58,7 +58,7 @@ </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> - <artifactId>rest</artifactId> + <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>${elasticsearch.version}</version> </dependency> <dependency> @@ -91,7 +91,6 @@ <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> - <version>${jackson.databind.version}</version> </dependency> <dependency> <groupId>com.google.guava</groupId> @@ -111,12 +110,6 @@ <groupId>org.hamcrest</groupId> <artifactId>hamcrest</artifactId> </dependency> - <dependency> - <groupId>org.elasticsearch</groupId> - <artifactId>elasticsearch</artifactId> - <version>${elasticsearch.test.version}</version> - <scope>test</scope> - </dependency> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-client</artifactId> @@ -129,6 +122,12 @@ <artifactId>guava</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>elasticsearch</artifactId> + <version>1.19.1</version> + <scope>test</scope> + </dependency> </dependencies> <build> @@ -153,6 +152,23 @@ </execution> </executions> </plugin> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <executions> + <execution> + <id>default-test</id> + <phase>test</phase> + <goals> + <goal>test</goal> + </goals> + </execution> + </executions> + <configuration> + <systemPropertyVariables> + <elasticsearch-version>${elasticsearch.version}</elasticsearch-version> + </systemPropertyVariables> + </configuration> + </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-failsafe-plugin</artifactId> diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java index 6d4686331..9d60b9786 100644 --- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java +++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java @@ -31,6 +31,8 @@ import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Tuple; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; /** * Basic bolt for storing tuple to ES document. @@ -74,7 +76,11 @@ public class EsIndexBolt extends AbstractEsBolt { String id = tupleMapper.getId(tuple); Map<String, String> params = tupleMapper.getParams(tuple, new HashMap<>()); - client.performRequest("put", getEndpoint(index, type, id), params, new StringEntity(source)); + final Request request = new Request("post", getEndpoint(index, type, id)); + request.setEntity(new StringEntity(source)); + request.addParameters(params); + + client.performRequest(request); collector.ack(tuple); } catch (Exception e) { collector.reportError(e); diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java index 5faa69fbd..f84b5b601 100644 --- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java +++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java @@ -25,6 +25,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; +import org.apache.http.entity.StringEntity; import org.apache.storm.elasticsearch.DefaultEsLookupResultOutput; import org.apache.storm.elasticsearch.EsLookupResultOutput; import org.apache.storm.elasticsearch.common.DefaultEsTupleMapper; @@ -33,6 +34,7 @@ import org.apache.storm.elasticsearch.common.EsTupleMapper; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; +import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; /** @@ -83,7 +85,9 @@ public class EsLookupBolt extends AbstractEsBolt { String id = tupleMapper.getId(tuple); Map<String, String> params = tupleMapper.getParams(tuple, new HashMap<>()); - Response response = client.performRequest("get", getEndpoint(index, type, id), params); + final Request request = new Request("get", getEndpoint(index, type, id)); + request.addParameters(params); + Response response = client.performRequest(request); return output.toValues(response); } diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java index 3f831f74a..2fac3e8f6 100644 --- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java +++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java @@ -34,6 +34,7 @@ import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; +import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; /** @@ -82,8 +83,13 @@ public class EsPercolateBolt extends AbstractEsBolt { Map<String, String> indexParams = new HashMap<>(); indexParams.put(type, null); String percolateDoc = "{\"doc\": " + source + "}"; - Response response = client.performRequest("get", getEndpoint(index, type, "_percolate"), - new HashMap<>(), new StringEntity(percolateDoc)); + + final Request request = new Request("get", getEndpoint(index, type, "_percolate")); + request.setEntity(new StringEntity(percolateDoc)); + request.addParameters(new HashMap<>()); + + Response response = client.performRequest(request); + PercolateResponse percolateResponse = objectMapper.readValue(response.getEntity().getContent(), PercolateResponse.class); if (!percolateResponse.getMatches().isEmpty()) { for (PercolateResponse.Match match : percolateResponse.getMatches()) { diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java index c665ae4f8..04ac73d4c 100644 --- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java +++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java @@ -35,13 +35,14 @@ import org.elasticsearch.client.RestClientBuilder.RequestConfigCallback; public class EsConfig implements Serializable { private final HttpHost[] httpHosts; - private Integer maxRetryTimeoutMillis; private Header[] defaultHeaders; private RestClient.FailureListener failureListener; private HttpClientConfigCallback httpClientConfigCallback; private RequestConfigCallback requestConfigCallback; private String pathPrefix; + private boolean compression; + /** * EsConfig Constructor to be used in EsIndexBolt, EsPercolateBolt and EsStateFactory. * Connects to Elasticsearch at http://localhost:9200. @@ -75,11 +76,6 @@ public class EsConfig implements Serializable { throw new IllegalArgumentException("Invalid url " + url); } } - - public EsConfig withMaxRetryTimeoutMillis(Integer maxRetryTimeoutMillis) { - this.maxRetryTimeoutMillis = maxRetryTimeoutMillis; - return this; - } public EsConfig withDefaultHeaders(Header[] defaultHeaders) { this.defaultHeaders = defaultHeaders; @@ -106,12 +102,13 @@ public class EsConfig implements Serializable { return this; } - public HttpHost[] getHttpHosts() { - return httpHosts; + public EsConfig withCompression(boolean compression) { + this.compression = compression; + return this; } - public Integer getMaxRetryTimeoutMillis() { - return maxRetryTimeoutMillis; + public HttpHost[] getHttpHosts() { + return httpHosts; } public Header[] getDefaultHeaders() { @@ -133,4 +130,9 @@ public class EsConfig implements Serializable { public String getPathPrefix() { return pathPrefix; } + + public boolean isCompression() { + return compression; + } + } diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/StormElasticSearchClient.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/StormElasticSearchClient.java index 732aa084a..7fa57d7b9 100644 --- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/StormElasticSearchClient.java +++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/StormElasticSearchClient.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * <p> * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -41,9 +41,6 @@ public final class StormElasticSearchClient implements Serializable { */ public RestClient construct() { RestClientBuilder builder = RestClient.builder(esConfig.getHttpHosts()); - if (esConfig.getMaxRetryTimeoutMillis() != null) { - builder.setMaxRetryTimeoutMillis(esConfig.getMaxRetryTimeoutMillis()); - } if (esConfig.getDefaultHeaders() != null) { builder.setDefaultHeaders(esConfig.getDefaultHeaders()); } @@ -59,6 +56,9 @@ public final class StormElasticSearchClient implements Serializable { if (esConfig.getPathPrefix() != null) { builder.setPathPrefix(esConfig.getPathPrefix()); } + + builder.setCompressionEnabled(esConfig.isCompression()); + return builder.build(); } } diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java index 3a8e202c2..ef82335f8 100644 --- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java +++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; -import java.util.HashMap; import java.util.List; import org.apache.http.entity.StringEntity; @@ -36,6 +35,7 @@ import org.apache.storm.elasticsearch.response.BulkIndexResponse; import org.apache.storm.topology.FailedException; import org.apache.storm.trident.state.State; import org.apache.storm.trident.tuple.TridentTuple; +import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; import org.slf4j.Logger; @@ -122,7 +122,10 @@ class EsState implements State { public void updateState(List<TridentTuple> tuples) { try { String bulkRequest = buildRequest(tuples); - Response response = client.performRequest("post", "_bulk", new HashMap<>(), new StringEntity(bulkRequest.toString())); + + final Request request = new Request("post", "_bulk"); + request.setEntity(new StringEntity(bulkRequest)); + Response response = client.performRequest(request); BulkIndexResponse bulkResponse = objectMapper.readValue(response.getEntity().getContent(), BulkIndexResponse.class); if (bulkResponse.hasErrors()) { LOG.warn("failed processing bulk index requests: " + bulkResponse.getFirstError() + ": " + bulkResponse.getFirstResult()); diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltIntegrationTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltIntegrationTest.java index afb75db1a..0819f8a8e 100644 --- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltIntegrationTest.java +++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltIntegrationTest.java @@ -20,16 +20,20 @@ package org.apache.storm.elasticsearch.bolt; import org.apache.storm.elasticsearch.common.EsTestUtil; import org.apache.storm.testing.IntegrationTest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.client.RequestOptions; import org.elasticsearch.node.Node; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.testcontainers.elasticsearch.ElasticsearchContainer; + +import java.io.IOException; @IntegrationTest public abstract class AbstractEsBoltIntegrationTest<Bolt extends AbstractEsBolt> extends AbstractEsBoltTest<Bolt> { - protected static Node node; + protected static ElasticsearchContainer node; @BeforeAll public static void startElasticSearchNode() throws Exception { @@ -43,8 +47,8 @@ public abstract class AbstractEsBoltIntegrationTest<Bolt extends AbstractEsBolt> } @BeforeEach - public void createIndex() { - node.client().admin().indices().create(new CreateIndexRequest(index)).actionGet(); + public void createIndex() throws IOException { + EsTestUtil.getRestHighLevelClient(node).indices().create(new CreateIndexRequest(index), RequestOptions.DEFAULT); } @AfterEach diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java index 9e86865b3..a7ccee4bf 100644 --- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java +++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java @@ -23,31 +23,45 @@ import static org.mockito.Mockito.verify; import org.apache.storm.elasticsearch.common.EsConfig; import org.apache.storm.elasticsearch.common.EsTestUtil; import org.apache.storm.tuple.Tuple; +import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.TermQueryBuilder; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.junit.jupiter.api.Test; +import java.io.IOException; + public class EsIndexBoltTest extends AbstractEsBoltIntegrationTest<EsIndexBolt> { @Test - public void testEsIndexBolt() { + public void testEsIndexBolt() throws IOException { Tuple tuple = createTestTuple(index, type); bolt.execute(tuple); verify(outputCollector).ack(tuple); - node.client().admin().indices().prepareRefresh(index).execute().actionGet(); - SearchResponse resp = node.client().prepareSearch(index) - .setQuery(new TermQueryBuilder("_type", type)) - .setSize(0) - .execute().actionGet(); + RestHighLevelClient client = EsTestUtil.getRestHighLevelClient(node); + RefreshRequest request = new RefreshRequest(index); + client.indices().refresh(request, RequestOptions.DEFAULT); + + SearchRequest searchRequest = new SearchRequest(index); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(new TermQueryBuilder("_type", type)); + searchSourceBuilder.size(0); + searchRequest.source(searchSourceBuilder); + + SearchResponse resp = client.search(searchRequest, RequestOptions.DEFAULT); assertEquals(1, resp.getHits().getTotalHits()); } @Test - public void indexMissing() { + public void indexMissing() throws IOException { String index = "missing"; Tuple tuple = createTestTuple(index, type); @@ -56,11 +70,14 @@ public class EsIndexBoltTest extends AbstractEsBoltIntegrationTest<EsIndexBolt> verify(outputCollector).ack(tuple); - node.client().admin().indices().prepareRefresh(index).execute().actionGet(); - SearchResponse resp = node.client().prepareSearch(index) - .setQuery(new TermQueryBuilder("_type", type)) - .setSize(0) - .execute().actionGet(); + RestHighLevelClient client = EsTestUtil.getRestHighLevelClient(node); + SearchRequest searchRequest = new SearchRequest(index); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(new TermQueryBuilder("_type", type)); + searchSourceBuilder.size(0); + searchRequest.source(searchSourceBuilder); + + SearchResponse resp = client.search(searchRequest, RequestOptions.DEFAULT); assertEquals(1, resp.getHits().getTotalHits()); } diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltIntegrationTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltIntegrationTest.java index de595c3f2..c31e6f310 100644 --- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltIntegrationTest.java +++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltIntegrationTest.java @@ -27,7 +27,11 @@ import org.apache.storm.elasticsearch.common.EsConfig; import org.apache.storm.elasticsearch.common.EsTestUtil; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.ResponseException; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.xcontent.XContentType; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -35,6 +39,8 @@ import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.junit.jupiter.MockitoExtension; +import java.io.IOException; + @ExtendWith(MockitoExtension.class) public class EsLookupBoltIntegrationTest extends AbstractEsBoltIntegrationTest<EsLookupBolt> { @@ -52,8 +58,11 @@ public class EsLookupBoltIntegrationTest extends AbstractEsBoltIntegrationTest<E } @BeforeEach - public void populateIndexWithTestData() { - node.client().prepareIndex(index, type, documentId).setSource(source).execute().actionGet(); + public void populateIndexWithTestData() throws IOException { + IndexRequest indexRequest = new IndexRequest(index, type, documentId) + .source(source, XContentType.JSON); + RestHighLevelClient client = EsTestUtil.getRestHighLevelClient(node); + client.index(indexRequest, RequestOptions.DEFAULT); } @Test diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltTest.java index 197ca6ec6..0067e465d 100644 --- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltTest.java +++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltTest.java @@ -35,6 +35,8 @@ import org.apache.storm.elasticsearch.common.EsTupleMapper; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; import org.elasticsearch.client.RestClient; import org.junit.jupiter.api.AfterEach; @@ -87,7 +89,9 @@ public class EsLookupBoltTest extends AbstractEsBoltTest<EsLookupBolt> { } private void makeRequestAndThrow(Exception exception) throws IOException { - when(client.performRequest("get", AbstractEsBolt.getEndpoint(index, type, documentId), params)).thenThrow(exception); + final Request request = new Request("get", AbstractEsBolt.getEndpoint(index, type, documentId)); + request.addParameters(params); + when(client.performRequest(request)).thenThrow(exception); bolt.execute(tuple); } diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java index 4ac39e1ca..d50127db0 100644 --- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java +++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java @@ -29,11 +29,18 @@ import org.apache.storm.elasticsearch.common.EsTestUtil; import org.apache.storm.elasticsearch.response.PercolateResponse; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.ResponseException; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.xcontent.XContentType; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; +import java.io.IOException; + public class EsPercolateBoltTest extends AbstractEsBoltIntegrationTest<EsPercolateBolt> { private final String source = "{\"user\":\"user1\"}"; @@ -44,11 +51,14 @@ public class EsPercolateBoltTest extends AbstractEsBoltIntegrationTest<EsPercola } @BeforeEach - public void populateIndexWithTestData() { - node.client().prepareIndex(index, ".percolator", documentId) - .setSource("{\"query\":{\"match\":" + source + "}}") - .setRefresh(true) - .execute().actionGet(); + public void populateIndexWithTestData() throws IOException { + IndexRequest indexRequest = new IndexRequest(index, ".percolator", documentId) + .source(source, XContentType.JSON) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); // setRefresh(true) in Elasticsearch 6.x + + RestHighLevelClient client = EsTestUtil.getRestHighLevelClient(node); + + client.index(indexRequest, RequestOptions.DEFAULT); } @Test diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java index 6712768ba..303fda635 100644 --- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java +++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java @@ -39,23 +39,11 @@ import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.TupleImpl; import org.apache.storm.tuple.Values; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; -import org.elasticsearch.client.Requests; -import org.elasticsearch.client.Response; -import org.elasticsearch.client.ResponseException; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.health.ClusterHealthStatus; -import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.common.Priority; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.index.IndexNotFoundException; -import org.elasticsearch.node.Node; -import org.elasticsearch.node.NodeBuilder; +import org.elasticsearch.client.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testcontainers.elasticsearch.ElasticsearchContainer; public class EsTestUtil { private static final Logger LOG = LoggerFactory.getLogger(EsTestUtil.class); @@ -91,40 +79,32 @@ public class EsTestUtil { return new ResponseException(response); } - public static Node startEsNode(){ - Node node = NodeBuilder.nodeBuilder().data(true).settings( - Settings.settingsBuilder() - .put(ClusterName.SETTING, EsConstants.clusterName) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) - .put(EsExecutors.PROCESSORS, 1) - .put("http.enabled", true) - .put("index.percolator.map_unmapped_fields_as_string", true) - .put("index.store.type", "mmapfs") - .put("path.home", "./data") - ).build(); - node.start(); - return node; + public static RestHighLevelClient getRestHighLevelClient(ElasticsearchContainer node) { + final EsConfig cfg = new EsConfig(node.getHttpHostAddress()); + return new RestHighLevelClientBuilder(new StormElasticSearchClient(cfg).construct()).build(); } - public static void ensureEsGreen(Node node) { - ClusterHealthResponse chr = node.client() - .admin() - .cluster() - .health(Requests.clusterHealthRequest() - .timeout(TimeValue.timeValueSeconds(30)) - .waitForGreenStatus() - .waitForEvents(Priority.LANGUID) - .waitForRelocatingShards(0)) - .actionGet(); - assertThat("cluster status is green", chr.getStatus(), equalTo(ClusterHealthStatus.GREEN)); + public static ElasticsearchContainer startEsNode() { + String version = System.getProperty("elasticsearch-version"); + if (version == null) version = "7.17.13"; + LOG.info("Starting docker instance of Elasticsearch {}...", version); + + final ElasticsearchContainer container = + new ElasticsearchContainer( + "docker.elastic.co/elasticsearch/elasticsearch:" + version); + container.start(); + return container; + } + + public static void ensureEsGreen(ElasticsearchContainer node) { + assertThat("cluster status is green", node.isHealthy()); } /** * Stop the given Elasticsearch node and clear the data directory. * @param node */ - public static void stopEsNode(Node node) { + public static void stopEsNode(ElasticsearchContainer node) { node.close(); try { FileUtils.deleteDirectory(new File("./data")); @@ -138,10 +118,10 @@ public class EsTestUtil { * @param node - the node to connect to * @param index - the index to clear */ - public static void clearIndex(Node node, String index) { + public static void clearIndex(ElasticsearchContainer node, String index) { try { - node.client().admin().indices().delete(new DeleteIndexRequest(index)).actionGet(); - } catch (IndexNotFoundException ignore) { + getRestHighLevelClient(node).indices().delete(new DeleteIndexRequest(index), RequestOptions.DEFAULT); + } catch (IOException ignore) { } } diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/EsStateTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/EsStateTest.java index bcabe9e60..8e86e7dad 100644 --- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/EsStateTest.java +++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/EsStateTest.java @@ -33,12 +33,13 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.junit.jupiter.MockitoExtension; +import org.testcontainers.elasticsearch.ElasticsearchContainer; @IntegrationTest @ExtendWith(MockitoExtension.class) public class EsStateTest { - private static Node node; + private static ElasticsearchContainer node; private final String[] documentId = {UUID.randomUUID().toString(), UUID.randomUUID().toString()}; private final String index = "index"; diff --git a/pom.xml b/pom.xml index 58d6eeb71..c1716ee15 100644 --- a/pom.xml +++ b/pom.xml @@ -126,7 +126,7 @@ <awaitility.version>3.1.0</awaitility.version> <hdrhistogram.version>2.1.10</hdrhistogram.version> <hamcrest.version>2.2</hamcrest.version> - <elasticsearch.version>5.2.2</elasticsearch.version> + <elasticsearch.version>7.17.13</elasticsearch.version> <calcite.version>1.16.0</calcite.version> <jedis.version>2.9.0</jedis.version> <activemq.version>5.18.3</activemq.version>