This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4a2f3fa915 [improve] sentry options (#9261)
4a2f3fa915 is described below

commit 4a2f3fa91590db9c1f0b3ca081d466cb54b24dae
Author: Jarvis <[email protected]>
AuthorDate: Mon May 12 09:54:17 2025 +0800

    [improve] sentry options (#9261)
    
    Co-authored-by: hailin0 <[email protected]>
---
 .../seatunnel/api/ConnectorOptionCheckTest.java    |  1 -
 .../{SentryConfig.java => SentrySinkOptions.java}  | 18 ++++-----
 .../seatunnel/sentry/sink/SentrySink.java          | 45 +++++-----------------
 .../seatunnel/sentry/sink/SentrySinkFactory.java   | 27 ++++++++-----
 .../seatunnel/sentry/sink/SentrySinkWriter.java    | 40 +++++++++----------
 5 files changed, 54 insertions(+), 77 deletions(-)

diff --git a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
index b181393b07..638218c4b9 100644
--- a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
+++ b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
@@ -195,7 +195,6 @@ public class ConnectorOptionCheckTest {
         whiteList.add("PulsarSinkOptions");
         whiteList.add("PulsarSourceOptions");
         whiteList.add("MongodbSinkOptions");
-        whiteList.add("SentrySinkOptions");
         whiteList.add("RocketMqSinkOptions");
         whiteList.add("SocketSinkOptions");
         whiteList.add("SelectDBSinkOptions");
diff --git a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/config/SentryConfig.java b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/config/SentrySinkOptions.java
similarity index 84%
rename from seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/config/SentryConfig.java
rename to seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/config/SentrySinkOptions.java
index 886fd49bb7..9cfeb97b59 100644
--- a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/config/SentryConfig.java
+++ b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/config/SentrySinkOptions.java
@@ -20,7 +20,7 @@ package org.apache.seatunnel.connectors.seatunnel.sentry.config;
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 
-public class SentryConfig {
+public class SentrySinkOptions {
 
     public static final String SENTRY = "Sentry";
 
@@ -35,24 +35,24 @@ public class SentryConfig {
                     .stringType()
                     .noDefaultValue()
                     .withDescription("sentry cache dir path");
-    public static final Option<String> ENABLE_EXTERNAL_CONFIGURATION =
+    public static final Option<Boolean> ENABLE_EXTERNAL_CONFIGURATION =
             Options.key("enableExternalConfiguration")
-                    .stringType()
+                    .booleanType()
                     .noDefaultValue()
                     .withDescription("enable external configuration");
-    public static final Option<String> MAX_CACHEITEMS =
+    public static final Option<Integer> MAX_CACHEITEMS =
             Options.key("maxCacheItems")
-                    .stringType()
+                    .intType()
                     .noDefaultValue()
                     .withDescription("max cache items");
-    public static final Option<String> FLUSH_TIMEOUTMILLIS =
+    public static final Option<Long> FLUSH_TIMEOUTMILLIS =
             Options.key("flushTimeoutMillis")
-                    .stringType()
+                    .longType()
                     .noDefaultValue()
                     .withDescription("flush timeout millis");
-    public static final Option<String> MAX_QUEUESIZE =
+    public static final Option<Integer> MAX_QUEUESIZE =
             Options.key("maxQueueSize")
-                    .stringType()
+                    .intType()
                     .noDefaultValue()
                     .withDescription("flush queue size");
 }
diff --git a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySink.java b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySink.java
index 1298715633..ad52f90dc6 100644
--- a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySink.java
+++ b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySink.java
@@ -17,66 +17,41 @@
 
 package org.apache.seatunnel.connectors.seatunnel.sentry.sink;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
 import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
-import org.apache.seatunnel.connectors.seatunnel.sentry.config.SentryConfig;
-import org.apache.seatunnel.connectors.seatunnel.sentry.exception.SentryConnectorException;
-
-import com.google.auto.service.AutoService;
+import org.apache.seatunnel.connectors.seatunnel.sentry.config.SentrySinkOptions;
 
 import java.io.IOException;
 import java.util.Optional;
 
 /** @description: SentrySink class */
-@AutoService(SeaTunnelSink.class)
 public class SentrySink extends AbstractSimpleSink<SeaTunnelRow, Void> {
 
-    private SeaTunnelRowType seaTunnelRowType;
-    private Config pluginConfig;
-
-    @Override
-    public String getPluginName() {
-        return SentryConfig.SENTRY;
-    }
-
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        if (!pluginConfig.hasPath(SentryConfig.DSN.key())) {
-            throw new SentryConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(),
-                            PluginType.SINK,
-                            String.format("Config must include column : %s", SentryConfig.DSN)));
-        }
+    private final ReadonlyConfig pluginConfig;
+    private final CatalogTable catalogTable;
 
+    public SentrySink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) {
         this.pluginConfig = pluginConfig;
+        this.catalogTable = catalogTable;
     }
 
     @Override
-    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
-        this.seaTunnelRowType = seaTunnelRowType;
+    public String getPluginName() {
+        return SentrySinkOptions.SENTRY;
     }
 
     @Override
     public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context)
             throws IOException {
-        return new SentrySinkWriter(seaTunnelRowType, pluginConfig);
+        return new SentrySinkWriter(pluginConfig);
     }
 
     @Override
     public Optional<CatalogTable> getWriteCatalogTable() {
-        return super.getWriteCatalogTable();
+        return Optional.of(catalogTable);
     }
 }
diff --git a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkFactory.java b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkFactory.java
index 5dc0f169c0..4b288d34f1 100644
--- a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkFactory.java
+++ b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkFactory.java
@@ -18,9 +18,11 @@
 package org.apache.seatunnel.connectors.seatunnel.sentry.sink;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
-import org.apache.seatunnel.connectors.seatunnel.sentry.config.SentryConfig;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import org.apache.seatunnel.connectors.seatunnel.sentry.config.SentrySinkOptions;
 
 import com.google.auto.service.AutoService;
 
@@ -28,21 +30,26 @@ import com.google.auto.service.AutoService;
 public class SentrySinkFactory implements TableSinkFactory {
     @Override
     public String factoryIdentifier() {
-        return SentryConfig.SENTRY;
+        return SentrySinkOptions.SENTRY;
     }
 
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(SentryConfig.DSN)
+                .required(SentrySinkOptions.DSN)
                 .optional(
-                        SentryConfig.ENV,
-                        SentryConfig.CACHE_DIRPATH,
-                        SentryConfig.ENABLE_EXTERNAL_CONFIGURATION,
-                        SentryConfig.FLUSH_TIMEOUTMILLIS,
-                        SentryConfig.MAX_CACHEITEMS,
-                        SentryConfig.MAX_QUEUESIZE,
-                        SentryConfig.RELEASE)
+                        SentrySinkOptions.ENV,
+                        SentrySinkOptions.CACHE_DIRPATH,
+                        SentrySinkOptions.ENABLE_EXTERNAL_CONFIGURATION,
+                        SentrySinkOptions.FLUSH_TIMEOUTMILLIS,
+                        SentrySinkOptions.MAX_CACHEITEMS,
+                        SentrySinkOptions.MAX_QUEUESIZE,
+                        SentrySinkOptions.RELEASE)
                 .build();
     }
