This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 405f7d6f9 [Feature][Connector-V2] Add redis source connector (#2569)
405f7d6f9 is described below
commit 405f7d6f998e488b2c26fbbf357769f5d579e069
Author: TyrantLucifer <[email protected]>
AuthorDate: Wed Aug 31 23:10:19 2022 +0800
[Feature][Connector-V2] Add redis source connector (#2569)
---
docs/en/connector-v2/source/Redis.md | 149 +++++++++++++++++++++
plugin-mapping.properties | 1 +
pom.xml | 7 +
seatunnel-connectors-v2-dist/pom.xml | 5 +
seatunnel-connectors-v2/connector-redis/pom.xml | 53 ++++++++
.../seatunnel/redis/config/RedisConfig.java | 27 ++++
.../seatunnel/redis/config/RedisDataType.java | 67 +++++++++
.../seatunnel/redis/config/RedisParameters.java | 53 ++++++++
.../seatunnel/redis/source/RedisSource.java | 99 ++++++++++++++
.../seatunnel/redis/source/RedisSourceReader.java | 79 +++++++++++
seatunnel-connectors-v2/pom.xml | 1 +
.../seatunnel/e2e/flink/v2/redis/RedisIT.java | 89 ++++++++++++
.../src/test/resources/redis/redis_source.conf | 65 +++++++++
.../seatunnel/e2e/spark/v2/redis/RedisIT.java | 89 ++++++++++++
.../src/test/resources/redis/redis_source.conf | 68 ++++++++++
tools/dependencies/known-dependencies.txt | 4 +-
16 files changed, 855 insertions(+), 1 deletion(-)
diff --git a/docs/en/connector-v2/source/Redis.md
b/docs/en/connector-v2/source/Redis.md
new file mode 100644
index 000000000..62f4abd93
--- /dev/null
+++ b/docs/en/connector-v2/source/Redis.md
@@ -0,0 +1,149 @@
+# Redis
+
+> Redis source connector
+
+## Description
+
+Used to read data from Redis. Only batch mode is supported.
+
+## Options
+
+| name | type | required | default value |
+|-----------|--------|----------|---------------|
+| host | string | yes | - |
+| port | int | yes | - |
+| keys | string | yes | - |
+| data_type | string | yes | - |
+| auth | string | No | - |
+| schema | config | No | - |
+| format | string | No | json |
+
+### host [string]
+
+redis host
+
+### port [int]
+
+redis port
+
+### keys [string]
+
+keys pattern
+
+**Tips: The Redis source connector supports fuzzy key matching; the user needs to ensure that all matched keys are of the same type.**
+
+### data_type [string]
+
+Redis data type; the supported values are `key`, `hash`, `list`, `set` and `zset`.
+
+- key
+> The value of each key will be sent downstream as a single row of data.
+> For example, the value of key is `SeaTunnel test message`, the data received
downstream is `SeaTunnel test message` and only one message will be received.
+
+
+- hash
+> The hash key-value pairs will be formatted as json to be sent downstream as
a single row of data.
+> For example, the value of hash is `name:tyrantlucifer age:26`, the data
received downstream is `{"name":"tyrantlucifer", "age":"26"}` and only one
message will be received.
+
+- list
+> Each element in the list will be sent downstream as a single row of data.
+> For example, if the value of the list is `[tyrantlucifer, CalvinKirs]`, the data received downstream are `tyrantlucifer` and `CalvinKirs`, and only two messages will be received.
+
+- set
+> Each element in the set will be sent downstream as a single row of data
+> For example, if the value of the set is `[tyrantlucifer, CalvinKirs]`, the data received downstream are `tyrantlucifer` and `CalvinKirs`, and only two messages will be received.
+
+- zset
+> Each element in the sorted set will be sent downstream as a single row of
data
+> For example, if the value of the sorted set is `[tyrantlucifer, CalvinKirs]`, the data received downstream are `tyrantlucifer` and `CalvinKirs`, and only two messages will be received.
+
+### auth [String]
+
+Redis authentication password; you need it when you connect to a password-protected Redis server.
+
+### format [String]
+
+The format of the upstream data. Only `json` and `text` are supported for now; the default is `json`.
+
+When the format is `json`, you should also specify the `schema` option, for example:
+
+upstream data is the following:
+
+```json
+
+{"code": 200, "data": "get success", "success": true}
+
+```
+
+you should assign schema as the following:
+
+```hocon
+
+schema {
+ fields {
+ code = int
+ data = string
+ success = boolean
+ }
+}
+
+```
+
+connector will generate data as the following:
+
+| code | data | success |
+|------|-------------|---------|
+| 200 | get success | true |
+
+When the format is `text`, the connector passes the upstream data through unchanged, for example:
+
+upstream data is the following:
+
+```json
+
+{"code": 200, "data": "get success", "success": true}
+
+```
+
+connector will generate data as the following:
+
+| content |
+|---------|
+| {"code": 200, "data": "get success", "success": true} |
+
+### schema [Config]
+
+#### fields [Config]
+
+the schema fields of upstream data
+
+## Example
+
+simple:
+
+```hocon
+ Redis {
+ host = localhost
+ port = 6379
+ keys = "key_test*"
+ data_type = key
+ format = text
+ }
+```
+
+```hocon
+ Redis {
+ host = localhost
+ port = 6379
+ keys = "key_test*"
+ data_type = key
+ format = json
+ schema {
+ fields {
+ name = string
+ age = int
+ }
+ }
+ }
+```
+
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 7f06da42d..7eecae293 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -121,5 +121,6 @@ seatunnel.sink.IoTDB = connector-iotdb
seatunnel.sink.Neo4j = connector-neo4j
seatunnel.sink.FtpFile = connector-file-ftp
seatunnel.sink.Socket = connector-socket
+seatunnel.source.Redis = connector-redis
seatunnel.sink.DataHub = connector-datahub
diff --git a/pom.xml b/pom.xml
index 6982068de..5c37c7b84 100644
--- a/pom.xml
+++ b/pom.xml
@@ -231,6 +231,7 @@
<phoenix.version>5.2.5-HBase-2.x</phoenix.version>
<awaitility.version>4.2.0</awaitility.version>
<neo4j-java-driver.version>4.4.9</neo4j-java-driver.version>
+ <jedis.version>4.2.2</jedis.version>
<datahub.version>2.19.0-public</datahub.version>
</properties>
@@ -592,6 +593,12 @@
<version>${sshd.version}</version>
</dependency>
+ <dependency>
+ <groupId>redis.clients</groupId>
+ <artifactId>jedis</artifactId>
+ <version>${jedis.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
diff --git a/seatunnel-connectors-v2-dist/pom.xml
b/seatunnel-connectors-v2-dist/pom.xml
index ad63551c2..ce13e8adb 100644
--- a/seatunnel-connectors-v2-dist/pom.xml
+++ b/seatunnel-connectors-v2-dist/pom.xml
@@ -146,6 +146,11 @@
<artifactId>connector-neo4j</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-redis</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-datahub</artifactId>
diff --git a/seatunnel-connectors-v2/connector-redis/pom.xml
b/seatunnel-connectors-v2/connector-redis/pom.xml
new file mode 100644
index 000000000..ffabf615f
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-redis/pom.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>seatunnel-connectors-v2</artifactId>
+ <groupId>org.apache.seatunnel</groupId>
+ <version>${revision}</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>connector-redis</artifactId>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-format-json</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>redis.clients</groupId>
+ <artifactId>jedis</artifactId>
+ </dependency>
+
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
new file mode 100644
index 000000000..5c3dca239
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.redis.config;
+
+/**
+ * Names of the options read from the Redis source plugin configuration.
+ */
+public final class RedisConfig {
+    /** Option holding the Redis server host. */
+    public static final String HOST = "host";
+    /** Option holding the Redis server port. */
+    public static final String PORT = "port";
+    /** Optional option holding the authentication password. */
+    public static final String AUTH = "auth";
+    /** Option holding the pattern used to select keys. */
+    public static final String KEY_PATTERN = "keys";
+    /** Option naming the Redis data type of the selected keys. */
+    public static final String DATA_TYPE = "data_type";
+    /** Optional option naming the format of the values. */
+    public static final String FORMAT = "format";
+
+    private RedisConfig() {
+        // constants holder, not meant to be instantiated
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java
new file mode 100644
index 000000000..b48540a35
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.redis.config;
+
+import org.apache.seatunnel.common.utils.JsonUtils;
+
+import redis.clients.jedis.Jedis;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Redis data types the source can read. Each constant knows which Jedis call fetches
+ * the value(s) stored under a key and returns them as a list of strings, one entry
+ * per row to emit.
+ */
+public enum RedisDataType {
+    /** Plain string value, fetched with {@code get}. */
+    KEY {
+        @Override
+        public List<String> get(Jedis jedis, String key) {
+            String value = jedis.get(key);
+            if (value == null) {
+                // The key no longer exists (for example it expired after being listed);
+                // emit nothing instead of a list holding a null element.
+                return Collections.emptyList();
+            }
+            return Collections.singletonList(value);
+        }
+    },
+    /** Hash, fetched with {@code hgetAll} and rendered as one JSON string. */
+    HASH {
+        @Override
+        public List<String> get(Jedis jedis, String key) {
+            Map<String, String> kvMap = jedis.hgetAll(key);
+            return Collections.singletonList(JsonUtils.toJsonString(kvMap));
+        }
+    },
+    /** List, fetched in full with {@code lrange(key, 0, -1)}. */
+    LIST {
+        @Override
+        public List<String> get(Jedis jedis, String key) {
+            return jedis.lrange(key, 0, -1);
+        }
+    },
+    /** Set, fetched with {@code smembers} and copied into a list. */
+    SET {
+        @Override
+        public List<String> get(Jedis jedis, String key) {
+            Set<String> members = jedis.smembers(key);
+            return new ArrayList<>(members);
+        }
+    },
+    /** Sorted set, fetched in full with {@code zrange(key, 0, -1)}. */
+    ZSET {
+        @Override
+        public List<String> get(Jedis jedis, String key) {
+            return jedis.zrange(key, 0, -1);
+        }
+    };
+
+    /**
+     * Fetches the value(s) stored under {@code key}.
+     *
+     * <p>This base implementation returns an empty list; every constant declared
+     * above overrides it.
+     *
+     * @param jedis client used to issue the read
+     * @param key   key to read
+     * @return the values to emit, one list entry per row
+     */
+    public List<String> get(Jedis jedis, String key) {
+        return Collections.emptyList();
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
new file mode 100644
index 000000000..d8276b8c2
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.redis.config;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.Data;
+
+import java.io.Serializable;
+
+/**
+ * Connection and read settings of the Redis source, filled from the plugin configuration.
+ */
+@Data
+public class RedisParameters implements Serializable {
+    private String host;
+    private int port;
+    // stays empty when the "auth" option is not configured
+    private String auth = "";
+    private String keysPattern;
+    private RedisDataType redisDataType;
+
+    /**
+     * Reads host, port, the optional auth password, the key pattern and the data type
+     * from {@code config} into this object.
+     *
+     * @param config plugin configuration
+     * @throws RuntimeException if the configured data type does not name a
+     *                          {@link RedisDataType} constant
+     */
+    public void buildWithConfig(Config config) {
+        this.host = config.getString(RedisConfig.HOST);
+        this.port = config.getInt(RedisConfig.PORT);
+        if (config.hasPath(RedisConfig.AUTH)) {
+            this.auth = config.getString(RedisConfig.AUTH);
+        }
+        this.keysPattern = config.getString(RedisConfig.KEY_PATTERN);
+
+        String dataType = config.getString(RedisConfig.DATA_TYPE);
+        try {
+            // Locale.ROOT keeps the upper-casing independent of the JVM default locale;
+            // under a Turkish locale the letter "i" would not map to "I" and "list"
+            // would never match LIST.
+            this.redisDataType = RedisDataType.valueOf(dataType.toUpperCase(java.util.Locale.ROOT));
+        } catch (IllegalArgumentException e) {
+            throw new RuntimeException("Redis source connector only support these data types [key, hash, list, set, zset]", e);
+        }
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
new file mode 100644
index 000000000..d860e6af5
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.redis.source;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisConfig;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+/**
+ * Bounded source that reads values from Redis and emits them either as raw text rows
+ * or as rows deserialized from JSON.
+ */
+@AutoService(SeaTunnelSource.class)
+public class RedisSource extends AbstractSingleSplitSource<SeaTunnelRow> {
+    private static final String JSON_FORMAT = "json";
+
+    private final RedisParameters redisParameters = new RedisParameters();
+    private SeaTunnelContext seaTunnelContext;
+    private SeaTunnelRowType seaTunnelRowType;
+    // null means the reader forwards each value as a single-field row without deserializing it
+    private DeserializationSchema<SeaTunnelRow> deserializationSchema;
+
+    @Override
+    public String getPluginName() {
+        return "Redis";
+    }
+
+    /**
+     * Validates the required options, loads the connection parameters, and derives the
+     * row type and the deserializer from the {@code schema} and {@code format} options.
+     *
+     * @param pluginConfig configuration of this source
+     * @throws PrepareFailException if host, port, keys or data_type is missing
+     */
+    @Override
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
+                RedisConfig.HOST, RedisConfig.PORT, RedisConfig.KEY_PATTERN, RedisConfig.DATA_TYPE);
+        if (!result.isSuccess()) {
+            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
+        }
+        this.redisParameters.buildWithConfig(pluginConfig);
+        if (pluginConfig.hasPath(SeaTunnelSchema.SCHEMA)) {
+            Config schema = pluginConfig.getConfig(SeaTunnelSchema.SCHEMA);
+            this.seaTunnelRowType = SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
+        } else {
+            this.seaTunnelRowType = SeaTunnelSchema.buildSimpleTextSchema();
+        }
+        // TODO: use format SPI
+        // json is the default when the option is absent. The deserializer is picked from
+        // the option's value rather than its mere presence, so an explicit `format = json`
+        // also gets the JSON deserializer instead of silently falling back to raw text.
+        String format = pluginConfig.hasPath(RedisConfig.FORMAT)
+                ? pluginConfig.getString(RedisConfig.FORMAT)
+                : JSON_FORMAT;
+        if (JSON_FORMAT.equalsIgnoreCase(format)) {
+            this.deserializationSchema = new JsonDeserializationSchema(false, false, seaTunnelRowType);
+        } else {
+            // any other value keeps the previous behavior: values are passed through as text
+            this.deserializationSchema = null;
+        }
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+        return seaTunnelRowType;
+    }
+
+    @Override
+    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
+        this.seaTunnelContext = seaTunnelContext;
+    }
+
+    @Override
+    public AbstractSingleSplitReader<SeaTunnelRow> createReader(SingleSplitReaderContext readerContext) throws Exception {
+        return new RedisSourceReader(redisParameters, readerContext, deserializationSchema);
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java
new file mode 100644
index 000000000..5a8657097
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.redis.source;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
+
+import org.apache.commons.lang3.StringUtils;
+import redis.clients.jedis.Jedis;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Reader that looks up every key matching the configured pattern, fetches its value(s)
+ * according to the configured data type and emits them downstream, then signals that
+ * no more elements will follow.
+ */
+public class RedisSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
+    private final RedisParameters redisParameters;
+    private final SingleSplitReaderContext context;
+    // null means values are emitted as single-field text rows
+    private final DeserializationSchema<SeaTunnelRow> deserializationSchema;
+    private Jedis jedis;
+
+    public RedisSourceReader(RedisParameters redisParameters,
+                             SingleSplitReaderContext context,
+                             DeserializationSchema<SeaTunnelRow> deserializationSchema) {
+        this.redisParameters = redisParameters;
+        this.context = context;
+        this.deserializationSchema = deserializationSchema;
+    }
+
+    /** Creates the Jedis client and authenticates when a non-blank password is configured. */
+    @Override
+    public void open() throws Exception {
+        this.jedis = new Jedis(redisParameters.getHost(), redisParameters.getPort());
+        if (StringUtils.isNotBlank(redisParameters.getAuth())) {
+            this.jedis.auth(redisParameters.getAuth());
+        }
+    }
+
+    /** Closes the Jedis client if {@link #open()} created one. */
+    @Override
+    public void close() throws IOException {
+        if (Objects.nonNull(jedis)) {
+            jedis.close();
+        }
+    }
+
+    /**
+     * Reads all matching keys in a single pass and emits one row per value.
+     *
+     * @param output collector receiving the rows
+     */
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+        // NOTE(review): KEYS resolves the whole pattern in one call; consider SCAN for
+        // large databases.
+        Set<String> keys = jedis.keys(redisParameters.getKeysPattern());
+        RedisDataType redisDataType = redisParameters.getRedisDataType();
+        for (String key : keys) {
+            List<String> values = redisDataType.get(jedis, key);
+            for (String value : values) {
+                if (value == null) {
+                    // nothing to emit for a missing value; also avoids an NPE on getBytes below
+                    continue;
+                }
+                if (deserializationSchema == null) {
+                    output.collect(new SeaTunnelRow(new Object[]{value}));
+                } else {
+                    // explicit charset: the no-argument getBytes() uses the platform default
+                    deserializationSchema.deserialize(value.getBytes(java.nio.charset.StandardCharsets.UTF_8), output);
+                }
+            }
+        }
+        context.signalNoMoreElement();
+    }
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index 3d2f3ea7a..a179c7ab6 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -50,6 +50,7 @@
<module>connector-elasticsearch</module>
<module>connector-iotdb</module>
<module>connector-neo4j</module>
+ <module>connector-redis</module>
<module>connector-datahub</module>
</modules>
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/redis/RedisIT.java
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/redis/RedisIT.java
new file mode 100644
index 000000000..554cf0752
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/redis/RedisIT.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.redis;
+
+import static org.testcontainers.shaded.org.awaitility.Awaitility.given;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import redis.clients.jedis.Jedis;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+/**
+ * End-to-end test that starts a Redis container, seeds a few string keys and runs the
+ * Redis source job on Flink.
+ */
+@Slf4j
+public class RedisIT extends FlinkContainer {
+    private static final String REDIS_IMAGE = "redis:latest";
+    private static final String REDIS_CONTAINER_HOST = "flink_e2e_redis";
+    private static final String REDIS_HOST = "localhost";
+    private static final int REDIS_PORT = 6379;
+    private GenericContainer<?> redisContainer;
+    private Jedis jedis;
+
+    @BeforeEach
+    public void startRedisContainer() {
+        redisContainer = new GenericContainer<>(REDIS_IMAGE)
+            .withNetwork(NETWORK)
+            .withNetworkAliases(REDIS_CONTAINER_HOST)
+            .withLogConsumer(new Slf4jLogConsumer(log));
+        redisContainer.setPortBindings(Lists.newArrayList(String.format("%s:%s", REDIS_PORT, REDIS_PORT)));
+        Startables.deepStart(Stream.of(redisContainer)).join();
+        log.info("Redis container started");
+        given().ignoreExceptions()
+            .await()
+            .atMost(180, TimeUnit.SECONDS)
+            .untilAsserted(this::initJedis);
+        this.generateTestData();
+    }
+
+    /**
+     * Creates the client and sends a PING, so the surrounding await only passes once
+     * the server actually answers.
+     */
+    private void initJedis() {
+        Jedis candidate = new Jedis(REDIS_HOST, REDIS_PORT);
+        try {
+            candidate.ping();
+        } catch (RuntimeException e) {
+            // do not leak the client of a failed attempt; the await retries with a new one
+            candidate.close();
+            throw e;
+        }
+        jedis = candidate;
+    }
+
+    private void generateTestData() {
+        jedis.set("key_test", "test");
+        jedis.set("key_test1", "test1");
+        jedis.set("key_test2", "test2");
+        jedis.set("key_test3", "test3");
+        jedis.set("key_test4", "test4");
+    }
+
+    @Test
+    public void testRedisSource() throws IOException, InterruptedException {
+        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/redis/redis_source.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+
+    /**
+     * Releases the resources in the original order. Each step runs even if an earlier
+     * one throws, and fields left null by a failed setup are skipped.
+     */
+    @AfterEach
+    public void close() {
+        try {
+            super.close();
+        } finally {
+            try {
+                if (jedis != null) {
+                    jedis.close();
+                }
+            } finally {
+                if (redisContainer != null) {
+                    redisContainer.close();
+                }
+            }
+        }
+    }
+}
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/redis/redis_source.conf
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/redis/redis_source.conf
new file mode 100644
index 000000000..9805b2a1f
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/redis/redis_source.conf
@@ -0,0 +1,65 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ job.mode = "BATCH"
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ # This is a example source plugin **only for test and demonstrate the
feature source plugin**
+ Redis {
+ host = flink_e2e_redis
+ port = 6379
+ keys = "key_test*"
+ data_type = key
+ format = text
+ }
+ # If you would like to get more information about how to configure seatunnel
and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/source/Redis
+}
+
+transform {
+
+}
+
+sink {
+
+ Console {
+
+ }
+
+ Assert {
+ rules = [
+ {
+ field_name = content
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/redis/RedisIT.java
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/redis/RedisIT.java
new file mode 100644
index 000000000..5fcf05f58
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/redis/RedisIT.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.redis;
+
+import static org.testcontainers.shaded.org.awaitility.Awaitility.given;
+
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import redis.clients.jedis.Jedis;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+/**
+ * End-to-end test for the Redis source connector on the Spark engine.
+ *
+ * <p>Before each test a Redis container is started on {@code NETWORK} under the alias
+ * {@code spark_e2e_redis}, and five {@code key_test*} string keys are written through a
+ * host-side Jedis client. The test then submits {@code /redis/redis_source.conf} and expects
+ * the job to exit with code 0.
+ */
+@Slf4j
+public class RedisIT extends SparkContainer {
+    private static final String REDIS_IMAGE = "redis:latest";
+    private static final String REDIS_CONTAINER_HOST = "spark_e2e_redis";
+    private static final String REDIS_HOST = "localhost";
+    private static final int REDIS_PORT = 6379;
+    private GenericContainer<?> redisContainer;
+    private Jedis jedis;
+
+    /**
+     * Starts the Redis container, waits until a client can actually talk to it, and seeds
+     * the test keys.
+     */
+    @BeforeEach
+    public void startRedisContainer() {
+        redisContainer = new GenericContainer<>(REDIS_IMAGE)
+            .withNetwork(NETWORK)
+            .withNetworkAliases(REDIS_CONTAINER_HOST)
+            .withLogConsumer(new Slf4jLogConsumer(log));
+        // Fixed host binding so the host-side Jedis client can reach Redis at localhost:6379.
+        redisContainer.setPortBindings(Lists.newArrayList(String.format("%s:%s", REDIS_PORT, REDIS_PORT)));
+        Startables.deepStart(Stream.of(redisContainer)).join();
+        log.info("Redis container started");
+        // Retry until initJedis completes a real round trip to the server.
+        given().ignoreExceptions()
+            .await()
+            .atMost(180, TimeUnit.SECONDS)
+            .untilAsserted(this::initJedis);
+        this.generateTestData();
+    }
+
+    /**
+     * Creates the Jedis client and verifies the connection with a PING.
+     *
+     * <p>Constructing a client alone does not prove the server is reachable, so the PING is
+     * what makes the surrounding retry loop meaningful. A client that fails the PING is
+     * closed before the exception is rethrown, so retries do not accumulate open clients.
+     */
+    private void initJedis() {
+        Jedis candidate = new Jedis(REDIS_HOST, REDIS_PORT);
+        try {
+            candidate.ping();
+        } catch (RuntimeException e) {
+            candidate.close();
+            throw e;
+        }
+        jedis = candidate;
+    }
+
+    /**
+     * Writes the string keys matched by the {@code key_test*} pattern used in the job config.
+     */
+    private void generateTestData() {
+        jedis.set("key_test", "test");
+        jedis.set("key_test1", "test1");
+        jedis.set("key_test2", "test2");
+        jedis.set("key_test3", "test3");
+        jedis.set("key_test4", "test4");
+    }
+
+    /**
+     * Runs the Redis source job and checks that it finishes successfully.
+     */
+    @Test
+    public void testRedisSource() throws IOException, InterruptedException {
+        Container.ExecResult execResult = executeSeaTunnelSparkJob("/redis/redis_source.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+
+    /**
+     * Releases the resources in the same order as before (engine container, Jedis client,
+     * Redis container). Each step is guarded so that a failure or a partially completed
+     * setup cannot leave a later resource open.
+     */
+    @AfterEach
+    public void close() {
+        try {
+            super.close();
+        } finally {
+            try {
+                // jedis is still null if setup failed before a client was created.
+                if (jedis != null) {
+                    jedis.close();
+                }
+            } finally {
+                if (redisContainer != null) {
+                    redisContainer.close();
+                }
+            }
+        }
+    }
+}
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/redis/redis_source.conf
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/redis/redis_source.conf
new file mode 100644
index 000000000..25ea6e009
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/redis/redis_source.conf
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ # This is an example source plugin, **only for testing and demonstrating the source plugin feature**
+ Redis {
+ host = spark_e2e_redis
+ port = 6379
+ keys = "key_test*"
+ data_type = key
+ format = text
+ }
+
+ # If you would like more information about how to configure SeaTunnel and to see the full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/source/Redis
+}
+
+transform {
+
+}
+
+sink {
+
+ Console {
+
+ }
+
+ Assert {
+ rules = [
+ {
+ field_name = content
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+
+ # If you would like more information about how to configure SeaTunnel and to see the full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Console
+}
\ No newline at end of file
diff --git a/tools/dependencies/known-dependencies.txt
b/tools/dependencies/known-dependencies.txt
index dad98d576..8f578303e 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -168,6 +168,7 @@ google-http-client-1.26.0.jar
google-http-client-jackson2-1.26.0.jar
google-oauth-client-1.26.0.jar
gson-2.2.4.jar
+gson-2.8.9.jar
guava-19.0.jar
guice-3.0.jar
guice-4.1.0.jar
@@ -348,7 +349,7 @@ jcodings-1.0.18.jar
jcodings-1.0.43.jar
jcommander-1.81.jar
jdbi-2.63.1.jar
-jedis-3.2.0.jar
+jedis-4.2.2.jar
jersey-client-1.19.jar
jersey-client-1.9.jar
jersey-client-2.22.2.jar
@@ -421,6 +422,7 @@ joni-2.1.27.jar
jopt-simple-5.0.2.jar
jpam-1.1.jar
jsch-0.1.54.jar
+json-20211205.jar
json-path-2.3.0.jar
json-smart-2.3.jar
jsoup-1.14.3.jar