This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3dcb1047c8 [flink] Introduce 'partition.mark-done.recover-from-state' (#5648)
3dcb1047c8 is described below

commit 3dcb1047c835f896662ee06e1eb3edceda8f98a2
Author: wangwj <[email protected]>
AuthorDate: Thu May 22 18:00:24 2025 +0800

    [flink] Introduce 'partition.mark-done.recover-from-state' (#5648)
---
 .../generated/flink_connector_configuration.html   |  6 +++
 .../apache/paimon/flink/FlinkConnectorOptions.java |  7 +++
 .../flink/sink/CombinedTableCompactorSink.java     |  4 +-
 .../org/apache/paimon/flink/sink/Committer.java    |  7 +--
 .../paimon/flink/sink/CommitterOperator.java       |  2 +-
 .../apache/paimon/flink/sink/FlinkWriteSink.java   |  8 +++-
 .../RestoreAndFailCommittableStateManager.java     | 12 ++++-
 .../apache/paimon/flink/sink/StoreCommitter.java   |  4 +-
 .../paimon/flink/sink/StoreMultiCommitter.java     |  7 ++-
 .../flink/sink/partition/PartitionListener.java    |  3 +-
 .../flink/sink/partition/PartitionListeners.java   |  6 +--
 .../flink/sink/partition/PartitionMarkDone.java    | 15 +++---
 .../sink/partition/ReportPartStatsListener.java    |  2 +-
 .../paimon/flink/sink/CommitterOperatorTest.java   | 54 +++++++++++++++++++++-
 .../sink/partition/PartitionMarkDoneTest.java      | 28 ++++++-----
 15 files changed, 125 insertions(+), 40 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html 
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index f348c35e8e..53f6884279 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -98,6 +98,12 @@ under the License.
             <td><p>Enum</p></td>
             <td>How to trigger partition mark done action.<br /><br />Possible 
values:<ul><li>"process-time": Based on the time of the machine, mark the 
partition done once the processing time passes period time plus 
delay.</li><li>"watermark": Based on the watermark of the input, mark the 
partition done once the watermark passes period time plus delay.</li></ul></td>
         </tr>
+        <tr>
+            <td><h5>partition.mark-done.recover-from-state</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Whether trigger partition mark done when recover from 
state.</td>
+        </tr>
         <tr>
             <td><h5>partition.time-interval</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index a9e7f0f7d1..a0ff463356 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -378,6 +378,13 @@ public class FlinkConnectorOptions {
                             "You can specify time interval for partition, for 
example, "
                                     + "daily partition is '1 d', hourly 
partition is '1 h'.");
 
+    public static final ConfigOption<Boolean> 
PARTITION_MARK_DONE_RECOVER_FROM_STATE =
+            key("partition.mark-done.recover-from-state")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Whether trigger partition mark done when recover 
from state.");
+
     public static final ConfigOption<String> CLUSTERING_COLUMNS =
             key("sink.clustering.by-columns")
                     .stringType()
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
index 3d40bb6f49..7f3366d1b8 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
@@ -41,6 +41,7 @@ import java.util.Map;
 
 import static org.apache.paimon.CoreOptions.createCommitUser;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.END_INPUT_WATERMARK;
+import static 
org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_MARK_DONE_RECOVER_FROM_STATE;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_OPERATOR_CHAINING;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.SINK_MANAGED_WRITER_BUFFER_MEMORY;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.SINK_USE_MANAGED_MEMORY;
@@ -203,6 +204,7 @@ public class CombinedTableCompactorSink implements 
Serializable {
 
     protected CommittableStateManager<WrappedManifestCommittable> 
createCommittableStateManager() {
         return new RestoreAndFailCommittableStateManager<>(
-                WrappedManifestCommittableSerializer::new);
+                WrappedManifestCommittableSerializer::new,
+                options.get(PARTITION_MARK_DONE_RECOVER_FROM_STATE));
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
index 4679318b7e..31acb1e91d 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
@@ -56,14 +56,9 @@ public interface Committer<CommitT, GlobalCommitT> extends 
AutoCloseable {
     int filterAndCommit(
             List<GlobalCommitT> globalCommittables,
             boolean checkAppendFiles,
-            boolean recoveryFromState)
+            boolean partitionMarkDoneRecoverFromState)
             throws IOException;
 
-    default int filterAndCommitFromState(List<GlobalCommitT> 
globalCommittables)
-            throws IOException {
-        return filterAndCommit(globalCommittables, true, true);
-    }
-
     Map<Long, List<CommitT>> groupByCheckpoint(Collection<CommitT> 
committables);
 
     /** Factory to create {@link Committer}. */
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
index b5c73f17f9..64196585c1 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
@@ -210,7 +210,7 @@ public class CommitterOperator<CommitT, GlobalCommitT> 
extends AbstractStreamOpe
             // So when `endInput` is called, we must check if the 
corresponding snapshot exists.
             // However, if the snapshot does not exist, then append files must 
be new files. So
             // there is no need to check for duplicated append files.
-            committer.filterAndCommit(committables, false, false);
+            committer.filterAndCommit(committables, false, true);
         } else {
             committer.commit(committables);
         }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
index f36dae4a83..74f1febd90 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
@@ -20,12 +20,15 @@ package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.manifest.ManifestCommittableSerializer;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
 
 import javax.annotation.Nullable;
 
 import java.util.Map;
 
+import static 
org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_MARK_DONE_RECOVER_FROM_STATE;
+
 /** A {@link FlinkSink} to write records. */
 public abstract class FlinkWriteSink<T> extends FlinkSink<T> {
 
@@ -55,6 +58,9 @@ public abstract class FlinkWriteSink<T> extends FlinkSink<T> {
 
     @Override
     protected CommittableStateManager<ManifestCommittable> 
createCommittableStateManager() {
-        return new 
RestoreAndFailCommittableStateManager<>(ManifestCommittableSerializer::new);
+        Options options = table.coreOptions().toConfiguration();
+        return new RestoreAndFailCommittableStateManager<>(
+                ManifestCommittableSerializer::new,
+                options.get(PARTITION_MARK_DONE_RECOVER_FROM_STATE));
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java
index dda1abf90d..a9b0922d0b 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java
@@ -51,12 +51,21 @@ public class 
RestoreAndFailCommittableStateManager<GlobalCommitT>
     /** The committable's serializer. */
     private final SerializableSupplier<VersionedSerializer<GlobalCommitT>> 
committableSerializer;
 
+    private final boolean partitionMarkDoneRecoverFromState;
+
     /** GlobalCommitT state of this job. Used to filter out previous 
successful commits. */
     private ListState<GlobalCommitT> streamingCommitterState;
 
     public RestoreAndFailCommittableStateManager(
             SerializableSupplier<VersionedSerializer<GlobalCommitT>> 
committableSerializer) {
+        this(committableSerializer, true);
+    }
+
+    public RestoreAndFailCommittableStateManager(
+            SerializableSupplier<VersionedSerializer<GlobalCommitT>> 
committableSerializer,
+            boolean partitionMarkDoneRecoverFromState) {
         this.committableSerializer = committableSerializer;
+        this.partitionMarkDoneRecoverFromState = 
partitionMarkDoneRecoverFromState;
     }
 
     @Override
@@ -79,7 +88,8 @@ public class 
RestoreAndFailCommittableStateManager<GlobalCommitT>
 
     private void recover(List<GlobalCommitT> committables, Committer<?, 
GlobalCommitT> committer)
             throws Exception {
-        int numCommitted = committer.filterAndCommitFromState(committables);
+        int numCommitted =
+                committer.filterAndCommit(committables, true, 
partitionMarkDoneRecoverFromState);
         if (numCommitted > 0) {
             throw new RuntimeException(
                     "This exception is intentionally thrown "
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
index d3d6f206cc..3b5d5bf627 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
@@ -117,9 +117,9 @@ public class StoreCommitter implements 
Committer<Committable, ManifestCommittabl
     public int filterAndCommit(
             List<ManifestCommittable> globalCommittables,
             boolean checkAppendFiles,
-            boolean recoveryFromState) {
+            boolean partitionMarkDoneRecoverFromState) {
         int committed = commit.filterAndCommitMultiple(globalCommittables, 
checkAppendFiles);
-        partitionListeners.notifyCommittable(globalCommittables, 
recoveryFromState);
+        partitionListeners.notifyCommittable(globalCommittables, 
partitionMarkDoneRecoverFromState);
 
         return committed;
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
index 886378ef7d..77aafe0baf 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
@@ -180,13 +180,16 @@ public class StoreMultiCommitter
     public int filterAndCommit(
             List<WrappedManifestCommittable> globalCommittables,
             boolean checkAppendFiles,
-            boolean recoveryFromState) {
+            boolean partitionMarkDoneRecoverFromState) {
         int result = 0;
         for (Map.Entry<Identifier, List<ManifestCommittable>> entry :
                 groupByTable(globalCommittables).entrySet()) {
             result +=
                     getStoreCommitter(entry.getKey())
-                            .filterAndCommit(entry.getValue(), 
checkAppendFiles, recoveryFromState);
+                            .filterAndCommit(
+                                    entry.getValue(),
+                                    checkAppendFiles,
+                                    partitionMarkDoneRecoverFromState);
         }
         return result;
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListener.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListener.java
index f01df4b870..18c365081a 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListener.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListener.java
@@ -26,7 +26,8 @@ import java.util.List;
 /** The partition listener. */
 public interface PartitionListener extends Closeable {
 
-    void notifyCommittable(List<ManifestCommittable> committables, boolean 
recoverFromState);
+    void notifyCommittable(
+            List<ManifestCommittable> committables, boolean 
partitionMarkDoneRecoverFromState);
 
     void snapshotState() throws Exception;
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java
index 4e14e772a7..8f8d633d43 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java
@@ -39,14 +39,14 @@ public class PartitionListeners implements Closeable {
 
     public void notifyCommittable(List<ManifestCommittable> committables) {
         for (PartitionListener trigger : listeners) {
-            trigger.notifyCommittable(committables, false);
+            trigger.notifyCommittable(committables, true);
         }
     }
 
     public void notifyCommittable(
-            List<ManifestCommittable> committables, boolean recoveryFromState) 
{
+            List<ManifestCommittable> committables, boolean 
partitionMarkDoneRecoverFromState) {
         for (PartitionListener trigger : listeners) {
-            trigger.notifyCommittable(committables, recoveryFromState);
+            trigger.notifyCommittable(committables, 
partitionMarkDoneRecoverFromState);
         }
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
index fad23c6085..96081dee0c 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
@@ -134,14 +134,13 @@ public class PartitionMarkDone implements 
PartitionListener {
 
     @Override
     public void notifyCommittable(
-            List<ManifestCommittable> committables, boolean recoverFromState) {
-        if (recoverFromState) {
-            return;
-        }
-        if (partitionMarkDoneActionMode == 
PartitionMarkDoneActionMode.WATERMARK) {
-            markDoneByWatermark(committables);
-        } else {
-            markDoneByProcessTime(committables);
+            List<ManifestCommittable> committables, boolean 
partitionMarkDoneRecoverFromState) {
+        if (partitionMarkDoneRecoverFromState) {
+            if (partitionMarkDoneActionMode == 
PartitionMarkDoneActionMode.WATERMARK) {
+                markDoneByWatermark(committables);
+            } else {
+                markDoneByProcessTime(committables);
+            }
         }
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java
index effb63c87e..db20656b85 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java
@@ -86,7 +86,7 @@ public class ReportPartStatsListener implements 
PartitionListener {
     }
 
     public void notifyCommittable(
-            List<ManifestCommittable> committables, boolean 
recoverFromCheckpoint) {
+            List<ManifestCommittable> committables, boolean 
partitionMarkDoneRecoverFromState) {
         Set<String> partition = new HashSet<>();
         boolean endInput = false;
         for (ManifestCommittable committable : committables) {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
index b5ce23ef50..4ad1dff9aa 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
@@ -24,6 +24,7 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.utils.TestingMetricUtils;
+import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataIncrement;
@@ -547,6 +548,49 @@ public class CommitterOperatorTest extends 
CommitterOperatorTestBase {
         assertThat(snapshot).isNotNull();
     }
 
+    @Test
+    public void testNotTriggerPartitionMarkDownWhenRecoverFromState() throws 
Exception {
+        FileStoreTable table =
+                createFileStoreTable(
+                        options -> {
+                            
options.set(CoreOptions.COMMIT_FORCE_CREATE_SNAPSHOT.key(), "true");
+                            options.set(
+                                    
CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT.key(), "true");
+                            options.set(
+                                    
FlinkConnectorOptions.PARTITION_IDLE_TIME_TO_DONE.key(), "1h");
+                        },
+                        Collections.singletonList("b"));
+
+        OneInputStreamOperatorTestHarness<Committable, Committable> 
testHarness =
+                createRecoverableTestHarness(table, false);
+        testHarness.open();
+        OperatorSubtaskState snapshotState =
+                writeAndSnapshot(table, "commitUser", 1, Long.MAX_VALUE, 
testHarness);
+        testHarness.close();
+
+        testHarness = createRecoverableTestHarness(table, false);
+        try {
+            // commit snapshot from state, fail intentionally
+            testHarness.initializeState(snapshotState);
+            testHarness.open();
+            fail("Expecting intentional exception");
+        } catch (Exception e) {
+            assertThat(e)
+                    .hasMessageContaining(
+                            "This exception is intentionally thrown "
+                                    + "after committing the restored 
checkpoints. "
+                                    + "By restarting the job we hope that "
+                                    + "writers can start writing based on 
these new commits.");
+        }
+
+        testHarness.notifyOfCompletedCheckpoint(Long.MAX_VALUE);
+        Snapshot snapshot = table.snapshotManager().latestSnapshot();
+        assertThat(snapshot).isNotNull();
+
+        Path successFile = new Path(table.location(), "b=10/_SUCCESS");
+        assertThat(table.fileIO().exists(successFile)).isEqualTo(false);
+    }
+
     @Test
     public void testEmptyCommitWithProcessTimeTag() throws Exception {
         FileStoreTable table =
@@ -715,12 +759,20 @@ public class CommitterOperatorTest extends 
CommitterOperatorTestBase {
 
     protected OneInputStreamOperatorTestHarness<Committable, Committable>
             createRecoverableTestHarness(FileStoreTable table) throws 
Exception {
+        return createRecoverableTestHarness(table, true);
+    }
+
+    private OneInputStreamOperatorTestHarness<Committable, Committable>
+            createRecoverableTestHarness(
+                    FileStoreTable table, boolean 
partitionMarkDownRecoverFromState)
+                    throws Exception {
         OneInputStreamOperatorFactory<Committable, Committable> 
operatorFactory =
                 createCommitterOperatorFactory(
                         table,
                         null,
                         new RestoreAndFailCommittableStateManager<>(
-                                ManifestCommittableSerializer::new));
+                                ManifestCommittableSerializer::new,
+                                partitionMarkDownRecoverFromState));
         return createTestHarness(operatorFactory);
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
index 349d56c71c..eb7de0fd41 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
@@ -58,20 +58,21 @@ class PartitionMarkDoneTest extends TableTestBase {
 
     @Test
     public void testTriggerByCompaction() throws Exception {
-        innerTest(true, false);
+        innerTest(true, true);
     }
 
     @Test
     public void testNotTriggerByCompaction() throws Exception {
-        innerTest(false, false);
+        innerTest(false, true);
     }
 
     @Test
-    public void testTriggerWhenRecoveryFromState() throws Exception {
-        innerTest(false, true);
+    public void testNotTriggerWhenRecoveryFromState() throws Exception {
+        innerTest(false, false);
     }
 
-    private void innerTest(boolean deletionVectors, boolean recoveryFromState) 
throws Exception {
+    private void innerTest(boolean deletionVectors, boolean 
partitionMarkDoneRecoverFromState)
+            throws Exception {
         Identifier identifier = identifier("T");
         Schema schema =
                 Schema.newBuilder()
@@ -99,26 +100,29 @@ class PartitionMarkDoneTest extends TableTestBase {
                                 table)
                         .get();
 
-        if (recoveryFromState) {
-            notifyCommits(markDone, false, true);
+        if (!partitionMarkDoneRecoverFromState) {
+            notifyCommits(markDone, false, partitionMarkDoneRecoverFromState);
             assertThat(table.fileIO().exists(successFile)).isEqualTo(false);
+            return;
         }
 
-        notifyCommits(markDone, true);
+        notifyCommits(markDone, true, partitionMarkDoneRecoverFromState);
         
assertThat(table.fileIO().exists(successFile)).isEqualTo(deletionVectors);
 
         if (!deletionVectors) {
-            notifyCommits(markDone, false);
+            notifyCommits(markDone, false, partitionMarkDoneRecoverFromState);
             assertThat(table.fileIO().exists(successFile)).isEqualTo(true);
         }
     }
 
     public static void notifyCommits(PartitionMarkDone markDone, boolean 
isCompact) {
-        notifyCommits(markDone, isCompact, false);
+        notifyCommits(markDone, isCompact, true);
     }
 
     private static void notifyCommits(
-            PartitionMarkDone markDone, boolean isCompact, boolean 
recoveryFromState) {
+            PartitionMarkDone markDone,
+            boolean isCompact,
+            boolean partitionMarkDoneRecoverFromState) {
         ManifestCommittable committable = new 
ManifestCommittable(Long.MAX_VALUE);
         DataFileMeta file = DataFileTestUtils.newFile();
         CommitMessageImpl compactMessage;
@@ -142,7 +146,7 @@ class PartitionMarkDoneTest extends TableTestBase {
                             new IndexIncrement(emptyList()));
         }
         committable.addFileCommittable(compactMessage);
-        markDone.notifyCommittable(singletonList(committable), 
recoveryFromState);
+        markDone.notifyCommittable(singletonList(committable), 
partitionMarkDoneRecoverFromState);
     }
 
     public static class MockOperatorStateStore implements OperatorStateStore {

Reply via email to