+
+    @Override
+    public TableSink createSink(TableSinkFactoryContext context) {
+        return () -> new SentrySink(context.getOptions(), context.getCatalogTable());
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkWriter.java b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkWriter.java
index 37f01e8bd4..e01969e0be 100644
--- a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkWriter.java
+++ b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkWriter.java
@@ -17,12 +17,10 @@
 
 package org.apache.seatunnel.connectors.seatunnel.sentry.sink;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
-import org.apache.seatunnel.connectors.seatunnel.sentry.config.SentryConfig;
+import org.apache.seatunnel.connectors.seatunnel.sentry.config.SentrySinkOptions;
 
 import io.sentry.Sentry;
 import io.sentry.SentryOptions;
@@ -31,33 +29,31 @@ import java.io.IOException;
 
 /** @description: SentrySinkWriter class */
 public class SentrySinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
-    private SeaTunnelRowType seaTunnelRowType;
 
-    public SentrySinkWriter(SeaTunnelRowType seaTunnelRowType, Config pluginConfig) {
+    public SentrySinkWriter(ReadonlyConfig pluginConfig) {
         SentryOptions options = new SentryOptions();
-        options.setDsn(pluginConfig.getString(SentryConfig.DSN.key()));
-        if (pluginConfig.hasPath(SentryConfig.ENV.key())) {
-            options.setEnvironment(pluginConfig.getString(SentryConfig.ENV.key()));
+        options.setDsn(pluginConfig.get(SentrySinkOptions.DSN));
+        if (pluginConfig.getOptional(SentrySinkOptions.ENV).isPresent()) {
+            options.setEnvironment(pluginConfig.get(SentrySinkOptions.ENV));
         }
-        if (pluginConfig.hasPath(SentryConfig.RELEASE.key())) {
-            options.setRelease(pluginConfig.getString(SentryConfig.RELEASE.key()));
+        if (pluginConfig.getOptional(SentrySinkOptions.RELEASE).isPresent()) {
+            options.setRelease(pluginConfig.get(SentrySinkOptions.RELEASE));
         }
-        if (pluginConfig.hasPath(SentryConfig.CACHE_DIRPATH.key())) {
-            options.setCacheDirPath(pluginConfig.getString(SentryConfig.CACHE_DIRPATH.key()));
+        if (pluginConfig.getOptional(SentrySinkOptions.CACHE_DIRPATH).isPresent()) {
+            options.setCacheDirPath(pluginConfig.get(SentrySinkOptions.CACHE_DIRPATH));
         }
-        if (pluginConfig.hasPath(SentryConfig.MAX_CACHEITEMS.key())) {
-            options.setMaxCacheItems(pluginConfig.getInt(SentryConfig.MAX_CACHEITEMS.key()));
+        if (pluginConfig.getOptional(SentrySinkOptions.MAX_CACHEITEMS).isPresent()) {
+            options.setMaxCacheItems(pluginConfig.get(SentrySinkOptions.MAX_CACHEITEMS));
         }
-        if (pluginConfig.hasPath(SentryConfig.MAX_QUEUESIZE.key())) {
-            options.setMaxQueueSize(pluginConfig.getInt(SentryConfig.MAX_QUEUESIZE.key()));
+        if (pluginConfig.getOptional(SentrySinkOptions.MAX_QUEUESIZE).isPresent()) {
+            options.setMaxQueueSize(pluginConfig.get(SentrySinkOptions.MAX_QUEUESIZE));
         }
-        if (pluginConfig.hasPath(SentryConfig.FLUSH_TIMEOUTMILLIS.key())) {
-            options.setFlushTimeoutMillis(
-                    pluginConfig.getLong(SentryConfig.FLUSH_TIMEOUTMILLIS.key()));
+        if (pluginConfig.getOptional(SentrySinkOptions.FLUSH_TIMEOUTMILLIS).isPresent()) {
+            options.setFlushTimeoutMillis(pluginConfig.get(SentrySinkOptions.FLUSH_TIMEOUTMILLIS));
         }
-        if (pluginConfig.hasPath(SentryConfig.ENABLE_EXTERNAL_CONFIGURATION.key())) {
+        if (pluginConfig.getOptional(SentrySinkOptions.ENABLE_EXTERNAL_CONFIGURATION).isPresent()) {
             options.setEnableExternalConfiguration(
-                    pluginConfig.getBoolean(SentryConfig.ENABLE_EXTERNAL_CONFIGURATION.key()));
+                    pluginConfig.get(SentrySinkOptions.ENABLE_EXTERNAL_CONFIGURATION));
         }
         Sentry.init(options);
     }

Reply via email to