This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new fa4890e0f9 Flink: Backport Prevent recreation of 
ManifestOutputFileFactory during flushing (#14385)
fa4890e0f9 is described below

commit fa4890e0f923ae1f9fbf0065d2430e21e77174f3
Author: Maximilian Michels <[email protected]>
AuthorDate: Tue Oct 21 12:02:19 2025 +0200

    Flink: Backport Prevent recreation of ManifestOutputFileFactory during 
flushing (#14385)
    
    backports #14358
---
 flink/v1.20/build.gradle                           |   3 -
 .../iceberg/flink/sink/FlinkManifestUtil.java      |  14 ++-
 .../flink/sink/ManifestOutputFileFactory.java      |  11 +-
 .../flink/sink/dynamic/DynamicIcebergSink.java     |   2 +-
 .../sink/dynamic/DynamicWriteResultAggregator.java |  34 ++++---
 .../iceberg/flink/sink/dynamic/DynamicWriter.java  |   5 +
 .../iceberg/flink/sink/TestFlinkManifest.java      |   2 +-
 .../flink/sink/TestManifestOutputFileFactory.java  | 112 +++++++++++++++++++++
 .../flink/sink/dynamic/TestDynamicCommitter.java   |  12 ++-
 .../dynamic/TestDynamicWriteResultAggregator.java  |  92 ++++++++++++++++-
 .../flink/sink/dynamic/TestDynamicWriter.java      |  32 ++++++
 flink/v2.0/build.gradle                            |   3 -
 .../iceberg/flink/sink/FlinkManifestUtil.java      |  14 ++-
 .../flink/sink/ManifestOutputFileFactory.java      |  11 +-
 .../flink/sink/dynamic/DynamicIcebergSink.java     |   2 +-
 .../sink/dynamic/DynamicWriteResultAggregator.java |  34 ++++---
 .../iceberg/flink/sink/dynamic/DynamicWriter.java  |   5 +
 .../iceberg/flink/sink/TestFlinkManifest.java      |   2 +-
 .../flink/sink/TestManifestOutputFileFactory.java  | 112 +++++++++++++++++++++
 .../flink/sink/dynamic/TestDynamicCommitter.java   |  12 ++-
 .../dynamic/TestDynamicWriteResultAggregator.java  |  92 ++++++++++++++++-
 .../flink/sink/dynamic/TestDynamicWriter.java      |  32 ++++++
 22 files changed, 580 insertions(+), 58 deletions(-)

diff --git a/flink/v1.20/build.gradle b/flink/v1.20/build.gradle
index d0bbe7a898..3591bf37b1 100644
--- a/flink/v1.20/build.gradle
+++ b/flink/v1.20/build.gradle
@@ -68,9 +68,6 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
 
     implementation libs.datasketches
 
-    // for caching in DynamicSink
-    implementation libs.caffeine
-
     testImplementation libs.flink120.connector.test.utils
     testImplementation libs.flink120.core
     testImplementation libs.flink120.runtime
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
index 13affd8484..0eeedf2659 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
@@ -74,7 +74,19 @@ public class FlinkManifestUtil {
       int subTaskId,
       long attemptNumber) {
     return new ManifestOutputFileFactory(
-        tableSupplier, tableProps, flinkJobId, operatorUniqueId, subTaskId, 
attemptNumber);
+        tableSupplier, tableProps, flinkJobId, operatorUniqueId, subTaskId, 
attemptNumber, null);
+  }
+
+  public static ManifestOutputFileFactory createOutputFileFactory(
+      Supplier<Table> tableSupplier,
+      Map<String, String> tableProps,
+      String flinkJobId,
+      String operatorUniqueId,
+      int subTaskId,
+      long attemptNumber,
+      String suffix) {
+    return new ManifestOutputFileFactory(
+        tableSupplier, tableProps, flinkJobId, operatorUniqueId, subTaskId, 
attemptNumber, suffix);
   }
 
   /**
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java
index 6ba87bea30..30c0e11a25 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java
@@ -22,6 +22,7 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
+import javax.annotation.Nullable;
 import org.apache.flink.annotation.Internal;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.HasTableOperations;
@@ -43,6 +44,7 @@ public class ManifestOutputFileFactory {
   private final int subTaskId;
   private final long attemptNumber;
   private final AtomicInteger fileCount = new AtomicInteger(0);
+  @Nullable private final String suffix;
 
   ManifestOutputFileFactory(
       Supplier<Table> tableSupplier,
@@ -50,26 +52,29 @@ public class ManifestOutputFileFactory {
       String flinkJobId,
       String operatorUniqueId,
       int subTaskId,
-      long attemptNumber) {
+      long attemptNumber,
+      @Nullable String suffix) {
     this.tableSupplier = tableSupplier;
     this.props = props;
     this.flinkJobId = flinkJobId;
     this.operatorUniqueId = operatorUniqueId;
     this.subTaskId = subTaskId;
     this.attemptNumber = attemptNumber;
+    this.suffix = suffix;
   }
 
   private String generatePath(long checkpointId) {
     return FileFormat.AVRO.addExtension(
         String.format(
             Locale.ROOT,
-            "%s-%s-%05d-%d-%d-%05d",
+            "%s-%s-%05d-%d-%d-%05d%s",
             flinkJobId,
             operatorUniqueId,
             subTaskId,
             attemptNumber,
             checkpointId,
-            fileCount.incrementAndGet()));
+            fileCount.incrementAndGet(),
+            suffix != null ? "-" + suffix : ""));
   }
 
   public OutputFile create(long checkpointId) {
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
index 9547de78d6..e90fe4d6b1 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
@@ -168,7 +168,7 @@ public class DynamicIcebergSink
         .transform(
             prefixIfNotNull(uidPrefix, sinkId + " Pre Commit"),
             typeInformation,
-            new DynamicWriteResultAggregator(catalogLoader))
+            new DynamicWriteResultAggregator(catalogLoader, cacheMaximumSize))
         .uid(prefixIfNotNull(uidPrefix, sinkId + "-pre-commit-topology"));
   }
 
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
index 58ba183dfc..b92d32fcc4 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
@@ -18,12 +18,10 @@
  */
 package org.apache.iceberg.flink.sink.dynamic;
 
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
 import java.io.IOException;
-import java.time.Duration;
 import java.util.Collection;
 import java.util.Map;
+import java.util.UUID;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.io.SimpleVersionedSerialization;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
@@ -58,20 +56,21 @@ class DynamicWriteResultAggregator
         CommittableMessage<DynamicWriteResult>, 
CommittableMessage<DynamicCommittable>> {
   private static final Logger LOG = 
LoggerFactory.getLogger(DynamicWriteResultAggregator.class);
   private static final byte[] EMPTY_MANIFEST_DATA = new byte[0];
-  private static final Duration CACHE_EXPIRATION_DURATION = 
Duration.ofMinutes(1);
 
   private final CatalogLoader catalogLoader;
+  private final int cacheMaximumSize;
   private transient Map<WriteTarget, Collection<DynamicWriteResult>> results;
-  private transient Cache<String, Map<Integer, PartitionSpec>> specs;
-  private transient Cache<String, ManifestOutputFileFactory> 
outputFileFactories;
+  private transient Map<String, Map<Integer, PartitionSpec>> specs;
+  private transient Map<String, ManifestOutputFileFactory> outputFileFactories;
   private transient String flinkJobId;
   private transient String operatorId;
   private transient int subTaskId;
   private transient int attemptId;
   private transient Catalog catalog;
 
-  DynamicWriteResultAggregator(CatalogLoader catalogLoader) {
+  DynamicWriteResultAggregator(CatalogLoader catalogLoader, int 
cacheMaximumSize) {
     this.catalogLoader = catalogLoader;
+    this.cacheMaximumSize = cacheMaximumSize;
   }
 
   @Override
@@ -81,10 +80,8 @@ class DynamicWriteResultAggregator
     this.subTaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
     this.attemptId = getRuntimeContext().getTaskInfo().getAttemptNumber();
     this.results = Maps.newHashMap();
-    this.specs =
-        
Caffeine.newBuilder().expireAfterWrite(CACHE_EXPIRATION_DURATION).softValues().build();
-    this.outputFileFactories =
-        
Caffeine.newBuilder().expireAfterWrite(CACHE_EXPIRATION_DURATION).softValues().build();
+    this.specs = new LRUCache<>(cacheMaximumSize);
+    this.outputFileFactories = new LRUCache<>(cacheMaximumSize);
     this.catalog = catalogLoader.loadCatalog();
   }
 
@@ -163,18 +160,27 @@ class DynamicWriteResultAggregator
   }
 
   private ManifestOutputFileFactory outputFileFactory(String tableName) {
-    return outputFileFactories.get(
+    return outputFileFactories.computeIfAbsent(
         tableName,
         unused -> {
           Table table = catalog.loadTable(TableIdentifier.parse(tableName));
           specs.put(tableName, table.specs());
+          // Make sure to append an identifier to avoid file clashes in case 
the factory was to get
+          // re-created during a checkpoint, i.e. due to cache eviction.
+          String fileSuffix = UUID.randomUUID().toString();
           return FlinkManifestUtil.createOutputFileFactory(
-              () -> table, table.properties(), flinkJobId, operatorId, 
subTaskId, attemptId);
+              () -> table,
+              table.properties(),
+              flinkJobId,
+              operatorId,
+              subTaskId,
+              attemptId,
+              fileSuffix);
         });
   }
 
   private PartitionSpec spec(String tableName, int specId) {
-    Map<Integer, PartitionSpec> knownSpecs = specs.getIfPresent(tableName);
+    Map<Integer, PartitionSpec> knownSpecs = specs.get(tableName);
     if (knownSpecs != null) {
       PartitionSpec spec = knownSpecs.get(specId);
       if (spec != null) {
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
index ae24efafa6..e99e6e72da 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
@@ -210,4 +210,9 @@ class DynamicWriter implements 
CommittingSinkWriter<DynamicRecordInternal, Dynam
   DynamicWriterMetrics getMetrics() {
     return metrics;
   }
+
+  @VisibleForTesting
+  Map<WriteTarget, RowDataTaskWriterFactory> getTaskWriterFactories() {
+    return taskWriterFactories;
+  }
 }
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
index 10197ddfaf..c6dc984513 100644
--- 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
@@ -134,7 +134,7 @@ public class TestFlinkManifest {
             ManifestOutputFileFactory.FLINK_MANIFEST_LOCATION,
             userProvidedFolder.getAbsolutePath() + "///");
     ManifestOutputFileFactory factory =
-        new ManifestOutputFileFactory(() -> table, props, flinkJobId, 
operatorId, 1, 1);
+        new ManifestOutputFileFactory(() -> table, props, flinkJobId, 
operatorId, 1, 1, null);
 
     List<DataFile> dataFiles = generateDataFiles(5);
     DeltaManifests deltaManifests =
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestManifestOutputFileFactory.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestManifestOutputFileFactory.java
new file mode 100644
index 0000000000..654248fcab
--- /dev/null
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestManifestOutputFileFactory.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.HadoopCatalogExtension;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+public class TestManifestOutputFileFactory {
+
+  @RegisterExtension
+  static final HadoopCatalogExtension CATALOG_EXTENSION = new 
HadoopCatalogExtension("db", "table");
+
+  private Table table;
+
+  @BeforeEach
+  void before() throws IOException {
+    table = 
CATALOG_EXTENSION.catalog().createTable(TableIdentifier.of("table"), new 
Schema());
+  }
+
+  @Test
+  public void testFileNameFormat() {
+    String flinkJobId = "job123";
+    String operatorUniqueId = "operator456";
+    int subTaskId = 7;
+    long attemptNumber = 2;
+    long checkpointId = 100;
+    Map<String, String> props = table.properties();
+
+    ManifestOutputFileFactory factory =
+        new ManifestOutputFileFactory(
+            () -> table, props, flinkJobId, operatorUniqueId, subTaskId, 
attemptNumber, null);
+
+    String file1 = new File(factory.create(checkpointId).location()).getName();
+    assertThat(file1).isEqualTo("job123-operator456-00007-2-100-00001.avro");
+
+    String file2 = new File(factory.create(checkpointId).location()).getName();
+    assertThat(file2).isEqualTo("job123-operator456-00007-2-100-00002.avro");
+
+    String file3 = new File(factory.create(checkpointId + 
1).location()).getName();
+    assertThat(file3).isEqualTo("job123-operator456-00007-2-101-00003.avro");
+  }
+
+  @Test
+  public void testFileNameFormatWithSuffix() {
+    String flinkJobId = "job123";
+    String operatorUniqueId = "operator456";
+    int subTaskId = 7;
+    long attemptNumber = 2;
+    long checkpointId = 100;
+    Map<String, String> props = table.properties();
+
+    ManifestOutputFileFactory factory =
+        new ManifestOutputFileFactory(
+            () -> table, props, flinkJobId, operatorUniqueId, subTaskId, 
attemptNumber, "suffix");
+
+    String file1 = new File(factory.create(checkpointId).location()).getName();
+    
assertThat(file1).isEqualTo("job123-operator456-00007-2-100-00001-suffix.avro");
+
+    String file2 = new File(factory.create(checkpointId).location()).getName();
+    
assertThat(file2).isEqualTo("job123-operator456-00007-2-100-00002-suffix.avro");
+  }
+
+  @Test
+  public void testSuffixedFileNamesWithRecreatedFactory() {
+    String flinkJobId = "test-job";
+    String operatorUniqueId = "test-operator";
+    int subTaskId = 0;
+    long attemptNumber = 1;
+    long checkpointId = 1;
+    Map<String, String> props = table.properties();
+
+    ManifestOutputFileFactory factory1 =
+        new ManifestOutputFileFactory(
+            () -> table, props, flinkJobId, operatorUniqueId, subTaskId, 
attemptNumber, "suffix1");
+    String file1 = new 
File(factory1.create(checkpointId).location()).getName();
+    
assertThat(file1).isEqualTo("test-job-test-operator-00000-1-1-00001-suffix1.avro");
+
+    ManifestOutputFileFactory factory2 =
+        new ManifestOutputFileFactory(
+            () -> table, props, flinkJobId, operatorUniqueId, subTaskId, 
attemptNumber, "suffix2");
+    String file2 = new 
File(factory2.create(checkpointId).location()).getName();
+    
assertThat(file2).isEqualTo("test-job-test-operator-00000-1-1-00001-suffix2.avro");
+
+    assertThat(file1).isNotEqualTo(file2);
+  }
+}
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
index f5387aee88..7894428a78 100644
--- 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
@@ -68,6 +68,8 @@ class TestDynamicCommitter {
 
   Catalog catalog;
 
+  final int cacheMaximumSize = 10;
+
   private static final DataFile DATA_FILE =
       DataFiles.builder(PartitionSpec.unpartitioned())
           .withPath("/path/to/data-1.parquet")
@@ -155,7 +157,7 @@ class TestDynamicCommitter {
         new WriteTarget(TABLE2, "branch2", 43, 0, true, Sets.newHashSet(1, 2));
 
     DynamicWriteResultAggregator aggregator =
-        new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader());
+        new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), 
cacheMaximumSize);
     OneInputStreamOperatorTestHarness aggregatorHarness =
         new OneInputStreamOperatorTestHarness(aggregator);
     aggregatorHarness.open();
@@ -281,7 +283,7 @@ class TestDynamicCommitter {
         new WriteTarget(TABLE1, "branch", 42, 0, false, Sets.newHashSet(1, 2));
 
     DynamicWriteResultAggregator aggregator =
-        new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader());
+        new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), 
cacheMaximumSize);
     OneInputStreamOperatorTestHarness aggregatorHarness =
         new OneInputStreamOperatorTestHarness(aggregator);
     aggregatorHarness.open();
