This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new cc8bb60c8 [Feature][Connector-V2][Http] Add option rules && Improve Myhours sink connector (#3351)
cc8bb60c8 is described below
commit cc8bb60c83f8d780e718d72a54e9fb5604cbeaac
Author: Tyrantlucifer <[email protected]>
AuthorDate: Wed Nov 9 22:04:50 2022 +0800
[Feature][Connector-V2][Http] Add option rules && Improve Myhours sink connector (#3351)
* [Feature][Connector-V2][Http] Add option rules && Improve Myhours Connector
* [Feature][Connector-V2][Http] Improve codes
* [Feature][Connector-V2][Http] Fix code style
---
.../seatunnel/http/config/HttpConfig.java | 58 ++++++++++++++++++----
.../seatunnel/http/config/HttpParameter.java | 45 ++++++++++-------
.../connectors/seatunnel/http/sink/HttpSink.java | 12 ++---
.../seatunnel/http/sink/HttpSinkFactory.java | 45 +++++++++++++++++
.../seatunnel/http/source/HttpSource.java | 10 ++--
.../seatunnel/http/source/HttpSourceFactory.java | 53 ++++++++++++++++++++
.../seatunnel/feishu/sink/FeishuSinkFactory.java} | 25 +++++-----
.../seatunnel/myhours/source/MyHoursSource.java | 21 ++++----
.../myhours/source/MyHoursSourceFactory.java | 45 +++++++++++++++++
.../myhours/source/config/MyHoursSourceConfig.java | 25 +++++++---
.../source/config/MyHoursSourceParameter.java | 30 ++++-------
.../seatunnel/wechat/sink/WeChatSinkFactory.java | 45 +++++++++++++++++
.../wechat/sink/config/WeChatSinkConfig.java | 28 +++++++----
13 files changed, 339 insertions(+), 103 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
index 8ee089aec..b7df0a25d 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
@@ -17,18 +17,54 @@
package org.apache.seatunnel.connectors.seatunnel.http.config;
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import java.util.Map;
+
public class HttpConfig {
- public static final String URL = "url";
- public static final String METHOD = "method";
public static final String METHOD_DEFAULT_VALUE = "GET";
- public static final String HEADERS = "headers";
- public static final String PARAMS = "params";
- public static final String BODY = "body";
- public static final String SCHEMA = "schema";
- public static final String FORMAT = "format";
public static final String DEFAULT_FORMAT = "json";
- public static final String POLL_INTERVAL_MILLS = "poll_interval_ms";
- public static final String RETRY = "retry";
- public static final String RETRY_BACKOFF_MULTIPLIER_MS =
"retry_backoff_multiplier_ms";
- public static final String RETRY_BACKOFF_MAX_MS = "retry_backoff_max_ms";
+ public static final int DEFAULT_RETRY_BACKOFF_MULTIPLIER_MS = 100;
+ public static final int DEFAULT_RETRY_BACKOFF_MAX_MS = 10000;
+ public static final Option<String> URL = Options.key("url")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Http request url");
+ public static final Option<String> METHOD = Options.key("method")
+ .stringType()
+ .defaultValue(METHOD_DEFAULT_VALUE)
+ .withDescription("Http request method");
+ public static final Option<Map<String, String>> HEADERS =
Options.key("headers")
+ .mapType()
+ .noDefaultValue()
+ .withDescription("Http request headers");
+ public static final Option<Map<String, String>> PARAMS =
Options.key("params")
+ .mapType()
+ .noDefaultValue()
+ .withDescription("Http request params");
+ public static final Option<String> BODY = Options.key("body")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Http request body");
+ public static final Option<String> FORMAT = Options.key("format")
+ .stringType()
+ .defaultValue(DEFAULT_FORMAT)
+ .withDescription("Http response format");
+ public static final Option<Integer> POLL_INTERVAL_MILLS =
Options.key("poll_interval_millis")
+ .intType()
+ .noDefaultValue()
+ .withDescription("Request http api interval(millis) in stream
mode");
+ public static final Option<Integer> RETRY = Options.key("retry")
+ .intType()
+ .noDefaultValue()
+ .withDescription("The max retry times if request http return to
IOException");
+ public static final Option<Integer> RETRY_BACKOFF_MULTIPLIER_MS =
Options.key("retry_backoff_multiplier_ms")
+ .intType()
+ .defaultValue(DEFAULT_RETRY_BACKOFF_MULTIPLIER_MS)
+ .withDescription("The retry-backoff times(millis) multiplier if
request http failed");
+ public static final Option<Integer> RETRY_BACKOFF_MAX_MS =
Options.key("retry_backoff_max_ms")
+ .intType()
+ .defaultValue(DEFAULT_RETRY_BACKOFF_MAX_MS)
+ .withDescription("The maximum retry-backoff times(millis) if
request http failed");
}
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java
index aaccb57ac..8db67572d 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java
@@ -35,40 +35,47 @@ public class HttpParameter implements Serializable {
protected String body;
protected int pollIntervalMillis;
protected int retry;
- protected int retryBackoffMultiplierMillis = 100;
- protected int retryBackoffMaxMillis = 10000;
+ protected int retryBackoffMultiplierMillis =
HttpConfig.DEFAULT_RETRY_BACKOFF_MULTIPLIER_MS;
+ protected int retryBackoffMaxMillis =
HttpConfig.DEFAULT_RETRY_BACKOFF_MAX_MS;
public void buildWithConfig(Config pluginConfig) {
// set url
- this.setUrl(pluginConfig.getString(HttpConfig.URL));
+ this.setUrl(pluginConfig.getString(HttpConfig.URL.key()));
// set method
- if (pluginConfig.hasPath(HttpConfig.METHOD)) {
- this.setMethod(pluginConfig.getString(HttpConfig.METHOD));
+ if (pluginConfig.hasPath(HttpConfig.METHOD.key())) {
+ this.setMethod(pluginConfig.getString(HttpConfig.METHOD.key()));
} else {
this.setMethod(HttpConfig.METHOD_DEFAULT_VALUE);
}
// set headers
- if (pluginConfig.hasPath(HttpConfig.HEADERS)) {
-
this.setHeaders(pluginConfig.getConfig(HttpConfig.HEADERS).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
entry -> String.valueOf(entry.getValue().unwrapped()), (v1, v2) -> v2)));
+ if (pluginConfig.hasPath(HttpConfig.HEADERS.key())) {
+
this.setHeaders(pluginConfig.getConfig(HttpConfig.HEADERS.key()).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
entry -> String.valueOf(entry.getValue().unwrapped()), (v1, v2) -> v2)));
}
// set params
- if (pluginConfig.hasPath(HttpConfig.PARAMS)) {
-
this.setParams(pluginConfig.getConfig(HttpConfig.PARAMS).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
entry -> String.valueOf(entry.getValue().unwrapped()), (v1, v2) -> v2)));
+ if (pluginConfig.hasPath(HttpConfig.PARAMS.key())) {
+ this.setParams(pluginConfig.getConfig(HttpConfig.PARAMS.key())
+ .entrySet()
+ .stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, entry ->
String.valueOf(entry.getValue().unwrapped()), (v1, v2) -> v2)));
}
// set body
- if (pluginConfig.hasPath(HttpConfig.BODY)) {
- this.setBody(pluginConfig.getString(HttpConfig.BODY));
+ if (pluginConfig.hasPath(HttpConfig.BODY.key())) {
+ this.setBody(pluginConfig.getString(HttpConfig.BODY.key()));
}
- if (pluginConfig.hasPath(HttpConfig.POLL_INTERVAL_MILLS)) {
-
this.setPollIntervalMillis(pluginConfig.getInt(HttpConfig.POLL_INTERVAL_MILLS));
+ if (pluginConfig.hasPath(HttpConfig.POLL_INTERVAL_MILLS.key())) {
+
this.setPollIntervalMillis(pluginConfig.getInt(HttpConfig.POLL_INTERVAL_MILLS.key()));
}
- if (pluginConfig.hasPath(HttpConfig.RETRY)) {
- this.setRetry(pluginConfig.getInt(HttpConfig.RETRY));
- if (pluginConfig.hasPath(HttpConfig.RETRY_BACKOFF_MULTIPLIER_MS)) {
-
this.setRetryBackoffMultiplierMillis(pluginConfig.getInt(HttpConfig.RETRY_BACKOFF_MULTIPLIER_MS));
+ this.setRetryParameters(pluginConfig);
+ }
+
+ public void setRetryParameters(Config pluginConfig) {
+ if (pluginConfig.hasPath(HttpConfig.RETRY.key())) {
+ this.setRetry(pluginConfig.getInt(HttpConfig.RETRY.key()));
+ if
(pluginConfig.hasPath(HttpConfig.RETRY_BACKOFF_MULTIPLIER_MS.key())) {
+
this.setRetryBackoffMultiplierMillis(pluginConfig.getInt(HttpConfig.RETRY_BACKOFF_MULTIPLIER_MS.key()));
}
- if (pluginConfig.hasPath(HttpConfig.RETRY_BACKOFF_MAX_MS)) {
-
this.setRetryBackoffMaxMillis(pluginConfig.getInt(HttpConfig.RETRY_BACKOFF_MAX_MS));
+ if (pluginConfig.hasPath(HttpConfig.RETRY_BACKOFF_MAX_MS.key())) {
+
this.setRetryBackoffMaxMillis(pluginConfig.getInt(HttpConfig.RETRY_BACKOFF_MAX_MS.key()));
}
}
}
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
index 10bbe46a6..af44593ab 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
@@ -53,16 +53,16 @@ public class HttpSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
this.pluginConfig = pluginConfig;
- CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
HttpConfig.URL);
+ CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
HttpConfig.URL.key());
if (!result.isSuccess()) {
throw new PrepareFailException(getPluginName(), PluginType.SINK,
result.getMsg());
}
- httpParameter.setUrl(pluginConfig.getString(HttpConfig.URL));
- if (pluginConfig.hasPath(HttpConfig.HEADERS)) {
-
httpParameter.setHeaders(pluginConfig.getConfig(HttpConfig.HEADERS).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
entry -> String.valueOf(entry.getValue().unwrapped()), (v1, v2) -> v2)));
+ httpParameter.setUrl(pluginConfig.getString(HttpConfig.URL.key()));
+ if (pluginConfig.hasPath(HttpConfig.HEADERS.key())) {
+
httpParameter.setHeaders(pluginConfig.getConfig(HttpConfig.HEADERS.key()).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
entry -> String.valueOf(entry.getValue().unwrapped()), (v1, v2) -> v2)));
}
- if (pluginConfig.hasPath(HttpConfig.PARAMS)) {
-
httpParameter.setHeaders(pluginConfig.getConfig(HttpConfig.PARAMS).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
entry -> String.valueOf(entry.getValue().unwrapped()), (v1, v2) -> v2)));
+ if (pluginConfig.hasPath(HttpConfig.PARAMS.key())) {
+
httpParameter.setHeaders(pluginConfig.getConfig(HttpConfig.PARAMS.key()).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
entry -> String.valueOf(entry.getValue().unwrapped()), (v1, v2) -> v2)));
}
}
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkFactory.java
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkFactory.java
new file mode 100644
index 000000000..0d961f711
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.http.sink;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.connectors.seatunnel.http.config.HttpConfig;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class HttpSinkFactory implements TableSinkFactory {
+ @Override
+ public String factoryIdentifier() {
+ return "Http";
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder()
+ .required(HttpConfig.URL)
+ .optional(HttpConfig.HEADERS)
+ .optional(HttpConfig.PARAMS)
+ .optional(HttpConfig.RETRY)
+ .optional(HttpConfig.RETRY_BACKOFF_MULTIPLIER_MS)
+ .optional(HttpConfig.RETRY_BACKOFF_MAX_MS)
+ .build();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
index 8fbb9bff2..69759f16c 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
@@ -60,7 +60,7 @@ public class HttpSource extends
AbstractSingleSplitSource<SeaTunnelRow> {
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
- CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
HttpConfig.URL);
+ CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
HttpConfig.URL.key());
if (!result.isSuccess()) {
throw new PrepareFailException(getPluginName(), PluginType.SOURCE,
result.getMsg());
}
@@ -69,13 +69,13 @@ public class HttpSource extends
AbstractSingleSplitSource<SeaTunnelRow> {
}
protected void buildSchemaWithConfig(Config pluginConfig) {
- if (pluginConfig.hasPath(HttpConfig.SCHEMA)) {
- Config schema = pluginConfig.getConfig(HttpConfig.SCHEMA);
+ if (pluginConfig.hasPath(SeaTunnelSchema.SCHEMA.key())) {
+ Config schema =
pluginConfig.getConfig(SeaTunnelSchema.SCHEMA.key());
this.rowType =
SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
// default use json format
String format = HttpConfig.DEFAULT_FORMAT;
- if (pluginConfig.hasPath(HttpConfig.FORMAT)) {
- format = pluginConfig.getString(HttpConfig.FORMAT);
+ if (pluginConfig.hasPath(HttpConfig.FORMAT.key())) {
+ format = pluginConfig.getString(HttpConfig.FORMAT.key());
}
switch (format) {
case HttpConfig.DEFAULT_FORMAT:
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceFactory.java
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceFactory.java
new file mode 100644
index 000000000..7adb9ab61
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.http.source;
+
+import org.apache.seatunnel.api.configuration.util.Condition;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+import org.apache.seatunnel.connectors.seatunnel.http.config.HttpConfig;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class HttpSourceFactory implements TableSourceFactory {
+
+ @Override
+ public String factoryIdentifier() {
+ return "Http";
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder()
+ .required(HttpConfig.URL)
+ .optional(HttpConfig.METHOD)
+ .optional(HttpConfig.HEADERS)
+ .optional(HttpConfig.PARAMS)
+ .conditional(Condition.of(HttpConfig.METHOD, "post"),
HttpConfig.BODY)
+ .conditional(Condition.of(HttpConfig.FORMAT, "json"),
SeaTunnelSchema.SCHEMA)
+ .optional(HttpConfig.FORMAT)
+ .optional(HttpConfig.POLL_INTERVAL_MILLS)
+ .optional(HttpConfig.RETRY)
+ .optional(HttpConfig.RETRY_BACKOFF_MULTIPLIER_MS)
+ .optional(HttpConfig.RETRY_BACKOFF_MAX_MS)
+ .build();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceConfig.java
b/seatunnel-connectors-v2/connector-http/connector-http-feishu/src/main/java/org/apache/seatunnel/connectors/seatunnel/feishu/sink/FeishuSinkFactory.java
similarity index 52%
copy from
seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceConfig.java
copy to
seatunnel-connectors-v2/connector-http/connector-http-feishu/src/main/java/org/apache/seatunnel/connectors/seatunnel/feishu/sink/FeishuSinkFactory.java
index 32466a761..2ee37f048 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceConfig.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-feishu/src/main/java/org/apache/seatunnel/connectors/seatunnel/feishu/sink/FeishuSinkFactory.java
@@ -15,18 +15,17 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.myhours.source.config;
+package org.apache.seatunnel.connectors.seatunnel.feishu.sink;
-public class MyHoursSourceConfig {
- public static final String URL = "url";
- public static final String POST = "POST";
- public static final String EMAIL = "email";
- public static final String PASSWORD = "password";
- public static final String GRANTTYPE = "grantType";
- public static final String CLIENTID = "clientId";
- public static final String API = "api";
- public static final String AUTHORIZATION = "Authorization";
- public static final String ACCESSTOKEN = "accessToken";
- public static final String ACCESSTOKEN_PREFIX = "Bearer";
- public static final String AUTHORIZATION_URL =
"https://api2.myhours.com/api/tokens/login";
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.connectors.seatunnel.http.sink.HttpSinkFactory;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class FeishuSinkFactory extends HttpSinkFactory {
+ @Override
+ public String factoryIdentifier() {
+ return "Feishu";
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java
b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java
index 9d7e7a937..473afb0f6 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java
@@ -41,7 +41,6 @@ import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.Map;
-import java.util.Objects;
@Slf4j
@AutoService(SeaTunnelSource.class)
@@ -55,13 +54,13 @@ public class MyHoursSource extends HttpSource {
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
- CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
MyHoursSourceConfig.URL, MyHoursSourceConfig.EMAIL,
MyHoursSourceConfig.PASSWORD);
+ CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
MyHoursSourceConfig.URL.key(),
+ MyHoursSourceConfig.EMAIL.key(),
MyHoursSourceConfig.PASSWORD.key());
if (!result.isSuccess()) {
throw new PrepareFailException(getPluginName(), PluginType.SOURCE,
result.getMsg());
}
- //Login to get accessToken
- String accessToken = null;
- accessToken = getAccessToken(pluginConfig);
+ // Login to get accessToken
+ String accessToken = getAccessToken(pluginConfig);
this.myHoursSourceParameter.buildWithConfig(pluginConfig, accessToken);
buildSchemaWithConfig(pluginConfig);
}
@@ -81,7 +80,7 @@ public class MyHoursSource extends HttpSource {
String content = response.getContent();
if (!Strings.isNullOrEmpty(content)) {
Map<String, String> contentMap = JsonUtils.toMap(content);
- return contentMap.get(MyHoursSourceConfig.ACCESSTOKEN);
+ return contentMap.get(MyHoursSourceConfig.ACCESS_TOKEN);
}
}
throw new RuntimeException(String.format("login http client
execute exception, http response status code:[%d], content:[%s]",
@@ -90,12 +89,10 @@ public class MyHoursSource extends HttpSource {
} catch (Exception e) {
throw new RuntimeException("login http client execute exception");
} finally {
- if (Objects.nonNull(loginHttpClient)) {
- try {
- loginHttpClient.close();
- } catch (IOException e) {
- log.warn(e.getMessage(), e);
- }
+ try {
+ loginHttpClient.close();
+ } catch (IOException e) {
+ log.warn(e.getMessage(), e);
}
}
}
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSourceFactory.java
b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSourceFactory.java
new file mode 100644
index 000000000..09df13ad3
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSourceFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.myhours.source;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import
org.apache.seatunnel.connectors.seatunnel.myhours.source.config.MyHoursSourceConfig;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class MyHoursSourceFactory implements TableSourceFactory {
+ @Override
+ public String factoryIdentifier() {
+ return "MyHours";
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder()
+ .required(MyHoursSourceConfig.URL)
+ .required(MyHoursSourceConfig.EMAIL)
+ .required(MyHoursSourceConfig.PASSWORD)
+ .optional(MyHoursSourceConfig.RETRY)
+ .optional(MyHoursSourceConfig.RETRY_BACKOFF_MAX_MS)
+ .optional(MyHoursSourceConfig.RETRY_BACKOFF_MULTIPLIER_MS)
+ .build();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceConfig.java
b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceConfig.java
index 32466a761..7f96994bc 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceConfig.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceConfig.java
@@ -17,16 +17,25 @@
package org.apache.seatunnel.connectors.seatunnel.myhours.source.config;
-public class MyHoursSourceConfig {
- public static final String URL = "url";
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.connectors.seatunnel.http.config.HttpConfig;
+
+public class MyHoursSourceConfig extends HttpConfig {
public static final String POST = "POST";
- public static final String EMAIL = "email";
- public static final String PASSWORD = "password";
- public static final String GRANTTYPE = "grantType";
- public static final String CLIENTID = "clientId";
+ public static final String GRANT_TYPE = "grantType";
+ public static final String CLIENT_ID = "clientId";
public static final String API = "api";
public static final String AUTHORIZATION = "Authorization";
- public static final String ACCESSTOKEN = "accessToken";
- public static final String ACCESSTOKEN_PREFIX = "Bearer";
+ public static final String ACCESS_TOKEN = "accessToken";
+ public static final String ACCESS_TOKEN_PREFIX = "Bearer";
public static final String AUTHORIZATION_URL =
"https://api2.myhours.com/api/tokens/login";
+ public static final Option<String> EMAIL = Options.key("email")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("My hours login email address");
+ public static final Option<String> PASSWORD = Options.key("password")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("My hours login password");
}
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceParameter.java
b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceParameter.java
index 40beedb27..5bd11d2d1 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceParameter.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceParameter.java
@@ -18,7 +18,6 @@
package org.apache.seatunnel.connectors.seatunnel.myhours.source.config;
import org.apache.seatunnel.common.utils.JsonUtils;
-import org.apache.seatunnel.connectors.seatunnel.http.config.HttpConfig;
import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -29,9 +28,9 @@ import java.util.Map;
public class MyHoursSourceParameter extends HttpParameter {
public void buildWithConfig(Config pluginConfig, String accessToken) {
super.buildWithConfig(pluginConfig);
- //put authorization in headers
+ // put authorization in headers
this.headers = this.getHeaders() == null ? new HashMap<>() :
this.getHeaders();
- this.headers.put(MyHoursSourceConfig.AUTHORIZATION,
MyHoursSourceConfig.ACCESSTOKEN_PREFIX + " " + accessToken);
+ this.headers.put(MyHoursSourceConfig.AUTHORIZATION,
MyHoursSourceConfig.ACCESS_TOKEN_PREFIX + " " + accessToken);
this.setHeaders(this.headers);
}
@@ -41,24 +40,15 @@ public class MyHoursSourceParameter extends HttpParameter {
// set method
this.setMethod(MyHoursSourceConfig.POST);
// set body
- Map bodyParams = new HashMap();
- String email = pluginConfig.getString(MyHoursSourceConfig.EMAIL);
- String password = pluginConfig.getString(MyHoursSourceConfig.PASSWORD);
- bodyParams.put(MyHoursSourceConfig.GRANTTYPE,
MyHoursSourceConfig.PASSWORD);
- bodyParams.put(MyHoursSourceConfig.EMAIL, email);
- bodyParams.put(MyHoursSourceConfig.PASSWORD, password);
- bodyParams.put(MyHoursSourceConfig.CLIENTID, MyHoursSourceConfig.API);
+ Map<String, String> bodyParams = new HashMap<>();
+ String email = pluginConfig.getString(MyHoursSourceConfig.EMAIL.key());
+ String password =
pluginConfig.getString(MyHoursSourceConfig.PASSWORD.key());
+ bodyParams.put(MyHoursSourceConfig.GRANT_TYPE,
MyHoursSourceConfig.PASSWORD.key());
+ bodyParams.put(MyHoursSourceConfig.EMAIL.key(), email);
+ bodyParams.put(MyHoursSourceConfig.PASSWORD.key(), password);
+ bodyParams.put(MyHoursSourceConfig.CLIENT_ID, MyHoursSourceConfig.API);
String body = JsonUtils.toJsonString(bodyParams);
this.setBody(body);
- // set retry
- if (pluginConfig.hasPath(HttpConfig.RETRY)) {
- this.setRetry(pluginConfig.getInt(HttpConfig.RETRY));
- if (pluginConfig.hasPath(HttpConfig.RETRY_BACKOFF_MULTIPLIER_MS)) {
-
this.setRetryBackoffMultiplierMillis(pluginConfig.getInt(HttpConfig.RETRY_BACKOFF_MULTIPLIER_MS));
- }
- if (pluginConfig.hasPath(HttpConfig.RETRY_BACKOFF_MAX_MS)) {
-
this.setRetryBackoffMaxMillis(pluginConfig.getInt(HttpConfig.RETRY_BACKOFF_MAX_MS));
- }
- }
+ this.setRetryParameters(pluginConfig);
}
}
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatSinkFactory.java
b/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatSinkFactory.java
new file mode 100644
index 000000000..b4b4d9e14
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatSinkFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.wechat.sink;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import
org.apache.seatunnel.connectors.seatunnel.wechat.sink.config.WeChatSinkConfig;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class WeChatSinkFactory implements TableSinkFactory {
+ @Override
+ public String factoryIdentifier() {
+ return "WeChat";
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder()
+ .required(WeChatSinkConfig.URL)
+ .optional(WeChatSinkConfig.MENTIONED_LIST)
+ .optional(WeChatSinkConfig.MENTIONED_MOBILE_LIST)
+ .optional(WeChatSinkConfig.RETRY)
+ .optional(WeChatSinkConfig.RETRY_BACKOFF_MAX_MS)
+ .optional(WeChatSinkConfig.RETRY_BACKOFF_MULTIPLIER_MS)
+ .build();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/config/WeChatSinkConfig.java
b/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/config/WeChatSinkConfig.java
index be2baab60..8936fedbf 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/config/WeChatSinkConfig.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/config/WeChatSinkConfig.java
@@ -17,29 +17,39 @@
package org.apache.seatunnel.connectors.seatunnel.wechat.sink.config;
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.connectors.seatunnel.http.config.HttpConfig;
+
import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import lombok.Data;
+import lombok.Getter;
import lombok.NonNull;
import java.util.List;
-@Data
-public class WeChatSinkConfig {
+@Getter
+public class WeChatSinkConfig extends HttpConfig {
public static final String WECHAT_SEND_MSG_SUPPORT_TYPE = "text";
public static final String WECHAT_SEND_MSG_TYPE_KEY = "msgtype";
public static final String WECHAT_SEND_MSG_CONTENT_KEY = "content";
- public static final String MENTIONED_LIST = "mentioned_list";
- public static final String MENTIONED_MOBILE_LIST = "mentioned_mobile_list";
+ public static final Option<List<String>> MENTIONED_LIST =
Options.key("mentioned_list")
+ .listType()
+ .noDefaultValue()
+ .withDescription("A list of userids to remind the specified
members in the group (@ a member), @ all means to remind everyone");
+ public static final Option<List<String>> MENTIONED_MOBILE_LIST =
Options.key("mentioned_mobile_list")
+ .listType()
+ .noDefaultValue()
+ .withDescription("Mobile phone number list, remind the group
member corresponding to the mobile phone number (@ a member), @ all means
remind everyone");
private List<String> mentionedList;
private List<String> mentionedMobileList;
public WeChatSinkConfig(@NonNull Config pluginConfig){
- if (pluginConfig.hasPath(MENTIONED_LIST)) {
- this.mentionedList = pluginConfig.getStringList(MENTIONED_LIST);
+ if (pluginConfig.hasPath(MENTIONED_LIST.key())) {
+ this.mentionedList =
pluginConfig.getStringList(MENTIONED_LIST.key());
}
- if (pluginConfig.hasPath(MENTIONED_MOBILE_LIST)) {
- this.mentionedMobileList =
pluginConfig.getStringList(MENTIONED_MOBILE_LIST);
+ if (pluginConfig.hasPath(MENTIONED_MOBILE_LIST.key())) {
+ this.mentionedMobileList =
pluginConfig.getStringList(MENTIONED_MOBILE_LIST.key());
}
}
}