This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 51d548a4fc API, Core: Scan API for partition stats (#14640)
51d548a4fc is described below

commit 51d548a4fc101df3494e17a5f7ff5c615c5979cc
Author: gaborkaszab <[email protected]>
AuthorDate: Wed Jan 7 13:58:23 2026 +0100

    API, Core: Scan API for partition stats (#14640)
---
 .../org/apache/iceberg/PartitionStatistics.java    |  65 +++
 .../apache/iceberg/PartitionStatisticsScan.java    |  59 +++
 api/src/main/java/org/apache/iceberg/Table.java    |  12 +
 .../apache/iceberg/BasePartitionStatistics.java    | 201 +++++++++
 .../iceberg/BasePartitionStatisticsScan.java       |  86 ++++
 .../main/java/org/apache/iceberg/BaseTable.java    |   5 +
 .../java/org/apache/iceberg/PartitionStats.java    |   6 +
 .../org/apache/iceberg/PartitionStatsHandler.java  |   2 +
 .../iceberg/PartitionStatisticsScanTestBase.java   | 480 +++++++++++++++++++++
 .../iceberg/PartitionStatisticsTestBase.java       | 114 +++++
 .../iceberg/PartitionStatsHandlerTestBase.java     | 112 +----
 .../avro/TestAvroPartitionStatisticsScan.java      |  29 ++
 .../orc/TestOrcPartitionStatisticsScan.java        |  59 +++
 .../TestParquetPartitionStatisticsScan.java        |  30 ++
 14 files changed, 1164 insertions(+), 96 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/PartitionStatistics.java 
b/api/src/main/java/org/apache/iceberg/PartitionStatistics.java
new file mode 100644
index 0000000000..10df7303d5
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/PartitionStatistics.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
/** Interface for partition statistics returned from a {@link PartitionStatisticsScan}. */
public interface PartitionStatistics extends StructLike {

  // NOTE(review): all counters use boxed types; presumably a null return means the
  // value is unknown/not computed for this partition — confirm against implementations.

  /** Returns the partition of these partition statistics. */
  StructLike partition();

  /** Returns the spec ID of the partition of these partition statistics. */
  Integer specId();

  /** Returns the number of data records in the partition. */
  Long dataRecordCount();

  /** Returns the number of data files in the partition. */
  Integer dataFileCount();

  /** Returns the total size of data files in bytes in the partition. */
  Long totalDataFileSizeInBytes();

  /**
   * Returns the number of positional delete records in the partition. Also includes DV record
   * count, as per the spec.
   */
  Long positionDeleteRecordCount();

  /** Returns the number of positional delete files in the partition. */
  Integer positionDeleteFileCount();

  /** Returns the number of equality delete records in the partition. */
  Long equalityDeleteRecordCount();

  /** Returns the number of equality delete files in the partition. */
  Integer equalityDeleteFileCount();

  /** Returns the total number of records in the partition. */
  Long totalRecords();

  /** Returns the timestamp in milliseconds when the partition was last updated. */
  Long lastUpdatedAt();

  /** Returns the ID of the snapshot that last updated this partition. */
  Long lastUpdatedSnapshotId();

  /** Returns the number of delete vectors in the partition. */
  Integer dvCount();
}
diff --git a/api/src/main/java/org/apache/iceberg/PartitionStatisticsScan.java 
b/api/src/main/java/org/apache/iceberg/PartitionStatisticsScan.java
new file mode 100644
index 0000000000..18d8b20318
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/PartitionStatisticsScan.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+
/** API for configuring a partition statistics scan. */
public interface PartitionStatisticsScan {

  /**
   * Create a new scan from this scan's configuration that will use the given snapshot by ID.
   *
   * @param snapshotId a snapshot ID
   * @return a new scan based on this with the given snapshot ID
   * @throws IllegalArgumentException if the snapshot cannot be found
   */
  PartitionStatisticsScan useSnapshot(long snapshotId);

  /**
   * Create a new scan from the results of this, where partitions are filtered by the {@link
   * Expression}.
   *
   * @param filter a filter expression
   * @return a new scan based on this with results filtered by the expression
   */
  PartitionStatisticsScan filter(Expression filter);

  /**
   * Create a new scan from this with the schema as its projection.
   *
   * @param schema a projection schema
   * @return a new scan based on this with the given projection
   */
  PartitionStatisticsScan project(Schema schema);

  /**
   * Scans a partition statistics file belonging to a particular snapshot.
   *
   * @return a {@link CloseableIterable} of partition statistics
   */
  CloseableIterable<PartitionStatistics> scan();
}
diff --git a/api/src/main/java/org/apache/iceberg/Table.java 
b/api/src/main/java/org/apache/iceberg/Table.java
index 97ea9ba765..3c0689e892 100644
--- a/api/src/main/java/org/apache/iceberg/Table.java
+++ b/api/src/main/java/org/apache/iceberg/Table.java
@@ -83,6 +83,18 @@ public interface Table {
     throw new UnsupportedOperationException("Incremental changelog scan is not 
supported");
   }
 
+  /**
+   * Create a new {@link PartitionStatisticsScan} for this table.
+   *
+   * <p>Once a partition statistics scan is created, it can be refined to 
project columns and filter
+   * data.
+   *
+   * @return a partition statistics scan for this table
+   */
+  default PartitionStatisticsScan newPartitionStatisticsScan() {
+    throw new UnsupportedOperationException("Partition statistics scan is not 
supported");
+  }
+
   /**
    * Return the {@link Schema schema} for this table.
    *
diff --git a/core/src/main/java/org/apache/iceberg/BasePartitionStatistics.java 
b/core/src/main/java/org/apache/iceberg/BasePartitionStatistics.java
new file mode 100644
index 0000000000..c17718281b
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/BasePartitionStatistics.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import org.apache.iceberg.avro.SupportsIndexProjection;
+import org.apache.iceberg.types.Types;
+
/**
 * Base implementation of {@link PartitionStatistics} that is populated positionally by internal
 * file readers through {@link StructLike} set calls.
 *
 * <p>The field order below mirrors the partition statistics file schema: positions 0-12 in
 * {@code getByPos} and {@code internalSet} must stay in sync with the declared field order.
 */
public class BasePartitionStatistics extends SupportsIndexProjection
    implements PartitionStatistics {

  private StructLike partition;
  private Integer specId;
  private Long dataRecordCount;
  private Integer dataFileCount;
  private Long totalDataFileSizeInBytes;
  private Long positionDeleteRecordCount;
  private Integer positionDeleteFileCount;
  private Long equalityDeleteRecordCount;
  private Integer equalityDeleteFileCount;
  private Long totalRecordCount; // Not calculated, as it needs scanning the data. Remains null
  private Long lastUpdatedAt;
  private Long lastUpdatedSnapshotId;
  private Integer dvCount;

  // Number of stats fields above; also the struct size reported to SupportsIndexProjection.
  private static final int STATS_COUNT = 13;

  /** Used by internal readers to instantiate this class with a projection schema. */
  public BasePartitionStatistics(Types.StructType projection) {
    // NOTE(review): the projection parameter is unused here; presumably positional
    // projection is handled by SupportsIndexProjection and the parameter exists to
    // satisfy the reader instantiation contract — confirm.
    super(STATS_COUNT);
  }

  @Override
  public StructLike partition() {
    return partition;
  }

  @Override
  public Integer specId() {
    return specId;
  }

  @Override
  public Long dataRecordCount() {
    return dataRecordCount;
  }

  @Override
  public Integer dataFileCount() {
    return dataFileCount;
  }

  @Override
  public Long totalDataFileSizeInBytes() {
    return totalDataFileSizeInBytes;
  }

  @Override
  public Long positionDeleteRecordCount() {
    return positionDeleteRecordCount;
  }

  @Override
  public Integer positionDeleteFileCount() {
    return positionDeleteFileCount;
  }

  @Override
  public Long equalityDeleteRecordCount() {
    return equalityDeleteRecordCount;
  }

  @Override
  public Integer equalityDeleteFileCount() {
    return equalityDeleteFileCount;
  }

  @Override
  public Long totalRecords() {
    return totalRecordCount;
  }

  @Override
  public Long lastUpdatedAt() {
    return lastUpdatedAt;
  }

  @Override
  public Long lastUpdatedSnapshotId() {
    return lastUpdatedSnapshotId;
  }

  @Override
  public Integer dvCount() {
    return dvCount;
  }

  /** Positional getter used by {@link StructLike} access; position order matches the schema. */
  @Override
  protected <T> T internalGet(int pos, Class<T> javaClass) {
    return javaClass.cast(getByPos(pos));
  }

  // Maps a stats-file schema position to the corresponding field. Must stay in sync
  // with internalSet and the field declaration order above.
  private Object getByPos(int pos) {
    switch (pos) {
      case 0:
        return partition;
      case 1:
        return specId;
      case 2:
        return dataRecordCount;
      case 3:
        return dataFileCount;
      case 4:
        return totalDataFileSizeInBytes;
      case 5:
        return positionDeleteRecordCount;
      case 6:
        return positionDeleteFileCount;
      case 7:
        return equalityDeleteRecordCount;
      case 8:
        return equalityDeleteFileCount;
      case 9:
        return totalRecordCount;
      case 10:
        return lastUpdatedAt;
      case 11:
        return lastUpdatedSnapshotId;
      case 12:
        return dvCount;
      default:
        throw new UnsupportedOperationException("Unknown position: " + pos);
    }
  }

  /** Positional setter used by internal readers; null values leave the field unset (null). */
  @Override
  protected <T> void internalSet(int pos, T value) {
    // Skip nulls so optional/missing columns keep their default null value.
    if (value == null) {
      return;
    }

    switch (pos) {
      case 0:
        this.partition = (StructLike) value;
        break;
      case 1:
        this.specId = (int) value;
        break;
      case 2:
        this.dataRecordCount = (long) value;
        break;
      case 3:
        this.dataFileCount = (int) value;
        break;
      case 4:
        this.totalDataFileSizeInBytes = (long) value;
        break;
      case 5:
        this.positionDeleteRecordCount = (long) value;
        break;
      case 6:
        this.positionDeleteFileCount = (int) value;
        break;
      case 7:
        this.equalityDeleteRecordCount = (long) value;
        break;
      case 8:
        this.equalityDeleteFileCount = (int) value;
        break;
      case 9:
        this.totalRecordCount = (Long) value;
        break;
      case 10:
        this.lastUpdatedAt = (Long) value;
        break;
      case 11:
        this.lastUpdatedSnapshotId = (Long) value;
        break;
      case 12:
        this.dvCount = (int) value;
        break;
      default:
        throw new UnsupportedOperationException("Unknown position: " + pos);
    }
  }
}
diff --git 
a/core/src/main/java/org/apache/iceberg/BasePartitionStatisticsScan.java 
b/core/src/main/java/org/apache/iceberg/BasePartitionStatisticsScan.java
new file mode 100644
index 0000000000..075a1a85d3
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/BasePartitionStatisticsScan.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.Optional;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+
+public class BasePartitionStatisticsScan implements PartitionStatisticsScan {
+
+  private final Table table;
+  private Long snapshotId;
+
+  public BasePartitionStatisticsScan(Table table) {
+    this.table = table;
+  }
+
+  @Override
+  public PartitionStatisticsScan useSnapshot(long newSnapshotId) {
+    Preconditions.checkArgument(
+        table.snapshot(newSnapshotId) != null, "Cannot find snapshot with ID 
%s", newSnapshotId);
+
+    this.snapshotId = newSnapshotId;
+    return this;
+  }
+
+  @Override
+  public PartitionStatisticsScan filter(Expression newFilter) {
+    throw new UnsupportedOperationException("Filtering is not supported");
+  }
+
+  @Override
+  public PartitionStatisticsScan project(Schema newSchema) {
+    throw new UnsupportedOperationException("Projection is not supported");
+  }
+
+  @Override
+  public CloseableIterable<PartitionStatistics> scan() {
+    if (snapshotId == null) {
+      if (table.currentSnapshot() == null) {
+        return CloseableIterable.empty();
+      }
+
+      snapshotId = table.currentSnapshot().snapshotId();
+    }
+
+    Optional<PartitionStatisticsFile> statsFile =
+        table.partitionStatisticsFiles().stream()
+            .filter(f -> f.snapshotId() == snapshotId)
+            .findFirst();
+
+    if (statsFile.isEmpty()) {
+      return CloseableIterable.empty();
+    }
+
+    Types.StructType partitionType = Partitioning.partitionType(table);
+    Schema schema = PartitionStatsHandler.schema(partitionType, 
TableUtil.formatVersion(table));
+
+    FileFormat fileFormat = FileFormat.fromFileName(statsFile.get().path());
+    Preconditions.checkNotNull(
+        fileFormat != null, "Unable to determine format of file: %s", 
statsFile.get().path());
+
+    return InternalData.read(fileFormat, 
table.io().newInputFile(statsFile.get().path()))
+        .project(schema)
+        .setRootType(BasePartitionStatistics.class)
+        .build();
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java 
b/core/src/main/java/org/apache/iceberg/BaseTable.java
index 23299a962c..c489c3bfb5 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTable.java
@@ -90,6 +90,11 @@ public class BaseTable implements Table, HasTableOperations, 
Serializable {
     return new BaseIncrementalChangelogScan(this);
   }
 
+  @Override
+  public PartitionStatisticsScan newPartitionStatisticsScan() {
+    return new BasePartitionStatisticsScan(this);
+  }
+
   @Override
   public Schema schema() {
     return ops.current().schema();
diff --git a/core/src/main/java/org/apache/iceberg/PartitionStats.java 
b/core/src/main/java/org/apache/iceberg/PartitionStats.java
index 9051c8535c..e8a4e18916 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionStats.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionStats.java
@@ -20,6 +20,12 @@ package org.apache.iceberg;
 
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 
+/**
+ * Class to hold partition statistics values.
+ *
+ * @deprecated will be removed in 1.12.0. Use {@link BasePartitionStatistics} instead
+ */
+@Deprecated
 public class PartitionStats implements StructLike {
 
   private static final int STATS_COUNT = 13;
diff --git a/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java 
b/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java
index 4e7c1b104e..7259a1f068 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java
@@ -275,7 +275,9 @@ public class PartitionStatsHandler {
    *
    * @param schema The {@link Schema} of the partition statistics file.
    * @param inputFile An {@link InputFile} pointing to the partition stats 
file.
+   * @deprecated will be removed in 1.12.0, use {@link 
PartitionStatisticsScan} instead
    */
+  @Deprecated
   public static CloseableIterable<PartitionStats> readPartitionStatsFile(
       Schema schema, InputFile inputFile) {
     Preconditions.checkArgument(schema != null, "Invalid schema: null");
diff --git 
a/core/src/test/java/org/apache/iceberg/PartitionStatisticsScanTestBase.java 
b/core/src/test/java/org/apache/iceberg/PartitionStatisticsScanTestBase.java
new file mode 100644
index 0000000000..89eb70959c
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/PartitionStatisticsScanTestBase.java
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.PartitionStatsHandler.PARTITION_FIELD_ID;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Types;
+import org.assertj.core.groups.Tuple;
+import org.junit.jupiter.api.Test;
+
+public abstract class PartitionStatisticsScanTestBase extends 
PartitionStatisticsTestBase {
+
+  public abstract FileFormat format();
+
+  private final Map<String, String> fileFormatProperty =
+      ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format().name());
+
+  @Test
+  public void testEmptyTable() throws Exception {
+    Table testTable =
+        TestTables.create(
+            tempDir("scan_empty_table"), "scan_empty_table", SCHEMA, SPEC, 2, 
fileFormatProperty);
+
+    
assertThat(Lists.newArrayList(testTable.newPartitionStatisticsScan().scan())).isEmpty();
+  }
+
+  @Test
+  public void testInvalidSnapshotId() throws Exception {
+    Table testTable =
+        TestTables.create(
+            tempDir("scan_invalid_snapshot"),
+            "scan_invalid_snapshot",
+            SCHEMA,
+            SPEC,
+            2,
+            fileFormatProperty);
+
+    assertThatThrownBy(() -> 
testTable.newPartitionStatisticsScan().useSnapshot(1234L).scan())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot find snapshot with ID 1234");
+  }
+
+  @Test
+  public void testNoStatsForSnapshot() throws Exception {
+    Table testTable =
+        TestTables.create(
+            tempDir("scan_no_stats"), "scan_no_stats", SCHEMA, SPEC, 2, 
fileFormatProperty);
+
+    DataFile dataFile =
+        DataFiles.builder(SPEC)
+            .withPath("some_path")
+            .withFileSizeInBytes(15)
+            .withFormat(format())
+            .withRecordCount(1)
+            .build();
+    testTable.newAppend().appendFile(dataFile).commit();
+    long snapshotId = testTable.currentSnapshot().snapshotId();
+
+    
assertThat(testTable.newPartitionStatisticsScan().useSnapshot(snapshotId).scan()).isEmpty();
+  }
+
+  @Test
+  public void testReadingStatsWithInvalidSchema() throws Exception {
+    PartitionSpec spec = 
PartitionSpec.builderFor(SCHEMA).identity("c1").build();
+    Table testTable =
+        TestTables.create(
+            tempDir("scan_with_old_schema"),
+            "scan_with_old_schema",
+            SCHEMA,
+            spec,
+            2,
+            fileFormatProperty);
+    Types.StructType partitionType = Partitioning.partitionType(testTable);
+    Schema oldSchema = invalidOldSchema(partitionType);
+
+    // Add a dummy file to the table to have a snapshot
+    DataFile dataFile =
+        DataFiles.builder(spec)
+            .withPath("some_path")
+            .withFileSizeInBytes(15)
+            .withFormat(FileFormat.PARQUET)
+            .withRecordCount(1)
+            .build();
+    testTable.newAppend().appendFile(dataFile).commit();
+    long snapshotId = testTable.currentSnapshot().snapshotId();
+
+    testTable
+        .updatePartitionStatistics()
+        .setPartitionStatistics(
+            PartitionStatsHandler.writePartitionStatsFile(
+                testTable,
+                snapshotId,
+                oldSchema,
+                Collections.singletonList(randomStats(partitionType))))
+        .commit();
+
+    try (CloseableIterable<PartitionStatistics> recordIterator =
+        testTable.newPartitionStatisticsScan().useSnapshot(snapshotId).scan()) 
{
+
+      if (format() == FileFormat.PARQUET) {
+        assertThatThrownBy(() -> Lists.newArrayList(recordIterator))
+            .isInstanceOf(IllegalArgumentException.class)
+            .hasMessageContaining("Not a primitive type: struct");
+      } else if (format() == FileFormat.AVRO) {
+        assertThatThrownBy(() -> Lists.newArrayList(recordIterator))
+            .isInstanceOf(ClassCastException.class)
+            .hasMessageContaining("Integer cannot be cast to class 
org.apache.iceberg.StructLike");
+      }
+    }
+  }
+
+  @Test
+  public void testV2toV3SchemaEvolution() throws Exception {
+    Table testTable =
+        TestTables.create(
+            tempDir("scan_with_schema_evolution"),
+            "scan_with_schema_evolution",
+            SCHEMA,
+            SPEC,
+            2,
+            fileFormatProperty);
+
+    // write stats file using v2 schema
+    DataFile dataFile =
+        FileGenerationUtil.generateDataFile(testTable, 
TestHelpers.Row.of("foo", "A"));
+    testTable.newAppend().appendFile(dataFile).commit();
+
+    testTable
+        .updatePartitionStatistics()
+        .setPartitionStatistics(
+            PartitionStatsHandler.computeAndWriteStatsFile(
+                testTable, testTable.currentSnapshot().snapshotId()))
+        .commit();
+
+    Types.StructType partitionSchema = Partitioning.partitionType(testTable);
+
+    // read with v2 schema
+    List<PartitionStatistics> partitionStatsV2;
+    try (CloseableIterable<PartitionStatistics> recordIterator =
+        testTable.newPartitionStatisticsScan().scan()) {
+      partitionStatsV2 = Lists.newArrayList(recordIterator);
+    }
+
+    testTable.updateProperties().set(TableProperties.FORMAT_VERSION, 
"3").commit();
+
+    // read with v3 schema
+    List<PartitionStatistics> partitionStatsV3;
+    try (CloseableIterable<PartitionStatistics> recordIterator =
+        testTable.newPartitionStatisticsScan().scan()) {
+      partitionStatsV3 = Lists.newArrayList(recordIterator);
+    }
+
+    assertThat(partitionStatsV2).hasSameSizeAs(partitionStatsV3);
+    Comparator<StructLike> comparator = Comparators.forType(partitionSchema);
+    for (int i = 0; i < partitionStatsV2.size(); i++) {
+      assertThat(isEqual(comparator, partitionStatsV2.get(i), 
partitionStatsV3.get(i))).isTrue();
+    }
+  }
+
+  @SuppressWarnings("checkstyle:MethodLength")
+  @Test
+  public void testScanPartitionStatsForCurrentSnapshot() throws Exception {
+    Table testTable =
+        TestTables.create(
+            tempDir("scan_partition_stats"),
+            "scan_partition_stats",
+            SCHEMA,
+            SPEC,
+            2,
+            fileFormatProperty);
+
+    DataFile dataFile1 =
+        FileGenerationUtil.generateDataFile(testTable, 
TestHelpers.Row.of("foo", "A"));
+    DataFile dataFile2 =
+        FileGenerationUtil.generateDataFile(testTable, 
TestHelpers.Row.of("foo", "B"));
+    DataFile dataFile3 =
+        FileGenerationUtil.generateDataFile(testTable, 
TestHelpers.Row.of("bar", "A"));
+    DataFile dataFile4 =
+        FileGenerationUtil.generateDataFile(testTable, 
TestHelpers.Row.of("bar", "B"));
+
+    for (int i = 0; i < 3; i++) {
+      // insert the same set of four data files three times to create new manifest files
+      testTable
+          .newAppend()
+          .appendFile(dataFile1)
+          .appendFile(dataFile2)
+          .appendFile(dataFile3)
+          .appendFile(dataFile4)
+          .commit();
+    }
+
+    Snapshot snapshot1 = testTable.currentSnapshot();
+    Schema recordSchema = 
PartitionStatsHandler.schema(Partitioning.partitionType(testTable), 2);
+
+    Types.StructType partitionType =
+        recordSchema.findField(PARTITION_FIELD_ID).type().asStructType();
+    computeAndValidatePartitionStats(
+        testTable,
+        testTable.currentSnapshot().snapshotId(),
+        Tuple.tuple(
+            partitionRecord(partitionType, "foo", "A"),
+            0,
+            3 * dataFile1.recordCount(),
+            3,
+            3 * dataFile1.fileSizeInBytes(),
+            0L,
+            0,
+            0L,
+            0,
+            null,
+            snapshot1.timestampMillis(),
+            snapshot1.snapshotId(),
+            null),
+        Tuple.tuple(
+            partitionRecord(partitionType, "foo", "B"),
+            0,
+            3 * dataFile2.recordCount(),
+            3,
+            3 * dataFile2.fileSizeInBytes(),
+            0L,
+            0,
+            0L,
+            0,
+            null,
+            snapshot1.timestampMillis(),
+            snapshot1.snapshotId(),
+            null),
+        Tuple.tuple(
+            partitionRecord(partitionType, "bar", "A"),
+            0,
+            3 * dataFile3.recordCount(),
+            3,
+            3 * dataFile3.fileSizeInBytes(),
+            0L,
+            0,
+            0L,
+            0,
+            null,
+            snapshot1.timestampMillis(),
+            snapshot1.snapshotId(),
+            null),
+        Tuple.tuple(
+            partitionRecord(partitionType, "bar", "B"),
+            0,
+            3 * dataFile4.recordCount(),
+            3,
+            3 * dataFile4.fileSizeInBytes(),
+            0L,
+            0,
+            0L,
+            0,
+            null,
+            snapshot1.timestampMillis(),
+            snapshot1.snapshotId(),
+            null));
+
+    DeleteFile posDelete =
+        FileGenerationUtil.generatePositionDeleteFile(testTable, 
TestHelpers.Row.of("bar", "A"));
+    testTable.newRowDelta().addDeletes(posDelete).commit();
+    // snapshot2 is unused in the result as same partition was updated by 
snapshot4
+
+    DeleteFile eqDelete =
+        FileGenerationUtil.generateEqualityDeleteFile(testTable, 
TestHelpers.Row.of("foo", "A"));
+    testTable.newRowDelta().addDeletes(eqDelete).commit();
+    Snapshot snapshot3 = testTable.currentSnapshot();
+
+    testTable.updateProperties().set(TableProperties.FORMAT_VERSION, 
"3").commit();
+    DeleteFile dv = FileGenerationUtil.generateDV(testTable, dataFile3);
+    testTable.newRowDelta().addDeletes(dv).commit();
+    Snapshot snapshot4 = testTable.currentSnapshot();
+
+    computeAndValidatePartitionStats(
+        testTable,
+        testTable.currentSnapshot().snapshotId(),
+        Tuple.tuple(
+            partitionRecord(partitionType, "foo", "A"),
+            0,
+            3 * dataFile1.recordCount(),
+            3,
+            3 * dataFile1.fileSizeInBytes(),
+            0L,
+            0,
+            eqDelete.recordCount(),
+            1,
+            null,
+            snapshot3.timestampMillis(),
+            snapshot3.snapshotId(),
+            0),
+        Tuple.tuple(
+            partitionRecord(partitionType, "foo", "B"),
+            0,
+            3 * dataFile2.recordCount(),
+            3,
+            3 * dataFile2.fileSizeInBytes(),
+            0L,
+            0,
+            0L,
+            0,
+            null,
+            snapshot1.timestampMillis(),
+            snapshot1.snapshotId(),
+            0),
+        Tuple.tuple(
+            partitionRecord(partitionType, "bar", "A"),
+            0,
+            3 * dataFile3.recordCount(),
+            3,
+            3 * dataFile3.fileSizeInBytes(),
+            posDelete.recordCount() + dv.recordCount(),
+            1,
+            0L,
+            0,
+            null,
+            snapshot4.timestampMillis(),
+            snapshot4.snapshotId(),
+            1), // dv count
+        Tuple.tuple(
+            partitionRecord(partitionType, "bar", "B"),
+            0,
+            3 * dataFile4.recordCount(),
+            3,
+            3 * dataFile4.fileSizeInBytes(),
+            0L,
+            0,
+            0L,
+            0,
+            null,
+            snapshot1.timestampMillis(),
+            snapshot1.snapshotId(),
+            0));
+  }
+
+  @Test
+  public void testScanPartitionStatsForOlderSnapshot() throws Exception {
+    Table testTable =
+        TestTables.create(
+            tempDir("scan_older_snapshot"),
+            "scan_older_snapshot",
+            SCHEMA,
+            SPEC,
+            2,
+            fileFormatProperty);
+
+    DataFile dataFile1 =
+        FileGenerationUtil.generateDataFile(testTable, 
TestHelpers.Row.of("foo", "A"));
+    DataFile dataFile2 =
+        FileGenerationUtil.generateDataFile(testTable, 
TestHelpers.Row.of("foo", "B"));
+
+    testTable.newAppend().appendFile(dataFile1).appendFile(dataFile2).commit();
+
+    Snapshot firstSnapshot = testTable.currentSnapshot();
+
+    testTable.newAppend().appendFile(dataFile1).appendFile(dataFile2).commit();
+
+    Schema recordSchema = 
PartitionStatsHandler.schema(Partitioning.partitionType(testTable), 2);
+
+    Types.StructType partitionType =
+        recordSchema.findField(PARTITION_FIELD_ID).type().asStructType();
+
+    computeAndValidatePartitionStats(
+        testTable,
+        firstSnapshot.snapshotId(),
+        Tuple.tuple(
+            partitionRecord(partitionType, "foo", "A"),
+            0,
+            dataFile1.recordCount(),
+            1,
+            dataFile1.fileSizeInBytes(),
+            0L,
+            0,
+            0L,
+            0,
+            null,
+            firstSnapshot.timestampMillis(),
+            firstSnapshot.snapshotId(),
+            null),
+        Tuple.tuple(
+            partitionRecord(partitionType, "foo", "B"),
+            0,
+            dataFile2.recordCount(),
+            1,
+            dataFile2.fileSizeInBytes(),
+            0L,
+            0,
+            0L,
+            0,
+            null,
+            firstSnapshot.timestampMillis(),
+            firstSnapshot.snapshotId(),
+            null));
+  }
+
+  private static void computeAndValidatePartitionStats(
+      Table testTable, long snapshotId, Tuple... expectedValues) throws 
IOException {
+    PartitionStatisticsFile result =
+        PartitionStatsHandler.computeAndWriteStatsFile(testTable, snapshotId);
+    
testTable.updatePartitionStatistics().setPartitionStatistics(result).commit();
+    assertThat(result.snapshotId()).isEqualTo(snapshotId);
+
+    PartitionStatisticsScan statScan = testTable.newPartitionStatisticsScan();
+    if (testTable.currentSnapshot().snapshotId() != snapshotId) {
+      statScan.useSnapshot(snapshotId);
+    }
+
+    List<PartitionStatistics> partitionStats;
+    try (CloseableIterable<PartitionStatistics> recordIterator = 
statScan.scan()) {
+      partitionStats = Lists.newArrayList(recordIterator);
+    }
+
+    assertThat(partitionStats)
+        .extracting(
+            PartitionStatistics::partition,
+            PartitionStatistics::specId,
+            PartitionStatistics::dataRecordCount,
+            PartitionStatistics::dataFileCount,
+            PartitionStatistics::totalDataFileSizeInBytes,
+            PartitionStatistics::positionDeleteRecordCount,
+            PartitionStatistics::positionDeleteFileCount,
+            PartitionStatistics::equalityDeleteRecordCount,
+            PartitionStatistics::equalityDeleteFileCount,
+            PartitionStatistics::totalRecords,
+            PartitionStatistics::lastUpdatedAt,
+            PartitionStatistics::lastUpdatedSnapshotId,
+            PartitionStatistics::dvCount)
+        .containsExactlyInAnyOrder(expectedValues);
+  }
+
  /**
   * Returns true when two {@link PartitionStatistics} carry the same values, comparing the
   * partition tuple with the supplied comparator and every counter field with
   * {@link Objects#equals}.
   *
   * <p>Null-safe: two null references are equal; a single null is not.
   *
   * <p>NOTE(review): {@code dvCount()} is not compared here even though the scan assertions in
   * this class extract it — confirm whether that omission is intentional (e.g. for pre-v3
   * tables where DV counts are absent).
   */
  @SuppressWarnings("checkstyle:CyclomaticComplexity")
  protected static boolean isEqual(
      Comparator<StructLike> partitionComparator,
      PartitionStatistics stats1,
      PartitionStatistics stats2) {
    if (stats1 == stats2) {
      return true;
    } else if (stats1 == null || stats2 == null) {
      return false;
    }

    return partitionComparator.compare(stats1.partition(), stats2.partition()) == 0
        && Objects.equals(stats1.specId(), stats2.specId())
        && Objects.equals(stats1.dataRecordCount(), stats2.dataRecordCount())
        && Objects.equals(stats1.dataFileCount(), stats2.dataFileCount())
        && Objects.equals(stats1.totalDataFileSizeInBytes(), stats2.totalDataFileSizeInBytes())
        && Objects.equals(stats1.positionDeleteRecordCount(), stats2.positionDeleteRecordCount())
        && Objects.equals(stats1.positionDeleteFileCount(), stats2.positionDeleteFileCount())
        && Objects.equals(stats1.equalityDeleteRecordCount(), stats2.equalityDeleteRecordCount())
        && Objects.equals(stats1.equalityDeleteFileCount(), stats2.equalityDeleteFileCount())
        && Objects.equals(stats1.totalRecords(), stats2.totalRecords())
        && Objects.equals(stats1.lastUpdatedAt(), stats2.lastUpdatedAt())
        && Objects.equals(stats1.lastUpdatedSnapshotId(), stats2.lastUpdatedSnapshotId());
  }
+}
diff --git 
a/core/src/test/java/org/apache/iceberg/PartitionStatisticsTestBase.java 
b/core/src/test/java/org/apache/iceberg/PartitionStatisticsTestBase.java
new file mode 100644
index 0000000000..72a5405d7f
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/PartitionStatisticsTestBase.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.PartitionStatsHandler.DATA_FILE_COUNT;
+import static org.apache.iceberg.PartitionStatsHandler.DATA_RECORD_COUNT;
+import static 
org.apache.iceberg.PartitionStatsHandler.EQUALITY_DELETE_FILE_COUNT;
+import static 
org.apache.iceberg.PartitionStatsHandler.EQUALITY_DELETE_RECORD_COUNT;
+import static org.apache.iceberg.PartitionStatsHandler.LAST_UPDATED_AT;
+import static 
org.apache.iceberg.PartitionStatsHandler.LAST_UPDATED_SNAPSHOT_ID;
+import static org.apache.iceberg.PartitionStatsHandler.PARTITION_FIELD_NAME;
+import static 
org.apache.iceberg.PartitionStatsHandler.POSITION_DELETE_FILE_COUNT;
+import static 
org.apache.iceberg.PartitionStatsHandler.POSITION_DELETE_RECORD_COUNT;
+import static org.apache.iceberg.PartitionStatsHandler.SPEC_ID;
+import static 
org.apache.iceberg.PartitionStatsHandler.TOTAL_DATA_FILE_SIZE_IN_BYTES;
+import static org.apache.iceberg.PartitionStatsHandler.TOTAL_RECORD_COUNT;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.io.TempDir;
+
+public abstract class PartitionStatisticsTestBase {
+
+  @TempDir private File temp;
+
+  // positions in StructLike
+  protected static final int DATA_RECORD_COUNT_POSITION = 2;
+  protected static final int DATA_FILE_COUNT_POSITION = 3;
+  protected static final int TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION = 4;
+  protected static final int POSITION_DELETE_RECORD_COUNT_POSITION = 5;
+  protected static final int POSITION_DELETE_FILE_COUNT_POSITION = 6;
+  protected static final int EQUALITY_DELETE_RECORD_COUNT_POSITION = 7;
+  protected static final int EQUALITY_DELETE_FILE_COUNT_POSITION = 8;
+  protected static final int TOTAL_RECORD_COUNT_POSITION = 9;
+  protected static final int LAST_UPDATED_AT_POSITION = 10;
+  protected static final int LAST_UPDATED_SNAPSHOT_ID_POSITION = 11;
+  protected static final int DV_COUNT_POSITION = 12;
+
+  protected static final Schema SCHEMA =
+      new Schema(
+          optional(1, "c1", Types.IntegerType.get()),
+          optional(2, "c2", Types.StringType.get()),
+          optional(3, "c3", Types.StringType.get()));
+
+  protected static final PartitionSpec SPEC =
+      PartitionSpec.builderFor(SCHEMA).identity("c2").identity("c3").build();
+
+  private static final Random RANDOM = ThreadLocalRandom.current();
+
+  protected Schema invalidOldSchema(Types.StructType unifiedPartitionType) {
+    // field ids starts from 0 instead of 1
+    return new Schema(
+        Types.NestedField.required(0, PARTITION_FIELD_NAME, 
unifiedPartitionType),
+        Types.NestedField.required(1, SPEC_ID.name(), Types.IntegerType.get()),
+        Types.NestedField.required(2, DATA_RECORD_COUNT.name(), 
Types.LongType.get()),
+        Types.NestedField.required(3, DATA_FILE_COUNT.name(), 
Types.IntegerType.get()),
+        Types.NestedField.required(4, TOTAL_DATA_FILE_SIZE_IN_BYTES.name(), 
Types.LongType.get()),
+        Types.NestedField.optional(5, POSITION_DELETE_RECORD_COUNT.name(), 
Types.LongType.get()),
+        Types.NestedField.optional(6, POSITION_DELETE_FILE_COUNT.name(), 
Types.IntegerType.get()),
+        Types.NestedField.optional(7, EQUALITY_DELETE_RECORD_COUNT.name(), 
Types.LongType.get()),
+        Types.NestedField.optional(8, EQUALITY_DELETE_FILE_COUNT.name(), 
Types.IntegerType.get()),
+        Types.NestedField.optional(9, TOTAL_RECORD_COUNT.name(), 
Types.LongType.get()),
+        Types.NestedField.optional(10, LAST_UPDATED_AT.name(), 
Types.LongType.get()),
+        Types.NestedField.optional(11, LAST_UPDATED_SNAPSHOT_ID.name(), 
Types.LongType.get()));
+  }
+
+  protected PartitionStats randomStats(Types.StructType partitionType) {
+    PartitionData partitionData = new PartitionData(partitionType);
+    partitionData.set(0, RANDOM.nextInt());
+
+    return randomStats(partitionData);
+  }
+
+  protected PartitionStats randomStats(PartitionData partitionData) {
+    PartitionStats stats = new PartitionStats(partitionData, 
RANDOM.nextInt(10));
+    stats.set(DATA_RECORD_COUNT_POSITION, RANDOM.nextLong());
+    stats.set(DATA_FILE_COUNT_POSITION, RANDOM.nextInt());
+    stats.set(TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION, 1024L * 
RANDOM.nextInt(20));
+    return stats;
+  }
+
+  protected File tempDir(String folderName) throws IOException {
+    return java.nio.file.Files.createTempDirectory(temp.toPath(), 
folderName).toFile();
+  }
+
+  protected static StructLike partitionRecord(
+      Types.StructType partitionType, String val1, String val2) {
+    GenericRecord record = GenericRecord.create(partitionType);
+    record.set(0, val1);
+    record.set(1, val2);
+    return record;
+  }
+}
diff --git 
a/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java 
b/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java
index 71fdc9507d..9b93013a9b 100644
--- a/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java
@@ -18,25 +18,12 @@
  */
 package org.apache.iceberg;
 
-import static org.apache.iceberg.PartitionStatsHandler.DATA_FILE_COUNT;
-import static org.apache.iceberg.PartitionStatsHandler.DATA_RECORD_COUNT;
-import static 
org.apache.iceberg.PartitionStatsHandler.EQUALITY_DELETE_FILE_COUNT;
-import static 
org.apache.iceberg.PartitionStatsHandler.EQUALITY_DELETE_RECORD_COUNT;
-import static org.apache.iceberg.PartitionStatsHandler.LAST_UPDATED_AT;
-import static 
org.apache.iceberg.PartitionStatsHandler.LAST_UPDATED_SNAPSHOT_ID;
 import static org.apache.iceberg.PartitionStatsHandler.PARTITION_FIELD_ID;
-import static org.apache.iceberg.PartitionStatsHandler.PARTITION_FIELD_NAME;
-import static 
org.apache.iceberg.PartitionStatsHandler.POSITION_DELETE_FILE_COUNT;
-import static 
org.apache.iceberg.PartitionStatsHandler.POSITION_DELETE_RECORD_COUNT;
-import static org.apache.iceberg.PartitionStatsHandler.SPEC_ID;
-import static 
org.apache.iceberg.PartitionStatsHandler.TOTAL_DATA_FILE_SIZE_IN_BYTES;
-import static org.apache.iceberg.PartitionStatsHandler.TOTAL_RECORD_COUNT;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-import java.io.File;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
@@ -46,10 +33,7 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Random;
 import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
-import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.expressions.Literal;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -61,10 +45,9 @@ import org.assertj.core.groups.Tuple;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.api.io.TempDir;
 
 @ExtendWith(ParameterizedTestExtension.class)
-public abstract class PartitionStatsHandlerTestBase {
+public abstract class PartitionStatsHandlerTestBase extends 
PartitionStatisticsTestBase {
 
   public abstract FileFormat format();
 
@@ -75,35 +58,9 @@ public abstract class PartitionStatsHandlerTestBase {
 
   @Parameter protected int formatVersion;
 
-  private static final Schema SCHEMA =
-      new Schema(
-          optional(1, "c1", Types.IntegerType.get()),
-          optional(2, "c2", Types.StringType.get()),
-          optional(3, "c3", Types.StringType.get()));
-
-  protected static final PartitionSpec SPEC =
-      PartitionSpec.builderFor(SCHEMA).identity("c2").identity("c3").build();
-
-  @TempDir public File temp;
-
-  private static final Random RANDOM = ThreadLocalRandom.current();
-
   private final Map<String, String> fileFormatProperty =
       ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format().name());
 
-  // position in StructLike
-  private static final int DATA_RECORD_COUNT_POSITION = 2;
-  private static final int DATA_FILE_COUNT_POSITION = 3;
-  private static final int TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION = 4;
-  private static final int POSITION_DELETE_RECORD_COUNT_POSITION = 5;
-  private static final int POSITION_DELETE_FILE_COUNT_POSITION = 6;
-  private static final int EQUALITY_DELETE_RECORD_COUNT_POSITION = 7;
-  private static final int EQUALITY_DELETE_FILE_COUNT_POSITION = 8;
-  private static final int TOTAL_RECORD_COUNT_POSITION = 9;
-  private static final int LAST_UPDATED_AT_POSITION = 10;
-  private static final int LAST_UPDATED_SNAPSHOT_ID_POSITION = 11;
-  private static final int DV_COUNT_POSITION = 12;
-
   @Test
   public void testPartitionStatsOnEmptyTable() throws Exception {
     Table testTable =
@@ -223,10 +180,7 @@ public abstract class PartitionStatsHandlerTestBase {
     partitionData.set(13, new BigDecimal("12345678901234567890.1234567890"));
     partitionData.set(14, 
Literal.of("10:10:10").to(Types.TimeType.get()).value());
 
-    PartitionStats partitionStats = new PartitionStats(partitionData, 
RANDOM.nextInt(10));
-    partitionStats.set(DATA_RECORD_COUNT_POSITION, RANDOM.nextLong());
-    partitionStats.set(DATA_FILE_COUNT_POSITION, RANDOM.nextInt());
-    partitionStats.set(TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION, 1024L * 
RANDOM.nextInt(20));
+    PartitionStats partitionStats = randomStats(partitionData);
     List<PartitionStats> expected = Collections.singletonList(partitionStats);
     PartitionStatisticsFile statisticsFile =
         PartitionStatsHandler.writePartitionStatsFile(testTable, 42L, 
dataSchema, expected);
@@ -262,14 +216,8 @@ public abstract class PartitionStatsHandlerTestBase {
 
     ImmutableList.Builder<PartitionStats> partitionListBuilder = 
ImmutableList.builder();
     for (int i = 0; i < 5; i++) {
-      PartitionData partitionData =
-          new 
PartitionData(dataSchema.findField(PARTITION_FIELD_ID).type().asStructType());
-      partitionData.set(0, RANDOM.nextInt());
-
-      PartitionStats stats = new PartitionStats(partitionData, 
RANDOM.nextInt(10));
-      stats.set(DATA_RECORD_COUNT_POSITION, RANDOM.nextLong());
-      stats.set(DATA_FILE_COUNT_POSITION, RANDOM.nextInt());
-      stats.set(TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION, 1024L * 
RANDOM.nextInt(20));
+      PartitionStats stats =
+          
randomStats(dataSchema.findField(PARTITION_FIELD_ID).type().asStructType());
       stats.set(POSITION_DELETE_RECORD_COUNT_POSITION, null);
       stats.set(POSITION_DELETE_FILE_COUNT_POSITION, null);
       stats.set(EQUALITY_DELETE_RECORD_COUNT_POSITION, null);
@@ -315,8 +263,12 @@ public abstract class PartitionStatsHandlerTestBase {
     }
   }
 
+  /**
+   * @deprecated will be removed in 1.12.0
+   */
   @SuppressWarnings("checkstyle:MethodLength")
   @Test
+  @Deprecated
   public void testPartitionStats() throws Exception {
     Table testTable =
         TestTables.create(
@@ -611,7 +563,11 @@ public abstract class PartitionStatsHandlerTestBase {
     assertThat(PartitionStatsHandler.latestStatsFile(testTable, 
snapshotBranchBId)).isNull();
   }
 
+  /**
+   * @deprecated will be removed in 1.12.0
+   */
   @Test
+  @Deprecated
   public void testReadingStatsWithInvalidSchema() throws Exception {
     PartitionSpec spec = 
PartitionSpec.builderFor(SCHEMA).identity("c1").build();
     Table testTable =
@@ -677,7 +633,11 @@ public abstract class PartitionStatsHandlerTestBase {
     assertThat(partitionStats.get(0).dataFileCount()).isEqualTo(2);
   }
 
+  /**
+   * @deprecated will be removed in 1.12.0
+   */
   @Test
+  @Deprecated
   public void testV2toV3SchemaEvolution() throws Exception {
     Table testTable =
         TestTables.create(
@@ -718,14 +678,6 @@ public abstract class PartitionStatsHandlerTestBase {
     }
   }
 
-  private static StructLike partitionRecord(
-      Types.StructType partitionType, String val1, String val2) {
-    GenericRecord record = GenericRecord.create(partitionType);
-    record.set(0, val1);
-    record.set(1, val2);
-    return record;
-  }
-
   private static void computeAndValidatePartitionStats(
       Table testTable, Schema recordSchema, Tuple... expectedValues) throws 
IOException {
     // compute and commit partition stats file
@@ -760,38 +712,6 @@ public abstract class PartitionStatsHandlerTestBase {
         .containsExactlyInAnyOrder(expectedValues);
   }
 
-  private File tempDir(String folderName) throws IOException {
-    return java.nio.file.Files.createTempDirectory(temp.toPath(), 
folderName).toFile();
-  }
-
-  private Schema invalidOldSchema(Types.StructType unifiedPartitionType) {
-    // field ids starts from 0 instead of 1
-    return new Schema(
-        Types.NestedField.required(0, PARTITION_FIELD_NAME, 
unifiedPartitionType),
-        Types.NestedField.required(1, SPEC_ID.name(), Types.IntegerType.get()),
-        Types.NestedField.required(2, DATA_RECORD_COUNT.name(), 
Types.LongType.get()),
-        Types.NestedField.required(3, DATA_FILE_COUNT.name(), 
Types.IntegerType.get()),
-        Types.NestedField.required(4, TOTAL_DATA_FILE_SIZE_IN_BYTES.name(), 
Types.LongType.get()),
-        Types.NestedField.optional(5, POSITION_DELETE_RECORD_COUNT.name(), 
Types.LongType.get()),
-        Types.NestedField.optional(6, POSITION_DELETE_FILE_COUNT.name(), 
Types.IntegerType.get()),
-        Types.NestedField.optional(7, EQUALITY_DELETE_RECORD_COUNT.name(), 
Types.LongType.get()),
-        Types.NestedField.optional(8, EQUALITY_DELETE_FILE_COUNT.name(), 
Types.IntegerType.get()),
-        Types.NestedField.optional(9, TOTAL_RECORD_COUNT.name(), 
Types.LongType.get()),
-        Types.NestedField.optional(10, LAST_UPDATED_AT.name(), 
Types.LongType.get()),
-        Types.NestedField.optional(11, LAST_UPDATED_SNAPSHOT_ID.name(), 
Types.LongType.get()));
-  }
-
-  private PartitionStats randomStats(Types.StructType partitionType) {
-    PartitionData partitionData = new PartitionData(partitionType);
-    partitionData.set(0, RANDOM.nextInt());
-
-    PartitionStats stats = new PartitionStats(partitionData, 
RANDOM.nextInt(10));
-    stats.set(DATA_RECORD_COUNT_POSITION, RANDOM.nextLong());
-    stats.set(DATA_FILE_COUNT_POSITION, RANDOM.nextInt());
-    stats.set(TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION, 1024L * 
RANDOM.nextInt(20));
-    return stats;
-  }
-
   @SuppressWarnings("checkstyle:CyclomaticComplexity")
   private static boolean isEqual(
       Comparator<StructLike> partitionComparator, PartitionStats stats1, 
PartitionStats stats2) {
diff --git 
a/core/src/test/java/org/apache/iceberg/avro/TestAvroPartitionStatisticsScan.java
 
b/core/src/test/java/org/apache/iceberg/avro/TestAvroPartitionStatisticsScan.java
new file mode 100644
index 0000000000..54e03180ce
--- /dev/null
+++ 
b/core/src/test/java/org/apache/iceberg/avro/TestAvroPartitionStatisticsScan.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.avro;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionStatisticsScanTestBase;
+
+public class TestAvroPartitionStatisticsScan extends 
PartitionStatisticsScanTestBase {
+
+  public FileFormat format() {
+    return FileFormat.AVRO;
+  }
+}
diff --git 
a/orc/src/test/java/org/apache/iceberg/orc/TestOrcPartitionStatisticsScan.java 
b/orc/src/test/java/org/apache/iceberg/orc/TestOrcPartitionStatisticsScan.java
new file mode 100644
index 0000000000..2040f046ee
--- /dev/null
+++ 
b/orc/src/test/java/org/apache/iceberg/orc/TestOrcPartitionStatisticsScan.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.orc;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionStatisticsScanTestBase;
+
+public class TestOrcPartitionStatisticsScan extends 
PartitionStatisticsScanTestBase {
+  @Override
+  public FileFormat format() {
+    return FileFormat.ORC;
+  }
+
+  @Override
+  public void testScanPartitionStatsForCurrentSnapshot() throws Exception {
+    assertThatThrownBy(super::testScanPartitionStatsForCurrentSnapshot)
+        .isInstanceOf(UnsupportedOperationException.class)
+        .hasMessage("Cannot write using unregistered internal data format: 
ORC");
+  }
+
+  @Override
+  public void testScanPartitionStatsForOlderSnapshot() throws Exception {
+    assertThatThrownBy(super::testScanPartitionStatsForOlderSnapshot)
+        .isInstanceOf(UnsupportedOperationException.class)
+        .hasMessage("Cannot write using unregistered internal data format: 
ORC");
+  }
+
+  @Override
+  public void testReadingStatsWithInvalidSchema() throws Exception {
+    assertThatThrownBy(super::testReadingStatsWithInvalidSchema)
+        .isInstanceOf(UnsupportedOperationException.class)
+        .hasMessage("Cannot write using unregistered internal data format: 
ORC");
+  }
+
+  @Override
+  public void testV2toV3SchemaEvolution() throws Exception {
+    assertThatThrownBy(super::testV2toV3SchemaEvolution)
+        .isInstanceOf(UnsupportedOperationException.class)
+        .hasMessage("Cannot write using unregistered internal data format: 
ORC");
+  }
+}
diff --git 
a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetPartitionStatisticsScan.java
 
b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetPartitionStatisticsScan.java
new file mode 100644
index 0000000000..5152e31b28
--- /dev/null
+++ 
b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetPartitionStatisticsScan.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.parquet;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionStatisticsScanTestBase;
+
/** Runs the partition statistics scan tests against Parquet stats files. */
public class TestParquetPartitionStatisticsScan extends PartitionStatisticsScanTestBase {

  @Override
  public FileFormat format() {
    return FileFormat.PARQUET;
  }
}

Reply via email to