This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 5996b3de51 Core: Fix skipped file counts in ManifestReader with deleted entries (#8432)
5996b3de51 is described below

commit 5996b3de518b27cd356aa020456dcedb169c5b00
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Fri Sep 1 16:31:59 2023 -0700

    Core: Fix skipped file counts in ManifestReader with deleted entries (#8432)
---
 .../java/org/apache/iceberg/ManifestReader.java    | 40 ++++++++++++++++------
 .../iceberg/TestScanPlanningAndReporting.java      | 32 +++++++++++------
 2 files changed, 50 insertions(+), 22 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java
index 4ccf9451d1..4ee51aa60c 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestReader.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java
@@ -203,9 +203,11 @@ public class ManifestReader<F extends ContentFile<F>> extends CloseableGroup
   }
 
   CloseableIterable<ManifestEntry<F>> entries() {
-    if ((rowFilter != null && rowFilter != Expressions.alwaysTrue())
-        || (partFilter != null && partFilter != Expressions.alwaysTrue())
-        || (partitionSet != null)) {
+    return entries(false /* all entries */);
+  }
+
+  private CloseableIterable<ManifestEntry<F>> entries(boolean onlyLive) {
+    if (hasRowFilter() || hasPartitionFilter() || partitionSet != null) {
       Evaluator evaluator = evaluator();
       InclusiveMetricsEvaluator metricsEvaluator = metricsEvaluator();
 
@@ -213,22 +215,34 @@ public class ManifestReader<F extends ContentFile<F>> extends CloseableGroup
       boolean requireStatsProjection = requireStatsProjection(rowFilter, 
columns);
       Collection<String> projectColumns =
           requireStatsProjection ? withStatsColumns(columns) : columns;
+      CloseableIterable<ManifestEntry<F>> entries =
+          open(projection(fileSchema, fileProjection, projectColumns, 
caseSensitive));
 
       return CloseableIterable.filter(
           content == FileType.DATA_FILES
               ? scanMetrics.skippedDataFiles()
               : scanMetrics.skippedDeleteFiles(),
-          open(projection(fileSchema, fileProjection, projectColumns, 
caseSensitive)),
+          onlyLive ? filterLiveEntries(entries) : entries,
           entry ->
               entry != null
                   && evaluator.eval(entry.file().partition())
                   && metricsEvaluator.eval(entry.file())
                   && inPartitionSet(entry.file()));
     } else {
-      return open(projection(fileSchema, fileProjection, columns, 
caseSensitive));
+      CloseableIterable<ManifestEntry<F>> entries =
+          open(projection(fileSchema, fileProjection, columns, caseSensitive));
+      return onlyLive ? filterLiveEntries(entries) : entries;
     }
   }
 
+  private boolean hasRowFilter() {
+    return rowFilter != null && rowFilter != Expressions.alwaysTrue();
+  }
+
+  private boolean hasPartitionFilter() {
+    return partFilter != null && partFilter != Expressions.alwaysTrue();
+  }
+
   private boolean inPartitionSet(F fileToCheck) {
     return partitionSet == null
         || partitionSet.contains(fileToCheck.specId(), 
fileToCheck.partition());
@@ -266,12 +280,16 @@ public class ManifestReader<F extends ContentFile<F>> extends CloseableGroup
   }
 
   CloseableIterable<ManifestEntry<F>> liveEntries() {
-    return CloseableIterable.filter(
-        content == FileType.DATA_FILES
-            ? scanMetrics.skippedDataFiles()
-            : scanMetrics.skippedDeleteFiles(),
-        entries(),
-        entry -> entry != null && entry.status() != 
ManifestEntry.Status.DELETED);
+    return entries(true /* only live entries */);
+  }
+
+  private CloseableIterable<ManifestEntry<F>> filterLiveEntries(
+      CloseableIterable<ManifestEntry<F>> entries) {
+    return CloseableIterable.filter(entries, this::isLiveEntry);
+  }
+
+  private boolean isLiveEntry(ManifestEntry<F> entry) {
+    return entry != null && entry.status() != ManifestEntry.Status.DELETED;
   }
 
   /** @return an Iterator of DataFile. Makes defensive copies of files before 
returning */
diff --git a/core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java b/core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java
index 8dbdd9cf6b..106c236f59 100644
--- a/core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java
+++ b/core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java
@@ -215,19 +215,24 @@ public class TestScanPlanningAndReporting extends TableTestBase {
     Table table =
         TestTables.create(
             tableDir, tableName, SCHEMA, SPEC, SortOrder.unsorted(), 
formatVersion, reporter);
-    table.newAppend().appendFile(FILE_A).appendFile(FILE_D).commit();
-    table.newAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
+    
table.newAppend().appendFile(FILE_A).appendFile(FILE_B).appendFile(FILE_D).commit();
+    table.newOverwrite().deleteFile(FILE_A).addFile(FILE_A2).commit();
+    table.newAppend().appendFile(FILE_C).commit();
     TableScan tableScan = table.newScan();
 
+    List<FileScanTask> fileTasks = Lists.newArrayList();
     try (CloseableIterable<FileScanTask> fileScanTasks =
         tableScan.filter(Expressions.equal("data", "1")).planFiles()) {
-      fileScanTasks.forEach(task -> {});
+      fileScanTasks.forEach(fileTasks::add);
     }
+    assertThat(fileTasks)
+        .singleElement()
+        .satisfies(task -> 
assertThat(task.file().path()).isEqualTo(FILE_D.path()));
 
     ScanReport scanReport = reporter.lastReport();
     assertThat(scanReport).isNotNull();
     assertThat(scanReport.tableName()).isEqualTo(tableName);
-    assertThat(scanReport.snapshotId()).isEqualTo(2L);
+    assertThat(scanReport.snapshotId()).isEqualTo(3L);
     ScanMetricsResult result = scanReport.scanMetrics();
     assertThat(result.skippedDataFiles().value()).isEqualTo(1);
     assertThat(result.skippedDeleteFiles().value()).isEqualTo(0);
@@ -236,9 +241,9 @@ public class TestScanPlanningAndReporting extends TableTestBase {
     assertThat(result.resultDeleteFiles().value()).isEqualTo(0);
     assertThat(result.scannedDataManifests().value()).isEqualTo(1);
     assertThat(result.scannedDeleteManifests().value()).isEqualTo(0);
-    assertThat(result.skippedDataManifests().value()).isEqualTo(1);
+    assertThat(result.skippedDataManifests().value()).isEqualTo(2);
     assertThat(result.skippedDeleteManifests().value()).isEqualTo(0);
-    assertThat(result.totalDataManifests().value()).isEqualTo(2);
+    assertThat(result.totalDataManifests().value()).isEqualTo(3);
     assertThat(result.totalDeleteManifests().value()).isEqualTo(0);
     assertThat(result.totalFileSizeInBytes().value()).isEqualTo(10L);
     assertThat(result.totalDeleteFileSizeInBytes().value()).isEqualTo(0L);
@@ -250,20 +255,25 @@ public class TestScanPlanningAndReporting extends TableTestBase {
     Table table =
         TestTables.create(
             tableDir, tableName, SCHEMA, SPEC, SortOrder.unsorted(), 
formatVersion, reporter);
-    table.newAppend().appendFile(FILE_A).appendFile(FILE_D).commit();
+    
table.newAppend().appendFile(FILE_A).appendFile(FILE_B).appendFile(FILE_D).commit();
+    table.newOverwrite().deleteFile(FILE_A).addFile(FILE_A2).commit();
     
table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_D2_DELETES).commit();
     
table.newRowDelta().addDeletes(FILE_B_DELETES).addDeletes(FILE_C2_DELETES).commit();
     TableScan tableScan = table.newScan();
 
+    List<FileScanTask> fileTasks = Lists.newArrayList();
     try (CloseableIterable<FileScanTask> fileScanTasks =
         tableScan.filter(Expressions.equal("data", "1")).planFiles()) {
-      fileScanTasks.forEach(task -> {});
+      fileScanTasks.forEach(fileTasks::add);
     }
+    assertThat(fileTasks)
+        .singleElement()
+        .satisfies(task -> 
assertThat(task.file().path()).isEqualTo(FILE_D.path()));
 
     ScanReport scanReport = reporter.lastReport();
     assertThat(scanReport).isNotNull();
     assertThat(scanReport.tableName()).isEqualTo(tableName);
-    assertThat(scanReport.snapshotId()).isEqualTo(3L);
+    assertThat(scanReport.snapshotId()).isEqualTo(4L);
     ScanMetricsResult result = scanReport.scanMetrics();
     
assertThat(result.totalPlanningDuration().totalDuration()).isGreaterThan(Duration.ZERO);
     assertThat(result.resultDataFiles().value()).isEqualTo(1);
@@ -272,9 +282,9 @@ public class TestScanPlanningAndReporting extends TableTestBase {
     assertThat(result.skippedDeleteFiles().value()).isEqualTo(1);
     assertThat(result.scannedDataManifests().value()).isEqualTo(1);
     assertThat(result.scannedDeleteManifests().value()).isEqualTo(1);
-    assertThat(result.skippedDataManifests().value()).isEqualTo(0);
+    assertThat(result.skippedDataManifests().value()).isEqualTo(1);
     assertThat(result.skippedDeleteManifests().value()).isEqualTo(1);
-    assertThat(result.totalDataManifests().value()).isEqualTo(1);
+    assertThat(result.totalDataManifests().value()).isEqualTo(2);
     assertThat(result.totalDeleteManifests().value()).isEqualTo(2);
     assertThat(result.totalFileSizeInBytes().value()).isEqualTo(10L);
     assertThat(result.totalDeleteFileSizeInBytes().value()).isEqualTo(10L);

Reply via email to