This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 850f48381 [Feature][Sentry Sink V2] Add Sentry Sink Option Rules
(#3318)
850f48381 is described below
commit 850f483816c0c8255bdd834dea5864a8a1040260
Author: Eric <[email protected]>
AuthorDate: Tue Nov 15 14:42:31 2022 +0800
[Feature][Sentry Sink V2] Add Sentry Sink Option Rules (#3318)
---
.../seatunnel/sentry/sink/SentryConfig.java | 19 ++++++++------
.../seatunnel/sentry/sink/SentrySink.java | 2 +-
.../{SentryConfig.java => SentrySinkFactory.java} | 28 ++++++++++++--------
.../seatunnel/sentry/sink/SentrySinkWriter.java | 30 +++++++++++-----------
4 files changed, 45 insertions(+), 34 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentryConfig.java
b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentryConfig.java
index 421495b96..82eec070e 100644
---
a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentryConfig.java
+++
b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentryConfig.java
@@ -17,16 +17,19 @@
package org.apache.seatunnel.connectors.seatunnel.sentry.sink;
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
public class SentryConfig {
public static final String SENTRY = "sentry";
- public static final String DSN = "dsn";
- public static final String ENV = "env";
- public static final String RELEASE = "release";
- public static final String CACHE_DIRPATH = "cacheDirPath";
- public static final String ENABLE_EXTERNAL_CONFIGURATION =
"enableExternalConfiguration";
- public static final String MAX_CACHEITEMS = "maxCacheItems";
- public static final String FLUSH_TIMEOUTMILLIS = "flushTimeoutMillis";
- public static final String MAX_QUEUESIZE = "maxQueueSize";
+ public static final Option<String> DSN =
Options.key("dsn").stringType().noDefaultValue().withDescription("sentry dsn");
+ public static final Option<String> ENV =
Options.key("env").stringType().noDefaultValue().withDescription("env");
+ public static final Option<String> RELEASE =
Options.key("release").stringType().noDefaultValue().withDescription("release");
+ public static final Option<String> CACHE_DIRPATH =
Options.key("cacheDirPath").stringType().noDefaultValue().withDescription("sentry
cache dir path");
+ public static final Option<String> ENABLE_EXTERNAL_CONFIGURATION =
Options.key("enableExternalConfiguration").stringType().noDefaultValue().withDescription("enable
external configuration");
+ public static final Option<String> MAX_CACHEITEMS =
Options.key("maxCacheItems").stringType().noDefaultValue().withDescription("max
cache items");
+ public static final Option<String> FLUSH_TIMEOUTMILLIS =
Options.key("flushTimeoutMillis").stringType().noDefaultValue().withDescription("flush
timeout millis");
+ public static final Option<String> MAX_QUEUESIZE =
Options.key("maxQueueSize").stringType().noDefaultValue().withDescription("flush
queue size");
}
diff --git
a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySink.java
b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySink.java
index e4f1ada70..8c8eb5422 100644
---
a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySink.java
+++
b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySink.java
@@ -48,7 +48,7 @@ public class SentrySink extends
AbstractSimpleSink<SeaTunnelRow, SentrySinkState
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
- if (!pluginConfig.hasPath(SentryConfig.DSN)) {
+ if (!pluginConfig.hasPath(SentryConfig.DSN.key())) {
throw new PrepareFailException(getPluginName(), PluginType.SINK,
String.format("Config must include column : %s",
SentryConfig.DSN));
}
diff --git
a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentryConfig.java
b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkFactory.java
similarity index 52%
copy from
seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentryConfig.java
copy to
seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkFactory.java
index 421495b96..7fe6275f9 100644
---
a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentryConfig.java
+++
b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkFactory.java
@@ -17,16 +17,24 @@
package org.apache.seatunnel.connectors.seatunnel.sentry.sink;
-public class SentryConfig {
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
- public static final String SENTRY = "sentry";
- public static final String DSN = "dsn";
- public static final String ENV = "env";
- public static final String RELEASE = "release";
- public static final String CACHE_DIRPATH = "cacheDirPath";
- public static final String ENABLE_EXTERNAL_CONFIGURATION =
"enableExternalConfiguration";
- public static final String MAX_CACHEITEMS = "maxCacheItems";
- public static final String FLUSH_TIMEOUTMILLIS = "flushTimeoutMillis";
- public static final String MAX_QUEUESIZE = "maxQueueSize";
+import com.google.auto.service.AutoService;
+@AutoService(Factory.class)
+public class SentrySinkFactory implements TableSinkFactory {
+ @Override
+ public String factoryIdentifier() {
+ return SentryConfig.SENTRY;
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder().required(SentryConfig.DSN)
+ .optional(SentryConfig.ENV, SentryConfig.CACHE_DIRPATH,
SentryConfig.ENABLE_EXTERNAL_CONFIGURATION,
+ SentryConfig.FLUSH_TIMEOUTMILLIS, SentryConfig.MAX_CACHEITEMS,
SentryConfig.MAX_QUEUESIZE,
+ SentryConfig.RELEASE).build();
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkWriter.java
b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkWriter.java
index 4f94055bd..9580dee2b 100644
---
a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkWriter.java
+++
b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkWriter.java
@@ -39,27 +39,27 @@ public class SentrySinkWriter extends
AbstractSinkWriter<SeaTunnelRow, SentrySin
SinkWriter.Context context,
Config pluginConfig) {
SentryOptions options = new SentryOptions();
- options.setDsn(pluginConfig.getString(SentryConfig.DSN));
- if (pluginConfig.hasPath(SentryConfig.ENV)){
- options.setEnvironment(pluginConfig.getString(SentryConfig.ENV));
+ options.setDsn(pluginConfig.getString(SentryConfig.DSN.key()));
+ if (pluginConfig.hasPath(SentryConfig.ENV.key())){
+
options.setEnvironment(pluginConfig.getString(SentryConfig.ENV.key()));
}
- if (pluginConfig.hasPath(SentryConfig.RELEASE)){
- options.setRelease(pluginConfig.getString(SentryConfig.RELEASE));
+ if (pluginConfig.hasPath(SentryConfig.RELEASE.key())){
+
options.setRelease(pluginConfig.getString(SentryConfig.RELEASE.key()));
}
- if (pluginConfig.hasPath(SentryConfig.CACHE_DIRPATH)){
-
options.setCacheDirPath(pluginConfig.getString(SentryConfig.CACHE_DIRPATH));
+ if (pluginConfig.hasPath(SentryConfig.CACHE_DIRPATH.key())){
+
options.setCacheDirPath(pluginConfig.getString(SentryConfig.CACHE_DIRPATH.key()));
}
- if (pluginConfig.hasPath(SentryConfig.MAX_CACHEITEMS)){
-
options.setMaxCacheItems(pluginConfig.getInt(SentryConfig.MAX_CACHEITEMS));
+ if (pluginConfig.hasPath(SentryConfig.MAX_CACHEITEMS.key())){
+
options.setMaxCacheItems(pluginConfig.getInt(SentryConfig.MAX_CACHEITEMS.key()));
}
- if (pluginConfig.hasPath(SentryConfig.MAX_QUEUESIZE)){
-
options.setMaxQueueSize(pluginConfig.getInt(SentryConfig.MAX_QUEUESIZE));
+ if (pluginConfig.hasPath(SentryConfig.MAX_QUEUESIZE.key())){
+
options.setMaxQueueSize(pluginConfig.getInt(SentryConfig.MAX_QUEUESIZE.key()));
}
- if (pluginConfig.hasPath(SentryConfig.FLUSH_TIMEOUTMILLIS)){
-
options.setFlushTimeoutMillis(pluginConfig.getLong(SentryConfig.FLUSH_TIMEOUTMILLIS));
+ if (pluginConfig.hasPath(SentryConfig.FLUSH_TIMEOUTMILLIS.key())){
+
options.setFlushTimeoutMillis(pluginConfig.getLong(SentryConfig.FLUSH_TIMEOUTMILLIS.key()));
}
- if (pluginConfig.hasPath(SentryConfig.ENABLE_EXTERNAL_CONFIGURATION)){
-
options.setEnableExternalConfiguration(pluginConfig.getBoolean(SentryConfig.ENABLE_EXTERNAL_CONFIGURATION));
+ if
(pluginConfig.hasPath(SentryConfig.ENABLE_EXTERNAL_CONFIGURATION.key())){
+
options.setEnableExternalConfiguration(pluginConfig.getBoolean(SentryConfig.ENABLE_EXTERNAL_CONFIGURATION.key()));
}
Sentry.init(options);
}