This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 0fdf45f94d [Feature] LocalFile sink support multiple table (#5931)
0fdf45f94d is described below
commit 0fdf45f94d2d3720e4c2bd1ad3498d270f1aaade
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed Dec 6 10:28:38 2023 +0800
[Feature] LocalFile sink support multiple table (#5931)
---
docs/en/connector-v2/sink/LocalFile.md | 14 ++-
.../api/configuration/util/ConfigValidator.java | 46 +++++---
.../api/table/catalog/CatalogTableUtil.java | 4 +-
.../configuration/util/ConfigValidatorTest.java | 4 +-
.../seatunnel/file/local/sink/LocalFileSink.java | 91 ++++++++++++++--
.../file/local/sink/LocalFileSinkFactory.java | 83 +++++++++++++-
.../local/sink/writter/LocalFileSinkWriter.java | 51 +++++++++
.../file/local/LocalFileWithMultipleTableIT.java | 9 ++
.../fake_to_local_file_with_multiple_table.conf | 121 +++++++++++++++++++++
.../engine/client/LogicalDagGeneratorTest.java | 2 +-
.../client/MultipleTableJobConfigParserTest.java | 11 +-
.../dag/execution/ExecutionPlanGenerator.java | 3 +-
12 files changed, 400 insertions(+), 39 deletions(-)
diff --git a/docs/en/connector-v2/sink/LocalFile.md
b/docs/en/connector-v2/sink/LocalFile.md
index d7a183a4ae..e9d7985051 100644
--- a/docs/en/connector-v2/sink/LocalFile.md
+++ b/docs/en/connector-v2/sink/LocalFile.md
@@ -55,7 +55,7 @@ By default, we use 2PC commit to ensure `exactly-once`
### path [string]
-The target dir path is required.
+The target dir path is required, you can inject the upstream CatalogTable into
the path by using: `${database_name}`, `${table_name}` and `${schema_name}`.
### custom_filename [boolean]
@@ -237,6 +237,18 @@ LocalFile {
```
+For extract source metadata from upstream, you can use `${database_name}`,
`${table_name}` and `${schema_name}` in the path.
+
+```bash
+
+LocalFile {
+ path = "/tmp/hive/warehouse/${table_name}"
+ file_format_type = "parquet"
+ sink_columns = ["name","age"]
+}
+
+```
+
## Changelog
### 2.2.0-beta 2022-09-26
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/ConfigValidator.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/ConfigValidator.java
index f82632b0e1..884e82034d 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/ConfigValidator.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/ConfigValidator.java
@@ -45,14 +45,18 @@ public class ConfigValidator {
List<RequiredOption> requiredOptions = rule.getRequiredOptions();
for (RequiredOption requiredOption : requiredOptions) {
validate(requiredOption);
- requiredOption
- .getOptions()
- .forEach(
- option -> {
- if
(SingleChoiceOption.class.isAssignableFrom(option.getClass())) {
- validateSingleChoice(option);
- }
- });
+
+ for (Option<?> option : requiredOption.getOptions()) {
+ if
(SingleChoiceOption.class.isAssignableFrom(option.getClass())) {
+ // is required option and not match condition, skip
validate
+ if (isConditionOption(requiredOption)
+ && !matchCondition(
+
(RequiredOption.ConditionalRequiredOptions) requiredOption)) {
+ continue;
+ }
+ validateSingleChoice(option);
+ }
+ }
}
for (Option option : rule.getOptionalOptions()) {
@@ -74,15 +78,15 @@ public class ConfigValidator {
Object o = singleChoiceOption.defaultValue();
if (o != null && !optionValues.contains(o)) {
throw new OptionValidationException(
- "These options(%s) are SingleChoiceOption, the
defaultValue(%s) must be one of the optionValues.",
- getOptionKeys(Arrays.asList(singleChoiceOption)), o);
+ "These options(%s) are SingleChoiceOption, the
defaultValue(%s) must be one of the optionValues(%s).",
+ getOptionKeys(Arrays.asList(singleChoiceOption)), o,
optionValues);
}
Object value = config.get(option);
if (value != null && !optionValues.contains(value)) {
throw new OptionValidationException(
- "These options(%s) are SingleChoiceOption, the value(%s)
must be one of the optionValues.",
- getOptionKeys(Arrays.asList(singleChoiceOption)), value);
+ "These options(%s) are SingleChoiceOption, the value(%s)
must be one of the optionValues(%s).",
+ getOptionKeys(Arrays.asList(singleChoiceOption)), value,
optionValues);
}
}
@@ -99,7 +103,7 @@ public class ConfigValidator {
validate((RequiredOption.ExclusiveRequiredOptions) requiredOption);
return;
}
- if (requiredOption instanceof
RequiredOption.ConditionalRequiredOptions) {
+ if (isConditionOption(requiredOption)) {
validate((RequiredOption.ConditionalRequiredOptions)
requiredOption);
return;
}
@@ -181,8 +185,7 @@ public class ConfigValidator {
}
void validate(RequiredOption.ConditionalRequiredOptions
conditionalRequiredOptions) {
- Expression expression = conditionalRequiredOptions.getExpression();
- boolean match = validate(expression);
+ boolean match = matchCondition(conditionalRequiredOptions);
if (!match) {
return;
}
@@ -193,7 +196,8 @@ public class ConfigValidator {
}
throw new OptionValidationException(
"There are unconfigured options, the options(%s) are required
because [%s] is true.",
- getOptionKeys(absentOptions), expression.toString());
+ getOptionKeys(absentOptions),
+ conditionalRequiredOptions.getExpression().toString());
}
private boolean validate(Expression expression) {
@@ -222,4 +226,14 @@ public class ConfigValidator {
return match || validate(condition.getNext());
}
}
+
+ private boolean isConditionOption(RequiredOption requiredOption) {
+ return requiredOption instanceof
RequiredOption.ConditionalRequiredOptions;
+ }
+
+ private boolean matchCondition(
+ RequiredOption.ConditionalRequiredOptions
conditionalRequiredOptions) {
+ Expression expression = conditionalRequiredOptions.getExpression();
+ return validate(expression);
+ }
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
index 99c376d330..6b8d19ea71 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
@@ -213,7 +213,9 @@ public class CatalogTableUtil implements Serializable {
schemaConfig.get(
TableSchemaOptions.TableIdentifierOptions.SCHEMA_FIRST));
} else {
- tablePath = TablePath.EMPTY;
+ Optional<String> resultTableNameOptional =
+
readonlyConfig.getOptional(CommonOptions.RESULT_TABLE_NAME);
+ tablePath =
resultTableNameOptional.map(TablePath::of).orElse(TablePath.EMPTY);
}
return CatalogTable.of(
diff --git
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ConfigValidatorTest.java
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ConfigValidatorTest.java
index 7b9a3b39df..18a176b78c 100644
---
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ConfigValidatorTest.java
+++
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ConfigValidatorTest.java
@@ -275,7 +275,7 @@ public class ConfigValidatorTest {
config.put(SINGLE_CHOICE_TEST.key(), "A");
Executable executable = () -> validate(config, optionRule);
assertEquals(
- "ErrorCode:[API-02], ErrorDescription:[Option item validate
failed] - These options('single_choice_test') are SingleChoiceOption, the
defaultValue(M) must be one of the optionValues.",
+ "ErrorCode:[API-02], ErrorDescription:[Option item validate
failed] - These options('single_choice_test') are SingleChoiceOption, the
defaultValue(M) must be one of the optionValues([A, B, C]).",
assertThrows(OptionValidationException.class,
executable).getMessage());
}
@@ -290,7 +290,7 @@ public class ConfigValidatorTest {
config.put(SINGLE_CHOICE_VALUE_TEST.key(), "N");
executable = () -> validate(config, optionRule);
assertEquals(
- "ErrorCode:[API-02], ErrorDescription:[Option item validate
failed] - These options('single_choice_test') are SingleChoiceOption, the
value(N) must be one of the optionValues.",
+ "ErrorCode:[API-02], ErrorDescription:[Option item validate
failed] - These options('single_choice_test') are SingleChoiceOption, the
value(N) must be one of the optionValues([A, B, C]).",
assertThrows(OptionValidationException.class,
executable).getMessage());
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java
index 4d8037ef5f..3222dcd7a7 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java
@@ -17,27 +17,94 @@
package org.apache.seatunnel.connectors.seatunnel.file.local.sink;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileHadoopConf;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
+import
org.apache.seatunnel.connectors.seatunnel.file.local.sink.writter.LocalFileSinkWriter;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategyFactory;
+
+import java.util.List;
+import java.util.Optional;
-import com.google.auto.service.AutoService;
+public class LocalFileSink
+ implements SeaTunnelSink<
+ SeaTunnelRow, FileSinkState, FileCommitInfo,
FileAggregatedCommitInfo>,
+ SupportMultiTableSink {
-@AutoService(SeaTunnelSink.class)
-public class LocalFileSink extends BaseFileSink {
+ private final HadoopConf hadoopConf;
+ private final FileSystemUtils fileSystemUtils;
+ private final FileSinkConfig fileSinkConfig;
+ private final WriteStrategy writeStrategy;
+ private String jobId;
+
+ public LocalFileSink(ReadonlyConfig readonlyConfig, CatalogTable
catalogTable) {
+ this.hadoopConf = new LocalFileHadoopConf();
+ this.fileSinkConfig =
+ new FileSinkConfig(readonlyConfig.toConfig(),
catalogTable.getSeaTunnelRowType());
+ this.writeStrategy =
+ WriteStrategyFactory.of(fileSinkConfig.getFileFormat(),
fileSinkConfig);
+ this.fileSystemUtils = new FileSystemUtils(hadoopConf);
+
this.writeStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType());
+ this.writeStrategy.setFileSystemUtils(fileSystemUtils);
+ }
@Override
- public String getPluginName() {
- return FileSystemType.LOCAL.getFileSystemPluginName();
+ public void setJobContext(JobContext jobContext) {
+ this.jobId = jobContext.getJobId();
+ }
+
+ @Override
+ public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState>
restoreWriter(
+ SinkWriter.Context context, List<FileSinkState> states) {
+ return new LocalFileSinkWriter(writeStrategy, hadoopConf, context,
jobId, states);
+ }
+
+ @Override
+ public Optional<SinkAggregatedCommitter<FileCommitInfo,
FileAggregatedCommitInfo>>
+ createAggregatedCommitter() {
+ return Optional.of(new FileSinkAggregatedCommitter(fileSystemUtils));
+ }
+
+ @Override
+ public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState>
createWriter(
+ SinkWriter.Context context) {
+ return new LocalFileSinkWriter(writeStrategy, hadoopConf, context,
jobId);
}
@Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- super.prepare(pluginConfig);
- hadoopConf = new LocalFileHadoopConf();
+ public Optional<Serializer<FileCommitInfo>> getCommitInfoSerializer() {
+ return Optional.of(new DefaultSerializer<>());
+ }
+
+ @Override
+ public Optional<Serializer<FileAggregatedCommitInfo>>
getAggregatedCommitInfoSerializer() {
+ return Optional.of(new DefaultSerializer<>());
+ }
+
+ @Override
+ public Optional<Serializer<FileSinkState>> getWriterStateSerializer() {
+ return Optional.of(new DefaultSerializer<>());
+ }
+
+ @Override
+ public String getPluginName() {
+ return FileSystemType.LOCAL.getFileSystemPluginName();
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
index b6f5dd5076..d9232f4ddc 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
@@ -17,17 +17,32 @@
package org.apache.seatunnel.connectors.seatunnel.file.local.sink;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SinkReplaceNameConstant;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
import com.google.auto.service.AutoService;
@AutoService(Factory.class)
-public class LocalFileSinkFactory implements TableSinkFactory {
+public class LocalFileSinkFactory
+ implements TableSinkFactory<
+ SeaTunnelRow, FileSinkState, FileCommitInfo,
FileAggregatedCommitInfo> {
@Override
public String factoryIdentifier() {
return FileSystemType.LOCAL.getFileSystemPluginName();
@@ -82,4 +97,70 @@ public class LocalFileSinkFactory implements
TableSinkFactory {
.optional(BaseSinkConfig.TIME_FORMAT)
.build();
}
+
+ @Override
+ public TableSink<SeaTunnelRow, FileSinkState, FileCommitInfo,
FileAggregatedCommitInfo>
+ createSink(TableSinkFactoryContext context) {
+ ReadonlyConfig readonlyConfig = context.getOptions();
+ CatalogTable catalogTable = context.getCatalogTable();
+
+ ReadonlyConfig finalReadonlyConfig =
+ generateCurrentReadonlyConfig(readonlyConfig, catalogTable);
+ return () -> new LocalFileSink(finalReadonlyConfig, catalogTable);
+ }
+
+ // replace the table name in sink config's path
+ private ReadonlyConfig generateCurrentReadonlyConfig(
+ ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
+ // Copy the config to avoid modifying the original config
+ Config config = readonlyConfig.toConfig();
+
+ if (config.hasPath(BaseSinkConfig.FILE_PATH.key())) {
+ String replacedPath =
+ replaceCatalogTableInPath(
+ config.getString(BaseSinkConfig.FILE_PATH.key()),
catalogTable);
+ config =
+ config.withValue(
+ BaseSinkConfig.FILE_PATH.key(),
+ ConfigValueFactory.fromAnyRef(replacedPath));
+ }
+
+ if (config.hasPath(BaseSinkConfig.TMP_PATH.key())) {
+ String replacedPath =
+ replaceCatalogTableInPath(
+ config.getString(BaseSinkConfig.TMP_PATH.key()),
catalogTable);
+ config =
+ config.withValue(
+ BaseSinkConfig.TMP_PATH.key(),
+ ConfigValueFactory.fromAnyRef(replacedPath));
+ }
+
+ return ReadonlyConfig.fromConfig(config);
+ }
+
+ private String replaceCatalogTableInPath(String originString, CatalogTable
catalogTable) {
+ String path = originString;
+ TableIdentifier tableIdentifier = catalogTable.getTableId();
+ if (tableIdentifier != null) {
+ if (tableIdentifier.getDatabaseName() != null) {
+ path =
+ path.replace(
+
SinkReplaceNameConstant.REPLACE_DATABASE_NAME_KEY,
+ tableIdentifier.getDatabaseName());
+ }
+ if (tableIdentifier.getSchemaName() != null) {
+ path =
+ path.replace(
+
SinkReplaceNameConstant.REPLACE_SCHEMA_NAME_KEY,
+ tableIdentifier.getSchemaName());
+ }
+ if (tableIdentifier.getTableName() != null) {
+ path =
+ path.replace(
+ SinkReplaceNameConstant.REPLACE_TABLE_NAME_KEY,
+ tableIdentifier.getTableName());
+ }
+ }
+ return path;
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/writter/LocalFileSinkWriter.java
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/writter/LocalFileSinkWriter.java
new file mode 100644
index 0000000000..88de32f820
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/writter/LocalFileSinkWriter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.local.sink.writter;
+
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
+
+import java.util.Collections;
+import java.util.List;
+
+public class LocalFileSinkWriter extends BaseFileSinkWriter
+ implements SupportMultiTableSinkWriter<WriteStrategy> {
+
+ public LocalFileSinkWriter(
+ WriteStrategy writeStrategy,
+ HadoopConf hadoopConf,
+ Context context,
+ String jobId,
+ List<FileSinkState> fileSinkStates) {
+ // todo: do we need to set writeStrategy as share resource? then how
to deal with the pre
+ // fileSinkStates?
+ super(writeStrategy, hadoopConf, context, jobId, fileSinkStates);
+ }
+
+ public LocalFileSinkWriter(
+ WriteStrategy writeStrategy,
+ HadoopConf hadoopConf,
+ SinkWriter.Context context,
+ String jobId) {
+ this(writeStrategy, hadoopConf, context, jobId,
Collections.emptyList());
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileWithMultipleTableIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileWithMultipleTableIT.java
index 10d1a63429..35f29e635f 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileWithMultipleTableIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileWithMultipleTableIT.java
@@ -64,8 +64,17 @@ public class LocalFileWithMultipleTableIT extends
TestSuiteBase {
"/text/e2e.txt",
"/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt",
container);
+
+ container.execInContainer("mkdir", "-p", "/tmp/fake_empty");
};
+ @TestTemplate
+ public void testFakeToLocalFileInMultipleTableMode_text(TestContainer
testContainer)
+ throws IOException, InterruptedException {
+ TestHelper helper = new TestHelper(testContainer);
+ helper.execute("/text/fake_to_local_file_with_multiple_table.conf");
+ }
+
@TestTemplate
public void
testLocalFileReadAndWriteInMultipleTableMode_excel(TestContainer container)
throws IOException, InterruptedException {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/fake_to_local_file_with_multiple_table.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/fake_to_local_file_with_multiple_table.conf
new file mode 100644
index 0000000000..d75c756ba6
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/fake_to_local_file_with_multiple_table.conf
@@ -0,0 +1,121 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ tables_configs = [
+ {
+ schema = {
+ table = "fake1"
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ },
+ {
+ schema = {
+ table = "fake2"
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ }
+ ]
+ }
+}
+
+sink {
+ LocalFile {
+ path = "/tmp/fake_empty/text/${table_name}"
+ row_delimiter = "\n"
+ partition_dir_expression = "${k0}=${v0}"
+ is_partition_field_write_in_file = true
+ file_name_expression = "${transactionId}_${now}"
+ file_format_type = "text"
+ filename_time_format = "yyyy.MM.dd"
+ is_enable_transaction = true
+ compress_codec = "lzo"
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
index 962e6aab5b..fc9f2cb72f 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
@@ -56,7 +56,7 @@ public class LogicalDagGeneratorTest {
LogicalDag logicalDag = logicalDagGenerator.generate();
JsonObject logicalDagJson = logicalDag.getLogicalDagAsJson();
String result =
-
"{\"vertices\":[{\"id\":1,\"name\":\"Source[0]-FakeSource(id=1)\",\"parallelism\":3},{\"id\":2,\"name\":\"Source[1]-FakeSource(id=2)\",\"parallelism\":3},{\"id\":3,\"name\":\"Sink[0]-LocalFile(id=3)\",\"parallelism\":3}],\"edges\":[{\"inputVertex\":\"Source[0]-FakeSource\",\"targetVertex\":\"Sink[0]-LocalFile\"},{\"inputVertex\":\"Source[1]-FakeSource\",\"targetVertex\":\"Sink[0]-LocalFile\"}]}";
+
"{\"vertices\":[{\"id\":1,\"name\":\"Source[0]-FakeSource(id=1)\",\"parallelism\":3},{\"id\":2,\"name\":\"Source[1]-FakeSource(id=2)\",\"parallelism\":3},{\"id\":3,\"name\":\"Sink[0]-LocalFile-fake(id=3)\",\"parallelism\":3}],\"edges\":[{\"inputVertex\":\"Source[0]-FakeSource\",\"targetVertex\":\"Sink[0]-LocalFile-fake\"},{\"inputVertex\":\"Source[1]-FakeSource\",\"targetVertex\":\"Sink[0]-LocalFile-fake\"}]}";
Assertions.assertEquals(result, logicalDagJson.toString());
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
index 92518ae1b1..083e503d8b 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
@@ -54,7 +54,7 @@ public class MultipleTableJobConfigParserTest {
ImmutablePair<List<Action>, Set<URL>> parse = jobConfigParser.parse();
List<Action> actions = parse.getLeft();
Assertions.assertEquals(1, actions.size());
- Assertions.assertEquals("Sink[0]-LocalFile", actions.get(0).getName());
+ Assertions.assertEquals("Sink[0]-LocalFile-MultiTableSink",
actions.get(0).getName());
Assertions.assertEquals(1, actions.get(0).getUpstream().size());
Assertions.assertEquals(
"Source[0]-FakeSource",
actions.get(0).getUpstream().get(0).getName());
@@ -75,7 +75,7 @@ public class MultipleTableJobConfigParserTest {
List<Action> actions = parse.getLeft();
Assertions.assertEquals(1, actions.size());
- Assertions.assertEquals("Sink[0]-LocalFile", actions.get(0).getName());
+ Assertions.assertEquals("Sink[0]-LocalFile-fake",
actions.get(0).getName());
Assertions.assertEquals(2, actions.get(0).getUpstream().size());
String[] expected = {"Source[0]-FakeSource", "Source[1]-FakeSource"};
@@ -106,8 +106,11 @@ public class MultipleTableJobConfigParserTest {
List<Action> actions = parse.getLeft();
Assertions.assertEquals(2, actions.size());
- Assertions.assertEquals("Sink[0]-LocalFile", actions.get(0).getName());
- Assertions.assertEquals("Sink[1]-LocalFile", actions.get(1).getName());
+ // This is union sink
+ Assertions.assertEquals("Sink[0]-LocalFile-fake",
actions.get(0).getName());
+
+ // This is multiple table sink
+ Assertions.assertEquals("Sink[1]-LocalFile-MultiTableSink",
actions.get(1).getName());
}
@Test
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
index 955c694e3c..45795cab5c 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
@@ -459,7 +459,8 @@ public class ExecutionPlanGenerator {
actionCount++;
}
}
- checkArgument(actionNames.size() == actionCount, "Action name is
duplicated");
+ checkArgument(
+ actionNames.size() == actionCount, "Action name is duplicated:
" + actionNames);
return pipelines;
}