This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3dcb1047c8 [flink] Introduce 'partition.mark-done.recover-from-state'
(#5648)
3dcb1047c8 is described below
commit 3dcb1047c835f896662ee06e1eb3edceda8f98a2
Author: wangwj <[email protected]>
AuthorDate: Thu May 22 18:00:24 2025 +0800
[flink] Introduce 'partition.mark-done.recover-from-state' (#5648)
---
.../generated/flink_connector_configuration.html | 6 +++
.../apache/paimon/flink/FlinkConnectorOptions.java | 7 +++
.../flink/sink/CombinedTableCompactorSink.java | 4 +-
.../org/apache/paimon/flink/sink/Committer.java | 7 +--
.../paimon/flink/sink/CommitterOperator.java | 2 +-
.../apache/paimon/flink/sink/FlinkWriteSink.java | 8 +++-
.../RestoreAndFailCommittableStateManager.java | 12 ++++-
.../apache/paimon/flink/sink/StoreCommitter.java | 4 +-
.../paimon/flink/sink/StoreMultiCommitter.java | 7 ++-
.../flink/sink/partition/PartitionListener.java | 3 +-
.../flink/sink/partition/PartitionListeners.java | 6 +--
.../flink/sink/partition/PartitionMarkDone.java | 15 +++---
.../sink/partition/ReportPartStatsListener.java | 2 +-
.../paimon/flink/sink/CommitterOperatorTest.java | 54 +++++++++++++++++++++-
.../sink/partition/PartitionMarkDoneTest.java | 28 ++++++-----
15 files changed, 125 insertions(+), 40 deletions(-)
diff --git
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index f348c35e8e..53f6884279 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -98,6 +98,12 @@ under the License.
<td><p>Enum</p></td>
<td>How to trigger partition mark done action.<br /><br />Possible
values:<ul><li>"process-time": Based on the time of the machine, mark the
partition done once the processing time passes period time plus
delay.</li><li>"watermark": Based on the watermark of the input, mark the
partition done once the watermark passes period time plus delay.</li></ul></td>
</tr>
+ <tr>
+ <td><h5>partition.mark-done.recover-from-state</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>Whether trigger partition mark done when recover from
state.</td>
+ </tr>
<tr>
<td><h5>partition.time-interval</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index a9e7f0f7d1..a0ff463356 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -378,6 +378,13 @@ public class FlinkConnectorOptions {
"You can specify time interval for partition, for
example, "
+ "daily partition is '1 d', hourly
partition is '1 h'.");
+ public static final ConfigOption<Boolean>
PARTITION_MARK_DONE_RECOVER_FROM_STATE =
+ key("partition.mark-done.recover-from-state")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Whether trigger partition mark done when recover
from state.");
+
public static final ConfigOption<String> CLUSTERING_COLUMNS =
key("sink.clustering.by-columns")
.stringType()
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
index 3d40bb6f49..7f3366d1b8 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
@@ -41,6 +41,7 @@ import java.util.Map;
import static org.apache.paimon.CoreOptions.createCommitUser;
import static
org.apache.paimon.flink.FlinkConnectorOptions.END_INPUT_WATERMARK;
+import static
org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_MARK_DONE_RECOVER_FROM_STATE;
import static
org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_OPERATOR_CHAINING;
import static
org.apache.paimon.flink.FlinkConnectorOptions.SINK_MANAGED_WRITER_BUFFER_MEMORY;
import static
org.apache.paimon.flink.FlinkConnectorOptions.SINK_USE_MANAGED_MEMORY;
@@ -203,6 +204,7 @@ public class CombinedTableCompactorSink implements
Serializable {
protected CommittableStateManager<WrappedManifestCommittable>
createCommittableStateManager() {
return new RestoreAndFailCommittableStateManager<>(
- WrappedManifestCommittableSerializer::new);
+ WrappedManifestCommittableSerializer::new,
+ options.get(PARTITION_MARK_DONE_RECOVER_FROM_STATE));
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
index 4679318b7e..31acb1e91d 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
@@ -56,14 +56,9 @@ public interface Committer<CommitT, GlobalCommitT> extends
AutoCloseable {
int filterAndCommit(
List<GlobalCommitT> globalCommittables,
boolean checkAppendFiles,
- boolean recoveryFromState)
+ boolean partitionMarkDoneRecoverFromState)
throws IOException;
- default int filterAndCommitFromState(List<GlobalCommitT>
globalCommittables)
- throws IOException {
- return filterAndCommit(globalCommittables, true, true);
- }
-
Map<Long, List<CommitT>> groupByCheckpoint(Collection<CommitT>
committables);
/** Factory to create {@link Committer}. */
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
index b5c73f17f9..64196585c1 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
@@ -210,7 +210,7 @@ public class CommitterOperator<CommitT, GlobalCommitT>
extends AbstractStreamOpe
// So when `endInput` is called, we must check if the
corresponding snapshot exists.
// However, if the snapshot does not exist, then append files must
be new files. So
// there is no need to check for duplicated append files.
- committer.filterAndCommit(committables, false, false);
+ committer.filterAndCommit(committables, false, true);
} else {
committer.commit(committables);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
index f36dae4a83..74f1febd90 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
@@ -20,12 +20,15 @@ package org.apache.paimon.flink.sink;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestCommittableSerializer;
+import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import javax.annotation.Nullable;
import java.util.Map;
+import static
org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_MARK_DONE_RECOVER_FROM_STATE;
+
/** A {@link FlinkSink} to write records. */
public abstract class FlinkWriteSink<T> extends FlinkSink<T> {
@@ -55,6 +58,9 @@ public abstract class FlinkWriteSink<T> extends FlinkSink<T> {
@Override
protected CommittableStateManager<ManifestCommittable>
createCommittableStateManager() {
- return new
RestoreAndFailCommittableStateManager<>(ManifestCommittableSerializer::new);
+ Options options = table.coreOptions().toConfiguration();
+ return new RestoreAndFailCommittableStateManager<>(
+ ManifestCommittableSerializer::new,
+ options.get(PARTITION_MARK_DONE_RECOVER_FROM_STATE));
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java
index dda1abf90d..a9b0922d0b 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java
@@ -51,12 +51,21 @@ public class
RestoreAndFailCommittableStateManager<GlobalCommitT>
/** The committable's serializer. */
private final SerializableSupplier<VersionedSerializer<GlobalCommitT>>
committableSerializer;
+ private final boolean partitionMarkDoneRecoverFromState;
+
/** GlobalCommitT state of this job. Used to filter out previous
successful commits. */
private ListState<GlobalCommitT> streamingCommitterState;
public RestoreAndFailCommittableStateManager(
SerializableSupplier<VersionedSerializer<GlobalCommitT>>
committableSerializer) {
+ this(committableSerializer, true);
+ }
+
+ public RestoreAndFailCommittableStateManager(
+ SerializableSupplier<VersionedSerializer<GlobalCommitT>>
committableSerializer,
+ boolean partitionMarkDoneRecoverFromState) {
this.committableSerializer = committableSerializer;
+ this.partitionMarkDoneRecoverFromState =
partitionMarkDoneRecoverFromState;
}
@Override
@@ -79,7 +88,8 @@ public class
RestoreAndFailCommittableStateManager<GlobalCommitT>
private void recover(List<GlobalCommitT> committables, Committer<?,
GlobalCommitT> committer)
throws Exception {
- int numCommitted = committer.filterAndCommitFromState(committables);
+ int numCommitted =
+ committer.filterAndCommit(committables, true,
partitionMarkDoneRecoverFromState);
if (numCommitted > 0) {
throw new RuntimeException(
"This exception is intentionally thrown "
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
index d3d6f206cc..3b5d5bf627 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
@@ -117,9 +117,9 @@ public class StoreCommitter implements
Committer<Committable, ManifestCommittabl
public int filterAndCommit(
List<ManifestCommittable> globalCommittables,
boolean checkAppendFiles,
- boolean recoveryFromState) {
+ boolean partitionMarkDoneRecoverFromState) {
int committed = commit.filterAndCommitMultiple(globalCommittables,
checkAppendFiles);
- partitionListeners.notifyCommittable(globalCommittables,
recoveryFromState);
+ partitionListeners.notifyCommittable(globalCommittables,
partitionMarkDoneRecoverFromState);
return committed;
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
index 886378ef7d..77aafe0baf 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
@@ -180,13 +180,16 @@ public class StoreMultiCommitter
public int filterAndCommit(
List<WrappedManifestCommittable> globalCommittables,
boolean checkAppendFiles,
- boolean recoveryFromState) {
+ boolean partitionMarkDoneRecoverFromState) {
int result = 0;
for (Map.Entry<Identifier, List<ManifestCommittable>> entry :
groupByTable(globalCommittables).entrySet()) {
result +=
getStoreCommitter(entry.getKey())
- .filterAndCommit(entry.getValue(),
checkAppendFiles, recoveryFromState);
+ .filterAndCommit(
+ entry.getValue(),
+ checkAppendFiles,
+ partitionMarkDoneRecoverFromState);
}
return result;
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListener.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListener.java
index f01df4b870..18c365081a 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListener.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListener.java
@@ -26,7 +26,8 @@ import java.util.List;
/** The partition listener. */
public interface PartitionListener extends Closeable {
- void notifyCommittable(List<ManifestCommittable> committables, boolean
recoverFromState);
+ void notifyCommittable(
+ List<ManifestCommittable> committables, boolean
partitionMarkDoneRecoverFromState);
void snapshotState() throws Exception;
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java
index 4e14e772a7..8f8d633d43 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java
@@ -39,14 +39,14 @@ public class PartitionListeners implements Closeable {
public void notifyCommittable(List<ManifestCommittable> committables) {
for (PartitionListener trigger : listeners) {
- trigger.notifyCommittable(committables, false);
+ trigger.notifyCommittable(committables, true);
}
}
public void notifyCommittable(
- List<ManifestCommittable> committables, boolean recoveryFromState)
{
+ List<ManifestCommittable> committables, boolean
partitionMarkDoneRecoverFromState) {
for (PartitionListener trigger : listeners) {
- trigger.notifyCommittable(committables, recoveryFromState);
+ trigger.notifyCommittable(committables,
partitionMarkDoneRecoverFromState);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
index fad23c6085..96081dee0c 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
@@ -134,14 +134,13 @@ public class PartitionMarkDone implements
PartitionListener {
@Override
public void notifyCommittable(
- List<ManifestCommittable> committables, boolean recoverFromState) {
- if (recoverFromState) {
- return;
- }
- if (partitionMarkDoneActionMode ==
PartitionMarkDoneActionMode.WATERMARK) {
- markDoneByWatermark(committables);
- } else {
- markDoneByProcessTime(committables);
+ List<ManifestCommittable> committables, boolean
partitionMarkDoneRecoverFromState) {
+ if (partitionMarkDoneRecoverFromState) {
+ if (partitionMarkDoneActionMode ==
PartitionMarkDoneActionMode.WATERMARK) {
+ markDoneByWatermark(committables);
+ } else {
+ markDoneByProcessTime(committables);
+ }
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java
index effb63c87e..db20656b85 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java
@@ -86,7 +86,7 @@ public class ReportPartStatsListener implements
PartitionListener {
}
public void notifyCommittable(
- List<ManifestCommittable> committables, boolean
recoverFromCheckpoint) {
+ List<ManifestCommittable> committables, boolean
partitionMarkDoneRecoverFromState) {
Set<String> partition = new HashSet<>();
boolean endInput = false;
for (ManifestCommittable committable : committables) {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
index b5ce23ef50..4ad1dff9aa 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
@@ -24,6 +24,7 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.utils.TestingMetricUtils;
+import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataIncrement;
@@ -547,6 +548,49 @@ public class CommitterOperatorTest extends
CommitterOperatorTestBase {
assertThat(snapshot).isNotNull();
}
+ @Test
+ public void testNotTriggerPartitionMarkDownWhenRecoverFromState() throws
Exception {
+ FileStoreTable table =
+ createFileStoreTable(
+ options -> {
+
options.set(CoreOptions.COMMIT_FORCE_CREATE_SNAPSHOT.key(), "true");
+ options.set(
+
CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT.key(), "true");
+ options.set(
+
FlinkConnectorOptions.PARTITION_IDLE_TIME_TO_DONE.key(), "1h");
+ },
+ Collections.singletonList("b"));
+
+ OneInputStreamOperatorTestHarness<Committable, Committable>
testHarness =
+ createRecoverableTestHarness(table, false);
+ testHarness.open();
+ OperatorSubtaskState snapshotState =
+ writeAndSnapshot(table, "commitUser", 1, Long.MAX_VALUE,
testHarness);
+ testHarness.close();
+
+ testHarness = createRecoverableTestHarness(table, false);
+ try {
+ // commit snapshot from state, fail intentionally
+ testHarness.initializeState(snapshotState);
+ testHarness.open();
+ fail("Expecting intentional exception");
+ } catch (Exception e) {
+ assertThat(e)
+ .hasMessageContaining(
+ "This exception is intentionally thrown "
+ + "after committing the restored
checkpoints. "
+ + "By restarting the job we hope that "
+ + "writers can start writing based on
these new commits.");
+ }
+
+ testHarness.notifyOfCompletedCheckpoint(Long.MAX_VALUE);
+ Snapshot snapshot = table.snapshotManager().latestSnapshot();
+ assertThat(snapshot).isNotNull();
+
+ Path successFile = new Path(table.location(), "b=10/_SUCCESS");
+ assertThat(table.fileIO().exists(successFile)).isEqualTo(false);
+ }
+
@Test
public void testEmptyCommitWithProcessTimeTag() throws Exception {
FileStoreTable table =
@@ -715,12 +759,20 @@ public class CommitterOperatorTest extends
CommitterOperatorTestBase {
protected OneInputStreamOperatorTestHarness<Committable, Committable>
createRecoverableTestHarness(FileStoreTable table) throws
Exception {
+ return createRecoverableTestHarness(table, true);
+ }
+
+ private OneInputStreamOperatorTestHarness<Committable, Committable>
+ createRecoverableTestHarness(
+ FileStoreTable table, boolean
partitionMarkDownRecoverFromState)
+ throws Exception {
OneInputStreamOperatorFactory<Committable, Committable>
operatorFactory =
createCommitterOperatorFactory(
table,
null,
new RestoreAndFailCommittableStateManager<>(
- ManifestCommittableSerializer::new));
+ ManifestCommittableSerializer::new,
+ partitionMarkDownRecoverFromState));
return createTestHarness(operatorFactory);
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
index 349d56c71c..eb7de0fd41 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
@@ -58,20 +58,21 @@ class PartitionMarkDoneTest extends TableTestBase {
@Test
public void testTriggerByCompaction() throws Exception {
- innerTest(true, false);
+ innerTest(true, true);
}
@Test
public void testNotTriggerByCompaction() throws Exception {
- innerTest(false, false);
+ innerTest(false, true);
}
@Test
- public void testTriggerWhenRecoveryFromState() throws Exception {
- innerTest(false, true);
+ public void testNotTriggerWhenRecoveryFromState() throws Exception {
+ innerTest(false, false);
}
- private void innerTest(boolean deletionVectors, boolean recoveryFromState)
throws Exception {
+ private void innerTest(boolean deletionVectors, boolean
partitionMarkDoneRecoverFromState)
+ throws Exception {
Identifier identifier = identifier("T");
Schema schema =
Schema.newBuilder()
@@ -99,26 +100,29 @@ class PartitionMarkDoneTest extends TableTestBase {
table)
.get();
- if (recoveryFromState) {
- notifyCommits(markDone, false, true);
+ if (!partitionMarkDoneRecoverFromState) {
+ notifyCommits(markDone, false, partitionMarkDoneRecoverFromState);
assertThat(table.fileIO().exists(successFile)).isEqualTo(false);
+ return;
}
- notifyCommits(markDone, true);
+ notifyCommits(markDone, true, partitionMarkDoneRecoverFromState);
assertThat(table.fileIO().exists(successFile)).isEqualTo(deletionVectors);
if (!deletionVectors) {
- notifyCommits(markDone, false);
+ notifyCommits(markDone, false, partitionMarkDoneRecoverFromState);
assertThat(table.fileIO().exists(successFile)).isEqualTo(true);
}
}
public static void notifyCommits(PartitionMarkDone markDone, boolean
isCompact) {
- notifyCommits(markDone, isCompact, false);
+ notifyCommits(markDone, isCompact, true);
}
private static void notifyCommits(
- PartitionMarkDone markDone, boolean isCompact, boolean
recoveryFromState) {
+ PartitionMarkDone markDone,
+ boolean isCompact,
+ boolean partitionMarkDoneRecoverFromState) {
ManifestCommittable committable = new
ManifestCommittable(Long.MAX_VALUE);
DataFileMeta file = DataFileTestUtils.newFile();
CommitMessageImpl compactMessage;
@@ -142,7 +146,7 @@ class PartitionMarkDoneTest extends TableTestBase {
new IndexIncrement(emptyList()));
}
committable.addFileCommittable(compactMessage);
- markDone.notifyCommittable(singletonList(committable),
recoveryFromState);
+ markDone.notifyCommittable(singletonList(committable),
partitionMarkDoneRecoverFromState);
}
public static class MockOperatorStateStore implements OperatorStateStore {