This is an automated email from the ASF dual-hosted git repository.
emaynard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push:
new e27679106 Add cleanup support for partition-level statistics files when `DROP TABLE PURGE` (#1508)
e27679106 is described below
commit e276791068130e5812ffb0ca539dc41cf0233fca
Author: danielhumanmod <[email protected]>
AuthorDate: Fri May 9 14:45:22 2025 -0700
Add cleanup support for partition-level statistics files when `DROP TABLE PURGE` (#1508)
* cleaning up partition stats
* update partition stat file extension
* update test partition stat write impl
---
.../task/BatchFileCleanupTaskHandlerTest.java | 41 ++++++++++++++--------
.../quarkus/task/TableCleanupTaskHandlerTest.java | 22 ++++++++++--
.../service/quarkus/task/TaskTestUtils.java | 40 +++++++++++++++++++--
.../service/task/TableCleanupTaskHandler.java | 15 ++++----
4 files changed, 93 insertions(+), 25 deletions(-)
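
For readers skimming the patch below, the core of the change is that `DROP TABLE PURGE` cleanup now enumerates partition statistics files alongside previous metadata files, manifest lists, and table-level statistics files. A minimal, self-contained sketch of that enumeration pattern follows; the class and method names here are illustrative and not part of the patch, only the Iceberg TableMetadata accessors are taken from the diff itself.

    import java.util.List;
    import java.util.stream.Stream;
    import org.apache.iceberg.PartitionStatisticsFile;
    import org.apache.iceberg.Snapshot;
    import org.apache.iceberg.StatisticsFile;
    import org.apache.iceberg.TableMetadata;

    class CleanupPathSketch {
      // Collect every metadata-tracked file path a purge would need to delete,
      // now including partition statistics files (mirrors getMetadataFileBatches below).
      static List<String> collectCleanupPaths(TableMetadata tableMetadata) {
        return Stream.of(
                tableMetadata.previousFiles().stream().map(TableMetadata.MetadataLogEntry::file),
                tableMetadata.snapshots().stream().map(Snapshot::manifestListLocation),
                tableMetadata.statisticsFiles().stream().map(StatisticsFile::path),
                tableMetadata.partitionStatisticsFiles().stream().map(PartitionStatisticsFile::path))
            .flatMap(s -> s)
            .toList();
      }
    }

Replacing the previous nested Stream.concat calls with Stream.of(...).flatMap(s -> s) keeps the merge flat as more file categories are added.
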
diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java
index b55ffbb9e..662f88bb0 100644
--- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java
+++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java
@@ -35,6 +35,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.TableMetadata;
@@ -124,10 +125,19 @@ public class BatchFileCleanupTaskHandlerTest {
snapshot.sequenceNumber(),
"/metadata/" + UUID.randomUUID() + ".stats",
fileIO);
+ PartitionStatisticsFile partitionStatisticsFile1 =
+ TaskTestUtils.writePartitionStatsFile(
+ snapshot.snapshotId(),
+ "/metadata/" + "partition-stats-" + UUID.randomUUID() +
".parquet",
+ fileIO);
String firstMetadataFile = "v1-295495059.metadata.json";
TableMetadata firstMetadata =
TaskTestUtils.writeTableMetadata(
- fileIO, firstMetadataFile, List.of(statisticsFile1), snapshot);
+ fileIO,
+ firstMetadataFile,
+ List.of(statisticsFile1),
+ List.of(partitionStatisticsFile1),
+ snapshot);
assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue();
ManifestFile manifestFile3 =
@@ -148,6 +158,11 @@ public class BatchFileCleanupTaskHandlerTest {
snapshot2.sequenceNumber(),
"/metadata/" + UUID.randomUUID() + ".stats",
fileIO);
+ PartitionStatisticsFile partitionStatisticsFile2 =
+ TaskTestUtils.writePartitionStatsFile(
+ snapshot2.snapshotId(),
+ "/metadata/" + "partition-stats-" + UUID.randomUUID() +
".parquet",
+ fileIO);
String secondMetadataFile = "v1-295495060.metadata.json";
TableMetadata secondMetadata =
TaskTestUtils.writeTableMetadata(
@@ -156,18 +171,19 @@ public class BatchFileCleanupTaskHandlerTest {
firstMetadata,
firstMetadataFile,
List.of(statisticsFile2),
+ List.of(partitionStatisticsFile2),
snapshot2);
assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue();
assertThat(TaskUtils.exists(secondMetadataFile, fileIO)).isTrue();
List<String> cleanupFiles =
- Stream.concat(
- secondMetadata.previousFiles().stream()
- .map(TableMetadata.MetadataLogEntry::file)
- .filter(file -> TaskUtils.exists(file, fileIO)),
- secondMetadata.statisticsFiles().stream()
- .map(StatisticsFile::path)
- .filter(file -> TaskUtils.exists(file, fileIO)))
+ Stream.of(
+ secondMetadata.previousFiles().stream().map(TableMetadata.MetadataLogEntry::file),
+ secondMetadata.statisticsFiles().stream().map(StatisticsFile::path),
+ secondMetadata.partitionStatisticsFiles().stream()
+ .map(PartitionStatisticsFile::path))
+ .flatMap(s -> s)
+ .filter(file -> TaskUtils.exists(file, fileIO))
.toList();
TaskEntity task =
@@ -183,12 +199,9 @@ public class BatchFileCleanupTaskHandlerTest {
assertThatPredicate(handler::canHandleTask).accepts(task);
assertThat(handler.handleTask(task, callCtx)).isTrue();
- assertThatPredicate((String file) -> TaskUtils.exists(file, fileIO))
- .rejects(firstMetadataFile);
- assertThatPredicate((String file) -> TaskUtils.exists(file, fileIO))
- .rejects(statisticsFile1.path());
- assertThatPredicate((String file) -> TaskUtils.exists(file, fileIO))
- .rejects(statisticsFile2.path());
+ for (String cleanupFile : cleanupFiles) {
+ assertThatPredicate((String file) -> TaskUtils.exists(file, fileIO)).rejects(cleanupFile);
+ }
}
}
diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java
index 2bb53b40c..5e39028c9 100644
--- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java
+++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java
@@ -33,6 +33,7 @@ import java.util.UUID;
import org.apache.commons.codec.binary.Base64;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.TableMetadata;
@@ -519,10 +520,19 @@ class TableCleanupTaskHandlerTest {
snapshot.sequenceNumber(),
"/metadata/" + UUID.randomUUID() + ".stats",
fileIO);
+ PartitionStatisticsFile partitionStatisticsFile1 =
+ TaskTestUtils.writePartitionStatsFile(
+ snapshot.snapshotId(),
+ "/metadata/" + "partition-stats-" + UUID.randomUUID() + ".parquet",
+ fileIO);
String firstMetadataFile = "v1-295495059.metadata.json";
TableMetadata firstMetadata =
TaskTestUtils.writeTableMetadata(
- fileIO, firstMetadataFile, List.of(statisticsFile1), snapshot);
+ fileIO,
+ firstMetadataFile,
+ List.of(statisticsFile1),
+ List.of(partitionStatisticsFile1),
+ snapshot);
assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue();
ManifestFile manifestFile3 =
@@ -543,6 +553,11 @@ class TableCleanupTaskHandlerTest {
snapshot2.sequenceNumber(),
"/metadata/" + UUID.randomUUID() + ".stats",
fileIO);
+ PartitionStatisticsFile partitionStatisticsFile2 =
+ TaskTestUtils.writePartitionStatsFile(
+ snapshot2.snapshotId(),
+ "/metadata/" + "partition-stats-" + UUID.randomUUID() + ".parquet",
+ fileIO);
String secondMetadataFile = "v1-295495060.metadata.json";
TaskTestUtils.writeTableMetadata(
fileIO,
@@ -550,6 +565,7 @@ class TableCleanupTaskHandlerTest {
firstMetadata,
firstMetadataFile,
List.of(statisticsFile2),
+ List.of(partitionStatisticsFile2),
snapshot2);
assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue();
assertThat(TaskUtils.exists(secondMetadataFile, fileIO)).isTrue();
@@ -609,7 +625,9 @@ class TableCleanupTaskHandlerTest {
snapshot.manifestListLocation(),
snapshot2.manifestListLocation(),
statisticsFile1.path(),
- statisticsFile2.path())),
+ statisticsFile2.path(),
+ partitionStatisticsFile1.path(),
+ partitionStatisticsFile2.path())),
entity ->
entity.readData(
BatchFileCleanupTaskHandler.BatchFileCleanupTask.class)));
diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TaskTestUtils.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TaskTestUtils.java
index 83f32f9b3..a4a00060c 100644
--- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TaskTestUtils.java
+++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TaskTestUtils.java
@@ -20,6 +20,7 @@ package org.apache.polaris.service.quarkus.task;
import jakarta.annotation.Nonnull;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
@@ -30,10 +31,12 @@ import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.GenericBlobMetadata;
import org.apache.iceberg.GenericStatisticsFile;
+import org.apache.iceberg.ImmutableGenericPartitionStatisticsFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SortOrder;
@@ -71,7 +74,7 @@ public class TaskTestUtils {
static TableMetadata writeTableMetadata(FileIO fileIO, String metadataFile, Snapshot... snapshots)
throws IOException {
- return writeTableMetadata(fileIO, metadataFile, null, null, null, snapshots);
+ return writeTableMetadata(fileIO, metadataFile, null, null, null, null, snapshots);
}
static TableMetadata writeTableMetadata(
@@ -80,7 +83,18 @@ public class TaskTestUtils {
List<StatisticsFile> statisticsFiles,
Snapshot... snapshots)
throws IOException {
- return writeTableMetadata(fileIO, metadataFile, null, null, statisticsFiles, snapshots);
+ return writeTableMetadata(fileIO, metadataFile, null, null, statisticsFiles, null, snapshots);
+ }
+
+ static TableMetadata writeTableMetadata(
+ FileIO fileIO,
+ String metadataFile,
+ List<StatisticsFile> statisticsFiles,
+ List<PartitionStatisticsFile> partitionStatsFiles,
+ Snapshot... snapshots)
+ throws IOException {
+ return writeTableMetadata(
+ fileIO, metadataFile, null, null, statisticsFiles, partitionStatsFiles, snapshots);
}
static TableMetadata writeTableMetadata(
@@ -89,6 +103,7 @@ public class TaskTestUtils {
TableMetadata prevMetadata,
String prevMetadataFile,
List<StatisticsFile> statisticsFiles,
+ List<PartitionStatisticsFile> partitionStatsFiles,
Snapshot... snapshots)
throws IOException {
TableMetadata.Builder tmBuilder;
@@ -106,11 +121,15 @@ public class TaskTestUtils {
.addPartitionSpec(PartitionSpec.unpartitioned());
int statisticsFileIndex = 0;
+ int partitionStatsFileIndex = 0;
for (Snapshot snapshot : snapshots) {
tmBuilder.addSnapshot(snapshot);
if (statisticsFiles != null) {
tmBuilder.setStatistics(statisticsFiles.get(statisticsFileIndex++));
}
+ if (partitionStatsFiles != null) {
+ tmBuilder.setPartitionStatistics(partitionStatsFiles.get(partitionStatsFileIndex++));
+ }
}
TableMetadata tableMetadata = tmBuilder.build();
PositionOutputStream out = fileIO.newOutputFile(metadataFile).createOrOverwrite();
@@ -161,4 +180,21 @@ public class TaskTestUtils {
puffinWriter.writtenBlobsMetadata().stream().map(GenericBlobMetadata::from).toList());
}
}
+
+ public static PartitionStatisticsFile writePartitionStatsFile(
+ long snapshotId, String statsLocation, FileIO fileIO) throws UncheckedIOException {
+ PositionOutputStream positionOutputStream;
+ try {
+ positionOutputStream = fileIO.newOutputFile(statsLocation).create();
+ positionOutputStream.close();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+
+ return ImmutableGenericPartitionStatisticsFile.builder()
+ .snapshotId(snapshotId)
+ .path(statsLocation)
+ .fileSizeInBytes(42L)
+ .build();
+ }
}
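
For orientation (not part of the patch): with the two helpers above, a test can register partition-level statistics on a snapshot roughly as follows. The variables snapshot, fileIO, and a previously written table-level statisticsFile are assumed to come from the surrounding test setup, and the metadata file name is illustrative.

    PartitionStatisticsFile partitionStats =
        TaskTestUtils.writePartitionStatsFile(
            snapshot.snapshotId(),
            "/metadata/partition-stats-" + UUID.randomUUID() + ".parquet",
            fileIO);
    TableMetadata metadata =
        TaskTestUtils.writeTableMetadata(
            fileIO,
            "v1-0000000000.metadata.json",   // illustrative metadata file name
            List.of(statisticsFile),         // table-level statistics written earlier in the test
            List.of(partitionStats),         // partition-level statistics now tracked for cleanup
            snapshot);
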
diff --git a/service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java b/service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java
index f9f1c2f35..ff791bf18 100644
--- a/service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java
+++ b/service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java
@@ -25,6 +25,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.TableMetadata;
@@ -112,7 +113,6 @@ public class TableCleanupTaskHandler implements TaskHandler {
metaStoreManager,
polarisCallContext);
- // TODO: handle partition statistics files
Stream<TaskEntity> metadataFileCleanupTasks =
getMetadataTaskStream(
cleanupTask,
@@ -243,12 +243,13 @@ public class TableCleanupTaskHandler implements TaskHandler {
private List<List<String>> getMetadataFileBatches(TableMetadata tableMetadata, int batchSize) {
List<List<String>> result = new ArrayList<>();
List<String> metadataFiles =
- Stream.concat(
- Stream.concat(
- tableMetadata.previousFiles().stream()
- .map(TableMetadata.MetadataLogEntry::file),
- tableMetadata.snapshots().stream().map(Snapshot::manifestListLocation)),
- tableMetadata.statisticsFiles().stream().map(StatisticsFile::path))
+ Stream.of(
+ tableMetadata.previousFiles().stream().map(TableMetadata.MetadataLogEntry::file),
+ tableMetadata.snapshots().stream().map(Snapshot::manifestListLocation),
+ tableMetadata.statisticsFiles().stream().map(StatisticsFile::path),
+ tableMetadata.partitionStatisticsFiles().stream()
+ .map(PartitionStatisticsFile::path))
+ .flatMap(s -> s)
.toList();
for (int i = 0; i < metadataFiles.size(); i += batchSize) {