This is an automated email from the ASF dual-hosted git repository.

rzo1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e8c65c66 [STORM-3680] Upgrade Jedis Library (#3314)
8e8c65c66 is described below

commit 8e8c65c666814685970d8791e2a9ba3a46ae1129
Author: Steviep <c...@users.noreply.github.com>
AuthorDate: Mon Dec 4 18:20:09 2023 +0900

    [STORM-3680] Upgrade Jedis Library (#3314)
    
    Upgrade Jedis Library to 5.x
    
    
    ---------
    
    Co-authored-by: Richard Zowalla <r...@apache.org>
---
 DEPENDENCY-LICENSES                                |   9 +-
 LICENSE-binary                                     |   2 +-
 .../tools/Base64ToBinaryStateMigrationUtil.java    |   3 +-
 external/storm-elasticsearch/pom.xml               |   2 +-
 external/storm-redis/pom.xml                       |  13 +
 .../apache/storm/redis/bolt/AbstractRedisBolt.java |  19 +-
 .../apache/storm/redis/bolt/RedisFilterBolt.java   |  16 +-
 .../apache/storm/redis/bolt/RedisLookupBolt.java   |   6 +-
 .../apache/storm/redis/bolt/RedisStoreBolt.java    |   6 +-
 .../common/adapter/RedisCommandsAdapterJedis.java  |   4 +-
 .../adapter/RedisCommandsAdapterJedisCluster.java  |   7 +-
 .../storm/redis/common/commands/RedisCommands.java |   5 +-
 .../common/container/JedisClusterContainer.java    | 106 ++++-
 ...eContainer.java => JedisCommandsContainer.java} |  49 +-
 .../container/JedisCommandsContainerBuilder.java   |  19 +-
 .../redis/common/container/JedisContainer.java     | 127 ++++-
 .../common/container/RedisClusterContainer.java    |   9 +-
 .../container/RedisCommandsContainerBuilder.java   |  14 +-
 .../storm/redis/state/RedisKeyValueState.java      |   2 +-
 .../redis/state/RedisKeyValueStateIterator.java    |   4 +-
 .../redis/trident/state/RedisClusterMapState.java  |   7 +-
 .../redis/trident/state/RedisClusterState.java     |   9 +-
 .../storm/redis/bolt/RedisFilterBoltTest.java      | 509 +++++++++++++++++++++
 .../state/RedisKeyValueStateIteratorTest.java      |   4 +-
 .../storm/redis/state/RedisKeyValueStateTest.java  |   2 +-
 .../apache/storm/redis/util/JedisTestHelper.java   |  81 ++++
 .../org/apache/storm/redis/util/StubTuple.java     | 199 ++++++++
 .../apache/storm/redis/util/TupleTestHelper.java   |  45 ++
 .../redis/util/outputcollector/EmittedTuple.java   |  56 +++
 .../util/outputcollector/StubOutputCollector.java  |  94 ++++
 integration-test/pom.xml                           |   1 -
 pom.xml                                            |  11 +-
 .../storm/sql/redis/RedisDataSourcesProvider.java  |   3 +-
 33 files changed, 1311 insertions(+), 132 deletions(-)

diff --git a/DEPENDENCY-LICENSES b/DEPENDENCY-LICENSES
index 57fb93e51..ffea76d71 100644
--- a/DEPENDENCY-LICENSES
+++ b/DEPENDENCY-LICENSES
@@ -15,6 +15,7 @@ List of third-party dependencies grouped by their license 
type.
         * Apache Avro (org.apache.avro:avro:1.11.3 - https://avro.apache.org)
         * Apache Commons FileUpload (commons-fileupload:commons-fileupload:1.5 
- https://commons.apache.org/proper/commons-fileupload/)
         * Apache Commons Lang (org.apache.commons:commons-lang3:3.13.0 - 
https://commons.apache.org/proper/commons-lang/)
+        * Apache Commons Pool (org.apache.commons:commons-pool2:2.12.0 - 
https://commons.apache.org/proper/commons-pool/)
         * Apache Commons Text (org.apache.commons:commons-text:1.11.0 - 
https://commons.apache.org/proper/commons-text)
         * Apache Directory API ASN.1 API 
(org.apache.directory.api:api-asn1-api:2.1.4 - 
https://directory.apache.org/api-parent/api-asn1-parent/api-asn1-api/)
         * Apache Directory API ASN.1 BER 
(org.apache.directory.api:api-asn1-ber:2.1.4 - 
https://directory.apache.org/api-parent/api-asn1-parent/api-asn1-ber/)
@@ -26,8 +27,7 @@ List of third-party dependencies grouped by their license 
type.
         * Apache Log4j Core (org.apache.logging.log4j:log4j-core:2.21.1 - 
https://logging.apache.org/log4j/2.x/log4j/log4j-core/)
         * Apache Log4j SLF4J Binding 
(org.apache.logging.log4j:log4j-slf4j-impl:2.21.1 - 
https://logging.apache.org/log4j/2.x/log4j/log4j-slf4j-impl/)
         * Apache Log4j Web (org.apache.logging.log4j:log4j-web:2.21.1 - 
https://logging.apache.org/log4j/2.x/log4j/log4j-web/)
-        * Gson (com.google.code.gson:gson:2.8.9 - 
https://github.com/google/gson/gson)
-        * Gson (com.google.code.gson:gson:2.9.0 - 
https://github.com/google/gson/gson)
+        * Gson (com.google.code.gson:gson:2.10.1 - 
https://github.com/google/gson/gson)
         * Maven Plugin Tools Java Annotations 
(org.apache.maven.plugin-tools:maven-plugin-annotations:3.8.1 - 
https://maven.apache.org/plugin-tools/maven-plugin-annotations)
         * snappy-java (org.xerial.snappy:snappy-java:1.1.10.4 - 
https://github.com/xerial/snappy-java)
 
@@ -73,7 +73,6 @@ List of third-party dependencies grouped by their license 
type.
         * Apache Commons Logging (commons-logging:commons-logging:1.2 - 
http://commons.apache.org/proper/commons-logging/)
         * Apache Commons Math (org.apache.commons:commons-math3:3.6.1 - 
http://commons.apache.org/proper/commons-math/)
         * Apache Commons Net (commons-net:commons-net:3.9.0 - 
https://commons.apache.org/proper/commons-net/)
-        * Apache Commons Pool (org.apache.commons:commons-pool2:2.4.2 - 
http://commons.apache.org/proper/commons-pool/)
         * Apache Curator (org.apache.curator:apache-curator:2.12.0 - 
http://curator.apache.org)
         * Apache Derby Database Engine and Embedded JDBC Driver 
(org.apache.derby:derby:10.14.1.0 - http://db.apache.org/derby/)
         * Apache Geronimo JCache Spec 1.0 
(org.apache.geronimo.specs:geronimo-jcache_1.0_spec:1.0-alpha-1 - 
http://geronimo.apache.org/maven/specs/geronimo-jcache_1.0_spec/1.0-alpha-1)
@@ -215,7 +214,6 @@ List of third-party dependencies grouped by their license 
type.
         * Google Guice - Extensions - AssistedInject 
(com.google.inject.extensions:guice-assistedinject:3.0 - 
http://code.google.com/p/google-guice/extensions-parent/guice-assistedinject/)
         * Google Guice - Extensions - Servlet 
(com.google.inject.extensions:guice-servlet:4.0 - 
https://github.com/google/guice/extensions-parent/guice-servlet)
         * Graphite Integration for Metrics 
(io.dropwizard.metrics:metrics-graphite:3.2.6 - 
http://metrics.dropwizard.io/metrics-graphite/)
-        * Gson (com.google.code.gson:gson:2.2.4 - 
http://code.google.com/p/google-gson/)
         * Guava: Google Core Libraries for Java (com.google.guava:guava:16.0.1 
- http://code.google.com/p/guava-libraries/guava)
         * Guava: Google Core Libraries for Java (com.google.guava:guava:19.0 - 
https://github.com/google/guava/guava)
         * Guava: Google Core Libraries for Java 
(com.google.guava:guava:32.1.3-jre - https://github.com/google/guava)
@@ -659,7 +657,7 @@ List of third-party dependencies grouped by their license 
type.
         * argparse4j (net.sourceforge.argparse4j:argparse4j:0.8.1 - 
http://argparse4j.github.io)
         * Checker Qual (org.checkerframework:checker-qual:3.37.0 - 
https://checkerframework.org/)
         * JCodings (org.jruby.jcodings:jcodings:1.0.55 - 
http://nexus.sonatype.org/oss-repository-hosting.html/jcodings)
-        * Jedis (redis.clients:jedis:2.9.0 - 
https://github.com/xetorthio/jedis)
+        * Jedis (redis.clients:jedis:5.1.0 - https://github.com/redis/jedis)
         * Joni (org.jruby.joni:joni:2.1.31 - 
http://nexus.sonatype.org/oss-repository-hosting.html/joni)
         * JOpt Simple (net.sf.jopt-simple:jopt-simple:5.0.2 - 
http://pholser.github.io/jopt-simple)
         * JUL to SLF4J bridge (org.slf4j:jul-to-slf4j:1.7.36 - 
http://www.slf4j.org)
@@ -682,6 +680,7 @@ List of third-party dependencies grouped by their license 
type.
     Public Domain
 
         * AOP alliance (aopalliance:aopalliance:1.0 - 
http://aopalliance.sourceforge.net)
+        * JSON in Java (org.json:json:20231013 - 
https://github.com/douglascrockford/JSON-java)
 
     Revised BSD
 
diff --git a/LICENSE-binary b/LICENSE-binary
index a6c05f4c8..3d10d5dac 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -656,7 +656,6 @@ The license texts of these dependencies can be found in the 
licenses directory.
         * Apache Commons FileUpload (commons-fileupload:commons-fileupload:1.5 
- https://commons.apache.org/proper/commons-fileupload/)
         * Apache Commons Lang (org.apache.commons:commons-lang3:3.13.0 - 
https://commons.apache.org/proper/commons-lang/)
         * Apache Commons Text (org.apache.commons:commons-text:1.11.0 - 
https://commons.apache.org/proper/commons-text)
-        * Gson (com.google.code.gson:gson:2.9.0 - 
https://github.com/google/gson/gson)
         * snappy-java (org.xerial.snappy:snappy-java:1.1.10.4 - 
https://github.com/xerial/snappy-java)
 
     Apache License
@@ -808,6 +807,7 @@ The license texts of these dependencies can be found in the 
licenses directory.
         * Google Guice - Extensions - AssistedInject 
(com.google.inject.extensions:guice-assistedinject:3.0 - 
http://code.google.com/p/google-guice/extensions-parent/guice-assistedinject/)
         * Google Guice - Extensions - Servlet 
(com.google.inject.extensions:guice-servlet:4.0 - 
https://github.com/google/guice/extensions-parent/guice-servlet)
         * Graphite Integration for Metrics 
(io.dropwizard.metrics:metrics-graphite:3.2.6 - 
http://metrics.dropwizard.io/metrics-graphite/)
+        * Gson (com.google.code.gson:gson:2.10.1 - 
https://github.com/google/gson/gson)
         * Guava: Google Core Libraries for Java (com.google.guava:guava:16.0.1 
- http://code.google.com/p/guava-libraries/guava)
         * Guava: Google Core Libraries for Java 
(com.google.guava:guava:32.1.3-jre - https://github.com/google/guava)
         * Guava InternalFutureFailureAccess and InternalFutures 
(com.google.guava:failureaccess:1.0.1 - 
https://github.com/google/guava/failureaccess)
diff --git 
a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/tools/Base64ToBinaryStateMigrationUtil.java
 
b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/tools/Base64ToBinaryStateMigrationUtil.java
index 9f060b806..9d0d4c113 100644
--- 
a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/tools/Base64ToBinaryStateMigrationUtil.java
+++ 
b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/tools/Base64ToBinaryStateMigrationUtil.java
@@ -35,8 +35,7 @@ import 
org.apache.storm.redis.common.container.RedisCommandsContainerBuilder;
 import org.apache.storm.redis.common.container.RedisCommandsInstanceContainer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import redis.clients.util.SafeEncoder;
+import redis.clients.jedis.util.SafeEncoder;
 
 public class Base64ToBinaryStateMigrationUtil {
     private static final Logger LOG = 
LoggerFactory.getLogger(Base64ToBinaryStateMigrationUtil.class);
diff --git a/external/storm-elasticsearch/pom.xml 
b/external/storm-elasticsearch/pom.xml
index ba68ee7b8..df31db80b 100644
--- a/external/storm-elasticsearch/pom.xml
+++ b/external/storm-elasticsearch/pom.xml
@@ -125,7 +125,7 @@
         <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>elasticsearch</artifactId>
-            <version>1.19.1</version>
+            <version>${testcontainers.version}</version>
             <scope>test</scope>
         </dependency>
     </dependencies>
diff --git a/external/storm-redis/pom.xml b/external/storm-redis/pom.xml
index b93d3d951..1cb7b6e06 100644
--- a/external/storm-redis/pom.xml
+++ b/external/storm-redis/pom.xml
@@ -78,6 +78,19 @@
             <groupId>org.hamcrest</groupId>
             <artifactId>hamcrest</artifactId>
         </dependency>
+        <!-- Test Containers -->
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <version>${testcontainers.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <version>${testcontainers.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java
 
b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java
index fd49f9180..0a7d293d1 100644
--- 
a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java
+++ 
b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java
@@ -15,12 +15,11 @@ package org.apache.storm.redis.bolt;
 import java.util.Map;
 import org.apache.storm.redis.common.config.JedisClusterConfig;
 import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.common.container.JedisCommandsContainer;
 import org.apache.storm.redis.common.container.JedisCommandsContainerBuilder;
-import org.apache.storm.redis.common.container.JedisCommandsInstanceContainer;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
-import redis.clients.jedis.JedisCommands;
 
 /**
  * AbstractRedisBolt class is for users to implement custom bolts which makes 
interaction with Redis.
@@ -45,7 +44,7 @@ import redis.clients.jedis.JedisCommands;
 public abstract class AbstractRedisBolt extends BaseTickTupleAwareRichBolt {
     protected OutputCollector collector;
 
-    private transient JedisCommandsInstanceContainer container;
+    private transient JedisCommandsContainer container;
 
     private JedisPoolConfig jedisPoolConfig;
     private JedisClusterConfig jedisClusterConfig;
@@ -89,18 +88,10 @@ public abstract class AbstractRedisBolt extends 
BaseTickTupleAwareRichBolt {
      * Borrow JedisCommands instance from container.<p/>
      * JedisCommands is an interface which contains single key operations.
      * @return implementation of JedisCommands
-     * @see JedisCommandsInstanceContainer#getInstance()
+     * @see JedisCommandsContainer
      */
-    protected JedisCommands getInstance() {
-        return this.container.getInstance();
-    }
-
-    /**
-     * Return borrowed instance to container.
-     * @param instance borrowed object
-     */
-    protected void returnInstance(JedisCommands instance) {
-        this.container.returnInstance(instance);
+    protected JedisCommandsContainer getInstance() {
+        return this.container;
     }
 
     @Override
diff --git 
a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisFilterBolt.java
 
b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisFilterBolt.java
index a3f0ace8a..d9228d32d 100644
--- 
a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisFilterBolt.java
+++ 
b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisFilterBolt.java
@@ -13,14 +13,16 @@
 package org.apache.storm.redis.bolt;
 
 import java.util.List;
+import java.util.Objects;
+
 import org.apache.storm.redis.common.config.JedisClusterConfig;
 import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.common.container.JedisCommandsContainer;
 import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
 import org.apache.storm.redis.common.mapper.RedisFilterMapper;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.tuple.Tuple;
 import redis.clients.jedis.GeoCoordinate;
-import redis.clients.jedis.JedisCommands;
 
 /**
  * Basic bolt for querying from Redis and filters out if key/field doesn't 
exist.
@@ -85,7 +87,7 @@ public class RedisFilterBolt extends AbstractRedisBolt {
         String key = filterMapper.getKeyFromTuple(input);
 
         boolean found;
-        JedisCommands jedisCommand = null;
+        JedisCommandsContainer jedisCommand = null;
         try {
             jedisCommand = getInstance();
 
@@ -112,7 +114,13 @@ public class RedisFilterBolt extends AbstractRedisBolt {
 
                 case GEO:
                     List<GeoCoordinate> geopos = 
jedisCommand.geopos(additionalKey, key);
-                    found = (geopos != null && geopos.size() > 0);
+                    if (geopos == null || geopos.isEmpty()) {
+                        found = false;
+                    } else {
+                        // If any entry is NOT null, then we have a match.
+                        found = geopos.stream()
+                            .anyMatch(Objects::nonNull);
+                    }
                     break;
 
                 default:
@@ -127,8 +135,6 @@ public class RedisFilterBolt extends AbstractRedisBolt {
         } catch (Exception e) {
             this.collector.reportError(e);
             this.collector.fail(input);
-        } finally {
-            returnInstance(jedisCommand);
         }
     }
 
diff --git 
a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java
 
b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java
index 4d903402a..07c79773c 100644
--- 
a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java
+++ 
b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java
@@ -15,12 +15,12 @@ package org.apache.storm.redis.bolt;
 import java.util.List;
 import org.apache.storm.redis.common.config.JedisClusterConfig;
 import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.common.container.JedisCommandsContainer;
 import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
 import org.apache.storm.redis.common.mapper.RedisLookupMapper;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
-import redis.clients.jedis.JedisCommands;
 
 /**
  * Basic bolt for querying from Redis and emits response as tuple.
@@ -70,7 +70,7 @@ public class RedisLookupBolt extends AbstractRedisBolt {
         String key = lookupMapper.getKeyFromTuple(input);
         Object lookupValue;
 
-        JedisCommands jedisCommand = null;
+        JedisCommandsContainer jedisCommand = null;
         try {
             jedisCommand = getInstance();
 
@@ -116,8 +116,6 @@ public class RedisLookupBolt extends AbstractRedisBolt {
         } catch (Exception e) {
             this.collector.reportError(e);
             this.collector.fail(input);
-        } finally {
-            returnInstance(jedisCommand);
         }
     }
 
diff --git 
a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java
 
b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java
index ce08a735d..065d9cbea 100644
--- 
a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java
+++ 
b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java
@@ -14,11 +14,11 @@ package org.apache.storm.redis.bolt;
 
 import org.apache.storm.redis.common.config.JedisClusterConfig;
 import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.common.container.JedisCommandsContainer;
 import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
 import org.apache.storm.redis.common.mapper.RedisStoreMapper;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.tuple.Tuple;
-import redis.clients.jedis.JedisCommands;
 
 /**
  * Basic bolt for writing to Redis.
@@ -66,7 +66,7 @@ public class RedisStoreBolt extends AbstractRedisBolt {
         String key = storeMapper.getKeyFromTuple(input);
         String value = storeMapper.getValueFromTuple(input);
 
-        JedisCommands jedisCommand = null;
+        JedisCommandsContainer jedisCommand = null;
         try {
             jedisCommand = getInstance();
 
@@ -114,8 +114,6 @@ public class RedisStoreBolt extends AbstractRedisBolt {
         } catch (Exception e) {
             this.collector.reportError(e);
             this.collector.fail(input);
-        } finally {
-            returnInstance(jedisCommand);
         }
     }
 
diff --git 
a/external/storm-redis/src/main/java/org/apache/storm/redis/common/adapter/RedisCommandsAdapterJedis.java
 
b/external/storm-redis/src/main/java/org/apache/storm/redis/common/adapter/RedisCommandsAdapterJedis.java
index d791bd865..a3f1f5a6b 100644
--- 
a/external/storm-redis/src/main/java/org/apache/storm/redis/common/adapter/RedisCommandsAdapterJedis.java
+++ 
b/external/storm-redis/src/main/java/org/apache/storm/redis/common/adapter/RedisCommandsAdapterJedis.java
@@ -23,8 +23,8 @@ import java.io.IOException;
 import java.util.Map;
 import org.apache.storm.redis.common.commands.RedisCommands;
 import redis.clients.jedis.Jedis;
-import redis.clients.jedis.ScanParams;
-import redis.clients.jedis.ScanResult;
+import redis.clients.jedis.params.ScanParams;
+import redis.clients.jedis.resps.ScanResult;
 
 /**
  * Adapter class to make Jedis instance play with BinaryRedisCommands 
interface.
diff --git 
a/external/storm-redis/src/main/java/org/apache/storm/redis/common/adapter/RedisCommandsAdapterJedisCluster.java
 
b/external/storm-redis/src/main/java/org/apache/storm/redis/common/adapter/RedisCommandsAdapterJedisCluster.java
index 15f5e0a88..9d2d96b19 100644
--- 
a/external/storm-redis/src/main/java/org/apache/storm/redis/common/adapter/RedisCommandsAdapterJedisCluster.java
+++ 
b/external/storm-redis/src/main/java/org/apache/storm/redis/common/adapter/RedisCommandsAdapterJedisCluster.java
@@ -23,14 +23,15 @@ import java.io.IOException;
 import java.util.Map;
 import org.apache.storm.redis.common.commands.RedisCommands;
 import redis.clients.jedis.JedisCluster;
-import redis.clients.jedis.ScanParams;
-import redis.clients.jedis.ScanResult;
+import redis.clients.jedis.params.ScanParams;
+import redis.clients.jedis.resps.ScanResult;
+
 
 /**
  * Adapter class to make JedisCluster instance play with BinaryRedisCommands 
interface.
  */
 public class RedisCommandsAdapterJedisCluster implements RedisCommands, 
Closeable {
-    private JedisCluster jedisCluster;
+    private final JedisCluster jedisCluster;
 
     public RedisCommandsAdapterJedisCluster(JedisCluster jedisCluster) {
         this.jedisCluster = jedisCluster;
diff --git 
a/external/storm-redis/src/main/java/org/apache/storm/redis/common/commands/RedisCommands.java
 
b/external/storm-redis/src/main/java/org/apache/storm/redis/common/commands/RedisCommands.java
index 58835b1ab..5a7170eb2 100644
--- 
a/external/storm-redis/src/main/java/org/apache/storm/redis/common/commands/RedisCommands.java
+++ 
b/external/storm-redis/src/main/java/org/apache/storm/redis/common/commands/RedisCommands.java
@@ -19,8 +19,9 @@
 package org.apache.storm.redis.common.commands;
 
 import java.util.Map;
-import redis.clients.jedis.ScanParams;
-import redis.clients.jedis.ScanResult;
+
+import redis.clients.jedis.params.ScanParams;
+import redis.clients.jedis.resps.ScanResult;
 
 /**
  * This interface represents Jedis methods exhaustively which are used on 
storm-redis.
diff --git 
a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisClusterContainer.java
 
b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisClusterContainer.java
index ca2a2f44e..5220c1c7a 100644
--- 
a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisClusterContainer.java
+++ 
b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisClusterContainer.java
@@ -12,16 +12,16 @@
 
 package org.apache.storm.redis.common.container;
 
-import java.io.IOException;
+import java.util.List;
+import redis.clients.jedis.GeoCoordinate;
 import redis.clients.jedis.JedisCluster;
-import redis.clients.jedis.JedisCommands;
 
 /**
  * Container for managing JedisCluster.
  * <p/>
  * Note that JedisCluster doesn't need to be pooled since it's thread-safe and 
it stores pools internally.
  */
-public class JedisClusterContainer implements JedisCommandsInstanceContainer {
+public class JedisClusterContainer implements JedisCommandsContainer {
 
     private JedisCluster jedisCluster;
 
@@ -33,20 +33,94 @@ public class JedisClusterContainer implements 
JedisCommandsInstanceContainer {
         this.jedisCluster = jedisCluster;
     }
 
-    /**
-     * {@inheritDoc}
-     */
     @Override
-    public JedisCommands getInstance() {
-        return this.jedisCluster;
+    public Boolean exists(final String key) {
+        return jedisCluster.exists(key);
+    }
+
+    @Override
+    public String get(final String key) {
+        return jedisCluster.get(key);
+    }
+
+    @Override
+    public String hget(final String key, final String field) {
+        return jedisCluster.hget(key, field);
+    }
+
+    @Override
+    public Long geoadd(final String key, final double longitude, final double 
latitude, final String member) {
+        return jedisCluster.geoadd(key, longitude, latitude, member);
+    }
+
+    @Override
+    public List<GeoCoordinate> geopos(final String key, final String... 
members) {
+        return jedisCluster.geopos(key, members);
+    }
+
+    @Override
+    public Boolean hexists(final String key, final String field) {
+        return jedisCluster.hexists(key, field);
+    }
+
+    @Override
+    public Long hset(final String key, final String field, final String value) 
{
+        return jedisCluster.hset(key, field, value);
+    }
+
+    @Override
+    public String lpop(final String key) {
+        return jedisCluster.lpop(key);
+    }
+
+    @Override
+    public Long pfadd(final String key, final String... elements) {
+        return jedisCluster.pfadd(key, elements);
+    }
+
+    @Override
+    public long pfcount(final String key) {
+        return jedisCluster.pfcount(key);
+    }
+
+    @Override
+    public Long rpush(final String key, final String... string) {
+        return jedisCluster.rpush(key, string);
+    }
+
+    @Override
+    public Long sadd(final String key, final String... member) {
+        return jedisCluster.sadd(key, member);
+    }
+
+    @Override
+    public Long scard(final String key) {
+        return jedisCluster.scard(key);
+    }
+
+    @Override
+    public String set(final String key, final String value) {
+        return jedisCluster.set(key, value);
+    }
+
+    @Override
+    public Boolean sismember(final String key, final String member) {
+        return jedisCluster.sismember(key, member);
+    }
+
+    @Override
+    public Long zadd(final String key, final double score, final String 
member) {
+        return jedisCluster.zadd(key, score, member);
+    }
+
+    @Override
+    public Long zrank(final String key, final String member) {
+        return jedisCluster.zrank(key, member);
     }
 
-    /**
-     * {@inheritDoc}
-     */
     @Override
-    public void returnInstance(JedisCommands jedisCommands) {
-        // do nothing
+    public Double zscore(final String key, final String member) {
+        return jedisCluster.zscore(key, member);
     }
 
     /**
@@ -54,10 +128,6 @@ public class JedisClusterContainer implements 
JedisCommandsInstanceContainer {
      */
     @Override
     public void close() {
-        try {
-            this.jedisCluster.close();
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
+        this.jedisCluster.close();
     }
 }
diff --git 
a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsInstanceContainer.java
 
b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsContainer.java
similarity index 51%
rename from 
external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsInstanceContainer.java
rename to 
external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsContainer.java
index 49a3373fe..38b1321b7 100644
--- 
a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsInstanceContainer.java
+++ 
b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsContainer.java
@@ -13,23 +13,48 @@
 package org.apache.storm.redis.common.container;
 
 import java.io.Closeable;
-import redis.clients.jedis.JedisCommands;
+import java.util.List;
+import redis.clients.jedis.GeoCoordinate;
 
 /**
  * Interfaces for containers which stores instances implementing JedisCommands.
  */
-public interface JedisCommandsInstanceContainer extends Closeable {
-    /**
-     * Borrows instance from container.
-     * @return instance which implements JedisCommands
-     */
-    JedisCommands getInstance();
+public interface JedisCommandsContainer extends Closeable {
+    Boolean exists(String key);
 
-    /**
-     * Returns instance to container.
-     * @param jedisCommands borrowed instance
-     */
-    void returnInstance(JedisCommands jedisCommands);
+    String get(String key);
+
+    String hget(String key, String field);
+
+    Long geoadd(String key, double longitude, double latitude, String member);
+
+    List<GeoCoordinate> geopos(String key, String... members);
+
+    Boolean hexists(String key, String field);
+
+    Long hset(String key, String field, String value);
+
+    String lpop(String key);
+
+    Long pfadd(String key, String... elements);
+
+    long pfcount(String key);
+
+    Long rpush(String key, String... string);
+
+    Long sadd(String key, String... member);
+
+    Long scard(String key);
+
+    String set(String key, String value);
+
+    Boolean sismember(String key, String member);
+
+    Long zadd(String key, double score, String member);
+
+    Long zrank(String key, String member);
+
+    Double zscore(String key, String member);
 
     /**
      * Release Container.
diff --git 
a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsContainerBuilder.java
 
b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsContainerBuilder.java
index 86f0f0912..9370e3421 100644
--- 
a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsContainerBuilder.java
+++ 
b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsContainerBuilder.java
@@ -12,8 +12,10 @@
 
 package org.apache.storm.redis.common.container;
 
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
 import org.apache.storm.redis.common.config.JedisClusterConfig;
 import org.apache.storm.redis.common.config.JedisPoolConfig;
+import redis.clients.jedis.Connection;
 import redis.clients.jedis.JedisCluster;
 import redis.clients.jedis.JedisPool;
 
@@ -22,18 +24,17 @@ import redis.clients.jedis.JedisPool;
  */
 public class JedisCommandsContainerBuilder {
 
-    // FIXME: We're using default config since it cannot be serialized
-    // We still needs to provide some options externally
-    public static final redis.clients.jedis.JedisPoolConfig 
DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig();
-
     /**
      * Builds container for single Redis environment.
      * @param config configuration for JedisPool
      * @return container for single Redis environment
      */
-    public static JedisCommandsInstanceContainer build(JedisPoolConfig config) 
{
+    public static JedisCommandsContainer build(JedisPoolConfig config) {
+        // FIXME: We're using default config since it cannot be serialized
+        // We still needs to provide some options externally
         JedisPool jedisPool =
-            new JedisPool(DEFAULT_POOL_CONFIG, config.getHost(), 
config.getPort(), config.getTimeout(), config.getPassword(),
+            new JedisPool(new redis.clients.jedis.JedisPoolConfig(), 
config.getHost(), config.getPort(),
+                    config.getTimeout(), config.getPassword(),
                           config.getDatabase());
         return new JedisContainer(jedisPool);
     }
@@ -43,10 +44,12 @@ public class JedisCommandsContainerBuilder {
      * @param config configuration for JedisCluster
      * @return container for Redis Cluster environment
      */
-    public static JedisCommandsInstanceContainer build(JedisClusterConfig 
config) {
+    public static JedisCommandsContainer build(JedisClusterConfig config) {
+        // FIXME: We're using default config since it cannot be serialized
+        // We still needs to provide some options externally
         JedisCluster jedisCluster =
             new JedisCluster(config.getNodes(), config.getTimeout(), 
config.getTimeout(), config.getMaxRedirections(), config.getPassword(),
-                             DEFAULT_POOL_CONFIG);
+                    new GenericObjectPoolConfig<>());
         return new JedisClusterContainer(jedisCluster);
     }
 }
diff --git 
a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisContainer.java
 
b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisContainer.java
index 1ebb5508a..1eed610dc 100644
--- 
a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisContainer.java
+++ 
b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisContainer.java
@@ -14,15 +14,19 @@ package org.apache.storm.redis.common.container;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.List;
+import java.util.function.Function;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import redis.clients.jedis.JedisCommands;
+import redis.clients.jedis.GeoCoordinate;
 import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.commands.JedisCommands;
 
 /**
- * Container for managing Jedis instances.
+ * Adapter for providing a unified interface for running commands over both 
Jedis and JedisCluster instances.
  */
-public class JedisContainer implements JedisCommandsInstanceContainer {
+public class JedisContainer implements JedisCommandsContainer {
     private static final Logger LOG = 
LoggerFactory.getLogger(JedisContainer.class);
 
     private JedisPool jedisPool;
@@ -35,35 +39,114 @@ public class JedisContainer implements 
JedisCommandsInstanceContainer {
         this.jedisPool = jedisPool;
     }
 
+    private <T> T runCommand(Function<JedisCommands, T> command) {
+        final JedisCommands jedisCommands = jedisPool.getResource();
+        try {
+            return command.apply(jedisCommands);
+        } finally {
+            try {
+                ((Closeable) jedisCommands).close();
+            } catch (IOException e) {
+                LOG.error("Failed to close (return) instance to pool");
+            }
+        }
+    }
+
     /**
      * {@inheritDoc}
      */
     @Override
-    public JedisCommands getInstance() {
-        return jedisPool.getResource();
+    public void close() {
+        jedisPool.close();
     }
 
-    /**
-     * {@inheritDoc}
-     */
     @Override
-    public void returnInstance(JedisCommands jedisCommands) {
-        if (jedisCommands == null) {
-            return;
-        }
+    public Boolean exists(final String key) {
+        return runCommand((jedisCommands) -> jedisCommands.exists(key));
+    }
 
-        try {
-            ((Closeable) jedisCommands).close();
-        } catch (IOException e) {
-            LOG.error("Failed to close (return) instance to pool");
-        }
+    @Override
+    public String get(final String key) {
+        return runCommand((jedisCommands) -> jedisCommands.get(key));
     }
 
-    /**
-     * {@inheritDoc}
-     */
     @Override
-    public void close() {
-        jedisPool.close();
+    public String hget(final String key, final String field) {
+        return runCommand((jedisCommands) -> jedisCommands.hget(key, field));
+    }
+
+    @Override
+    public Long geoadd(final String key, final double longitude, final double 
latitude, final String member) {
+        return runCommand((jedisCommands) -> jedisCommands.geoadd(key, 
longitude, latitude, member));
+    }
+
+    @Override
+    public List<GeoCoordinate> geopos(final String key, final String... 
members) {
+        return runCommand((jedisCommands) -> jedisCommands.geopos(key, 
members));
+    }
+
+    @Override
+    public Boolean hexists(final String key, final String field) {
+        return runCommand((jedisCommands) -> jedisCommands.hexists(key, 
field));
+    }
+
+    @Override
+    public Long hset(final String key, final String field, final String value) 
{
+        return runCommand((jedisCommands) -> jedisCommands.hset(key, field, 
value));
+    }
+
+    @Override
+    public String lpop(final String key) {
+        return runCommand((jedisCommands) -> jedisCommands.lpop(key));
+    }
+
+    @Override
+    public Long pfadd(final String key, final String... elements) {
+        return runCommand((jedisCommands) -> jedisCommands.pfadd(key, 
elements));
+    }
+
+    @Override
+    public long pfcount(final String key) {
+        return runCommand((jedisCommands) -> jedisCommands.pfcount(key));
+    }
+
+    @Override
+    public Long rpush(final String key, final String... string) {
+        return runCommand((jedisCommands) -> jedisCommands.rpush(key, string));
+    }
+
+    @Override
+    public Long sadd(final String key, final String... member) {
+        return runCommand((jedisCommands) -> jedisCommands.sadd(key, member));
+    }
+
+    @Override
+    public Long scard(final String key) {
+        return runCommand((jedisCommands) -> jedisCommands.scard(key));
+    }
+
+    @Override
+    public String set(final String key, final String value) {
+        return runCommand((jedisCommands) -> jedisCommands.set(key, value));
+    }
+
+    @Override
+    public Boolean sismember(final String key, final String member) {
+        return runCommand((jedisCommands) -> jedisCommands.sismember(key, 
member));
+    }
+
+    @Override
+    public Long zadd(final String key, final double score, final String 
member) {
+        return runCommand((jedisCommands) -> jedisCommands.zadd(key, score, 
member));
+    }
+
+    @Override
+    public Long zrank(final String key, final String member) {
+        return runCommand((jedisCommands) -> jedisCommands.zrank(key, member));
+    }
+
+    @Override
+    public Double zscore(final String key, final String member) {
+        return runCommand((jedisCommands) -> jedisCommands.zscore(key, 
member));
     }
 }
diff --git 
a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/RedisClusterContainer.java
 
b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/RedisClusterContainer.java
index 04c05b876..600d4fec6 100644
--- 
a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/RedisClusterContainer.java
+++ 
b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/RedisClusterContainer.java
@@ -18,7 +18,6 @@
 
 package org.apache.storm.redis.common.container;
 
-import java.io.IOException;
 import org.apache.storm.redis.common.adapter.RedisCommandsAdapterJedisCluster;
 import org.apache.storm.redis.common.commands.RedisCommands;
 import redis.clients.jedis.JedisCluster;
@@ -60,11 +59,7 @@ public class RedisClusterContainer implements 
RedisCommandsInstanceContainer {
      * {@inheritDoc}
      */
     @Override
-    public void close() throws IOException {
-        try {
-            this.jedisCluster.close();
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
+    public void close() {
+        this.jedisCluster.close();
     }
 }
diff --git 
a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/RedisCommandsContainerBuilder.java
 
b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/RedisCommandsContainerBuilder.java
index 81201efd9..b369e1ca0 100644
--- 
a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/RedisCommandsContainerBuilder.java
+++ 
b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/RedisCommandsContainerBuilder.java
@@ -18,6 +18,7 @@
 
 package org.apache.storm.redis.common.container;
 
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
 import org.apache.storm.redis.common.config.JedisClusterConfig;
 import org.apache.storm.redis.common.config.JedisPoolConfig;
 import redis.clients.jedis.JedisCluster;
@@ -28,10 +29,6 @@ import redis.clients.jedis.JedisPool;
  */
 public class RedisCommandsContainerBuilder {
 
-    // FIXME: We're using default config since it cannot be serialized
-    // We still needs to provide some options externally
-    public static final redis.clients.jedis.JedisPoolConfig 
DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig();
-
     /**
      * Builds container for single Redis environment.
      *
@@ -39,8 +36,11 @@ public class RedisCommandsContainerBuilder {
      * @return container for single Redis environment
      */
     public static RedisCommandsInstanceContainer build(JedisPoolConfig config) 
{
+        // FIXME: We're using default config since it cannot be serialized
+        // We still needs to provide some options externally
         JedisPool jedisPool =
-            new JedisPool(DEFAULT_POOL_CONFIG, config.getHost(), 
config.getPort(), config.getTimeout(), config.getPassword(),
+            new JedisPool(new redis.clients.jedis.JedisPoolConfig(), 
config.getHost(), config.getPort(),
+                    config.getTimeout(), config.getPassword(),
                           config.getDatabase());
         return new RedisContainer(jedisPool);
     }
@@ -52,8 +52,10 @@ public class RedisCommandsContainerBuilder {
      * @return container for Redis Cluster environment
      */
     public static RedisCommandsInstanceContainer build(JedisClusterConfig 
config) {
+        // FIXME: We're using default config since it cannot be serialized
+        // We still needs to provide some options externally
         JedisCluster jedisCluster =
-            new JedisCluster(config.getNodes(), config.getTimeout(), 
config.getMaxRedirections(), DEFAULT_POOL_CONFIG);
+            new JedisCluster(config.getNodes(), config.getTimeout(), 
config.getMaxRedirections(), new GenericObjectPoolConfig<>());
         return new RedisClusterContainer(jedisCluster);
     }
 }
diff --git 
a/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java
 
b/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java
index 2a11d962a..880aa5373 100644
--- 
a/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java
+++ 
b/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java
@@ -35,7 +35,7 @@ import org.apache.storm.state.KeyValueState;
 import org.apache.storm.state.Serializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import redis.clients.util.SafeEncoder;
+import redis.clients.jedis.util.SafeEncoder;
 
 /**
  * A redis based implementation that persists the state in Redis.
diff --git 
a/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueStateIterator.java
 
b/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueStateIterator.java
index 19d2b9194..e6b282714 100644
--- 
a/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueStateIterator.java
+++ 
b/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueStateIterator.java
@@ -22,8 +22,8 @@ import org.apache.storm.state.BaseBinaryStateIterator;
 import org.apache.storm.state.DefaultStateEncoder;
 import org.apache.storm.state.Serializer;
 import org.apache.storm.state.StateEncoder;
-import redis.clients.jedis.ScanParams;
-import redis.clients.jedis.ScanResult;
+import redis.clients.jedis.params.ScanParams;
+import redis.clients.jedis.resps.ScanResult;
 
 /**
  * An iterator over {@link RedisKeyValueState}.
diff --git 
a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
 
b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
index dddee1518..ae634d686 100644
--- 
a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
+++ 
b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
@@ -15,6 +15,8 @@ package org.apache.storm.redis.trident.state;
 import com.google.common.collect.Lists;
 import java.util.List;
 import java.util.Map;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
 import org.apache.storm.redis.common.config.JedisClusterConfig;
 import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
 import org.apache.storm.task.IMetricsContext;
@@ -31,6 +33,7 @@ import org.apache.storm.trident.state.map.OpaqueMap;
 import org.apache.storm.trident.state.map.SnapshottableMap;
 import org.apache.storm.trident.state.map.TransactionalMap;
 import org.apache.storm.tuple.Values;
+import redis.clients.jedis.Connection;
 import redis.clients.jedis.JedisCluster;
 
 /**
@@ -277,7 +280,7 @@ public class RedisClusterMapState<T> extends 
AbstractRedisMapState<T> {
      * RedisClusterMapState.Factory provides Redis Cluster environment version 
of StateFactory.
      */
     protected static class Factory implements StateFactory {
-        public static final redis.clients.jedis.JedisPoolConfig 
DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig();
+        public static final GenericObjectPoolConfig<Connection> 
DEFAULT_POOL_CONFIG = new GenericObjectPoolConfig<>();
 
         JedisClusterConfig jedisClusterConfig;
 
@@ -316,7 +319,7 @@ public class RedisClusterMapState<T> extends 
AbstractRedisMapState<T> {
          */
         @Override
         public State makeState(Map<String, Object> conf, IMetricsContext 
metrics, int partitionIndex, int numPartitions) {
-            JedisCluster jedisCluster = new 
JedisCluster(jedisClusterConfig.getNodes(),
+            final JedisCluster jedisCluster = new 
JedisCluster(jedisClusterConfig.getNodes(),
                                                          
jedisClusterConfig.getTimeout(),
                                                          
jedisClusterConfig.getTimeout(),
                                                          
jedisClusterConfig.getMaxRedirections(),
diff --git 
a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java
 
b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java
index c7f018995..e052dd1b6 100644
--- 
a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java
+++ 
b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java
@@ -13,10 +13,13 @@
 package org.apache.storm.redis.trident.state;
 
 import java.util.Map;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
 import org.apache.storm.redis.common.config.JedisClusterConfig;
 import org.apache.storm.task.IMetricsContext;
 import org.apache.storm.trident.state.State;
 import org.apache.storm.trident.state.StateFactory;
+import redis.clients.jedis.Connection;
 import redis.clients.jedis.JedisCluster;
 
 /**
@@ -74,9 +77,9 @@ public class RedisClusterState implements State {
      * @see StateFactory
      */
     public static class Factory implements StateFactory {
-        public static final redis.clients.jedis.JedisPoolConfig 
DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig();
+        public static final GenericObjectPoolConfig<Connection> 
DEFAULT_POOL_CONFIG = new GenericObjectPoolConfig<>();
 
-        private JedisClusterConfig jedisClusterConfig;
+        private final JedisClusterConfig jedisClusterConfig;
 
         /**
          * Constructor.
@@ -92,7 +95,7 @@ public class RedisClusterState implements State {
          */
         @Override
         public State makeState(Map<String, Object> conf, IMetricsContext 
metrics, int partitionIndex, int numPartitions) {
-            JedisCluster jedisCluster = new 
JedisCluster(jedisClusterConfig.getNodes(),
+            final JedisCluster jedisCluster = new 
JedisCluster(jedisClusterConfig.getNodes(),
                                                          
jedisClusterConfig.getTimeout(),
                                                          
jedisClusterConfig.getTimeout(),
                                                          
jedisClusterConfig.getMaxRedirections(),
diff --git 
a/external/storm-redis/src/test/java/org/apache/storm/redis/bolt/RedisFilterBoltTest.java
 
b/external/storm-redis/src/test/java/org/apache/storm/redis/bolt/RedisFilterBoltTest.java
new file mode 100644
index 000000000..dc80361a6
--- /dev/null
+++ 
b/external/storm-redis/src/test/java/org/apache/storm/redis/bolt/RedisFilterBoltTest.java
@@ -0,0 +1,509 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.redis.bolt;
+
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisFilterMapper;
+import org.apache.storm.redis.util.JedisTestHelper;
+import org.apache.storm.redis.util.StubTuple;
+import org.apache.storm.redis.util.TupleTestHelper;
+import org.apache.storm.redis.util.outputcollector.EmittedTuple;
+import org.apache.storm.redis.util.outputcollector.StubOutputCollector;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.ITuple;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.Utils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.storm.redis.common.mapper.RedisDataTypeDescription.RedisDataType.GEO;
+import static 
org.apache.storm.redis.common.mapper.RedisDataTypeDescription.RedisDataType.HASH;
+import static 
org.apache.storm.redis.common.mapper.RedisDataTypeDescription.RedisDataType.HYPER_LOG_LOG;
+import static 
org.apache.storm.redis.common.mapper.RedisDataTypeDescription.RedisDataType.SET;
+import static 
org.apache.storm.redis.common.mapper.RedisDataTypeDescription.RedisDataType.SORTED_SET;
+import static 
org.apache.storm.redis.common.mapper.RedisDataTypeDescription.RedisDataType.STRING;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+@Testcontainers
+class RedisFilterBoltTest {
+
+    @Container
+    public GenericContainer container = new 
GenericContainer("redis:7.2.3-alpine")
+        .withExposedPorts(6379);
+
+    private JedisTestHelper jedisHelper;
+
+    private JedisPoolConfig.Builder configBuilder;
+    private StubOutputCollector outputCollector;
+    private TopologyContext topologyContext;
+
+    @BeforeEach
+    void setup() {
+        configBuilder = new JedisPoolConfig.Builder();
+        configBuilder
+            .setHost(container.getHost())
+            .setPort(container.getFirstMappedPort())
+            .setTimeout(10);
+
+        outputCollector = new StubOutputCollector();
+        topologyContext = mock(TopologyContext.class);
+        jedisHelper = new JedisTestHelper(container);
+    }
+
+    @AfterEach
+    void cleanup() {
+        verifyNoMoreInteractions(topologyContext);
+        jedisHelper.close();
+    }
+
+    /**
+     * Smoke test the exists check when the key is NOT found.
+     * Expectation is tuple is acked, and nothing is emitted.
+     */
+    @Test
+    void smokeTest_exists_keyNotFound() {
+        // Define input key
+        final String inputKey = "ThisIsMyKey";
+
+        // Ensure key does not exist in redis
+        jedisHelper.delete(inputKey);
+        assertFalse(jedisHelper.exists(inputKey), "Sanity check key should not 
exist");
+
+        // Create an input tuple
+        final Map<String, Object> values = new HashMap<>();
+        values.put("key", inputKey);
+        values.put("value", "ThisIsMyValue");
+        final Tuple tuple = new StubTuple(values);
+
+        final JedisPoolConfig config = configBuilder.build();
+        final TestMapper mapper = new TestMapper(STRING);
+
+        final RedisFilterBolt bolt = new RedisFilterBolt(config, mapper);
+        bolt.prepare(new HashMap<>(), topologyContext, new 
OutputCollector(outputCollector));
+        bolt.process(tuple);
+
+        // Verify the bolt filtered the input tuple.
+        verifyTupleFiltered();
+    }
+
+    /**
+     * Smoke test the exists check when the key IS found.
+     * Expectation is tuple is acked, and tuple is emitted.
+     */
+    @Test
+    void smokeTest_exists_keyFound() {
+        // Define input key
+        final String inputKey = "ThisIsMyKey";
+
+        // Ensure key does exist in redis
+        jedisHelper.set(inputKey, "some-value");
+        assertTrue(jedisHelper.exists(inputKey), "Sanity check key exists.");
+
+        // Create an input tuple
+        final Map<String, Object> values = new HashMap<>();
+        values.put("key", inputKey);
+        values.put("value", "ThisIsMyValue");
+        final Tuple tuple = new StubTuple(values);
+
+        final JedisPoolConfig config = configBuilder.build();
+        final TestMapper mapper = new TestMapper(STRING);
+
+        final RedisFilterBolt bolt = new RedisFilterBolt(config, mapper);
+        bolt.prepare(new HashMap<>(), topologyContext, new 
OutputCollector(outputCollector));
+        bolt.process(tuple);
+
+        // Verify Tuple passed through the bolt
+        verifyTuplePassed(tuple);
+    }
+
+    /**
+     * Smoke test the sismember check when the key is NOT found in the set.
+     * Expectation is tuple is acked, and nothing is emitted.
+     */
+    @Test
+    void smokeTest_sismember_notMember() {
+        // Define input key
+        final String setKey = "ThisIsMySet";
+        final String inputKey = "ThisIsMyKey";
+
+        // Create an input tuple
+        final Map<String, Object> values = new HashMap<>();
+        values.put("key", inputKey);
+        values.put("value", "ThisIsMyValue");
+        final Tuple tuple = new StubTuple(values);
+
+        final JedisPoolConfig config = configBuilder.build();
+        final TestMapper mapper = new TestMapper(SET, setKey);
+
+        final RedisFilterBolt bolt = new RedisFilterBolt(config, mapper);
+        bolt.prepare(new HashMap<>(), topologyContext, new 
OutputCollector(outputCollector));
+        bolt.process(tuple);
+
+        // Verify the bolt filtered the input tuple.
+        verifyTupleFiltered();
+    }
+
+    /**
+     * Smoke test the exists check when the key IS found.
+     * Expectation is tuple is acked, and tuple is emitted.
+     */
+    @Test
+    void smokeTest_sismember_isMember() {
+        // Define input key
+        final String setKey = "ThisIsMySet";
+        final String inputKey = "ThisIsMyKey";
+
+        // Ensure key does exist in redis
+        jedisHelper.smember(setKey, inputKey);
+        assertTrue(jedisHelper.sismember(setKey, inputKey), "Sanity check, 
should be a member");
+
+        // Create an input tuple
+        final Map<String, Object> values = new HashMap<>();
+        values.put("key", inputKey);
+        values.put("value", "ThisIsMyValue");
+        final Tuple tuple = new StubTuple(values);
+
+        final JedisPoolConfig config = configBuilder.build();
+        final TestMapper mapper = new TestMapper(SET, setKey);
+
+        final RedisFilterBolt bolt = new RedisFilterBolt(config, mapper);
+        bolt.prepare(new HashMap<>(), topologyContext, new 
OutputCollector(outputCollector));
+        bolt.process(tuple);
+
+        // Verify Tuple passed through the bolt
+        verifyTuplePassed(tuple);
+    }
+
+    /**
+     * Smoke test the hexists check when the key is NOT found in the set.
+     * Expectation is tuple is acked, and nothing is emitted.
+     */
+    @Test
+    void smokeTest_hexists_notMember() {
+        // Define input key
+        final String hashKey = "ThisIsMyHash";
+        final String inputKey = "ThisIsMyKey";
+
+        // Create an input tuple
+        final Map<String, Object> values = new HashMap<>();
+        values.put("key", inputKey);
+        values.put("value", "ThisIsMyValue");
+        final Tuple tuple = new StubTuple(values);
+
+        final JedisPoolConfig config = configBuilder.build();
+        final TestMapper mapper = new TestMapper(HASH, hashKey);
+
+
+        final RedisFilterBolt bolt = new RedisFilterBolt(config, mapper);
+        bolt.prepare(new HashMap<>(), topologyContext, new 
OutputCollector(outputCollector));
+        bolt.process(tuple);
+
+        // Verify the bolt filtered the input tuple.
+        verifyTupleFiltered();
+    }
+
+    /**
+     * Smoke test the hexists check when the key IS found.
+     * Expectation is tuple is acked, and tuple is emitted.
+     */
+    @Test
+    void smokeTest_hexists_isMember() {
+        // Define input key
+        final String hashKey = "ThisIsMyHash";
+        final String inputKey = "ThisIsMyKey";
+
+        // Ensure key does exist in redis
+        jedisHelper.hset(hashKey, inputKey, "value");
+        assertTrue(jedisHelper.hexists(hashKey, inputKey), "Sanity check, 
should be a member");
+
+        // Create an input tuple
+        final Map<String, Object> values = new HashMap<>();
+        values.put("key", inputKey);
+        values.put("value", "ThisIsMyValue");
+        final Tuple tuple = new StubTuple(values);
+
+        final JedisPoolConfig config = configBuilder.build();
+        final TestMapper mapper = new TestMapper(HASH, hashKey);
+
+        final RedisFilterBolt bolt = new RedisFilterBolt(config, mapper);
+        bolt.prepare(new HashMap<>(), topologyContext, new 
OutputCollector(outputCollector));
+        bolt.process(tuple);
+
+        // Verify Tuple passed through the bolt
+        verifyTuplePassed(tuple);
+    }
+
+    /**
+     * Smoke test the zrank check when the key is NOT found in the set.
+     * Expectation is tuple is acked, and nothing is emitted.
+     */
+    @Test
+    void smokeTest_zrank_notMember() {
+        // Define input key
+        final String setKey = "ThisIsMySetKey";
+        final String inputKey = "ThisIsMyKey";
+
+        // Create an input tuple
+        final Map<String, Object> values = new HashMap<>();
+        values.put("key", inputKey);
+        values.put("value", "ThisIsMyValue");
+        final Tuple tuple = new StubTuple(values);
+
+        final JedisPoolConfig config = configBuilder.build();
+        final TestMapper mapper = new TestMapper(SORTED_SET, setKey);
+
+        final RedisFilterBolt bolt = new RedisFilterBolt(config, mapper);
+        bolt.prepare(new HashMap<>(), topologyContext, new 
OutputCollector(outputCollector));
+        bolt.process(tuple);
+
+        // Verify the bolt filtered the input tuple.
+        verifyTupleFiltered();
+    }
+
+    /**
+     * Smoke test the zrank check when the key IS found.
+     * Expectation is tuple is acked, and tuple is emitted.
+     */
+    @Test
+    void smokeTest_zrank_isMember() {
+        // Define input key
+        final String setKey = "ThisIsMySetKey";
+        final String inputKey = "ThisIsMyKey";
+
+        // Ensure key does exist in redis
+        jedisHelper.zrank(setKey, 2, inputKey);
+
+        // Create an input tuple
+        final Map<String, Object> values = new HashMap<>();
+        values.put("key", inputKey);
+        values.put("value", "ThisIsMyValue");
+        final Tuple tuple = new StubTuple(values);
+
+        final JedisPoolConfig config = configBuilder.build();
+        final TestMapper mapper = new TestMapper(SORTED_SET, setKey);
+
+        final RedisFilterBolt bolt = new RedisFilterBolt(config, mapper);
+        bolt.prepare(new HashMap<>(), topologyContext, new 
OutputCollector(outputCollector));
+        bolt.process(tuple);
+
+        // Verify Tuple passed through the bolt
+        verifyTuplePassed(tuple);
+    }
+
+    /**
+     * Smoke test the pfcount check when the key is NOT found in the set.
+     * Expectation is tuple is acked, and nothing is emitted.
+     */
+    @Test
+    void smokeTest_pfcount_notMember() {
+        // Define input key
+        final String inputKey = "ThisIsMyKey";
+
+        // Create an input tuple
+        final Map<String, Object> values = new HashMap<>();
+        values.put("key", inputKey);
+        values.put("value", "ThisIsMyValue");
+        final Tuple tuple = new StubTuple(values);
+
+        final JedisPoolConfig config = configBuilder.build();
+        final TestMapper mapper = new TestMapper(HYPER_LOG_LOG);
+
+        final RedisFilterBolt bolt = new RedisFilterBolt(config, mapper);
+        bolt.prepare(new HashMap<>(), topologyContext, new 
OutputCollector(outputCollector));
+        bolt.process(tuple);
+
+        // Verify the bolt filtered the input tuple.
+        verifyTupleFiltered();
+    }
+
+    /**
+     * Smoke test the pfcount check when the key IS found.
+     * Expectation is tuple is acked, and tuple is emitted.
+     */
+    @Test
+    void smokeTest_pfcount_isMember() {
+        // Define input key
+        final String inputKey = "ThisIsMyKey";
+
+        // Ensure key does exist in redis
+        jedisHelper.pfadd(inputKey, "my value");
+
+        // Create an input tuple
+        final Map<String, Object> values = new HashMap<>();
+        values.put("key", inputKey);
+        values.put("value", "ThisIsMyValue");
+        final Tuple tuple = new StubTuple(values);
+
+        final JedisPoolConfig config = configBuilder.build();
+        final TestMapper mapper = new TestMapper(HYPER_LOG_LOG);
+
+        final RedisFilterBolt bolt = new RedisFilterBolt(config, mapper);
+        bolt.prepare(new HashMap<>(), topologyContext, new 
OutputCollector(outputCollector));
+        bolt.process(tuple);
+
+        // Verify Tuple passed through the bolt
+        verifyTuplePassed(tuple);
+    }
+
+    /**
+     * Smoke test the geopos check when the key is NOT found in the set.
+     * Expectation is tuple is acked, and nothing is emitted.
+     */
+    @Test
+    void smokeTest_geopos_notMember() {
+        // Define input key
+        final String geoKey = "ThisIsMyGeoKey";
+        final String inputKey = "ThisIsMyKey";
+
+        // Create an input tuple
+        final Map<String, Object> values = new HashMap<>();
+        values.put("key", inputKey);
+        values.put("value", "ThisIsMyValue");
+        final Tuple tuple = new StubTuple(values);
+
+        final JedisPoolConfig config = configBuilder.build();
+        final TestMapper mapper = new TestMapper(GEO, geoKey);
+
+        final RedisFilterBolt bolt = new RedisFilterBolt(config, mapper);
+        bolt.prepare(new HashMap<>(), topologyContext, new 
OutputCollector(outputCollector));
+        bolt.process(tuple);
+
+        // Verify the bolt filtered the input tuple.
+        verifyTupleFiltered();
+    }
+
+    /**
+     * Smoke test the geopos check when the key IS found.
+     * Expectation is tuple is acked, and tuple is emitted.
+     */
+    @Test
+    void smokeTest_geopos_isMember() {
+        // Define input key
+        final String geoKey = "ThisIsMyGeoKey";
+        final String inputKey = "ThisIsMyKey";
+
+        // Ensure key does exist in redis
+        jedisHelper.geoadd(geoKey, 139.731992, 35.709026, inputKey);
+
+        // Create an input tuple
+        final Map<String, Object> values = new HashMap<>();
+        values.put("key", inputKey);
+        values.put("value", "ThisIsMyValue");
+        final Tuple tuple = new StubTuple(values);
+
+        final JedisPoolConfig config = configBuilder.build();
+        final TestMapper mapper = new TestMapper(GEO, geoKey);
+
+        final RedisFilterBolt bolt = new RedisFilterBolt(config, mapper);
+        bolt.prepare(new HashMap<>(), topologyContext, new 
OutputCollector(outputCollector));
+        bolt.process(tuple);
+
+        // Verify Tuple passed through the bolt
+        verifyTuplePassed(tuple);
+    }
+
+    /**
+     * Utility method to help verify that a tuple passed throught hte 
RedisFilterBolt properly.
+     * @param expectedTuple The tuple we expected to pass through the bolt.
+     */
+    private void verifyTuplePassed(final Tuple expectedTuple) {
+        // Verify no errors or failed tuples
+        assertTrue(outputCollector.getReportedErrors().isEmpty(), "Should have 
no reported errors");
+        assertTrue(outputCollector.getFailedTuples().isEmpty(), "Should have 
no failed tuples");
+
+        // We should have a single acked tuple
+        assertEquals(1, outputCollector.getAckedTuples().size(), "Should have 
a single acked tuple");
+
+        // We should have a single emitted tuple.
+        assertEquals(1, outputCollector.getEmittedTuples().size(), "Should 
have a single emitted tuple");
+
+        // Verify the tuple is what we expected
+        final EmittedTuple emittedTuple = 
outputCollector.getEmittedTuples().get(0);
+        assertEquals("default", emittedTuple.getStreamId());
+        TupleTestHelper.verifyAnchors(emittedTuple, expectedTuple);
+        TupleTestHelper.verifyEmittedTuple(emittedTuple, 
expectedTuple.getValues());
+    }
+
+    /**
+     * Utility method to help verify that no tuples were passed through the 
RedisFilterBolt.
+     */
+    private void verifyTupleFiltered() {
+        // Verify no errors or failed tuples
+        assertTrue(outputCollector.getReportedErrors().isEmpty(), "Should have 
no reported errors");
+        assertTrue(outputCollector.getFailedTuples().isEmpty(), "Should have 
no failed tuples");
+
+        // We should have a single acked tuple
+        assertEquals(1, outputCollector.getAckedTuples().size(), "Should have 
a single acked tuple");
+
+        // We should have no emitted tuple.
+        assertTrue(outputCollector.getEmittedTuples().isEmpty(), "Should have 
no emitted tuples");
+    }
+
+    /**
+     * Test Implementation.
+     */
+    private static class TestMapper implements RedisFilterMapper {
+        private final RedisDataTypeDescription.RedisDataType dataType;
+        private final String additionalKey;
+
+        private TestMapper(final RedisDataTypeDescription.RedisDataType 
dataType) {
+            this(dataType, null);
+        }
+
+        private TestMapper(final RedisDataTypeDescription.RedisDataType 
dataType, final String additionalKey) {
+            this.dataType = dataType;
+            this.additionalKey = additionalKey;
+        }
+
+        @Override
+        public void declareOutputFields(final OutputFieldsDeclarer declarer) {
+            declarer.declareStream(Utils.DEFAULT_STREAM_ID, new Fields("key", 
"value"));
+        }
+
+        @Override
+        public RedisDataTypeDescription getDataTypeDescription() {
+            return new RedisDataTypeDescription(dataType, additionalKey);
+        }
+
+        @Override
+        public String getKeyFromTuple(final ITuple tuple) {
+            return tuple.getStringByField("key");
+        }
+
+        @Override
+        public String getValueFromTuple(final ITuple tuple) {
+            return tuple.getStringByField("value");
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateIteratorTest.java
 
b/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateIteratorTest.java
index 212504e65..00f4a0eb4 100644
--- 
a/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateIteratorTest.java
+++ 
b/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateIteratorTest.java
@@ -24,8 +24,8 @@ import org.apache.storm.state.DefaultStateSerializer;
 import org.apache.storm.state.Serializer;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import redis.clients.jedis.ScanParams;
-import redis.clients.jedis.ScanResult;
+import redis.clients.jedis.params.ScanParams;
+import redis.clients.jedis.resps.ScanResult;
 
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
diff --git 
a/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java
 
b/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java
index e6ee68887..f82ef6fa4 100644
--- 
a/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java
+++ 
b/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java
@@ -27,7 +27,7 @@ import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 import org.mockito.stubbing.Answer;
-import redis.clients.util.SafeEncoder;
+import redis.clients.jedis.util.SafeEncoder;
 
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
diff --git 
a/external/storm-redis/src/test/java/org/apache/storm/redis/util/JedisTestHelper.java
 
b/external/storm-redis/src/test/java/org/apache/storm/redis/util/JedisTestHelper.java
new file mode 100644
index 000000000..9c357493f
--- /dev/null
+++ 
b/external/storm-redis/src/test/java/org/apache/storm/redis/util/JedisTestHelper.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.redis.util;
+
+import java.util.Objects;
+import org.testcontainers.containers.GenericContainer;
+import redis.clients.jedis.Jedis;
+
+/**
+ * Utility class for helping interact with a Redis service in tests.
+ */
+public class JedisTestHelper {
+    private final Jedis jedis;
+
+    /**
+     * Constructor.
+     * @param container Container instance to create a redis client against.
+     */
+    public JedisTestHelper(final GenericContainer container) {
+        Objects.requireNonNull(container);
+
+        jedis = new Jedis(
+            container.getHost(),
+            container.getFirstMappedPort()
+        );
+    }
+
+    public void delete(final String key) {
+        jedis.del(key);
+    }
+
+    public void geoadd(final String key, final double longitude, final double 
latitude, final String value) {
+        jedis.geoadd(key, longitude, latitude, value);
+    }
+
+    public boolean hexists(final String hash, final String key) {
+        return jedis.hexists(hash, key);
+    }
+
+    public void hset(final String hash, final String key, final String value) {
+        jedis.hset(hash, key, value);
+    }
+
+    public boolean exists(final String key) {
+        return jedis.exists(key);
+    }
+
+    public void pfadd(final String key, final String value) {
+        jedis.pfadd(key, value);
+    }
+
+    public void set(final String key, final String value) {
+        jedis.set(key, value);
+    }
+
+    public void smember(final String set, final String value) {
+        jedis.sadd(set, value);
+    }
+
+    public boolean sismember(final String set, final String value) {
+        return jedis.sismember(set, value);
+    }
+
+    public void zrank(final String set, final double score, final String 
value) {
+        jedis.zadd(set, score, value);
+    }
+
+    public void close() {
+        jedis.close();
+    }
+}
diff --git 
a/external/storm-redis/src/test/java/org/apache/storm/redis/util/StubTuple.java 
b/external/storm-redis/src/test/java/org/apache/storm/redis/util/StubTuple.java
new file mode 100644
index 000000000..ec1e3e942
--- /dev/null
+++ 
b/external/storm-redis/src/test/java/org/apache/storm/redis/util/StubTuple.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.redis.util;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.shade.org.apache.commons.lang.NotImplementedException;
+import org.apache.storm.task.GeneralTopologyContext;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.MessageId;
+import org.apache.storm.tuple.Tuple;
+
+
+/**
+ * Partial Implementation of the Tuple interface for tests.
+ */
+public class StubTuple implements Tuple {
+
+    final Map<String, Object> values;
+
+    public StubTuple(final Map<String, Object> values) {
+        this.values = Collections.unmodifiableMap(new 
HashMap<>(Objects.requireNonNull(values)));
+    }
+
+    @Override
+    public int size() {
+        return values.size();
+    }
+
+    @Override
+    public boolean contains(final String field) {
+        return values.containsKey(field);
+    }
+
+    @Override
+    public Fields getFields() {
+        return new Fields(values.keySet().toArray(new String[0]));
+    }
+
+    @Override
+    public int fieldIndex(final String field) {
+        throw new NotImplementedException("Not implemented");
+    }
+
+    @Override
+    public List<Object> select(final Fields selector) {
+        return null;
+    }
+
+    @Override
+    public Object getValue(final int i) {
+        throw new NotImplementedException("Not implemented");
+    }
+
+    @Override
+    public String getString(final int i) {
+        throw new NotImplementedException("Not implemented");
+    }
+
+    @Override
+    public Integer getInteger(final int i) {
+        throw new NotImplementedException("Not implemented");
+    }
+
+    @Override
+    public Long getLong(final int i) {
+        throw new NotImplementedException("Not implemented");
+    }
+
+    @Override
+    public Boolean getBoolean(final int i) {
+        throw new NotImplementedException("Not implemented");
+    }
+
+    @Override
+    public Short getShort(final int i) {
+        throw new NotImplementedException("Not implemented");
+    }
+
+    @Override
+    public Byte getByte(final int i) {
+        throw new NotImplementedException("Not implemented");
+    }
+
+    @Override
+    public Double getDouble(final int i) {
+        throw new NotImplementedException("Not implemented");
+    }
+
+    @Override
+    public Float getFloat(final int i) {
+        throw new NotImplementedException("Not implemented");
+    }
+
+    @Override
+    public byte[] getBinary(final int i) {
+        throw new NotImplementedException("Not implemented");
+    }
+
+    @Override
+    public Object getValueByField(final String field) {
+        return values.get(field);
+    }
+
+    @Override
+    public String getStringByField(final String field) {
+        return values.get(field).toString();
+    }
+
+    @Override
+    public Integer getIntegerByField(final String field) {
+        return (Integer) values.get(field);
+    }
+
+    @Override
+    public Long getLongByField(final String field) {
+        return (Long) values.get(field);
+    }
+
+    @Override
+    public Boolean getBooleanByField(final String field) {
+        return (Boolean) values.get(field);
+    }
+
+    @Override
+    public Short getShortByField(final String field) {
+        return (Short) values.get(field);
+    }
+
+    @Override
+    public Byte getByteByField(final String field) {
+        return (Byte) values.get(field);
+    }
+
+    @Override
+    public Double getDoubleByField(final String field) {
+        return (Double) values.get(field);
+    }
+
+    @Override
+    public Float getFloatByField(final String field) {
+        return (Float) values.get(field);
+    }
+
+    @Override
+    public byte[] getBinaryByField(final String field) {
+        return (byte[]) values.get(field);
+    }
+
+    @Override
+    public List<Object> getValues() {
+        return new ArrayList<>(values.values());
+    }
+
+    @Override
+    public GlobalStreamId getSourceGlobalStreamId() {
+        throw new NotImplementedException("Not implemented");
+    }
+
+    @Override
+    public String getSourceComponent() {
+        throw new NotImplementedException("Not implemented");
+    }
+
+    @Override
+    public int getSourceTask() {
+        throw new NotImplementedException("Not implemented");
+    }
+
+    @Override
+    public String getSourceStreamId() {
+        throw new NotImplementedException("Not implemented");
+    }
+
+    @Override
+    public MessageId getMessageId() {
+        throw new NotImplementedException("Not implemented");
+    }
+
+    @Override
+    public GeneralTopologyContext getContext() {
+        throw new NotImplementedException("Not implemented");
+    }
+}
diff --git 
a/external/storm-redis/src/test/java/org/apache/storm/redis/util/TupleTestHelper.java
 
b/external/storm-redis/src/test/java/org/apache/storm/redis/util/TupleTestHelper.java
new file mode 100644
index 000000000..0580ec18c
--- /dev/null
+++ 
b/external/storm-redis/src/test/java/org/apache/storm/redis/util/TupleTestHelper.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.redis.util;
+
+import java.util.List;
+import java.util.Objects;
+import org.apache.storm.redis.util.outputcollector.EmittedTuple;
+import org.apache.storm.tuple.Tuple;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * Utility for common test validations.
+ */
+public class TupleTestHelper {
+
+    public static void verifyAnchors(final EmittedTuple emittedTuple, final 
Tuple expectedAnchor) {
+        Objects.requireNonNull(emittedTuple);
+        Objects.requireNonNull(expectedAnchor);
+        assertEquals(1, emittedTuple.getAnchors().size(), "Should have a 
single anchor");
+
+        final Tuple anchor = emittedTuple.getAnchors().get(0);
+        assertNotNull(anchor, "Should be non-null");
+        assertEquals(expectedAnchor, anchor);
+    }
+
+    public static void verifyEmittedTuple(final EmittedTuple emittedTuple, 
final List<Object> expectedValues) {
+        Objects.requireNonNull(emittedTuple);
+        Objects.requireNonNull(expectedValues);
+
+        assertEquals(expectedValues.size(), emittedTuple.getTuple().size());
+        assertEquals(expectedValues, emittedTuple.getTuple());
+    }
+}
diff --git 
a/external/storm-redis/src/test/java/org/apache/storm/redis/util/outputcollector/EmittedTuple.java
 
b/external/storm-redis/src/test/java/org/apache/storm/redis/util/outputcollector/EmittedTuple.java
new file mode 100644
index 000000000..0196db92b
--- /dev/null
+++ 
b/external/storm-redis/src/test/java/org/apache/storm/redis/util/outputcollector/EmittedTuple.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.redis.util.outputcollector;
+
+import org.apache.storm.tuple.Tuple;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Used with StubOutputCollector for testing.
+ */
+public class EmittedTuple {
+    private final String streamId;
+    private final List<Object> tuple;
+    private final List<Tuple> anchors;
+
+    public EmittedTuple(final String streamId, final List<Object> tuple, final 
Collection<Tuple> anchors) {
+        this.streamId = streamId;
+        this.tuple = tuple;
+        this.anchors = new ArrayList<>(anchors);
+    }
+
+    public String getStreamId() {
+        return streamId;
+    }
+
+    public List<Object> getTuple() {
+        return tuple;
+    }
+
+    public List<Tuple> getAnchors() {
+        return Collections.unmodifiableList(anchors);
+    }
+
+    @Override
+    public String toString() {
+        return "EmittedTuple{"
+            + "streamId='" + streamId + '\''
+            + ", tuple=" + tuple
+            + ", anchors=" + anchors
+            + '}';
+    }
+}
diff --git 
a/external/storm-redis/src/test/java/org/apache/storm/redis/util/outputcollector/StubOutputCollector.java
 
b/external/storm-redis/src/test/java/org/apache/storm/redis/util/outputcollector/StubOutputCollector.java
new file mode 100644
index 000000000..3fec79de1
--- /dev/null
+++ 
b/external/storm-redis/src/test/java/org/apache/storm/redis/util/outputcollector/StubOutputCollector.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.redis.util.outputcollector;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.storm.task.IOutputCollector;
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * Stub implementation for testing.
+ */
+public class StubOutputCollector implements IOutputCollector {
+
+    final List<EmittedTuple> emittedTuples = new ArrayList<>();
+    final List<Tuple> ackedTuples = new ArrayList<>();
+    final List<Tuple> failedTuples = new ArrayList<>();
+    final List<Throwable> reportedErrors = new ArrayList<>();
+
+    @Override
+    public List<Integer> emit(final String streamId, final Collection<Tuple> 
anchors, final List<Object> tuple) {
+        emittedTuples.add(
+            new EmittedTuple(streamId, tuple, anchors)
+        );
+
+        // Dummy value.
+        return Collections.singletonList(1);
+    }
+
+    @Override
+    public void emitDirect(final int taskId, final String streamId, final 
Collection<Tuple> anchors, final List<Object> tuple) {
+        throw new RuntimeException("Not implemented yet!");
+    }
+
+    @Override
+    public void ack(final Tuple input) {
+        ackedTuples.add(input);
+    }
+
+    @Override
+    public void fail(final Tuple input) {
+        failedTuples.add(input);
+    }
+
+    @Override
+    public void resetTimeout(final Tuple input) {
+        throw new RuntimeException("Not implemented yet!");
+    }
+
+    @Override
+    public void flush() {
+        throw new RuntimeException("Not implemented yet!");
+    }
+
+    @Override
+    public void reportError(final Throwable error) {
+        reportedErrors.add(error);
+    }
+
+    public List<EmittedTuple> getEmittedTuples() {
+        return Collections.unmodifiableList(emittedTuples);
+    }
+
+    public List<Throwable> getReportedErrors() {
+        return Collections.unmodifiableList(reportedErrors);
+    }
+
+    public List<Tuple> getFailedTuples() {
+        return Collections.unmodifiableList(failedTuples);
+    }
+
+    public List<Tuple> getAckedTuples() {
+        return Collections.unmodifiableList(ackedTuples);
+    }
+
+    public void reset() {
+        emittedTuples.clear();
+        ackedTuples.clear();
+        reportedErrors.clear();
+        failedTuples.clear();
+    }
+}
diff --git a/integration-test/pom.xml b/integration-test/pom.xml
index f5f708b36..7bc9528cc 100644
--- a/integration-test/pom.xml
+++ b/integration-test/pom.xml
@@ -84,7 +84,6 @@
         <dependency>
             <groupId>com.google.code.gson</groupId>
             <artifactId>gson</artifactId>
-            <version>2.8.9</version>
         </dependency>
         <dependency>
             <groupId>com.google.code.findbugs</groupId>
diff --git a/pom.xml b/pom.xml
index c1716ee15..5fba628ff 100644
--- a/pom.xml
+++ b/pom.xml
@@ -128,13 +128,13 @@
         <hamcrest.version>2.2</hamcrest.version>
         <elasticsearch.version>7.17.13</elasticsearch.version>
         <calcite.version>1.16.0</calcite.version>
-        <jedis.version>2.9.0</jedis.version>
+        <jedis.version>5.1.0</jedis.version>
         <activemq.version>5.18.3</activemq.version>
 
         <jackson.version>2.15.2</jackson.version>
         <jackson.databind.version>2.15.2</jackson.databind.version>
-
         <storm.kafka.client.version>0.11.0.3</storm.kafka.client.version>
+        <testcontainers.version>1.19.1</testcontainers.version>
 
         <!-- Java and clojure build lifecycle test properties are defined here 
to avoid having to create a default profile -->
         
<java.unit.test.exclude.groups>PerformanceTest</java.unit.test.exclude.groups>
@@ -157,6 +157,7 @@
         <rocksdb-jni-version>8.5.4</rocksdb-jni-version>
         <json-smart.version>2.5.0</json-smart.version>
         <byte-buddy.version>1.14.9</byte-buddy.version>
+        <gson.version>2.10.1</gson.version>
 
         <!-- see intellij profile below... This fixes an annoyance with 
intellij -->
         <provided.scope>provided</provided.scope>
@@ -1150,6 +1151,12 @@
                 <artifactId>byte-buddy-agent</artifactId>
                 <version>${byte-buddy.version}</version>
             </dependency>
+            <dependency>
+                <groupId>com.google.code.gson</groupId>
+                <artifactId>gson</artifactId>
+                <version>${gson.version}</version>
+            </dependency>
+
         </dependencies>
     </dependencyManagement>
 
diff --git 
a/sql/storm-sql-external/storm-sql-redis/src/jvm/org/apache/storm/sql/redis/RedisDataSourcesProvider.java
 
b/sql/storm-sql-external/storm-sql-redis/src/jvm/org/apache/storm/sql/redis/RedisDataSourcesProvider.java
index 421f84fca..b14b7f31c 100644
--- 
a/sql/storm-sql-external/storm-sql-redis/src/jvm/org/apache/storm/sql/redis/RedisDataSourcesProvider.java
+++ 
b/sql/storm-sql-external/storm-sql-redis/src/jvm/org/apache/storm/sql/redis/RedisDataSourcesProvider.java
@@ -42,8 +42,7 @@ import org.apache.storm.topology.IRichBolt;
 import org.apache.storm.topology.IRichSpout;
 import org.apache.storm.tuple.ITuple;
 import org.apache.storm.tuple.Values;
-
-import redis.clients.util.JedisURIHelper;
+import redis.clients.jedis.util.JedisURIHelper;
 
 /**
  * Create a Redis sink based on the URI and properties. The URI has the format 
of


Reply via email to