@@ -339,7 +341,7 @@ class TestDynamicCommitter {
     assertThat(table.snapshots()).isEmpty();
 
     DynamicWriteResultAggregator aggregator =
-        new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader());
+        new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), 
cacheMaximumSize);
     OneInputStreamOperatorTestHarness aggregatorHarness =
         new OneInputStreamOperatorTestHarness(aggregator);
     aggregatorHarness.open();
@@ -454,7 +456,7 @@ class TestDynamicCommitter {
     assertThat(table.snapshots()).isEmpty();
 
     DynamicWriteResultAggregator aggregator =
-        new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader());
+        new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), 
cacheMaximumSize);
     OneInputStreamOperatorTestHarness aggregatorHarness =
         new OneInputStreamOperatorTestHarness(aggregator);
     aggregatorHarness.open();
@@ -613,7 +615,7 @@ class TestDynamicCommitter {
         new WriteTarget(TABLE1, "branch", 42, 0, false, Sets.newHashSet(1, 2));
 
     DynamicWriteResultAggregator aggregator =
-        new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader());
+        new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), 
cacheMaximumSize);
     OneInputStreamOperatorTestHarness aggregatorHarness =
         new OneInputStreamOperatorTestHarness(aggregator);
     aggregatorHarness.open();
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultAggregator.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultAggregator.java
index 713c67da17..d3f4385d97 100644
--- 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultAggregator.java
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultAggregator.java
@@ -20,15 +20,26 @@ package org.apache.iceberg.flink.sink.dynamic;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
 import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.hadoop.util.Sets;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.flink.HadoopCatalogExtension;
