This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 0fdf45f94d [Feature] LocalFile sink support multiple table (#5931)
0fdf45f94d is described below

commit 0fdf45f94d2d3720e4c2bd1ad3498d270f1aaade
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed Dec 6 10:28:38 2023 +0800

    [Feature] LocalFile sink support multiple table (#5931)
---
 docs/en/connector-v2/sink/LocalFile.md             |  14 ++-
 .../api/configuration/util/ConfigValidator.java    |  46 +++++---
 .../api/table/catalog/CatalogTableUtil.java        |   4 +-
 .../configuration/util/ConfigValidatorTest.java    |   4 +-
 .../seatunnel/file/local/sink/LocalFileSink.java   |  91 ++++++++++++++--
 .../file/local/sink/LocalFileSinkFactory.java      |  83 +++++++++++++-
 .../local/sink/writter/LocalFileSinkWriter.java    |  51 +++++++++
 .../file/local/LocalFileWithMultipleTableIT.java   |   9 ++
 .../fake_to_local_file_with_multiple_table.conf    | 121 +++++++++++++++++++++
 .../engine/client/LogicalDagGeneratorTest.java     |   2 +-
 .../client/MultipleTableJobConfigParserTest.java   |  11 +-
 .../dag/execution/ExecutionPlanGenerator.java      |   3 +-
 12 files changed, 400 insertions(+), 39 deletions(-)

diff --git a/docs/en/connector-v2/sink/LocalFile.md 
b/docs/en/connector-v2/sink/LocalFile.md
index d7a183a4ae..e9d7985051 100644
--- a/docs/en/connector-v2/sink/LocalFile.md
+++ b/docs/en/connector-v2/sink/LocalFile.md
@@ -55,7 +55,7 @@ By default, we use 2PC commit to ensure `exactly-once`
 
 ### path [string]
 
-The target dir path is required.
+The target dir path is required. You can inject the upstream CatalogTable into 
the path by using `${database_name}`, `${table_name}` and `${schema_name}`.
 
 ### custom_filename [boolean]
 
@@ -237,6 +237,18 @@ LocalFile {
 
 ```
 
+To extract source metadata from the upstream, you can use `${database_name}`, 
`${table_name}` and `${schema_name}` in the path.
+
+```bash
+
+LocalFile {
+    path = "/tmp/hive/warehouse/${table_name}"
+    file_format_type = "parquet"
+    sink_columns = ["name","age"]
+}
+
+```
+
 ## Changelog
 
 ### 2.2.0-beta 2022-09-26
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/ConfigValidator.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/ConfigValidator.java
index f82632b0e1..884e82034d 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/ConfigValidator.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/ConfigValidator.java
@@ -45,14 +45,18 @@ public class ConfigValidator {
         List<RequiredOption> requiredOptions = rule.getRequiredOptions();
         for (RequiredOption requiredOption : requiredOptions) {
             validate(requiredOption);
-            requiredOption
-                    .getOptions()
-                    .forEach(
-                            option -> {
-                                if 
(SingleChoiceOption.class.isAssignableFrom(option.getClass())) {
-                                    validateSingleChoice(option);
-                                }
-                            });
+
+            for (Option<?> option : requiredOption.getOptions()) {
+                if 
(SingleChoiceOption.class.isAssignableFrom(option.getClass())) {
+                    // if this is a conditional required option whose 
condition does not match, skip validation
+                    if (isConditionOption(requiredOption)
+                            && !matchCondition(
+                                    
(RequiredOption.ConditionalRequiredOptions) requiredOption)) {
+                        continue;
+                    }
+                    validateSingleChoice(option);
+                }
+            }
         }
 
         for (Option option : rule.getOptionalOptions()) {
@@ -74,15 +78,15 @@ public class ConfigValidator {
         Object o = singleChoiceOption.defaultValue();
         if (o != null && !optionValues.contains(o)) {
             throw new OptionValidationException(
-                    "These options(%s) are SingleChoiceOption, the 
defaultValue(%s) must be one of the optionValues.",
-                    getOptionKeys(Arrays.asList(singleChoiceOption)), o);
+                    "These options(%s) are SingleChoiceOption, the 
defaultValue(%s) must be one of the optionValues(%s).",
+                    getOptionKeys(Arrays.asList(singleChoiceOption)), o, 
optionValues);
         }
 
         Object value = config.get(option);
         if (value != null && !optionValues.contains(value)) {
             throw new OptionValidationException(
-                    "These options(%s) are SingleChoiceOption, the value(%s) 
must be one of the optionValues.",
-                    getOptionKeys(Arrays.asList(singleChoiceOption)), value);
+                    "These options(%s) are SingleChoiceOption, the value(%s) 
must be one of the optionValues(%s).",
+                    getOptionKeys(Arrays.asList(singleChoiceOption)), value, 
optionValues);
         }
     }
 
@@ -99,7 +103,7 @@ public class ConfigValidator {
             validate((RequiredOption.ExclusiveRequiredOptions) requiredOption);
             return;
         }
-        if (requiredOption instanceof 
RequiredOption.ConditionalRequiredOptions) {
+        if (isConditionOption(requiredOption)) {
             validate((RequiredOption.ConditionalRequiredOptions) 
requiredOption);
             return;
         }
@@ -181,8 +185,7 @@ public class ConfigValidator {
     }
 
     void validate(RequiredOption.ConditionalRequiredOptions 
conditionalRequiredOptions) {
-        Expression expression = conditionalRequiredOptions.getExpression();
-        boolean match = validate(expression);
+        boolean match = matchCondition(conditionalRequiredOptions);
         if (!match) {
             return;
         }
@@ -193,7 +196,8 @@ public class ConfigValidator {
         }
         throw new OptionValidationException(
                 "There are unconfigured options, the options(%s) are required 
because [%s] is true.",
-                getOptionKeys(absentOptions), expression.toString());
+                getOptionKeys(absentOptions),
+                conditionalRequiredOptions.getExpression().toString());
     }
 
     private boolean validate(Expression expression) {
@@ -222,4 +226,14 @@ public class ConfigValidator {
             return match || validate(condition.getNext());
         }
     }
+
+    private boolean isConditionOption(RequiredOption requiredOption) {
+        return requiredOption instanceof 
RequiredOption.ConditionalRequiredOptions;
+    }
+
+    private boolean matchCondition(
+            RequiredOption.ConditionalRequiredOptions 
conditionalRequiredOptions) {
+        Expression expression = conditionalRequiredOptions.getExpression();
+        return validate(expression);
+    }
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
index 99c376d330..6b8d19ea71 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
@@ -213,7 +213,9 @@ public class CatalogTableUtil implements Serializable {
                             schemaConfig.get(
                                     
TableSchemaOptions.TableIdentifierOptions.SCHEMA_FIRST));
         } else {
-            tablePath = TablePath.EMPTY;
+            Optional<String> resultTableNameOptional =
+                    
readonlyConfig.getOptional(CommonOptions.RESULT_TABLE_NAME);
+            tablePath = 
resultTableNameOptional.map(TablePath::of).orElse(TablePath.EMPTY);
         }
 
         return CatalogTable.of(
diff --git 
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ConfigValidatorTest.java
 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ConfigValidatorTest.java
index 7b9a3b39df..18a176b78c 100644
--- 
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ConfigValidatorTest.java
+++ 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ConfigValidatorTest.java
@@ -275,7 +275,7 @@ public class ConfigValidatorTest {
         config.put(SINGLE_CHOICE_TEST.key(), "A");
         Executable executable = () -> validate(config, optionRule);
         assertEquals(
-                "ErrorCode:[API-02], ErrorDescription:[Option item validate 
failed] - These options('single_choice_test') are SingleChoiceOption, the 
defaultValue(M) must be one of the optionValues.",
+                "ErrorCode:[API-02], ErrorDescription:[Option item validate 
failed] - These options('single_choice_test') are SingleChoiceOption, the 
defaultValue(M) must be one of the optionValues([A, B, C]).",
                 assertThrows(OptionValidationException.class, 
executable).getMessage());
     }
 
@@ -290,7 +290,7 @@ public class ConfigValidatorTest {
         config.put(SINGLE_CHOICE_VALUE_TEST.key(), "N");
         executable = () -> validate(config, optionRule);
         assertEquals(
-                "ErrorCode:[API-02], ErrorDescription:[Option item validate 
failed] - These options('single_choice_test') are SingleChoiceOption, the 
value(N) must be one of the optionValues.",
+                "ErrorCode:[API-02], ErrorDescription:[Option item validate 
failed] - These options('single_choice_test') are SingleChoiceOption, the 
value(N) must be one of the optionValues([A, B, C]).",
                 assertThrows(OptionValidationException.class, 
executable).getMessage());
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java
index 4d8037ef5f..3222dcd7a7 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java
@@ -17,27 +17,94 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.local.sink;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import 
org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileHadoopConf;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
+import 
org.apache.seatunnel.connectors.seatunnel.file.local.sink.writter.LocalFileSinkWriter;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategyFactory;
+
+import java.util.List;
+import java.util.Optional;
 
-import com.google.auto.service.AutoService;
+public class LocalFileSink
+        implements SeaTunnelSink<
+                        SeaTunnelRow, FileSinkState, FileCommitInfo, 
FileAggregatedCommitInfo>,
+                SupportMultiTableSink {
 
-@AutoService(SeaTunnelSink.class)
-public class LocalFileSink extends BaseFileSink {
+    private final HadoopConf hadoopConf;
+    private final FileSystemUtils fileSystemUtils;
+    private final FileSinkConfig fileSinkConfig;
+    private final WriteStrategy writeStrategy;
+    private String jobId;
+
+    public LocalFileSink(ReadonlyConfig readonlyConfig, CatalogTable 
catalogTable) {
+        this.hadoopConf = new LocalFileHadoopConf();
+        this.fileSinkConfig =
+                new FileSinkConfig(readonlyConfig.toConfig(), 
catalogTable.getSeaTunnelRowType());
+        this.writeStrategy =
+                WriteStrategyFactory.of(fileSinkConfig.getFileFormat(), 
fileSinkConfig);
+        this.fileSystemUtils = new FileSystemUtils(hadoopConf);
+        
this.writeStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType());
+        this.writeStrategy.setFileSystemUtils(fileSystemUtils);
+    }
 
     @Override
-    public String getPluginName() {
-        return FileSystemType.LOCAL.getFileSystemPluginName();
+    public void setJobContext(JobContext jobContext) {
+        this.jobId = jobContext.getJobId();
+    }
+
+    @Override
+    public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState> 
restoreWriter(
+            SinkWriter.Context context, List<FileSinkState> states) {
+        return new LocalFileSinkWriter(writeStrategy, hadoopConf, context, 
jobId, states);
+    }
+
+    @Override
+    public Optional<SinkAggregatedCommitter<FileCommitInfo, 
FileAggregatedCommitInfo>>
+            createAggregatedCommitter() {
+        return Optional.of(new FileSinkAggregatedCommitter(fileSystemUtils));
+    }
+
+    @Override
+    public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState> 
createWriter(
+            SinkWriter.Context context) {
+        return new LocalFileSinkWriter(writeStrategy, hadoopConf, context, 
jobId);
     }
 
     @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        super.prepare(pluginConfig);
-        hadoopConf = new LocalFileHadoopConf();
+    public Optional<Serializer<FileCommitInfo>> getCommitInfoSerializer() {
+        return Optional.of(new DefaultSerializer<>());
+    }
+
+    @Override
+    public Optional<Serializer<FileAggregatedCommitInfo>> 
getAggregatedCommitInfoSerializer() {
+        return Optional.of(new DefaultSerializer<>());
+    }
+
+    @Override
+    public Optional<Serializer<FileSinkState>> getWriterStateSerializer() {
+        return Optional.of(new DefaultSerializer<>());
+    }
+
+    @Override
+    public String getPluginName() {
+        return FileSystemType.LOCAL.getFileSystemPluginName();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
index b6f5dd5076..d9232f4ddc 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
@@ -17,17 +17,32 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.local.sink;
 
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SinkReplaceNameConstant;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
 
 import com.google.auto.service.AutoService;
 
 @AutoService(Factory.class)
-public class LocalFileSinkFactory implements TableSinkFactory {
+public class LocalFileSinkFactory
+        implements TableSinkFactory<
+                SeaTunnelRow, FileSinkState, FileCommitInfo, 
FileAggregatedCommitInfo> {
     @Override
     public String factoryIdentifier() {
         return FileSystemType.LOCAL.getFileSystemPluginName();
@@ -82,4 +97,70 @@ public class LocalFileSinkFactory implements 
TableSinkFactory {
                 .optional(BaseSinkConfig.TIME_FORMAT)
                 .build();
     }
+
+    @Override
+    public TableSink<SeaTunnelRow, FileSinkState, FileCommitInfo, 
FileAggregatedCommitInfo>
+            createSink(TableSinkFactoryContext context) {
+        ReadonlyConfig readonlyConfig = context.getOptions();
+        CatalogTable catalogTable = context.getCatalogTable();
+
+        ReadonlyConfig finalReadonlyConfig =
+                generateCurrentReadonlyConfig(readonlyConfig, catalogTable);
+        return () -> new LocalFileSink(finalReadonlyConfig, catalogTable);
+    }
+
+    // replace the table name in sink config's path
+    private ReadonlyConfig generateCurrentReadonlyConfig(
+            ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
+        // Copy the config to avoid modifying the original config
+        Config config = readonlyConfig.toConfig();
+
+        if (config.hasPath(BaseSinkConfig.FILE_PATH.key())) {
+            String replacedPath =
+                    replaceCatalogTableInPath(
+                            config.getString(BaseSinkConfig.FILE_PATH.key()), 
catalogTable);
+            config =
+                    config.withValue(
+                            BaseSinkConfig.FILE_PATH.key(),
+                            ConfigValueFactory.fromAnyRef(replacedPath));
+        }
+
+        if (config.hasPath(BaseSinkConfig.TMP_PATH.key())) {
+            String replacedPath =
+                    replaceCatalogTableInPath(
+                            config.getString(BaseSinkConfig.TMP_PATH.key()), 
catalogTable);
+            config =
+                    config.withValue(
+                            BaseSinkConfig.TMP_PATH.key(),
+                            ConfigValueFactory.fromAnyRef(replacedPath));
+        }
+
+        return ReadonlyConfig.fromConfig(config);
+    }
+
+    private String replaceCatalogTableInPath(String originString, CatalogTable 
catalogTable) {
+        String path = originString;
+        TableIdentifier tableIdentifier = catalogTable.getTableId();
+        if (tableIdentifier != null) {
+            if (tableIdentifier.getDatabaseName() != null) {
+                path =
+                        path.replace(
+                                
SinkReplaceNameConstant.REPLACE_DATABASE_NAME_KEY,
+                                tableIdentifier.getDatabaseName());
+            }
+            if (tableIdentifier.getSchemaName() != null) {
+                path =
+                        path.replace(
+                                
SinkReplaceNameConstant.REPLACE_SCHEMA_NAME_KEY,
+                                tableIdentifier.getSchemaName());
+            }
+            if (tableIdentifier.getTableName() != null) {
+                path =
+                        path.replace(
+                                SinkReplaceNameConstant.REPLACE_TABLE_NAME_KEY,
+                                tableIdentifier.getTableName());
+            }
+        }
+        return path;
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/writter/LocalFileSinkWriter.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/writter/LocalFileSinkWriter.java
new file mode 100644
index 0000000000..88de32f820
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/writter/LocalFileSinkWriter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.local.sink.writter;
+
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
+
+import java.util.Collections;
+import java.util.List;
+
+public class LocalFileSinkWriter extends BaseFileSinkWriter
+        implements SupportMultiTableSinkWriter<WriteStrategy> {
+
+    public LocalFileSinkWriter(
+            WriteStrategy writeStrategy,
+            HadoopConf hadoopConf,
+            Context context,
+            String jobId,
+            List<FileSinkState> fileSinkStates) {
+        // TODO: do we need to treat writeStrategy as a shared resource? If 
so, how should we
+        // handle the previous fileSinkStates?
+        super(writeStrategy, hadoopConf, context, jobId, fileSinkStates);
+    }
+
+    public LocalFileSinkWriter(
+            WriteStrategy writeStrategy,
+            HadoopConf hadoopConf,
+            SinkWriter.Context context,
+            String jobId) {
+        this(writeStrategy, hadoopConf, context, jobId, 
Collections.emptyList());
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileWithMultipleTableIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileWithMultipleTableIT.java
index 10d1a63429..35f29e635f 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileWithMultipleTableIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileWithMultipleTableIT.java
@@ -64,8 +64,17 @@ public class LocalFileWithMultipleTableIT extends 
TestSuiteBase {
                         "/text/e2e.txt",
                         
"/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt",
                         container);
+
+                container.execInContainer("mkdir", "-p", "/tmp/fake_empty");
             };
 
+    @TestTemplate
+    public void testFakeToLocalFileInMultipleTableMode_text(TestContainer 
testContainer)
+            throws IOException, InterruptedException {
+        TestHelper helper = new TestHelper(testContainer);
+        helper.execute("/text/fake_to_local_file_with_multiple_table.conf");
+    }
+
     @TestTemplate
     public void 
testLocalFileReadAndWriteInMultipleTableMode_excel(TestContainer container)
             throws IOException, InterruptedException {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/fake_to_local_file_with_multiple_table.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/fake_to_local_file_with_multiple_table.conf
new file mode 100644
index 0000000000..d75c756ba6
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/fake_to_local_file_with_multiple_table.conf
@@ -0,0 +1,121 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    tables_configs = [
+       {
+        schema = {
+          table = "fake1"
+          fields {
+            c_map = "map<string, string>"
+            c_array = "array<int>"
+            c_string = string
+            c_boolean = boolean
+            c_tinyint = tinyint
+            c_smallint = smallint
+            c_int = int
+            c_bigint = bigint
+            c_float = float
+            c_double = double
+            c_bytes = bytes
+            c_date = date
+            c_decimal = "decimal(38, 18)"
+            c_timestamp = timestamp
+            c_row = {
+              c_map = "map<string, string>"
+              c_array = "array<int>"
+              c_string = string
+              c_boolean = boolean
+              c_tinyint = tinyint
+              c_smallint = smallint
+              c_int = int
+              c_bigint = bigint
+              c_float = float
+              c_double = double
+              c_bytes = bytes
+              c_date = date
+              c_decimal = "decimal(38, 18)"
+              c_timestamp = timestamp
+            }
+          }
+        }
+       },
+       {
+       schema = {
+         table = "fake2"
+         fields {
+           c_map = "map<string, string>"
+           c_array = "array<int>"
+           c_string = string
+           c_boolean = boolean
+           c_tinyint = tinyint
+           c_smallint = smallint
+           c_int = int
+           c_bigint = bigint
+           c_float = float
+           c_double = double
+           c_bytes = bytes
+           c_date = date
+           c_decimal = "decimal(38, 18)"
+           c_timestamp = timestamp
+           c_row = {
+             c_map = "map<string, string>"
+             c_array = "array<int>"
+             c_string = string
+             c_boolean = boolean
+             c_tinyint = tinyint
+             c_smallint = smallint
+             c_int = int
+             c_bigint = bigint
+             c_float = float
+             c_double = double
+             c_bytes = bytes
+             c_date = date
+             c_decimal = "decimal(38, 18)"
+             c_timestamp = timestamp
+           }
+         }
+       }
+      }
+    ]
+  }
+}
+
+sink {
+  LocalFile {
+    path = "/tmp/fake_empty/text/${table_name}"
+    row_delimiter = "\n"
+    partition_dir_expression = "${k0}=${v0}"
+    is_partition_field_write_in_file = true
+    file_name_expression = "${transactionId}_${now}"
+    file_format_type = "text"
+    filename_time_format = "yyyy.MM.dd"
+    is_enable_transaction = true
+    compress_codec = "lzo"
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
index 962e6aab5b..fc9f2cb72f 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
@@ -56,7 +56,7 @@ public class LogicalDagGeneratorTest {
         LogicalDag logicalDag = logicalDagGenerator.generate();
         JsonObject logicalDagJson = logicalDag.getLogicalDagAsJson();
         String result =
-                
"{\"vertices\":[{\"id\":1,\"name\":\"Source[0]-FakeSource(id=1)\",\"parallelism\":3},{\"id\":2,\"name\":\"Source[1]-FakeSource(id=2)\",\"parallelism\":3},{\"id\":3,\"name\":\"Sink[0]-LocalFile(id=3)\",\"parallelism\":3}],\"edges\":[{\"inputVertex\":\"Source[0]-FakeSource\",\"targetVertex\":\"Sink[0]-LocalFile\"},{\"inputVertex\":\"Source[1]-FakeSource\",\"targetVertex\":\"Sink[0]-LocalFile\"}]}";
+                
"{\"vertices\":[{\"id\":1,\"name\":\"Source[0]-FakeSource(id=1)\",\"parallelism\":3},{\"id\":2,\"name\":\"Source[1]-FakeSource(id=2)\",\"parallelism\":3},{\"id\":3,\"name\":\"Sink[0]-LocalFile-fake(id=3)\",\"parallelism\":3}],\"edges\":[{\"inputVertex\":\"Source[0]-FakeSource\",\"targetVertex\":\"Sink[0]-LocalFile-fake\"},{\"inputVertex\":\"Source[1]-FakeSource\",\"targetVertex\":\"Sink[0]-LocalFile-fake\"}]}";
         Assertions.assertEquals(result, logicalDagJson.toString());
     }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
index 92518ae1b1..083e503d8b 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
@@ -54,7 +54,7 @@ public class MultipleTableJobConfigParserTest {
         ImmutablePair<List<Action>, Set<URL>> parse = jobConfigParser.parse();
         List<Action> actions = parse.getLeft();
         Assertions.assertEquals(1, actions.size());
-        Assertions.assertEquals("Sink[0]-LocalFile", actions.get(0).getName());
+        Assertions.assertEquals("Sink[0]-LocalFile-MultiTableSink", 
actions.get(0).getName());
         Assertions.assertEquals(1, actions.get(0).getUpstream().size());
         Assertions.assertEquals(
                 "Source[0]-FakeSource", 
actions.get(0).getUpstream().get(0).getName());
@@ -75,7 +75,7 @@ public class MultipleTableJobConfigParserTest {
         List<Action> actions = parse.getLeft();
         Assertions.assertEquals(1, actions.size());
 
-        Assertions.assertEquals("Sink[0]-LocalFile", actions.get(0).getName());
+        Assertions.assertEquals("Sink[0]-LocalFile-fake", 
actions.get(0).getName());
         Assertions.assertEquals(2, actions.get(0).getUpstream().size());
 
         String[] expected = {"Source[0]-FakeSource", "Source[1]-FakeSource"};
@@ -106,8 +106,11 @@ public class MultipleTableJobConfigParserTest {
         List<Action> actions = parse.getLeft();
         Assertions.assertEquals(2, actions.size());
 
-        Assertions.assertEquals("Sink[0]-LocalFile", actions.get(0).getName());
-        Assertions.assertEquals("Sink[1]-LocalFile", actions.get(1).getName());
+        // This is union sink
+        Assertions.assertEquals("Sink[0]-LocalFile-fake", 
actions.get(0).getName());
+
+        // This is multiple table sink
+        Assertions.assertEquals("Sink[1]-LocalFile-MultiTableSink", 
actions.get(1).getName());
     }
 
     @Test
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
index 955c694e3c..45795cab5c 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
@@ -459,7 +459,8 @@ public class ExecutionPlanGenerator {
                 actionCount++;
             }
         }
-        checkArgument(actionNames.size() == actionCount, "Action name is 
duplicated");
+        checkArgument(
+                actionNames.size() == actionCount, "Action name is duplicated: 
" + actionNames);
 
         return pipelines;
     }

Reply via email to