This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 3ca14b8c1e Spark 3.4: Fix metrics reporting in distributed planning (#8613)
3ca14b8c1e is described below
commit 3ca14b8c1e7af524793ca39e60a92fdcc52f35bb
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Thu Sep 21 14:29:04 2023 -0700
Spark 3.4: Fix metrics reporting in distributed planning (#8613)
---
.../apache/iceberg/SparkDistributedDataScan.java | 27 ++++++-
.../TestSparkDistributedDataScanReporting.java | 85 ++++++++++++++++++++++
2 files changed, 111 insertions(+), 1 deletion(-)
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/SparkDistributedDataScan.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/SparkDistributedDataScan.java
index d4c2848b45..43ce2a303e 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/SparkDistributedDataScan.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/SparkDistributedDataScan.java
@@ -29,6 +29,7 @@ import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.ClosingIterator;
import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -81,7 +82,7 @@ public class SparkDistributedDataScan extends BaseDistributedDataScan {
private Broadcast<Table> tableBroadcast = null;
public SparkDistributedDataScan(SparkSession spark, Table table,
SparkReadConf readConf) {
- this(spark, table, readConf, table.schema(), TableScanContext.empty());
+ this(spark, table, readConf, table.schema(), newTableScanContext(table));
}
private SparkDistributedDataScan(
@@ -134,6 +135,10 @@ public class SparkDistributedDataScan extends BaseDistributedDataScan {
.flatMap(new ReadDataManifest(tableBroadcast(), context(),
withColumnStats));
List<List<DataFile>> dataFileGroups = collectPartitions(dataFileRDD);
+ int matchingFilesCount =
dataFileGroups.stream().mapToInt(List::size).sum();
+ int skippedFilesCount = liveFilesCount(dataManifests) - matchingFilesCount;
+ scanMetrics().skippedDataFiles().increment(skippedFilesCount);
+
return Iterables.transform(dataFileGroups,
CloseableIterable::withNoopClose);
}
@@ -157,6 +162,9 @@ public class SparkDistributedDataScan extends BaseDistributedDataScan {
.flatMap(new ReadDeleteManifest(tableBroadcast(), context()))
.collect();
+ int skippedFilesCount = liveFilesCount(deleteManifests) -
deleteFiles.size();
+ scanMetrics().skippedDeleteFiles().increment(skippedFilesCount);
+
return DeleteFileIndex.builderFor(deleteFiles)
.specsById(table().specs())
.caseSensitive(isCaseSensitive())
@@ -193,6 +201,23 @@ public class SparkDistributedDataScan extends BaseDistributedDataScan {
return Arrays.asList(rdd.collectPartitions(partitionIds));
}
+ private int liveFilesCount(List<ManifestFile> manifests) {
+ return manifests.stream().mapToInt(this::liveFilesCount).sum();
+ }
+
+ private int liveFilesCount(ManifestFile manifest) {
+ return manifest.existingFilesCount() + manifest.addedFilesCount();
+ }
+
+ private static TableScanContext newTableScanContext(Table table) {
+ if (table instanceof BaseTable) {
+ MetricsReporter reporter = ((BaseTable) table).reporter();
+ return
ImmutableTableScanContext.builder().metricsReporter(reporter).build();
+ } else {
+ return TableScanContext.empty();
+ }
+ }
+
private static class ReadDataManifest implements
FlatMapFunction<ManifestFileBean, DataFile> {
private final Broadcast<Table> table;
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java
new file mode 100644
index 0000000000..1ea4f990b2
--- /dev/null
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
+import static org.apache.iceberg.PlanningMode.LOCAL;
+
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.internal.SQLConf;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestSparkDistributedDataScanReporting
+ extends ScanPlanningAndReportingTestBase<BatchScan, ScanTask,
ScanTaskGroup<ScanTask>> {
+
+ @Parameterized.Parameters(name = "dataMode = {0}, deleteMode = {1}")
+ public static Object[] parameters() {
+ return new Object[][] {
+ new Object[] {LOCAL, LOCAL},
+ new Object[] {LOCAL, DISTRIBUTED},
+ new Object[] {DISTRIBUTED, LOCAL},
+ new Object[] {DISTRIBUTED, DISTRIBUTED}
+ };
+ }
+
+ private static SparkSession spark = null;
+
+ private final PlanningMode dataMode;
+ private final PlanningMode deleteMode;
+
+ public TestSparkDistributedDataScanReporting(
+ PlanningMode dataPlanningMode, PlanningMode deletePlanningMode) {
+ this.dataMode = dataPlanningMode;
+ this.deleteMode = deletePlanningMode;
+ }
+
+ @BeforeClass
+ public static void startSpark() {
+ TestSparkDistributedDataScanReporting.spark =
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
+ .config(SQLConf.SHUFFLE_PARTITIONS().key(), "4")
+ .getOrCreate();
+ }
+
+ @AfterClass
+ public static void stopSpark() {
+ SparkSession currentSpark = TestSparkDistributedDataScanReporting.spark;
+ TestSparkDistributedDataScanReporting.spark = null;
+ currentSpark.stop();
+ }
+
+ @Override
+ protected BatchScan newScan(Table table) {
+ table
+ .updateProperties()
+ .set(TableProperties.DATA_PLANNING_MODE, dataMode.modeName())
+ .set(TableProperties.DELETE_PLANNING_MODE, deleteMode.modeName())
+ .commit();
+ SparkReadConf readConf = new SparkReadConf(spark, table,
ImmutableMap.of());
+ return new SparkDistributedDataScan(spark, table, readConf);
+ }
+}