This is an automated email from the ASF dual-hosted git repository. jchan pushed a commit to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.18 by this push: new c2a85ac1500 [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory c2a85ac1500 is described below commit c2a85ac15003f03682979c617424c74875e19137 Author: Jane Chan <qingyue....@gmail.com> AuthorDate: Tue Mar 19 15:51:28 2024 +0800 [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory This closes #24511 * Fix unstable TableSourceITCase#testTableHintWithLogicalTableScanReuse * Move the staging dir configuration into builder for easier testing --------- Co-authored-by: Matthias Pohl <matthias.p...@aiven.io> (cherry picked from commit 7d0111dfab640f2f590dd710d76de927c86cf83e) --- .../f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e | 15 +- .../file/table/FileSystemOutputFormat.java | 55 +++- .../connector/file/table/FileSystemTableSink.java | 15 +- .../file/table/FileSystemOutputFormatTest.java | 283 ++++++++++++--------- .../flink/connectors/hive/HiveTableSink.java | 3 +- 5 files changed, 222 insertions(+), 149 deletions(-) diff --git a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e index c4bf362a8ea..516e36594c2 100644 --- a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e +++ b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e @@ -663,17 +663,18 @@ Method <org.apache.flink.connector.file.table.DynamicPartitionWriter.write(java. Method <org.apache.flink.connector.file.table.FileInfoExtractorBulkFormat.wrapReader(org.apache.flink.connector.file.src.reader.BulkFormat$Reader, org.apache.flink.connector.file.src.FileSourceSplit)> calls method <org.apache.flink.table.utils.PartitionPathUtils.convertStringToInternalValue(java.lang.String, org.apache.flink.table.types.DataType)> in (FileInfoExtractorBulkFormat.java:156) Method <org.apache.flink.connector.file.table.FileInfoExtractorBulkFormat.wrapReader(org.apache.flink.connector.file.src.reader.BulkFormat$Reader, org.apache.flink.connector.file.src.FileSourceSplit)> calls method <org.apache.flink.table.utils.PartitionPathUtils.extractPartitionSpecFromPath(org.apache.flink.core.fs.Path)> in (FileInfoExtractorBulkFormat.java:140) Method <org.apache.flink.connector.file.table.FileSystemCommitter.commitPartitionsWithFiles(java.util.Map)> calls method <org.apache.flink.table.utils.PartitionPathUtils.extractPartitionSpecFromPath(org.apache.flink.core.fs.Path)> in (FileSystemCommitter.java:146) -Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (FileSystemOutputFormat.java:288) -Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (FileSystemOutputFormat.java:289) -Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (FileSystemOutputFormat.java:290) -Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (FileSystemOutputFormat.java:291) -Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (FileSystemOutputFormat.java:292) +Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (FileSystemOutputFormat.java:324) +Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (FileSystemOutputFormat.java:325) +Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (FileSystemOutputFormat.java:326) +Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (FileSystemOutputFormat.java:327) +Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (FileSystemOutputFormat.java:328) Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.setOutputFileConfig(org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig)> has parameter of type <org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig> in (FileSystemOutputFormat.java:0) -Method <org.apache.flink.connector.file.table.FileSystemTableSink$TableBucketAssigner.getBucketId(org.apache.flink.table.data.RowData, org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner$Context)> calls method <org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath(java.util.LinkedHashMap)> in (FileSystemTableSink.java:566) +Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.setStagingPath(org.apache.flink.core.fs.Path)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (FileSystemOutputFormat.java:291) +Method <org.apache.flink.connector.file.table.FileSystemOutputFormat.createStagingDirectory(org.apache.flink.core.fs.Path)> calls method <org.apache.flink.util.Preconditions.checkState(boolean, java.lang.String, [Ljava.lang.Object;)> in (FileSystemOutputFormat.java:109) +Method <org.apache.flink.connector.file.table.FileSystemTableSink$TableBucketAssigner.getBucketId(org.apache.flink.table.data.RowData, org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner$Context)> calls method <org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath(java.util.LinkedHashMap)> in (FileSystemTableSink.java:553) Method <org.apache.flink.connector.file.table.FileSystemTableSink.createBatchSink(org.apache.flink.streaming.api.datastream.DataStream, org.apache.flink.table.connector.sink.DynamicTableSink$Context, int, boolean)> calls method <org.apache.flink.api.dag.Transformation.setParallelism(int, boolean)> in (FileSystemTableSink.java:208) Method <org.apache.flink.connector.file.table.FileSystemTableSink.createBatchSink(org.apache.flink.streaming.api.datastream.DataStream, org.apache.flink.table.connector.sink.DynamicTableSink$Context, int, boolean)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig.builder()> in (FileSystemTableSink.java:189) Method <org.apache.flink.connector.file.table.FileSystemTableSink.createStreamingSink(org.apache.flink.table.connector.ProviderContext, org.apache.flink.streaming.api.datastream.DataStream, org.apache.flink.table.connector.sink.DynamicTableSink$Context, int, boolean)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig.builder()> in (FileSystemTableSink.java:233) -Method <org.apache.flink.connector.file.table.FileSystemTableSink.toStagingPath()> calls method <org.apache.flink.util.Preconditions.checkState(boolean, java.lang.Object)> in (FileSystemTableSink.java:380) Method <org.apache.flink.connector.file.table.FileSystemTableSource.listPartitions()> calls method <org.apache.flink.table.utils.PartitionPathUtils.searchPartSpecAndPaths(org.apache.flink.core.fs.FileSystem, org.apache.flink.core.fs.Path, int)> in (FileSystemTableSource.java:328) Method <org.apache.flink.connector.file.table.FileSystemTableSource.paths()> has return type <[Lorg.apache.flink.core.fs.Path;> in (FileSystemTableSource.java:0) Method <org.apache.flink.connector.file.table.FileSystemTableSource.paths()> references method <org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath(java.util.LinkedHashMap)> in (FileSystemTableSource.java:295) diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java index f866b8ec230..3b73e95e497 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java @@ -19,6 +19,7 @@ package org.apache.flink.connector.file.table; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts; import org.apache.flink.api.common.io.FinalizeOnMaster; import org.apache.flink.api.common.io.OutputFormat; @@ -28,12 +29,14 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig; import org.apache.flink.table.api.TableException; import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.util.Preconditions; import java.io.IOException; import java.io.Serializable; import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; +import java.util.UUID; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -56,7 +59,7 @@ public class FileSystemOutputFormat<T> private final TableMetaStoreFactory msFactory; private final boolean overwrite; private final boolean isToLocal; - private final Path tmpPath; + private final Path stagingPath; private final String[] partitionColumns; private final boolean dynamicGrouped; private final LinkedHashMap<String, String> staticPartitions; @@ -74,7 +77,7 @@ public class FileSystemOutputFormat<T> TableMetaStoreFactory msFactory, boolean overwrite, boolean isToLocal, - Path tmpPath, + Path stagingPath, String[] partitionColumns, boolean dynamicGrouped, LinkedHashMap<String, String> staticPartitions, @@ -87,7 +90,7 @@ public class FileSystemOutputFormat<T> this.msFactory = msFactory; this.overwrite = overwrite; this.isToLocal = isToLocal; - this.tmpPath = tmpPath; + this.stagingPath = stagingPath; this.partitionColumns = partitionColumns; this.dynamicGrouped = dynamicGrouped; this.staticPartitions = staticPartitions; @@ -96,6 +99,22 @@ public class FileSystemOutputFormat<T> this.outputFileConfig = outputFileConfig; this.identifier = identifier; this.partitionCommitPolicyFactory = partitionCommitPolicyFactory; + + createStagingDirectory(this.stagingPath); + } + + private static void createStagingDirectory(Path stagingPath) { + try { + final FileSystem stagingFileSystem = stagingPath.getFileSystem(); + Preconditions.checkState( + !stagingFileSystem.exists(stagingPath), + "Staging dir %s already exists", + stagingPath); + stagingFileSystem.mkdirs(stagingPath); + } catch (IOException e) { + throw new RuntimeException( + "An IO error occurred while accessing the staging FileSystem.", e); + } } @Override @@ -108,7 +127,7 @@ public class FileSystemOutputFormat<T> Thread.currentThread().getContextClassLoader(), () -> { try { - return fsFactory.create(tmpPath.toUri()); + return fsFactory.create(stagingPath.toUri()); } catch (IOException e) { throw new RuntimeException(e); } @@ -120,7 +139,7 @@ public class FileSystemOutputFormat<T> fsFactory, msFactory, overwrite, - tmpPath, + stagingPath, partitionColumns.length, isToLocal, identifier, @@ -141,7 +160,7 @@ public class FileSystemOutputFormat<T> throw new TableException("Exception in finalizeGlobal", e); } finally { try { - fsFactory.create(tmpPath.toUri()).delete(tmpPath, true); + fsFactory.create(stagingPath.toUri()).delete(stagingPath, true); } catch (IOException ignore) { } } @@ -158,7 +177,7 @@ public class FileSystemOutputFormat<T> PartitionTempFileManager fileManager = new PartitionTempFileManager( fsFactory, - tmpPath, + stagingPath, context.getTaskNumber(), context.getAttemptNumber(), outputFileConfig); @@ -203,7 +222,7 @@ public class FileSystemOutputFormat<T> private String[] partitionColumns; private OutputFormatFactory<T> formatFactory; private TableMetaStoreFactory metaStoreFactory; - private Path tmpPath; + private Path stagingPath; private LinkedHashMap<String, String> staticPartitions = new LinkedHashMap<>(); private boolean dynamicGrouped = false; @@ -258,11 +277,23 @@ public class FileSystemOutputFormat<T> return this; } - public Builder<T> setTempPath(Path tmpPath) { - this.tmpPath = tmpPath; + public Builder<T> setPath(Path parentPath) { + this.stagingPath = toStagingPath(parentPath); return this; } + @VisibleForTesting + Builder<T> setStagingPath(Path stagingPath) { + this.stagingPath = stagingPath; + return this; + } + + private Path toStagingPath(Path parentPath) { + return new Path( + parentPath, + String.format(".staging_%d_%s", System.currentTimeMillis(), UUID.randomUUID())); + } + public Builder<T> setPartitionComputer(PartitionComputer<T> computer) { this.computer = computer; return this; @@ -288,7 +319,7 @@ public class FileSystemOutputFormat<T> checkNotNull(partitionColumns, "partitionColumns should not be null"); checkNotNull(formatFactory, "formatFactory should not be null"); checkNotNull(metaStoreFactory, "metaStoreFactory should not be null"); - checkNotNull(tmpPath, "tmpPath should not be null"); + checkNotNull(stagingPath, "stagingPath should not be null"); checkNotNull(computer, "partitionComputer should not be null"); return new FileSystemOutputFormat<>( @@ -296,7 +327,7 @@ public class FileSystemOutputFormat<T> metaStoreFactory, overwrite, isToLocal, - tmpPath, + stagingPath, partitionColumns, dynamicGrouped, staticPartitions, diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java index 0555774b656..1029fecedb5 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java @@ -184,7 +184,7 @@ public class FileSystemTableSink extends AbstractFileSystemTable .setMetaStoreFactory(new EmptyMetaStoreFactory(path)) .setOverwrite(overwrite) .setStaticPartitions(staticPartitions) - .setTempPath(toStagingPath()) + .setPath(path) .setOutputFileConfig( OutputFileConfig.builder() .withPartPrefix("part-" + UUID.randomUUID()) @@ -373,19 +373,6 @@ public class FileSystemTableSink extends AbstractFileSystemTable }; } - private Path toStagingPath() { - Path stagingDir = new Path(path, ".staging_" + System.currentTimeMillis()); - try { - FileSystem fs = stagingDir.getFileSystem(); - Preconditions.checkState( - fs.exists(stagingDir) || fs.mkdirs(stagingDir), - "Failed to create staging dir " + stagingDir); - return stagingDir; - } catch (IOException e) { - throw new RuntimeException(e); - } - } - @SuppressWarnings("unchecked") private OutputFormatFactory<RowData> createOutputFormatFactory(Context sinkContext) { Object writer = createWriter(sinkContext); diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java index 964304d5ae7..85062cf215e 100644 --- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java +++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java @@ -28,6 +28,8 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.types.Row; import org.apache.flink.types.RowUtils; +import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap; + import org.apache.commons.io.FileUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -37,24 +39,41 @@ import org.junit.jupiter.api.io.TempDir; import java.io.File; import java.io.IOException; import java.nio.file.Files; +import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; -import java.util.TreeMap; -import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; +import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.entry; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Test for {@link FileSystemOutputFormat}. */ class FileSystemOutputFormatTest { - @TempDir private java.nio.file.Path tmpPath; @TempDir private java.nio.file.Path outputPath; + @TempDir private java.nio.file.Path stagingBaseDir; + private final TestingFinalizationContext finalizationContext = new TestingFinalizationContext(); + private static final Supplier<List<StreamRecord<Row>>> DEFAULT_INPUT_SUPPLIER = + () -> + Arrays.asList( + new StreamRecord<>(Row.of("a1", 1, "p1"), 1L), + new StreamRecord<>(Row.of("a2", 2, "p1"), 1L), + new StreamRecord<>(Row.of("a2", 2, "p2"), 1L), + new StreamRecord<>(Row.of("a3", 3, "p1"), 1L)); + + private static final Supplier<List<String>> DEFAULT_OUTPUT_SUPPLIER = + () -> + Collections.singletonList( + createFileContent("a1,1,p1", "a2,2,p1", "a2,2,p2", "a3,3,p1")); + private static Map<File, String> getFileContentByPath(java.nio.file.Path directory) throws IOException { Map<File, String> contents = new HashMap<>(4); @@ -70,6 +89,10 @@ class FileSystemOutputFormatTest { return contents; } + private static String createFileContent(String... rows) { + return Arrays.stream(rows).collect(Collectors.joining("\n", "", "\n")); + } + @BeforeEach void before() { RowUtils.USE_LEGACY_TO_STRING = true; @@ -83,7 +106,7 @@ class FileSystemOutputFormatTest { @Test void testClosingWithoutInput() throws Exception { try (OneInputStreamOperatorTestHarness<Row, Object> testHarness = - createSink(false, false, false, new LinkedHashMap<>(), new AtomicReference<>())) { + createTestHarness(createSinkFormat(false, false, false, new LinkedHashMap<>()))) { testHarness.setup(); testHarness.open(); } @@ -91,149 +114,181 @@ class FileSystemOutputFormatTest { @Test void testNonPartition() throws Exception { - AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>(); - try (OneInputStreamOperatorTestHarness<Row, Object> testHarness = - createSink(false, false, false, new LinkedHashMap<>(), ref)) { - writeUnorderedRecords(testHarness); - assertThat(getFileContentByPath(tmpPath)).hasSize(1); - } - - ref.get().finalizeGlobal(finalizationContext); - Map<File, String> content = getFileContentByPath(outputPath); - assertThat(content.values()) - .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n"); - } - - private void writeUnorderedRecords(OneInputStreamOperatorTestHarness<Row, Object> testHarness) - throws Exception { - testHarness.setup(); - testHarness.open(); - - testHarness.processElement(new StreamRecord<>(Row.of("a1", 1, "p1"), 1L)); - testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p1"), 1L)); - testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p2"), 1L)); - testHarness.processElement(new StreamRecord<>(Row.of("a3", 3, "p1"), 1L)); + checkWriteAndCommit( + false, + false, + false, + new LinkedHashMap<>(), + DEFAULT_INPUT_SUPPLIER, + DEFAULT_OUTPUT_SUPPLIER); } @Test void testOverrideNonPartition() throws Exception { testNonPartition(); - - AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>(); - try (OneInputStreamOperatorTestHarness<Row, Object> testHarness = - createSink(true, false, false, new LinkedHashMap<>(), ref)) { - writeUnorderedRecords(testHarness); - assertThat(getFileContentByPath(tmpPath)).hasSize(1); - } - - ref.get().finalizeGlobal(finalizationContext); - Map<File, String> content = getFileContentByPath(outputPath); - assertThat(content).hasSize(1); - assertThat(content.values()) - .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n"); - assertThat(new File(tmpPath.toUri())).doesNotExist(); + checkWriteAndCommit( + true, + false, + false, + new LinkedHashMap<>(), + DEFAULT_INPUT_SUPPLIER, + DEFAULT_OUTPUT_SUPPLIER); } @Test void testStaticPartition() throws Exception { - AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>(); LinkedHashMap<String, String> staticParts = new LinkedHashMap<>(); staticParts.put("c", "p1"); - try (OneInputStreamOperatorTestHarness<Row, Object> testHarness = - createSink(false, true, false, staticParts, ref)) { - testHarness.setup(); - testHarness.open(); - testHarness.processElement(new StreamRecord<>(Row.of("a1", 1), 1L)); - testHarness.processElement(new StreamRecord<>(Row.of("a2", 2), 1L)); - testHarness.processElement(new StreamRecord<>(Row.of("a2", 2), 1L)); - testHarness.processElement(new StreamRecord<>(Row.of("a3", 3), 1L)); - assertThat(getFileContentByPath(tmpPath)).hasSize(1); - } - - ref.get().finalizeGlobal(finalizationContext); - Map<File, String> content = getFileContentByPath(outputPath); - assertThat(content).hasSize(1); - assertThat(content.keySet().iterator().next().getParentFile().getName()).isEqualTo("c=p1"); - assertThat(content.values()).containsExactly("a1,1\n" + "a2,2\n" + "a2,2\n" + "a3,3\n"); - assertThat(new File(tmpPath.toUri())).doesNotExist(); + checkWriteAndCommit( + false, + true, + false, + staticParts, + () -> + Arrays.asList( + new StreamRecord<>(Row.of("a1", 1), 1L), + new StreamRecord<>(Row.of("a2", 2), 1L), + new StreamRecord<>(Row.of("a2", 2), 1L), + new StreamRecord<>(Row.of("a3", 3), 1L)), + () -> + Collections.singletonMap( + "c=p1", createFileContent("a1,1", "a2,2", "a2,2", "a3,3"))); } @Test void testDynamicPartition() throws Exception { - AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>(); - try (OneInputStreamOperatorTestHarness<Row, Object> testHarness = - createSink(false, true, false, new LinkedHashMap<>(), ref)) { - writeUnorderedRecords(testHarness); - assertThat(getFileContentByPath(tmpPath)).hasSize(2); - } - - ref.get().finalizeGlobal(finalizationContext); - Map<File, String> content = getFileContentByPath(outputPath); - Map<String, String> sortedContent = new TreeMap<>(); - content.forEach((file, s) -> sortedContent.put(file.getParentFile().getName(), s)); - - assertThat(sortedContent).hasSize(2); - assertThat(sortedContent) - .contains(entry("c=p1", "a1,1\n" + "a2,2\n" + "a3,3\n"), entry("c=p2", "a2,2\n")); - assertThat(new File(tmpPath.toUri())).doesNotExist(); + checkWriteAndCommit( + false, + true, + false, + new LinkedHashMap<>(), + DEFAULT_INPUT_SUPPLIER, + () -> + ImmutableMap.of( + "c=p1", + createFileContent("a1,1", "a2,2", "a3,3"), + "c=p2", + createFileContent("a2,2"))); } @Test void testGroupedDynamicPartition() throws Exception { - AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>(); + checkWriteAndCommit( + false, + true, + true, + new LinkedHashMap<>(), + () -> + Arrays.asList( + new StreamRecord<>(Row.of("a1", 1, "p1"), 1L), + new StreamRecord<>(Row.of("a2", 2, "p1"), 1L), + new StreamRecord<>(Row.of("a3", 3, "p1"), 1L), + new StreamRecord<>(Row.of("a2", 2, "p2"), 1L)), + () -> + ImmutableMap.of( + "c=p1", + createFileContent("a1,1", "a2,2", "a3,3"), + "c=p2", + createFileContent("a2,2"))); + } + + @Test + void testGetUniqueStagingDirectory() throws IOException { + final Path alreadyExistingStagingDir = new Path(outputPath.toFile().getAbsolutePath()); + assertThat(alreadyExistingStagingDir.getFileSystem().exists(alreadyExistingStagingDir)) + .as("The staging folder should already exist.") + .isTrue(); + + final FileSystemOutputFormat.Builder<Row> builder = + new FileSystemOutputFormat.Builder<Row>() + .setPartitionColumns(new String[0]) + .setFormatFactory(TextOutputFormat::new) + .setMetaStoreFactory( + new FileSystemCommitterTest.TestMetaStoreFactory( + new Path(outputPath.toFile().getAbsolutePath()))) + .setPartitionComputer( + new RowPartitionComputer("default", new String[0], new String[0])) + .setStagingPath(alreadyExistingStagingDir); + + assertThatThrownBy(builder::build) + .as("Reusing a folder should cause an error.") + .isInstanceOf(IllegalStateException.class); + } + + @SuppressWarnings("unchecked") + private void checkWriteAndCommit( + boolean override, + boolean partitioned, + boolean dynamicGrouped, + LinkedHashMap<String, String> staticPartitions, + Supplier<List<StreamRecord<Row>>> inputSupplier, + Supplier<?> outputSupplier) + throws Exception { + Object expectedOutput = outputSupplier.get(); + int expectedFileNum = + (partitioned) + ? ((Map<String, String>) expectedOutput).size() + : ((List<String>) expectedOutput).size(); + FileSystemOutputFormat<Row> outputFormat = + createSinkFormat(override, partitioned, dynamicGrouped, staticPartitions); try (OneInputStreamOperatorTestHarness<Row, Object> testHarness = - createSink(false, true, true, new LinkedHashMap<>(), ref)) { + createTestHarness(outputFormat)) { testHarness.setup(); testHarness.open(); - - testHarness.processElement(new StreamRecord<>(Row.of("a1", 1, "p1"), 1L)); - testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p1"), 1L)); - testHarness.processElement(new StreamRecord<>(Row.of("a3", 3, "p1"), 1L)); - testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p2"), 1L)); - assertThat(getFileContentByPath(tmpPath)).hasSize(2); + for (StreamRecord<Row> record : inputSupplier.get()) { + testHarness.processElement(record); + } + assertThat(getFileContentByPath(stagingBaseDir)).hasSize(expectedFileNum); } - ref.get().finalizeGlobal(finalizationContext); - Map<File, String> content = getFileContentByPath(outputPath); - Map<String, String> sortedContent = new TreeMap<>(); - content.forEach((file, s) -> sortedContent.put(file.getParentFile().getName(), s)); - - assertThat(sortedContent).hasSize(2); - assertThat(sortedContent.get("c=p1")).isEqualTo("a1,1\n" + "a2,2\n" + "a3,3\n"); - assertThat(sortedContent.get("c=p2")).isEqualTo("a2,2\n"); - assertThat(new File(tmpPath.toUri())).doesNotExist(); + outputFormat.finalizeGlobal(finalizationContext); + assertThat(stagingBaseDir).isEmptyDirectory(); + + Map<File, String> fileToContent = getFileContentByPath(outputPath); + assertThat(fileToContent).hasSize(expectedFileNum); + if (partitioned) { + Map<String, String> partitionToContent = + fileToContent.entrySet().stream() + .collect( + Collectors.toMap( + e -> e.getKey().getParentFile().getName(), + Map.Entry::getValue)); + + assertThat(partitionToContent) + .containsExactlyInAnyOrderEntriesOf((Map<String, String>) expectedOutput); + } else { + assertThat(fileToContent.values()) + .containsExactlyInAnyOrderElementsOf((List<String>) expectedOutput); + } } - private OneInputStreamOperatorTestHarness<Row, Object> createSink( + private FileSystemOutputFormat<Row> createSinkFormat( boolean override, boolean partition, boolean dynamicGrouped, - LinkedHashMap<String, String> staticPartitions, - AtomicReference<FileSystemOutputFormat<Row>> sinkRef) - throws Exception { + LinkedHashMap<String, String> staticPartitions) { String[] columnNames = new String[] {"a", "b", "c"}; String[] partitionColumns = partition ? new String[] {"c"} : new String[0]; + Path path = new Path(outputPath.toString()); + TableMetaStoreFactory msFactory = new FileSystemCommitterTest.TestMetaStoreFactory(path); + return new FileSystemOutputFormat.Builder<Row>() + .setMetaStoreFactory(msFactory) + .setPath(new Path(stagingBaseDir.toString())) + .setOverwrite(override) + .setPartitionColumns(partitionColumns) + .setPartitionComputer( + new RowPartitionComputer("default", columnNames, partitionColumns)) + .setFormatFactory(TextOutputFormat::new) + .setDynamicGrouped(dynamicGrouped) + .setStaticPartitions(staticPartitions) + .build(); + } - TableMetaStoreFactory msFactory = - new FileSystemCommitterTest.TestMetaStoreFactory(new Path(outputPath.toString())); - FileSystemOutputFormat<Row> sink = - new FileSystemOutputFormat.Builder<Row>() - .setMetaStoreFactory(msFactory) - .setTempPath(new Path(tmpPath.toString())) - .setOverwrite(override) - .setPartitionColumns(partitionColumns) - .setPartitionComputer( - new RowPartitionComputer("default", columnNames, partitionColumns)) - .setFormatFactory(TextOutputFormat::new) - .setDynamicGrouped(dynamicGrouped) - .setStaticPartitions(staticPartitions) - .build(); - - sinkRef.set(sink); - + private OneInputStreamOperatorTestHarness<Row, Object> createTestHarness( + FileSystemOutputFormat<Row> outputFormat) throws Exception { return new OneInputStreamOperatorTestHarness<>( - new StreamSink<>(new OutputFormatSinkFunction<>(sink)), + new StreamSink<>(new OutputFormatSinkFunction<>(outputFormat)), // test parallelism 3, 3, diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java index 779de538f43..92fed884c5b 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java @@ -607,8 +607,7 @@ public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, Su builder.setOverwrite(overwrite); builder.setIsToLocal(isToLocal); builder.setStaticPartitions(staticPartitionSpec); - builder.setTempPath( - new org.apache.flink.core.fs.Path(toStagingDir(stagingParentDir, jobConf))); + builder.setPath(new org.apache.flink.core.fs.Path(stagingParentDir)); builder.setOutputFileConfig(fileNaming); builder.setIdentifier(identifier); builder.setPartitionCommitPolicyFactory(