This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a0068efdb [Feature][Connector-V2][AmazonDynamoDB] Add Factory for AmazonDynamoDB (#3348)
a0068efdb is described below
commit a0068efdbf7d9bcba887c8670896e1df6a8279d2
Author: Hisoka <[email protected]>
AuthorDate: Tue Nov 22 11:50:50 2022 +0800
[Feature][Connector-V2][AmazonDynamoDB] Add Factory for AmazonDynamoDB (#3348)
* [Connector-V2] [Factory] Add Factory for AmazonDynamodb
* [Connector-V2] [Config] Add AmazonDynamoDB option rule
* [Connector-V2] [Config] Add AmazonDynamoDB option rule
* [Connector-V2] [AmazonDynamodb] change factoryIdentifier value
* [Connector-V2] [AmazonDynamoDB] Add AmazonDynamoDB Source/Sink Factory.
* [Connector-V2] [AmazonDynamoDB] Add AmazonDynamoDB TableSource(Sink)Factory
* [Connector-V2] [AmazonDynamoDB] Add AmazonDynamoDB TableSource(Sink)Factory
* [Connector-V2] [AmazonDynamoDB] Add AmazonDynamoDB TableSource(Sink)Factory
* [Connector-V2] [AmazonDynamoDB] Add AmazonDynamoDB TableSource(Sink)Factory
* [AmazonDynamoDB] [Option] Fix review
---
.../api/table/factory/TableSourceFactory.java | 1 -
.../config/AmazonDynamoDBConfig.java | 26 ++++++++----
.../config/AmazonDynamoDBSourceOptions.java | 32 +++++++--------
.../amazondynamodb/sink/AmazonDynamoDBSink.java | 2 +-
.../sink/AmazonDynamoDBSinkFactory.java | 47 ++++++++++++++++++++++
.../source/AmazonDynamoDBSource.java | 6 +--
.../source/AmazonDynamoDBSourceFactory.java | 45 +++++++++++++++++++++
.../seatunnel/common/config/CommonConfig.java | 22 ----------
8 files changed, 128 insertions(+), 53 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
index 23561f5cc..64eeef881 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
@@ -31,7 +31,6 @@ public interface TableSourceFactory extends Factory {
/**
* We will never use this method now. So gave a default implement and
return null.
* @param context TableFactoryContext
- * @return
*/
default <T, SplitT extends SourceSplit, StateT extends Serializable>
TableSource<T, SplitT, StateT> createSource(TableFactoryContext context) {
throw new UnsupportedOperationException("unsupported now");
diff --git
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBConfig.java
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBConfig.java
index cc8376b89..7c325f180 100644
---
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBConfig.java
+++
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBConfig.java
@@ -17,14 +17,26 @@
package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config;
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
import java.io.Serializable;
public class AmazonDynamoDBConfig implements Serializable {
- public static final String URL = "url";
- public static final String REGION = "region";
- public static final String ACCESS_KEY_ID = "access_key_id";
- public static final String SECRET_ACCESS_KEY = "secret_access_key";
- public static final String TABLE = "table";
- public static final String BATCH_SIZE = "batch_size";
- public static final String DEFAULT_BATCH_INTERVAL_MS = "batch_interval_ms";
+ public static final Option<String> URL = Options.key("url").stringType()
+ .noDefaultValue().withDescription("url to read to Amazon DynamoDB");
+ public static final Option<String> REGION =
Options.key("region").stringType()
+ .noDefaultValue().withDescription("The region of Amazon DynamoDB");
+ public static final Option<String> ACCESS_KEY_ID =
Options.key("access_key_id").stringType()
+ .noDefaultValue().withDescription("The access id of Amazon DynamoDB");
+ public static final Option<String> SECRET_ACCESS_KEY =
Options.key("secret_access_key").stringType()
+ .noDefaultValue().withDescription("The access secret key of Amazon
DynamoDB");
+ public static final Option<String> TABLE =
Options.key("table").stringType()
+ .noDefaultValue().withDescription("The table of Amazon DynamoDB");
+ @SuppressWarnings("checkstyle:MagicNumber")
+ public static final Option<Integer> BATCH_SIZE =
Options.key("batch_size").intType().defaultValue(25)
+ .withDescription("The batch size of Amazon DynamoDB");
+ @SuppressWarnings("checkstyle:MagicNumber")
+ public static final Option<Integer> BATCH_INTERVAL_MS =
Options.key("batch_interval_ms").intType()
+ .defaultValue(1000).withDescription("The batch interval of Amazon
DynamoDB");
}
diff --git
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBSourceOptions.java
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBSourceOptions.java
index b0e069894..e0f87c072 100644
---
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBSourceOptions.java
+++
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBSourceOptions.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config;
-import org.apache.seatunnel.connectors.seatunnel.common.config.CommonConfig;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -30,9 +30,6 @@ import java.io.Serializable;
@AllArgsConstructor
public class AmazonDynamoDBSourceOptions implements Serializable {
- private static final int DEFAULT_BATCH_SIZE = 25;
- private static final int DEFAULT_BATCH_INTERVAL_MS = 1000;
-
private String url;
private String region;
@@ -45,24 +42,23 @@ public class AmazonDynamoDBSourceOptions implements
Serializable {
private Config schema;
- public int batchSize = DEFAULT_BATCH_SIZE;
- public int batchIntervalMs = DEFAULT_BATCH_INTERVAL_MS;
+ public int batchSize = AmazonDynamoDBConfig.BATCH_SIZE.defaultValue();
+ public int batchIntervalMs =
AmazonDynamoDBConfig.BATCH_INTERVAL_MS.defaultValue();
public AmazonDynamoDBSourceOptions(Config config) {
- this.url = config.getString(AmazonDynamoDBConfig.URL);
- this.region = config.getString(AmazonDynamoDBConfig.REGION);
- this.accessKeyId =
config.getString(AmazonDynamoDBConfig.ACCESS_KEY_ID);
- this.secretAccessKey =
config.getString(AmazonDynamoDBConfig.SECRET_ACCESS_KEY);
- this.table = config.getString(AmazonDynamoDBConfig.TABLE);
-
- if (config.hasPath(CommonConfig.SCHEMA)) {
- this.schema = config.getConfig(CommonConfig.SCHEMA);
+ this.url = config.getString(AmazonDynamoDBConfig.URL.key());
+ this.region = config.getString(AmazonDynamoDBConfig.REGION.key());
+ this.accessKeyId =
config.getString(AmazonDynamoDBConfig.ACCESS_KEY_ID.key());
+ this.secretAccessKey =
config.getString(AmazonDynamoDBConfig.SECRET_ACCESS_KEY.key());
+ this.table = config.getString(AmazonDynamoDBConfig.TABLE.key());
+ if (config.hasPath(SeaTunnelSchema.SCHEMA.key())) {
+ this.schema = config.getConfig(SeaTunnelSchema.SCHEMA.key());
}
- if (config.hasPath(AmazonDynamoDBConfig.BATCH_SIZE)) {
- this.batchSize = config.getInt(AmazonDynamoDBConfig.BATCH_SIZE);
+ if (config.hasPath(AmazonDynamoDBConfig.BATCH_SIZE.key())) {
+ this.batchSize =
config.getInt(AmazonDynamoDBConfig.BATCH_SIZE.key());
}
- if (config.hasPath(AmazonDynamoDBConfig.DEFAULT_BATCH_INTERVAL_MS)) {
- this.batchIntervalMs =
config.getInt(AmazonDynamoDBConfig.DEFAULT_BATCH_INTERVAL_MS);
+ if (config.hasPath(AmazonDynamoDBConfig.BATCH_INTERVAL_MS.key())) {
+ this.batchIntervalMs =
config.getInt(AmazonDynamoDBConfig.BATCH_INTERVAL_MS.key());
}
}
}
diff --git
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSink.java
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSink.java
index abe05337d..206465c70 100644
---
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSink.java
+++
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSink.java
@@ -58,7 +58,7 @@ public class AmazonDynamoDBSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
- CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, URL,
TABLE, REGION, ACCESS_KEY_ID, SECRET_ACCESS_KEY);
+ CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
URL.key(), TABLE.key(), REGION.key(), ACCESS_KEY_ID.key(),
SECRET_ACCESS_KEY.key());
if (!result.isSuccess()) {
throw new
AmazonDynamoDBConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format("PluginName: %s, PluginType: %s, Message:
%s",
diff --git
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSinkFactory.java
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSinkFactory.java
new file mode 100644
index 000000000..a4d21d4c0
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSinkFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.sink;
+
+import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.ACCESS_KEY_ID;
+import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.BATCH_INTERVAL_MS;
+import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.BATCH_SIZE;
+import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.REGION;
+import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.SECRET_ACCESS_KEY;
+import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.TABLE;
+import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.URL;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class AmazonDynamoDBSinkFactory implements TableSinkFactory {
+ @Override
+ public String factoryIdentifier() {
+ return "AmazonDynamoDB";
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder()
+ .required(URL, REGION, ACCESS_KEY_ID, SECRET_ACCESS_KEY, TABLE)
+ .optional(BATCH_SIZE, BATCH_INTERVAL_MS).build();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSource.java
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSource.java
index bea4730cd..90efd22a4 100644
---
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSource.java
+++
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSource.java
@@ -22,7 +22,7 @@ import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.Am
import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.SECRET_ACCESS_KEY;
import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.TABLE;
import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.URL;
-import static
org.apache.seatunnel.connectors.seatunnel.common.config.CommonConfig.SCHEMA;
+import static
org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema.SCHEMA;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
@@ -61,9 +61,7 @@ public class AmazonDynamoDBSource extends
AbstractSingleSplitSource<SeaTunnelRow
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
- CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
- URL, TABLE, REGION,
- ACCESS_KEY_ID, SECRET_ACCESS_KEY, SCHEMA);
+ CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
URL.key(), TABLE.key(), REGION.key(), ACCESS_KEY_ID.key(),
SECRET_ACCESS_KEY.key(), SCHEMA.key());
if (!result.isSuccess()) {
throw new
AmazonDynamoDBConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format("PluginName: %s, PluginType: %s, Message:
%s",
diff --git
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceFactory.java
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceFactory.java
new file mode 100644
index 000000000..fa89c2dd9
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.source;
+
+import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.ACCESS_KEY_ID;
+import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.REGION;
+import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.SECRET_ACCESS_KEY;
+import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.TABLE;
+import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.URL;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class AmazonDynamoDBSourceFactory implements TableSourceFactory {
+ @Override
+ public String factoryIdentifier() {
+ return "AmazonDynamoDB";
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder()
+ .required(URL, REGION, ACCESS_KEY_ID, SECRET_ACCESS_KEY, TABLE,
SeaTunnelSchema.SCHEMA).build();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/config/CommonConfig.java
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/config/CommonConfig.java
deleted file mode 100644
index bb695ea88..000000000
---
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/config/CommonConfig.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.common.config;
-
-public class CommonConfig {
- public static final String SCHEMA = "schema";
-}