+import org.apache.iceberg.flink.sink.DeltaManifests;
+import org.apache.iceberg.flink.sink.DeltaManifestsSerializer;
 import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
@@ -37,13 +48,22 @@ class TestDynamicWriteResultAggregator {
   @RegisterExtension
   static final HadoopCatalogExtension CATALOG_EXTENSION = new 
HadoopCatalogExtension("db", "table");
 
+  final int cacheMaximumSize = 1;
+
+  private static final DataFile DATA_FILE =
+      DataFiles.builder(PartitionSpec.unpartitioned())
+          .withPath("/path/to/data-1.parquet")
+          .withFileSizeInBytes(100)
+          .withRecordCount(1)
+          .build();
+
   @Test
   void testAggregator() throws Exception {
     CATALOG_EXTENSION.catalog().createTable(TableIdentifier.of("table"), new 
Schema());
     CATALOG_EXTENSION.catalog().createTable(TableIdentifier.of("table2"), new 
Schema());
 
     DynamicWriteResultAggregator aggregator =
-        new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader());
+        new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), 
cacheMaximumSize);
     try (OneInputStreamOperatorTestHarness<
             CommittableMessage<DynamicWriteResult>, 
CommittableMessage<DynamicCommittable>>
         testHarness = new OneInputStreamOperatorTestHarness<>(aggregator)) {
@@ -79,4 +99,74 @@ class TestDynamicWriteResultAggregator {
       assertThat(testHarness.getRecordOutput()).hasSize(4);
     }
   }
