This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 28eb2ac944 [flink] Add procedure to remove un-existing manifest file
(#6643)
28eb2ac944 is described below
commit 28eb2ac9446801eb32a24ef97addefe5e15d3758
Author: YeJunHao <[email protected]>
AuthorDate: Fri Nov 21 12:34:34 2025 +0800
[flink] Add procedure to remove un-existing manifest file (#6643)
---
docs/content/flink/procedures.md | 19 ++
.../paimon/operation/FileStoreCommitImpl.java | 33 ++++
.../action/RemoveUnexistingManifestsAction.java | 116 ++++++++++++
.../RemoveUnexistingManifestsActionFactory.java | 61 ++++++
.../RemoveUnexistingManifestsProcedure.java | 69 +++++++
.../services/org.apache.paimon.factories.Factory | 4 +-
.../RemoveUnexistingManifestsActionITCase.java | 210 +++++++++++++++++++++
7 files changed, 511 insertions(+), 1 deletion(-)
diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index a2a20e4a34..f5ed938f96 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -380,6 +380,25 @@ All available procedures are listed below.
-- only check what files will be removed, but not really remove them
(dry run)
CALL sys.remove_unexisting_files(`table` => 'mydb.myt', `dry_run` =
true)
</td>
+ </tr>
+ <tr>
+ <td>remove_unexisting_manifests</td>
+ <td>
+ -- Use named argument<br/>
+ CALL [catalog.]sys.remove_unexisting_manifests(`table` => 'identifier')
<br/><br/>
+ </td>
+ <td>
+ Procedure to remove unexisting manifest files from the manifest list. See
the Java docs for detailed use cases. Arguments:
+ <li>table: the target table identifier. Cannot be empty. Use
database.table$branch_xx to remove unexisting manifest files of a branch
table.</li>
+ <br>
+ Note that you use this procedure at your own risk: it may
cause data loss when used outside of the use cases listed in the Java docs.
+ </td>
+ <td>
+ -- remove unexisting manifest file in the table `mydb.myt`<br/>
+ CALL sys.remove_unexisting_manifests(`table` => 'mydb.myt')<br/><br/>
+ -- remove unexisting manifest file in the branch table
`mydb.myt$branch_rt`<br/>
+ CALL sys.remove_unexisting_manifests(`table` => 'mydb.myt$branch_rt')
+ </td>
</tr>
<tr>
<td>reset_consumer</td>
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index a078bd90ac..ceb68ed6ba 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -1237,6 +1237,39 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
return new SuccessResult();
}
+    /**
+     * Commits a new snapshot on top of {@code latest} that points at the given manifest lists.
+     *
+     * <p>The new snapshot gets id {@code latest.id() + 1} and commit kind {@code OVERWRITE}. Schema
+     * id, index manifest, log offsets, watermark, statistics, properties and next row id are copied
+     * from {@code latest}.
+     *
+     * @param latest snapshot the replacement is built on
+     * @param totalRecordCount total record count stored in the new snapshot
+     * @param baseManifestList left and right values are passed on as the base manifest list
+     *     arguments of the new snapshot (presumably file name and size, as produced by {@code
+     *     ManifestList#write}; confirm against that method)
+     * @param deltaManifestList same as {@code baseManifestList}, for the delta manifest list
+     * @return the result of {@code commitSnapshotImpl}; the action calling this method treats
+     *     {@code false} as a snapshot conflict
+     */
+    public boolean replaceManifestList(
+            Snapshot latest,
+            long totalRecordCount,
+            Pair<String, Long> baseManifestList,
+            Pair<String, Long> deltaManifestList) {
+        Snapshot newSnapshot =
+                new Snapshot(
+                        latest.id() + 1,
+                        latest.schemaId(),
+                        baseManifestList.getLeft(),
+                        baseManifestList.getRight(),
+                        deltaManifestList.getLeft(),
+                        deltaManifestList.getRight(),
+                        // NOTE(review): presumably the changelog manifest list and its size,
+                        // left unset for this snapshot; confirm against the Snapshot constructor
+                        null,
+                        null,
+                        latest.indexManifest(),
+                        commitUser,
+                        // NOTE(review): presumably the commit identifier; confirm against the
+                        // Snapshot constructor
+                        Long.MAX_VALUE,
+                        CommitKind.OVERWRITE,
+                        System.currentTimeMillis(),
+                        latest.logOffsets(),
+                        totalRecordCount,
+                        0L,
+                        0L,
+                        latest.watermark(),
+                        latest.statistics(),
+                        // properties are carried over from the latest snapshot unchanged
+                        latest.properties(),
+                        latest.nextRowId());
+
+        return commitSnapshotImpl(newSnapshot, Collections.emptyList());
+    }
+
private long assignRowTrackingMeta(
long firstRowIdStart,
List<ManifestEntry> deltaFiles,
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingManifestsAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingManifestsAction.java
new file mode 100644
index 0000000000..e0fe8ac870
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingManifestsAction.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.operation.FileStoreCommitImpl;
+import org.apache.paimon.operation.ManifestsReader;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.Pair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.paimon.manifest.ManifestEntry.recordCount;
+
+/**
+ * Action that repairs the latest snapshot of a table when some of the manifest files it
+ * references no longer exist.
+ *
+ * <p>Every manifest of the latest snapshot is checked against the file system. If at least one
+ * of them is missing, a new snapshot is committed whose base manifest list contains only the
+ * manifests that still exist and whose delta manifest list is empty. Entries that were recorded
+ * in a missing manifest are therefore no longer part of the table afterwards. If the table has
+ * no snapshot, or no manifest is missing, the action does nothing.
+ */
+public class RemoveUnexistingManifestsAction extends ActionBase implements LocalAction {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(RemoveUnexistingManifestsAction.class);
+
+    private final String databaseName;
+    private final String tableName;
+
+    /**
+     * Creates the action for one table.
+     *
+     * @param databaseName database of the target table
+     * @param tableName object name of the target table
+     * @param catalogConfig catalog options handed to {@code ActionBase}
+     */
+    public RemoveUnexistingManifestsAction(
+            String databaseName, String tableName, Map<String, String> catalogConfig) {
+        super(catalogConfig);
+        this.databaseName = databaseName;
+        this.tableName = tableName;
+    }
+
+    /**
+     * Checks the manifests of the latest snapshot and, if any file is missing, commits a
+     * replacement snapshot that references only the existing ones.
+     *
+     * @throws RuntimeException if a manifest cannot be checked or read, or if the replacement
+     *     snapshot cannot be committed
+     */
+    @Override
+    public void executeLocally() throws Exception {
+        Identifier identifier = new Identifier(databaseName, tableName);
+        FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+        FileIO fileIO = table.fileIO();
+        Snapshot latest = table.snapshotManager().latestSnapshot();
+        if (latest == null) {
+            // No snapshot yet, so there is no manifest list to repair.
+            return;
+        }
+
+        ManifestsReader manifestsReader = table.store().newScan().manifestsReader();
+        ManifestsReader.Result manifestsResult = manifestsReader.read(latest, ScanMode.ALL);
+        List<ManifestFileMeta> manifests = manifestsResult.allManifests;
+        List<ManifestFileMeta> existingManifestFiles = new ArrayList<>();
+        List<ManifestEntry> baseManifestEntries = new ArrayList<>();
+
+        FileStorePathFactory pathFactory = table.store().pathFactory();
+        boolean brokenManifestFile = false;
+        for (ManifestFileMeta meta : manifests) {
+            try {
+                Path path = pathFactory.toManifestFilePath(meta.fileName());
+                if (!fileIO.exists(path)) {
+                    brokenManifestFile = true;
+                    LOG.warn("Drop manifest file: {}", meta.fileName());
+                } else {
+                    // Entries of surviving manifests are collected to recompute the record count.
+                    baseManifestEntries.addAll(table.store().newScan().readManifest(meta));
+                    existingManifestFiles.add(meta);
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(
+                        "Failed to check or read manifest file " + meta.fileName(), e);
+            }
+        }
+
+        if (!brokenManifestFile) {
+            // Nothing is missing: leave the table untouched.
+            return;
+        }
+
+        ManifestList manifestList = table.store().manifestListFactory().create();
+        long totalRecordCount = recordCount(baseManifestEntries);
+        // All surviving manifests go into the base list; the delta list is written empty.
+        Pair<String, Long> baseManifestList = manifestList.write(existingManifestFiles);
+        Pair<String, Long> deltaManifestList = manifestList.write(Collections.emptyList());
+
+        try (FileStoreCommitImpl fileStoreCommit =
+                (FileStoreCommitImpl)
+                        table.store().newCommit("Repair-table-" + UUID.randomUUID(), table)) {
+            boolean result =
+                    fileStoreCommit.replaceManifestList(
+                            latest, totalRecordCount, baseManifestList, deltaManifestList);
+            if (!result) {
+                throw new RuntimeException(
+                        "Failed, snapshot conflict, maybe multiple jobs is running to commit snapshots.");
+            }
+        }
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingManifestsActionFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingManifestsActionFactory.java
new file mode 100644
index 0000000000..86cf443077
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingManifestsActionFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import java.util.Optional;
+
+/**
+ * Factory to create {@link RemoveUnexistingManifestsAction} from command line parameters.
+ *
+ * <p>The action is registered under the identifier {@code remove_unexisting_manifests} and
+ * requires the database and table parameters.
+ */
+public class RemoveUnexistingManifestsActionFactory implements ActionFactory {
+
+    public static final String IDENTIFIER = "remove_unexisting_manifests";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    /**
+     * Builds the action from the required database and table parameters plus the catalog
+     * configuration derived from {@code params}.
+     *
+     * @param params parsed command line parameters
+     * @return the created action, never empty
+     */
+    @Override
+    public Optional<Action> create(MultipleParameterToolAdapter params) {
+        RemoveUnexistingManifestsAction action =
+                new RemoveUnexistingManifestsAction(
+                        params.getRequired(DATABASE),
+                        params.getRequired(TABLE),
+                        catalogConfigMap(params));
+
+        return Optional.of(action);
+    }
+
+    /** Prints a short description of the action and its command line syntax to stdout. */
+    @Override
+    public void printHelp() {
+        System.out.println(
+                "Action \"remove_unexisting_manifests\" removes unexisting manifest file from manifest list.");
+        System.out.println(
+                "See Java docs in https://paimon.apache.org/docs/master/api/java/org/apache/paimon/flink/action/RemoveUnexistingManifestsAction.html for detailed use cases.");
+        System.out.println(
+                "Note that user is on his own risk using this procedure, which may cause data loss when used outside from the use cases in Java docs.");
+        System.out.println();
+
+        System.out.println("Syntax:");
+        // The last argument carries no trailing line continuation, so the printed command is
+        // complete when copied into a shell.
+        System.out.println(
+                " remove_unexisting_manifests \\\n"
+                        + "--warehouse <warehouse_path> \\\n"
+                        + "--database <database_name> \\\n"
+                        + "--table <table_name>");
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveUnexistingManifestsProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveUnexistingManifestsProcedure.java
new file mode 100644
index 0000000000..19b4200c32
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveUnexistingManifestsProcedure.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.action.RemoveUnexistingManifestsAction;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/**
+ * Procedure to remove unexisting manifest files from manifest list. See {@link
+ * RemoveUnexistingManifestsAction} for detailed use cases.
+ *
+ * <pre><code>
+ *  -- remove unexisting manifest files in table `mydb.myt`
+ *  CALL sys.remove_unexisting_manifests(`table` => 'mydb.myt')
+ *
+ *  -- remove unexisting manifest files in branch `rt` of table `mydb.myt`
+ *  CALL sys.remove_unexisting_manifests(`table` => 'mydb.myt$branch_rt')
+ * </code></pre>
+ *
+ * <p>Note that user is on his own risk using this procedure, which may cause data loss when used
+ * outside from the use cases above.
+ */
+public class RemoveUnexistingManifestsProcedure extends ProcedureBase {
+
+    public static final String IDENTIFIER = "remove_unexisting_manifests";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    /**
+     * Runs {@link RemoveUnexistingManifestsAction} for the given table.
+     *
+     * <p>The argument is exposed to SQL under the name {@code table}, matching the documented
+     * call syntax; positional calls work as before.
+     *
+     * @param procedureContext Flink procedure context, not used by this procedure
+     * @param tableId table identifier string, parsed with {@code Identifier.fromString}
+     * @return a single-element array containing {@code "Success"}
+     * @throws RuntimeException wrapping any exception raised while running the action
+     */
+    @ProcedureHint(
+            argument = {
+                @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
+            })
+    public String[] call(ProcedureContext procedureContext, String tableId)
+            throws Catalog.TableNotExistException {
+        Identifier identifier = Identifier.fromString(tableId);
+        String databaseName = identifier.getDatabaseName();
+        String tableName = identifier.getObjectName();
+        try {
+            RemoveUnexistingManifestsAction unexistingManifestsAction =
+                    new RemoveUnexistingManifestsAction(
+                            databaseName, tableName, catalog.options());
+            unexistingManifestsAction.run();
+            return new String[] {"Success"};
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 56ac74c698..aef64a43d2 100644
---
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -44,6 +44,7 @@ org.apache.paimon.flink.action.RewriteFileIndexActionFactory
org.apache.paimon.flink.action.ExpireSnapshotsActionFactory
org.apache.paimon.flink.action.ExpireChangelogsActionFactory
org.apache.paimon.flink.action.RemoveUnexistingFilesActionFactory
+org.apache.paimon.flink.action.RemoveUnexistingManifestsActionFactory
org.apache.paimon.flink.action.ClearConsumerActionFactory
org.apache.paimon.flink.action.RescaleActionFactory
org.apache.paimon.flink.action.CloneActionFactory
@@ -95,4 +96,5 @@ org.apache.paimon.flink.procedure.CreateFunctionProcedure
org.apache.paimon.flink.procedure.DropFunctionProcedure
org.apache.paimon.flink.procedure.AlterFunctionProcedure
org.apache.paimon.flink.procedure.AlterColumnDefaultValueProcedure
-org.apache.paimon.flink.procedure.TriggerTagAutomaticCreationProcedure
\ No newline at end of file
+org.apache.paimon.flink.procedure.TriggerTagAutomaticCreationProcedure
+org.apache.paimon.flink.procedure.RemoveUnexistingManifestsProcedure
\ No newline at end of file
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveUnexistingManifestsActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveUnexistingManifestsActionITCase.java
new file mode 100644
index 0000000000..e57d9cd787
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveUnexistingManifestsActionITCase.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static java.lang.String.format;
+import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatCode;
+
+/**
+ * IT cases for {@link RemoveUnexistingManifestsAction}, exercised through the {@code
+ * sys.remove_unexisting_manifests} procedure.
+ */
+public class RemoveUnexistingManifestsActionITCase extends ActionITCaseBase {
+
+ /**
+ * Deletes one manifest file of the latest snapshot, checks that reading then fails, and
+ * verifies that the table is readable again (with one row fewer) after the procedure ran.
+ */
+ @Test
+ public void testAction() throws Exception {
+ init(warehouse);
+
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.BIGINT(),
DataTypes.STRING()},
+ new String[] {"k", "v"});
+ // NOTE(review): the tiny manifest target size presumably keeps each commit in its own
+ // manifest file, so deleting one manifest loses exactly one row; confirm.
+ FileStoreTable table =
+ createFileStoreTable(
+ rowType,
+ Collections.emptyList(),
+ Collections.singletonList("k"),
+ Collections.emptyList(),
+ Collections.singletonMap("manifest.target-file-size",
"10 B"));
+
+ StreamWriteBuilder writeBuilder =
table.newStreamWriteBuilder().withCommitUser(commitUser);
+ write = writeBuilder.newWrite();
+ commit = writeBuilder.newCommit();
+
+ List<GenericRow> data = new ArrayList<>();
+ data.add(rowData(1L, BinaryString.fromString("Hi")));
+ data.add(rowData(2L, BinaryString.fromString("Hello")));
+ data.add(rowData(3L, BinaryString.fromString("Paimon")));
+
+ // 3 snapshots
+ writeData(data.get(0));
+ writeData(data.get(1));
+ writeData(data.get(2));
+
+ SnapshotManager snapshotManager = table.snapshotManager();
+
+ // sanity check: all three rows are readable before anything is deleted
+ List<InternalRow> results = getData(tableName);
+ assertThat(results).hasSize(3);
+ results.forEach(
+ row -> {
+ int pos = (int) row.getLong(0) - 1;
+ assertThat(row.getString(1).toString())
+ .isEqualTo(data.get(pos).getString(1).toString());
+ });
+
+ // delete the second manifest file referenced by the latest snapshot
+ FileStorePathFactory pathFactory = table.store().pathFactory();
+ List<ManifestFileMeta> manifestFileMetas =
+ table.store()
+ .newScan()
+ .manifestsReader()
+ .read(snapshotManager.latestSnapshot(), ScanMode.ALL)
+ .allManifests;
+ Path path =
pathFactory.toManifestFilePath(manifestFileMetas.get(1).fileName());
+ table.fileIO().delete(path, false);
+
+ // reading now fails because the manifest file is missing
+ assertThatCode(() -> getData(tableName)).hasMessageContaining("not
found");
+
+ executeSQL(format("CALL sys.remove_unexisting_manifests('%s.%s')",
database, tableName));
+
+ // after the repair the table is readable again, with two rows left
+ results = getData(tableName);
+ assertThat(results).hasSize(2);
+ results.forEach(
+ row -> {
+ int pos = (int) row.getLong(0) - 1;
+ assertThat(row.getString(1).toString())
+ .isEqualTo(data.get(pos).getString(1).toString());
+ });
+ }
+
+ /**
+ * Same scenario as {@code testAction}, but the manifest file is deleted from branch {@code rt}
+ * and the procedure is called on the branch table; the main branch must stay intact.
+ */
+ @Test
+ public void testActionForBranch() throws Exception {
+ init(warehouse);
+
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.BIGINT(),
DataTypes.STRING()},
+ new String[] {"k", "v"});
+ FileStoreTable table =
+ createFileStoreTable(
+ rowType,
+ Collections.emptyList(),
+ Collections.singletonList("k"),
+ Collections.emptyList(),
+ Collections.singletonMap("manifest.target-file-size",
"10 B"));
+
+ StreamWriteBuilder writeBuilder =
table.newStreamWriteBuilder().withCommitUser(commitUser);
+ write = writeBuilder.newWrite();
+ commit = writeBuilder.newCommit();
+
+ List<GenericRow> data = new ArrayList<>();
+ data.add(rowData(1L, BinaryString.fromString("Hi")));
+ data.add(rowData(2L, BinaryString.fromString("Hello")));
+ data.add(rowData(3L, BinaryString.fromString("Paimon")));
+
+ // 3 snapshots
+ writeData(data.get(0));
+ writeData(data.get(1));
+ writeData(data.get(2));
+
+ table.createBranch("rt");
+
+ // from here on, writes go to the branch table
+ table =
+ (FileStoreTable)
+ catalog.getTable(
+ Identifier.fromString(
+ format("%s.%s$branch_rt", database,
tableName)));
+ writeBuilder =
table.newStreamWriteBuilder().withCommitUser(commitUser);
+ write = writeBuilder.newWrite();
+ commit = writeBuilder.newCommit();
+ String tableNameBranch = tableName + "$branch_rt";
+ List<GenericRow> branchData = new ArrayList<>();
+ branchData.add(rowData(4L, BinaryString.fromString("Hi 4")));
+ branchData.add(rowData(5L, BinaryString.fromString("Hello 5")));
+ branchData.add(rowData(6L, BinaryString.fromString("Paimon 6")));
+ // 3 snapshots also
+ writeData(branchData.get(0));
+ writeData(branchData.get(1));
+ writeData(branchData.get(2));
+
+ SnapshotManager snapshotManager = table.snapshotManager();
+
+ // branch keys start at 4, hence the offset of 4 when indexing branchData
+ List<InternalRow> results = getData(tableNameBranch);
+ assertThat(results).hasSize(3);
+ results.forEach(
+ row -> {
+ int pos = (int) row.getLong(0) - 4;
+ assertThat(row.getString(1).toString())
+
.isEqualTo(branchData.get(pos).getString(1).toString());
+ });
+
+ // delete the second manifest file referenced by the branch's latest snapshot
+ FileStorePathFactory pathFactory = table.store().pathFactory();
+ List<ManifestFileMeta> manifestFileMetas =
+ table.store()
+ .newScan()
+ .manifestsReader()
+ .read(snapshotManager.latestSnapshot(), ScanMode.ALL)
+ .allManifests;
+ Path path =
pathFactory.toManifestFilePath(manifestFileMetas.get(1).fileName());
+ table.fileIO().delete(path, false);
+
+ assertThatCode(() ->
getData(tableNameBranch)).hasMessageContaining("not found");
+
+ executeSQL(
+ format("CALL sys.remove_unexisting_manifests('%s.%s')",
database, tableNameBranch));
+
+ // the branch is readable again, with two rows left
+ results = getData(tableNameBranch);
+ assertThat(results).hasSize(2);
+ results.forEach(
+ row -> {
+ int pos = (int) row.getLong(0) - 4;
+ assertThat(row.getString(1).toString())
+
.isEqualTo(branchData.get(pos).getString(1).toString());
+ });
+
+ // do not affect main branch
+ results = getData(tableName);
+ assertThat(results).hasSize(3);
+ results.forEach(
+ row -> {
+ int pos = (int) row.getLong(0) - 1;
+ assertThat(row.getString(1).toString())
+ .isEqualTo(data.get(pos).getString(1).toString());
+ });
+ }
+}