This is an automated email from the ASF dual-hosted git repository.

jchan pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.18 by this push:
     new c2a85ac1500 [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory
c2a85ac1500 is described below

commit c2a85ac15003f03682979c617424c74875e19137
Author: Jane Chan <qingyue....@gmail.com>
AuthorDate: Tue Mar 19 15:51:28 2024 +0800

    [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory
    
    This closes #24511
    
    * Fix unstable TableSourceITCase#testTableHintWithLogicalTableScanReuse
    * Move the staging dir configuration into builder for easier testing
    
    ---------
    
    Co-authored-by: Matthias Pohl <matthias.p...@aiven.io>
    (cherry picked from commit 7d0111dfab640f2f590dd710d76de927c86cf83e)
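
    In short: the previous scheme staged files for every write to the same
    sink table in a shared ".staging_<timestamp>" directory under the table
    path, tolerating an already existing directory, so concurrent jobs could
    end up sharing a staging directory and overwriting or deleting each
    other's files. Each FileSystemOutputFormat now derives its own staging
    directory and fails fast if it already exists. A rough sketch of the
    scheme, condensed from the diff below (the actual code wraps the
    IOException in a RuntimeException):

        // Unique per-sink staging dir: timestamp plus random UUID.
        private Path toStagingPath(Path parentPath) {
            return new Path(
                    parentPath,
                    String.format(".staging_%d_%s", System.currentTimeMillis(), UUID.randomUUID()));
        }

        // Refuse to reuse an existing staging dir instead of silently
        // sharing it between concurrent writers.
        private static void createStagingDirectory(Path stagingPath) throws IOException {
            final FileSystem stagingFileSystem = stagingPath.getFileSystem();
            Preconditions.checkState(
                    !stagingFileSystem.exists(stagingPath),
                    "Staging dir %s already exists",
                    stagingPath);
            stagingFileSystem.mkdirs(stagingPath);
        }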
---
 .../f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e           |  15 +-
 .../file/table/FileSystemOutputFormat.java         |  55 +++-
 .../connector/file/table/FileSystemTableSink.java  |  15 +-
 .../file/table/FileSystemOutputFormatTest.java     | 283 ++++++++++++---------
 .../flink/connectors/hive/HiveTableSink.java       |   3 +-
 5 files changed, 222 insertions(+), 149 deletions(-)

diff --git a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e
index c4bf362a8ea..516e36594c2 100644
--- a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e
+++ b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e
@@ -663,17 +663,18 @@ Method <org.apache.flink.connector.file.table.DynamicPartitionWriter.write(java.
 Method <org.apache.flink.connector.file.table.FileInfoExtractorBulkFormat.wrapReader(org.apache.flink.connector.file.src.reader.BulkFormat$Reader, org.apache.flink.connector.file.src.FileSourceSplit)> calls method <org.apache.flink.table.utils.PartitionPathUtils.convertStringToInternalValue(java.lang.String, org.apache.flink.table.types.DataType)> in (FileInfoExtractorBulkFormat.java:156)
 Method <org.apache.flink.connector.file.table.FileInfoExtractorBulkFormat.wrapReader(org.apache.flink.connector.file.src.reader.BulkFormat$Reader, org.apache.flink.connector.file.src.FileSourceSplit)> calls method <org.apache.flink.table.utils.PartitionPathUtils.extractPartitionSpecFromPath(org.apache.flink.core.fs.Path)> in (FileInfoExtractorBulkFormat.java:140)
 Method <org.apache.flink.connector.file.table.FileSystemCommitter.commitPartitionsWithFiles(java.util.Map)> calls method <org.apache.flink.table.utils.PartitionPathUtils.extractPartitionSpecFromPath(org.apache.flink.core.fs.Path)> in (FileSystemCommitter.java:146)
-Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (FileSystemOutputFormat.java:288)
-Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (FileSystemOutputFormat.java:289)
-Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (FileSystemOutputFormat.java:290)
-Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (FileSystemOutputFormat.java:291)
-Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (FileSystemOutputFormat.java:292)
+Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (FileSystemOutputFormat.java:324)
+Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (FileSystemOutputFormat.java:325)
+Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (FileSystemOutputFormat.java:326)
+Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (FileSystemOutputFormat.java:327)
+Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (FileSystemOutputFormat.java:328)
 Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.setOutputFileConfig(org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig)> has parameter of type <org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig> in (FileSystemOutputFormat.java:0)
-Method <org.apache.flink.connector.file.table.FileSystemTableSink$TableBucketAssigner.getBucketId(org.apache.flink.table.data.RowData, org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner$Context)> calls method <org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath(java.util.LinkedHashMap)> in (FileSystemTableSink.java:566)
+Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.setStagingPath(org.apache.flink.core.fs.Path)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (FileSystemOutputFormat.java:291)
+Method <org.apache.flink.connector.file.table.FileSystemOutputFormat.createStagingDirectory(org.apache.flink.core.fs.Path)> calls method <org.apache.flink.util.Preconditions.checkState(boolean, java.lang.String, [Ljava.lang.Object;)> in (FileSystemOutputFormat.java:109)
+Method <org.apache.flink.connector.file.table.FileSystemTableSink$TableBucketAssigner.getBucketId(org.apache.flink.table.data.RowData, org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner$Context)> calls method <org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath(java.util.LinkedHashMap)> in (FileSystemTableSink.java:553)
 Method <org.apache.flink.connector.file.table.FileSystemTableSink.createBatchSink(org.apache.flink.streaming.api.datastream.DataStream, org.apache.flink.table.connector.sink.DynamicTableSink$Context, int, boolean)> calls method <org.apache.flink.api.dag.Transformation.setParallelism(int, boolean)> in (FileSystemTableSink.java:208)
 Method <org.apache.flink.connector.file.table.FileSystemTableSink.createBatchSink(org.apache.flink.streaming.api.datastream.DataStream, org.apache.flink.table.connector.sink.DynamicTableSink$Context, int, boolean)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig.builder()> in (FileSystemTableSink.java:189)
 Method <org.apache.flink.connector.file.table.FileSystemTableSink.createStreamingSink(org.apache.flink.table.connector.ProviderContext, org.apache.flink.streaming.api.datastream.DataStream, org.apache.flink.table.connector.sink.DynamicTableSink$Context, int, boolean)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig.builder()> in (FileSystemTableSink.java:233)
-Method <org.apache.flink.connector.file.table.FileSystemTableSink.toStagingPath()> calls method <org.apache.flink.util.Preconditions.checkState(boolean, java.lang.Object)> in (FileSystemTableSink.java:380)
 Method <org.apache.flink.connector.file.table.FileSystemTableSource.listPartitions()> calls method <org.apache.flink.table.utils.PartitionPathUtils.searchPartSpecAndPaths(org.apache.flink.core.fs.FileSystem, org.apache.flink.core.fs.Path, int)> in (FileSystemTableSource.java:328)
 Method <org.apache.flink.connector.file.table.FileSystemTableSource.paths()> has return type <[Lorg.apache.flink.core.fs.Path;> in (FileSystemTableSource.java:0)
 Method <org.apache.flink.connector.file.table.FileSystemTableSource.paths()> references method <org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath(java.util.LinkedHashMap)> in (FileSystemTableSource.java:295)
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java
index f866b8ec230..3b73e95e497 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java
@@ -19,6 +19,7 @@
 package org.apache.flink.connector.file.table;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts;
 import org.apache.flink.api.common.io.FinalizeOnMaster;
 import org.apache.flink.api.common.io.OutputFormat;
@@ -28,12 +29,14 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.UUID;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -56,7 +59,7 @@ public class FileSystemOutputFormat<T>
     private final TableMetaStoreFactory msFactory;
     private final boolean overwrite;
     private final boolean isToLocal;
-    private final Path tmpPath;
+    private final Path stagingPath;
     private final String[] partitionColumns;
     private final boolean dynamicGrouped;
     private final LinkedHashMap<String, String> staticPartitions;
@@ -74,7 +77,7 @@ public class FileSystemOutputFormat<T>
             TableMetaStoreFactory msFactory,
             boolean overwrite,
             boolean isToLocal,
-            Path tmpPath,
+            Path stagingPath,
             String[] partitionColumns,
             boolean dynamicGrouped,
             LinkedHashMap<String, String> staticPartitions,
@@ -87,7 +90,7 @@ public class FileSystemOutputFormat<T>
         this.msFactory = msFactory;
         this.overwrite = overwrite;
         this.isToLocal = isToLocal;
-        this.tmpPath = tmpPath;
+        this.stagingPath = stagingPath;
         this.partitionColumns = partitionColumns;
         this.dynamicGrouped = dynamicGrouped;
         this.staticPartitions = staticPartitions;
@@ -96,6 +99,22 @@ public class FileSystemOutputFormat<T>
         this.outputFileConfig = outputFileConfig;
         this.identifier = identifier;
         this.partitionCommitPolicyFactory = partitionCommitPolicyFactory;
+
+        createStagingDirectory(this.stagingPath);
+    }
+
+    private static void createStagingDirectory(Path stagingPath) {
+        try {
+            final FileSystem stagingFileSystem = stagingPath.getFileSystem();
+            Preconditions.checkState(
+                    !stagingFileSystem.exists(stagingPath),
+                    "Staging dir %s already exists",
+                    stagingPath);
+            stagingFileSystem.mkdirs(stagingPath);
+        } catch (IOException e) {
+            throw new RuntimeException(
+                    "An IO error occurred while accessing the staging 
FileSystem.", e);
+        }
     }
 
     @Override
@@ -108,7 +127,7 @@ public class FileSystemOutputFormat<T>
                                 Thread.currentThread().getContextClassLoader(),
                                 () -> {
                                     try {
-                                        return fsFactory.create(tmpPath.toUri());
+                                        return fsFactory.create(stagingPath.toUri());
                                     } catch (IOException e) {
                                         throw new RuntimeException(e);
                                     }
@@ -120,7 +139,7 @@ public class FileSystemOutputFormat<T>
                             fsFactory,
                             msFactory,
                             overwrite,
-                            tmpPath,
+                            stagingPath,
                             partitionColumns.length,
                             isToLocal,
                             identifier,
@@ -141,7 +160,7 @@ public class FileSystemOutputFormat<T>
             throw new TableException("Exception in finalizeGlobal", e);
         } finally {
             try {
-                fsFactory.create(tmpPath.toUri()).delete(tmpPath, true);
+                fsFactory.create(stagingPath.toUri()).delete(stagingPath, true);
             } catch (IOException ignore) {
             }
         }
@@ -158,7 +177,7 @@ public class FileSystemOutputFormat<T>
             PartitionTempFileManager fileManager =
                     new PartitionTempFileManager(
                             fsFactory,
-                            tmpPath,
+                            stagingPath,
                             context.getTaskNumber(),
                             context.getAttemptNumber(),
                             outputFileConfig);
@@ -203,7 +222,7 @@ public class FileSystemOutputFormat<T>
         private String[] partitionColumns;
         private OutputFormatFactory<T> formatFactory;
         private TableMetaStoreFactory metaStoreFactory;
-        private Path tmpPath;
+        private Path stagingPath;
 
         private LinkedHashMap<String, String> staticPartitions = new LinkedHashMap<>();
         private boolean dynamicGrouped = false;
@@ -258,11 +277,23 @@ public class FileSystemOutputFormat<T>
             return this;
         }
 
-        public Builder<T> setTempPath(Path tmpPath) {
-            this.tmpPath = tmpPath;
+        public Builder<T> setPath(Path parentPath) {
+            this.stagingPath = toStagingPath(parentPath);
             return this;
         }
 
+        @VisibleForTesting
+        Builder<T> setStagingPath(Path stagingPath) {
+            this.stagingPath = stagingPath;
+            return this;
+        }
+
+        private Path toStagingPath(Path parentPath) {
+            return new Path(
+                    parentPath,
+                    String.format(".staging_%d_%s", System.currentTimeMillis(), UUID.randomUUID()));
+        }
+
         public Builder<T> setPartitionComputer(PartitionComputer<T> computer) {
             this.computer = computer;
             return this;
@@ -288,7 +319,7 @@ public class FileSystemOutputFormat<T>
             checkNotNull(partitionColumns, "partitionColumns should not be null");
             checkNotNull(formatFactory, "formatFactory should not be null");
             checkNotNull(metaStoreFactory, "metaStoreFactory should not be null");
-            checkNotNull(tmpPath, "tmpPath should not be null");
+            checkNotNull(stagingPath, "stagingPath should not be null");
             checkNotNull(computer, "partitionComputer should not be null");
 
             return new FileSystemOutputFormat<>(
@@ -296,7 +327,7 @@ public class FileSystemOutputFormat<T>
                     metaStoreFactory,
                     overwrite,
                     isToLocal,
-                    tmpPath,
+                    stagingPath,
                     partitionColumns,
                     dynamicGrouped,
                     staticPartitions,
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java
index 0555774b656..1029fecedb5 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java
@@ -184,7 +184,7 @@ public class FileSystemTableSink extends AbstractFileSystemTable
                 .setMetaStoreFactory(new EmptyMetaStoreFactory(path))
                 .setOverwrite(overwrite)
                 .setStaticPartitions(staticPartitions)
-                .setTempPath(toStagingPath())
+                .setPath(path)
                 .setOutputFileConfig(
                         OutputFileConfig.builder()
                                 .withPartPrefix("part-" + UUID.randomUUID())
@@ -373,19 +373,6 @@ public class FileSystemTableSink extends AbstractFileSystemTable
         };
     }
 
-    private Path toStagingPath() {
-        Path stagingDir = new Path(path, ".staging_" + System.currentTimeMillis());
-        try {
-            FileSystem fs = stagingDir.getFileSystem();
-            Preconditions.checkState(
-                    fs.exists(stagingDir) || fs.mkdirs(stagingDir),
-                    "Failed to create staging dir " + stagingDir);
-            return stagingDir;
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
     @SuppressWarnings("unchecked")
     private OutputFormatFactory<RowData> createOutputFormatFactory(Context sinkContext) {
         Object writer = createWriter(sinkContext);
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java
index 964304d5ae7..85062cf215e 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java
@@ -28,6 +28,8 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowUtils;
 
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
+
 import org.apache.commons.io.FileUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -37,24 +39,41 @@ import org.junit.jupiter.api.io.TempDir;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.entry;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for {@link FileSystemOutputFormat}. */
 class FileSystemOutputFormatTest {
 
-    @TempDir private java.nio.file.Path tmpPath;
     @TempDir private java.nio.file.Path outputPath;
 
+    @TempDir private java.nio.file.Path stagingBaseDir;
+
     private final TestingFinalizationContext finalizationContext = new TestingFinalizationContext();
 
+    private static final Supplier<List<StreamRecord<Row>>> DEFAULT_INPUT_SUPPLIER =
+            () ->
+                    Arrays.asList(
+                            new StreamRecord<>(Row.of("a1", 1, "p1"), 1L),
+                            new StreamRecord<>(Row.of("a2", 2, "p1"), 1L),
+                            new StreamRecord<>(Row.of("a2", 2, "p2"), 1L),
+                            new StreamRecord<>(Row.of("a3", 3, "p1"), 1L));
+
+    private static final Supplier<List<String>> DEFAULT_OUTPUT_SUPPLIER =
+            () ->
+                    Collections.singletonList(
+                            createFileContent("a1,1,p1", "a2,2,p1", "a2,2,p2", 
"a3,3,p1"));
+
     private static Map<File, String> getFileContentByPath(java.nio.file.Path directory)
             throws IOException {
         Map<File, String> contents = new HashMap<>(4);
@@ -70,6 +89,10 @@ class FileSystemOutputFormatTest {
         return contents;
     }
 
+    private static String createFileContent(String... rows) {
+        return Arrays.stream(rows).collect(Collectors.joining("\n", "", "\n"));
+    }
+
     @BeforeEach
     void before() {
         RowUtils.USE_LEGACY_TO_STRING = true;
@@ -83,7 +106,7 @@ class FileSystemOutputFormatTest {
     @Test
     void testClosingWithoutInput() throws Exception {
         try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(false, false, false, new LinkedHashMap<>(), new AtomicReference<>())) {
+                createTestHarness(createSinkFormat(false, false, false, new LinkedHashMap<>()))) {
             testHarness.setup();
             testHarness.open();
         }
@@ -91,149 +114,181 @@ class FileSystemOutputFormatTest {
 
     @Test
     void testNonPartition() throws Exception {
-        AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>();
-        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(false, false, false, new LinkedHashMap<>(), ref)) {
-            writeUnorderedRecords(testHarness);
-            assertThat(getFileContentByPath(tmpPath)).hasSize(1);
-        }
-
-        ref.get().finalizeGlobal(finalizationContext);
-        Map<File, String> content = getFileContentByPath(outputPath);
-        assertThat(content.values())
-                .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n");
-    }
-
-    private void writeUnorderedRecords(OneInputStreamOperatorTestHarness<Row, Object> testHarness)
-            throws Exception {
-        testHarness.setup();
-        testHarness.open();
-
-        testHarness.processElement(new StreamRecord<>(Row.of("a1", 1, "p1"), 1L));
-        testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p1"), 1L));
-        testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p2"), 1L));
-        testHarness.processElement(new StreamRecord<>(Row.of("a3", 3, "p1"), 1L));
+        checkWriteAndCommit(
+                false,
+                false,
+                false,
+                new LinkedHashMap<>(),
+                DEFAULT_INPUT_SUPPLIER,
+                DEFAULT_OUTPUT_SUPPLIER);
     }
 
     @Test
     void testOverrideNonPartition() throws Exception {
         testNonPartition();
-
-        AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>();
-        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(true, false, false, new LinkedHashMap<>(), ref)) {
-            writeUnorderedRecords(testHarness);
-            assertThat(getFileContentByPath(tmpPath)).hasSize(1);
-        }
-
-        ref.get().finalizeGlobal(finalizationContext);
-        Map<File, String> content = getFileContentByPath(outputPath);
-        assertThat(content).hasSize(1);
-        assertThat(content.values())
-                .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + 
"a3,3,p1\n");
-        assertThat(new File(tmpPath.toUri())).doesNotExist();
+        checkWriteAndCommit(
+                true,
+                false,
+                false,
+                new LinkedHashMap<>(),
+                DEFAULT_INPUT_SUPPLIER,
+                DEFAULT_OUTPUT_SUPPLIER);
     }
 
     @Test
     void testStaticPartition() throws Exception {
-        AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>();
         LinkedHashMap<String, String> staticParts = new LinkedHashMap<>();
         staticParts.put("c", "p1");
-        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(false, true, false, staticParts, ref)) {
-            testHarness.setup();
-            testHarness.open();
 
-            testHarness.processElement(new StreamRecord<>(Row.of("a1", 1), 
1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a2", 2), 
1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a2", 2), 
1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a3", 3), 
1L));
-            assertThat(getFileContentByPath(tmpPath)).hasSize(1);
-        }
-
-        ref.get().finalizeGlobal(finalizationContext);
-        Map<File, String> content = getFileContentByPath(outputPath);
-        assertThat(content).hasSize(1);
-        assertThat(content.keySet().iterator().next().getParentFile().getName()).isEqualTo("c=p1");
-        assertThat(content.values()).containsExactly("a1,1\n" + "a2,2\n" + "a2,2\n" + "a3,3\n");
-        assertThat(new File(tmpPath.toUri())).doesNotExist();
+        checkWriteAndCommit(
+                false,
+                true,
+                false,
+                staticParts,
+                () ->
+                        Arrays.asList(
+                                new StreamRecord<>(Row.of("a1", 1), 1L),
+                                new StreamRecord<>(Row.of("a2", 2), 1L),
+                                new StreamRecord<>(Row.of("a2", 2), 1L),
+                                new StreamRecord<>(Row.of("a3", 3), 1L)),
+                () ->
+                        Collections.singletonMap(
+                                "c=p1", createFileContent("a1,1", "a2,2", 
"a2,2", "a3,3")));
     }
 
     @Test
     void testDynamicPartition() throws Exception {
-        AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>();
-        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(false, true, false, new LinkedHashMap<>(), ref)) {
-            writeUnorderedRecords(testHarness);
-            assertThat(getFileContentByPath(tmpPath)).hasSize(2);
-        }
-
-        ref.get().finalizeGlobal(finalizationContext);
-        Map<File, String> content = getFileContentByPath(outputPath);
-        Map<String, String> sortedContent = new TreeMap<>();
-        content.forEach((file, s) -> sortedContent.put(file.getParentFile().getName(), s));
-
-        assertThat(sortedContent).hasSize(2);
-        assertThat(sortedContent)
-                .contains(entry("c=p1", "a1,1\n" + "a2,2\n" + "a3,3\n"), entry("c=p2", "a2,2\n"));
-        assertThat(new File(tmpPath.toUri())).doesNotExist();
+        checkWriteAndCommit(
+                false,
+                true,
+                false,
+                new LinkedHashMap<>(),
+                DEFAULT_INPUT_SUPPLIER,
+                () ->
+                        ImmutableMap.of(
+                                "c=p1",
+                                createFileContent("a1,1", "a2,2", "a3,3"),
+                                "c=p2",
+                                createFileContent("a2,2")));
     }
 
     @Test
     void testGroupedDynamicPartition() throws Exception {
-        AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>();
+        checkWriteAndCommit(
+                false,
+                true,
+                true,
+                new LinkedHashMap<>(),
+                () ->
+                        Arrays.asList(
+                                new StreamRecord<>(Row.of("a1", 1, "p1"), 1L),
+                                new StreamRecord<>(Row.of("a2", 2, "p1"), 1L),
+                                new StreamRecord<>(Row.of("a3", 3, "p1"), 1L),
+                                new StreamRecord<>(Row.of("a2", 2, "p2"), 1L)),
+                () ->
+                        ImmutableMap.of(
+                                "c=p1",
+                                createFileContent("a1,1", "a2,2", "a3,3"),
+                                "c=p2",
+                                createFileContent("a2,2")));
+    }
+
+    @Test
+    void testGetUniqueStagingDirectory() throws IOException {
+        final Path alreadyExistingStagingDir = new Path(outputPath.toFile().getAbsolutePath());
+        assertThat(alreadyExistingStagingDir.getFileSystem().exists(alreadyExistingStagingDir))
+                .as("The staging folder should already exist.")
+                .isTrue();
+
+        final FileSystemOutputFormat.Builder<Row> builder =
+                new FileSystemOutputFormat.Builder<Row>()
+                        .setPartitionColumns(new String[0])
+                        .setFormatFactory(TextOutputFormat::new)
+                        .setMetaStoreFactory(
+                                new FileSystemCommitterTest.TestMetaStoreFactory(
+                                        new Path(outputPath.toFile().getAbsolutePath())))
+                        .setPartitionComputer(
+                                new RowPartitionComputer("default", new String[0], new String[0]))
+                        .setStagingPath(alreadyExistingStagingDir);
+
+        assertThatThrownBy(builder::build)
+                .as("Reusing a folder should cause an error.")
+                .isInstanceOf(IllegalStateException.class);
+    }
+
+    @SuppressWarnings("unchecked")
+    private void checkWriteAndCommit(
+            boolean override,
+            boolean partitioned,
+            boolean dynamicGrouped,
+            LinkedHashMap<String, String> staticPartitions,
+            Supplier<List<StreamRecord<Row>>> inputSupplier,
+            Supplier<?> outputSupplier)
+            throws Exception {
+        Object expectedOutput = outputSupplier.get();
+        int expectedFileNum =
+                (partitioned)
+                        ? ((Map<String, String>) expectedOutput).size()
+                        : ((List<String>) expectedOutput).size();
+        FileSystemOutputFormat<Row> outputFormat =
+                createSinkFormat(override, partitioned, dynamicGrouped, staticPartitions);
         try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(false, true, true, new LinkedHashMap<>(), ref)) {
+                createTestHarness(outputFormat)) {
             testHarness.setup();
             testHarness.open();
-
-            testHarness.processElement(new StreamRecord<>(Row.of("a1", 1, 
"p1"), 1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, 
"p1"), 1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a3", 3, 
"p1"), 1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, 
"p2"), 1L));
-            assertThat(getFileContentByPath(tmpPath)).hasSize(2);
+            for (StreamRecord<Row> record : inputSupplier.get()) {
+                testHarness.processElement(record);
+            }
+            assertThat(getFileContentByPath(stagingBaseDir)).hasSize(expectedFileNum);
         }
 
-        ref.get().finalizeGlobal(finalizationContext);
-        Map<File, String> content = getFileContentByPath(outputPath);
-        Map<String, String> sortedContent = new TreeMap<>();
-        content.forEach((file, s) -> sortedContent.put(file.getParentFile().getName(), s));
-
-        assertThat(sortedContent).hasSize(2);
-        assertThat(sortedContent.get("c=p1")).isEqualTo("a1,1\n" + "a2,2\n" + 
"a3,3\n");
-        assertThat(sortedContent.get("c=p2")).isEqualTo("a2,2\n");
-        assertThat(new File(tmpPath.toUri())).doesNotExist();
+        outputFormat.finalizeGlobal(finalizationContext);
+        assertThat(stagingBaseDir).isEmptyDirectory();
+
+        Map<File, String> fileToContent = getFileContentByPath(outputPath);
+        assertThat(fileToContent).hasSize(expectedFileNum);
+        if (partitioned) {
+            Map<String, String> partitionToContent =
+                    fileToContent.entrySet().stream()
+                            .collect(
+                                    Collectors.toMap(
+                                            e -> e.getKey().getParentFile().getName(),
+                                            Map.Entry::getValue));
+
+            assertThat(partitionToContent)
+                    .containsExactlyInAnyOrderEntriesOf((Map<String, String>) expectedOutput);
+        } else {
+            assertThat(fileToContent.values())
+                    .containsExactlyInAnyOrderElementsOf((List<String>) expectedOutput);
+        }
     }
 
-    private OneInputStreamOperatorTestHarness<Row, Object> createSink(
+    private FileSystemOutputFormat<Row> createSinkFormat(
             boolean override,
             boolean partition,
             boolean dynamicGrouped,
-            LinkedHashMap<String, String> staticPartitions,
-            AtomicReference<FileSystemOutputFormat<Row>> sinkRef)
-            throws Exception {
+            LinkedHashMap<String, String> staticPartitions) {
         String[] columnNames = new String[] {"a", "b", "c"};
         String[] partitionColumns = partition ? new String[] {"c"} : new String[0];
+        Path path = new Path(outputPath.toString());
+        TableMetaStoreFactory msFactory = new FileSystemCommitterTest.TestMetaStoreFactory(path);
+        return new FileSystemOutputFormat.Builder<Row>()
+                .setMetaStoreFactory(msFactory)
+                .setPath(new Path(stagingBaseDir.toString()))
+                .setOverwrite(override)
+                .setPartitionColumns(partitionColumns)
+                .setPartitionComputer(
+                        new RowPartitionComputer("default", columnNames, 
partitionColumns))
+                .setFormatFactory(TextOutputFormat::new)
+                .setDynamicGrouped(dynamicGrouped)
+                .setStaticPartitions(staticPartitions)
+                .build();
+    }
 
-        TableMetaStoreFactory msFactory =
-                new FileSystemCommitterTest.TestMetaStoreFactory(new Path(outputPath.toString()));
-        FileSystemOutputFormat<Row> sink =
-                new FileSystemOutputFormat.Builder<Row>()
-                        .setMetaStoreFactory(msFactory)
-                        .setTempPath(new Path(tmpPath.toString()))
-                        .setOverwrite(override)
-                        .setPartitionColumns(partitionColumns)
-                        .setPartitionComputer(
-                                new RowPartitionComputer("default", 
columnNames, partitionColumns))
-                        .setFormatFactory(TextOutputFormat::new)
-                        .setDynamicGrouped(dynamicGrouped)
-                        .setStaticPartitions(staticPartitions)
-                        .build();
-
-        sinkRef.set(sink);
-
+    private OneInputStreamOperatorTestHarness<Row, Object> createTestHarness(
+            FileSystemOutputFormat<Row> outputFormat) throws Exception {
         return new OneInputStreamOperatorTestHarness<>(
-                new StreamSink<>(new OutputFormatSinkFunction<>(sink)),
+                new StreamSink<>(new OutputFormatSinkFunction<>(outputFormat)),
                 // test parallelism
                 3,
                 3,
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index 779de538f43..92fed884c5b 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -607,8 +607,7 @@ public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, Su
         builder.setOverwrite(overwrite);
         builder.setIsToLocal(isToLocal);
         builder.setStaticPartitions(staticPartitionSpec);
-        builder.setTempPath(
-                new org.apache.flink.core.fs.Path(toStagingDir(stagingParentDir, jobConf)));
+        builder.setPath(new org.apache.flink.core.fs.Path(stagingParentDir));
         builder.setOutputFileConfig(fileNaming);
         builder.setIdentifier(identifier);
         builder.setPartitionCommitPolicyFactory(
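
    For callers, the visible API change is on FileSystemOutputFormat.Builder:
    setTempPath(Path) is replaced by setPath(Path), which takes the sink's
    parent path and derives the unique staging directory internally (a
    package-private setStagingPath(Path) remains for tests only). A minimal
    usage sketch, mirroring the updated test setup above; msFactory,
    columnNames, partitionColumns, and parentPath are placeholders:

        FileSystemOutputFormat<Row> format =
                new FileSystemOutputFormat.Builder<Row>()
                        .setMetaStoreFactory(msFactory)
                        // parent path; a ".staging_<millis>_<uuid>" dir is created beneath it
                        .setPath(parentPath)
                        .setPartitionColumns(partitionColumns)
                        .setPartitionComputer(
                                new RowPartitionComputer("default", columnNames, partitionColumns))
                        .setFormatFactory(TextOutputFormat::new)
                        .build();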