+
+  @Test
+  void testPreventOutputFileFactoryCacheEvictionDuringFlush() throws Exception 
{
+    CATALOG_EXTENSION.catalog().createTable(TableIdentifier.of("table"), new 
Schema());
+
+    // Disable caching of ManifestOutputFileFactory.
+    final int zeroCacheSize = 0;
+    DynamicWriteResultAggregator aggregator =
+        new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), 
zeroCacheSize);
+    try (OneInputStreamOperatorTestHarness<
+            CommittableMessage<DynamicWriteResult>, 
CommittableMessage<DynamicCommittable>>
+        testHarness = new OneInputStreamOperatorTestHarness<>(aggregator)) {
+      testHarness.open();
+
+      WriteTarget writeTarget1 =
+          new WriteTarget("table", "branch", 42, 0, false, Sets.newHashSet());
+      DynamicWriteResult dynamicWriteResult1 =
+          new DynamicWriteResult(
+              writeTarget1, 
WriteResult.builder().addDataFiles(DATA_FILE).build());
+
+      // Different WriteTarget
+      WriteTarget writeTarget2 =
+          new WriteTarget("table", "branch2", 23, 0, true, Sets.newHashSet());
+      DynamicWriteResult dynamicWriteResult2 =
+          new DynamicWriteResult(
+              writeTarget2, 
WriteResult.builder().addDataFiles(DATA_FILE).build());
+
+      CommittableWithLineage<DynamicWriteResult> committable1 =
+          new CommittableWithLineage<>(dynamicWriteResult1, 0, 0);
+      testHarness.processElement(new StreamRecord<>(committable1));
+
+      CommittableWithLineage<DynamicWriteResult> committable2 =
+          new CommittableWithLineage<>(dynamicWriteResult2, 0, 0);
+      testHarness.processElement(new StreamRecord<>(committable2));
+
+      assertThat(testHarness.getOutput()).isEmpty();
+
+      testHarness.prepareSnapshotPreBarrier(1L);
+      List<StreamRecord<CommittableMessage<DynamicCommittable>>> output =
+          Lists.newArrayList(testHarness.getRecordOutput().iterator());
+
+      assertThat(testHarness.getOutput()).hasSize(3);
+      
assertThat(output.get(0).getValue()).isInstanceOf(CommittableSummary.class);
+      
assertThat(output.get(1).getValue()).isInstanceOf(CommittableWithLineage.class);
+      
assertThat(output.get(2).getValue()).isInstanceOf(CommittableWithLineage.class);
+
+      // There should be two unique file paths, despite the cache being 
disabled.
+      Set<String> manifestPaths = getManifestPaths(output);
+      assertThat(manifestPaths).hasSize(2);
+    }
+  }
+
+  private static Set<String> getManifestPaths(
+      List<StreamRecord<CommittableMessage<DynamicCommittable>>> messages) 
throws IOException {
+    Set<String> manifestPaths = Sets.newHashSet();
+
+    for (StreamRecord<CommittableMessage<DynamicCommittable>> record : 
messages) {
+      CommittableMessage<DynamicCommittable> message = record.getValue();
+      if (message instanceof CommittableWithLineage) {
+        DeltaManifests deltaManifests =
+            SimpleVersionedSerialization.readVersionAndDeSerialize(
+                DeltaManifestsSerializer.INSTANCE,
+                (((CommittableWithLineage<DynamicCommittable>) 
message).getCommittable())
+                    .manifest());
+        deltaManifests.manifests().forEach(manifest -> 
manifestPaths.add(manifest.path()));
+      }
+    }
+
+    return manifestPaths;
+  }
 }
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
index 89dd4f2259..689fd20483 100644
--- 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
@@ -202,6 +202,38 @@ class TestDynamicWriter extends TestFlinkIcebergSinkBase {
             "Equality field columns shouldn't be empty when configuring to use 
UPSERT data.");
   }
 
