This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c5720b5df [core] Fix skippedTableFiles in scan metrics (#3657)
c5720b5df is described below

commit c5720b5dfd97835d66793ab459fafc6690275032
Author: Zouxxyy <[email protected]>
AuthorDate: Wed Jul 3 19:52:03 2024 +0800

    [core] Fix skippedTableFiles in scan metrics (#3657)
---
 .../paimon/operation/AbstractFileStoreScan.java    | 30 ++++------
 .../apache/paimon/spark/PaimonSparkTestBase.scala  | 12 +++-
 .../apache/paimon/spark/sql/PaimonMetricTest.scala | 66 ++++++++++++++++++++++
 .../spark/sql/PaimonOptimizationTestBase.scala     | 10 ----
 4 files changed, 88 insertions(+), 30 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 32b87e406..5e6f914fe 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -56,7 +56,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ForkJoinTask;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -254,8 +253,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
     public List<SimpleFileEntry> readSimpleEntries() {
         List<ManifestFileMeta> manifests = readManifests().getRight();
         Collection<SimpleFileEntry> mergedEntries =
-                readAndMergeFileEntries(
-                        manifests, this::readSimpleEntries, Filter.alwaysTrue(), new AtomicLong());
+                readAndMergeFileEntries(manifests, this::readSimpleEntries, Filter.alwaysTrue());
         return new ArrayList<>(mergedEntries);
     }
 
@@ -291,19 +289,14 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
         List<ManifestFileMeta> manifests = snapshotListPair.getRight();
 
         long startDataFiles =
-                manifests.stream().mapToLong(f -> f.numAddedFiles() + f.numDeletedFiles()).sum();
-
-        AtomicLong cntEntries = new AtomicLong(0);
+                manifests.stream().mapToLong(f -> f.numAddedFiles() - f.numDeletedFiles()).sum();
 
         Collection<ManifestEntry> mergedEntries =
                 readAndMergeFileEntries(
-                        manifests,
-                        this::readManifestFileMeta,
-                        this::filterUnmergedManifestEntry,
-                        cntEntries);
+                        manifests, this::readManifestFileMeta, this::filterUnmergedManifestEntry);
 
         List<ManifestEntry> files = new ArrayList<>();
-        long skippedByPartitionAndStats = startDataFiles - cntEntries.get();
+        long skippedByPartitionAndStats = startDataFiles - mergedEntries.size();
         for (ManifestEntry file : mergedEntries) {
             if (checkNumOfBuckets && file.totalBuckets() != numOfBuckets) {
                 String partInfo =
@@ -355,6 +348,12 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
 
         long skippedByWholeBucketFiles = afterBucketFilter - files.size();
         long scanDuration = (System.nanoTime() - started) / 1_000_000;
+        checkState(
+                startDataFiles
+                                - skippedByPartitionAndStats
+                                - skippedByBucketAndLevelFilter
+                                - skippedByWholeBucketFiles
+                        == files.size());
         if (scanMetrics != null) {
             scanMetrics.reportScan(
                     new ScanStats(
@@ -371,8 +370,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
     public <T extends FileEntry> Collection<T> readAndMergeFileEntries(
             List<ManifestFileMeta> manifests,
             Function<ManifestFileMeta, List<T>> manifestReader,
-            @Nullable Filter<T> filterUnmergedEntry,
-            @Nullable AtomicLong readEntries) {
+            @Nullable Filter<T> filterUnmergedEntry) {
         Iterable<T> entries =
                 ScanParallelExecutor.parallelismBatchIterable(
                         files -> {
@@ -383,11 +381,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
                             if (filterUnmergedEntry != null) {
                                 stream = stream.filter(filterUnmergedEntry::test);
                             }
-                            List<T> entryList = stream.collect(Collectors.toList());
-                            if (readEntries != null) {
-                                readEntries.getAndAdd(entryList.size());
-                            }
-                            return entryList;
+                            return stream.collect(Collectors.toList());
                         },
                         manifests,
                         scanManifestParallelism);
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
index 814001bc9..d431a754d 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
@@ -30,7 +30,7 @@ import org.apache.spark.paimon.Utils
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.connector.catalog.{Identifier => SparkIdentifier}
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
 import org.apache.spark.sql.test.SharedSparkSession
 import org.scalactic.source.Position
 import org.scalatest.Tag
@@ -120,11 +120,19 @@ class PaimonSparkTestBase
   }
 
   protected def createRelationV2(tableName: String): LogicalPlan = {
-    val sparkTable = new SparkTable(loadTable(tableName))
+    val sparkTable = SparkTable(loadTable(tableName))
     DataSourceV2Relation.create(
       sparkTable,
       Some(spark.sessionState.catalogManager.currentCatalog),
       Some(SparkIdentifier.of(Array(this.dbName0), tableName))
     )
   }
+
+  protected def getPaimonScan(sqlText: String): PaimonScan = {
+    sql(sqlText).queryExecution.optimizedPlan
+      .collectFirst { case relation: DataSourceV2ScanRelation => relation }
+      .get
+      .scan
+      .asInstanceOf[PaimonScan]
+  }
 }
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala
new file mode 100644
index 000000000..99ba335a7
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonMetrics.{RESULTED_TABLE_FILES, SKIPPED_TABLE_FILES}
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.connector.metric.CustomTaskMetric
+import org.junit.jupiter.api.Assertions
+
+class PaimonMetricTest extends PaimonSparkTestBase {
+
+  test(s"Paimon Metric: scan driver metric") {
+    // Spark has supported reportDriverMetrics since Spark 3.4
+    if (gteqSpark3_4) {
+      sql(s"""
+             |CREATE TABLE T (id INT, name STRING, pt STRING)
+             |TBLPROPERTIES ('bucket'='1', 'bucket-key'='id', 'write-only'='true')
+             |PARTITIONED BY (pt)
+             |""".stripMargin)
+
+      sql(s"INSERT INTO T VALUES (1, 'a', 'p1'), (2, 'b', 'p2')")
+      sql(s"INSERT INTO T VALUES (3, 'c', 'p2'), (4, 'c', 'p3')")
+      sql(s"INSERT INTO T VALUES (5, 'd', 'p2')")
+
+      def checkMetrics(s: String, skippedTableFiles: Long, resultedTableFiles: Long): Unit = {
+        val scan = getPaimonScan(s)
+        // call getInputPartitions to trigger scan
+        scan.getInputPartitions
+        val metrics = scan.reportDriverMetrics()
+        Assertions.assertEquals(skippedTableFiles, metric(metrics, SKIPPED_TABLE_FILES))
+        Assertions.assertEquals(resultedTableFiles, metric(metrics, RESULTED_TABLE_FILES))
+      }
+
+      checkMetrics(s"SELECT * FROM T", 0, 5)
+      checkMetrics(s"SELECT * FROM T WHERE pt = 'p2'", 2, 3)
+
+      sql(s"DELETE FROM T WHERE pt = 'p1'")
+      checkMetrics(s"SELECT * FROM T", 0, 4)
+
+      sql("CALL sys.compact(table => 'T', partitions => 'pt=\"p2\"')")
+      checkMetrics(s"SELECT * FROM T", 0, 2)
+      checkMetrics(s"SELECT * FROM T WHERE pt = 'p2'", 1, 1)
+    }
+  }
+
+  def metric(metrics: Array[CustomTaskMetric], name: String): Long = {
+    metrics.find(_.name() == name).get.value()
+  }
+}
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
index 9ab5551dd..70cb4b0c4 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
@@ -26,7 +26,6 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.{Attribute, CreateNamedStruct, Literal, NamedExpression}
 import org.apache.spark.sql.catalyst.plans.logical.{CTERelationDef, LogicalPlan, OneRowRelation, WithCTE}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
 import org.apache.spark.sql.functions._
 import org.junit.jupiter.api.Assertions
 
@@ -101,15 +100,6 @@ abstract class PaimonOptimizationTestBase extends PaimonSparkTestBase {
       spark.sql(s"INSERT INTO T VALUES (1, 'a', 'p1'), (2, 'b', 'p1'), (3, 'c', 'p2')")
 
       val sqlText = "SELECT * FROM T WHERE id = 1 AND pt = 'p1' LIMIT 1"
-      def getPaimonScan(sqlText: String) = {
-        spark
-          .sql(sqlText)
-          .queryExecution
-          .optimizedPlan
-          .collectFirst { case relation: DataSourceV2ScanRelation => relation }
-          .get
-          .scan
-      }
       Assertions.assertEquals(getPaimonScan(sqlText), getPaimonScan(sqlText))
     }
   }

Reply via email to