This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 02836eaac8 Spark: Ensure partition stats files are considered for GC procedures (#9284)
02836eaac8 is described below

commit 02836eaac8c8cd18f9998d0ae4411b09d3649e1a
Author: Ajantha Bhat <[email protected]>
AuthorDate: Fri Jan 19 00:40:53 2024 +0530

    Spark: Ensure partition stats files are considered for GC procedures (#9284)
---
 .../java/org/apache/iceberg/ReachableFileUtil.java | 42 +++++++++++-
 .../iceberg/spark/extensions/ProcedureUtil.java    | 54 +++++++++++++++
 .../extensions/TestExpireSnapshotsProcedure.java   | 76 ++++++++++++++++------
 .../extensions/TestRemoveOrphanFilesProcedure.java | 70 ++++++++++++++++++++
 .../iceberg/spark/actions/BaseSparkAction.java     | 12 +---
 .../iceberg/spark/extensions/ProcedureUtil.java    | 54 +++++++++++++++
 .../extensions/TestExpireSnapshotsProcedure.java   | 76 ++++++++++++++++------
 .../extensions/TestRemoveOrphanFilesProcedure.java | 70 ++++++++++++++++++++
 .../iceberg/spark/actions/BaseSparkAction.java     | 12 +---
 .../iceberg/spark/extensions/ProcedureUtil.java    | 54 +++++++++++++++
 .../extensions/TestExpireSnapshotsProcedure.java   | 76 ++++++++++++++++------
 .../extensions/TestRemoveOrphanFilesProcedure.java | 70 ++++++++++++++++++++
 .../iceberg/spark/actions/BaseSparkAction.java     | 12 +---
 13 files changed, 586 insertions(+), 92 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java b/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java
index bd23a221ab..ee1ff28e03 100644
--- a/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java
+++ b/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java
@@ -133,13 +133,13 @@ public class ReachableFileUtil {
   }
 
   /**
-   * Returns locations of statistics files in a table.
+   * Returns locations of all statistics files in a table.
    *
    * @param table table for which statistics files needs to be listed
    * @return the location of statistics files
    */
   public static List<String> statisticsFilesLocations(Table table) {
-    return statisticsFilesLocations(table, statisticsFile -> true);
+    return statisticsFilesLocationsForSnapshots(table, null);
   }
 
   /**
@@ -148,7 +148,10 @@ public class ReachableFileUtil {
    * @param table table for which statistics files needs to be listed
    * @param predicate predicate for filtering the statistics files
    * @return the location of statistics files
+   * @deprecated since 1.5.0, will be removed in 1.6.0; use the {@code
+   *     statisticsFilesLocationsForSnapshots(table, snapshotIds)} instead.
    */
+  @Deprecated
   public static List<String> statisticsFilesLocations(
       Table table, Predicate<StatisticsFile> predicate) {
     return table.statisticsFiles().stream()
@@ -156,4 +159,39 @@ public class ReachableFileUtil {
         .map(StatisticsFile::path)
         .collect(Collectors.toList());
   }
+
+  /**
+   * Returns locations of all statistics files for a table matching the given snapshot IDs.
+   *
+   * @param table table for which statistics files needs to be listed
+   * @param snapshotIds ids of snapshots for which statistics files will be returned. If null,
+   *     statistics files for all the snapshots will be returned.
+   * @return the location of statistics files
+   */
+  public static List<String> statisticsFilesLocationsForSnapshots(
+      Table table, Set<Long> snapshotIds) {
+    List<String> statsFileLocations = Lists.newArrayList();
+
+    Predicate<StatisticsFile> statsFilePredicate;
+    Predicate<PartitionStatisticsFile> partitionStatsFilePredicate;
+    if (snapshotIds == null) {
+      statsFilePredicate = file -> true;
+      partitionStatsFilePredicate = file -> true;
+    } else {
+      statsFilePredicate = file -> snapshotIds.contains(file.snapshotId());
+      partitionStatsFilePredicate = file -> snapshotIds.contains(file.snapshotId());
+    }
+
+    table.statisticsFiles().stream()
+        .filter(statsFilePredicate)
+        .map(StatisticsFile::path)
+        .forEach(statsFileLocations::add);
+
+    table.partitionStatisticsFiles().stream()
+        .filter(partitionStatsFilePredicate)
+        .map(PartitionStatisticsFile::path)
+        .forEach(statsFileLocations::add);
+
+    return statsFileLocations;
+  }
 }
diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ProcedureUtil.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ProcedureUtil.java
new file mode 100644
index 0000000000..de4acd74a7
--- /dev/null
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ProcedureUtil.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.UUID;
+import org.apache.iceberg.ImmutableGenericPartitionStatisticsFile;
+import org.apache.iceberg.PartitionStatisticsFile;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.PositionOutputStream;
+
+public class ProcedureUtil {
+
+  private ProcedureUtil() {}
+
+  static PartitionStatisticsFile writePartitionStatsFile(
+      long snapshotId, String statsLocation, FileIO fileIO) {
+    PositionOutputStream positionOutputStream;
+    try {
+      positionOutputStream = fileIO.newOutputFile(statsLocation).create();
+      positionOutputStream.close();
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+
+    return ImmutableGenericPartitionStatisticsFile.builder()
+        .snapshotId(snapshotId)
+        .fileSizeInBytes(42L)
+        .path(statsLocation)
+        .build();
+  }
+
+  static String statsFileLocation(String tableLocation) {
+    String statsFileName = "stats-file-" + UUID.randomUUID();
+    return tableLocation.replaceFirst("file:", "") + "/metadata/" + statsFileName;
+  }
+}
diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
index efb3d43668..6383521a44 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
@@ -28,14 +28,13 @@ import java.sql.Timestamp;
 import java.time.Instant;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
-import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.GenericBlobMetadata;
 import org.apache.iceberg.GenericStatisticsFile;
+import org.apache.iceberg.PartitionStatisticsFile;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.StatisticsFile;
 import org.apache.iceberg.Table;
@@ -458,7 +457,7 @@ public class TestExpireSnapshotsProcedure extends SparkExtensionsTestBase {
     sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
     sql("INSERT INTO TABLE %s VALUES (10, 'abc')", tableName);
     Table table = validationCatalog.loadTable(tableIdent);
-    String statsFileLocation1 = statsFileLocation(table.location());
+    String statsFileLocation1 = 
ProcedureUtil.statsFileLocation(table.location());
     StatisticsFile statisticsFile1 =
         writeStatsFile(
             table.currentSnapshot().snapshotId(),
@@ -469,7 +468,7 @@ public class TestExpireSnapshotsProcedure extends 
SparkExtensionsTestBase {
 
     sql("INSERT INTO %s SELECT 20, 'def'", tableName);
     table.refresh();
-    String statsFileLocation2 = statsFileLocation(table.location());
+    String statsFileLocation2 = 
ProcedureUtil.statsFileLocation(table.location());
     StatisticsFile statisticsFile2 =
         writeStatsFile(
             table.currentSnapshot().snapshotId(),
@@ -488,18 +487,9 @@ public class TestExpireSnapshotsProcedure extends 
SparkExtensionsTestBase {
     Assertions.assertThat(output.get(0)[5]).as("should be 1 deleted statistics 
file").isEqualTo(1L);
 
     table.refresh();
-    List<StatisticsFile> statsWithSnapshotId1 =
-        table.statisticsFiles().stream()
-            .filter(statisticsFile -> statisticsFile.snapshotId() == 
statisticsFile1.snapshotId())
-            .collect(Collectors.toList());
-    Assertions.assertThat(statsWithSnapshotId1)
-        .as(
-            "Statistics file entry in TableMetadata should be deleted for the 
snapshot %s",
-            statisticsFile1.snapshotId())
-        .isEmpty();
     Assertions.assertThat(table.statisticsFiles())
         .as(
-            "Statistics file entry in TableMetadata should be present for the 
snapshot %s",
+            "Statistics file entry in TableMetadata should be present only for 
the snapshot %s",
             statisticsFile2.snapshotId())
         .extracting(StatisticsFile::snapshotId)
         .containsExactly(statisticsFile2.snapshotId());
@@ -513,7 +503,58 @@ public class TestExpireSnapshotsProcedure extends 
SparkExtensionsTestBase {
         .exists();
   }
 
-  private StatisticsFile writeStatsFile(
+  @Test
+  public void testExpireSnapshotsWithPartitionStatisticFiles() {
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", 
tableName);
+    sql("INSERT INTO TABLE %s VALUES (10, 'abc')", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    String partitionStatsFileLocation1 = 
ProcedureUtil.statsFileLocation(table.location());
+    PartitionStatisticsFile partitionStatisticsFile1 =
+        ProcedureUtil.writePartitionStatsFile(
+            table.currentSnapshot().snapshotId(), partitionStatsFileLocation1, 
table.io());
+    table.updatePartitionStatistics().setPartitionStatistics(partitionStatisticsFile1).commit();
+
+    sql("INSERT INTO %s SELECT 20, 'def'", tableName);
+    table.refresh();
+    String partitionStatsFileLocation2 = 
ProcedureUtil.statsFileLocation(table.location());
+    PartitionStatisticsFile partitionStatisticsFile2 =
+        ProcedureUtil.writePartitionStatsFile(
+            table.currentSnapshot().snapshotId(), partitionStatsFileLocation2, 
table.io());
+    
table.updatePartitionStatistics().setPartitionStatistics(partitionStatisticsFile2).commit();
+
+    waitUntilAfter(table.currentSnapshot().timestampMillis());
+
+    Timestamp currentTimestamp = 
Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.expire_snapshots(older_than => TIMESTAMP 
'%s',table => '%s')",
+            catalogName, currentTimestamp, tableIdent);
+    Assertions.assertThat(output.get(0)[5])
+        .as("should be 1 deleted partition statistics file")
+        .isEqualTo(1L);
+
+    table.refresh();
+    Assertions.assertThat(table.partitionStatisticsFiles())
+        .as(
+            "partition statistics file entry in TableMetadata should be 
present only for the snapshot %s",
+            partitionStatisticsFile2.snapshotId())
+        .extracting(PartitionStatisticsFile::snapshotId)
+        .containsExactly(partitionStatisticsFile2.snapshotId());
+
+    Assertions.assertThat(new File(partitionStatsFileLocation1))
+        .as(
+            "partition statistics file should not exist for snapshot %s",
+            partitionStatisticsFile1.snapshotId())
+        .doesNotExist();
+
+    Assertions.assertThat(new File(partitionStatsFileLocation2))
+        .as(
+            "partition statistics file should exist for snapshot %s",
+            partitionStatisticsFile2.snapshotId())
+        .exists();
+  }
+
+  private static StatisticsFile writeStatsFile(
       long snapshotId, long snapshotSequenceNumber, String statsLocation, 
FileIO fileIO)
       throws IOException {
     try (PuffinWriter puffinWriter = 
Puffin.write(fileIO.newOutputFile(statsLocation)).build()) {
@@ -536,9 +577,4 @@ public class TestExpireSnapshotsProcedure extends 
SparkExtensionsTestBase {
               .collect(ImmutableList.toImmutableList()));
     }
   }
-
-  private String statsFileLocation(String tableLocation) {
-    String statsFileName = "stats-file-" + UUID.randomUUID();
-    return tableLocation.replaceFirst("file:", "") + "/metadata/" + 
statsFileName;
-  }
 }
diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
index be82880cb7..05eb7a6f80 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
@@ -42,6 +42,7 @@ import org.apache.iceberg.GenericBlobMetadata;
 import org.apache.iceberg.GenericStatisticsFile;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionStatisticsFile;
 import org.apache.iceberg.ReachableFileUtil;
 import org.apache.iceberg.StatisticsFile;
 import org.apache.iceberg.Table;
@@ -541,6 +542,75 @@ public class TestRemoveOrphanFilesProcedure extends 
SparkExtensionsTestBase {
     Assertions.assertThat(statsLocation.exists()).as("stats file should be 
deleted").isFalse();
   }
 
+  @Test
+  public void testRemoveOrphanFilesWithPartitionStatisticFiles() throws 
Exception {
+    sql(
+        "CREATE TABLE %s USING iceberg "
+            + "TBLPROPERTIES('format-version'='2') "
+            + "AS SELECT 10 int, 'abc' data",
+        tableName);
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+
+    String partitionStatsLocation = 
ProcedureUtil.statsFileLocation(table.location());
+    PartitionStatisticsFile partitionStatisticsFile =
+        ProcedureUtil.writePartitionStatsFile(
+            table.currentSnapshot().snapshotId(), partitionStatsLocation, 
table.io());
+
+    commitPartitionStatsTxn(table, partitionStatisticsFile);
+
+    // wait to ensure files are old enough
+    waitUntilAfter(System.currentTimeMillis());
+    Timestamp currentTimestamp = 
Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
+
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.remove_orphan_files("
+                + "table => '%s',"
+                + "older_than => TIMESTAMP '%s')",
+            catalogName, tableIdent, currentTimestamp);
+    Assertions.assertThat(output).as("Should be no orphan files").isEmpty();
+
+    Assertions.assertThat(new File(partitionStatsLocation))
+        .as("partition stats file should exist")
+        .exists();
+
+    removePartitionStatsTxn(table, partitionStatisticsFile);
+
+    output =
+        sql(
+            "CALL %s.system.remove_orphan_files("
+                + "table => '%s',"
+                + "older_than => TIMESTAMP '%s')",
+            catalogName, tableIdent, currentTimestamp);
+    Assertions.assertThat(output).as("Should be orphan files").hasSize(1);
+    Assertions.assertThat(Iterables.getOnlyElement(output))
+        .as("Deleted files")
+        .containsExactly("file:" + partitionStatsLocation);
+    Assertions.assertThat(new File(partitionStatsLocation))
+        .as("partition stats file should be deleted")
+        .doesNotExist();
+  }
+
+  private static void removePartitionStatsTxn(
+      Table table, PartitionStatisticsFile partitionStatisticsFile) {
+    Transaction transaction = table.newTransaction();
+    transaction
+        .updatePartitionStatistics()
+        .removePartitionStatistics(partitionStatisticsFile.snapshotId())
+        .commit();
+    transaction.commitTransaction();
+  }
+
+  private static void commitPartitionStatsTxn(
+      Table table, PartitionStatisticsFile partitionStatisticsFile) {
+    Transaction transaction = table.newTransaction();
+    transaction
+        .updatePartitionStatistics()
+        .setPartitionStatistics(partitionStatisticsFile)
+        .commit();
+    transaction.commitTransaction();
+  }
+
   @Test
   public void testRemoveOrphanFilesProcedureWithPrefixMode()
       throws NoSuchTableException, ParseException, IOException {
diff --git 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index 3c007c6214..c5b8083583 100644
--- 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++ 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -31,7 +31,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
-import java.util.function.Predicate;
 import java.util.function.Supplier;
 import org.apache.iceberg.AllManifestsTable;
 import org.apache.iceberg.BaseTable;
@@ -44,7 +43,6 @@ import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.ReachableFileUtil;
 import org.apache.iceberg.StaticTableOperations;
-import org.apache.iceberg.StatisticsFile;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.exceptions.NotFoundException;
@@ -204,14 +202,8 @@ abstract class BaseSparkAction<ThisT> {
   }
 
   protected Dataset<FileInfo> statisticsFileDS(Table table, Set<Long> 
snapshotIds) {
-    Predicate<StatisticsFile> predicate;
-    if (snapshotIds == null) {
-      predicate = statisticsFile -> true;
-    } else {
-      predicate = statisticsFile -> 
snapshotIds.contains(statisticsFile.snapshotId());
-    }
-
-    List<String> statisticsFiles = 
ReachableFileUtil.statisticsFilesLocations(table, predicate);
+    List<String> statisticsFiles =
+        ReachableFileUtil.statisticsFilesLocationsForSnapshots(table, 
snapshotIds);
     return toFileInfoDS(statisticsFiles, STATISTICS_FILES);
   }
 
diff --git 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ProcedureUtil.java
 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ProcedureUtil.java
new file mode 100644
index 0000000000..de4acd74a7
--- /dev/null
+++ 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ProcedureUtil.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.UUID;
+import org.apache.iceberg.ImmutableGenericPartitionStatisticsFile;
+import org.apache.iceberg.PartitionStatisticsFile;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.PositionOutputStream;
+
+public class ProcedureUtil {
+
+  private ProcedureUtil() {}
+
+  static PartitionStatisticsFile writePartitionStatsFile(
+      long snapshotId, String statsLocation, FileIO fileIO) {
+    PositionOutputStream positionOutputStream;
+    try {
+      positionOutputStream = fileIO.newOutputFile(statsLocation).create();
+      positionOutputStream.close();
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+
+    return ImmutableGenericPartitionStatisticsFile.builder()
+        .snapshotId(snapshotId)
+        .fileSizeInBytes(42L)
+        .path(statsLocation)
+        .build();
+  }
+
+  static String statsFileLocation(String tableLocation) {
+    String statsFileName = "stats-file-" + UUID.randomUUID();
+    return tableLocation.replaceFirst("file:", "") + "/metadata/" + 
statsFileName;
+  }
+}
diff --git 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
index c4b93c6d6a..7dacce5487 100644
--- 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
+++ 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
@@ -28,13 +28,12 @@ import java.sql.Timestamp;
 import java.time.Instant;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
-import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.GenericBlobMetadata;
 import org.apache.iceberg.GenericStatisticsFile;
+import org.apache.iceberg.PartitionStatisticsFile;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.StatisticsFile;
 import org.apache.iceberg.Table;
@@ -445,7 +444,7 @@ public class TestExpireSnapshotsProcedure extends 
SparkExtensionsTestBase {
     sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", 
tableName);
     sql("INSERT INTO TABLE %s VALUES (10, 'abc')", tableName);
     Table table = validationCatalog.loadTable(tableIdent);
-    String statsFileLocation1 = statsFileLocation(table.location());
+    String statsFileLocation1 = 
ProcedureUtil.statsFileLocation(table.location());
     StatisticsFile statisticsFile1 =
         writeStatsFile(
             table.currentSnapshot().snapshotId(),
@@ -456,7 +455,7 @@ public class TestExpireSnapshotsProcedure extends 
SparkExtensionsTestBase {
 
     sql("INSERT INTO %s SELECT 20, 'def'", tableName);
     table.refresh();
-    String statsFileLocation2 = statsFileLocation(table.location());
+    String statsFileLocation2 = 
ProcedureUtil.statsFileLocation(table.location());
     StatisticsFile statisticsFile2 =
         writeStatsFile(
             table.currentSnapshot().snapshotId(),
@@ -475,18 +474,9 @@ public class TestExpireSnapshotsProcedure extends 
SparkExtensionsTestBase {
     Assertions.assertThat(output.get(0)[5]).as("should be 1 deleted statistics 
file").isEqualTo(1L);
 
     table.refresh();
-    List<StatisticsFile> statsWithSnapshotId1 =
-        table.statisticsFiles().stream()
-            .filter(statisticsFile -> statisticsFile.snapshotId() == 
statisticsFile1.snapshotId())
-            .collect(Collectors.toList());
-    Assertions.assertThat(statsWithSnapshotId1)
-        .as(
-            "Statistics file entry in TableMetadata should be deleted for the 
snapshot %s",
-            statisticsFile1.snapshotId())
-        .isEmpty();
     Assertions.assertThat(table.statisticsFiles())
         .as(
-            "Statistics file entry in TableMetadata should be present for the 
snapshot %s",
+            "Statistics file entry in TableMetadata should be present only for 
the snapshot %s",
             statisticsFile2.snapshotId())
         .extracting(StatisticsFile::snapshotId)
         .containsExactly(statisticsFile2.snapshotId());
@@ -500,7 +490,58 @@ public class TestExpireSnapshotsProcedure extends 
SparkExtensionsTestBase {
         .exists();
   }
 
-  private StatisticsFile writeStatsFile(
+  @Test
+  public void testExpireSnapshotsWithPartitionStatisticFiles() {
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", 
tableName);
+    sql("INSERT INTO TABLE %s VALUES (10, 'abc')", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    String partitionStatsFileLocation1 = 
ProcedureUtil.statsFileLocation(table.location());
+    PartitionStatisticsFile partitionStatisticsFile1 =
+        ProcedureUtil.writePartitionStatsFile(
+            table.currentSnapshot().snapshotId(), partitionStatsFileLocation1, 
table.io());
+    
table.updatePartitionStatistics().setPartitionStatistics(partitionStatisticsFile1).commit();
+
+    sql("INSERT INTO %s SELECT 20, 'def'", tableName);
+    table.refresh();
+    String partitionStatsFileLocation2 = 
ProcedureUtil.statsFileLocation(table.location());
+    PartitionStatisticsFile partitionStatisticsFile2 =
+        ProcedureUtil.writePartitionStatsFile(
+            table.currentSnapshot().snapshotId(), partitionStatsFileLocation2, 
table.io());
+    
table.updatePartitionStatistics().setPartitionStatistics(partitionStatisticsFile2).commit();
+
+    waitUntilAfter(table.currentSnapshot().timestampMillis());
+
+    Timestamp currentTimestamp = 
Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.expire_snapshots(older_than => TIMESTAMP 
'%s',table => '%s')",
+            catalogName, currentTimestamp, tableIdent);
+    Assertions.assertThat(output.get(0)[5])
+        .as("should be 1 deleted partition statistics file")
+        .isEqualTo(1L);
+
+    table.refresh();
+    Assertions.assertThat(table.partitionStatisticsFiles())
+        .as(
+            "partition statistics file entry in TableMetadata should be 
present only for the snapshot %s",
+            partitionStatisticsFile2.snapshotId())
+        .extracting(PartitionStatisticsFile::snapshotId)
+        .containsExactly(partitionStatisticsFile2.snapshotId());
+
+    Assertions.assertThat(new File(partitionStatsFileLocation1))
+        .as(
+            "partition statistics file should not exist for snapshot %s",
+            partitionStatisticsFile1.snapshotId())
+        .doesNotExist();
+
+    Assertions.assertThat(new File(partitionStatsFileLocation2))
+        .as(
+            "partition statistics file should exist for snapshot %s",
+            partitionStatisticsFile2.snapshotId())
+        .exists();
+  }
+
+  private static StatisticsFile writeStatsFile(
       long snapshotId, long snapshotSequenceNumber, String statsLocation, 
FileIO fileIO)
       throws IOException {
     try (PuffinWriter puffinWriter = 
Puffin.write(fileIO.newOutputFile(statsLocation)).build()) {
@@ -523,9 +564,4 @@ public class TestExpireSnapshotsProcedure extends 
SparkExtensionsTestBase {
               .collect(ImmutableList.toImmutableList()));
     }
   }
-
-  private String statsFileLocation(String tableLocation) {
-    String statsFileName = "stats-file-" + UUID.randomUUID();
-    return tableLocation.replaceFirst("file:", "") + "/metadata/" + 
statsFileName;
-  }
 }
diff --git 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
index b299817750..40adf30c37 100644
--- 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
+++ 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
@@ -41,6 +41,7 @@ import org.apache.iceberg.GenericBlobMetadata;
 import org.apache.iceberg.GenericStatisticsFile;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionStatisticsFile;
 import org.apache.iceberg.ReachableFileUtil;
 import org.apache.iceberg.StatisticsFile;
 import org.apache.iceberg.Table;
@@ -530,6 +531,75 @@ public class TestRemoveOrphanFilesProcedure extends 
SparkExtensionsTestBase {
     Assertions.assertThat(statsLocation.exists()).as("stats file should be 
deleted").isFalse();
   }
 
+  @Test
+  public void testRemoveOrphanFilesWithPartitionStatisticFiles() throws 
Exception {
+    sql(
+        "CREATE TABLE %s USING iceberg "
+            + "TBLPROPERTIES('format-version'='2') "
+            + "AS SELECT 10 int, 'abc' data",
+        tableName);
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+
+    String partitionStatsLocation = 
ProcedureUtil.statsFileLocation(table.location());
+    PartitionStatisticsFile partitionStatisticsFile =
+        ProcedureUtil.writePartitionStatsFile(
+            table.currentSnapshot().snapshotId(), partitionStatsLocation, 
table.io());
+
+    commitPartitionStatsTxn(table, partitionStatisticsFile);
+
+    // wait to ensure files are old enough
+    waitUntilAfter(System.currentTimeMillis());
+    Timestamp currentTimestamp = 
Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
+
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.remove_orphan_files("
+                + "table => '%s',"
+                + "older_than => TIMESTAMP '%s')",
+            catalogName, tableIdent, currentTimestamp);
+    Assertions.assertThat(output).as("Should be no orphan files").isEmpty();
+
+    Assertions.assertThat(new File(partitionStatsLocation))
+        .as("partition stats file should exist")
+        .exists();
+
+    removePartitionStatsTxn(table, partitionStatisticsFile);
+
+    output =
+        sql(
+            "CALL %s.system.remove_orphan_files("
+                + "table => '%s',"
+                + "older_than => TIMESTAMP '%s')",
+            catalogName, tableIdent, currentTimestamp);
+    Assertions.assertThat(output).as("Should be orphan files").hasSize(1);
+    Assertions.assertThat(Iterables.getOnlyElement(output))
+        .as("Deleted files")
+        .containsExactly("file:" + partitionStatsLocation);
+    Assertions.assertThat(new File(partitionStatsLocation))
+        .as("partition stats file should be deleted")
+        .doesNotExist();
+  }
+
+  private static void removePartitionStatsTxn(
+      Table table, PartitionStatisticsFile partitionStatisticsFile) {
+    Transaction transaction = table.newTransaction();
+    transaction
+        .updatePartitionStatistics()
+        .removePartitionStatistics(partitionStatisticsFile.snapshotId())
+        .commit();
+    transaction.commitTransaction();
+  }
+
+  private static void commitPartitionStatsTxn(
+      Table table, PartitionStatisticsFile partitionStatisticsFile) {
+    Transaction transaction = table.newTransaction();
+    transaction
+        .updatePartitionStatistics()
+        .setPartitionStatistics(partitionStatisticsFile)
+        .commit();
+    transaction.commitTransaction();
+  }
+
   @Test
   public void testRemoveOrphanFilesProcedureWithPrefixMode()
       throws NoSuchTableException, ParseException, IOException {
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index 62f5167526..cff07c05d4 100644
--- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -31,7 +31,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
-import java.util.function.Predicate;
 import java.util.function.Supplier;
 import org.apache.iceberg.AllManifestsTable;
 import org.apache.iceberg.BaseTable;
@@ -44,7 +43,6 @@ import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.ReachableFileUtil;
 import org.apache.iceberg.StaticTableOperations;
-import org.apache.iceberg.StatisticsFile;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.exceptions.NotFoundException;
@@ -197,14 +195,8 @@ abstract class BaseSparkAction<ThisT> {
   }
 
   protected Dataset<FileInfo> statisticsFileDS(Table table, Set<Long> 
snapshotIds) {
-    Predicate<StatisticsFile> predicate;
-    if (snapshotIds == null) {
-      predicate = statisticsFile -> true;
-    } else {
-      predicate = statisticsFile -> 
snapshotIds.contains(statisticsFile.snapshotId());
-    }
-
-    List<String> statisticsFiles = 
ReachableFileUtil.statisticsFilesLocations(table, predicate);
+    List<String> statisticsFiles =
+        ReachableFileUtil.statisticsFilesLocationsForSnapshots(table, 
snapshotIds);
     return toFileInfoDS(statisticsFiles, STATISTICS_FILES);
   }
 
diff --git 
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ProcedureUtil.java
 
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ProcedureUtil.java
new file mode 100644
index 0000000000..de4acd74a7
--- /dev/null
+++ 
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ProcedureUtil.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.UUID;
+import org.apache.iceberg.ImmutableGenericPartitionStatisticsFile;
+import org.apache.iceberg.PartitionStatisticsFile;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.PositionOutputStream;
+
+public class ProcedureUtil {
+
+  private ProcedureUtil() {}
+
+  static PartitionStatisticsFile writePartitionStatsFile(
+      long snapshotId, String statsLocation, FileIO fileIO) {
+    PositionOutputStream positionOutputStream;
+    try {
+      positionOutputStream = fileIO.newOutputFile(statsLocation).create();
+      positionOutputStream.close();
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+
+    return ImmutableGenericPartitionStatisticsFile.builder()
+        .snapshotId(snapshotId)
+        .fileSizeInBytes(42L)
+        .path(statsLocation)
+        .build();
+  }
+
+  static String statsFileLocation(String tableLocation) {
+    String statsFileName = "stats-file-" + UUID.randomUUID();
+    return tableLocation.replaceFirst("file:", "") + "/metadata/" + 
statsFileName;
+  }
+}
diff --git 
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
 
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
index c4b93c6d6a..7dacce5487 100644
--- 
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
+++ 
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
@@ -28,13 +28,12 @@ import java.sql.Timestamp;
 import java.time.Instant;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
-import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.GenericBlobMetadata;
 import org.apache.iceberg.GenericStatisticsFile;
+import org.apache.iceberg.PartitionStatisticsFile;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.StatisticsFile;
 import org.apache.iceberg.Table;
@@ -445,7 +444,7 @@ public class TestExpireSnapshotsProcedure extends 
SparkExtensionsTestBase {
     sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", 
tableName);
     sql("INSERT INTO TABLE %s VALUES (10, 'abc')", tableName);
     Table table = validationCatalog.loadTable(tableIdent);
-    String statsFileLocation1 = statsFileLocation(table.location());
+    String statsFileLocation1 = 
ProcedureUtil.statsFileLocation(table.location());
     StatisticsFile statisticsFile1 =
         writeStatsFile(
             table.currentSnapshot().snapshotId(),
@@ -456,7 +455,7 @@ public class TestExpireSnapshotsProcedure extends 
SparkExtensionsTestBase {
 
     sql("INSERT INTO %s SELECT 20, 'def'", tableName);
     table.refresh();
-    String statsFileLocation2 = statsFileLocation(table.location());
+    String statsFileLocation2 = 
ProcedureUtil.statsFileLocation(table.location());
     StatisticsFile statisticsFile2 =
         writeStatsFile(
             table.currentSnapshot().snapshotId(),
@@ -475,18 +474,9 @@ public class TestExpireSnapshotsProcedure extends 
SparkExtensionsTestBase {
     Assertions.assertThat(output.get(0)[5]).as("should be 1 deleted statistics 
file").isEqualTo(1L);
 
     table.refresh();
-    List<StatisticsFile> statsWithSnapshotId1 =
-        table.statisticsFiles().stream()
-            .filter(statisticsFile -> statisticsFile.snapshotId() == 
statisticsFile1.snapshotId())
-            .collect(Collectors.toList());
-    Assertions.assertThat(statsWithSnapshotId1)
-        .as(
-            "Statistics file entry in TableMetadata should be deleted for the 
snapshot %s",
-            statisticsFile1.snapshotId())
-        .isEmpty();
     Assertions.assertThat(table.statisticsFiles())
         .as(
-            "Statistics file entry in TableMetadata should be present for the 
snapshot %s",
+            "Statistics file entry in TableMetadata should be present only for 
the snapshot %s",
             statisticsFile2.snapshotId())
         .extracting(StatisticsFile::snapshotId)
         .containsExactly(statisticsFile2.snapshotId());
@@ -500,7 +490,58 @@ public class TestExpireSnapshotsProcedure extends 
SparkExtensionsTestBase {
         .exists();
   }
 
-  private StatisticsFile writeStatsFile(
+  @Test
+  public void testExpireSnapshotsWithPartitionStatisticFiles() {
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", 
tableName);
+    sql("INSERT INTO TABLE %s VALUES (10, 'abc')", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    String partitionStatsFileLocation1 = 
ProcedureUtil.statsFileLocation(table.location());
+    PartitionStatisticsFile partitionStatisticsFile1 =
+        ProcedureUtil.writePartitionStatsFile(
+            table.currentSnapshot().snapshotId(), partitionStatsFileLocation1, 
table.io());
+    
table.updatePartitionStatistics().setPartitionStatistics(partitionStatisticsFile1).commit();
+
+    sql("INSERT INTO %s SELECT 20, 'def'", tableName);
+    table.refresh();
+    String partitionStatsFileLocation2 = 
ProcedureUtil.statsFileLocation(table.location());
+    PartitionStatisticsFile partitionStatisticsFile2 =
+        ProcedureUtil.writePartitionStatsFile(
+            table.currentSnapshot().snapshotId(), partitionStatsFileLocation2, 
table.io());
+    
table.updatePartitionStatistics().setPartitionStatistics(partitionStatisticsFile2).commit();
+
+    waitUntilAfter(table.currentSnapshot().timestampMillis());
+
+    Timestamp currentTimestamp = 
Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.expire_snapshots(older_than => TIMESTAMP 
'%s',table => '%s')",
+            catalogName, currentTimestamp, tableIdent);
+    Assertions.assertThat(output.get(0)[5])
+        .as("should be 1 deleted partition statistics file")
+        .isEqualTo(1L);
+
+    table.refresh();
+    Assertions.assertThat(table.partitionStatisticsFiles())
+        .as(
+            "partition statistics file entry in TableMetadata should be 
present only for the snapshot %s",
+            partitionStatisticsFile2.snapshotId())
+        .extracting(PartitionStatisticsFile::snapshotId)
+        .containsExactly(partitionStatisticsFile2.snapshotId());
+
+    Assertions.assertThat(new File(partitionStatsFileLocation1))
+        .as(
+            "partition statistics file should not exist for snapshot %s",
+            partitionStatisticsFile1.snapshotId())
+        .doesNotExist();
+
+    Assertions.assertThat(new File(partitionStatsFileLocation2))
+        .as(
+            "partition statistics file should exist for snapshot %s",
+            partitionStatisticsFile2.snapshotId())
+        .exists();
+  }
+
+  private static StatisticsFile writeStatsFile(
       long snapshotId, long snapshotSequenceNumber, String statsLocation, 
FileIO fileIO)
       throws IOException {
     try (PuffinWriter puffinWriter = 
Puffin.write(fileIO.newOutputFile(statsLocation)).build()) {
@@ -523,9 +564,4 @@ public class TestExpireSnapshotsProcedure extends 
SparkExtensionsTestBase {
               .collect(ImmutableList.toImmutableList()));
     }
   }
-
-  private String statsFileLocation(String tableLocation) {
-    String statsFileName = "stats-file-" + UUID.randomUUID();
-    return tableLocation.replaceFirst("file:", "") + "/metadata/" + 
statsFileName;
-  }
 }
diff --git 
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
 
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
index b299817750..40adf30c37 100644
--- 
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
+++ 
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
@@ -41,6 +41,7 @@ import org.apache.iceberg.GenericBlobMetadata;
 import org.apache.iceberg.GenericStatisticsFile;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionStatisticsFile;
 import org.apache.iceberg.ReachableFileUtil;
 import org.apache.iceberg.StatisticsFile;
 import org.apache.iceberg.Table;
@@ -530,6 +531,75 @@ public class TestRemoveOrphanFilesProcedure extends 
SparkExtensionsTestBase {
     Assertions.assertThat(statsLocation.exists()).as("stats file should be 
deleted").isFalse();
   }
 
+  @Test
+  public void testRemoveOrphanFilesWithPartitionStatisticFiles() throws 
Exception {
+    sql(
+        "CREATE TABLE %s USING iceberg "
+            + "TBLPROPERTIES('format-version'='2') "
+            + "AS SELECT 10 int, 'abc' data",
+        tableName);
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+
+    String partitionStatsLocation = 
ProcedureUtil.statsFileLocation(table.location());
+    PartitionStatisticsFile partitionStatisticsFile =
+        ProcedureUtil.writePartitionStatsFile(
+            table.currentSnapshot().snapshotId(), partitionStatsLocation, 
table.io());
+
+    commitPartitionStatsTxn(table, partitionStatisticsFile);
+
+    // wait to ensure files are old enough
+    waitUntilAfter(System.currentTimeMillis());
+    Timestamp currentTimestamp = 
Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
+
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.remove_orphan_files("
+                + "table => '%s',"
+                + "older_than => TIMESTAMP '%s')",
+            catalogName, tableIdent, currentTimestamp);
+    Assertions.assertThat(output).as("Should be no orphan files").isEmpty();
+
+    Assertions.assertThat(new File(partitionStatsLocation))
+        .as("partition stats file should exist")
+        .exists();
+
+    removePartitionStatsTxn(table, partitionStatisticsFile);
+
+    output =
+        sql(
+            "CALL %s.system.remove_orphan_files("
+                + "table => '%s',"
+                + "older_than => TIMESTAMP '%s')",
+            catalogName, tableIdent, currentTimestamp);
+    Assertions.assertThat(output).as("Should be orphan files").hasSize(1);
+    Assertions.assertThat(Iterables.getOnlyElement(output))
+        .as("Deleted files")
+        .containsExactly("file:" + partitionStatsLocation);
+    Assertions.assertThat(new File(partitionStatsLocation))
+        .as("partition stats file should be deleted")
+        .doesNotExist();
+  }
+
+  private static void removePartitionStatsTxn(
+      Table table, PartitionStatisticsFile partitionStatisticsFile) {
+    Transaction transaction = table.newTransaction();
+    transaction
+        .updatePartitionStatistics()
+        .removePartitionStatistics(partitionStatisticsFile.snapshotId())
+        .commit();
+    transaction.commitTransaction();
+  }
+
+  private static void commitPartitionStatsTxn(
+      Table table, PartitionStatisticsFile partitionStatisticsFile) {
+    Transaction transaction = table.newTransaction();
+    transaction
+        .updatePartitionStatistics()
+        .setPartitionStatistics(partitionStatisticsFile)
+        .commit();
+    transaction.commitTransaction();
+  }
+
   @Test
   public void testRemoveOrphanFilesProcedureWithPrefixMode()
       throws NoSuchTableException, ParseException, IOException {
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index d0e71a707d..53ce7418f3 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -31,7 +31,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
-import java.util.function.Predicate;
 import java.util.function.Supplier;
 import org.apache.iceberg.AllManifestsTable;
 import org.apache.iceberg.BaseTable;
@@ -44,7 +43,6 @@ import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.ReachableFileUtil;
 import org.apache.iceberg.StaticTableOperations;
-import org.apache.iceberg.StatisticsFile;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.exceptions.NotFoundException;
@@ -196,14 +194,8 @@ abstract class BaseSparkAction<ThisT> {
   }
 
   protected Dataset<FileInfo> statisticsFileDS(Table table, Set<Long> 
snapshotIds) {
-    Predicate<StatisticsFile> predicate;
-    if (snapshotIds == null) {
-      predicate = statisticsFile -> true;
-    } else {
-      predicate = statisticsFile -> 
snapshotIds.contains(statisticsFile.snapshotId());
-    }
-
-    List<String> statisticsFiles = 
ReachableFileUtil.statisticsFilesLocations(table, predicate);
+    List<String> statisticsFiles =
+        ReachableFileUtil.statisticsFilesLocationsForSnapshots(table, 
snapshotIds);
     return toFileInfoDS(statisticsFiles, STATISTICS_FILES);
   }
 


Reply via email to