+  @Test
+  void testUniqueFileSuffixOnFactoryRecreation() throws Exception {
+    Catalog catalog = CATALOG_EXTENSION.catalog();
+    Table table1 = catalog.createTable(TABLE1, SimpleDataUtil.SCHEMA);
+
+    DynamicWriter dynamicWriter = createDynamicWriter(catalog);
+    DynamicRecordInternal record1 = getDynamicRecordInternal(table1);
+
+    dynamicWriter.write(record1, null);
+    dynamicWriter.prepareCommit();
+
+    File dataDir1 = new File(URI.create(table1.location()).getPath(), "data");
+    File[] files = dataDir1.listFiles((dir, name) -> !name.startsWith("."));
+    assertThat(files).isNotNull().hasSize(1);
+    File firstFile = files[0];
+
+    // Clear cache which must create new unique files names for the output 
files
+    dynamicWriter.getTaskWriterFactories().clear();
+
+    dynamicWriter.write(record1, null);
+    dynamicWriter.prepareCommit();
+
+    files =
+        dataDir1.listFiles(
+            (dir, name) -> !name.startsWith(".") && 
!name.equals(firstFile.getName()));
+    assertThat(files).isNotNull().hasSize(1);
+    File secondFile = files[0];
+
+    // File names must be different
+    assertThat(firstFile.getName()).isNotEqualTo(secondFile.getName());
+  }
+
   private static @Nonnull DynamicWriter createDynamicWriter(
       Catalog catalog, Map<String, String> properties) {
     DynamicWriter dynamicWriter =
diff --git a/flink/v2.0/build.gradle b/flink/v2.0/build.gradle
index 01fbc1a4fe..5907f41b35 100644
--- a/flink/v2.0/build.gradle
+++ b/flink/v2.0/build.gradle
@@ -68,9 +68,6 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
 
     implementation libs.datasketches
 
-    // for caching in DynamicSink
-    implementation libs.caffeine
-
     testImplementation libs.flink20.connector.test.utils
     testImplementation libs.flink20.core
     testImplementation libs.flink20.runtime
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
index 13affd8484..0eeedf2659 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
@@ -74,7 +74,19 @@ public class FlinkManifestUtil {
       int subTaskId,
       long attemptNumber) {
     return new ManifestOutputFileFactory(
-        tableSupplier, tableProps, flinkJobId, operatorUniqueId, subTaskId, 
attemptNumber);
+        tableSupplier, tableProps, flinkJobId, operatorUniqueId, subTaskId, 
attemptNumber, null);
+  }
+
+  public static ManifestOutputFileFactory createOutputFileFactory(
+      Supplier<Table> tableSupplier,
+      Map<String, String> tableProps,
+      String flinkJobId,
+      String operatorUniqueId,
+      int subTaskId,
+      long attemptNumber,
+      String suffix) {
+    return new ManifestOutputFileFactory(
+        tableSupplier, tableProps, flinkJobId, operatorUniqueId, subTaskId, 
attemptNumber, suffix);
   }
 
   /**
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java
index 6ba87bea30..30c0e11a25 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java
@@ -22,6 +22,7 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
+import javax.annotation.Nullable;
 import org.apache.flink.annotation.Internal;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.HasTableOperations;
@@ -43,6 +44,7 @@ public class ManifestOutputFileFactory {
   private final int subTaskId;
   private final long attemptNumber;
   private final AtomicInteger fileCount = new AtomicInteger(0);
+  @Nullable private final String suffix;
 
   ManifestOutputFileFactory(
       Supplier<Table> tableSupplier,
@@ -50,26 +52,29 @@ public class ManifestOutputFileFactory {
       String flinkJobId,
       String operatorUniqueId,
       int subTaskId,
-      long attemptNumber) {
+      long attemptNumber,
+      @Nullable String suffix) {
     this.tableSupplier = tableSupplier;
     this.props = props;
     this.flinkJobId = flinkJobId;
     this.operatorUniqueId = operatorUniqueId;
     this.subTaskId = subTaskId;
     this.attemptNumber = attemptNumber;
+    this.suffix = suffix;
   }
 
   private String generatePath(long checkpointId) {
     return FileFormat.AVRO.addExtension(
         String.format(
             Locale.ROOT,
-            "%s-%s-%05d-%d-%d-%05d",
+            "%s-%s-%05d-%d-%d-%05d%s",
             flinkJobId,
             operatorUniqueId,
             subTaskId,
             attemptNumber,
             checkpointId,
-            fileCount.incrementAndGet()));
+            fileCount.incrementAndGet(),
+            suffix != null ? "-" + suffix : ""));
   }
 
   public OutputFile create(long checkpointId) {
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
index 2715a01608..db48be7977 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
@@ -168,7 +168,7 @@ public class DynamicIcebergSink
         .transform(
             prefixIfNotNull(uidPrefix, sinkId + " Pre Commit"),
             typeInformation,
-            new DynamicWriteResultAggregator(catalogLoader))
+            new DynamicWriteResultAggregator(catalogLoader, cacheMaximumSize))
         .uid(prefixIfNotNull(uidPrefix, sinkId + "-pre-commit-topology"));
   }
 
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
index 58ba183dfc..b92d32fcc4 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
@@ -18,12 +18,10 @@
  */
 package org.apache.iceberg.flink.sink.dynamic;
 
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
 import java.io.IOException;
-import java.time.Duration;
 import java.util.Collection;
 import java.util.Map;
+import java.util.UUID;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.io.SimpleVersionedSerialization;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
@@ -58,20 +56,21 @@ class DynamicWriteResultAggregator
         CommittableMessage<DynamicWriteResult>, 
