This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 405f7d6f9 [Feature][Connector-V2] Add redis source connector (#2569)
405f7d6f9 is described below

commit 405f7d6f998e488b2c26fbbf357769f5d579e069
Author: TyrantLucifer <[email protected]>
AuthorDate: Wed Aug 31 23:10:19 2022 +0800

    [Feature][Connector-V2] Add redis source connector (#2569)
---
 docs/en/connector-v2/source/Redis.md               | 149 +++++++++++++++++++++
 plugin-mapping.properties                          |   1 +
 pom.xml                                            |   7 +
 seatunnel-connectors-v2-dist/pom.xml               |   5 +
 seatunnel-connectors-v2/connector-redis/pom.xml    |  53 ++++++++
 .../seatunnel/redis/config/RedisConfig.java        |  27 ++++
 .../seatunnel/redis/config/RedisDataType.java      |  67 +++++++++
 .../seatunnel/redis/config/RedisParameters.java    |  53 ++++++++
 .../seatunnel/redis/source/RedisSource.java        |  99 ++++++++++++++
 .../seatunnel/redis/source/RedisSourceReader.java  |  79 +++++++++++
 seatunnel-connectors-v2/pom.xml                    |   1 +
 .../seatunnel/e2e/flink/v2/redis/RedisIT.java      |  89 ++++++++++++
 .../src/test/resources/redis/redis_source.conf     |  65 +++++++++
 .../seatunnel/e2e/spark/v2/redis/RedisIT.java      |  89 ++++++++++++
 .../src/test/resources/redis/redis_source.conf     |  68 ++++++++++
 tools/dependencies/known-dependencies.txt          |   4 +-
 16 files changed, 855 insertions(+), 1 deletion(-)

diff --git a/docs/en/connector-v2/source/Redis.md 
b/docs/en/connector-v2/source/Redis.md
new file mode 100644
index 000000000..62f4abd93
--- /dev/null
+++ b/docs/en/connector-v2/source/Redis.md
@@ -0,0 +1,149 @@
+# Redis
+
+> Redis source connector
+
+## Description
+
+Used to read data from Redis. Only batch mode is supported.
+
+##  Options
+
+| name      | type   | required | default value |
+|-----------|--------|----------|---------------|
+| host      | string | yes      | -             |
+| port      | int    | yes      | -             |
+| keys      | string | yes      | -             |
+| data_type | string | yes      | -             |
+| auth      | string | no       | -             |
+| schema    | config | no       | -             |
+| format    | string | no       | json          |
+
+### host [string]
+
+redis host
+
+### port [int]
+
+redis port
+
+### keys [string]
+
+keys pattern
+
+**Tips: The Redis source connector supports fuzzy key matching; the user needs to ensure that all matched keys are of the same type.**
+
+### data_type [string]
+
+Redis data type; supported values are `key`, `hash`, `list`, `set` and `zset`.
+
+- key
+> The value of each key will be sent downstream as a single row of data.
+> For example, the value of key is `SeaTunnel test message`, the data received 
downstream is `SeaTunnel test message` and only one message will be received.
+
+
+- hash
+> The hash key-value pairs will be formatted as json to be sent downstream as 
a single row of data.
+> For example, the value of hash is `name:tyrantlucifer age:26`, the data 
received downstream is `{"name":"tyrantlucifer", "age":"26"}` and only one 
message will be received.
+
+- list
+> Each element in the list will be sent downstream as a single row of data.
+> For example, if the value of the list is `[tyrantlucifer, CalvinKirs]`, the data received downstream are `tyrantlucifer` and `CalvinKirs`, and only two messages will be received.
+
+- set
+> Each element in the set will be sent downstream as a single row of data.
+> For example, if the value of the set is `[tyrantlucifer, CalvinKirs]`, the data received downstream are `tyrantlucifer` and `CalvinKirs`, and only two messages will be received.
+
+- zset
+> Each element in the sorted set will be sent downstream as a single row of data.
+> For example, if the value of the sorted set is `[tyrantlucifer, CalvinKirs]`, the data received downstream are `tyrantlucifer` and `CalvinKirs`, and only two messages will be received.
+
+### auth [String]
+
+Redis authentication password; you need it when you connect to a Redis instance or cluster that requires authentication.
+
+### format [String]
+
+The format of the upstream data. Only `json` and `text` are supported; the default is `json`.
+
+When the format is `json`, you should also set the `schema` option, for example:
+
+upstream data is the following:
+
+```json
+
+{"code":  200, "data":  "get success", "success":  true}
+
+```
+
+you should assign schema as the following:
+
+```hocon
+
+schema {
+    fields {
+        code = int
+        data = string
+        success = boolean
+    }
+}
+
+```
+
+connector will generate data as the following:
+
+| code | data        | success |
+|------|-------------|---------|
+| 200  | get success | true    |
+
+When the format is `text`, the connector passes the upstream data through unchanged, for example:
+
+upstream data is the following:
+
+```json
+
+{"code":  200, "data":  "get success", "success":  true}
+
+```
+
+connector will generate data as the following:
+
+| content |
+|---------|
+| {"code":  200, "data":  "get success", "success":  true}        |
+
+### schema [Config]
+
+#### fields [Config]
+
+the schema fields of upstream data
+
+## Example
+
+simple:
+
+```hocon
+  Redis {
+    host = localhost
+    port = 6379
+    keys = "key_test*"
+    data_type = key
+    format = text
+  }
+```
+
+```hocon
+  Redis {
+    host = localhost
+    port = 6379
+    keys = "key_test*"
+    data_type = key
+    format = json
+    schema {
+      fields {
+        name = string
+        age = int
+      }
+    }
+  }
+```
+
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 7f06da42d..7eecae293 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -121,5 +121,6 @@ seatunnel.sink.IoTDB = connector-iotdb
 seatunnel.sink.Neo4j = connector-neo4j
 seatunnel.sink.FtpFile = connector-file-ftp
 seatunnel.sink.Socket = connector-socket
+seatunnel.source.Redis = connector-redis
 seatunnel.sink.DataHub = connector-datahub
 
diff --git a/pom.xml b/pom.xml
index 6982068de..5c37c7b84 100644
--- a/pom.xml
+++ b/pom.xml
@@ -231,6 +231,7 @@
         <phoenix.version>5.2.5-HBase-2.x</phoenix.version>
         <awaitility.version>4.2.0</awaitility.version>
         <neo4j-java-driver.version>4.4.9</neo4j-java-driver.version>
+        <jedis.version>4.2.2</jedis.version>
         <datahub.version>2.19.0-public</datahub.version>
     </properties>
 
@@ -592,6 +593,12 @@
                 <version>${sshd.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>redis.clients</groupId>
+                <artifactId>jedis</artifactId>
+                <version>${jedis.version}</version>
+            </dependency>
+
             <dependency>
                 <groupId>org.junit.jupiter</groupId>
                 <artifactId>junit-jupiter-engine</artifactId>
diff --git a/seatunnel-connectors-v2-dist/pom.xml 
b/seatunnel-connectors-v2-dist/pom.xml
index ad63551c2..ce13e8adb 100644
--- a/seatunnel-connectors-v2-dist/pom.xml
+++ b/seatunnel-connectors-v2-dist/pom.xml
@@ -146,6 +146,11 @@
             <artifactId>connector-neo4j</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-redis</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>connector-datahub</artifactId>
diff --git a/seatunnel-connectors-v2/connector-redis/pom.xml 
b/seatunnel-connectors-v2/connector-redis/pom.xml
new file mode 100644
index 000000000..ffabf615f
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-redis/pom.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>seatunnel-connectors-v2</artifactId>
+        <groupId>org.apache.seatunnel</groupId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>connector-redis</artifactId>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-json</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>redis.clients</groupId>
+            <artifactId>jedis</artifactId>
+        </dependency>
+
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
new file mode 100644
index 000000000..5c3dca239
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.redis.config;
+
+public class RedisConfig { // Option keys looked up in the Redis connector's plugin config.
+    public static final String HOST = "host"; // required; read with Config#getString
+    public static final String PORT = "port"; // required; read with Config#getInt
+    public static final String AUTH = "auth"; // optional; only read when the path is present
+    public static final String KEY_PATTERN = "keys"; // required; note the option itself is spelled "keys"
+    public static final String DATA_TYPE = "data_type"; // required; upper-cased and resolved to a RedisDataType constant
+    public static final String FORMAT = "format"; // optional; inspected by RedisSource#prepare
+}
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java
new file mode 100644
index 000000000..b48540a35
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.redis.config;
+
+import org.apache.seatunnel.common.utils.JsonUtils;
+
+import redis.clients.jedis.Jedis;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Redis value types the source can read. Each constant knows how to fetch the value stored
+ * under one key and flatten it into the strings that are handed downstream.
+ */
+public enum RedisDataType {
+    /** Plain string value: at most one element per key. */
+    KEY {
+        @Override
+        public List<String> get(Jedis jedis, String key) {
+            String value = jedis.get(key);
+            if (value == null) {
+                // GET answers null when the key no longer exists (for instance it expired after
+                // the key scan). Report "nothing to emit" instead of a list holding null, which
+                // callers would otherwise dereference.
+                return Collections.emptyList();
+            }
+            return Collections.singletonList(value);
+        }
+    },
+    /** Hash value: all field/value pairs serialized into one JSON string. */
+    HASH {
+        @Override
+        public List<String> get(Jedis jedis, String key) {
+            Map<String, String> kvMap = jedis.hgetAll(key);
+            return Collections.singletonList(JsonUtils.toJsonString(kvMap));
+        }
+    },
+    /** List value: one element per list entry (range 0..-1). */
+    LIST {
+        @Override
+        public List<String> get(Jedis jedis, String key) {
+            return jedis.lrange(key, 0, -1);
+        }
+    },
+    /** Set value: one element per member, copied into a list. */
+    SET {
+        @Override
+        public List<String> get(Jedis jedis, String key) {
+            Set<String> members = jedis.smembers(key);
+            return new ArrayList<>(members);
+        }
+    },
+    /** Sorted-set value: one element per member (range 0..-1). */
+    ZSET {
+        @Override
+        public List<String> get(Jedis jedis, String key) {
+            return jedis.zrange(key, 0, -1);
+        }
+    };
+
+    /**
+     * Reads the value stored under {@code key}.
+     *
+     * @param jedis client used to issue the read command
+     * @param key   the key to read
+     * @return the strings to emit for this key; this base implementation returns an empty list
+     *         and is overridden by every constant
+     */
+    public List<String> get(Jedis jedis, String key) {
+        return Collections.emptyList();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
new file mode 100644
index 000000000..d8276b8c2
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.redis.config;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.Data;
+
+import java.io.Serializable;
+
+/**
+ * Serializable holder for the settings of the Redis source: where to connect, which keys to
+ * read and how the stored values are typed.
+ */
+@Data
+public class RedisParameters implements Serializable {
+    private String host;
+    private int port;
+    // Stays empty when the "auth" option is not configured.
+    private String auth = "";
+    private String keysPattern;
+    private RedisDataType redisDataType;
+
+    /**
+     * Populates this object from the plugin configuration.
+     *
+     * @param config plugin config; {@code host}, {@code port}, {@code keys} and
+     *               {@code data_type} are read unconditionally, {@code auth} only if present
+     * @throws RuntimeException if {@code data_type} does not name a {@link RedisDataType}
+     */
+    public void buildWithConfig(Config config) {
+        // set host
+        this.host = config.getString(RedisConfig.HOST);
+        // set port
+        this.port = config.getInt(RedisConfig.PORT);
+        // set auth
+        if (config.hasPath(RedisConfig.AUTH)) {
+            this.auth = config.getString(RedisConfig.AUTH);
+        }
+        // set keysPattern
+        this.keysPattern = config.getString(RedisConfig.KEY_PATTERN);
+        // set redis data type
+        try {
+            String dataType = config.getString(RedisConfig.DATA_TYPE);
+            // Upper-case with Locale.ROOT: with the default locale the result depends on the
+            // JVM's settings (under a Turkish locale "list" becomes "LİST"), which would make
+            // valueOf reject a perfectly valid option value.
+            this.redisDataType = RedisDataType.valueOf(dataType.toUpperCase(java.util.Locale.ROOT));
+        } catch (IllegalArgumentException e) {
+            throw new RuntimeException("Redis source connector only support these data types [key, hash, list, set, zset]", e);
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
new file mode 100644
index 000000000..d860e6af5
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.redis.source;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisConfig;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+/**
+ * Bounded, single-split SeaTunnel source that reads values from Redis.
+ */
+@AutoService(SeaTunnelSource.class)
+public class RedisSource extends AbstractSingleSplitSource<SeaTunnelRow> {
+    /** Format assumed when the {@code format} option is not configured. */
+    private static final String DEFAULT_FORMAT = "json";
+
+    private final RedisParameters redisParameters = new RedisParameters();
+    private SeaTunnelContext seaTunnelContext;
+    private SeaTunnelRowType seaTunnelRowType;
+    // null means "no deserialization": the reader then emits each raw value as a one-field row.
+    private DeserializationSchema<SeaTunnelRow> deserializationSchema;
+
+    @Override
+    public String getPluginName() {
+        return "Redis";
+    }
+
+    /**
+     * Validates the plugin config and derives the connection parameters, the produced row type
+     * and the deserializer from it.
+     *
+     * @param pluginConfig the {@code Redis} block of the job config
+     * @throws PrepareFailException if {@code host}, {@code port}, {@code keys} or
+     *                              {@code data_type} is missing
+     */
+    @Override
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, RedisConfig.HOST, RedisConfig.PORT, RedisConfig.KEY_PATTERN, RedisConfig.DATA_TYPE);
+        if (!result.isSuccess()) {
+            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
+        }
+        this.redisParameters.buildWithConfig(pluginConfig);
+        if (pluginConfig.hasPath(SeaTunnelSchema.SCHEMA)) {
+            Config schema = pluginConfig.getConfig(SeaTunnelSchema.SCHEMA);
+            this.seaTunnelRowType = SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
+        } else {
+            this.seaTunnelRowType = SeaTunnelSchema.buildSimpleTextSchema();
+        }
+        // TODO: use format SPI
+        // default use json format
+        String format = pluginConfig.hasPath(RedisConfig.FORMAT)
+                ? pluginConfig.getString(RedisConfig.FORMAT)
+                : DEFAULT_FORMAT;
+        // Decide on the VALUE of the option, not on its mere presence: previously an explicit
+        // "format = json" disabled JSON deserialization just like "format = text" did.
+        if (DEFAULT_FORMAT.equalsIgnoreCase(format)) {
+            this.deserializationSchema = new JsonDeserializationSchema(false, false, seaTunnelRowType);
+        } else {
+            // Any other value keeps the previous behavior for an explicit format: raw text rows.
+            this.deserializationSchema = null;
+        }
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+        return seaTunnelRowType;
+    }
+
+    @Override
+    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
+        this.seaTunnelContext = seaTunnelContext;
+    }
+
+    @Override
+    public AbstractSingleSplitReader<SeaTunnelRow> createReader(SingleSplitReaderContext readerContext) throws Exception {
+        return new RedisSourceReader(redisParameters, readerContext, deserializationSchema);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java
new file mode 100644
index 000000000..5a8657097
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.redis.source;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
+
+import org.apache.commons.lang3.StringUtils;
+import redis.clients.jedis.Jedis;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Reader that fetches every key matching the configured pattern in one pass, emits the values
+ * stored under those keys and then signals that no more elements will follow.
+ */
+public class RedisSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
+    private final RedisParameters redisParameters;
+    private final SingleSplitReaderContext context;
+    // May be null, in which case raw values are emitted as one-field rows.
+    private final DeserializationSchema<SeaTunnelRow> deserializationSchema;
+    private Jedis jedis;
+
+    /**
+     * @param redisParameters       connection and read settings
+     * @param context               reader context used to signal the end of input
+     * @param deserializationSchema schema used to turn each value into rows, or {@code null} to
+     *                              emit each value unchanged as a single-field row
+     */
+    public RedisSourceReader(RedisParameters redisParameters, SingleSplitReaderContext context, DeserializationSchema<SeaTunnelRow> deserializationSchema) {
+        this.redisParameters = redisParameters;
+        this.context = context;
+        this.deserializationSchema = deserializationSchema;
+    }
+
+    /** Creates the Redis client and authenticates when a non-blank password is configured. */
+    @Override
+    public void open() throws Exception {
+        this.jedis = new Jedis(redisParameters.getHost(), redisParameters.getPort());
+        if (StringUtils.isNotBlank(redisParameters.getAuth())) {
+            this.jedis.auth(redisParameters.getAuth());
+        }
+    }
+
+    /** Closes the Redis client if one was created. */
+    @Override
+    public void close() throws IOException {
+        if (Objects.nonNull(jedis)) {
+            jedis.close();
+        }
+    }
+
+    /**
+     * Reads all matching keys, emits their values and signals the end of input.
+     *
+     * @param output collector receiving the produced rows
+     */
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+        Set<String> keys = jedis.keys(redisParameters.getKeysPattern());
+        RedisDataType redisDataType = redisParameters.getRedisDataType();
+        for (String key : keys) {
+            List<String> values = redisDataType.get(jedis, key);
+            for (String value : values) {
+                if (value == null) {
+                    // The key disappeared between the key listing and the read; there is
+                    // nothing to emit, and dereferencing the value below would throw.
+                    continue;
+                }
+                if (deserializationSchema == null) {
+                    output.collect(new SeaTunnelRow(new Object[]{value}));
+                } else {
+                    // Encode explicitly as UTF-8 instead of relying on the platform charset,
+                    // so non-ASCII payloads are handled the same on every host.
+                    deserializationSchema.deserialize(value.getBytes(java.nio.charset.StandardCharsets.UTF_8), output);
+                }
+            }
+        }
+        context.signalNoMoreElement();
+    }
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index 3d2f3ea7a..a179c7ab6 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -50,6 +50,7 @@
         <module>connector-elasticsearch</module>
         <module>connector-iotdb</module>
         <module>connector-neo4j</module>
+        <module>connector-redis</module>
         <module>connector-datahub</module>
     </modules>
 
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/redis/RedisIT.java
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/redis/RedisIT.java
new file mode 100644
index 000000000..554cf0752
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/redis/RedisIT.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.redis;
+
+import static org.testcontainers.shaded.org.awaitility.Awaitility.given;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import redis.clients.jedis.Jedis;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+/**
+ * End-to-end test that starts a Redis container, seeds it with string keys and runs the
+ * Redis source job on Flink.
+ */
+@Slf4j
+public class RedisIT extends FlinkContainer {
+    private static final String REDIS_IMAGE = "redis:latest";
+    private static final String REDIS_CONTAINER_HOST = "flink_e2e_redis";
+    private static final String REDIS_HOST = "localhost";
+    private static final int REDIS_PORT = 6379;
+    private GenericContainer<?> redisContainer;
+    private Jedis jedis;
+
+    @BeforeEach
+    public void startRedisContainer() {
+        redisContainer = new GenericContainer<>(REDIS_IMAGE)
+                .withNetwork(NETWORK)
+                .withNetworkAliases(REDIS_CONTAINER_HOST)
+                .withLogConsumer(new Slf4jLogConsumer(log));
+        redisContainer.setPortBindings(Lists.newArrayList(String.format("%s:%s", REDIS_PORT, REDIS_PORT)));
+        Startables.deepStart(Stream.of(redisContainer)).join();
+        log.info("Redis container started");
+        // Retry until initJedis() succeeds, i.e. until the server actually answers a PING.
+        given().ignoreExceptions()
+                .await()
+                .atMost(180, TimeUnit.SECONDS)
+                .untilAsserted(this::initJedis);
+        this.generateTestData();
+    }
+
+    /**
+     * Connects to Redis and verifies the connection with a PING. Constructing the client alone
+     * does not contact the server, so without the PING the surrounding wait would succeed
+     * immediately even while Redis is still starting.
+     */
+    private void initJedis() {
+        Jedis candidate = new Jedis(REDIS_HOST, REDIS_PORT);
+        try {
+            candidate.ping();
+        } catch (RuntimeException e) {
+            // Do not leak one client per failed attempt.
+            candidate.close();
+            throw e;
+        }
+        jedis = candidate;
+    }
+
+    /** Seeds the string keys matched by the "key_test*" pattern of the job config. */
+    private void generateTestData() {
+        jedis.set("key_test", "test");
+        jedis.set("key_test1", "test1");
+        jedis.set("key_test2", "test2");
+        jedis.set("key_test3", "test3");
+        jedis.set("key_test4", "test4");
+    }
+
+    @Test
+    public void testRedisSource() throws IOException, InterruptedException {
+        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/redis/redis_source.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+
+    @AfterEach
+    public void close() {
+        try {
+            super.close();
+        } finally {
+            // Setup may have failed before these were created; guard so teardown neither hides
+            // the original failure behind an NPE nor leaves the container running.
+            if (jedis != null) {
+                jedis.close();
+            }
+            if (redisContainer != null) {
+                redisContainer.close();
+            }
+        }
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/redis/redis_source.conf
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/redis/redis_source.conf
new file mode 100644
index 000000000..9805b2a1f
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/redis/redis_source.conf
@@ -0,0 +1,65 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+  Redis {
+      host = flink_e2e_redis
+      port = 6379
+      keys = "key_test*"
+      data_type = key
+      format = text
+  }
+  # If you would like to get more information about how to configure seatunnel 
and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/source/Redis
+}
+
+transform {
+
+}
+
+sink {
+
+  Console {
+
+  }
+
+  Assert {
+    rules = [
+      {
+        field_name = content
+        field_type = string
+        field_value = [
+          {
+            rule_type = NOT_NULL
+          }
+        ]
+      }
+    ]
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/redis/RedisIT.java
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/redis/RedisIT.java
new file mode 100644
index 000000000..5fcf05f58
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/redis/RedisIT.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.redis;
+
+import static org.testcontainers.shaded.org.awaitility.Awaitility.given;
+
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import redis.clients.jedis.Jedis;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class RedisIT extends SparkContainer { // e2e test: runs the Redis source job on Spark against a Redis test container
+    private static final String REDIS_IMAGE = "redis:latest";
+    private static final String REDIS_CONTAINER_HOST = "spark_e2e_redis"; // network alias given to the Redis container; same value as `host` in redis_source.conf
+    private static final String REDIS_HOST = "localhost"; // address used by this test's own Jedis client
+    private static final int REDIS_PORT = 6379;
+    private GenericContainer<?> redisContainer;
+    private Jedis jedis;
+
+    @BeforeEach
+    public void startRedisContainer() { // starts the Redis container, creates the Jedis client, then seeds test data
+        redisContainer = new GenericContainer<>(REDIS_IMAGE)
+                .withNetwork(NETWORK)
+                .withNetworkAliases(REDIS_CONTAINER_HOST)
+                .withLogConsumer(new Slf4jLogConsumer(log));
+        
redisContainer.setPortBindings(Lists.newArrayList(String.format("%s:%s", 
REDIS_PORT, REDIS_PORT)));
+        Startables.deepStart(Stream.of(redisContainer)).join();
+        log.info("Redis container started");
+        given().ignoreExceptions()
+                .await()
+                .atMost(180, TimeUnit.SECONDS)
+                .untilAsserted(this::initJedis); // NOTE(review): initJedis only constructs the client and issues no command; confirm this actually waits for Redis to be ready
+        this.generateTestData();
+    }
+
+    private void initJedis() { // creates the client pointed at REDIS_HOST:REDIS_PORT
+        jedis = new Jedis(REDIS_HOST, REDIS_PORT);
+    }
+
+    private void generateTestData() { // writes five string keys sharing the "key_test" prefix used by `keys = "key_test*"` in redis_source.conf
+        jedis.set("key_test", "test");
+        jedis.set("key_test1", "test1");
+        jedis.set("key_test2", "test2");
+        jedis.set("key_test3", "test3");
+        jedis.set("key_test4", "test4");
+    }
+
+    @Test
+    public void testRedisSource() throws IOException, InterruptedException { // runs the job from redis_source.conf and expects exit code 0
+        Container.ExecResult execResult = 
executeSeaTunnelSparkJob("/redis/redis_source.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+
+    @AfterEach
+    public void close() { // NOTE(review): jedis and redisContainer are closed without null checks; if setup failed before they were assigned this would throw NPE - confirm acceptable
+        super.close();
+        jedis.close();
+        redisContainer.close();
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/redis/redis_source.conf
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/redis/redis_source.conf
new file mode 100644
index 000000000..25ea6e009
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/redis/redis_source.conf
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+    # You can set spark configuration here
+    spark.app.name = "SeaTunnel"
+    spark.executor.instances = 2
+    spark.executor.cores = 1
+    spark.executor.memory = "1g"
+    spark.master = local
+    job.mode = "BATCH"
+}
+
+source {
+    # This is an example source plugin **only for testing and demonstrating the 
source plugin feature**
+    Redis {
+        host = spark_e2e_redis
+        port = 6379
+        keys = "key_test*"
+        data_type = key
+        format = text
+    }
+
+    # If you would like to get more information about how to configure 
seatunnel and see the full list of source plugins,
+    # please go to https://seatunnel.apache.org/docs/connector-v2/source/Redis
+}
+
+transform {
+
+}
+
+sink {
+
+    Console {
+
+    }
+
+    Assert {
+      rules = [
+        {
+          field_name = content
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+
+    # If you would like to get more information about how to configure 
seatunnel and see the full list of sink plugins,
+    # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Console
+}
\ No newline at end of file
diff --git a/tools/dependencies/known-dependencies.txt 
b/tools/dependencies/known-dependencies.txt
index dad98d576..8f578303e 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -168,6 +168,7 @@ google-http-client-1.26.0.jar
 google-http-client-jackson2-1.26.0.jar
 google-oauth-client-1.26.0.jar
 gson-2.2.4.jar
+gson-2.8.9.jar
 guava-19.0.jar
 guice-3.0.jar
 guice-4.1.0.jar
@@ -348,7 +349,7 @@ jcodings-1.0.18.jar
 jcodings-1.0.43.jar
 jcommander-1.81.jar
 jdbi-2.63.1.jar
-jedis-3.2.0.jar
+jedis-4.2.2.jar
 jersey-client-1.19.jar
 jersey-client-1.9.jar
 jersey-client-2.22.2.jar
@@ -421,6 +422,7 @@ joni-2.1.27.jar
 jopt-simple-5.0.2.jar
 jpam-1.1.jar
 jsch-0.1.54.jar
+json-20211205.jar
 json-path-2.3.0.jar
 json-smart-2.3.jar
 jsoup-1.14.3.jar

Reply via email to