This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1c10aacb3 [Feature][Redis Connector V2] Add Redis Connector Option Rules & Improve Redis Connector doc (#3320)
1c10aacb3 is described below

commit 1c10aacb30722ab5c6a11d173c12ef3f15b5ebfa
Author: Eric <[email protected]>
AuthorDate: Sun Nov 13 13:26:30 2022 +0800

    [Feature][Redis Connector V2] Add Redis Connector Option Rules & Improve Redis Connector doc (#3320)
    
    * tmp commit
    
    * Add Redis Connector Option Rules and Improve redis doc
    
    * Update seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisTableSinkFactory.java
    
    Co-authored-by: Zongwen Li <[email protected]>
    
    * Update seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisTableSourceFactory.java
    
    Co-authored-by: Zongwen Li <[email protected]>
    
    * update class name
    
    * update spi interface to TableSinkFactory and TableSourceFactory
    
    * improve redis options and docs
    
    * schema must be configured when format is configured in redis source connector
    
    * update to Factory.class
    
    * [Feature][Connector-V2][Redis] Fix redis e2e
    
    * [Feature][Connector-V2][Redis] Fix redis e2e
    
    Co-authored-by: Zongwen Li <[email protected]>
    Co-authored-by: tyrantlucifer <[email protected]>
---
 docs/en/connector-v2/sink/Redis.md                 | 24 +++---
 docs/en/connector-v2/source/Redis.md               | 30 ++++----
 .../seatunnel/redis/config/RedisConfig.java        | 85 +++++++++++++++++++---
 .../seatunnel/redis/config/RedisParameters.java    | 39 +++++-----
 .../connectors/seatunnel/redis/sink/RedisSink.java |  2 +-
 .../seatunnel/redis/sink/RedisSinkFactory.java     | 42 +++++++++++
 .../seatunnel/redis/source/RedisSource.java        | 26 ++++---
 .../seatunnel/redis/source/RedisSourceFactory.java | 44 +++++++++++
 8 files changed, 222 insertions(+), 70 deletions(-)

diff --git a/docs/en/connector-v2/sink/Redis.md 
b/docs/en/connector-v2/sink/Redis.md
index 9c0fa3e95..e77f0f944 100644
--- a/docs/en/connector-v2/sink/Redis.md
+++ b/docs/en/connector-v2/sink/Redis.md
@@ -13,18 +13,18 @@ Used to write data to Redis.
 
 ##  Options
 
-| name           | type   | required | default value |
-|----------------|--------|----------|---------------|
-| host           | string | yes      | -             |
-| port           | int    | yes      | -             |
-| key            | string | yes      | -             |
-| data_type      | string | yes      | -             |
-| user           | string | no       | -             |
-| auth           | string | no       | -             |
-| mode           | string | no       | -             |
-| auth           | list   | no       | -             |
-| format         | string | no       | json          |
-| common-options |        | no       | -             |
+| name           | type   | required              | default value |
+|----------------|--------|-----------------------|--------------|
+| host           | string | yes                   | -            |
+| port           | int    | yes                   | -            |
+| key            | string | yes                   | -            |
+| data_type      | string | yes                   | -            |
+| user           | string | no                    | -            |
+| auth           | string | no                    | -            |
+| mode           | string | no                    | single       |
+| nodes          | list   | yes when mode=cluster | -            |
+| format         | string | no                    | json         |
+| common-options |        | no                    | -            |
 
 ### host [string]
 
diff --git a/docs/en/connector-v2/source/Redis.md 
b/docs/en/connector-v2/source/Redis.md
index 589efc01b..930f915ea 100644
--- a/docs/en/connector-v2/source/Redis.md
+++ b/docs/en/connector-v2/source/Redis.md
@@ -17,20 +17,20 @@ Used to read data from Redis.
 
 ##  Options
 
-| name                | type   | required | default value |
-|---------------------|--------|----------|---------------|
-| host                | string | yes      | -             |
-| port                | int    | yes      | -             |
-| keys                | string | yes      | -             |
-| data_type           | string | yes      | -             |
-| user                | string | no       | -             |
-| auth                | string | no       | -             |
-| mode                | string | no       | -             |
-| hash_key_parse_mode | string | no       | all           |
-| nodes               | list   | no       | -             |
-| schema              | config | no       | -             |
-| format              | string | no       | json          |
-| common-options      |        | no       | -             |
+| name                | type   | required              | default value |
+|---------------------|--------|-----------------------|--------------|
+| host                | string | yes                   | -            |
+| port                | int    | yes                   | -            |
+| keys                | string | yes                   | -            |
+| data_type           | string | yes                   | -            |
+| user                | string | no                    | -            |
+| auth                | string | no                    | -            |
+| mode                | string | no                    | single       |
+| hash_key_parse_mode | string | no                    | all          |
+| nodes               | list   | yes when mode=cluster | -            |
+| schema              | config | yes when format=json  | -            |
+| format              | string | no                    | json         |
+| common-options      |        | no                    | -            |
 
 ### host [string]
 
@@ -211,7 +211,7 @@ connector will generate data as the following:
 
 #### fields [config]
 
-the schema fields of upstream data
+the schema fields of redis data
 
 ### common options 
 
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
index 11ee665a9..0c8b4107e 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
@@ -17,18 +17,10 @@
 
 package org.apache.seatunnel.connectors.seatunnel.redis.config;
 
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
 public class RedisConfig {
-    public static final String HOST = "host";
-    public static final String PORT = "port";
-    public static final String AUTH = "auth";
-    public static final String USER = "user";
-    public static final String KEY_PATTERN = "keys";
-    public static final String KEY = "key";
-    public static final String DATA_TYPE = "data_type";
-    public static final String FORMAT = "format";
-    public static final String MODE = "mode";
-    public static final String NODES = "nodes";
-    public static final String HASH_KEY_PARSE_MODE = "hash_key_parse_mode";
 
     public enum RedisMode {
         SINGLE,
@@ -39,4 +31,75 @@ public class RedisConfig {
         ALL,
         KV;
     }
+
+    public static final Option<String> HOST =
+        Options.key("host")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("redis hostname or ip");
+
+    public static final Option<String> PORT =
+        Options.key("port")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("redis port");
+
+    public static final Option<String> AUTH =
+        Options.key("auth")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("redis authentication password, you need it when 
you connect to an encrypted cluster");
+
+    public static final Option<String> USER =
+        Options.key("user")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("redis authentication user, you need it when you 
connect to an encrypted cluster");
+
+    public static final Option<String> KEY_PATTERN =
+        Options.key("keys")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("keys pattern, redis source connector support 
fuzzy key matching, user needs to ensure that the matched keys are the same 
type");
+
+    public static final Option<String> KEY =
+        Options.key("key")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("The value of key you want to write to redis.");
+
+    public static final Option<String> DATA_TYPE =
+        Options.key("data_type")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("redis data types, support key hash list set 
zset.");
+
+    public static final Option<RedisConfig.Format> FORMAT =
+        Options.key("format")
+            .enumType(RedisConfig.Format.class)
+            .defaultValue(RedisConfig.Format.JSON)
+            .withDescription("the format of upstream data, now only support 
json and text, default json.");
+
+    public static final Option<RedisConfig.RedisMode> MODE =
+        Options.key("mode")
+            .enumType(RedisConfig.RedisMode.class)
+            .defaultValue(RedisMode.SINGLE)
+            .withDescription("redis mode, support single or cluster, default 
value is single");
+
+    public static final Option<String> NODES =
+        Options.key("nodes")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("redis nodes information, used in cluster mode, 
must like as the following format: [host1:port1, host2:port2]");
+
+    public static final Option<RedisConfig.HashKeyParseMode> 
HASH_KEY_PARSE_MODE =
+        Options.key("hash_key_parse_mode")
+            .enumType(RedisConfig.HashKeyParseMode.class)
+            .defaultValue(HashKeyParseMode.ALL)
+            .withDescription("hash key parse mode, support all or kv, default 
value is all");
+
+    public enum Format {
+        JSON,
+        // TEXT will be supported later
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
index 75b74d206..f4190aadb 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
@@ -46,45 +46,46 @@ public class RedisParameters implements Serializable {
 
     public void buildWithConfig(Config config) {
         // set host
-        this.host = config.getString(RedisConfig.HOST);
+        this.host = config.getString(RedisConfig.HOST.key());
         // set port
-        this.port = config.getInt(RedisConfig.PORT);
+        this.port = config.getInt(RedisConfig.PORT.key());
         // set auth
-        if (config.hasPath(RedisConfig.AUTH)) {
-            this.auth = config.getString(RedisConfig.AUTH);
+        if (config.hasPath(RedisConfig.AUTH.key())) {
+            this.auth = config.getString(RedisConfig.AUTH.key());
         }
         // set user
-        if (config.hasPath(RedisConfig.USER)) {
-            this.user = config.getString(RedisConfig.USER);
+        if (config.hasPath(RedisConfig.USER.key())) {
+            this.user = config.getString(RedisConfig.USER.key());
         }
         // set mode
-        if (config.hasPath(RedisConfig.MODE)) {
-            this.mode = 
RedisConfig.RedisMode.valueOf(config.getString(RedisConfig.MODE));
+        if (config.hasPath(RedisConfig.MODE.key())) {
+            this.mode = RedisConfig.RedisMode
+                    
.valueOf(config.getString(RedisConfig.MODE.key()).toUpperCase());
         } else {
-            this.mode = RedisConfig.RedisMode.SINGLE;
+            this.mode = RedisConfig.MODE.defaultValue();
         }
         // set hash key mode
-        if (config.hasPath(RedisConfig.HASH_KEY_PARSE_MODE)) {
+        if (config.hasPath(RedisConfig.HASH_KEY_PARSE_MODE.key())) {
             this.hashKeyParseMode = RedisConfig.HashKeyParseMode
-                    
.valueOf(config.getString(RedisConfig.HASH_KEY_PARSE_MODE).toUpperCase());
+                    
.valueOf(config.getString(RedisConfig.HASH_KEY_PARSE_MODE.key()).toUpperCase());
         } else {
-            this.hashKeyParseMode = RedisConfig.HashKeyParseMode.ALL;
+            this.hashKeyParseMode = 
RedisConfig.HASH_KEY_PARSE_MODE.defaultValue();
         }
         // set redis nodes information
-        if (config.hasPath(RedisConfig.NODES)) {
-            this.redisNodes = config.getStringList(RedisConfig.NODES);
+        if (config.hasPath(RedisConfig.NODES.key())) {
+            this.redisNodes = config.getStringList(RedisConfig.NODES.key());
         }
         // set key
-        if (config.hasPath(RedisConfig.KEY)) {
-            this.keyField = config.getString(RedisConfig.KEY);
+        if (config.hasPath(RedisConfig.KEY.key())) {
+            this.keyField = config.getString(RedisConfig.KEY.key());
         }
         // set keysPattern
-        if (config.hasPath(RedisConfig.KEY_PATTERN)) {
-            this.keysPattern = config.getString(RedisConfig.KEY_PATTERN);
+        if (config.hasPath(RedisConfig.KEY_PATTERN.key())) {
+            this.keysPattern = config.getString(RedisConfig.KEY_PATTERN.key());
         }
         // set redis data type
         try {
-            String dataType = config.getString(RedisConfig.DATA_TYPE);
+            String dataType = config.getString(RedisConfig.DATA_TYPE.key());
             this.redisDataType = RedisDataType.valueOf(dataType.toUpperCase());
         } catch (IllegalArgumentException e) {
             throw new RuntimeException("Redis source connector only support 
these data types [key, hash, list, set, zset]", e);
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java
index b6a1130fc..43d01414d 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java
@@ -51,7 +51,7 @@ public class RedisSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
     @Override
     public void prepare(Config pluginConfig) throws PrepareFailException {
         this.pluginConfig = pluginConfig;
-        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, 
RedisConfig.HOST, RedisConfig.PORT, RedisConfig.KEY, RedisConfig.DATA_TYPE);
+        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, 
RedisConfig.HOST.key(), RedisConfig.PORT.key(), RedisConfig.KEY.key(), 
RedisConfig.DATA_TYPE.key());
         if (!result.isSuccess()) {
             throw new PrepareFailException(getPluginName(), PluginType.SINK, 
result.getMsg());
         }
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java
new file mode 100644
index 000000000..9cc3525dc
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.redis.sink;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisConfig;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class RedisSinkFactory implements TableSinkFactory {
+    @Override
+    public String factoryIdentifier() {
+        return "Redis";
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+            .required(RedisConfig.HOST, RedisConfig.PORT, RedisConfig.KEY, 
RedisConfig.DATA_TYPE)
+            .optional(RedisConfig.AUTH, RedisConfig.USER, 
RedisConfig.KEY_PATTERN, RedisConfig.FORMAT)
+            .conditional(RedisConfig.MODE, RedisConfig.RedisMode.CLUSTER, 
RedisConfig.NODES)
+            .build();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
index 2fe258d26..1219599ae 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
@@ -52,26 +52,28 @@ public class RedisSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
 
     @Override
     public void prepare(Config pluginConfig) throws PrepareFailException {
-        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, 
RedisConfig.HOST, RedisConfig.PORT, RedisConfig.KEY_PATTERN, 
RedisConfig.DATA_TYPE);
+        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, 
RedisConfig.HOST.key(), RedisConfig.PORT.key(), RedisConfig.KEY_PATTERN.key(), 
RedisConfig.DATA_TYPE.key());
         if (!result.isSuccess()) {
             throw new PrepareFailException(getPluginName(), PluginType.SOURCE, 
result.getMsg());
         }
         this.redisParameters.buildWithConfig(pluginConfig);
-        if (pluginConfig.hasPath(SeaTunnelSchema.SCHEMA.key())) {
+        // TODO: use format SPI
+        // default use json format
+        if (pluginConfig.hasPath(RedisConfig.FORMAT.key())) {
+            if (!pluginConfig.hasPath(SeaTunnelSchema.SCHEMA.key())) {
+                throw new PrepareFailException(getPluginName(), 
PluginType.SOURCE, "Must config schema when format parameter been config");
+            }
             Config schema = 
pluginConfig.getConfig(SeaTunnelSchema.SCHEMA.key());
-            this.seaTunnelRowType = 
SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
+
+            RedisConfig.Format format = RedisConfig.Format
+                    
.valueOf(pluginConfig.getString(RedisConfig.FORMAT.key()).toUpperCase());
+            if (RedisConfig.Format.JSON.equals(format)) {
+                this.seaTunnelRowType = 
SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
+                this.deserializationSchema = new 
JsonDeserializationSchema(false, false, seaTunnelRowType);
+            }
         } else {
             this.seaTunnelRowType = SeaTunnelSchema.buildSimpleTextSchema();
-        }
-        // TODO: use format SPI
-        // default use json format
-        String format;
-        if (pluginConfig.hasPath(RedisConfig.FORMAT)) {
-            format = pluginConfig.getString(RedisConfig.FORMAT);
             this.deserializationSchema = null;
-        } else {
-            format = "json";
-            this.deserializationSchema = new JsonDeserializationSchema(false, 
false, seaTunnelRowType);
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceFactory.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceFactory.java
new file mode 100644
index 000000000..c26dce24c
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.redis.source;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisConfig;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class RedisSourceFactory implements TableSourceFactory {
+    @Override
+    public String factoryIdentifier() {
+        return "Redis";
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+            .required(RedisConfig.HOST, RedisConfig.PORT, RedisConfig.KEY, 
RedisConfig.DATA_TYPE)
+            .optional(RedisConfig.HASH_KEY_PARSE_MODE, RedisConfig.AUTH, 
RedisConfig.USER, RedisConfig.KEY_PATTERN)
+            .conditional(RedisConfig.MODE, RedisConfig.RedisMode.CLUSTER, 
RedisConfig.NODES)
+            .bundledRequired(RedisConfig.FORMAT, SeaTunnelSchema.SCHEMA)
+            .build();
+    }
+}

Reply via email to