CommittableMessage<DynamicCommittable>> {
   private static final Logger LOG = 
LoggerFactory.getLogger(DynamicWriteResultAggregator.class);
   private static final byte[] EMPTY_MANIFEST_DATA = new byte[0];
-  private static final Duration CACHE_EXPIRATION_DURATION = 
Duration.ofMinutes(1);
 
   private final CatalogLoader catalogLoader;
+  private final int cacheMaximumSize;
   private transient Map<WriteTarget, Collection<DynamicWriteResult>> results;
-  private transient Cache<String, Map<Integer, PartitionSpec>> specs;
-  private transient Cache<String, ManifestOutputFileFactory> 
outputFileFactories;
+  private transient Map<String, Map<Integer, PartitionSpec>> specs;
+  private transient Map<String, ManifestOutputFileFactory> outputFileFactories;
   private transient String flinkJobId;
   private transient String operatorId;
   private transient int subTaskId;
   private transient int attemptId;
   private transient Catalog catalog;
 
-  DynamicWriteResultAggregator(CatalogLoader catalogLoader) {
+  DynamicWriteResultAggregator(CatalogLoader catalogLoader, int 
cacheMaximumSize) {
     this.catalogLoader = catalogLoader;
+    this.cacheMaximumSize = cacheMaximumSize;
   }
 
   @Override
@@ -81,10 +80,8 @@ class DynamicWriteResultAggregator
     this.subTaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
     this.attemptId = getRuntimeContext().getTaskInfo().getAttemptNumber();
     this.results = Maps.newHashMap();
-    this.specs =
-        
Caffeine.newBuilder().expireAfterWrite(CACHE_EXPIRATION_DURATION).softValues().build();
-    this.outputFileFactories =
-        
Caffeine.newBuilder().expireAfterWrite(CACHE_EXPIRATION_DURATION).softValues().build();
+    this.specs = new LRUCache<>(cacheMaximumSize);
+    this.outputFileFactories = new LRUCache<>(cacheMaximumSize);
     this.catalog = catalogLoader.loadCatalog();
   }
 
@@ -163,18 +160,27 @@ class DynamicWriteResultAggregator
   }
 
   private ManifestOutputFileFactory outputFileFactory(String tableName) {
-    return outputFileFactories.get(
+    return outputFileFactories.computeIfAbsent(
         tableName,
         unused -> {
           Table table = catalog.loadTable(TableIdentifier.parse(tableName));
           specs.put(tableName, table.specs());
+          // Make sure to append an identifier to avoid file clashes in case 
the factory was to get
+          // re-created during a checkpoint, i.e. due to cache eviction.
+          String fileSuffix = UUID.randomUUID().toString();
           return FlinkManifestUtil.createOutputFileFactory(
-              () -> table, table.properties(), flinkJobId, operatorId, 
subTaskId, attemptId);
+              () -> table,
+              table.properties(),
+              flinkJobId,
+              operatorId,
+              subTaskId,
+              attemptId,
+              fileSuffix);
         });
   }
 
   private PartitionSpec spec(String tableName, int specId) {
-    Map<Integer, PartitionSpec> knownSpecs = specs.getIfPresent(tableName);
+    Map<Integer, PartitionSpec> knownSpecs = specs.get(tableName);
     if (knownSpecs != null) {
       PartitionSpec spec = knownSpecs.get(specId);
       if (spec != null) {
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
index ae24efafa6..e99e6e72da 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
@@ -210,4 +210,9 @@ class DynamicWriter implements 
CommittingSinkWriter<DynamicRecordInternal, Dynam
   DynamicWriterMetrics getMetrics() {
     return metrics;
   }
+
+  @VisibleForTesting
+  Map<WriteTarget, RowDataTaskWriterFactory> getTaskWriterFactories() {
+    return taskWriterFactories;
+  }
 }
diff --git 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
index 10197ddfaf..c6dc984513 100644
--- 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
+++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
@@ -134,7 +134,7 @@ public class TestFlinkManifest {
             ManifestOutputFileFactory.FLINK_MANIFEST_LOCATION,
             userProvidedFolder.getAbsolutePath() + "///");
     ManifestOutputFileFactory factory =
-        new ManifestOutputFileFactory(() -> table, props, flinkJobId, 
operatorId, 1, 1);
+        new ManifestOutputFileFactory(() -> table, props, flinkJobId, 
operatorId, 1, 1, null);
 
     List<DataFile> dataFiles = generateDataFiles(5);
     DeltaManifests deltaManifests =
diff --git 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestManifestOutputFileFactory.java
 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestManifestOutputFileFactory.java
new file mode 100644
index 0000000000..654248fcab
--- /dev/null
+++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestManifestOutputFileFactory.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.HadoopCatalogExtension;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+public class TestManifestOutputFileFactory {
+
+  @RegisterExtension
+  static final HadoopCatalogExtension CATALOG_EXTENSION = new 
HadoopCatalogExtension("db", "table");
+
+  private Table table;
+
+  @BeforeEach
+  void before() throws IOException {
+    table = 
CATALOG_EXTENSION.catalog().createTable(TableIdentifier.of("table"), new 
Schema());
+  }
+
+  @Test
+  public void testFileNameFormat() {
+    String flinkJobId = "job123";
+    String operatorUniqueId = "operator456";
+    int subTaskId = 7;
+    long attemptNumber = 2;
+    long checkpointId = 100;
+    Map<String, String> props = table.properties();
+
+    ManifestOutputFileFactory factory =
+        new ManifestOutputFileFactory(
+            () -> table, props, flinkJobId, operatorUniqueId, subTaskId, 
attemptNumber, null);
+
+    String file1 = new File(factory.create(checkpointId).location()).getName();
+    assertThat(file1).isEqualTo("job123-operator456-00007-2-100-00001.avro");
+
+    String file2 = new File(factory.create(checkpointId).location()).getName();
+    assertThat(file2).isEqualTo("job123-operator456-00007-2-100-00002.avro");
+
+    String file3 = new File(factory.create(checkpointId + 
1).location()).getName();
+    assertThat(file3).isEqualTo("job123-operator456-00007-2-101-00003.avro");
+  }
+
+  @Test
+  public void testFileNameFormatWithSuffix() {
+    String flinkJobId = "job123";
+    String operatorUniqueId = "operator456";
+    int subTaskId = 7;
+    long attemptNumber = 2;
+    long checkpointId = 100;
+    Map<String, String> props = table.properties();
+
+    ManifestOutputFileFactory factory =
+        new ManifestOutputFileFactory(
+            () -> table, props, flinkJobId, operatorUniqueId, subTaskId, 
attemptNumber, "suffix");
+
+    String file1 = new File(factory.create(checkpointId).location()).getName();
+    
assertThat(file1).isEqualTo("job123-operator456-00007-2-100-00001-suffix.avro");
+
+    String file2 = new File(factory.create(checkpointId).location()).getName();
+    
assertThat(file2).isEqualTo("job123-operator456-00007-2-100-00002-suffix.avro");
+  }
+
+  @Test
+  public void testSuffixedFileNamesWithRecreatedFactory() {
+    String flinkJobId = "test-job";
+    String operatorUniqueId = "test-operator";
+    int subTaskId = 0;
+    long attemptNumber = 1;
+    long checkpointId = 1;
+    Map<String, String> props = table.properties();
+
+    ManifestOutputFileFactory factory1 =
+        new ManifestOutputFileFactory(
+            () -> table, props, flinkJobId, operatorUniqueId, subTaskId, 
attemptNumber, "suffix1");
+    String file1 = new 
File(factory1.create(checkpointId).location()).getName();
+    
assertThat(file1).isEqualTo("test-job-test-operator-00000-1-1-00001-suffix1.avro");
+
+    ManifestOutputFileFactory factory2 =
+        new ManifestOutputFileFactory(
+            () -> table, props, flinkJobId, operatorUniqueId, subTaskId, 
attemptNumber, "suffix2");
+    String file2 = new 
File(factory2.create(checkpointId).location()).getName();
+    
assertThat(file2).isEqualTo("test-job-test-operator-00000-1-1-00001-suffix2.avro");
+
+    assertThat(file1).isNotEqualTo(file2);
+  }
+}
diff --git 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
index f5387aee88..7894428a78 100644
--- 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
+++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
@@ -68,6 +68,8 @@ class TestDynamicCommitter {
 
   Catalog catalog;
 
+  final int cacheMaximumSize = 10;
+
   private static final DataFile DATA_FILE =
       DataFiles.builder(PartitionSpec.unpartitioned())
           .withPath("/path/to/data-1.parquet")
@@ -155,7 +157,7 @@ class TestDynamicCommitter {
         new WriteTarget(TABLE2, "branch2", 43, 0, true, Sets.newHashSet(1, 2));
 
     DynamicWriteResultAggregator aggregator =
-        new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader());
+        new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), 
cacheMaximumSize);
     OneInputStreamOperatorTestHarness aggregatorHarness =
         new OneInputStreamOperatorTestHarness(aggregator);
     aggregatorHarness.open();
@@ -281,7 +283,7 @@ class TestDynamicCommitter {
         new WriteTarget(TABLE1, "branch", 42, 0, false, Sets.newHashSet(1, 2));
 
     DynamicWriteResultAggregator aggregator =
-        new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader());
+        new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), 
cacheMaximumSize);
     OneInputStreamOperatorTestHarness aggregatorHarness =
         new OneInputStreamOperatorTestHarness(aggregator);
     aggregatorHarness.open();
