This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 4a2f3fa915 [improve] sentry options (#9261)
4a2f3fa915 is described below
commit 4a2f3fa91590db9c1f0b3ca081d466cb54b24dae
Author: Jarvis <[email protected]>
AuthorDate: Mon May 12 09:54:17 2025 +0800
[improve] sentry options (#9261)
Co-authored-by: hailin0 <[email protected]>
---
.../seatunnel/api/ConnectorOptionCheckTest.java | 1 -
.../{SentryConfig.java => SentrySinkOptions.java} | 18 ++++-----
.../seatunnel/sentry/sink/SentrySink.java | 45 +++++-----------------
.../seatunnel/sentry/sink/SentrySinkFactory.java | 27 ++++++++-----
.../seatunnel/sentry/sink/SentrySinkWriter.java | 40 +++++++++----------
5 files changed, 54 insertions(+), 77 deletions(-)
diff --git a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
index b181393b07..638218c4b9 100644
--- a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
+++ b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
@@ -195,7 +195,6 @@ public class ConnectorOptionCheckTest {
whiteList.add("PulsarSinkOptions");
whiteList.add("PulsarSourceOptions");
whiteList.add("MongodbSinkOptions");
- whiteList.add("SentrySinkOptions");
whiteList.add("RocketMqSinkOptions");
whiteList.add("SocketSinkOptions");
whiteList.add("SelectDBSinkOptions");
diff --git a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/config/SentryConfig.java b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/config/SentrySinkOptions.java
similarity index 84%
rename from seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/config/SentryConfig.java
rename to seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/config/SentrySinkOptions.java
index 886fd49bb7..9cfeb97b59 100644
--- a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/config/SentryConfig.java
+++ b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/config/SentrySinkOptions.java
@@ -20,7 +20,7 @@ package org.apache.seatunnel.connectors.seatunnel.sentry.config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
-public class SentryConfig {
+public class SentrySinkOptions {
public static final String SENTRY = "Sentry";
@@ -35,24 +35,24 @@ public class SentryConfig {
.stringType()
.noDefaultValue()
.withDescription("sentry cache dir path");
- public static final Option<String> ENABLE_EXTERNAL_CONFIGURATION =
+ public static final Option<Boolean> ENABLE_EXTERNAL_CONFIGURATION =
Options.key("enableExternalConfiguration")
- .stringType()
+ .booleanType()
.noDefaultValue()
.withDescription("enable external configuration");
- public static final Option<String> MAX_CACHEITEMS =
+ public static final Option<Integer> MAX_CACHEITEMS =
Options.key("maxCacheItems")
- .stringType()
+ .intType()
.noDefaultValue()
.withDescription("max cache items");
- public static final Option<String> FLUSH_TIMEOUTMILLIS =
+ public static final Option<Long> FLUSH_TIMEOUTMILLIS =
Options.key("flushTimeoutMillis")
- .stringType()
+ .longType()
.noDefaultValue()
.withDescription("flush timeout millis");
- public static final Option<String> MAX_QUEUESIZE =
+ public static final Option<Integer> MAX_QUEUESIZE =
Options.key("maxQueueSize")
- .stringType()
+ .intType()
.noDefaultValue()
.withDescription("flush queue size");
}
diff --git a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySink.java b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySink.java
index 1298715633..ad52f90dc6 100644
--- a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySink.java
+++ b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySink.java
@@ -17,66 +17,41 @@
package org.apache.seatunnel.connectors.seatunnel.sentry.sink;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
-import org.apache.seatunnel.connectors.seatunnel.sentry.config.SentryConfig;
-import org.apache.seatunnel.connectors.seatunnel.sentry.exception.SentryConnectorException;
-
-import com.google.auto.service.AutoService;
+import org.apache.seatunnel.connectors.seatunnel.sentry.config.SentrySinkOptions;
import java.io.IOException;
import java.util.Optional;
/** @description: SentrySink class */
-@AutoService(SeaTunnelSink.class)
public class SentrySink extends AbstractSimpleSink<SeaTunnelRow, Void> {
- private SeaTunnelRowType seaTunnelRowType;
- private Config pluginConfig;
-
- @Override
- public String getPluginName() {
- return SentryConfig.SENTRY;
- }
-
- @Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- if (!pluginConfig.hasPath(SentryConfig.DSN.key())) {
- throw new SentryConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(),
- PluginType.SINK,
- String.format("Config must include column : %s", SentryConfig.DSN)));
- }
+ private final ReadonlyConfig pluginConfig;
+ private final CatalogTable catalogTable;
+ public SentrySink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) {
this.pluginConfig = pluginConfig;
+ this.catalogTable = catalogTable;
}
@Override
- public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
- this.seaTunnelRowType = seaTunnelRowType;
+ public String getPluginName() {
+ return SentrySinkOptions.SENTRY;
}
@Override
public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context)
throws IOException {
- return new SentrySinkWriter(seaTunnelRowType, pluginConfig);
+ return new SentrySinkWriter(pluginConfig);
}
@Override
public Optional<CatalogTable> getWriteCatalogTable() {
- return super.getWriteCatalogTable();
+ return Optional.of(catalogTable);
}
}
diff --git a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkFactory.java b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkFactory.java
index 5dc0f169c0..4b288d34f1 100644
--- a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkFactory.java
+++ b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkFactory.java
@@ -18,9 +18,11 @@
package org.apache.seatunnel.connectors.seatunnel.sentry.sink;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
-import org.apache.seatunnel.connectors.seatunnel.sentry.config.SentryConfig;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import org.apache.seatunnel.connectors.seatunnel.sentry.config.SentrySinkOptions;
import com.google.auto.service.AutoService;
@@ -28,21 +30,26 @@ import com.google.auto.service.AutoService;
public class SentrySinkFactory implements TableSinkFactory {
@Override
public String factoryIdentifier() {
- return SentryConfig.SENTRY;
+ return SentrySinkOptions.SENTRY;
}
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(SentryConfig.DSN)
+ .required(SentrySinkOptions.DSN)
.optional(
- SentryConfig.ENV,
- SentryConfig.CACHE_DIRPATH,
- SentryConfig.ENABLE_EXTERNAL_CONFIGURATION,
- SentryConfig.FLUSH_TIMEOUTMILLIS,
- SentryConfig.MAX_CACHEITEMS,
- SentryConfig.MAX_QUEUESIZE,
- SentryConfig.RELEASE)
+ SentrySinkOptions.ENV,
+ SentrySinkOptions.CACHE_DIRPATH,
+ SentrySinkOptions.ENABLE_EXTERNAL_CONFIGURATION,
+ SentrySinkOptions.FLUSH_TIMEOUTMILLIS,
+ SentrySinkOptions.MAX_CACHEITEMS,
+ SentrySinkOptions.MAX_QUEUESIZE,
+ SentrySinkOptions.RELEASE)
.build();
}
+
+ @Override
+ public TableSink createSink(TableSinkFactoryContext context) {
+ return () -> new SentrySink(context.getOptions(), context.getCatalogTable());
+ }
}
diff --git a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkWriter.java b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkWriter.java
index 37f01e8bd4..e01969e0be 100644
--- a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkWriter.java
+++ b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkWriter.java
@@ -17,12 +17,10 @@
package org.apache.seatunnel.connectors.seatunnel.sentry.sink;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
-import org.apache.seatunnel.connectors.seatunnel.sentry.config.SentryConfig;
+import org.apache.seatunnel.connectors.seatunnel.sentry.config.SentrySinkOptions;
import io.sentry.Sentry;
import io.sentry.SentryOptions;
@@ -31,33 +29,31 @@ import java.io.IOException;
/** @description: SentrySinkWriter class */
public class SentrySinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
- private SeaTunnelRowType seaTunnelRowType;
- public SentrySinkWriter(SeaTunnelRowType seaTunnelRowType, Config pluginConfig) {
+ public SentrySinkWriter(ReadonlyConfig pluginConfig) {
SentryOptions options = new SentryOptions();
- options.setDsn(pluginConfig.getString(SentryConfig.DSN.key()));
- if (pluginConfig.hasPath(SentryConfig.ENV.key())) {
- options.setEnvironment(pluginConfig.getString(SentryConfig.ENV.key()));
+ options.setDsn(pluginConfig.get(SentrySinkOptions.DSN));
+ if (pluginConfig.getOptional(SentrySinkOptions.ENV).isPresent()) {
+ options.setEnvironment(pluginConfig.get(SentrySinkOptions.ENV));
}
- if (pluginConfig.hasPath(SentryConfig.RELEASE.key())) {
- options.setRelease(pluginConfig.getString(SentryConfig.RELEASE.key()));
+ if (pluginConfig.getOptional(SentrySinkOptions.RELEASE).isPresent()) {
+ options.setRelease(pluginConfig.get(SentrySinkOptions.RELEASE));
}
- if (pluginConfig.hasPath(SentryConfig.CACHE_DIRPATH.key())) {
- options.setCacheDirPath(pluginConfig.getString(SentryConfig.CACHE_DIRPATH.key()));
+ if (pluginConfig.getOptional(SentrySinkOptions.CACHE_DIRPATH).isPresent()) {
+ options.setCacheDirPath(pluginConfig.get(SentrySinkOptions.CACHE_DIRPATH));
}
- if (pluginConfig.hasPath(SentryConfig.MAX_CACHEITEMS.key())) {
- options.setMaxCacheItems(pluginConfig.getInt(SentryConfig.MAX_CACHEITEMS.key()));
+ if (pluginConfig.getOptional(SentrySinkOptions.MAX_CACHEITEMS).isPresent()) {
+ options.setMaxCacheItems(pluginConfig.get(SentrySinkOptions.MAX_CACHEITEMS));
}
- if (pluginConfig.hasPath(SentryConfig.MAX_QUEUESIZE.key())) {
- options.setMaxQueueSize(pluginConfig.getInt(SentryConfig.MAX_QUEUESIZE.key()));
+ if (pluginConfig.getOptional(SentrySinkOptions.MAX_QUEUESIZE).isPresent()) {
+ options.setMaxQueueSize(pluginConfig.get(SentrySinkOptions.MAX_QUEUESIZE));
}
- if (pluginConfig.hasPath(SentryConfig.FLUSH_TIMEOUTMILLIS.key())) {
- options.setFlushTimeoutMillis(
- pluginConfig.getLong(SentryConfig.FLUSH_TIMEOUTMILLIS.key()));
+ if (pluginConfig.getOptional(SentrySinkOptions.FLUSH_TIMEOUTMILLIS).isPresent()) {
+ options.setFlushTimeoutMillis(pluginConfig.get(SentrySinkOptions.FLUSH_TIMEOUTMILLIS));
}
- if (pluginConfig.hasPath(SentryConfig.ENABLE_EXTERNAL_CONFIGURATION.key())) {
+ if (pluginConfig.getOptional(SentrySinkOptions.ENABLE_EXTERNAL_CONFIGURATION).isPresent()) {
options.setEnableExternalConfiguration(
- pluginConfig.getBoolean(SentryConfig.ENABLE_EXTERNAL_CONFIGURATION.key()));
+ pluginConfig.get(SentrySinkOptions.ENABLE_EXTERNAL_CONFIGURATION));
}
}
Sentry.init(options);
}