This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 28eb2ac944 [flink] Add procedure to remove un-existing manifest file 
(#6643)
28eb2ac944 is described below

commit 28eb2ac9446801eb32a24ef97addefe5e15d3758
Author: YeJunHao <[email protected]>
AuthorDate: Fri Nov 21 12:34:34 2025 +0800

    [flink] Add procedure to remove un-existing manifest file (#6643)
---
 docs/content/flink/procedures.md                   |  19 ++
 .../paimon/operation/FileStoreCommitImpl.java      |  33 ++++
 .../action/RemoveUnexistingManifestsAction.java    | 116 ++++++++++++
 .../RemoveUnexistingManifestsActionFactory.java    |  61 ++++++
 .../RemoveUnexistingManifestsProcedure.java        |  69 +++++++
 .../services/org.apache.paimon.factories.Factory   |   4 +-
 .../RemoveUnexistingManifestsActionITCase.java     | 210 +++++++++++++++++++++
 7 files changed, 511 insertions(+), 1 deletion(-)

diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index a2a20e4a34..f5ed938f96 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -380,6 +380,25 @@ All available procedures are listed below.
         -- only check what files will be removed, but not really remove them 
(dry run)
         CALL sys.remove_unexisting_files(`table` => 'mydb.myt', `dry_run` = 
true)
       </td>
+   </tr>
+    <tr>
+      <td>remove_unexisting_manifests</td>
+      <td>
+         -- Use named argument<br/>
+         CALL [catalog.]sys.remove_unexisting_files(`table` => 'identifier') 
<br/><br/>
+      </td>
+      <td>
+         Procedure to remove unexisting manifest file from manifset-list. for 
detailed use cases. Arguments:
+            <li>table: the target table identifier. Cannot be empty, you can 
use database.table$branch_xx to remove branch table unexisting manifest 
file.</li>
+         <br>
+         Note that user is on his own risk using this procedure, which may 
cause data loss when used outside from the use cases listed in Java docs.
+      </td>
+      <td>
+        -- remove unexisting manifest file in the table `mydb.myt`<br/>
+        CALL sys.remove_unexisting_manifests(`table` => 'mydb.myt')<br/><br/>
+        -- remove unexisting manifest file in the branch table 
`mydb.myt$branch_rt`<br/>
+        CALL sys.remove_unexisting_manifests(`table` => 'mydb.myt$branch_rt')
+      </td>
    </tr>
    <tr>
       <td>reset_consumer</td>
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index a078bd90ac..ceb68ed6ba 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -1237,6 +1237,39 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
         return new SuccessResult();
     }
 
+    public boolean replaceManifestList(
+            Snapshot latest,
+            long totalRecordCount,
+            Pair<String, Long> baseManifestList,
+            Pair<String, Long> deltaManifestList) {
+        Snapshot newSnapshot =
+                new Snapshot(
+                        latest.id() + 1,
+                        latest.schemaId(),
+                        baseManifestList.getLeft(),
+                        baseManifestList.getRight(),
+                        deltaManifestList.getKey(),
+                        deltaManifestList.getRight(),
+                        null,
+                        null,
+                        latest.indexManifest(),
+                        commitUser,
+                        Long.MAX_VALUE,
+                        CommitKind.OVERWRITE,
+                        System.currentTimeMillis(),
+                        latest.logOffsets(),
+                        totalRecordCount,
+                        0L,
+                        0L,
+                        latest.watermark(),
+                        latest.statistics(),
+                        // if empty properties, just set to null
+                        latest.properties(),
+                        latest.nextRowId());
+
+        return commitSnapshotImpl(newSnapshot, Collections.emptyList());
+    }
+
     private long assignRowTrackingMeta(
             long firstRowIdStart,
             List<ManifestEntry> deltaFiles,
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingManifestsAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingManifestsAction.java
new file mode 100644
index 0000000000..e0fe8ac870
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingManifestsAction.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.operation.FileStoreCommitImpl;
+import org.apache.paimon.operation.ManifestsReader;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.Pair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.paimon.manifest.ManifestEntry.recordCount;
+
+/** Action to remove the un-existing manifest file. */
+public class RemoveUnexistingManifestsAction extends ActionBase implements 
LocalAction {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(RemoveUnexistingManifestsAction.class);
+
+    private final String databaseName;
+    private final String tableName;
+
+    public RemoveUnexistingManifestsAction(
+            String databaseName, String tableName, Map<String, String> 
catalogConfig) {
+        super(catalogConfig);
+        this.databaseName = databaseName;
+        this.tableName = tableName;
+    }
+
+    @Override
+    public void executeLocally() throws Exception {
+        Identifier identifier = new Identifier(databaseName, tableName);
+        FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+        FileIO fileIO = table.fileIO();
+        Snapshot latest = table.snapshotManager().latestSnapshot();
+        if (latest == null) {
+            return;
+        }
+
+        ManifestsReader manifestsReader = 
table.store().newScan().manifestsReader();
+        ManifestsReader.Result manifestsResult = manifestsReader.read(latest, 
ScanMode.ALL);
+        List<ManifestFileMeta> manifests = manifestsResult.allManifests;
+        List<ManifestFileMeta> existingManifestFiles = new ArrayList<>();
+        List<ManifestEntry> baseManifestEntries = new ArrayList<>();
+
+        FileStorePathFactory pathFactory = table.store().pathFactory();
+        boolean brokenManifestFile = false;
+        for (ManifestFileMeta meta : manifests) {
+            try {
+                Path path = pathFactory.toManifestFilePath(meta.fileName());
+                if (!fileIO.exists(path)) {
+                    brokenManifestFile = true;
+                    LOG.warn("Drop manifest file: " + meta.fileName());
+                } else {
+                    
baseManifestEntries.addAll(table.store().newScan().readManifest(meta));
+                    existingManifestFiles.add(meta);
+                }
+            } catch (Exception e) {
+                throw new RuntimeException("Exception happens", e);
+            }
+        }
+
+        if (!brokenManifestFile) {
+            return;
+        }
+
+        ManifestList manifestList = 
table.store().manifestListFactory().create();
+        long totalRecordCount = recordCount(baseManifestEntries);
+        Pair<String, Long> baseManifestList = 
manifestList.write(existingManifestFiles);
+        Pair<String, Long> deltaManifestList = 
manifestList.write(Collections.emptyList());
+
+        try (FileStoreCommitImpl fileStoreCommit =
+                (FileStoreCommitImpl)
+                        table.store().newCommit("Repair-table-" + 
UUID.randomUUID(), table)) {
+            boolean result =
+                    fileStoreCommit.replaceManifestList(
+                            latest, totalRecordCount, baseManifestList, 
deltaManifestList);
+            if (!result) {
+                throw new RuntimeException(
+                        "Failed, snapshot conflict, maybe multiple jobs is 
running to commit snapshots.");
+            }
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingManifestsActionFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingManifestsActionFactory.java
new file mode 100644
index 0000000000..86cf443077
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingManifestsActionFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import java.util.Optional;
+
+/** Factory to create {@link RemoveUnexistingManifestsAction}. */
+public class RemoveUnexistingManifestsActionFactory implements ActionFactory {
+
+    public static final String IDENTIFIER = "remove_unexisting_manifests";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Optional<Action> create(MultipleParameterToolAdapter params) {
+        RemoveUnexistingManifestsAction action =
+                new RemoveUnexistingManifestsAction(
+                        params.getRequired(DATABASE),
+                        params.getRequired(TABLE),
+                        catalogConfigMap(params));
+
+        return Optional.of(action);
+    }
+
+    @Override
+    public void printHelp() {
+        System.out.println(
+                "Action \"remove_unexisting_manifests\" removes unexisting 
manifest file from manifest list.");
+        System.out.println(
+                "See Java docs in 
https://paimon.apache.org/docs/master/api/java/org/apache/paimon/flink/action/RemoveUnexistingManifestsAction.html
 for detailed use cases.");
+        System.out.println(
+                "Note that user is on his own risk using this procedure, which 
may cause data loss when used outside from the use cases in Java docs.");
+        System.out.println();
+
+        System.out.println("Syntax:");
+        System.out.println(
+                "  remove_unexisting_manifests \\\n"
+                        + "--warehouse <warehouse_path> \\\n"
+                        + "--database <database_name> \\\n"
+                        + "--table <table_name> \\\n");
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveUnexistingManifestsProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveUnexistingManifestsProcedure.java
new file mode 100644
index 0000000000..19b4200c32
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveUnexistingManifestsProcedure.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.action.RemoveUnexistingManifestsAction;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/**
+ * Procedure to remove unexisting manifest files from manifest list. See {@link
+ * RemoveUnexistingManifestsAction} for detailed use cases.
+ *
+ * <pre><code>
+ *  -- remove unexisting data files in table `mydb.myt`
+ *  CALL sys.remove_unexisting_manifests(`table` => 'mydb.myt$branch_rt')
+ * </code></pre>
+ *
+ * <p>Note that user is on his own risk using this procedure, which may cause 
data loss when used
+ * outside from the use cases above.
+ */
+public class RemoveUnexistingManifestsProcedure extends ProcedureBase {
+
+    public static final String IDENTIFIER = "remove_unexisting_manifests";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    @ProcedureHint(
+            argument = {
+                @ArgumentHint(name = "tableId", type = 
@DataTypeHint("STRING")),
+            })
+    public String[] call(ProcedureContext procedureContext, String tableId)
+            throws Catalog.TableNotExistException {
+        Identifier identifier = Identifier.fromString(tableId);
+        String databaseName = identifier.getDatabaseName();
+        String tableName = identifier.getObjectName();
+        try {
+            RemoveUnexistingManifestsAction unexistingManifestsAction =
+                    new RemoveUnexistingManifestsAction(databaseName, 
tableName, catalog.options());
+            unexistingManifestsAction.run();
+            return new String[] {"Success"};
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 56ac74c698..aef64a43d2 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -44,6 +44,7 @@ org.apache.paimon.flink.action.RewriteFileIndexActionFactory
 org.apache.paimon.flink.action.ExpireSnapshotsActionFactory
 org.apache.paimon.flink.action.ExpireChangelogsActionFactory
 org.apache.paimon.flink.action.RemoveUnexistingFilesActionFactory
+org.apache.paimon.flink.action.RemoveUnexistingManifestsActionFactory
 org.apache.paimon.flink.action.ClearConsumerActionFactory
 org.apache.paimon.flink.action.RescaleActionFactory
 org.apache.paimon.flink.action.CloneActionFactory
@@ -95,4 +96,5 @@ org.apache.paimon.flink.procedure.CreateFunctionProcedure
 org.apache.paimon.flink.procedure.DropFunctionProcedure
 org.apache.paimon.flink.procedure.AlterFunctionProcedure
 org.apache.paimon.flink.procedure.AlterColumnDefaultValueProcedure
-org.apache.paimon.flink.procedure.TriggerTagAutomaticCreationProcedure
\ No newline at end of file
+org.apache.paimon.flink.procedure.TriggerTagAutomaticCreationProcedure
+org.apache.paimon.flink.procedure.RemoveUnexistingManifestsProcedure
\ No newline at end of file
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveUnexistingManifestsActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveUnexistingManifestsActionITCase.java
new file mode 100644
index 0000000000..e57d9cd787
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveUnexistingManifestsActionITCase.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static java.lang.String.format;
+import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatCode;
+
+/** IT cases for {@link RemoveUnexistingFilesAction}. */
+public class RemoveUnexistingManifestsActionITCase extends ActionITCaseBase {
+
+    @Test
+    public void testAction() throws Exception {
+        init(warehouse);
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.BIGINT(), 
DataTypes.STRING()},
+                        new String[] {"k", "v"});
+        FileStoreTable table =
+                createFileStoreTable(
+                        rowType,
+                        Collections.emptyList(),
+                        Collections.singletonList("k"),
+                        Collections.emptyList(),
+                        Collections.singletonMap("manifest.target-file-size", 
"10 B"));
+
+        StreamWriteBuilder writeBuilder = 
table.newStreamWriteBuilder().withCommitUser(commitUser);
+        write = writeBuilder.newWrite();
+        commit = writeBuilder.newCommit();
+
+        List<GenericRow> data = new ArrayList<>();
+        data.add(rowData(1L, BinaryString.fromString("Hi")));
+        data.add(rowData(2L, BinaryString.fromString("Hello")));
+        data.add(rowData(3L, BinaryString.fromString("Paimon")));
+
+        // 3 snapshots
+        writeData(data.get(0));
+        writeData(data.get(1));
+        writeData(data.get(2));
+
+        SnapshotManager snapshotManager = table.snapshotManager();
+
+        List<InternalRow> results = getData(tableName);
+        assertThat(results).hasSize(3);
+        results.forEach(
+                row -> {
+                    int pos = (int) row.getLong(0) - 1;
+                    assertThat(row.getString(1).toString())
+                            .isEqualTo(data.get(pos).getString(1).toString());
+                });
+
+        FileStorePathFactory pathFactory = table.store().pathFactory();
+        List<ManifestFileMeta> manifestFileMetas =
+                table.store()
+                        .newScan()
+                        .manifestsReader()
+                        .read(snapshotManager.latestSnapshot(), ScanMode.ALL)
+                        .allManifests;
+        Path path = 
pathFactory.toManifestFilePath(manifestFileMetas.get(1).fileName());
+        table.fileIO().delete(path, false);
+
+        assertThatCode(() -> getData(tableName)).hasMessageContaining("not 
found");
+
+        executeSQL(format("CALL sys.remove_unexisting_manifests('%s.%s')", 
database, tableName));
+
+        results = getData(tableName);
+        assertThat(results).hasSize(2);
+        results.forEach(
+                row -> {
+                    int pos = (int) row.getLong(0) - 1;
+                    assertThat(row.getString(1).toString())
+                            .isEqualTo(data.get(pos).getString(1).toString());
+                });
+    }
+
+    @Test
+    public void testActionForBranch() throws Exception {
+        init(warehouse);
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.BIGINT(), 
DataTypes.STRING()},
+                        new String[] {"k", "v"});
+        FileStoreTable table =
+                createFileStoreTable(
+                        rowType,
+                        Collections.emptyList(),
+                        Collections.singletonList("k"),
+                        Collections.emptyList(),
+                        Collections.singletonMap("manifest.target-file-size", 
"10 B"));
+
+        StreamWriteBuilder writeBuilder = 
table.newStreamWriteBuilder().withCommitUser(commitUser);
+        write = writeBuilder.newWrite();
+        commit = writeBuilder.newCommit();
+
+        List<GenericRow> data = new ArrayList<>();
+        data.add(rowData(1L, BinaryString.fromString("Hi")));
+        data.add(rowData(2L, BinaryString.fromString("Hello")));
+        data.add(rowData(3L, BinaryString.fromString("Paimon")));
+
+        // 3 snapshots
+        writeData(data.get(0));
+        writeData(data.get(1));
+        writeData(data.get(2));
+
+        table.createBranch("rt");
+
+        table =
+                (FileStoreTable)
+                        catalog.getTable(
+                                Identifier.fromString(
+                                        format("%s.%s$branch_rt", database, 
tableName)));
+        writeBuilder = 
table.newStreamWriteBuilder().withCommitUser(commitUser);
+        write = writeBuilder.newWrite();
+        commit = writeBuilder.newCommit();
+        String tableNameBranch = tableName + "$branch_rt";
+        List<GenericRow> branchData = new ArrayList<>();
+        branchData.add(rowData(4L, BinaryString.fromString("Hi 4")));
+        branchData.add(rowData(5L, BinaryString.fromString("Hello 5")));
+        branchData.add(rowData(6L, BinaryString.fromString("Paimon 6")));
+        // 3 snapshots also
+        writeData(branchData.get(0));
+        writeData(branchData.get(1));
+        writeData(branchData.get(2));
+
+        SnapshotManager snapshotManager = table.snapshotManager();
+
+        List<InternalRow> results = getData(tableNameBranch);
+        assertThat(results).hasSize(3);
+        results.forEach(
+                row -> {
+                    int pos = (int) row.getLong(0) - 4;
+                    assertThat(row.getString(1).toString())
+                            
.isEqualTo(branchData.get(pos).getString(1).toString());
+                });
+
+        FileStorePathFactory pathFactory = table.store().pathFactory();
+        List<ManifestFileMeta> manifestFileMetas =
+                table.store()
+                        .newScan()
+                        .manifestsReader()
+                        .read(snapshotManager.latestSnapshot(), ScanMode.ALL)
+                        .allManifests;
+        Path path = 
pathFactory.toManifestFilePath(manifestFileMetas.get(1).fileName());
+        table.fileIO().delete(path, false);
+
+        assertThatCode(() -> 
getData(tableNameBranch)).hasMessageContaining("not found");
+
+        executeSQL(
+                format("CALL sys.remove_unexisting_manifests('%s.%s')", 
database, tableNameBranch));
+
+        results = getData(tableNameBranch);
+        assertThat(results).hasSize(2);
+        results.forEach(
+                row -> {
+                    int pos = (int) row.getLong(0) - 4;
+                    assertThat(row.getString(1).toString())
+                            
.isEqualTo(branchData.get(pos).getString(1).toString());
+                });
+
+        // do not affect main branch
+        results = getData(tableName);
+        assertThat(results).hasSize(3);
+        results.forEach(
+                row -> {
+                    int pos = (int) row.getLong(0) - 1;
+                    assertThat(row.getString(1).toString())
+                            .isEqualTo(data.get(pos).getString(1).toString());
+                });
+    }
+}

Reply via email to