This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 1c10aacb3 [Feature][Redis Connector V2] Add Redis Connector Option
Rules & Improve Redis Connector doc (#3320)
1c10aacb3 is described below
commit 1c10aacb30722ab5c6a11d173c12ef3f15b5ebfa
Author: Eric <[email protected]>
AuthorDate: Sun Nov 13 13:26:30 2022 +0800
[Feature][Redis Connector V2] Add Redis Connector Option Rules & Improve
Redis Connector doc (#3320)
* tmp commit
* Add Redis Connector Option Rules and Improve redis doc
* Update
seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisTableSinkFactory.java
Co-authored-by: Zongwen Li <[email protected]>
* Update
seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisTableSourceFactory.java
Co-authored-by: Zongwen Li <[email protected]>
* update class name
* update spi interface to TableSinkFactory and TableSourceFactory
* improve redis options and docs
* schema must be config when format is config in redis source connector
* update to Factory.class
* [Feature][Connector-V2][Redis] Fix redis e2e
* [Feature][Connector-V2][Redis] Fix redis e2e
Co-authored-by: Zongwen Li <[email protected]>
Co-authored-by: tyrantlucifer <[email protected]>
---
docs/en/connector-v2/sink/Redis.md | 24 +++---
docs/en/connector-v2/source/Redis.md | 30 ++++----
.../seatunnel/redis/config/RedisConfig.java | 85 +++++++++++++++++++---
.../seatunnel/redis/config/RedisParameters.java | 39 +++++-----
.../connectors/seatunnel/redis/sink/RedisSink.java | 2 +-
.../seatunnel/redis/sink/RedisSinkFactory.java | 42 +++++++++++
.../seatunnel/redis/source/RedisSource.java | 26 ++++---
.../seatunnel/redis/source/RedisSourceFactory.java | 44 +++++++++++
8 files changed, 222 insertions(+), 70 deletions(-)
diff --git a/docs/en/connector-v2/sink/Redis.md
b/docs/en/connector-v2/sink/Redis.md
index 9c0fa3e95..e77f0f944 100644
--- a/docs/en/connector-v2/sink/Redis.md
+++ b/docs/en/connector-v2/sink/Redis.md
@@ -13,18 +13,18 @@ Used to write data to Redis.
## Options
-| name | type | required | default value |
-|----------------|--------|----------|---------------|
-| host | string | yes | - |
-| port | int | yes | - |
-| key | string | yes | - |
-| data_type | string | yes | - |
-| user | string | no | - |
-| auth | string | no | - |
-| mode | string | no | - |
-| auth | list | no | - |
-| format | string | no | json |
-| common-options | | no | - |
+| name | type | required | default value |
+|----------------|--------|-----------------------|--------------|
+| host | string | yes | - |
+| port | int | yes | - |
+| key | string | yes | - |
+| data_type | string | yes | - |
+| user | string | no | - |
+| auth | string | no | - |
+| mode | string | no | single |
+| nodes | list | yes when mode=cluster | - |
+| format | string | no | json |
+| common-options | | no | - |
### host [string]
diff --git a/docs/en/connector-v2/source/Redis.md
b/docs/en/connector-v2/source/Redis.md
index 589efc01b..930f915ea 100644
--- a/docs/en/connector-v2/source/Redis.md
+++ b/docs/en/connector-v2/source/Redis.md
@@ -17,20 +17,20 @@ Used to read data from Redis.
## Options
-| name | type | required | default value |
-|---------------------|--------|----------|---------------|
-| host | string | yes | - |
-| port | int | yes | - |
-| keys | string | yes | - |
-| data_type | string | yes | - |
-| user | string | no | - |
-| auth | string | no | - |
-| mode | string | no | - |
-| hash_key_parse_mode | string | no | all |
-| nodes | list | no | - |
-| schema | config | no | - |
-| format | string | no | json |
-| common-options | | no | - |
+| name | type | required | default value |
+|---------------------|--------|-----------------------|--------------|
+| host | string | yes | - |
+| port | int | yes | - |
+| keys | string | yes | - |
+| data_type | string | yes | - |
+| user | string | no | - |
+| auth | string | no | - |
+| mode | string | no | single |
+| hash_key_parse_mode | string | no | all |
+| nodes | list | yes when mode=cluster | - |
+| schema | config | yes when format=json | - |
+| format | string | no | json |
+| common-options | | no | - |
### host [string]
@@ -211,7 +211,7 @@ connector will generate data as the following:
#### fields [config]
-the schema fields of upstream data
+the schema fields of redis data
### common options
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
index 11ee665a9..0c8b4107e 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
@@ -17,18 +17,10 @@
package org.apache.seatunnel.connectors.seatunnel.redis.config;
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
public class RedisConfig {
- public static final String HOST = "host";
- public static final String PORT = "port";
- public static final String AUTH = "auth";
- public static final String USER = "user";
- public static final String KEY_PATTERN = "keys";
- public static final String KEY = "key";
- public static final String DATA_TYPE = "data_type";
- public static final String FORMAT = "format";
- public static final String MODE = "mode";
- public static final String NODES = "nodes";
- public static final String HASH_KEY_PARSE_MODE = "hash_key_parse_mode";
public enum RedisMode {
SINGLE,
@@ -39,4 +31,75 @@ public class RedisConfig {
ALL,
KV;
}
+
+ public static final Option<String> HOST =
+ Options.key("host")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("redis hostname or ip");
+
+ public static final Option<String> PORT =
+ Options.key("port")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("redis port");
+
+ public static final Option<String> AUTH =
+ Options.key("auth")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("redis authentication password, you need it when
you connect to an encrypted cluster");
+
+ public static final Option<String> USER =
+ Options.key("user")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("redis authentication user, you need it when you
connect to an encrypted cluster");
+
+ public static final Option<String> KEY_PATTERN =
+ Options.key("keys")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("keys pattern, redis source connector support
fuzzy key matching, user needs to ensure that the matched keys are the same
type");
+
+ public static final Option<String> KEY =
+ Options.key("key")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The value of key you want to write to redis.");
+
+ public static final Option<String> DATA_TYPE =
+ Options.key("data_type")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("redis data types, support key hash list set
zset.");
+
+ public static final Option<RedisConfig.Format> FORMAT =
+ Options.key("format")
+ .enumType(RedisConfig.Format.class)
+ .defaultValue(RedisConfig.Format.JSON)
+ .withDescription("the format of upstream data, now only support
json and text, default json.");
+
+ public static final Option<RedisConfig.RedisMode> MODE =
+ Options.key("mode")
+ .enumType(RedisConfig.RedisMode.class)
+ .defaultValue(RedisMode.SINGLE)
+ .withDescription("redis mode, support single or cluster, default
value is single");
+
+ public static final Option<String> NODES =
+ Options.key("nodes")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("redis nodes information, used in cluster mode,
must like as the following format: [host1:port1, host2:port2]");
+
+ public static final Option<RedisConfig.HashKeyParseMode>
HASH_KEY_PARSE_MODE =
+ Options.key("hash_key_parse_mode")
+ .enumType(RedisConfig.HashKeyParseMode.class)
+ .defaultValue(HashKeyParseMode.ALL)
+ .withDescription("hash key parse mode, support all or kv, default
value is all");
+
+ public enum Format {
+ JSON,
+ // TEXT will be supported later
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
index 75b74d206..f4190aadb 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
@@ -46,45 +46,46 @@ public class RedisParameters implements Serializable {
public void buildWithConfig(Config config) {
// set host
- this.host = config.getString(RedisConfig.HOST);
+ this.host = config.getString(RedisConfig.HOST.key());
// set port
- this.port = config.getInt(RedisConfig.PORT);
+ this.port = config.getInt(RedisConfig.PORT.key());
// set auth
- if (config.hasPath(RedisConfig.AUTH)) {
- this.auth = config.getString(RedisConfig.AUTH);
+ if (config.hasPath(RedisConfig.AUTH.key())) {
+ this.auth = config.getString(RedisConfig.AUTH.key());
}
// set user
- if (config.hasPath(RedisConfig.USER)) {
- this.user = config.getString(RedisConfig.USER);
+ if (config.hasPath(RedisConfig.USER.key())) {
+ this.user = config.getString(RedisConfig.USER.key());
}
// set mode
- if (config.hasPath(RedisConfig.MODE)) {
- this.mode =
RedisConfig.RedisMode.valueOf(config.getString(RedisConfig.MODE));
+ if (config.hasPath(RedisConfig.MODE.key())) {
+ this.mode = RedisConfig.RedisMode
+
.valueOf(config.getString(RedisConfig.MODE.key()).toUpperCase());
} else {
- this.mode = RedisConfig.RedisMode.SINGLE;
+ this.mode = RedisConfig.MODE.defaultValue();
}
// set hash key mode
- if (config.hasPath(RedisConfig.HASH_KEY_PARSE_MODE)) {
+ if (config.hasPath(RedisConfig.HASH_KEY_PARSE_MODE.key())) {
this.hashKeyParseMode = RedisConfig.HashKeyParseMode
-
.valueOf(config.getString(RedisConfig.HASH_KEY_PARSE_MODE).toUpperCase());
+
.valueOf(config.getString(RedisConfig.HASH_KEY_PARSE_MODE.key()).toUpperCase());
} else {
- this.hashKeyParseMode = RedisConfig.HashKeyParseMode.ALL;
+ this.hashKeyParseMode =
RedisConfig.HASH_KEY_PARSE_MODE.defaultValue();
}
// set redis nodes information
- if (config.hasPath(RedisConfig.NODES)) {
- this.redisNodes = config.getStringList(RedisConfig.NODES);
+ if (config.hasPath(RedisConfig.NODES.key())) {
+ this.redisNodes = config.getStringList(RedisConfig.NODES.key());
}
// set key
- if (config.hasPath(RedisConfig.KEY)) {
- this.keyField = config.getString(RedisConfig.KEY);
+ if (config.hasPath(RedisConfig.KEY.key())) {
+ this.keyField = config.getString(RedisConfig.KEY.key());
}
// set keysPattern
- if (config.hasPath(RedisConfig.KEY_PATTERN)) {
- this.keysPattern = config.getString(RedisConfig.KEY_PATTERN);
+ if (config.hasPath(RedisConfig.KEY_PATTERN.key())) {
+ this.keysPattern = config.getString(RedisConfig.KEY_PATTERN.key());
}
// set redis data type
try {
- String dataType = config.getString(RedisConfig.DATA_TYPE);
+ String dataType = config.getString(RedisConfig.DATA_TYPE.key());
this.redisDataType = RedisDataType.valueOf(dataType.toUpperCase());
} catch (IllegalArgumentException e) {
throw new RuntimeException("Redis source connector only support
these data types [key, hash, list, set, zset]", e);
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java
index b6a1130fc..43d01414d 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java
@@ -51,7 +51,7 @@ public class RedisSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
this.pluginConfig = pluginConfig;
- CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
RedisConfig.HOST, RedisConfig.PORT, RedisConfig.KEY, RedisConfig.DATA_TYPE);
+ CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
RedisConfig.HOST.key(), RedisConfig.PORT.key(), RedisConfig.KEY.key(),
RedisConfig.DATA_TYPE.key());
if (!result.isSuccess()) {
throw new PrepareFailException(getPluginName(), PluginType.SINK,
result.getMsg());
}
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java
new file mode 100644
index 000000000..9cc3525dc
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.redis.sink;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisConfig;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class RedisSinkFactory implements TableSinkFactory {
+ @Override
+ public String factoryIdentifier() {
+ return "Redis";
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder()
+ .required(RedisConfig.HOST, RedisConfig.PORT, RedisConfig.KEY,
RedisConfig.DATA_TYPE)
+ .optional(RedisConfig.AUTH, RedisConfig.USER,
RedisConfig.KEY_PATTERN, RedisConfig.FORMAT)
+ .conditional(RedisConfig.MODE, RedisConfig.RedisMode.CLUSTER,
RedisConfig.NODES)
+ .build();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
index 2fe258d26..1219599ae 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
@@ -52,26 +52,28 @@ public class RedisSource extends
AbstractSingleSplitSource<SeaTunnelRow> {
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
- CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
RedisConfig.HOST, RedisConfig.PORT, RedisConfig.KEY_PATTERN,
RedisConfig.DATA_TYPE);
+ CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
RedisConfig.HOST.key(), RedisConfig.PORT.key(), RedisConfig.KEY_PATTERN.key(),
RedisConfig.DATA_TYPE.key());
if (!result.isSuccess()) {
throw new PrepareFailException(getPluginName(), PluginType.SOURCE,
result.getMsg());
}
this.redisParameters.buildWithConfig(pluginConfig);
- if (pluginConfig.hasPath(SeaTunnelSchema.SCHEMA.key())) {
+ // TODO: use format SPI
+ // default use json format
+ if (pluginConfig.hasPath(RedisConfig.FORMAT.key())) {
+ if (!pluginConfig.hasPath(SeaTunnelSchema.SCHEMA.key())) {
+ throw new PrepareFailException(getPluginName(),
PluginType.SOURCE, "Must config schema when format parameter been config");
+ }
Config schema =
pluginConfig.getConfig(SeaTunnelSchema.SCHEMA.key());
- this.seaTunnelRowType =
SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
+
+ RedisConfig.Format format = RedisConfig.Format
+
.valueOf(pluginConfig.getString(RedisConfig.FORMAT.key()).toUpperCase());
+ if (RedisConfig.Format.JSON.equals(format)) {
+ this.seaTunnelRowType =
SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
+ this.deserializationSchema = new
JsonDeserializationSchema(false, false, seaTunnelRowType);
+ }
} else {
this.seaTunnelRowType = SeaTunnelSchema.buildSimpleTextSchema();
- }
- // TODO: use format SPI
- // default use json format
- String format;
- if (pluginConfig.hasPath(RedisConfig.FORMAT)) {
- format = pluginConfig.getString(RedisConfig.FORMAT);
this.deserializationSchema = null;
- } else {
- format = "json";
- this.deserializationSchema = new JsonDeserializationSchema(false,
false, seaTunnelRowType);
}
}
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceFactory.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceFactory.java
new file mode 100644
index 000000000..c26dce24c
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.redis.source;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisConfig;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class RedisSourceFactory implements TableSourceFactory {
+ @Override
+ public String factoryIdentifier() {
+ return "Redis";
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder()
+ .required(RedisConfig.HOST, RedisConfig.PORT, RedisConfig.KEY,
RedisConfig.DATA_TYPE)
+ .optional(RedisConfig.HASH_KEY_PARSE_MODE, RedisConfig.AUTH,
RedisConfig.USER, RedisConfig.KEY_PATTERN)
+ .conditional(RedisConfig.MODE, RedisConfig.RedisMode.CLUSTER,
RedisConfig.NODES)
+ .bundledRequired(RedisConfig.FORMAT, SeaTunnelSchema.SCHEMA)
+ .build();
+ }
+}