This is an automated email from the ASF dual-hosted git repository.
lresende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git
The following commit(s) were added to refs/heads/master by this push:
new 03265df [BAHIR-155] TTL to HSET and SETEX command (#66)
03265df is described below
commit 03265df9b1ad813ba298913497c6001be423087a
Author: Artur Magalhães <[email protected]>
AuthorDate: Fri Oct 11 11:49:32 2019 -0300
[BAHIR-155] TTL to HSET and SETEX command (#66)
- Adds Possibility to include TTL to a HASH in HSET operation.
- Adds SETEX command.
---
.../streaming/connectors/redis/RedisSink.java | 19 ++-
.../common/container/RedisClusterContainer.java | 18 ++-
.../common/container/RedisCommandsContainer.java | 17 ++-
.../redis/common/container/RedisContainer.java | 22 ++-
.../redis/common/mapper/RedisCommand.java | 6 +
.../common/mapper/RedisCommandDescription.java | 66 +++++++--
.../redis/common/mapper/RedisMapper.java | 11 ++
.../connectors/redis/RedisSinkITCase.java | 148 +++++++++++++++++++++
.../mapper/RedisDataTypeDescriptionTest.java | 6 +
9 files changed, 297 insertions(+), 16 deletions(-)
diff --git
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
index e468772..e6fb355 100644
---
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
+++
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
@@ -92,6 +92,13 @@ public class RedisSink<IN> extends RichSinkFunction<IN> {
* {@code additionalKey} used as set name for {@link
RedisDataType#SORTED_SET}
*/
private String additionalKey;
+
+ /**
+ * This additional time to live is optional for {@link RedisDataType#HASH}
and required for {@link RedisCommand#SETEX}.
+ * It sets the TTL for a specific key.
+ */
+ private Integer additionalTTL;
+
private RedisMapper<IN> redisSinkMapper;
private RedisCommand redisCommand;
@@ -113,7 +120,9 @@ public class RedisSink<IN> extends RichSinkFunction<IN> {
this.redisSinkMapper = redisSinkMapper;
RedisCommandDescription redisCommandDescription =
redisSinkMapper.getCommandDescription();
+
this.redisCommand = redisCommandDescription.getCommand();
+ this.additionalTTL = redisCommandDescription.getAdditionalTTL();
this.additionalKey = redisCommandDescription.getAdditionalKey();
}
@@ -121,7 +130,7 @@ public class RedisSink<IN> extends RichSinkFunction<IN> {
* Called when new data arrives to the sink, and forwards it to Redis
channel.
* Depending on the specified Redis data type (see {@link RedisDataType}),
* a different Redis command will be applied.
- * Available commands are RPUSH, LPUSH, SADD, PUBLISH, SET, PFADD, HSET,
ZADD.
+ * Available commands are RPUSH, LPUSH, SADD, PUBLISH, SET, SETEX, PFADD,
HSET, ZADD.
*
* @param input The incoming data
*/
@@ -129,7 +138,9 @@ public class RedisSink<IN> extends RichSinkFunction<IN> {
public void invoke(IN input, Context context) throws Exception {
String key = redisSinkMapper.getKeyFromData(input);
String value = redisSinkMapper.getValueFromData(input);
+
Optional<String> optAdditionalKey =
redisSinkMapper.getAdditionalKey(input);
+ Optional<Integer> optAdditionalTTL =
redisSinkMapper.getAdditionalTTL(input);
switch (redisCommand) {
case RPUSH:
@@ -144,6 +155,9 @@ public class RedisSink<IN> extends RichSinkFunction<IN> {
case SET:
this.redisCommandsContainer.set(key, value);
break;
+ case SETEX:
+ this.redisCommandsContainer.setex(key, value,
optAdditionalTTL.orElse(this.additionalTTL));
+ break;
case PFADD:
this.redisCommandsContainer.pfadd(key, value);
break;
@@ -157,7 +171,8 @@ public class RedisSink<IN> extends RichSinkFunction<IN> {
this.redisCommandsContainer.zrem(optAdditionalKey.orElse(this.additionalKey),
key);
break;
case HSET:
-
this.redisCommandsContainer.hset(optAdditionalKey.orElse(this.additionalKey),
key, value);
+
this.redisCommandsContainer.hset(optAdditionalKey.orElse(this.additionalKey),
key, value,
+ optAdditionalTTL.orElse(this.additionalTTL));
break;
default:
throw new IllegalArgumentException("Cannot process such data
type: " + redisCommand);
diff --git
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
index 8b18578..886b94f 100644
---
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
+++
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
@@ -57,9 +57,12 @@ public class RedisClusterContainer implements
RedisCommandsContainer, Closeable
}
@Override
- public void hset(final String key, final String hashField, final String
value) {
+ public void hset(final String key, final String hashField, final String
value, final Integer ttl) {
try {
jedisCluster.hset(key, hashField, value);
+ if (ttl != null) {
+ jedisCluster.expire(key, ttl);
+ }
} catch (Exception e) {
if (LOG.isErrorEnabled()) {
LOG.error("Cannot send Redis message with command HSET to hash
{} of key {} error message {}",
@@ -135,6 +138,19 @@ public class RedisClusterContainer implements
RedisCommandsContainer, Closeable
}
@Override
+ public void setex(final String key, final String value, final Integer ttl)
{
+ try {
+ jedisCluster.setex(key, ttl, value);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send Redis message with command SETEX to key
{} error message {}",
+ key, e.getMessage());
+ }
+ throw e;
+ }
+ }
+
+ @Override
public void pfadd(final String key, final String element) {
try {
jedisCluster.pfadd(key, element);
diff --git
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
index 5d7993c..486784b 100644
---
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
+++
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
@@ -32,15 +32,17 @@ public interface RedisCommandsContainer extends
Serializable {
void open() throws Exception;
/**
- * Sets field in the hash stored at key to value.
+ * Sets field in the hash stored at key to value, with TTL, if needed.
+ * Setting expire time to key is optional.
* If key does not exist, a new key holding a hash is created.
* If field already exists in the hash, it is overwritten.
*
* @param key Hash name
* @param hashField Hash field
* @param value Hash value
+ * @param ttl Hash expire time
*/
- void hset(String key, String hashField, String value);
+ void hset(String key, String hashField, String value, Integer ttl);
/**
* Insert the specified value at the tail of the list stored at key.
@@ -89,6 +91,17 @@ public interface RedisCommandsContainer extends Serializable
{
void set(String key, String value);
/**
+ * Set key to hold the string value, with a time to live (TTL). If key
already holds a value,
+ * it is overwritten, regardless of its type. Any previous time to live
associated with the key is
+ * reset on successful SETEX operation.
+ *
+ * @param key the key name in which value to be set
+ * @param value the value
+ * @param ttl time to live (TTL)
+ */
+ void setex(String key, String value, Integer ttl);
+
+ /**
* Adds all the element arguments to the HyperLogLog data structure
* stored at the variable name specified as first argument.
*
diff --git
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
index b862ea4..4af84d1 100644
---
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
+++
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
@@ -86,11 +86,14 @@ public class RedisContainer implements
RedisCommandsContainer, Closeable {
}
@Override
- public void hset(final String key, final String hashField, final String
value) {
+ public void hset(final String key, final String hashField, final String
value, final Integer ttl) {
Jedis jedis = null;
try {
jedis = getInstance();
jedis.hset(key, hashField, value);
+ if (ttl != null) {
+ jedis.expire(key, ttl);
+ }
} catch (Exception e) {
if (LOG.isErrorEnabled()) {
LOG.error("Cannot send Redis message with command HSET to key
{} and hashField {} error message {}",
@@ -188,6 +191,23 @@ public class RedisContainer implements
RedisCommandsContainer, Closeable {
}
@Override
+ public void setex(final String key, final String value, final Integer ttl)
{
+ Jedis jedis = null;
+ try {
+ jedis = getInstance();
+ jedis.setex(key, ttl, value);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send Redis message with command SETEX to key
{} error message {}",
+ key, e.getMessage());
+ }
+ throw e;
+ } finally {
+ releaseInstance(jedis);
+ }
+ }
+
+ @Override
public void pfadd(final String key, final String element) {
Jedis jedis = null;
try {
diff --git
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
index 019ad46..d465e83 100644
---
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
+++
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
@@ -46,6 +46,12 @@ public enum RedisCommand {
SET(RedisDataType.STRING),
/**
+ * Set key to hold the string value, with a time to live (TTL). If key
already holds a value,
+ * it is overwritten, regardless of its type.
+ */
+ SETEX(RedisDataType.STRING),
+
+ /**
* Adds the element to the HyperLogLog data structure stored at the
variable name specified as first argument.
*/
PFADD(RedisDataType.HYPER_LOG_LOG),
diff --git
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
index 6ab329f..3284361 100644
---
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
+++
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
@@ -25,7 +25,9 @@ import java.util.Objects;
* you need to use first constructor {@link
#RedisCommandDescription(RedisCommand, String)}.
* If the {@code additionalKey} is {@code null} it will throw {@code
IllegalArgumentException}
*
- * <p>When {@link RedisCommand} is not in group of {@link RedisDataType#HASH}
and {@link RedisDataType#SORTED_SET}
+ * If command is {@link RedisCommand#SETEX}, it is required to use TTL. The
proper constructor is {@link #RedisCommandDescription(RedisCommand, Integer)}.
+ *
+ * <p>When {@link RedisCommand} is not in group of {@link RedisDataType#HASH}
and {@link RedisDataType#SORTED_SET}, also not {@link RedisCommand#SETEX},
* you can use second constructor {@link
#RedisCommandDescription(RedisCommand)}
*/
public class RedisCommandDescription implements Serializable {
@@ -38,7 +40,7 @@ public class RedisCommandDescription implements Serializable {
* This additional key is needed for the group {@link RedisDataType#HASH}
and {@link RedisDataType#SORTED_SET}.
* Other {@link RedisDataType} works only with two variable i.e. name of
the list and value to be added.
* But for {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}
we need three variables.
- * <p>For {@link RedisDataType#HASH} we need hash name, hash key and
element.
+ * <p>For {@link RedisDataType#HASH} we need hash name, hash key and
element. It is possible to use TTL.
* {@link #getAdditionalKey()} used as hash name for {@link
RedisDataType#HASH}
* <p>For {@link RedisDataType#SORTED_SET} we need set name, the element
and it's score.
* {@link #getAdditionalKey()} used as set name for {@link
RedisDataType#SORTED_SET}
@@ -46,15 +48,28 @@ public class RedisCommandDescription implements
Serializable {
private String additionalKey;
/**
- * Use this constructor when data type is {@link RedisDataType#HASH} or
{@link RedisDataType#SORTED_SET}.
- * If different data type is specified, {@code additionalKey} is ignored.
+ * This additional TTL is optional for the group {@link
RedisDataType#HASH} and required for {@link RedisCommand#SETEX}.
+ * For the other types and commands, it is not used.
+ * <p>For {@link RedisDataType#HASH} we need hash name, hash key and
element. It is possible to use TTL.
+ * {@link #getAdditionalTTL()} used as time to live (TTL) for {@link
RedisDataType#HASH}
+ * <p>For {@link RedisCommand#SETEX}, we need key, value and time to live
(TTL).
+ */
+ private Integer additionalTTL;
+
+ /**
+ * Default constructor for {@link RedisCommandDescription}.
+ * For {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}
data types, {@code additionalKey} is required.
+ * For {@link RedisCommand#SETEX} command, {@code additionalTTL} is
required.
+ * In both cases, if the respective variables are not provided, it throws
an {@link IllegalArgumentException}
* @param redisCommand the redis command type {@link RedisCommand}
- * @param additionalKey additional key for Hash and Sorted set data type
+ * @param additionalKey additional key for Hash and Sorted set data type
+ * @param additionalTTL additional TTL, optional for Hash data type and required for SETEX command
*/
- public RedisCommandDescription(RedisCommand redisCommand, String
additionalKey) {
+ public RedisCommandDescription(RedisCommand redisCommand, String
additionalKey, Integer additionalTTL) {
Objects.requireNonNull(redisCommand, "Redis command type can not be
null");
this.redisCommand = redisCommand;
this.additionalKey = additionalKey;
+ this.additionalTTL = additionalTTL;
if (redisCommand.getRedisDataType() == RedisDataType.HASH ||
redisCommand.getRedisDataType() == RedisDataType.SORTED_SET) {
@@ -62,6 +77,32 @@ public class RedisCommandDescription implements Serializable
{
throw new IllegalArgumentException("Hash and Sorted Set should
have additional key");
}
}
+
+ if (redisCommand.equals(RedisCommand.SETEX)) {
+ if (additionalTTL == null) {
+ throw new IllegalArgumentException("SETEX command should have
time to live (TTL)");
+ }
+ }
+ }
+
+ /**
+ * Use this constructor when data type is {@link RedisDataType#HASH}
(without TTL) or {@link RedisDataType#SORTED_SET}.
+ * If different data type is specified, {@code additionalKey} is ignored.
+ * @param redisCommand the redis command type {@link RedisCommand}
+ * @param additionalKey additional key for Hash and Sorted set data type
+ */
+ public RedisCommandDescription(RedisCommand redisCommand, String
additionalKey) {
+ this(redisCommand, additionalKey, null);
+ }
+
+ /**
+ * Use this constructor when using SETEX command {@link
RedisDataType#STRING}.
+ * This command requires a TTL. Throws {@link IllegalArgumentException} if
it is null.
+ * @param redisCommand the redis command type {@link RedisCommand}
+ * @param additionalTTL additional TTL required for SETEX command
+ */
+ public RedisCommandDescription(RedisCommand redisCommand, Integer
additionalTTL) {
+ this(redisCommand, null, additionalTTL);
}
/**
@@ -70,7 +111,7 @@ public class RedisCommandDescription implements Serializable
{
* @param redisCommand the redis data type {@link RedisCommand}
*/
public RedisCommandDescription(RedisCommand redisCommand) {
- this(redisCommand, null);
+ this(redisCommand, null, null);
}
/**
@@ -87,7 +128,12 @@ public class RedisCommandDescription implements
Serializable {
*
* @return the additional key
*/
- public String getAdditionalKey() {
- return additionalKey;
- }
+ public String getAdditionalKey() { return additionalKey; }
+
+ /**
+ * Returns the additional time to live (TTL), used for {@link
RedisDataType#HASH} (optional) and {@link RedisCommand#SETEX} (required).
+ *
+ * @return the additional TTL
+ */
+ public Integer getAdditionalTTL() { return additionalTTL; }
}
diff --git
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
index 96df75e..1481d7d 100644
---
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
+++
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
@@ -75,4 +75,15 @@ public interface RedisMapper<T> extends Function,
Serializable {
default Optional<String> getAdditionalKey(T data) {
return Optional.empty();
}
+
+ /**
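+ * Extracts the additional time to live (TTL) for data as an {@code
Optional<Integer>}.
+ * The default implementation returns an empty Optional.
+ *
+ * @param data the input element for which the TTL is extracted
+ * @return an Optional holding the TTL for this element; when empty, the sink falls back to the TTL of the command description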
+ */
+ default Optional<Integer> getAdditionalTTL(T data) {
+ return Optional.empty();
+ }
}
diff --git
a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
index 47544f7..ee1cc7f 100644
---
a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
+++
b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
@@ -29,6 +29,8 @@ import org.junit.Before;
import org.junit.Test;
import redis.clients.jedis.Jedis;
+import java.util.Optional;
+
import static org.junit.Assert.assertEquals;
public class RedisSinkITCase extends RedisITCaseBase {
@@ -36,8 +38,13 @@ public class RedisSinkITCase extends RedisITCaseBase {
private FlinkJedisPoolConfig jedisPoolConfig;
private static final Long NUM_ELEMENTS = 20L;
private static final Long ZERO = 0L;
+ private static final Long REDIS_NOT_ASSOCIATED_EXPIRE_FLAG = -1L;
+ private static final Long REDIS_KEY_NOT_EXISTS_FLAG = -2L;
+ private static final Long REDIS_TTL_IN_SECS = 1L;
private static final String REDIS_KEY = "TEST_KEY";
private static final String REDIS_ADDITIONAL_KEY = "TEST_ADDITIONAL_KEY";
+ private static final String TEST_MESSAGE = "TEST_MESSAGE";
+ private static final Long TEST_MESSAGE_LENGTH = (long)
TEST_MESSAGE.length();
StreamExecutionEnvironment env;
@@ -82,6 +89,21 @@ public class RedisSinkITCase extends RedisITCaseBase {
}
@Test
+ public void testRedisStringDataTypeWithTTL() throws Exception {
+ DataStreamSource<Tuple2<String, String>> source = env.addSource(new
TestSourceFunctionString());
+ RedisSink<Tuple2<String, String>> redisSink = new
RedisSink<>(jedisPoolConfig,
+ new RedisCommandMapperWithTTL(RedisCommand.SETEX));
+
+ source.addSink(redisSink);
+ env.execute("Test Redis Set Data Type With TTL");
+
+ assertEquals(TEST_MESSAGE_LENGTH, jedis.strlen(REDIS_KEY));
+ assertEquals(REDIS_TTL_IN_SECS, jedis.ttl(REDIS_KEY));
+
+ jedis.del(REDIS_KEY);
+ }
+
+ @Test
public void testRedisHyperLogLogDataType() throws Exception {
DataStreamSource<Tuple2<String, String>> source = env.addSource(new
TestSourceFunction());
RedisSink<Tuple2<String, String>> redisSink = new
RedisSink<>(jedisPoolConfig,
@@ -127,6 +149,37 @@ public class RedisSinkITCase extends RedisITCaseBase {
env.execute("Test Redis Hash Data Type");
assertEquals(NUM_ELEMENTS, jedis.hlen(REDIS_ADDITIONAL_KEY));
+ assertEquals(REDIS_NOT_ASSOCIATED_EXPIRE_FLAG,
jedis.ttl(REDIS_ADDITIONAL_KEY));
+
+ jedis.del(REDIS_ADDITIONAL_KEY);
+ }
+
+ @Test
+ public void testRedisHashDataTypeWithTTL() throws Exception {
+ DataStreamSource<Tuple2<String, String>> source = env.addSource(new
TestSourceFunctionHash());
+ RedisSink<Tuple2<String, String>> redisSink = new
RedisSink<>(jedisPoolConfig,
+ new RedisAdditionalTTLMapper(RedisCommand.HSET));
+
+ source.addSink(redisSink);
+ env.execute("Test Redis Hash Data Type");
+
+ assertEquals(NUM_ELEMENTS, jedis.hlen(REDIS_ADDITIONAL_KEY));
+ assertEquals(REDIS_TTL_IN_SECS, jedis.ttl(REDIS_ADDITIONAL_KEY));
+
+ jedis.del(REDIS_ADDITIONAL_KEY);
+ }
+
+ @Test
+ public void testRedisHashDataTypeWithTTLFromOpt() throws Exception {
+ DataStreamSource<Tuple2<String, String>> source = env.addSource(new
TestSourceFunctionHash());
+ RedisSink<Tuple2<String, String>> redisSink = new
RedisSink<>(jedisPoolConfig,
+ new RedisAdditionalTTLMapperFromOpt(RedisCommand.HSET));
+
+ source.addSink(redisSink);
+ env.execute("Test Redis Hash Data Type 2");
+
+ assertEquals(NUM_ELEMENTS, jedis.hlen(REDIS_ADDITIONAL_KEY));
+ assertEquals(REDIS_TTL_IN_SECS, jedis.ttl(REDIS_ADDITIONAL_KEY));
jedis.del(REDIS_ADDITIONAL_KEY);
}
@@ -156,6 +209,24 @@ public class RedisSinkITCase extends RedisITCaseBase {
}
}
+ private static class TestSourceFunctionString implements
SourceFunction<Tuple2<String, String>> {
+ private static final long serialVersionUID = 1L;
+
+ private volatile boolean running = true;
+
+ @Override
+ public void run(SourceContext<Tuple2<String, String>> ctx) throws
Exception {
+ if (running) {
+ ctx.collect(new Tuple2<>(REDIS_KEY, TEST_MESSAGE));
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+ }
+
private static class TestSourceFunctionHash implements
SourceFunction<Tuple2<String, String>> {
private static final long serialVersionUID = 1L;
@@ -216,6 +287,30 @@ public class RedisSinkITCase extends RedisITCaseBase {
}
}
+ public static class RedisCommandMapperWithTTL implements
RedisMapper<Tuple2<String, String>> {
+
+ private RedisCommand redisCommand;
+
+ RedisCommandMapperWithTTL(RedisCommand redisCommand){
+ this.redisCommand = redisCommand;
+ }
+
+ @Override
+ public RedisCommandDescription getCommandDescription() {
+ return new RedisCommandDescription(redisCommand,
REDIS_TTL_IN_SECS.intValue());
+ }
+
+ @Override
+ public String getKeyFromData(Tuple2<String, String> data) {
+ return data.f0;
+ }
+
+ @Override
+ public String getValueFromData(Tuple2<String, String> data) {
+ return data.f1;
+ }
+ }
+
public static class RedisAdditionalDataMapper implements
RedisMapper<Tuple2<String, String>> {
private RedisCommand redisCommand;
@@ -239,4 +334,57 @@ public class RedisSinkITCase extends RedisITCaseBase {
return data.f1;
}
}
+
+ public static class RedisAdditionalTTLMapper implements
RedisMapper<Tuple2<String, String>> {
+
+ private RedisCommand redisCommand;
+
+ RedisAdditionalTTLMapper(RedisCommand redisCommand){
+ this.redisCommand = redisCommand;
+ }
+
+ @Override
+ public RedisCommandDescription getCommandDescription() {
+ return new RedisCommandDescription(redisCommand,
REDIS_ADDITIONAL_KEY, REDIS_TTL_IN_SECS.intValue());
+ }
+
+ @Override
+ public String getKeyFromData(Tuple2<String, String> data) {
+ return data.f0;
+ }
+
+ @Override
+ public String getValueFromData(Tuple2<String, String> data) {
+ return data.f1;
+ }
+ }
+
+ public static class RedisAdditionalTTLMapperFromOpt implements
RedisMapper<Tuple2<String, String>> {
+
+ private RedisCommand redisCommand;
+
+ RedisAdditionalTTLMapperFromOpt(RedisCommand redisCommand){
+ this.redisCommand = redisCommand;
+ }
+
+ @Override
+ public RedisCommandDescription getCommandDescription() {
+ return new RedisCommandDescription(redisCommand,
REDIS_ADDITIONAL_KEY, null);
+ }
+
+ @Override
+ public String getKeyFromData(Tuple2<String, String> data) {
+ return data.f0;
+ }
+
+ @Override
+ public String getValueFromData(Tuple2<String, String> data) {
+ return data.f1;
+ }
+
+ @Override
+ public Optional<Integer> getAdditionalTTL(Tuple2<String, String> data)
{
+ return Optional.of(REDIS_TTL_IN_SECS.intValue());
+ }
+ }
}
diff --git
a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java
b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java
index 4af0c14..d268c95 100644
---
a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java
+++
b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java
@@ -31,6 +31,12 @@ public class RedisDataTypeDescriptionTest extends TestLogger
{
redisCommandMapper.getCommandDescription();
}
+ @Test(expected=IllegalArgumentException.class)
+ public void
shouldThrowExceptionIfAdditionalTTLIsNotGivenForStringDataTypeWithTTL(){
+ RedisSinkITCase.RedisCommandMapper redisCommandMapper = new
RedisSinkITCase.RedisCommandMapper(RedisCommand.SETEX);
+ redisCommandMapper.getCommandDescription();
+ }
+
@Test
public void shouldReturnNullForAdditionalDataType(){
RedisSinkITCase.RedisCommandMapper redisCommandMapper = new
RedisSinkITCase.RedisCommandMapper(RedisCommand.LPUSH);