This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/cdc-multiple-table by this
push:
new 0674f10a5 [Feature] Add SaveMode For StarRocks (#4217)
0674f10a5 is described below
commit 0674f10a5300e9c337b1b979fca710799f10fe6a
Author: Hisoka <[email protected]>
AuthorDate: Sat Feb 25 12:14:41 2023 +0800
[Feature] Add SaveMode For StarRocks (#4217)
---
.../seatunnel/starrocks/catalog/StarRocksCatalog.java | 2 +-
.../engine/core/parse/MultipleTableJobConfigParser.java | 13 ++++++++++++-
2 files changed, 13 insertions(+), 2 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
index 46a6a423f..98379d629 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
@@ -326,7 +326,7 @@ public class StarRocksCatalog implements Catalog {
conn.createStatement().execute(sql);
} catch (Exception e) {
throw new CatalogException(
- String.format("Failed listing database in catalog %s",
catalogName), e);
+ String.format("Failed create table in catalog %s, sql :[%s]",
catalogName, sql), e);
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 6916cf870..312677112 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -21,13 +21,16 @@ import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.api.env.ParsingMode;
+import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SupportDataSaveMode;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.catalog.CatalogOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
@@ -261,13 +264,21 @@ public class MultipleTableJobConfigParser {
sink,
factoryUrls,
new SinkConfig(catalogTable.getTableId().getTableName()));
-
+ handleSaveMode(sink);
sinkAction.setParallelism(leftAction.getParallelism());
sinkActions.add(sinkAction);
}
return sinkActions;
}
+ private static void handleSaveMode(SeaTunnelSink<?, ?, ?, ?> sink) {
+ if (SupportDataSaveMode.class.isAssignableFrom(sink.getClass())) {
+ SupportDataSaveMode saveModeSink = (SupportDataSaveMode) sink;
+ DataSaveMode dataSaveMode = saveModeSink.getDataSaveMode();
+ saveModeSink.handleSaveMode(dataSaveMode);
+ }
+ }
+
private static String getFactoryId(ReadonlyConfig readonlyConfig) {
return
readonlyConfig.getOptional(CommonOptions.FACTORY_ID).orElse(readonlyConfig.get(CommonOptions.PLUGIN_NAME));
}