This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a0068efdb [Feature][Connector-V2][AmazonDynamoDB] Add Factory for 
AmazonDynamoDB (#3348)
a0068efdb is described below

commit a0068efdbf7d9bcba887c8670896e1df6a8279d2
Author: Hisoka <[email protected]>
AuthorDate: Tue Nov 22 11:50:50 2022 +0800

    [Feature][Connector-V2][AmazonDynamoDB] Add Factory for AmazonDynamoDB 
(#3348)
    
    * [Connector-V2] [Factory] Add Factory for AmazonDynamodb
    
    * [Connector-V2] [Config] Add AmazonDynamoDB option rule
    
    * [Connector-V2] [Config] Add AmazonDynamoDB option rule
    
    * [Connector-V2] [AmazonDynamodb] change factoryIdentifier value
    
    * [Connector-V2] [AmazonDynamoDB] Add AmazonDynamoDB Source/Sink Factory.
    
    * [Connector-V2] [AmazonDynamoDB] Add AmazonDynamoDB 
TableSource(Sink)Factory
    
    * [Connector-V2] [AmazonDynamoDB] Add AmazonDynamoDB 
TableSource(Sink)Factory
    
    * [Connector-V2] [AmazonDynamoDB] Add AmazonDynamoDB 
TableSource(Sink)Factory
    
    * [Connector-V2] [AmazonDynamoDB] Add AmazonDynamoDB 
TableSource(Sink)Factory
    
    * [AmazonDynamoDB] [Option] Fix review
---
 .../api/table/factory/TableSourceFactory.java      |  1 -
 .../config/AmazonDynamoDBConfig.java               | 26 ++++++++----
 .../config/AmazonDynamoDBSourceOptions.java        | 32 +++++++--------
 .../amazondynamodb/sink/AmazonDynamoDBSink.java    |  2 +-
 .../sink/AmazonDynamoDBSinkFactory.java            | 47 ++++++++++++++++++++++
 .../source/AmazonDynamoDBSource.java               |  6 +--
 .../source/AmazonDynamoDBSourceFactory.java        | 45 +++++++++++++++++++++
 .../seatunnel/common/config/CommonConfig.java      | 22 ----------
 8 files changed, 128 insertions(+), 53 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
index 23561f5cc..64eeef881 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
@@ -31,7 +31,6 @@ public interface TableSourceFactory extends Factory {
     /**
      * We will never use this method now. So gave a default implement and 
return null.
      * @param context TableFactoryContext
-     * @return
      */
     default <T, SplitT extends SourceSplit, StateT extends Serializable> 
TableSource<T, SplitT, StateT> createSource(TableFactoryContext context) {
         throw new UnsupportedOperationException("unsupported now");
diff --git 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBConfig.java
 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBConfig.java
index cc8376b89..7c325f180 100644
--- 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBConfig.java
+++ 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBConfig.java
@@ -17,14 +17,26 @@
 
 package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config;
 
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
 import java.io.Serializable;
 
 public class AmazonDynamoDBConfig implements Serializable {
-    public static final String URL = "url";
-    public static final String REGION = "region";
-    public static final String ACCESS_KEY_ID = "access_key_id";
-    public static final String SECRET_ACCESS_KEY = "secret_access_key";
-    public static final String TABLE = "table";
-    public static final String BATCH_SIZE = "batch_size";
-    public static final String DEFAULT_BATCH_INTERVAL_MS = "batch_interval_ms";
+    public static final Option<String> URL = Options.key("url").stringType()
+        .noDefaultValue().withDescription("url to read to Amazon DynamoDB");
+    public static final Option<String> REGION = 
Options.key("region").stringType()
+        .noDefaultValue().withDescription("The region of Amazon DynamoDB");
+    public static final Option<String> ACCESS_KEY_ID = 
Options.key("access_key_id").stringType()
+        .noDefaultValue().withDescription("The access id of Amazon DynamoDB");
+    public static final Option<String> SECRET_ACCESS_KEY = 
Options.key("secret_access_key").stringType()
+        .noDefaultValue().withDescription("The access secret key of Amazon 
DynamoDB");
+    public static final Option<String> TABLE = 
Options.key("table").stringType()
+        .noDefaultValue().withDescription("The table of Amazon DynamoDB");
+    @SuppressWarnings("checkstyle:MagicNumber")
+    public static final Option<Integer> BATCH_SIZE = 
Options.key("batch_size").intType().defaultValue(25)
+        .withDescription("The batch size of Amazon DynamoDB");
+    @SuppressWarnings("checkstyle:MagicNumber")
+    public static final Option<Integer> BATCH_INTERVAL_MS = 
Options.key("batch_interval_ms").intType()
+        .defaultValue(1000).withDescription("The batch interval of Amazon 
DynamoDB");
 }
diff --git 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBSourceOptions.java
 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBSourceOptions.java
index b0e069894..e0f87c072 100644
--- 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBSourceOptions.java
+++ 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBSourceOptions.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config;
 
-import org.apache.seatunnel.connectors.seatunnel.common.config.CommonConfig;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
@@ -30,9 +30,6 @@ import java.io.Serializable;
 @AllArgsConstructor
 public class AmazonDynamoDBSourceOptions implements Serializable {
 
-    private static final int DEFAULT_BATCH_SIZE = 25;
-    private static final int DEFAULT_BATCH_INTERVAL_MS = 1000;
-
     private String url;
 
     private String region;
@@ -45,24 +42,23 @@ public class AmazonDynamoDBSourceOptions implements 
Serializable {
 
     private Config schema;
 
-    public int batchSize = DEFAULT_BATCH_SIZE;
-    public int batchIntervalMs = DEFAULT_BATCH_INTERVAL_MS;
+    public int batchSize = AmazonDynamoDBConfig.BATCH_SIZE.defaultValue();
+    public int batchIntervalMs = 
AmazonDynamoDBConfig.BATCH_INTERVAL_MS.defaultValue();
 
     public AmazonDynamoDBSourceOptions(Config config) {
-        this.url = config.getString(AmazonDynamoDBConfig.URL);
-        this.region = config.getString(AmazonDynamoDBConfig.REGION);
-        this.accessKeyId = 
config.getString(AmazonDynamoDBConfig.ACCESS_KEY_ID);
-        this.secretAccessKey = 
config.getString(AmazonDynamoDBConfig.SECRET_ACCESS_KEY);
-        this.table = config.getString(AmazonDynamoDBConfig.TABLE);
-
-        if (config.hasPath(CommonConfig.SCHEMA)) {
-            this.schema = config.getConfig(CommonConfig.SCHEMA);
+        this.url = config.getString(AmazonDynamoDBConfig.URL.key());
+        this.region = config.getString(AmazonDynamoDBConfig.REGION.key());
+        this.accessKeyId = 
config.getString(AmazonDynamoDBConfig.ACCESS_KEY_ID.key());
+        this.secretAccessKey = 
config.getString(AmazonDynamoDBConfig.SECRET_ACCESS_KEY.key());
+        this.table = config.getString(AmazonDynamoDBConfig.TABLE.key());
+        if (config.hasPath(SeaTunnelSchema.SCHEMA.key())) {
+            this.schema = config.getConfig(SeaTunnelSchema.SCHEMA.key());
         }
-        if (config.hasPath(AmazonDynamoDBConfig.BATCH_SIZE)) {
-            this.batchSize = config.getInt(AmazonDynamoDBConfig.BATCH_SIZE);
+        if (config.hasPath(AmazonDynamoDBConfig.BATCH_SIZE.key())) {
+            this.batchSize = 
config.getInt(AmazonDynamoDBConfig.BATCH_SIZE.key());
         }
-        if (config.hasPath(AmazonDynamoDBConfig.DEFAULT_BATCH_INTERVAL_MS)) {
-            this.batchIntervalMs = 
config.getInt(AmazonDynamoDBConfig.DEFAULT_BATCH_INTERVAL_MS);
+        if (config.hasPath(AmazonDynamoDBConfig.BATCH_INTERVAL_MS.key())) {
+            this.batchIntervalMs = 
config.getInt(AmazonDynamoDBConfig.BATCH_INTERVAL_MS.key());
         }
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSink.java
 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSink.java
index abe05337d..206465c70 100644
--- 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSink.java
+++ 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSink.java
@@ -58,7 +58,7 @@ public class AmazonDynamoDBSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
 
     @Override
     public void prepare(Config pluginConfig) throws PrepareFailException {
-        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, URL, 
TABLE, REGION, ACCESS_KEY_ID, SECRET_ACCESS_KEY);
+        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, 
URL.key(), TABLE.key(), REGION.key(), ACCESS_KEY_ID.key(), 
SECRET_ACCESS_KEY.key());
         if (!result.isSuccess()) {
             throw new 
AmazonDynamoDBConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
                     String.format("PluginName: %s, PluginType: %s, Message: 
%s",
diff --git 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSinkFactory.java
 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSinkFactory.java
new file mode 100644
index 000000000..a4d21d4c0
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSinkFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.sink;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.ACCESS_KEY_ID;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.BATCH_INTERVAL_MS;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.BATCH_SIZE;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.REGION;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.SECRET_ACCESS_KEY;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.TABLE;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.URL;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class AmazonDynamoDBSinkFactory implements TableSinkFactory {
+    @Override
+    public String factoryIdentifier() {
+        return "AmazonDynamoDB";
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+            .required(URL, REGION, ACCESS_KEY_ID, SECRET_ACCESS_KEY, TABLE)
+            .optional(BATCH_SIZE, BATCH_INTERVAL_MS).build();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSource.java
 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSource.java
index bea4730cd..90efd22a4 100644
--- 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSource.java
+++ 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSource.java
@@ -22,7 +22,7 @@ import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.Am
 import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.SECRET_ACCESS_KEY;
 import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.TABLE;
 import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.URL;
-import static 
org.apache.seatunnel.connectors.seatunnel.common.config.CommonConfig.SCHEMA;
+import static 
org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema.SCHEMA;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
@@ -61,9 +61,7 @@ public class AmazonDynamoDBSource extends 
AbstractSingleSplitSource<SeaTunnelRow
 
     @Override
     public void prepare(Config pluginConfig) throws PrepareFailException {
-        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
-                URL, TABLE, REGION,
-                ACCESS_KEY_ID, SECRET_ACCESS_KEY, SCHEMA);
+        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, 
URL.key(), TABLE.key(), REGION.key(), ACCESS_KEY_ID.key(), 
SECRET_ACCESS_KEY.key(), SCHEMA.key());
         if (!result.isSuccess()) {
             throw new 
AmazonDynamoDBConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
                     String.format("PluginName: %s, PluginType: %s, Message: 
%s",
diff --git 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceFactory.java
 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceFactory.java
new file mode 100644
index 000000000..fa89c2dd9
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.source;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.ACCESS_KEY_ID;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.REGION;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.SECRET_ACCESS_KEY;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.TABLE;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.URL;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class AmazonDynamoDBSourceFactory implements TableSourceFactory {
+    @Override
+    public String factoryIdentifier() {
+        return "AmazonDynamoDB";
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+            .required(URL, REGION, ACCESS_KEY_ID, SECRET_ACCESS_KEY, TABLE, 
SeaTunnelSchema.SCHEMA).build();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/config/CommonConfig.java
 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/config/CommonConfig.java
deleted file mode 100644
index bb695ea88..000000000
--- 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/config/CommonConfig.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.common.config;
-
-public class CommonConfig {
-    public static final String SCHEMA = "schema";
-}

Reply via email to