@@ -339,7 +341,7 @@ class TestDynamicCommitter {
     assertThat(table.snapshots()).isEmpty();
 
     DynamicWriteResultAggregator aggregator =
-        new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader());
+        new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), 
cacheMaximumSize);
     OneInputStreamOperatorTestHarness aggregatorHarness =
         new OneInputStreamOperatorTestHarness(aggregator);
     aggregatorHarness.open();
@@ -454,7 +456,7 @@ class TestDynamicCommitter {
     assertThat(table.snapshots()).isEmpty();
 
     DynamicWriteResultAggregator aggregator =
-        new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader());
+        new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), 
cacheMaximumSize);
     OneInputStreamOperatorTestHarness aggregatorHarness =
         new OneInputStreamOperatorTestHarness(aggregator);
     aggregatorHarness.open();
@@ -613,7 +615,7 @@ class TestDynamicCommitter {
         new WriteTarget(TABLE1, "branch", 42, 0, false, Sets.newHashSet(1, 2));
 
     DynamicWriteResultAggregator aggregator =
-        new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader());
+        new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), 
cacheMaximumSize);
     OneInputStreamOperatorTestHarness aggregatorHarness =
         new OneInputStreamOperatorTestHarness(aggregator);
     aggregatorHarness.open();
diff --git 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultAggregator.java
 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultAggregator.java
index 713c67da17..d3f4385d97 100644
--- 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultAggregator.java
+++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultAggregator.java
@@ -20,15 +20,26 @@ package org.apache.iceberg.flink.sink.dynamic;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
 import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.hadoop.util.Sets;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.flink.HadoopCatalogExtension;
+import org.apache.iceberg.flink.sink.DeltaManifests;
+import org.apache.iceberg.flink.sink.DeltaManifestsSerializer;
 import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
@@ -37,13 +48,22 @@ class TestDynamicWriteResultAggregator {
   @RegisterExtension
   static final HadoopCatalogExtension CATALOG_EXTENSION = new 
HadoopCatalogExtension("db", "table");
 
+  final int cacheMaximumSize = 1;
+
+  private static final DataFile DATA_FILE =
+      DataFiles.builder(PartitionSpec.unpartitioned())
+          .withPath("/path/to/data-1.parquet")
+          .withFileSizeInBytes(100)
+          .withRecordCount(1)
+          .build();
+
   @Test
   void testAggregator() throws Exception {
     CATALOG_EXTENSION.catalog().createTable(TableIdentifier.of("table"), new 
Schema());
     CATALOG_EXTENSION.catalog().createTable(TableIdentifier.of("table2"), new 
Schema());
 
     DynamicWriteResultAggregator aggregator =
-        new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader());
+        new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), 
cacheMaximumSize);
     try (OneInputStreamOperatorTestHarness<
             CommittableMessage<DynamicWriteResult>, 
CommittableMessage<DynamicCommittable>>
         testHarness = new OneInputStreamOperatorTestHarness<>(aggregator)) {
@@ -79,4 +99,74 @@ class TestDynamicWriteResultAggregator {
       assertThat(testHarness.getRecordOutput()).hasSize(4);
     }
   }
