This is an automated email from the ASF dual-hosted git repository.

eskabetxe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git


The following commit(s) were added to refs/heads/master by this push:
     new cd8f61e  [BAHIR-285] Redis: add srem command (#164)
cd8f61e is described below

commit cd8f61e2808d9e62650632e5e1ce5a38843ef346
Author: Hyeonho Kim <[email protected]>
AuthorDate: Wed May 31 01:09:45 2023 +0900

    [BAHIR-285] Redis: add srem command (#164)
---
 .../flink/streaming/connectors/redis/RedisSink.java     |  3 +++
 .../redis/common/container/RedisClusterContainer.java   | 13 +++++++++++++
 .../redis/common/container/RedisCommandsContainer.java  | 10 ++++++++++
 .../redis/common/container/RedisContainer.java          | 17 +++++++++++++++++
 .../connectors/redis/common/mapper/RedisCommand.java    |  5 +++++
 .../streaming/connectors/redis/RedisSinkITCase.java     | 13 ++++++++++---
 6 files changed, 58 insertions(+), 3 deletions(-)

diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
index 0101339..a6a8d9b 100644
--- 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
@@ -152,6 +152,9 @@ public class RedisSink<IN> extends RichSinkFunction<IN> {
             case SADD:
                 this.redisCommandsContainer.sadd(key, value);
                 break;
+            case SREM:
+                this.redisCommandsContainer.srem(key, value);
+                break;
             case SET:
                 this.redisCommandsContainer.set(key, value);
                 break;
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
index 5c67aae..eb79738 100644
--- 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
@@ -133,6 +133,19 @@ public class RedisClusterContainer implements 
RedisCommandsContainer, Closeable
         }
     }
 
+    @Override
+    public void srem(final String setName, final String value) {
+        try {
+            jedisCluster.srem(setName, value);
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                LOG.error("Cannot send Redis message with command SREM to set 
{} error message {}",
+                    setName, e.getMessage());
+            }
+            throw e;
+        }
+    }
+
     @Override
     public void publish(final String channelName, final String message) {
         try {
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
index 9fbad93..26007f4 100644
--- 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
@@ -82,6 +82,16 @@ public interface RedisCommandsContainer extends Serializable 
{
      */
     void sadd(String setName, String value);
 
+
+    /**
+     * Remove the specified member from the set stored at key.
+     * Specified members that are not a member of this set are ignored.
+     * If key does not exist, an exception will be raised.
+     * @param setName
+     * @param value
+     */
+    void srem(String setName, String value);
+
     /**
      * Posts a message to the given channel.
      *
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
index 955fa31..5857d25 100644
--- 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
@@ -193,6 +193,23 @@ public class RedisContainer implements 
RedisCommandsContainer, Closeable {
         }
     }
 
+    @Override
+    public void srem(final String setName, final String value) {
+        Jedis jedis = null;
+        try {
+            jedis = getInstance();
+            jedis.srem(setName, value);
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                LOG.error("Cannot send Redis message with command SREM to set 
{} error message {}",
+                    setName, e.getMessage());
+            }
+            throw e;
+        } finally {
+            releaseInstance(jedis);
+        }
+    }
+
     @Override
     public void publish(final String channelName, final String message) {
         Jedis jedis = null;
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
index 1e48e7f..3bf4991 100644
--- 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
@@ -39,6 +39,11 @@ public enum RedisCommand {
      */
     SADD(RedisDataType.SET),
 
+    /**
+     * Remove the specified members from the set stored at key.
+     */
+    SREM(RedisDataType.SET),
+
     /**
      * Set key to hold the string value. If key already holds a value,
      * it is overwritten, regardless of its type.
diff --git 
a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
 
b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
index 72fdf09..70dfe03 100644
--- 
a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
+++ 
b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
@@ -77,14 +77,21 @@ public class RedisSinkITCase extends RedisITCaseBase {
     @Test
     public void testRedisSetDataType() throws Exception {
         DataStreamSource<Tuple2<String, String>> source = env.addSource(new 
TestSourceFunction());
-        RedisSink<Tuple2<String, String>> redisSink = new 
RedisSink<>(jedisPoolConfig,
+        RedisSink<Tuple2<String, String>> redisSaddSink = new 
RedisSink<>(jedisPoolConfig,
             new RedisCommandMapper(RedisCommand.SADD));
 
-        source.addSink(redisSink);
-        env.execute("Test Redis Set Data Type");
+        source.addSink(redisSaddSink);
+        env.execute("Test SADD");
 
         assertEquals(NUM_ELEMENTS.longValue(), jedis.scard(REDIS_KEY));
 
+        RedisSink<Tuple2<String, String>> redisSremSink = new 
RedisSink<>(jedisPoolConfig,
+                new RedisCommandMapper(RedisCommand.SREM));
+        source.addSink(redisSremSink);
+        env.execute("Test SREM");
+
+        assertEquals(ZERO.longValue(), jedis.scard(REDIS_KEY));
+
         jedis.del(REDIS_KEY);
     }
 

Reply via email to