This is an automated email from the ASF dual-hosted git repository. rzo1 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push: new 8e8c65c66 [STORM-3680] Upgrade Jedis Library (#3314) 8e8c65c66 is described below commit 8e8c65c666814685970d8791e2a9ba3a46ae1129 Author: Steviep <c...@users.noreply.github.com> AuthorDate: Mon Dec 4 18:20:09 2023 +0900 [STORM-3680] Upgrade Jedis Library (#3314) Upgrade Jedis Library to 5.x --------- Co-authored-by: Richard Zowalla <r...@apache.org> --- DEPENDENCY-LICENSES | 9 +- LICENSE-binary | 2 +- .../tools/Base64ToBinaryStateMigrationUtil.java | 3 +- external/storm-elasticsearch/pom.xml | 2 +- external/storm-redis/pom.xml | 13 + .../apache/storm/redis/bolt/AbstractRedisBolt.java | 19 +- .../apache/storm/redis/bolt/RedisFilterBolt.java | 16 +- .../apache/storm/redis/bolt/RedisLookupBolt.java | 6 +- .../apache/storm/redis/bolt/RedisStoreBolt.java | 6 +- .../common/adapter/RedisCommandsAdapterJedis.java | 4 +- .../adapter/RedisCommandsAdapterJedisCluster.java | 7 +- .../storm/redis/common/commands/RedisCommands.java | 5 +- .../common/container/JedisClusterContainer.java | 106 ++++- ...eContainer.java => JedisCommandsContainer.java} | 49 +- .../container/JedisCommandsContainerBuilder.java | 19 +- .../redis/common/container/JedisContainer.java | 127 ++++- .../common/container/RedisClusterContainer.java | 9 +- .../container/RedisCommandsContainerBuilder.java | 14 +- .../storm/redis/state/RedisKeyValueState.java | 2 +- .../redis/state/RedisKeyValueStateIterator.java | 4 +- .../redis/trident/state/RedisClusterMapState.java | 7 +- .../redis/trident/state/RedisClusterState.java | 9 +- .../storm/redis/bolt/RedisFilterBoltTest.java | 509 +++++++++++++++++++++ .../state/RedisKeyValueStateIteratorTest.java | 4 +- .../storm/redis/state/RedisKeyValueStateTest.java | 2 +- .../apache/storm/redis/util/JedisTestHelper.java | 81 ++++ .../org/apache/storm/redis/util/StubTuple.java | 199 ++++++++ .../apache/storm/redis/util/TupleTestHelper.java | 45 ++ .../redis/util/outputcollector/EmittedTuple.java | 56 +++ 
.../util/outputcollector/StubOutputCollector.java | 94 ++++ integration-test/pom.xml | 1 - pom.xml | 11 +- .../storm/sql/redis/RedisDataSourcesProvider.java | 3 +- 33 files changed, 1311 insertions(+), 132 deletions(-) diff --git a/DEPENDENCY-LICENSES b/DEPENDENCY-LICENSES index 57fb93e51..ffea76d71 100644 --- a/DEPENDENCY-LICENSES +++ b/DEPENDENCY-LICENSES @@ -15,6 +15,7 @@ List of third-party dependencies grouped by their license type. * Apache Avro (org.apache.avro:avro:1.11.3 - https://avro.apache.org) * Apache Commons FileUpload (commons-fileupload:commons-fileupload:1.5 - https://commons.apache.org/proper/commons-fileupload/) * Apache Commons Lang (org.apache.commons:commons-lang3:3.13.0 - https://commons.apache.org/proper/commons-lang/) + * Apache Commons Pool (org.apache.commons:commons-pool2:2.12.0 - https://commons.apache.org/proper/commons-pool/) * Apache Commons Text (org.apache.commons:commons-text:1.11.0 - https://commons.apache.org/proper/commons-text) * Apache Directory API ASN.1 API (org.apache.directory.api:api-asn1-api:2.1.4 - https://directory.apache.org/api-parent/api-asn1-parent/api-asn1-api/) * Apache Directory API ASN.1 BER (org.apache.directory.api:api-asn1-ber:2.1.4 - https://directory.apache.org/api-parent/api-asn1-parent/api-asn1-ber/) @@ -26,8 +27,7 @@ List of third-party dependencies grouped by their license type. 
* Apache Log4j Core (org.apache.logging.log4j:log4j-core:2.21.1 - https://logging.apache.org/log4j/2.x/log4j/log4j-core/) * Apache Log4j SLF4J Binding (org.apache.logging.log4j:log4j-slf4j-impl:2.21.1 - https://logging.apache.org/log4j/2.x/log4j/log4j-slf4j-impl/) * Apache Log4j Web (org.apache.logging.log4j:log4j-web:2.21.1 - https://logging.apache.org/log4j/2.x/log4j/log4j-web/) - * Gson (com.google.code.gson:gson:2.8.9 - https://github.com/google/gson/gson) - * Gson (com.google.code.gson:gson:2.9.0 - https://github.com/google/gson/gson) + * Gson (com.google.code.gson:gson:2.10.1 - https://github.com/google/gson/gson) * Maven Plugin Tools Java Annotations (org.apache.maven.plugin-tools:maven-plugin-annotations:3.8.1 - https://maven.apache.org/plugin-tools/maven-plugin-annotations) * snappy-java (org.xerial.snappy:snappy-java:1.1.10.4 - https://github.com/xerial/snappy-java) @@ -73,7 +73,6 @@ List of third-party dependencies grouped by their license type. * Apache Commons Logging (commons-logging:commons-logging:1.2 - http://commons.apache.org/proper/commons-logging/) * Apache Commons Math (org.apache.commons:commons-math3:3.6.1 - http://commons.apache.org/proper/commons-math/) * Apache Commons Net (commons-net:commons-net:3.9.0 - https://commons.apache.org/proper/commons-net/) - * Apache Commons Pool (org.apache.commons:commons-pool2:2.4.2 - http://commons.apache.org/proper/commons-pool/) * Apache Curator (org.apache.curator:apache-curator:2.12.0 - http://curator.apache.org) * Apache Derby Database Engine and Embedded JDBC Driver (org.apache.derby:derby:10.14.1.0 - http://db.apache.org/derby/) * Apache Geronimo JCache Spec 1.0 (org.apache.geronimo.specs:geronimo-jcache_1.0_spec:1.0-alpha-1 - http://geronimo.apache.org/maven/specs/geronimo-jcache_1.0_spec/1.0-alpha-1) @@ -215,7 +214,6 @@ List of third-party dependencies grouped by their license type. 
* Google Guice - Extensions - AssistedInject (com.google.inject.extensions:guice-assistedinject:3.0 - http://code.google.com/p/google-guice/extensions-parent/guice-assistedinject/) * Google Guice - Extensions - Servlet (com.google.inject.extensions:guice-servlet:4.0 - https://github.com/google/guice/extensions-parent/guice-servlet) * Graphite Integration for Metrics (io.dropwizard.metrics:metrics-graphite:3.2.6 - http://metrics.dropwizard.io/metrics-graphite/) - * Gson (com.google.code.gson:gson:2.2.4 - http://code.google.com/p/google-gson/) * Guava: Google Core Libraries for Java (com.google.guava:guava:16.0.1 - http://code.google.com/p/guava-libraries/guava) * Guava: Google Core Libraries for Java (com.google.guava:guava:19.0 - https://github.com/google/guava/guava) * Guava: Google Core Libraries for Java (com.google.guava:guava:32.1.3-jre - https://github.com/google/guava) @@ -659,7 +657,7 @@ List of third-party dependencies grouped by their license type. * argparse4j (net.sourceforge.argparse4j:argparse4j:0.8.1 - http://argparse4j.github.io) * Checker Qual (org.checkerframework:checker-qual:3.37.0 - https://checkerframework.org/) * JCodings (org.jruby.jcodings:jcodings:1.0.55 - http://nexus.sonatype.org/oss-repository-hosting.html/jcodings) - * Jedis (redis.clients:jedis:2.9.0 - https://github.com/xetorthio/jedis) + * Jedis (redis.clients:jedis:5.1.0 - https://github.com/redis/jedis) * Joni (org.jruby.joni:joni:2.1.31 - http://nexus.sonatype.org/oss-repository-hosting.html/joni) * JOpt Simple (net.sf.jopt-simple:jopt-simple:5.0.2 - http://pholser.github.io/jopt-simple) * JUL to SLF4J bridge (org.slf4j:jul-to-slf4j:1.7.36 - http://www.slf4j.org) @@ -682,6 +680,7 @@ List of third-party dependencies grouped by their license type. 
Public Domain * AOP alliance (aopalliance:aopalliance:1.0 - http://aopalliance.sourceforge.net) + * JSON in Java (org.json:json:20231013 - https://github.com/douglascrockford/JSON-java) Revised BSD diff --git a/LICENSE-binary b/LICENSE-binary index a6c05f4c8..3d10d5dac 100644 --- a/LICENSE-binary +++ b/LICENSE-binary @@ -656,7 +656,6 @@ The license texts of these dependencies can be found in the licenses directory. * Apache Commons FileUpload (commons-fileupload:commons-fileupload:1.5 - https://commons.apache.org/proper/commons-fileupload/) * Apache Commons Lang (org.apache.commons:commons-lang3:3.13.0 - https://commons.apache.org/proper/commons-lang/) * Apache Commons Text (org.apache.commons:commons-text:1.11.0 - https://commons.apache.org/proper/commons-text) - * Gson (com.google.code.gson:gson:2.9.0 - https://github.com/google/gson/gson) * snappy-java (org.xerial.snappy:snappy-java:1.1.10.4 - https://github.com/xerial/snappy-java) Apache License @@ -808,6 +807,7 @@ The license texts of these dependencies can be found in the licenses directory. 
* Google Guice - Extensions - AssistedInject (com.google.inject.extensions:guice-assistedinject:3.0 - http://code.google.com/p/google-guice/extensions-parent/guice-assistedinject/) * Google Guice - Extensions - Servlet (com.google.inject.extensions:guice-servlet:4.0 - https://github.com/google/guice/extensions-parent/guice-servlet) * Graphite Integration for Metrics (io.dropwizard.metrics:metrics-graphite:3.2.6 - http://metrics.dropwizard.io/metrics-graphite/) + * Gson (com.google.code.gson:gson:2.10.1 - https://github.com/google/gson/gson) * Guava: Google Core Libraries for Java (com.google.guava:guava:16.0.1 - http://code.google.com/p/guava-libraries/guava) * Guava: Google Core Libraries for Java (com.google.guava:guava:32.1.3-jre - https://github.com/google/guava) * Guava InternalFutureFailureAccess and InternalFutures (com.google.guava:failureaccess:1.0.1 - https://github.com/google/guava/failureaccess) diff --git a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/tools/Base64ToBinaryStateMigrationUtil.java b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/tools/Base64ToBinaryStateMigrationUtil.java index 9f060b806..9d0d4c113 100644 --- a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/tools/Base64ToBinaryStateMigrationUtil.java +++ b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/tools/Base64ToBinaryStateMigrationUtil.java @@ -35,8 +35,7 @@ import org.apache.storm.redis.common.container.RedisCommandsContainerBuilder; import org.apache.storm.redis.common.container.RedisCommandsInstanceContainer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import redis.clients.util.SafeEncoder; +import redis.clients.jedis.util.SafeEncoder; public class Base64ToBinaryStateMigrationUtil { private static final Logger LOG = LoggerFactory.getLogger(Base64ToBinaryStateMigrationUtil.class); diff --git a/external/storm-elasticsearch/pom.xml b/external/storm-elasticsearch/pom.xml index 
ba68ee7b8..df31db80b 100644 --- a/external/storm-elasticsearch/pom.xml +++ b/external/storm-elasticsearch/pom.xml @@ -125,7 +125,7 @@ <dependency> <groupId>org.testcontainers</groupId> <artifactId>elasticsearch</artifactId> - <version>1.19.1</version> + <version>${testcontainers.version}</version> <scope>test</scope> </dependency> </dependencies> diff --git a/external/storm-redis/pom.xml b/external/storm-redis/pom.xml index b93d3d951..1cb7b6e06 100644 --- a/external/storm-redis/pom.xml +++ b/external/storm-redis/pom.xml @@ -78,6 +78,19 @@ <groupId>org.hamcrest</groupId> <artifactId>hamcrest</artifactId> </dependency> + <!-- Test Containers --> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>testcontainers</artifactId> + <version>${testcontainers.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>junit-jupiter</artifactId> + <version>${testcontainers.version}</version> + <scope>test</scope> + </dependency> </dependencies> <build> diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java index fd49f9180..0a7d293d1 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java @@ -15,12 +15,11 @@ package org.apache.storm.redis.bolt; import java.util.Map; import org.apache.storm.redis.common.config.JedisClusterConfig; import org.apache.storm.redis.common.config.JedisPoolConfig; +import org.apache.storm.redis.common.container.JedisCommandsContainer; import org.apache.storm.redis.common.container.JedisCommandsContainerBuilder; -import org.apache.storm.redis.common.container.JedisCommandsInstanceContainer; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import 
org.apache.storm.topology.base.BaseTickTupleAwareRichBolt; -import redis.clients.jedis.JedisCommands; /** * AbstractRedisBolt class is for users to implement custom bolts which makes interaction with Redis. @@ -45,7 +44,7 @@ import redis.clients.jedis.JedisCommands; public abstract class AbstractRedisBolt extends BaseTickTupleAwareRichBolt { protected OutputCollector collector; - private transient JedisCommandsInstanceContainer container; + private transient JedisCommandsContainer container; private JedisPoolConfig jedisPoolConfig; private JedisClusterConfig jedisClusterConfig; @@ -89,18 +88,10 @@ public abstract class AbstractRedisBolt extends BaseTickTupleAwareRichBolt { * Borrow JedisCommands instance from container.<p/> * JedisCommands is an interface which contains single key operations. * @return implementation of JedisCommands - * @see JedisCommandsInstanceContainer#getInstance() + * @see JedisCommandsContainer */ - protected JedisCommands getInstance() { - return this.container.getInstance(); - } - - /** - * Return borrowed instance to container. 
- * @param instance borrowed object - */ - protected void returnInstance(JedisCommands instance) { - this.container.returnInstance(instance); + protected JedisCommandsContainer getInstance() { + return this.container; } @Override diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisFilterBolt.java b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisFilterBolt.java index a3f0ace8a..d9228d32d 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisFilterBolt.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisFilterBolt.java @@ -13,14 +13,16 @@ package org.apache.storm.redis.bolt; import java.util.List; +import java.util.Objects; + import org.apache.storm.redis.common.config.JedisClusterConfig; import org.apache.storm.redis.common.config.JedisPoolConfig; +import org.apache.storm.redis.common.container.JedisCommandsContainer; import org.apache.storm.redis.common.mapper.RedisDataTypeDescription; import org.apache.storm.redis.common.mapper.RedisFilterMapper; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Tuple; import redis.clients.jedis.GeoCoordinate; -import redis.clients.jedis.JedisCommands; /** * Basic bolt for querying from Redis and filters out if key/field doesn't exist. @@ -85,7 +87,7 @@ public class RedisFilterBolt extends AbstractRedisBolt { String key = filterMapper.getKeyFromTuple(input); boolean found; - JedisCommands jedisCommand = null; + JedisCommandsContainer jedisCommand = null; try { jedisCommand = getInstance(); @@ -112,7 +114,13 @@ public class RedisFilterBolt extends AbstractRedisBolt { case GEO: List<GeoCoordinate> geopos = jedisCommand.geopos(additionalKey, key); - found = (geopos != null && geopos.size() > 0); + if (geopos == null || geopos.isEmpty()) { + found = false; + } else { + // If any entry is NOT null, then we have a match. 
+ found = geopos.stream() + .anyMatch(Objects::nonNull); + } break; default: @@ -127,8 +135,6 @@ public class RedisFilterBolt extends AbstractRedisBolt { } catch (Exception e) { this.collector.reportError(e); this.collector.fail(input); - } finally { - returnInstance(jedisCommand); } } diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java index 4d903402a..07c79773c 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java @@ -15,12 +15,12 @@ package org.apache.storm.redis.bolt; import java.util.List; import org.apache.storm.redis.common.config.JedisClusterConfig; import org.apache.storm.redis.common.config.JedisPoolConfig; +import org.apache.storm.redis.common.container.JedisCommandsContainer; import org.apache.storm.redis.common.mapper.RedisDataTypeDescription; import org.apache.storm.redis.common.mapper.RedisLookupMapper; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; -import redis.clients.jedis.JedisCommands; /** * Basic bolt for querying from Redis and emits response as tuple. 
@@ -70,7 +70,7 @@ public class RedisLookupBolt extends AbstractRedisBolt { String key = lookupMapper.getKeyFromTuple(input); Object lookupValue; - JedisCommands jedisCommand = null; + JedisCommandsContainer jedisCommand = null; try { jedisCommand = getInstance(); @@ -116,8 +116,6 @@ public class RedisLookupBolt extends AbstractRedisBolt { } catch (Exception e) { this.collector.reportError(e); this.collector.fail(input); - } finally { - returnInstance(jedisCommand); } } diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java index ce08a735d..065d9cbea 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java @@ -14,11 +14,11 @@ package org.apache.storm.redis.bolt; import org.apache.storm.redis.common.config.JedisClusterConfig; import org.apache.storm.redis.common.config.JedisPoolConfig; +import org.apache.storm.redis.common.container.JedisCommandsContainer; import org.apache.storm.redis.common.mapper.RedisDataTypeDescription; import org.apache.storm.redis.common.mapper.RedisStoreMapper; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Tuple; -import redis.clients.jedis.JedisCommands; /** * Basic bolt for writing to Redis. 
@@ -66,7 +66,7 @@ public class RedisStoreBolt extends AbstractRedisBolt { String key = storeMapper.getKeyFromTuple(input); String value = storeMapper.getValueFromTuple(input); - JedisCommands jedisCommand = null; + JedisCommandsContainer jedisCommand = null; try { jedisCommand = getInstance(); @@ -114,8 +114,6 @@ public class RedisStoreBolt extends AbstractRedisBolt { } catch (Exception e) { this.collector.reportError(e); this.collector.fail(input); - } finally { - returnInstance(jedisCommand); } } diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/adapter/RedisCommandsAdapterJedis.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/adapter/RedisCommandsAdapterJedis.java index d791bd865..a3f1f5a6b 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/common/adapter/RedisCommandsAdapterJedis.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/adapter/RedisCommandsAdapterJedis.java @@ -23,8 +23,8 @@ import java.io.IOException; import java.util.Map; import org.apache.storm.redis.common.commands.RedisCommands; import redis.clients.jedis.Jedis; -import redis.clients.jedis.ScanParams; -import redis.clients.jedis.ScanResult; +import redis.clients.jedis.params.ScanParams; +import redis.clients.jedis.resps.ScanResult; /** * Adapter class to make Jedis instance play with BinaryRedisCommands interface. 
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/adapter/RedisCommandsAdapterJedisCluster.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/adapter/RedisCommandsAdapterJedisCluster.java index 15f5e0a88..9d2d96b19 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/common/adapter/RedisCommandsAdapterJedisCluster.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/adapter/RedisCommandsAdapterJedisCluster.java @@ -23,14 +23,15 @@ import java.io.IOException; import java.util.Map; import org.apache.storm.redis.common.commands.RedisCommands; import redis.clients.jedis.JedisCluster; -import redis.clients.jedis.ScanParams; -import redis.clients.jedis.ScanResult; +import redis.clients.jedis.params.ScanParams; +import redis.clients.jedis.resps.ScanResult; + /** * Adapter class to make JedisCluster instance play with BinaryRedisCommands interface. */ public class RedisCommandsAdapterJedisCluster implements RedisCommands, Closeable { - private JedisCluster jedisCluster; + private final JedisCluster jedisCluster; public RedisCommandsAdapterJedisCluster(JedisCluster jedisCluster) { this.jedisCluster = jedisCluster; diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/commands/RedisCommands.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/commands/RedisCommands.java index 58835b1ab..5a7170eb2 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/common/commands/RedisCommands.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/commands/RedisCommands.java @@ -19,8 +19,9 @@ package org.apache.storm.redis.common.commands; import java.util.Map; -import redis.clients.jedis.ScanParams; -import redis.clients.jedis.ScanResult; + +import redis.clients.jedis.params.ScanParams; +import redis.clients.jedis.resps.ScanResult; /** * This interface represents Jedis methods exhaustively which are used on storm-redis. 
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisClusterContainer.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisClusterContainer.java index ca2a2f44e..5220c1c7a 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisClusterContainer.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisClusterContainer.java @@ -12,16 +12,16 @@ package org.apache.storm.redis.common.container; -import java.io.IOException; +import java.util.List; +import redis.clients.jedis.GeoCoordinate; import redis.clients.jedis.JedisCluster; -import redis.clients.jedis.JedisCommands; /** * Container for managing JedisCluster. * <p/> * Note that JedisCluster doesn't need to be pooled since it's thread-safe and it stores pools internally. */ -public class JedisClusterContainer implements JedisCommandsInstanceContainer { +public class JedisClusterContainer implements JedisCommandsContainer { private JedisCluster jedisCluster; @@ -33,20 +33,94 @@ public class JedisClusterContainer implements JedisCommandsInstanceContainer { this.jedisCluster = jedisCluster; } - /** - * {@inheritDoc} - */ @Override - public JedisCommands getInstance() { - return this.jedisCluster; + public Boolean exists(final String key) { + return jedisCluster.exists(key); + } + + @Override + public String get(final String key) { + return jedisCluster.get(key); + } + + @Override + public String hget(final String key, final String field) { + return jedisCluster.hget(key, field); + } + + @Override + public Long geoadd(final String key, final double longitude, final double latitude, final String member) { + return jedisCluster.geoadd(key, longitude, latitude, member); + } + + @Override + public List<GeoCoordinate> geopos(final String key, final String... 
members) { + return jedisCluster.geopos(key, members); + } + + @Override + public Boolean hexists(final String key, final String field) { + return jedisCluster.hexists(key, field); + } + + @Override + public Long hset(final String key, final String field, final String value) { + return jedisCluster.hset(key, field, value); + } + + @Override + public String lpop(final String key) { + return jedisCluster.lpop(key); + } + + @Override + public Long pfadd(final String key, final String... elements) { + return jedisCluster.pfadd(key, elements); + } + + @Override + public long pfcount(final String key) { + return jedisCluster.pfcount(key); + } + + @Override + public Long rpush(final String key, final String... string) { + return jedisCluster.rpush(key, string); + } + + @Override + public Long sadd(final String key, final String... member) { + return jedisCluster.sadd(key, member); + } + + @Override + public Long scard(final String key) { + return jedisCluster.scard(key); + } + + @Override + public String set(final String key, final String value) { + return jedisCluster.set(key, value); + } + + @Override + public Boolean sismember(final String key, final String member) { + return jedisCluster.sismember(key, member); + } + + @Override + public Long zadd(final String key, final double score, final String member) { + return jedisCluster.zadd(key, score, member); + } + + @Override + public Long zrank(final String key, final String member) { + return jedisCluster.zrank(key, member); } - /** - * {@inheritDoc} - */ @Override - public void returnInstance(JedisCommands jedisCommands) { - // do nothing + public Double zscore(final String key, final String member) { + return jedisCluster.zscore(key, member); } /** @@ -54,10 +128,6 @@ public class JedisClusterContainer implements JedisCommandsInstanceContainer { */ @Override public void close() { - try { - this.jedisCluster.close(); - } catch (IOException e) { - e.printStackTrace(); - } + this.jedisCluster.close(); } } diff --git 
a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsInstanceContainer.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsContainer.java similarity index 51% rename from external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsInstanceContainer.java rename to external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsContainer.java index 49a3373fe..38b1321b7 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsInstanceContainer.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsContainer.java @@ -13,23 +13,48 @@ package org.apache.storm.redis.common.container; import java.io.Closeable; -import redis.clients.jedis.JedisCommands; +import java.util.List; +import redis.clients.jedis.GeoCoordinate; /** * Interfaces for containers which stores instances implementing JedisCommands. */ -public interface JedisCommandsInstanceContainer extends Closeable { - /** - * Borrows instance from container. - * @return instance which implements JedisCommands - */ - JedisCommands getInstance(); +public interface JedisCommandsContainer extends Closeable { + Boolean exists(String key); - /** - * Returns instance to container. - * @param jedisCommands borrowed instance - */ - void returnInstance(JedisCommands jedisCommands); + String get(String key); + + String hget(String key, String field); + + Long geoadd(String key, double longitude, double latitude, String member); + + List<GeoCoordinate> geopos(String key, String... members); + + Boolean hexists(String key, String field); + + Long hset(String key, String field, String value); + + String lpop(String key); + + Long pfadd(String key, String... elements); + + long pfcount(String key); + + Long rpush(String key, String... string); + + Long sadd(String key, String... 
member); + + Long scard(String key); + + String set(String key, String value); + + Boolean sismember(String key, String member); + + Long zadd(String key, double score, String member); + + Long zrank(String key, String member); + + Double zscore(String key, String member); /** * Release Container. diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsContainerBuilder.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsContainerBuilder.java index 86f0f0912..9370e3421 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsContainerBuilder.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsContainerBuilder.java @@ -12,8 +12,10 @@ package org.apache.storm.redis.common.container; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.apache.storm.redis.common.config.JedisClusterConfig; import org.apache.storm.redis.common.config.JedisPoolConfig; +import redis.clients.jedis.Connection; import redis.clients.jedis.JedisCluster; import redis.clients.jedis.JedisPool; @@ -22,18 +24,17 @@ import redis.clients.jedis.JedisPool; */ public class JedisCommandsContainerBuilder { - // FIXME: We're using default config since it cannot be serialized - // We still needs to provide some options externally - public static final redis.clients.jedis.JedisPoolConfig DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig(); - /** * Builds container for single Redis environment. 
* @param config configuration for JedisPool * @return container for single Redis environment */ - public static JedisCommandsInstanceContainer build(JedisPoolConfig config) { + public static JedisCommandsContainer build(JedisPoolConfig config) { + // FIXME: We're using default config since it cannot be serialized + // We still needs to provide some options externally JedisPool jedisPool = - new JedisPool(DEFAULT_POOL_CONFIG, config.getHost(), config.getPort(), config.getTimeout(), config.getPassword(), + new JedisPool(new redis.clients.jedis.JedisPoolConfig(), config.getHost(), config.getPort(), + config.getTimeout(), config.getPassword(), config.getDatabase()); return new JedisContainer(jedisPool); } @@ -43,10 +44,12 @@ public class JedisCommandsContainerBuilder { * @param config configuration for JedisCluster * @return container for Redis Cluster environment */ - public static JedisCommandsInstanceContainer build(JedisClusterConfig config) { + public static JedisCommandsContainer build(JedisClusterConfig config) { + // FIXME: We're using default config since it cannot be serialized + // We still needs to provide some options externally JedisCluster jedisCluster = new JedisCluster(config.getNodes(), config.getTimeout(), config.getTimeout(), config.getMaxRedirections(), config.getPassword(), - DEFAULT_POOL_CONFIG); + new GenericObjectPoolConfig<>()); return new JedisClusterContainer(jedisCluster); } } diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisContainer.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisContainer.java index 1ebb5508a..1eed610dc 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisContainer.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisContainer.java @@ -14,15 +14,19 @@ package org.apache.storm.redis.common.container; import java.io.Closeable; import java.io.IOException; +import 
java.util.List; +import java.util.function.Function; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import redis.clients.jedis.JedisCommands; +import redis.clients.jedis.GeoCoordinate; import redis.clients.jedis.JedisPool; +import redis.clients.jedis.commands.JedisCommands; /** - * Container for managing Jedis instances. + * Adapter for providing a unified interface for running commands over both Jedis and JedisCluster instances. */ -public class JedisContainer implements JedisCommandsInstanceContainer { +public class JedisContainer implements JedisCommandsContainer { private static final Logger LOG = LoggerFactory.getLogger(JedisContainer.class); private JedisPool jedisPool; @@ -35,35 +39,114 @@ public class JedisContainer implements JedisCommandsInstanceContainer { this.jedisPool = jedisPool; } + private <T> T runCommand(Function<JedisCommands, T> command) { + final JedisCommands jedisCommands = jedisPool.getResource(); + try { + return command.apply(jedisCommands); + } finally { + try { + ((Closeable) jedisCommands).close(); + } catch (IOException e) { + LOG.error("Failed to close (return) instance to pool"); + } + } + } + /** * {@inheritDoc} */ @Override - public JedisCommands getInstance() { - return jedisPool.getResource(); + public void close() { + jedisPool.close(); } - /** - * {@inheritDoc} - */ @Override - public void returnInstance(JedisCommands jedisCommands) { - if (jedisCommands == null) { - return; - } + public Boolean exists(final String key) { + return runCommand((jedisCommands) -> jedisCommands.exists(key)); + } - try { - ((Closeable) jedisCommands).close(); - } catch (IOException e) { - LOG.error("Failed to close (return) instance to pool"); - } + @Override + public String get(final String key) { + return runCommand((jedisCommands) -> jedisCommands.get(key)); } - /** - * {@inheritDoc} - */ @Override - public void close() { - jedisPool.close(); + public String hget(final String key, final String field) { + return 
runCommand((jedisCommands) -> jedisCommands.hget(key, field)); + } + + @Override + public Long geoadd(final String key, final double longitude, final double latitude, final String member) { + return runCommand((jedisCommands) -> jedisCommands.geoadd(key, longitude, latitude, member)); + } + + @Override + public List<GeoCoordinate> geopos(final String key, final String... members) { + return runCommand((jedisCommands) -> jedisCommands.geopos(key, members)); + } + + @Override + public Boolean hexists(final String key, final String field) { + return runCommand((jedisCommands) -> jedisCommands.hexists(key, field)); + } + + @Override + public Long hset(final String key, final String field, final String value) { + return runCommand((jedisCommands) -> jedisCommands.hset(key, field, value)); + } + + @Override + public String lpop(final String key) { + return runCommand((jedisCommands) -> jedisCommands.lpop(key)); + } + + @Override + public Long pfadd(final String key, final String... elements) { + return runCommand((jedisCommands) -> jedisCommands.pfadd(key, elements)); + } + + @Override + public long pfcount(final String key) { + return runCommand((jedisCommands) -> jedisCommands.pfcount(key)); + } + + @Override + public Long rpush(final String key, final String... string) { + return runCommand((jedisCommands) -> jedisCommands.rpush(key, string)); + } + + @Override + public Long sadd(final String key, final String... 
member) { + return runCommand((jedisCommands) -> jedisCommands.sadd(key, member)); + } + + @Override + public Long scard(final String key) { + return runCommand((jedisCommands) -> jedisCommands.scard(key)); + } + + @Override + public String set(final String key, final String value) { + return runCommand((jedisCommands) -> jedisCommands.set(key, value)); + } + + @Override + public Boolean sismember(final String key, final String member) { + return runCommand((jedisCommands) -> jedisCommands.sismember(key, member)); + } + + @Override + public Long zadd(final String key, final double score, final String member) { + return runCommand((jedisCommands) -> jedisCommands.zadd(key, score, member)); + } + + @Override + public Long zrank(final String key, final String member) { + return runCommand((jedisCommands) -> jedisCommands.zrank(key, member)); + } + + @Override + public Double zscore(final String key, final String member) { + return runCommand((jedisCommands) -> jedisCommands.zscore(key, member)); } } diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/RedisClusterContainer.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/RedisClusterContainer.java index 04c05b876..600d4fec6 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/RedisClusterContainer.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/RedisClusterContainer.java @@ -18,7 +18,6 @@ package org.apache.storm.redis.common.container; -import java.io.IOException; import org.apache.storm.redis.common.adapter.RedisCommandsAdapterJedisCluster; import org.apache.storm.redis.common.commands.RedisCommands; import redis.clients.jedis.JedisCluster; @@ -60,11 +59,7 @@ public class RedisClusterContainer implements RedisCommandsInstanceContainer { * {@inheritDoc} */ @Override - public void close() throws IOException { - try { - this.jedisCluster.close(); - } catch (IOException e) { - 
e.printStackTrace(); - } + public void close() { + this.jedisCluster.close(); } } diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/RedisCommandsContainerBuilder.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/RedisCommandsContainerBuilder.java index 81201efd9..b369e1ca0 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/RedisCommandsContainerBuilder.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/RedisCommandsContainerBuilder.java @@ -18,6 +18,7 @@ package org.apache.storm.redis.common.container; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.apache.storm.redis.common.config.JedisClusterConfig; import org.apache.storm.redis.common.config.JedisPoolConfig; import redis.clients.jedis.JedisCluster; @@ -28,10 +29,6 @@ import redis.clients.jedis.JedisPool; */ public class RedisCommandsContainerBuilder { - // FIXME: We're using default config since it cannot be serialized - // We still needs to provide some options externally - public static final redis.clients.jedis.JedisPoolConfig DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig(); - /** * Builds container for single Redis environment. 
* @@ -39,8 +36,11 @@ public class RedisCommandsContainerBuilder { * @return container for single Redis environment */ public static RedisCommandsInstanceContainer build(JedisPoolConfig config) { + // FIXME: We're using default config since it cannot be serialized + // We still need to provide some options externally JedisPool jedisPool = - new JedisPool(DEFAULT_POOL_CONFIG, config.getHost(), config.getPort(), config.getTimeout(), config.getPassword(), + new JedisPool(new redis.clients.jedis.JedisPoolConfig(), config.getHost(), config.getPort(), + config.getTimeout(), config.getPassword(), config.getDatabase()); return new RedisContainer(jedisPool); } @@ -52,8 +52,10 @@ public class RedisCommandsContainerBuilder { * @return container for Redis Cluster environment */ public static RedisCommandsInstanceContainer build(JedisClusterConfig config) { + // FIXME: We're using default config since it cannot be serialized + // We still need to provide some options externally JedisCluster jedisCluster = - new JedisCluster(config.getNodes(), config.getTimeout(), config.getMaxRedirections(), DEFAULT_POOL_CONFIG); + new JedisCluster(config.getNodes(), config.getTimeout(), config.getMaxRedirections(), new GenericObjectPoolConfig<>()); return new RedisClusterContainer(jedisCluster); } } diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java index 2a11d962a..880aa5373 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java @@ -35,7 +35,7 @@ import org.apache.storm.state.KeyValueState; import org.apache.storm.state.Serializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import redis.clients.util.SafeEncoder; +import redis.clients.jedis.util.SafeEncoder; /** * A redis based implementation that
persists the state in Redis. diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueStateIterator.java b/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueStateIterator.java index 19d2b9194..e6b282714 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueStateIterator.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueStateIterator.java @@ -22,8 +22,8 @@ import org.apache.storm.state.BaseBinaryStateIterator; import org.apache.storm.state.DefaultStateEncoder; import org.apache.storm.state.Serializer; import org.apache.storm.state.StateEncoder; -import redis.clients.jedis.ScanParams; -import redis.clients.jedis.ScanResult; +import redis.clients.jedis.params.ScanParams; +import redis.clients.jedis.resps.ScanResult; /** * An iterator over {@link RedisKeyValueState}. diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java index dddee1518..ae634d686 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java @@ -15,6 +15,8 @@ package org.apache.storm.redis.trident.state; import com.google.common.collect.Lists; import java.util.List; import java.util.Map; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.apache.storm.redis.common.config.JedisClusterConfig; import org.apache.storm.redis.common.mapper.RedisDataTypeDescription; import org.apache.storm.task.IMetricsContext; @@ -31,6 +33,7 @@ import org.apache.storm.trident.state.map.OpaqueMap; import org.apache.storm.trident.state.map.SnapshottableMap; import org.apache.storm.trident.state.map.TransactionalMap; import org.apache.storm.tuple.Values; +import 
redis.clients.jedis.Connection; import redis.clients.jedis.JedisCluster; /** @@ -277,7 +280,7 @@ public class RedisClusterMapState<T> extends AbstractRedisMapState<T> { * RedisClusterMapState.Factory provides Redis Cluster environment version of StateFactory. */ protected static class Factory implements StateFactory { - public static final redis.clients.jedis.JedisPoolConfig DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig(); + public static final GenericObjectPoolConfig<Connection> DEFAULT_POOL_CONFIG = new GenericObjectPoolConfig<>(); JedisClusterConfig jedisClusterConfig; @@ -316,7 +319,7 @@ public class RedisClusterMapState<T> extends AbstractRedisMapState<T> { */ @Override public State makeState(Map<String, Object> conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { - JedisCluster jedisCluster = new JedisCluster(jedisClusterConfig.getNodes(), + final JedisCluster jedisCluster = new JedisCluster(jedisClusterConfig.getNodes(), jedisClusterConfig.getTimeout(), jedisClusterConfig.getTimeout(), jedisClusterConfig.getMaxRedirections(), diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java index c7f018995..e052dd1b6 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java @@ -13,10 +13,13 @@ package org.apache.storm.redis.trident.state; import java.util.Map; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.apache.storm.redis.common.config.JedisClusterConfig; import org.apache.storm.task.IMetricsContext; import org.apache.storm.trident.state.State; import org.apache.storm.trident.state.StateFactory; +import redis.clients.jedis.Connection; import redis.clients.jedis.JedisCluster; /** @@ -74,9 +77,9 @@ public 
class RedisClusterState implements State { * @see StateFactory */ public static class Factory implements StateFactory { - public static final redis.clients.jedis.JedisPoolConfig DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig(); + public static final GenericObjectPoolConfig<Connection> DEFAULT_POOL_CONFIG = new GenericObjectPoolConfig<>(); - private JedisClusterConfig jedisClusterConfig; + private final JedisClusterConfig jedisClusterConfig; /** * Constructor. @@ -92,7 +95,7 @@ public class RedisClusterState implements State { */ @Override public State makeState(Map<String, Object> conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { - JedisCluster jedisCluster = new JedisCluster(jedisClusterConfig.getNodes(), + final JedisCluster jedisCluster = new JedisCluster(jedisClusterConfig.getNodes(), jedisClusterConfig.getTimeout(), jedisClusterConfig.getTimeout(), jedisClusterConfig.getMaxRedirections(), diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/bolt/RedisFilterBoltTest.java b/external/storm-redis/src/test/java/org/apache/storm/redis/bolt/RedisFilterBoltTest.java new file mode 100644 index 000000000..dc80361a6 --- /dev/null +++ b/external/storm-redis/src/test/java/org/apache/storm/redis/bolt/RedisFilterBoltTest.java @@ -0,0 +1,509 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.redis.bolt; + +import org.apache.storm.redis.common.config.JedisPoolConfig; +import org.apache.storm.redis.common.mapper.RedisDataTypeDescription; +import org.apache.storm.redis.common.mapper.RedisFilterMapper; +import org.apache.storm.redis.util.JedisTestHelper; +import org.apache.storm.redis.util.StubTuple; +import org.apache.storm.redis.util.TupleTestHelper; +import org.apache.storm.redis.util.outputcollector.EmittedTuple; +import org.apache.storm.redis.util.outputcollector.StubOutputCollector; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.ITuple; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.utils.Utils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.storm.redis.common.mapper.RedisDataTypeDescription.RedisDataType.GEO; +import static org.apache.storm.redis.common.mapper.RedisDataTypeDescription.RedisDataType.HASH; +import static org.apache.storm.redis.common.mapper.RedisDataTypeDescription.RedisDataType.HYPER_LOG_LOG; +import static 
org.apache.storm.redis.common.mapper.RedisDataTypeDescription.RedisDataType.SET; +import static org.apache.storm.redis.common.mapper.RedisDataTypeDescription.RedisDataType.SORTED_SET; +import static org.apache.storm.redis.common.mapper.RedisDataTypeDescription.RedisDataType.STRING; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verifyNoMoreInteractions; + +@Testcontainers +class RedisFilterBoltTest { + + @Container + public GenericContainer container = new GenericContainer("redis:7.2.3-alpine") + .withExposedPorts(6379); + + private JedisTestHelper jedisHelper; + + private JedisPoolConfig.Builder configBuilder; + private StubOutputCollector outputCollector; + private TopologyContext topologyContext; + + @BeforeEach + void setup() { + configBuilder = new JedisPoolConfig.Builder(); + configBuilder + .setHost(container.getHost()) + .setPort(container.getFirstMappedPort()) + .setTimeout(10); + + outputCollector = new StubOutputCollector(); + topologyContext = mock(TopologyContext.class); + jedisHelper = new JedisTestHelper(container); + } + + @AfterEach + void cleanup() { + verifyNoMoreInteractions(topologyContext); + jedisHelper.close(); + } + + /** + * Smoke test the exists check when the key is NOT found. + * Expectation is tuple is acked, and nothing is emitted. 
+ */ + @Test + void smokeTest_exists_keyNotFound() { + // Define input key + final String inputKey = "ThisIsMyKey"; + + // Ensure key does not exist in redis + jedisHelper.delete(inputKey); + assertFalse(jedisHelper.exists(inputKey), "Sanity check key should not exist"); + + // Create an input tuple + final Map<String, Object> values = new HashMap<>(); + values.put("key", inputKey); + values.put("value", "ThisIsMyValue"); + final Tuple tuple = new StubTuple(values); + + final JedisPoolConfig config = configBuilder.build(); + final TestMapper mapper = new TestMapper(STRING); + + final RedisFilterBolt bolt = new RedisFilterBolt(config, mapper); + bolt.prepare(new HashMap<>(), topologyContext, new OutputCollector(outputCollector)); + bolt.process(tuple); + + // Verify the bolt filtered the input tuple. + verifyTupleFiltered(); + } + + /** + * Smoke test the exists check when the key IS found. + * Expectation is tuple is acked, and tuple is emitted. + */ + @Test + void smokeTest_exists_keyFound() { + // Define input key + final String inputKey = "ThisIsMyKey"; + + // Ensure key does exist in redis + jedisHelper.set(inputKey, "some-value"); + assertTrue(jedisHelper.exists(inputKey), "Sanity check key exists."); + + // Create an input tuple + final Map<String, Object> values = new HashMap<>(); + values.put("key", inputKey); + values.put("value", "ThisIsMyValue"); + final Tuple tuple = new StubTuple(values); + + final JedisPoolConfig config = configBuilder.build(); + final TestMapper mapper = new TestMapper(STRING); + + final RedisFilterBolt bolt = new RedisFilterBolt(config, mapper); + bolt.prepare(new HashMap<>(), topologyContext, new OutputCollector(outputCollector)); + bolt.process(tuple); + + // Verify Tuple passed through the bolt + verifyTuplePassed(tuple); + } + + /** + * Smoke test the sismember check when the key is NOT found in the set. + * Expectation is tuple is acked, and nothing is emitted. 
+ */ + @Test + void smokeTest_sismember_notMember() { + // Define input key + final String setKey = "ThisIsMySet"; + final String inputKey = "ThisIsMyKey"; + + // Create an input tuple + final Map<String, Object> values = new HashMap<>(); + values.put("key", inputKey); + values.put("value", "ThisIsMyValue"); + final Tuple tuple = new StubTuple(values); + + final JedisPoolConfig config = configBuilder.build(); + final TestMapper mapper = new TestMapper(SET, setKey); + + final RedisFilterBolt bolt = new RedisFilterBolt(config, mapper); + bolt.prepare(new HashMap<>(), topologyContext, new OutputCollector(outputCollector)); + bolt.process(tuple); + + // Verify the bolt filtered the input tuple. + verifyTupleFiltered(); + } + + /** + * Smoke test the sismember check when the key IS found in the set. + * Expectation is tuple is acked, and tuple is emitted. + */ + @Test + void smokeTest_sismember_isMember() { + // Define input key + final String setKey = "ThisIsMySet"; + final String inputKey = "ThisIsMyKey"; + + // Ensure key does exist in redis + jedisHelper.smember(setKey, inputKey); + assertTrue(jedisHelper.sismember(setKey, inputKey), "Sanity check, should be a member"); + + // Create an input tuple + final Map<String, Object> values = new HashMap<>(); + values.put("key", inputKey); + values.put("value", "ThisIsMyValue"); + final Tuple tuple = new StubTuple(values); + + final JedisPoolConfig config = configBuilder.build(); + final TestMapper mapper = new TestMapper(SET, setKey); + + final RedisFilterBolt bolt = new RedisFilterBolt(config, mapper); + bolt.prepare(new HashMap<>(), topologyContext, new OutputCollector(outputCollector)); + bolt.process(tuple); + + // Verify Tuple passed through the bolt + verifyTuplePassed(tuple); + } + + /** + * Smoke test the hexists check when the key is NOT found in the hash. + * Expectation is tuple is acked, and nothing is emitted.
+ */ + @Test + void smokeTest_hexists_notMember() { + // Define input key + final String hashKey = "ThisIsMyHash"; + final String inputKey = "ThisIsMyKey"; + + // Create an input tuple + final Map<String, Object> values = new HashMap<>(); + values.put("key", inputKey); + values.put("value", "ThisIsMyValue"); + final Tuple tuple = new StubTuple(values); + + final JedisPoolConfig config = configBuilder.build(); + final TestMapper mapper = new TestMapper(HASH, hashKey); + + + final RedisFilterBolt bolt = new RedisFilterBolt(config, mapper); + bolt.prepare(new HashMap<>(), topologyContext, new OutputCollector(outputCollector)); + bolt.process(tuple); + + // Verify the bolt filtered the input tuple. + verifyTupleFiltered(); + } + + /** + * Smoke test the hexists check when the key IS found. + * Expectation is tuple is acked, and tuple is emitted. + */ + @Test + void smokeTest_hexists_isMember() { + // Define input key + final String hashKey = "ThisIsMyHash"; + final String inputKey = "ThisIsMyKey"; + + // Ensure key does exist in redis + jedisHelper.hset(hashKey, inputKey, "value"); + assertTrue(jedisHelper.hexists(hashKey, inputKey), "Sanity check, should be a member"); + + // Create an input tuple + final Map<String, Object> values = new HashMap<>(); + values.put("key", inputKey); + values.put("value", "ThisIsMyValue"); + final Tuple tuple = new StubTuple(values); + + final JedisPoolConfig config = configBuilder.build(); + final TestMapper mapper = new TestMapper(HASH, hashKey); + + final RedisFilterBolt bolt = new RedisFilterBolt(config, mapper); + bolt.prepare(new HashMap<>(), topologyContext, new OutputCollector(outputCollector)); + bolt.process(tuple); + + // Verify Tuple passed through the bolt + verifyTuplePassed(tuple); + } + + /** + * Smoke test the zrank check when the key is NOT found in the set. + * Expectation is tuple is acked, and nothing is emitted. 
+ */ + @Test + void smokeTest_zrank_notMember() { + // Define input key + final String setKey = "ThisIsMySetKey"; + final String inputKey = "ThisIsMyKey"; + + // Create an input tuple + final Map<String, Object> values = new HashMap<>(); + values.put("key", inputKey); + values.put("value", "ThisIsMyValue"); + final Tuple tuple = new StubTuple(values); + + final JedisPoolConfig config = configBuilder.build(); + final TestMapper mapper = new TestMapper(SORTED_SET, setKey); + + final RedisFilterBolt bolt = new RedisFilterBolt(config, mapper); + bolt.prepare(new HashMap<>(), topologyContext, new OutputCollector(outputCollector)); + bolt.process(tuple); + + // Verify the bolt filtered the input tuple. + verifyTupleFiltered(); + } + + /** + * Smoke test the zrank check when the key IS found. + * Expectation is tuple is acked, and tuple is emitted. + */ + @Test + void smokeTest_zrank_isMember() { + // Define input key + final String setKey = "ThisIsMySetKey"; + final String inputKey = "ThisIsMyKey"; + + // Ensure key does exist in redis + jedisHelper.zrank(setKey, 2, inputKey); + + // Create an input tuple + final Map<String, Object> values = new HashMap<>(); + values.put("key", inputKey); + values.put("value", "ThisIsMyValue"); + final Tuple tuple = new StubTuple(values); + + final JedisPoolConfig config = configBuilder.build(); + final TestMapper mapper = new TestMapper(SORTED_SET, setKey); + + final RedisFilterBolt bolt = new RedisFilterBolt(config, mapper); + bolt.prepare(new HashMap<>(), topologyContext, new OutputCollector(outputCollector)); + bolt.process(tuple); + + // Verify Tuple passed through the bolt + verifyTuplePassed(tuple); + } + + /** + * Smoke test the pfcount check when the key is NOT found in the set. + * Expectation is tuple is acked, and nothing is emitted. 
+ */ + @Test + void smokeTest_pfcount_notMember() { + // Define input key + final String inputKey = "ThisIsMyKey"; + + // Create an input tuple + final Map<String, Object> values = new HashMap<>(); + values.put("key", inputKey); + values.put("value", "ThisIsMyValue"); + final Tuple tuple = new StubTuple(values); + + final JedisPoolConfig config = configBuilder.build(); + final TestMapper mapper = new TestMapper(HYPER_LOG_LOG); + + final RedisFilterBolt bolt = new RedisFilterBolt(config, mapper); + bolt.prepare(new HashMap<>(), topologyContext, new OutputCollector(outputCollector)); + bolt.process(tuple); + + // Verify the bolt filtered the input tuple. + verifyTupleFiltered(); + } + + /** + * Smoke test the pfcount check when the key IS found. + * Expectation is tuple is acked, and tuple is emitted. + */ + @Test + void smokeTest_pfcount_isMember() { + // Define input key + final String inputKey = "ThisIsMyKey"; + + // Ensure key does exist in redis + jedisHelper.pfadd(inputKey, "my value"); + + // Create an input tuple + final Map<String, Object> values = new HashMap<>(); + values.put("key", inputKey); + values.put("value", "ThisIsMyValue"); + final Tuple tuple = new StubTuple(values); + + final JedisPoolConfig config = configBuilder.build(); + final TestMapper mapper = new TestMapper(HYPER_LOG_LOG); + + final RedisFilterBolt bolt = new RedisFilterBolt(config, mapper); + bolt.prepare(new HashMap<>(), topologyContext, new OutputCollector(outputCollector)); + bolt.process(tuple); + + // Verify Tuple passed through the bolt + verifyTuplePassed(tuple); + } + + /** + * Smoke test the geopos check when the key is NOT found in the set. + * Expectation is tuple is acked, and nothing is emitted. 
+ */ + @Test + void smokeTest_geopos_notMember() { + // Define input key + final String geoKey = "ThisIsMyGeoKey"; + final String inputKey = "ThisIsMyKey"; + + // Create an input tuple + final Map<String, Object> values = new HashMap<>(); + values.put("key", inputKey); + values.put("value", "ThisIsMyValue"); + final Tuple tuple = new StubTuple(values); + + final JedisPoolConfig config = configBuilder.build(); + final TestMapper mapper = new TestMapper(GEO, geoKey); + + final RedisFilterBolt bolt = new RedisFilterBolt(config, mapper); + bolt.prepare(new HashMap<>(), topologyContext, new OutputCollector(outputCollector)); + bolt.process(tuple); + + // Verify the bolt filtered the input tuple. + verifyTupleFiltered(); + } + + /** + * Smoke test the geopos check when the key IS found. + * Expectation is tuple is acked, and tuple is emitted. + */ + @Test + void smokeTest_geopos_isMember() { + // Define input key + final String geoKey = "ThisIsMyGeoKey"; + final String inputKey = "ThisIsMyKey"; + + // Ensure key does exist in redis + jedisHelper.geoadd(geoKey, 139.731992, 35.709026, inputKey); + + // Create an input tuple + final Map<String, Object> values = new HashMap<>(); + values.put("key", inputKey); + values.put("value", "ThisIsMyValue"); + final Tuple tuple = new StubTuple(values); + + final JedisPoolConfig config = configBuilder.build(); + final TestMapper mapper = new TestMapper(GEO, geoKey); + + final RedisFilterBolt bolt = new RedisFilterBolt(config, mapper); + bolt.prepare(new HashMap<>(), topologyContext, new OutputCollector(outputCollector)); + bolt.process(tuple); + + // Verify Tuple passed through the bolt + verifyTuplePassed(tuple); + } + + /** + * Utility method to help verify that a tuple passed through the RedisFilterBolt properly. + * @param expectedTuple The tuple we expected to pass through the bolt.
+ */ + private void verifyTuplePassed(final Tuple expectedTuple) { + // Verify no errors or failed tuples + assertTrue(outputCollector.getReportedErrors().isEmpty(), "Should have no reported errors"); + assertTrue(outputCollector.getFailedTuples().isEmpty(), "Should have no failed tuples"); + + // We should have a single acked tuple + assertEquals(1, outputCollector.getAckedTuples().size(), "Should have a single acked tuple"); + + // We should have a single emitted tuple. + assertEquals(1, outputCollector.getEmittedTuples().size(), "Should have a single emitted tuple"); + + // Verify the tuple is what we expected + final EmittedTuple emittedTuple = outputCollector.getEmittedTuples().get(0); + assertEquals("default", emittedTuple.getStreamId()); + TupleTestHelper.verifyAnchors(emittedTuple, expectedTuple); + TupleTestHelper.verifyEmittedTuple(emittedTuple, expectedTuple.getValues()); + } + + /** + * Utility method to help verify that no tuples were passed through the RedisFilterBolt. + */ + private void verifyTupleFiltered() { + // Verify no errors or failed tuples + assertTrue(outputCollector.getReportedErrors().isEmpty(), "Should have no reported errors"); + assertTrue(outputCollector.getFailedTuples().isEmpty(), "Should have no failed tuples"); + + // We should have a single acked tuple + assertEquals(1, outputCollector.getAckedTuples().size(), "Should have a single acked tuple"); + + // We should have no emitted tuple. + assertTrue(outputCollector.getEmittedTuples().isEmpty(), "Should have no emitted tuples"); + } + + /** + * Test Implementation. 
+ */ + private static class TestMapper implements RedisFilterMapper { + private final RedisDataTypeDescription.RedisDataType dataType; + private final String additionalKey; + + private TestMapper(final RedisDataTypeDescription.RedisDataType dataType) { + this(dataType, null); + } + + private TestMapper(final RedisDataTypeDescription.RedisDataType dataType, final String additionalKey) { + this.dataType = dataType; + this.additionalKey = additionalKey; + } + + @Override + public void declareOutputFields(final OutputFieldsDeclarer declarer) { + declarer.declareStream(Utils.DEFAULT_STREAM_ID, new Fields("key", "value")); + } + + @Override + public RedisDataTypeDescription getDataTypeDescription() { + return new RedisDataTypeDescription(dataType, additionalKey); + } + + @Override + public String getKeyFromTuple(final ITuple tuple) { + return tuple.getStringByField("key"); + } + + @Override + public String getValueFromTuple(final ITuple tuple) { + return tuple.getStringByField("value"); + } + } +} \ No newline at end of file diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateIteratorTest.java b/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateIteratorTest.java index 212504e65..00f4a0eb4 100644 --- a/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateIteratorTest.java +++ b/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateIteratorTest.java @@ -24,8 +24,8 @@ import org.apache.storm.state.DefaultStateSerializer; import org.apache.storm.state.Serializer; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import redis.clients.jedis.ScanParams; -import redis.clients.jedis.ScanResult; +import redis.clients.jedis.params.ScanParams; +import redis.clients.jedis.resps.ScanResult; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertFalse; diff --git 
a/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java b/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java index e6ee68887..f82ef6fa4 100644 --- a/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java +++ b/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java @@ -27,7 +27,7 @@ import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import org.mockito.stubbing.Answer; -import redis.clients.util.SafeEncoder; +import redis.clients.jedis.util.SafeEncoder; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/util/JedisTestHelper.java b/external/storm-redis/src/test/java/org/apache/storm/redis/util/JedisTestHelper.java new file mode 100644 index 000000000..9c357493f --- /dev/null +++ b/external/storm-redis/src/test/java/org/apache/storm/redis/util/JedisTestHelper.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. 
+ */ + +package org.apache.storm.redis.util; + +import java.util.Objects; +import org.testcontainers.containers.GenericContainer; +import redis.clients.jedis.Jedis; + +/** + * Utility class for helping interact with a Redis service in tests. + */ +public class JedisTestHelper { + private final Jedis jedis; + + /** + * Constructor. + * @param container Container instance to create a redis client against. + */ + public JedisTestHelper(final GenericContainer container) { + Objects.requireNonNull(container); + + jedis = new Jedis( + container.getHost(), + container.getFirstMappedPort() + ); + } + + public void delete(final String key) { + jedis.del(key); + } + + public void geoadd(final String key, final double longitude, final double latitude, final String value) { + jedis.geoadd(key, longitude, latitude, value); + } + + public boolean hexists(final String hash, final String key) { + return jedis.hexists(hash, key); + } + + public void hset(final String hash, final String key, final String value) { + jedis.hset(hash, key, value); + } + + public boolean exists(final String key) { + return jedis.exists(key); + } + + public void pfadd(final String key, final String value) { + jedis.pfadd(key, value); + } + + public void set(final String key, final String value) { + jedis.set(key, value); + } + + public void smember(final String set, final String value) { + jedis.sadd(set, value); + } + + public boolean sismember(final String set, final String value) { + return jedis.sismember(set, value); + } + + public void zrank(final String set, final double score, final String value) { + jedis.zadd(set, score, value); + } + + public void close() { + jedis.close(); + } +} diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/util/StubTuple.java b/external/storm-redis/src/test/java/org/apache/storm/redis/util/StubTuple.java new file mode 100644 index 000000000..ec1e3e942 --- /dev/null +++ b/external/storm-redis/src/test/java/org/apache/storm/redis/util/StubTuple.java 
@@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + */ + +package org.apache.storm.redis.util; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.shade.org.apache.commons.lang.NotImplementedException; +import org.apache.storm.task.GeneralTopologyContext; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.MessageId; +import org.apache.storm.tuple.Tuple; + + +/** + * Partial Implementation of the Tuple interface for tests. 
+ */ +public class StubTuple implements Tuple { + + final Map<String, Object> values; + + public StubTuple(final Map<String, Object> values) { + this.values = Collections.unmodifiableMap(new HashMap<>(Objects.requireNonNull(values))); + } + + @Override + public int size() { + return values.size(); + } + + @Override + public boolean contains(final String field) { + return values.containsKey(field); + } + + @Override + public Fields getFields() { + return new Fields(values.keySet().toArray(new String[0])); + } + + @Override + public int fieldIndex(final String field) { + throw new NotImplementedException("Not implemented"); + } + + @Override + public List<Object> select(final Fields selector) { + return null; + } + + @Override + public Object getValue(final int i) { + throw new NotImplementedException("Not implemented"); + } + + @Override + public String getString(final int i) { + throw new NotImplementedException("Not implemented"); + } + + @Override + public Integer getInteger(final int i) { + throw new NotImplementedException("Not implemented"); + } + + @Override + public Long getLong(final int i) { + throw new NotImplementedException("Not implemented"); + } + + @Override + public Boolean getBoolean(final int i) { + throw new NotImplementedException("Not implemented"); + } + + @Override + public Short getShort(final int i) { + throw new NotImplementedException("Not implemented"); + } + + @Override + public Byte getByte(final int i) { + throw new NotImplementedException("Not implemented"); + } + + @Override + public Double getDouble(final int i) { + throw new NotImplementedException("Not implemented"); + } + + @Override + public Float getFloat(final int i) { + throw new NotImplementedException("Not implemented"); + } + + @Override + public byte[] getBinary(final int i) { + throw new NotImplementedException("Not implemented"); + } + + @Override + public Object getValueByField(final String field) { + return values.get(field); + } + + @Override + public String 
getStringByField(final String field) { + return values.get(field).toString(); + } + + @Override + public Integer getIntegerByField(final String field) { + return (Integer) values.get(field); + } + + @Override + public Long getLongByField(final String field) { + return (Long) values.get(field); + } + + @Override + public Boolean getBooleanByField(final String field) { + return (Boolean) values.get(field); + } + + @Override + public Short getShortByField(final String field) { + return (Short) values.get(field); + } + + @Override + public Byte getByteByField(final String field) { + return (Byte) values.get(field); + } + + @Override + public Double getDoubleByField(final String field) { + return (Double) values.get(field); + } + + @Override + public Float getFloatByField(final String field) { + return (Float) values.get(field); + } + + @Override + public byte[] getBinaryByField(final String field) { + return (byte[]) values.get(field); + } + + @Override + public List<Object> getValues() { + return new ArrayList<>(values.values()); + } + + @Override + public GlobalStreamId getSourceGlobalStreamId() { + throw new NotImplementedException("Not implemented"); + } + + @Override + public String getSourceComponent() { + throw new NotImplementedException("Not implemented"); + } + + @Override + public int getSourceTask() { + throw new NotImplementedException("Not implemented"); + } + + @Override + public String getSourceStreamId() { + throw new NotImplementedException("Not implemented"); + } + + @Override + public MessageId getMessageId() { + throw new NotImplementedException("Not implemented"); + } + + @Override + public GeneralTopologyContext getContext() { + throw new NotImplementedException("Not implemented"); + } +} diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/util/TupleTestHelper.java b/external/storm-redis/src/test/java/org/apache/storm/redis/util/TupleTestHelper.java new file mode 100644 index 000000000..0580ec18c --- /dev/null +++ 
b/external/storm-redis/src/test/java/org/apache/storm/redis/util/TupleTestHelper.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + */ + +package org.apache.storm.redis.util; + +import java.util.List; +import java.util.Objects; +import org.apache.storm.redis.util.outputcollector.EmittedTuple; +import org.apache.storm.tuple.Tuple; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +/** + * Utility for common test validations. 
+ */ +public class TupleTestHelper { + + public static void verifyAnchors(final EmittedTuple emittedTuple, final Tuple expectedAnchor) { + Objects.requireNonNull(emittedTuple); + Objects.requireNonNull(expectedAnchor); + assertEquals(1, emittedTuple.getAnchors().size(), "Should have a single anchor"); + + final Tuple anchor = emittedTuple.getAnchors().get(0); + assertNotNull(anchor, "Should be non-null"); + assertEquals(expectedAnchor, anchor); + } + + public static void verifyEmittedTuple(final EmittedTuple emittedTuple, final List<Object> expectedValues) { + Objects.requireNonNull(emittedTuple); + Objects.requireNonNull(expectedValues); + + assertEquals(expectedValues.size(), emittedTuple.getTuple().size()); + assertEquals(expectedValues, emittedTuple.getTuple()); + } +} diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/util/outputcollector/EmittedTuple.java b/external/storm-redis/src/test/java/org/apache/storm/redis/util/outputcollector/EmittedTuple.java new file mode 100644 index 000000000..0196db92b --- /dev/null +++ b/external/storm-redis/src/test/java/org/apache/storm/redis/util/outputcollector/EmittedTuple.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. 
+ */ + +package org.apache.storm.redis.util.outputcollector; + +import org.apache.storm.tuple.Tuple; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +/** + * Used with StubOutputCollector for testing. + */ +public class EmittedTuple { + private final String streamId; + private final List<Object> tuple; + private final List<Tuple> anchors; + + public EmittedTuple(final String streamId, final List<Object> tuple, final Collection<Tuple> anchors) { + this.streamId = streamId; + this.tuple = tuple; + this.anchors = new ArrayList<>(anchors); + } + + public String getStreamId() { + return streamId; + } + + public List<Object> getTuple() { + return tuple; + } + + public List<Tuple> getAnchors() { + return Collections.unmodifiableList(anchors); + } + + @Override + public String toString() { + return "EmittedTuple{" + + "streamId='" + streamId + '\'' + + ", tuple=" + tuple + + ", anchors=" + anchors + + '}'; + } +} diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/util/outputcollector/StubOutputCollector.java b/external/storm-redis/src/test/java/org/apache/storm/redis/util/outputcollector/StubOutputCollector.java new file mode 100644 index 000000000..3fec79de1 --- /dev/null +++ b/external/storm-redis/src/test/java/org/apache/storm/redis/util/outputcollector/StubOutputCollector.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + */ + +package org.apache.storm.redis.util.outputcollector; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import org.apache.storm.task.IOutputCollector; +import org.apache.storm.tuple.Tuple; + +/** + * Stub implementation for testing. + */ +public class StubOutputCollector implements IOutputCollector { + + final List<EmittedTuple> emittedTuples = new ArrayList<>(); + final List<Tuple> ackedTuples = new ArrayList<>(); + final List<Tuple> failedTuples = new ArrayList<>(); + final List<Throwable> reportedErrors = new ArrayList<>(); + + @Override + public List<Integer> emit(final String streamId, final Collection<Tuple> anchors, final List<Object> tuple) { + emittedTuples.add( + new EmittedTuple(streamId, tuple, anchors) + ); + + // Dummy value. 
+ return Collections.singletonList(1); + } + + @Override + public void emitDirect(final int taskId, final String streamId, final Collection<Tuple> anchors, final List<Object> tuple) { + throw new RuntimeException("Not implemented yet!"); + } + + @Override + public void ack(final Tuple input) { + ackedTuples.add(input); + } + + @Override + public void fail(final Tuple input) { + failedTuples.add(input); + } + + @Override + public void resetTimeout(final Tuple input) { + throw new RuntimeException("Not implemented yet!"); + } + + @Override + public void flush() { + throw new RuntimeException("Not implemented yet!"); + } + + @Override + public void reportError(final Throwable error) { + reportedErrors.add(error); + } + + public List<EmittedTuple> getEmittedTuples() { + return Collections.unmodifiableList(emittedTuples); + } + + public List<Throwable> getReportedErrors() { + return Collections.unmodifiableList(reportedErrors); + } + + public List<Tuple> getFailedTuples() { + return Collections.unmodifiableList(failedTuples); + } + + public List<Tuple> getAckedTuples() { + return Collections.unmodifiableList(ackedTuples); + } + + public void reset() { + emittedTuples.clear(); + ackedTuples.clear(); + reportedErrors.clear(); + failedTuples.clear(); + } +} diff --git a/integration-test/pom.xml b/integration-test/pom.xml index f5f708b36..7bc9528cc 100644 --- a/integration-test/pom.xml +++ b/integration-test/pom.xml @@ -84,7 +84,6 @@ <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> - <version>2.8.9</version> </dependency> <dependency> <groupId>com.google.code.findbugs</groupId> diff --git a/pom.xml b/pom.xml index c1716ee15..5fba628ff 100644 --- a/pom.xml +++ b/pom.xml @@ -128,13 +128,13 @@ <hamcrest.version>2.2</hamcrest.version> <elasticsearch.version>7.17.13</elasticsearch.version> <calcite.version>1.16.0</calcite.version> - <jedis.version>2.9.0</jedis.version> + <jedis.version>5.1.0</jedis.version> 
<activemq.version>5.18.3</activemq.version> <jackson.version>2.15.2</jackson.version> <jackson.databind.version>2.15.2</jackson.databind.version> - <storm.kafka.client.version>0.11.0.3</storm.kafka.client.version> + <testcontainers.version>1.19.1</testcontainers.version> <!-- Java and clojure build lifecycle test properties are defined here to avoid having to create a default profile --> <java.unit.test.exclude.groups>PerformanceTest</java.unit.test.exclude.groups> @@ -157,6 +157,7 @@ <rocksdb-jni-version>8.5.4</rocksdb-jni-version> <json-smart.version>2.5.0</json-smart.version> <byte-buddy.version>1.14.9</byte-buddy.version> + <gson.version>2.10.1</gson.version> <!-- see intellij profile below... This fixes an annoyance with intellij --> <provided.scope>provided</provided.scope> @@ -1150,6 +1151,12 @@ <artifactId>byte-buddy-agent</artifactId> <version>${byte-buddy.version}</version> </dependency> + <dependency> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + <version>${gson.version}</version> + </dependency> + </dependencies> </dependencyManagement> diff --git a/sql/storm-sql-external/storm-sql-redis/src/jvm/org/apache/storm/sql/redis/RedisDataSourcesProvider.java b/sql/storm-sql-external/storm-sql-redis/src/jvm/org/apache/storm/sql/redis/RedisDataSourcesProvider.java index 421f84fca..b14b7f31c 100644 --- a/sql/storm-sql-external/storm-sql-redis/src/jvm/org/apache/storm/sql/redis/RedisDataSourcesProvider.java +++ b/sql/storm-sql-external/storm-sql-redis/src/jvm/org/apache/storm/sql/redis/RedisDataSourcesProvider.java @@ -42,8 +42,7 @@ import org.apache.storm.topology.IRichBolt; import org.apache.storm.topology.IRichSpout; import org.apache.storm.tuple.ITuple; import org.apache.storm.tuple.Values; - -import redis.clients.util.JedisURIHelper; +import redis.clients.jedis.util.JedisURIHelper; /** * Create a Redis sink based on the URI and properties. The URI has the format of