+
+  @Test
+  void testPreventOutputFileFactoryCacheEvictionDuringFlush() throws Exception 
{
+    CATALOG_EXTENSION.catalog().createTable(TableIdentifier.of("table"), new 
Schema());
+
+    // Disable caching of ManifestOutputFileFactory.
+    final int zeroCacheSize = 0;
+    DynamicWriteResultAggregator aggregator =
+        new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), 
zeroCacheSize);
+    try (OneInputStreamOperatorTestHarness<
+            CommittableMessage<DynamicWriteResult>, 
CommittableMessage<DynamicCommittable>>
+        testHarness = new OneInputStreamOperatorTestHarness<>(aggregator)) {
+      testHarness.open();
+
+      WriteTarget writeTarget1 =
+          new WriteTarget("table", "branch", 42, 0, false, Sets.newHashSet());
+      DynamicWriteResult dynamicWriteResult1 =
+          new DynamicWriteResult(
+              writeTarget1, 
WriteResult.builder().addDataFiles(DATA_FILE).build());
+
+      // Different WriteTarget
+      WriteTarget writeTarget2 =
+          new WriteTarget("table", "branch2", 23, 0, true, Sets.newHashSet());
+      DynamicWriteResult dynamicWriteResult2 =
+          new DynamicWriteResult(
+              writeTarget2, 
WriteResult.builder().addDataFiles(DATA_FILE).build());
+
+      CommittableWithLineage<DynamicWriteResult> committable1 =
+          new CommittableWithLineage<>(dynamicWriteResult1, 0, 0);
+      testHarness.processElement(new StreamRecord<>(committable1));
+
+      CommittableWithLineage<DynamicWriteResult> committable2 =
+          new CommittableWithLineage<>(dynamicWriteResult2, 0, 0);
+      testHarness.processElement(new StreamRecord<>(committable2));
+
+      assertThat(testHarness.getOutput()).isEmpty();
+
+      testHarness.prepareSnapshotPreBarrier(1L);
+      List<StreamRecord<CommittableMessage<DynamicCommittable>>> output =
+          Lists.newArrayList(testHarness.getRecordOutput().iterator());
+
+      assertThat(testHarness.getOutput()).hasSize(3);
+      
assertThat(output.get(0).getValue()).isInstanceOf(CommittableSummary.class);
+      
assertThat(output.get(1).getValue()).isInstanceOf(CommittableWithLineage.class);
+      
assertThat(output.get(2).getValue()).isInstanceOf(CommittableWithLineage.class);
+
+      // There should be two unique file paths, despite the cache being 
disabled.
+      Set<String> manifestPaths = getManifestPaths(output);
+      assertThat(manifestPaths).hasSize(2);
+    }
+  }
+
+  private static Set<String> getManifestPaths(
+      List<StreamRecord<CommittableMessage<DynamicCommittable>>> messages) 
throws IOException {
+    Set<String> manifestPaths = Sets.newHashSet();
+
+    for (StreamRecord<CommittableMessage<DynamicCommittable>> record : 
messages) {
+      CommittableMessage<DynamicCommittable> message = record.getValue();
+      if (message instanceof CommittableWithLineage) {
+        DeltaManifests deltaManifests =
+            SimpleVersionedSerialization.readVersionAndDeSerialize(
+                DeltaManifestsSerializer.INSTANCE,
+                (((CommittableWithLineage<DynamicCommittable>) 
message).getCommittable())
+                    .manifest());
+        deltaManifests.manifests().forEach(manifest -> 
manifestPaths.add(manifest.path()));
+      }
+    }
+
+    return manifestPaths;
+  }
 }
diff --git 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
index 89dd4f2259..689fd20483 100644
--- 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
+++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
@@ -202,6 +202,38 @@ class TestDynamicWriter extends TestFlinkIcebergSinkBase {
             "Equality field columns shouldn't be empty when configuring to use 
UPSERT data.");
   }
 
+  @Test
+  void testUniqueFileSuffixOnFactoryRecreation() throws Exception {
+    Catalog catalog = CATALOG_EXTENSION.catalog();
+    Table table1 = catalog.createTable(TABLE1, SimpleDataUtil.SCHEMA);
+
+    DynamicWriter dynamicWriter = createDynamicWriter(catalog);
+    DynamicRecordInternal record1 = getDynamicRecordInternal(table1);
+
+    dynamicWriter.write(record1, null);
+    dynamicWriter.prepareCommit();
+
+    File dataDir1 = new File(URI.create(table1.location()).getPath(), "data");
+    File[] files = dataDir1.listFiles((dir, name) -> !name.startsWith("."));
+    assertThat(files).isNotNull().hasSize(1);
+    File firstFile = files[0];
+
+    // Clear cache which must create new unique files names for the output 
files
+    dynamicWriter.getTaskWriterFactories().clear();
+
+    dynamicWriter.write(record1, null);
+    dynamicWriter.prepareCommit();
+
+    files =
+        dataDir1.listFiles(
+            (dir, name) -> !name.startsWith(".") && 
!name.equals(firstFile.getName()));
+    assertThat(files).isNotNull().hasSize(1);
+    File secondFile = files[0];
+
+    // File names must be different
+    assertThat(firstFile.getName()).isNotEqualTo(secondFile.getName());
+  }
+
   private static @Nonnull DynamicWriter createDynamicWriter(
       Catalog catalog, Map<String, String> properties) {
     DynamicWriter dynamicWriter =

Reply via email to