This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 5996b3de51 Core: Fix skipped file counts in ManifestReader with deleted entries (#8432)
5996b3de51 is described below
commit 5996b3de518b27cd356aa020456dcedb169c5b00
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Fri Sep 1 16:31:59 2023 -0700
Core: Fix skipped file counts in ManifestReader with deleted entries (#8432)
---
.../java/org/apache/iceberg/ManifestReader.java | 40 ++++++++++++++++------
.../iceberg/TestScanPlanningAndReporting.java | 32 +++++++++++------
2 files changed, 50 insertions(+), 22 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java
b/core/src/main/java/org/apache/iceberg/ManifestReader.java
index 4ccf9451d1..4ee51aa60c 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestReader.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java
@@ -203,9 +203,11 @@ public class ManifestReader<F extends ContentFile<F>>
extends CloseableGroup
}
CloseableIterable<ManifestEntry<F>> entries() {
- if ((rowFilter != null && rowFilter != Expressions.alwaysTrue())
- || (partFilter != null && partFilter != Expressions.alwaysTrue())
- || (partitionSet != null)) {
+ return entries(false /* all entries */);
+ }
+
+ private CloseableIterable<ManifestEntry<F>> entries(boolean onlyLive) {
+ if (hasRowFilter() || hasPartitionFilter() || partitionSet != null) {
Evaluator evaluator = evaluator();
InclusiveMetricsEvaluator metricsEvaluator = metricsEvaluator();
@@ -213,22 +215,34 @@ public class ManifestReader<F extends ContentFile<F>>
extends CloseableGroup
boolean requireStatsProjection = requireStatsProjection(rowFilter,
columns);
Collection<String> projectColumns =
requireStatsProjection ? withStatsColumns(columns) : columns;
+ CloseableIterable<ManifestEntry<F>> entries =
+ open(projection(fileSchema, fileProjection, projectColumns,
caseSensitive));
return CloseableIterable.filter(
content == FileType.DATA_FILES
? scanMetrics.skippedDataFiles()
: scanMetrics.skippedDeleteFiles(),
- open(projection(fileSchema, fileProjection, projectColumns,
caseSensitive)),
+ onlyLive ? filterLiveEntries(entries) : entries,
entry ->
entry != null
&& evaluator.eval(entry.file().partition())
&& metricsEvaluator.eval(entry.file())
&& inPartitionSet(entry.file()));
} else {
- return open(projection(fileSchema, fileProjection, columns,
caseSensitive));
+ CloseableIterable<ManifestEntry<F>> entries =
+ open(projection(fileSchema, fileProjection, columns, caseSensitive));
+ return onlyLive ? filterLiveEntries(entries) : entries;
}
}
+ private boolean hasRowFilter() {
+ return rowFilter != null && rowFilter != Expressions.alwaysTrue();
+ }
+
+ private boolean hasPartitionFilter() {
+ return partFilter != null && partFilter != Expressions.alwaysTrue();
+ }
+
private boolean inPartitionSet(F fileToCheck) {
return partitionSet == null
|| partitionSet.contains(fileToCheck.specId(),
fileToCheck.partition());
@@ -266,12 +280,16 @@ public class ManifestReader<F extends ContentFile<F>>
extends CloseableGroup
}
CloseableIterable<ManifestEntry<F>> liveEntries() {
- return CloseableIterable.filter(
- content == FileType.DATA_FILES
- ? scanMetrics.skippedDataFiles()
- : scanMetrics.skippedDeleteFiles(),
- entries(),
- entry -> entry != null && entry.status() !=
ManifestEntry.Status.DELETED);
+ return entries(true /* only live entries */);
+ }
+
+ private CloseableIterable<ManifestEntry<F>> filterLiveEntries(
+ CloseableIterable<ManifestEntry<F>> entries) {
+ return CloseableIterable.filter(entries, this::isLiveEntry);
+ }
+
+ private boolean isLiveEntry(ManifestEntry<F> entry) {
+ return entry != null && entry.status() != ManifestEntry.Status.DELETED;
}
/** @return an Iterator of DataFile. Makes defensive copies of files before
returning */
diff --git
a/core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java
b/core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java
index 8dbdd9cf6b..106c236f59 100644
--- a/core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java
+++ b/core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java
@@ -215,19 +215,24 @@ public class TestScanPlanningAndReporting extends
TableTestBase {
Table table =
TestTables.create(
tableDir, tableName, SCHEMA, SPEC, SortOrder.unsorted(),
formatVersion, reporter);
- table.newAppend().appendFile(FILE_A).appendFile(FILE_D).commit();
- table.newAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
+
table.newAppend().appendFile(FILE_A).appendFile(FILE_B).appendFile(FILE_D).commit();
+ table.newOverwrite().deleteFile(FILE_A).addFile(FILE_A2).commit();
+ table.newAppend().appendFile(FILE_C).commit();
TableScan tableScan = table.newScan();
+ List<FileScanTask> fileTasks = Lists.newArrayList();
try (CloseableIterable<FileScanTask> fileScanTasks =
tableScan.filter(Expressions.equal("data", "1")).planFiles()) {
- fileScanTasks.forEach(task -> {});
+ fileScanTasks.forEach(fileTasks::add);
}
+ assertThat(fileTasks)
+ .singleElement()
+ .satisfies(task ->
assertThat(task.file().path()).isEqualTo(FILE_D.path()));
ScanReport scanReport = reporter.lastReport();
assertThat(scanReport).isNotNull();
assertThat(scanReport.tableName()).isEqualTo(tableName);
- assertThat(scanReport.snapshotId()).isEqualTo(2L);
+ assertThat(scanReport.snapshotId()).isEqualTo(3L);
ScanMetricsResult result = scanReport.scanMetrics();
assertThat(result.skippedDataFiles().value()).isEqualTo(1);
assertThat(result.skippedDeleteFiles().value()).isEqualTo(0);
@@ -236,9 +241,9 @@ public class TestScanPlanningAndReporting extends
TableTestBase {
assertThat(result.resultDeleteFiles().value()).isEqualTo(0);
assertThat(result.scannedDataManifests().value()).isEqualTo(1);
assertThat(result.scannedDeleteManifests().value()).isEqualTo(0);
- assertThat(result.skippedDataManifests().value()).isEqualTo(1);
+ assertThat(result.skippedDataManifests().value()).isEqualTo(2);
assertThat(result.skippedDeleteManifests().value()).isEqualTo(0);
- assertThat(result.totalDataManifests().value()).isEqualTo(2);
+ assertThat(result.totalDataManifests().value()).isEqualTo(3);
assertThat(result.totalDeleteManifests().value()).isEqualTo(0);
assertThat(result.totalFileSizeInBytes().value()).isEqualTo(10L);
assertThat(result.totalDeleteFileSizeInBytes().value()).isEqualTo(0L);
@@ -250,20 +255,25 @@ public class TestScanPlanningAndReporting extends
TableTestBase {
Table table =
TestTables.create(
tableDir, tableName, SCHEMA, SPEC, SortOrder.unsorted(),
formatVersion, reporter);
- table.newAppend().appendFile(FILE_A).appendFile(FILE_D).commit();
+
table.newAppend().appendFile(FILE_A).appendFile(FILE_B).appendFile(FILE_D).commit();
+ table.newOverwrite().deleteFile(FILE_A).addFile(FILE_A2).commit();
table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_D2_DELETES).commit();
table.newRowDelta().addDeletes(FILE_B_DELETES).addDeletes(FILE_C2_DELETES).commit();
TableScan tableScan = table.newScan();
+ List<FileScanTask> fileTasks = Lists.newArrayList();
try (CloseableIterable<FileScanTask> fileScanTasks =
tableScan.filter(Expressions.equal("data", "1")).planFiles()) {
- fileScanTasks.forEach(task -> {});
+ fileScanTasks.forEach(fileTasks::add);
}
+ assertThat(fileTasks)
+ .singleElement()
+ .satisfies(task ->
assertThat(task.file().path()).isEqualTo(FILE_D.path()));
ScanReport scanReport = reporter.lastReport();
assertThat(scanReport).isNotNull();
assertThat(scanReport.tableName()).isEqualTo(tableName);
- assertThat(scanReport.snapshotId()).isEqualTo(3L);
+ assertThat(scanReport.snapshotId()).isEqualTo(4L);
ScanMetricsResult result = scanReport.scanMetrics();
assertThat(result.totalPlanningDuration().totalDuration()).isGreaterThan(Duration.ZERO);
assertThat(result.resultDataFiles().value()).isEqualTo(1);
@@ -272,9 +282,9 @@ public class TestScanPlanningAndReporting extends
TableTestBase {
assertThat(result.skippedDeleteFiles().value()).isEqualTo(1);
assertThat(result.scannedDataManifests().value()).isEqualTo(1);
assertThat(result.scannedDeleteManifests().value()).isEqualTo(1);
- assertThat(result.skippedDataManifests().value()).isEqualTo(0);
+ assertThat(result.skippedDataManifests().value()).isEqualTo(1);
assertThat(result.skippedDeleteManifests().value()).isEqualTo(1);
- assertThat(result.totalDataManifests().value()).isEqualTo(1);
+ assertThat(result.totalDataManifests().value()).isEqualTo(2);
assertThat(result.totalDeleteManifests().value()).isEqualTo(2);
assertThat(result.totalFileSizeInBytes().value()).isEqualTo(10L);
assertThat(result.totalDeleteFileSizeInBytes().value()).isEqualTo(10L);