This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 8c2c5c79a1 [Improve][API] Move catalog open to SaveModeHandler (#7439)
8c2c5c79a1 is described below
commit 8c2c5c79a18cb75a032117d9e28b3d794531de6c
Author: Jia Fan <[email protected]>
AuthorDate: Thu Aug 22 20:13:08 2024 +0800
[Improve][API] Move catalog open to SaveModeHandler (#7439)
---
.../java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java | 5 +++++
.../src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java | 2 ++
.../java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java | 1 -
.../connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java | 1 -
.../seatunnel/connectors/seatunnel/iceberg/sink/IcebergSink.java | 1 -
.../apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java | 1 -
.../seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java | 1 -
.../seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java | 1 -
.../seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java | 1 -
.../seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java | 1 +
.../seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java | 1 +
.../seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java | 1 +
.../seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java | 1 +
.../org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java | 5 ++++-
.../apache/seatunnel/e2e/sink/inmemory/InMemorySaveModeHandler.java | 3 +++
.../seatunnel/engine/core/parse/MultipleTableJobConfigParser.java | 1 +
.../java/org/apache/seatunnel/engine/server/master/JobMaster.java | 1 +
17 files changed, 20 insertions(+), 8 deletions(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java
index 051068dba0..fc07b99000 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java
@@ -59,6 +59,11 @@ public class DefaultSaveModeHandler implements SaveModeHandler {
customSql);
}
+ @Override
+ public void open() {
+ catalog.open();
+ }
+
@Override
public void handleSchemaSaveMode() {
switch (schemaSaveMode) {
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java
index e75c2215dd..3eddaf0514 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java
@@ -22,6 +22,8 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
public interface SaveModeHandler extends AutoCloseable {
+ void open();
+
void handleSchemaSaveMode();
void handleDataSaveMode();
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
index c449dda027..c746fea00c 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
@@ -125,7 +125,6 @@ public class DorisSink
}
Catalog catalog =
catalogFactory.createCatalog(catalogFactory.factoryIdentifier(), config);
- catalog.open();
return Optional.of(
new DefaultSaveModeHandler(
config.get(DorisOptions.SCHEMA_SAVE_MODE),
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
index dee47bfd73..fed65733e0 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
@@ -91,7 +91,6 @@ public class ElasticsearchSink
DataSaveMode dataSaveMode = config.get(SinkConfig.DATA_SAVE_MODE);
TablePath tablePath = TablePath.of("",
catalogTable.getTableId().getTableName());
- catalog.open();
return Optional.of(
new DefaultSaveModeHandler(
schemaSaveMode, dataSaveMode, catalog, tablePath,
null, null));
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSink.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSink.java
index 008ab799b9..65bccbdb89 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSink.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSink.java
@@ -130,7 +130,6 @@ public class IcebergSink
}
Catalog catalog =
catalogFactory.createCatalog(catalogFactory.factoryIdentifier(),
readonlyConfig);
- catalog.open();
return Optional.of(
new DefaultSaveModeHandler(
config.getSchemaSaveMode(),
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
index 1ec9ab8883..2ccfba19f2 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
@@ -198,7 +198,6 @@ public class JdbcSink
if (catalogOptional.isPresent()) {
try {
Catalog catalog = catalogOptional.get();
- catalog.open();
FieldIdeEnum fieldIdeEnumEnum =
config.get(JdbcOptions.FIELD_IDE);
String fieldIde =
fieldIdeEnumEnum == null
diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java
index c5b1b82bcc..2015be1973 100644
--- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java
+++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java
@@ -103,7 +103,6 @@ public class MilvusSink
SchemaSaveMode schemaSaveMode =
config.get(MilvusSinkConfig.SCHEMA_SAVE_MODE);
DataSaveMode dataSaveMode =
config.get(MilvusSinkConfig.DATA_SAVE_MODE);
- catalog.open();
return Optional.of(
new DefaultSaveModeHandler(
schemaSaveMode,
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
index 23651994ad..fbf04a5038 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
@@ -144,7 +144,6 @@ public class PaimonSink
}
org.apache.seatunnel.api.table.catalog.Catalog catalog =
catalogFactory.createCatalog(catalogFactory.factoryIdentifier(),
readonlyConfig);
- catalog.open();
return Optional.of(
new PaimonSaveModeHandler(
this,
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
index b9040f72d4..35c9ed9e37 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
@@ -79,7 +79,6 @@ public class StarRocksSink extends AbstractSimpleSink<SeaTunnelRow, Void>
sinkConfig.getPassword(),
sinkConfig.getJdbcUrl(),
sinkConfig.getSaveModeCreateTemplate());
- catalog.open();
return Optional.of(
new DefaultSaveModeHandler(
schemaSaveMode,
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 51586beaf0..f7ec6bd309 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -130,6 +130,7 @@ public class SinkExecuteProcessor
Optional<SaveModeHandler> saveModeHandler =
saveModeSink.getSaveModeHandler();
if (saveModeHandler.isPresent()) {
try (SaveModeHandler handler = saveModeHandler.get()) {
+ handler.open();
new SaveModeExecuteWrapper(handler).execute();
} catch (Exception e) {
throw new
SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index c713593821..b4f192bc61 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -131,6 +131,7 @@ public class SinkExecuteProcessor
Optional<SaveModeHandler> saveModeHandler =
saveModeSink.getSaveModeHandler();
if (saveModeHandler.isPresent()) {
try (SaveModeHandler handler = saveModeHandler.get()) {
+ handler.open();
new SaveModeExecuteWrapper(handler).execute();
} catch (Exception e) {
throw new
SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 0f3fc75096..20ca02b616 100644
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -122,6 +122,7 @@ public class SinkExecuteProcessor
Optional<SaveModeHandler> saveModeHandler =
saveModeSink.getSaveModeHandler();
if (saveModeHandler.isPresent()) {
try (SaveModeHandler handler = saveModeHandler.get()) {
+ handler.open();
new SaveModeExecuteWrapper(handler).execute();
} catch (Exception e) {
throw new
SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 29ff90cfb8..0cca2f8d68 100644
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -122,6 +122,7 @@ public class SinkExecuteProcessor
Optional<SaveModeHandler> saveModeHandler =
saveModeSink.getSaveModeHandler();
if (saveModeHandler.isPresent()) {
try (SaveModeHandler handler = saveModeHandler.get()) {
+ handler.open();
new SaveModeExecuteWrapper(handler).execute();
} catch (Exception e) {
throw new
SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java
index f8550a615a..4ece65fda8 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.e2e.connector.doris;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
@@ -245,7 +246,9 @@ public class DorisCatalogIT extends AbstractDorisIT {
Thread.currentThread().getContextClassLoader(),
Collections.emptyList());
SupportSaveMode sink = (SupportSaveMode)
dorisSinkFactory.createSink(context).createSink();
- sink.getSaveModeHandler().get().handleSaveMode();
+ SaveModeHandler handler = sink.getSaveModeHandler().get();
+ handler.open();
+ handler.handleSaveMode();
CatalogTable createdTable = catalog.getTable(TablePath.of(fullName));
Assertions.assertEquals(
upstreamTable.getTableSchema().getColumns().size(),
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySaveModeHandler.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySaveModeHandler.java
index e2c28bd447..61084819a2 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySaveModeHandler.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySaveModeHandler.java
@@ -42,6 +42,9 @@ public class InMemorySaveModeHandler implements SaveModeHandler {
this.catalogTable = catalogTable;
}
+ @Override
+ public void open() {}
+
@Override
public void handleSchemaSaveMode() {
log.info("handle schema savemode with table path: {}",
catalogTable.getTablePath());
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index d02a76a4c5..cb7f118a6e 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -698,6 +698,7 @@ public class MultipleTableJobConfigParser {
Optional<SaveModeHandler> saveModeHandler =
saveModeSink.getSaveModeHandler();
if (saveModeHandler.isPresent()) {
try (SaveModeHandler handler = saveModeHandler.get()) {
+ handler.open();
new SaveModeExecuteWrapper(handler).execute();
} catch (Exception e) {
throw new
SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index f521c05492..d85cf607dd 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -372,6 +372,7 @@ public class JobMaster {
((SupportSaveMode) sink).getSaveModeHandler();
if (saveModeHandler.isPresent()) {
try (SaveModeHandler handler = saveModeHandler.get()) {
+ handler.open();
new SaveModeExecuteWrapper(handler).execute();
} catch (Exception e) {
throw new
SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);