This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/cdc-multiple-table by this 
push:
     new 0674f10a5 [Feature] Add SaveMode For StarRocks (#4217)
0674f10a5 is described below

commit 0674f10a5300e9c337b1b979fca710799f10fe6a
Author: Hisoka <[email protected]>
AuthorDate: Sat Feb 25 12:14:41 2023 +0800

    [Feature] Add SaveMode For StarRocks (#4217)
---
 .../seatunnel/starrocks/catalog/StarRocksCatalog.java       |  2 +-
 .../engine/core/parse/MultipleTableJobConfigParser.java     | 13 ++++++++++++-
 2 files changed, 13 insertions(+), 2 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
index 46a6a423f..98379d629 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
@@ -326,7 +326,7 @@ public class StarRocksCatalog implements Catalog {
             conn.createStatement().execute(sql);
         } catch (Exception e) {
             throw new CatalogException(
-                String.format("Failed listing database in catalog %s", 
catalogName), e);
+                String.format("Failed create table in catalog %s, sql :[%s]", 
catalogName, sql), e);
         }
     }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 6916cf870..312677112 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -21,13 +21,16 @@ import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.env.EnvCommonOptions;
 import org.apache.seatunnel.api.env.ParsingMode;
+import org.apache.seatunnel.api.sink.DataSaveMode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SupportDataSaveMode;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.table.catalog.CatalogOptions;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.CatalogFactory;
 import org.apache.seatunnel.api.table.factory.FactoryUtil;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
@@ -261,13 +264,21 @@ public class MultipleTableJobConfigParser {
                 sink,
                 factoryUrls,
                 new SinkConfig(catalogTable.getTableId().getTableName()));
-
+            handleSaveMode(sink);
             sinkAction.setParallelism(leftAction.getParallelism());
             sinkActions.add(sinkAction);
         }
         return sinkActions;
     }
 
+    private static void handleSaveMode(SeaTunnelSink<?, ?, ?, ?> sink) {
+        if (SupportDataSaveMode.class.isAssignableFrom(sink.getClass())) {
+            SupportDataSaveMode saveModeSink = (SupportDataSaveMode) sink;
+            DataSaveMode dataSaveMode = saveModeSink.getDataSaveMode();
+            saveModeSink.handleSaveMode(dataSaveMode);
+        }
+    }
+
     private static String getFactoryId(ReadonlyConfig readonlyConfig) {
         return 
readonlyConfig.getOptional(CommonOptions.FACTORY_ID).orElse(readonlyConfig.get(CommonOptions.PLUGIN_NAME));
     }

Reply via email to