This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new de644158a2 Core: Handle partition evolution case in PartitionStatsUtil#computeStats (#12137)
de644158a2 is described below
commit de644158a20c4a8d1b79fbc7773ae5a58dd316ac
Author: Denys Kuzmenko <[email protected]>
AuthorDate: Thu Feb 20 12:44:02 2025 +0100
Core: Handle partition evolution case in PartitionStatsUtil#computeStats (#12137)
---
.../org/apache/iceberg/PartitionStatsUtil.java | 5 +-
.../org/apache/iceberg/TestPartitionStatsUtil.java | 169 ++++++++++++++++++++-
2 files changed, 166 insertions(+), 8 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/PartitionStatsUtil.java
b/core/src/main/java/org/apache/iceberg/PartitionStatsUtil.java
index 1fe4e6767f..4cf1902937 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionStatsUtil.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionStatsUtil.java
@@ -96,7 +96,10 @@ public class PartitionStatsUtil {
StructLike key = keyTemplate.copyFor(coercedPartition);
Snapshot snapshot = table.snapshot(entry.snapshotId());
PartitionStats stats =
- statsMap.computeIfAbsent(specId, key, () -> new
PartitionStats(key, specId));
+ statsMap.computeIfAbsent(
+ specId,
+ ((PartitionData) file.partition()).copy(),
+ () -> new PartitionStats(key, specId));
if (entry.isLive()) {
stats.liveEntry(file, snapshot);
} else {
diff --git a/core/src/test/java/org/apache/iceberg/TestPartitionStatsUtil.java
b/core/src/test/java/org/apache/iceberg/TestPartitionStatsUtil.java
index 541fcd2ca2..bd4b4a2ff6 100644
--- a/core/src/test/java/org/apache/iceberg/TestPartitionStatsUtil.java
+++ b/core/src/test/java/org/apache/iceberg/TestPartitionStatsUtil.java
@@ -26,6 +26,7 @@ import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
+import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.assertj.core.groups.Tuple;
@@ -370,16 +371,170 @@ public class TestPartitionStatsUtil {
snapshot2.snapshotId()));
}
- private static PartitionData partitionData(Types.StructType partitionType,
String c2, String c3) {
- PartitionData partitionData = new PartitionData(partitionType);
- partitionData.set(0, c2);
- partitionData.set(1, c3);
- return partitionData;
+ @Test
+ @SuppressWarnings("MethodLength")
+ public void testPartitionStatsWithBucketTransformSchemaEvolution() throws
Exception {
+ PartitionSpec specBefore =
+ PartitionSpec.builderFor(SCHEMA).identity("c2").bucket("c1",
2).build();
+
+ Table testTable =
+ TestTables.create(
+ tempDir("partition_stats_schema_evolve2"),
+ "partition_stats_schema_evolve2",
+ SCHEMA,
+ specBefore,
+ SortOrder.unsorted(),
+ 2);
+
+ List<DataFile> dataFiles = Lists.newArrayList();
+ for (int i = 0; i < 2; i++) {
+ dataFiles.add(FileGenerationUtil.generateDataFile(testTable,
TestHelpers.Row.of("foo", i)));
+ }
+
+ AppendFiles appendFiles = testTable.newAppend();
+ dataFiles.forEach(appendFiles::appendFile);
+ appendFiles.commit();
+
+ Snapshot snapshot1 = testTable.currentSnapshot();
+ Types.StructType partitionType = Partitioning.partitionType(testTable);
+
+ computeAndValidatePartitionStats(
+ testTable,
+ Tuple.tuple(
+ partitionData(partitionType, "foo", 0),
+ 0,
+ dataFiles.get(0).recordCount(),
+ 1,
+ dataFiles.get(0).fileSizeInBytes(),
+ 0L,
+ 0,
+ 0L,
+ 0,
+ 0L,
+ snapshot1.timestampMillis(),
+ snapshot1.snapshotId()),
+ Tuple.tuple(
+ partitionData(partitionType, "foo", 1),
+ 0,
+ dataFiles.get(1).recordCount(),
+ 1,
+ dataFiles.get(1).fileSizeInBytes(),
+ 0L,
+ 0,
+ 0L,
+ 0,
+ 0L,
+ snapshot1.timestampMillis(),
+ snapshot1.snapshotId()));
+
+ // Evolve the partition spec
+ testTable
+ .updateSpec()
+ .removeField(Expressions.bucket("c1", 2))
+ .addField(Expressions.bucket("c1", 4))
+ .commit();
+
+ List<DataFile> filesWithNewSpec = Lists.newArrayList();
+ for (int i = 0; i < 4; i++) {
+ filesWithNewSpec.add(
+ FileGenerationUtil.generateDataFile(testTable,
TestHelpers.Row.of("bar", i)));
+ }
+
+ appendFiles = testTable.newAppend();
+ filesWithNewSpec.forEach(appendFiles::appendFile);
+ appendFiles.commit();
+
+ Snapshot snapshot2 = testTable.currentSnapshot();
+ partitionType = Partitioning.partitionType(testTable);
+
+ computeAndValidatePartitionStats(
+ testTable,
+ Tuple.tuple(
+ partitionData(partitionType, "foo", 0, null),
+ 0,
+ dataFiles.get(0).recordCount(),
+ 1,
+ dataFiles.get(0).fileSizeInBytes(),
+ 0L,
+ 0,
+ 0L,
+ 0,
+ 0L,
+ snapshot1.timestampMillis(),
+ snapshot1.snapshotId()),
+ Tuple.tuple(
+ partitionData(partitionType, "foo", 1, null),
+ 0,
+ dataFiles.get(1).recordCount(),
+ 1,
+ dataFiles.get(1).fileSizeInBytes(),
+ 0L,
+ 0,
+ 0L,
+ 0,
+ 0L,
+ snapshot1.timestampMillis(),
+ snapshot1.snapshotId()),
+ Tuple.tuple(
+ partitionData(partitionType, "bar", null, 0),
+ 1,
+ filesWithNewSpec.get(0).recordCount(),
+ 1,
+ filesWithNewSpec.get(0).fileSizeInBytes(),
+ 0L,
+ 0,
+ 0L,
+ 0,
+ 0L,
+ snapshot2.timestampMillis(),
+ snapshot2.snapshotId()),
+ Tuple.tuple(
+ partitionData(partitionType, "bar", null, 1),
+ 1,
+ filesWithNewSpec.get(1).recordCount(),
+ 1,
+ filesWithNewSpec.get(1).fileSizeInBytes(),
+ 0L,
+ 0,
+ 0L,
+ 0,
+ 0L,
+ snapshot2.timestampMillis(),
+ snapshot2.snapshotId()),
+ Tuple.tuple(
+ partitionData(partitionType, "bar", null, 2),
+ 1,
+ filesWithNewSpec.get(2).recordCount(),
+ 1,
+ filesWithNewSpec.get(2).fileSizeInBytes(),
+ 0L,
+ 0,
+ 0L,
+ 0,
+ 0L,
+ snapshot2.timestampMillis(),
+ snapshot2.snapshotId()),
+ Tuple.tuple(
+ partitionData(partitionType, "bar", null, 3),
+ 1,
+ filesWithNewSpec.get(3).recordCount(),
+ 1,
+ filesWithNewSpec.get(3).fileSizeInBytes(),
+ 0L,
+ 0,
+ 0L,
+ 0,
+ 0L,
+ snapshot2.timestampMillis(),
+ snapshot2.snapshotId()));
}
- private static PartitionData partitionData(Types.StructType partitionType,
String c2) {
+ private static PartitionData partitionData(Types.StructType partitionType,
Object... fields) {
PartitionData partitionData = new PartitionData(partitionType);
- partitionData.set(0, c2);
+ for (int i = 0; i < fields.length; i++) {
+ partitionData.set(i, fields[i]);
+ }
+
return partitionData;
}