This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ca14b8c1e Spark 3.4: Fix metrics reporting in distributed planning (#8613)
3ca14b8c1e is described below

commit 3ca14b8c1e7af524793ca39e60a92fdcc52f35bb
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Thu Sep 21 14:29:04 2023 -0700

    Spark 3.4: Fix metrics reporting in distributed planning (#8613)
---
 .../apache/iceberg/SparkDistributedDataScan.java   | 27 ++++++-
 .../TestSparkDistributedDataScanReporting.java     | 85 ++++++++++++++++++++++
 2 files changed, 111 insertions(+), 1 deletion(-)

diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/SparkDistributedDataScan.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/SparkDistributedDataScan.java
index d4c2848b45..43ce2a303e 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/SparkDistributedDataScan.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/SparkDistributedDataScan.java
@@ -29,6 +29,7 @@ import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.ClosingIterator;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.metrics.MetricsReporter;
 import org.apache.iceberg.relocated.com.google.common.base.Joiner;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -81,7 +82,7 @@ public class SparkDistributedDataScan extends 
BaseDistributedDataScan {
   private Broadcast<Table> tableBroadcast = null;
 
   public SparkDistributedDataScan(SparkSession spark, Table table, SparkReadConf readConf) {
-    this(spark, table, readConf, table.schema(), TableScanContext.empty());
+    this(spark, table, readConf, table.schema(), newTableScanContext(table));
   }
 
   private SparkDistributedDataScan(
@@ -134,6 +135,10 @@ public class SparkDistributedDataScan extends BaseDistributedDataScan {
             .flatMap(new ReadDataManifest(tableBroadcast(), context(), withColumnStats));
     List<List<DataFile>> dataFileGroups = collectPartitions(dataFileRDD);
 
+    int matchingFilesCount = dataFileGroups.stream().mapToInt(List::size).sum();
+    int skippedFilesCount = liveFilesCount(dataManifests) - matchingFilesCount;
+    scanMetrics().skippedDataFiles().increment(skippedFilesCount);
+
     return Iterables.transform(dataFileGroups, CloseableIterable::withNoopClose);
   }
 
@@ -157,6 +162,9 @@ public class SparkDistributedDataScan extends BaseDistributedDataScan {
             .flatMap(new ReadDeleteManifest(tableBroadcast(), context()))
             .collect();
 
+    int skippedFilesCount = liveFilesCount(deleteManifests) - deleteFiles.size();
+    scanMetrics().skippedDeleteFiles().increment(skippedFilesCount);
+
     return DeleteFileIndex.builderFor(deleteFiles)
         .specsById(table().specs())
         .caseSensitive(isCaseSensitive())
@@ -193,6 +201,23 @@ public class SparkDistributedDataScan extends BaseDistributedDataScan {
     return Arrays.asList(rdd.collectPartitions(partitionIds));
   }
 
+  private int liveFilesCount(List<ManifestFile> manifests) {
+    return manifests.stream().mapToInt(this::liveFilesCount).sum();
+  }
+
+  private int liveFilesCount(ManifestFile manifest) {
+    return manifest.existingFilesCount() + manifest.addedFilesCount();
+  }
+
+  private static TableScanContext newTableScanContext(Table table) {
+    if (table instanceof BaseTable) {
+      MetricsReporter reporter = ((BaseTable) table).reporter();
+      return ImmutableTableScanContext.builder().metricsReporter(reporter).build();
+    } else {
+      return TableScanContext.empty();
+    }
+  }
+
   private static class ReadDataManifest implements FlatMapFunction<ManifestFileBean, DataFile> {
 
     private final Broadcast<Table> table;
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java
new file mode 100644
index 0000000000..1ea4f990b2
--- /dev/null
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
+import static org.apache.iceberg.PlanningMode.LOCAL;
+
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.internal.SQLConf;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestSparkDistributedDataScanReporting
+    extends ScanPlanningAndReportingTestBase<BatchScan, ScanTask, ScanTaskGroup<ScanTask>> {
+
+  @Parameterized.Parameters(name = "dataMode = {0}, deleteMode = {1}")
+  public static Object[] parameters() {
+    return new Object[][] {
+      new Object[] {LOCAL, LOCAL},
+      new Object[] {LOCAL, DISTRIBUTED},
+      new Object[] {DISTRIBUTED, LOCAL},
+      new Object[] {DISTRIBUTED, DISTRIBUTED}
+    };
+  }
+
+  private static SparkSession spark = null;
+
+  private final PlanningMode dataMode;
+  private final PlanningMode deleteMode;
+
+  public TestSparkDistributedDataScanReporting(
+      PlanningMode dataPlanningMode, PlanningMode deletePlanningMode) {
+    this.dataMode = dataPlanningMode;
+    this.deleteMode = deletePlanningMode;
+  }
+
+  @BeforeClass
+  public static void startSpark() {
+    TestSparkDistributedDataScanReporting.spark =
+        SparkSession.builder()
+            .master("local[2]")
+            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+            .config(SQLConf.SHUFFLE_PARTITIONS().key(), "4")
+            .getOrCreate();
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestSparkDistributedDataScanReporting.spark;
+    TestSparkDistributedDataScanReporting.spark = null;
+    currentSpark.stop();
+  }
+
+  @Override
+  protected BatchScan newScan(Table table) {
+    table
+        .updateProperties()
+        .set(TableProperties.DATA_PLANNING_MODE, dataMode.modeName())
+        .set(TableProperties.DELETE_PLANNING_MODE, deleteMode.modeName())
+        .commit();
+    SparkReadConf readConf = new SparkReadConf(spark, table, ImmutableMap.of());
+    return new SparkDistributedDataScan(spark, table, readConf);
+  }
+}

Reply via email to