This is an automated email from the ASF dual-hosted git repository.

lresende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 03265df  [BAHIR-155] TTL to HSET and SETEX command (#66)
03265df is described below

commit 03265df9b1ad813ba298913497c6001be423087a
Author: Artur Magalhães <[email protected]>
AuthorDate: Fri Oct 11 11:49:32 2019 -0300

    [BAHIR-155] TTL to HSET and SETEX command (#66)
    
    - Adds Possibility to include TTL to a HASH in HSET operation.
    - Adds SETEX command.
---
 .../streaming/connectors/redis/RedisSink.java      |  19 ++-
 .../common/container/RedisClusterContainer.java    |  18 ++-
 .../common/container/RedisCommandsContainer.java   |  17 ++-
 .../redis/common/container/RedisContainer.java     |  22 ++-
 .../redis/common/mapper/RedisCommand.java          |   6 +
 .../common/mapper/RedisCommandDescription.java     |  66 +++++++--
 .../redis/common/mapper/RedisMapper.java           |  11 ++
 .../connectors/redis/RedisSinkITCase.java          | 148 +++++++++++++++++++++
 .../mapper/RedisDataTypeDescriptionTest.java       |   6 +
 9 files changed, 297 insertions(+), 16 deletions(-)

diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
index e468772..e6fb355 100644
--- 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
@@ -92,6 +92,13 @@ public class RedisSink<IN> extends RichSinkFunction<IN> {
      * {@code additionalKey} used as set name for {@link 
RedisDataType#SORTED_SET}
      */
     private String additionalKey;
+
+    /**
+     * This additional time to live is optional for {@link RedisDataType#HASH} 
and required for {@link RedisCommand#SETEX}.
+     * It sets the TTL for a specific key.
+     */
+    private Integer additionalTTL;
+
     private RedisMapper<IN> redisSinkMapper;
     private RedisCommand redisCommand;
 
@@ -113,7 +120,9 @@ public class RedisSink<IN> extends RichSinkFunction<IN> {
 
         this.redisSinkMapper = redisSinkMapper;
         RedisCommandDescription redisCommandDescription = 
redisSinkMapper.getCommandDescription();
+
         this.redisCommand = redisCommandDescription.getCommand();
+        this.additionalTTL = redisCommandDescription.getAdditionalTTL();
         this.additionalKey = redisCommandDescription.getAdditionalKey();
     }
 
@@ -121,7 +130,7 @@ public class RedisSink<IN> extends RichSinkFunction<IN> {
      * Called when new data arrives to the sink, and forwards it to Redis 
channel.
      * Depending on the specified Redis data type (see {@link RedisDataType}),
      * a different Redis command will be applied.
-     * Available commands are RPUSH, LPUSH, SADD, PUBLISH, SET, PFADD, HSET, 
ZADD.
+     * Available commands are RPUSH, LPUSH, SADD, PUBLISH, SET, SETEX, PFADD, 
HSET, ZADD.
      *
      * @param input The incoming data
      */
@@ -129,7 +138,9 @@ public class RedisSink<IN> extends RichSinkFunction<IN> {
     public void invoke(IN input, Context context) throws Exception {
         String key = redisSinkMapper.getKeyFromData(input);
         String value = redisSinkMapper.getValueFromData(input);
+
         Optional<String> optAdditionalKey = 
redisSinkMapper.getAdditionalKey(input);
+        Optional<Integer> optAdditionalTTL = 
redisSinkMapper.getAdditionalTTL(input);
 
         switch (redisCommand) {
             case RPUSH:
@@ -144,6 +155,9 @@ public class RedisSink<IN> extends RichSinkFunction<IN> {
             case SET:
                 this.redisCommandsContainer.set(key, value);
                 break;
+            case SETEX:
+                this.redisCommandsContainer.setex(key, value, 
optAdditionalTTL.orElse(this.additionalTTL));
+                break;
             case PFADD:
                 this.redisCommandsContainer.pfadd(key, value);
                 break;
@@ -157,7 +171,8 @@ public class RedisSink<IN> extends RichSinkFunction<IN> {
                 
this.redisCommandsContainer.zrem(optAdditionalKey.orElse(this.additionalKey), 
key);
                 break;
             case HSET:
-                
this.redisCommandsContainer.hset(optAdditionalKey.orElse(this.additionalKey), 
key, value);
+                
this.redisCommandsContainer.hset(optAdditionalKey.orElse(this.additionalKey), 
key, value,
+                        optAdditionalTTL.orElse(this.additionalTTL));
                 break;
             default:
                 throw new IllegalArgumentException("Cannot process such data 
type: " + redisCommand);
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
index 8b18578..886b94f 100644
--- 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
@@ -57,9 +57,12 @@ public class RedisClusterContainer implements 
RedisCommandsContainer, Closeable
     }
 
     @Override
-    public void hset(final String key, final String hashField, final String 
value) {
+    public void hset(final String key, final String hashField, final String 
value, final Integer ttl) {
         try {
             jedisCluster.hset(key, hashField, value);
+            if (ttl != null) {
+                jedisCluster.expire(key, ttl);
+            }
         } catch (Exception e) {
             if (LOG.isErrorEnabled()) {
                 LOG.error("Cannot send Redis message with command HSET to hash 
{} of key {} error message {}",
@@ -135,6 +138,19 @@ public class RedisClusterContainer implements 
RedisCommandsContainer, Closeable
     }
 
     @Override
+    public void setex(final String key, final String value, final Integer ttl) 
{
+        try {
+            jedisCluster.setex(key, ttl, value);
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                LOG.error("Cannot send Redis message with command SETEX to key 
{} error message {}",
+                        key, e.getMessage());
+            }
+            throw e;
+        }
+    }
+
+    @Override
     public void pfadd(final String key, final String element) {
         try {
             jedisCluster.pfadd(key, element);
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
index 5d7993c..486784b 100644
--- 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
@@ -32,15 +32,17 @@ public interface RedisCommandsContainer extends 
Serializable {
     void open() throws Exception;
 
     /**
-     * Sets field in the hash stored at key to value.
+     * Sets field in the hash stored at key to value, with TTL, if needed.
+     * Setting expire time to key is optional.
      * If key does not exist, a new key holding a hash is created.
      * If field already exists in the hash, it is overwritten.
      *
      * @param key Hash name
      * @param hashField Hash field
      * @param value Hash value
+     * @param ttl Hash expire time
      */
-    void hset(String key, String hashField, String value);
+    void hset(String key, String hashField, String value, Integer ttl);
 
     /**
      * Insert the specified value at the tail of the list stored at key.
@@ -89,6 +91,17 @@ public interface RedisCommandsContainer extends Serializable 
{
     void set(String key, String value);
 
     /**
+     * Set key to hold the string value, with a time to live (TTL). If key 
already holds a value,
+     * it is overwritten, regardless of its type. Any previous time to live 
associated with the key is
+     * reset on successful SETEX operation.
+     *
+     * @param key the key name in which value to be set
+     * @param value the value
+     * @param ttl time to live (TTL)
+     */
+    void setex(String key, String value, Integer ttl);
+
+    /**
      * Adds all the element arguments to the HyperLogLog data structure
      * stored at the variable name specified as first argument.
      *
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
index b862ea4..4af84d1 100644
--- 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
@@ -86,11 +86,14 @@ public class RedisContainer implements 
RedisCommandsContainer, Closeable {
     }
 
     @Override
-    public void hset(final String key, final String hashField, final String 
value) {
+    public void hset(final String key, final String hashField, final String 
value, final Integer ttl) {
         Jedis jedis = null;
         try {
             jedis = getInstance();
             jedis.hset(key, hashField, value);
+            if (ttl != null) {
+                jedis.expire(key, ttl);
+            }
         } catch (Exception e) {
             if (LOG.isErrorEnabled()) {
                 LOG.error("Cannot send Redis message with command HSET to key 
{} and hashField {} error message {}",
@@ -188,6 +191,23 @@ public class RedisContainer implements 
RedisCommandsContainer, Closeable {
     }
 
     @Override
+    public void setex(final String key, final String value, final Integer ttl) 
{
+        Jedis jedis = null;
+        try {
+            jedis = getInstance();
+            jedis.setex(key, ttl, value);
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                LOG.error("Cannot send Redis message with command SETEX to key 
{} error message {}",
+                        key, e.getMessage());
+            }
+            throw e;
+        } finally {
+            releaseInstance(jedis);
+        }
+    }
+
+    @Override
     public void pfadd(final String key, final String element) {
         Jedis jedis = null;
         try {
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
index 019ad46..d465e83 100644
--- 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
@@ -46,6 +46,12 @@ public enum RedisCommand {
     SET(RedisDataType.STRING),
 
     /**
+     * Set key to hold the string value, with a time to live (TTL). If key 
already holds a value,
+     * it is overwritten, regardless of its type.
+     */
+    SETEX(RedisDataType.STRING),
+
+    /**
      * Adds the element to the HyperLogLog data structure stored at the 
variable name specified as first argument.
      */
     PFADD(RedisDataType.HYPER_LOG_LOG),
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
index 6ab329f..3284361 100644
--- 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
@@ -25,7 +25,9 @@ import java.util.Objects;
  * you need to use first constructor {@link 
#RedisCommandDescription(RedisCommand, String)}.
  * If the {@code additionalKey} is {@code null} it will throw {@code 
IllegalArgumentException}
  *
- * <p>When {@link RedisCommand} is not in group of {@link RedisDataType#HASH} 
and {@link RedisDataType#SORTED_SET}
+ * If command is {@link RedisCommand#SETEX}, its required to use TTL. The 
proper constructor is {@link #RedisCommandDescription(RedisCommand, Integer)}.
+ *
+ * <p>When {@link RedisCommand} is not in group of {@link RedisDataType#HASH} 
and {@link RedisDataType#SORTED_SET}, also not {@link RedisCommand#SETEX},
  * you can use second constructor {@link 
#RedisCommandDescription(RedisCommand)}
  */
 public class RedisCommandDescription implements Serializable {
@@ -38,7 +40,7 @@ public class RedisCommandDescription implements Serializable {
      * This additional key is needed for the group {@link RedisDataType#HASH} 
and {@link RedisDataType#SORTED_SET}.
      * Other {@link RedisDataType} works only with two variable i.e. name of 
the list and value to be added.
      * But for {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET} 
we need three variables.
-     * <p>For {@link RedisDataType#HASH} we need hash name, hash key and 
element.
+     * <p>For {@link RedisDataType#HASH} we need hash name, hash key and 
element. Its possible to use TTL.
      * {@link #getAdditionalKey()} used as hash name for {@link 
RedisDataType#HASH}
      * <p>For {@link RedisDataType#SORTED_SET} we need set name, the element 
and it's score.
      * {@link #getAdditionalKey()} used as set name for {@link 
RedisDataType#SORTED_SET}
@@ -46,15 +48,28 @@ public class RedisCommandDescription implements 
Serializable {
     private String additionalKey;
 
     /**
-     * Use this constructor when data type is {@link RedisDataType#HASH} or 
{@link RedisDataType#SORTED_SET}.
-     * If different data type is specified, {@code additionalKey} is ignored.
+     * This additional key is optional for the group {@link 
RedisDataType#HASH}, required for {@link RedisCommand#SETEX}.
+     * For the other types and commands, its not used.
+     * <p>For {@link RedisDataType#HASH} we need hash name, hash key and 
element. Its possible to use TTL.
+     * {@link #getAdditionalTTL()} used as time to live (TTL) for {@link 
RedisDataType#HASH}
+     * <p>For {@link RedisCommand#SETEX}, we need key, value and time to live 
(TTL).
+     */
+    private Integer additionalTTL;
+
+    /**
+     * Default constructor for {@link RedisCommandDescription}.
+     * For {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET} 
data types, {@code additionalKey} is required.
+     * For {@link RedisCommand#SETEX} command, {@code additionalTTL} is 
required.
+     * In both cases, if the respective variables are not provided, it throws 
an {@link IllegalArgumentException}
      * @param redisCommand the redis command type {@link RedisCommand}
-     * @param additionalKey additional key for Hash and Sorted set data type
+     * @param additionalKey additional key for Hash data type
+     * @param additionalTTL additional TTL optional for Hash data type
      */
-    public RedisCommandDescription(RedisCommand redisCommand, String 
additionalKey) {
+    public RedisCommandDescription(RedisCommand redisCommand, String 
additionalKey, Integer additionalTTL) {
         Objects.requireNonNull(redisCommand, "Redis command type can not be 
null");
         this.redisCommand = redisCommand;
         this.additionalKey = additionalKey;
+        this.additionalTTL = additionalTTL;
 
         if (redisCommand.getRedisDataType() == RedisDataType.HASH ||
             redisCommand.getRedisDataType() == RedisDataType.SORTED_SET) {
@@ -62,6 +77,32 @@ public class RedisCommandDescription implements Serializable 
{
                 throw new IllegalArgumentException("Hash and Sorted Set should 
have additional key");
             }
         }
+
+        if (redisCommand.equals(RedisCommand.SETEX)) {
+            if (additionalTTL == null) {
+                throw new IllegalArgumentException("SETEX command should have 
time to live (TTL)");
+            }
+        }
+    }
+
+    /**
+     * Use this constructor when data type is {@link RedisDataType#HASH} 
(without TTL) or {@link RedisDataType#SORTED_SET}.
+     * If different data type is specified, {@code additionalKey} is ignored.
+     * @param redisCommand the redis command type {@link RedisCommand}
+     * @param additionalKey additional key for Hash and Sorted set data type
+     */
+    public RedisCommandDescription(RedisCommand redisCommand, String 
additionalKey) {
+        this(redisCommand, additionalKey, null);
+    }
+
+    /**
+     * Use this constructor when using SETEX command {@link 
RedisDataType#STRING}.
+     * This command requires a TTL. Throws {@link IllegalArgumentException} if 
it is null.
+     * @param redisCommand the redis command type {@link RedisCommand}
+     * @param additionalTTL additional TTL required for SETEX command
+     */
+    public RedisCommandDescription(RedisCommand redisCommand, Integer 
additionalTTL) {
+        this(redisCommand, null, additionalTTL);
     }
 
     /**
@@ -70,7 +111,7 @@ public class RedisCommandDescription implements Serializable 
{
      * @param redisCommand the redis data type {@link RedisCommand}
      */
     public RedisCommandDescription(RedisCommand redisCommand) {
-        this(redisCommand, null);
+        this(redisCommand, null, null);
     }
 
     /**
@@ -87,7 +128,12 @@ public class RedisCommandDescription implements 
Serializable {
      *
      * @return the additional key
      */
-    public String getAdditionalKey() {
-        return additionalKey;
-    }
+    public String getAdditionalKey() { return additionalKey; }
+
+    /**
+     * Returns the additional time to live (TTL) if data type is {@link 
RedisDataType#HASH}.
+     *
+     * @return the additional TTL
+     */
+    public Integer getAdditionalTTL() { return additionalTTL; }
 }
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
index 96df75e..1481d7d 100644
--- 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
@@ -75,4 +75,15 @@ public interface RedisMapper<T> extends Function, 
Serializable {
     default Optional<String> getAdditionalKey(T data) {
         return Optional.empty();
     }
+
+    /**
+     * Extracts the additional time to live (TTL) for data as an {@link 
Optional<Integer>}.
+     * The default implementation returns an empty Optional.
+     *
+     * @param data
+     * @return Optional
+     */
+    default Optional<Integer> getAdditionalTTL(T data) {
+        return Optional.empty();
+    }
 }
diff --git 
a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
 
b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
index 47544f7..ee1cc7f 100644
--- 
a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
+++ 
b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
@@ -29,6 +29,8 @@ import org.junit.Before;
 import org.junit.Test;
 import redis.clients.jedis.Jedis;
 
+import java.util.Optional;
+
 import static org.junit.Assert.assertEquals;
 
 public class RedisSinkITCase extends RedisITCaseBase {
@@ -36,8 +38,13 @@ public class RedisSinkITCase extends RedisITCaseBase {
     private FlinkJedisPoolConfig jedisPoolConfig;
     private static final Long NUM_ELEMENTS = 20L;
     private static final Long ZERO = 0L;
+    private static final Long REDIS_NOT_ASSOCIATED_EXPIRE_FLAG = -1L;
+    private static final Long REDIS_KEY_NOT_EXISTS_FLAG = -2L;
+    private static final Long REDIS_TTL_IN_SECS = 1L;
     private static final String REDIS_KEY = "TEST_KEY";
     private static final String REDIS_ADDITIONAL_KEY = "TEST_ADDITIONAL_KEY";
+    private static final String TEST_MESSAGE = "TEST_MESSAGE";
+    private static final Long TEST_MESSAGE_LENGTH = (long) 
TEST_MESSAGE.length();
 
     StreamExecutionEnvironment env;
 
@@ -82,6 +89,21 @@ public class RedisSinkITCase extends RedisITCaseBase {
     }
 
     @Test
+    public void testRedisStringDataTypeWithTTL() throws Exception {
+        DataStreamSource<Tuple2<String, String>> source = env.addSource(new 
TestSourceFunctionString());
+        RedisSink<Tuple2<String, String>> redisSink = new 
RedisSink<>(jedisPoolConfig,
+                new RedisCommandMapperWithTTL(RedisCommand.SETEX));
+
+        source.addSink(redisSink);
+        env.execute("Test Redis Set Data Type With TTL");
+
+        assertEquals(TEST_MESSAGE_LENGTH, jedis.strlen(REDIS_KEY));
+        assertEquals(REDIS_TTL_IN_SECS, jedis.ttl(REDIS_KEY));
+
+        jedis.del(REDIS_KEY);
+    }
+
+    @Test
     public void testRedisHyperLogLogDataType() throws Exception {
         DataStreamSource<Tuple2<String, String>> source = env.addSource(new 
TestSourceFunction());
         RedisSink<Tuple2<String, String>> redisSink = new 
RedisSink<>(jedisPoolConfig,
@@ -127,6 +149,37 @@ public class RedisSinkITCase extends RedisITCaseBase {
         env.execute("Test Redis Hash Data Type");
 
         assertEquals(NUM_ELEMENTS, jedis.hlen(REDIS_ADDITIONAL_KEY));
+        assertEquals(REDIS_NOT_ASSOCIATED_EXPIRE_FLAG, 
jedis.ttl(REDIS_ADDITIONAL_KEY));
+
+        jedis.del(REDIS_ADDITIONAL_KEY);
+    }
+
+    @Test
+    public void testRedisHashDataTypeWithTTL() throws Exception {
+        DataStreamSource<Tuple2<String, String>> source = env.addSource(new 
TestSourceFunctionHash());
+        RedisSink<Tuple2<String, String>> redisSink = new 
RedisSink<>(jedisPoolConfig,
+                new RedisAdditionalTTLMapper(RedisCommand.HSET));
+
+        source.addSink(redisSink);
+        env.execute("Test Redis Hash Data Type");
+
+        assertEquals(NUM_ELEMENTS, jedis.hlen(REDIS_ADDITIONAL_KEY));
+        assertEquals(REDIS_TTL_IN_SECS, jedis.ttl(REDIS_ADDITIONAL_KEY));
+
+        jedis.del(REDIS_ADDITIONAL_KEY);
+    }
+
+    @Test
+    public void testRedisHashDataTypeWithTTLFromOpt() throws Exception {
+        DataStreamSource<Tuple2<String, String>> source = env.addSource(new 
TestSourceFunctionHash());
+        RedisSink<Tuple2<String, String>> redisSink = new 
RedisSink<>(jedisPoolConfig,
+                new RedisAdditionalTTLMapperFromOpt(RedisCommand.HSET));
+
+        source.addSink(redisSink);
+        env.execute("Test Redis Hash Data Type 2");
+
+        assertEquals(NUM_ELEMENTS, jedis.hlen(REDIS_ADDITIONAL_KEY));
+        assertEquals(REDIS_TTL_IN_SECS, jedis.ttl(REDIS_ADDITIONAL_KEY));
 
         jedis.del(REDIS_ADDITIONAL_KEY);
     }
@@ -156,6 +209,24 @@ public class RedisSinkITCase extends RedisITCaseBase {
         }
     }
 
+    private static class TestSourceFunctionString implements 
SourceFunction<Tuple2<String, String>> {
+        private static final long serialVersionUID = 1L;
+
+        private volatile boolean running = true;
+
+        @Override
+        public void run(SourceContext<Tuple2<String, String>> ctx) throws 
Exception {
+            if (running) {
+                ctx.collect(new Tuple2<>(REDIS_KEY, TEST_MESSAGE));
+            }
+        }
+
+        @Override
+        public void cancel() {
+            running = false;
+        }
+    }
+
     private static class TestSourceFunctionHash implements 
SourceFunction<Tuple2<String, String>> {
         private static final long serialVersionUID = 1L;
 
@@ -216,6 +287,30 @@ public class RedisSinkITCase extends RedisITCaseBase {
         }
     }
 
+    public static class RedisCommandMapperWithTTL implements 
RedisMapper<Tuple2<String, String>> {
+
+        private RedisCommand redisCommand;
+
+        RedisCommandMapperWithTTL(RedisCommand redisCommand){
+            this.redisCommand = redisCommand;
+        }
+
+        @Override
+        public RedisCommandDescription getCommandDescription() {
+            return new RedisCommandDescription(redisCommand, 
REDIS_TTL_IN_SECS.intValue());
+        }
+
+        @Override
+        public String getKeyFromData(Tuple2<String, String> data) {
+            return data.f0;
+        }
+
+        @Override
+        public String getValueFromData(Tuple2<String, String> data) {
+            return data.f1;
+        }
+    }
+
     public static class RedisAdditionalDataMapper implements 
RedisMapper<Tuple2<String, String>> {
 
         private RedisCommand redisCommand;
@@ -239,4 +334,57 @@ public class RedisSinkITCase extends RedisITCaseBase {
             return data.f1;
         }
     }
+
+    public static class RedisAdditionalTTLMapper implements 
RedisMapper<Tuple2<String, String>> {
+
+        private RedisCommand redisCommand;
+
+        RedisAdditionalTTLMapper(RedisCommand redisCommand){
+            this.redisCommand = redisCommand;
+        }
+
+        @Override
+        public RedisCommandDescription getCommandDescription() {
+            return new RedisCommandDescription(redisCommand, 
REDIS_ADDITIONAL_KEY, REDIS_TTL_IN_SECS.intValue());
+        }
+
+        @Override
+        public String getKeyFromData(Tuple2<String, String> data) {
+            return data.f0;
+        }
+
+        @Override
+        public String getValueFromData(Tuple2<String, String> data) {
+            return data.f1;
+        }
+    }
+
+    public static class RedisAdditionalTTLMapperFromOpt implements 
RedisMapper<Tuple2<String, String>> {
+
+        private RedisCommand redisCommand;
+
+        RedisAdditionalTTLMapperFromOpt(RedisCommand redisCommand){
+            this.redisCommand = redisCommand;
+        }
+
+        @Override
+        public RedisCommandDescription getCommandDescription() {
+            return new RedisCommandDescription(redisCommand, 
REDIS_ADDITIONAL_KEY, null);
+        }
+
+        @Override
+        public String getKeyFromData(Tuple2<String, String> data) {
+            return data.f0;
+        }
+
+        @Override
+        public String getValueFromData(Tuple2<String, String> data) {
+            return data.f1;
+        }
+
+        @Override
+        public Optional<Integer> getAdditionalTTL(Tuple2<String, String> data) 
{
+            return Optional.of(REDIS_TTL_IN_SECS.intValue());
+        }
+    }
 }
diff --git 
a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java
 
b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java
index 4af0c14..d268c95 100644
--- 
a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java
+++ 
b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java
@@ -31,6 +31,12 @@ public class RedisDataTypeDescriptionTest extends TestLogger 
{
         redisCommandMapper.getCommandDescription();
     }
 
+    @Test(expected=IllegalArgumentException.class)
+    public void 
shouldThrowExceptionIfAdditionalTTLIsNotGivenForStringDataTypeWithTTL(){
+        RedisSinkITCase.RedisCommandMapper redisCommandMapper = new 
RedisSinkITCase.RedisCommandMapper(RedisCommand.SETEX);
+        redisCommandMapper.getCommandDescription();
+    }
+
     @Test
     public void shouldReturnNullForAdditionalDataType(){
         RedisSinkITCase.RedisCommandMapper redisCommandMapper = new 
RedisSinkITCase.RedisCommandMapper(RedisCommand.LPUSH);

Reply via email to