This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c5720b5df [core] Fix skippedTableFiles in scan metrics (#3657)
c5720b5df is described below
commit c5720b5dfd97835d66793ab459fafc6690275032
Author: Zouxxyy <[email protected]>
AuthorDate: Wed Jul 3 19:52:03 2024 +0800
[core] Fix skippedTableFiles in scan metrics (#3657)
---
.../paimon/operation/AbstractFileStoreScan.java | 30 ++++------
.../apache/paimon/spark/PaimonSparkTestBase.scala | 12 +++-
.../apache/paimon/spark/sql/PaimonMetricTest.scala | 66 ++++++++++++++++++++++
.../spark/sql/PaimonOptimizationTestBase.scala | 10 ----
4 files changed, 88 insertions(+), 30 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 32b87e406..5e6f914fe 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -56,7 +56,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -254,8 +253,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
public List<SimpleFileEntry> readSimpleEntries() {
List<ManifestFileMeta> manifests = readManifests().getRight();
Collection<SimpleFileEntry> mergedEntries =
-                readAndMergeFileEntries(
-                        manifests, this::readSimpleEntries, Filter.alwaysTrue(), new AtomicLong());
+                readAndMergeFileEntries(manifests, this::readSimpleEntries, Filter.alwaysTrue());
return new ArrayList<>(mergedEntries);
}
@@ -291,19 +289,14 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
List<ManifestFileMeta> manifests = snapshotListPair.getRight();
        long startDataFiles =
-                manifests.stream().mapToLong(f -> f.numAddedFiles() + f.numDeletedFiles()).sum();
-
-        AtomicLong cntEntries = new AtomicLong(0);
+                manifests.stream().mapToLong(f -> f.numAddedFiles() - f.numDeletedFiles()).sum();
Collection<ManifestEntry> mergedEntries =
readAndMergeFileEntries(
- manifests,
- this::readManifestFileMeta,
- this::filterUnmergedManifestEntry,
- cntEntries);
+                        manifests, this::readManifestFileMeta, this::filterUnmergedManifestEntry);
List<ManifestEntry> files = new ArrayList<>();
- long skippedByPartitionAndStats = startDataFiles - cntEntries.get();
+        long skippedByPartitionAndStats = startDataFiles - mergedEntries.size();
for (ManifestEntry file : mergedEntries) {
if (checkNumOfBuckets && file.totalBuckets() != numOfBuckets) {
String partInfo =
@@ -355,6 +348,12 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
long skippedByWholeBucketFiles = afterBucketFilter - files.size();
long scanDuration = (System.nanoTime() - started) / 1_000_000;
+ checkState(
+ startDataFiles
+ - skippedByPartitionAndStats
+ - skippedByBucketAndLevelFilter
+ - skippedByWholeBucketFiles
+ == files.size());
if (scanMetrics != null) {
scanMetrics.reportScan(
new ScanStats(
@@ -371,8 +370,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
public <T extends FileEntry> Collection<T> readAndMergeFileEntries(
List<ManifestFileMeta> manifests,
Function<ManifestFileMeta, List<T>> manifestReader,
- @Nullable Filter<T> filterUnmergedEntry,
- @Nullable AtomicLong readEntries) {
+ @Nullable Filter<T> filterUnmergedEntry) {
Iterable<T> entries =
ScanParallelExecutor.parallelismBatchIterable(
files -> {
@@ -383,11 +381,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
if (filterUnmergedEntry != null) {
stream =
stream.filter(filterUnmergedEntry::test);
}
-                            List<T> entryList = stream.collect(Collectors.toList());
- if (readEntries != null) {
- readEntries.getAndAdd(entryList.size());
- }
- return entryList;
+ return stream.collect(Collectors.toList());
},
manifests,
scanManifestParallelism);
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
index 814001bc9..d431a754d 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
@@ -30,7 +30,7 @@ import org.apache.spark.paimon.Utils
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.catalog.{Identifier => SparkIdentifier}
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
import org.apache.spark.sql.test.SharedSparkSession
import org.scalactic.source.Position
import org.scalatest.Tag
@@ -120,11 +120,19 @@ class PaimonSparkTestBase
}
protected def createRelationV2(tableName: String): LogicalPlan = {
- val sparkTable = new SparkTable(loadTable(tableName))
+ val sparkTable = SparkTable(loadTable(tableName))
DataSourceV2Relation.create(
sparkTable,
Some(spark.sessionState.catalogManager.currentCatalog),
Some(SparkIdentifier.of(Array(this.dbName0), tableName))
)
}
+
+ protected def getPaimonScan(sqlText: String): PaimonScan = {
+ sql(sqlText).queryExecution.optimizedPlan
+ .collectFirst { case relation: DataSourceV2ScanRelation => relation }
+ .get
+ .scan
+ .asInstanceOf[PaimonScan]
+ }
}
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala
new file mode 100644
index 000000000..99ba335a7
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonMetrics.{RESULTED_TABLE_FILES, SKIPPED_TABLE_FILES}
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.connector.metric.CustomTaskMetric
+import org.junit.jupiter.api.Assertions
+
+class PaimonMetricTest extends PaimonSparkTestBase {
+
+ test(s"Paimon Metric: scan driver metric") {
+ // Spark support reportDriverMetrics since Spark 3.4
+ if (gteqSpark3_4) {
+ sql(s"""
+ |CREATE TABLE T (id INT, name STRING, pt STRING)
+               |TBLPROPERTIES ('bucket'='1', 'bucket-key'='id', 'write-only'='true')
+ |PARTITIONED BY (pt)
+ |""".stripMargin)
+
+ sql(s"INSERT INTO T VALUES (1, 'a', 'p1'), (2, 'b', 'p2')")
+ sql(s"INSERT INTO T VALUES (3, 'c', 'p2'), (4, 'c', 'p3')")
+ sql(s"INSERT INTO T VALUES (5, 'd', 'p2')")
+
+      def checkMetrics(s: String, skippedTableFiles: Long, resultedTableFiles: Long): Unit = {
+ val scan = getPaimonScan(s)
+ // call getInputPartitions to trigger scan
+ scan.getInputPartitions
+ val metrics = scan.reportDriverMetrics()
+        Assertions.assertEquals(skippedTableFiles, metric(metrics, SKIPPED_TABLE_FILES))
+        Assertions.assertEquals(resultedTableFiles, metric(metrics, RESULTED_TABLE_FILES))
+ }
+
+ checkMetrics(s"SELECT * FROM T", 0, 5)
+ checkMetrics(s"SELECT * FROM T WHERE pt = 'p2'", 2, 3)
+
+ sql(s"DELETE FROM T WHERE pt = 'p1'")
+ checkMetrics(s"SELECT * FROM T", 0, 4)
+
+ sql("CALL sys.compact(table => 'T', partitions => 'pt=\"p2\"')")
+ checkMetrics(s"SELECT * FROM T", 0, 2)
+ checkMetrics(s"SELECT * FROM T WHERE pt = 'p2'", 1, 1)
+ }
+ }
+
+ def metric(metrics: Array[CustomTaskMetric], name: String): Long = {
+ metrics.find(_.name() == name).get.value()
+ }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
index 9ab5551dd..70cb4b0c4 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
@@ -26,7 +26,6 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.{Attribute,
CreateNamedStruct, Literal, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical.{CTERelationDef,
LogicalPlan, OneRowRelation, WithCTE}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
import org.apache.spark.sql.functions._
import org.junit.jupiter.api.Assertions
@@ -101,15 +100,6 @@ abstract class PaimonOptimizationTestBase extends
PaimonSparkTestBase {
    spark.sql(s"INSERT INTO T VALUES (1, 'a', 'p1'), (2, 'b', 'p1'), (3, 'c', 'p2')")
val sqlText = "SELECT * FROM T WHERE id = 1 AND pt = 'p1' LIMIT 1"
- def getPaimonScan(sqlText: String) = {
- spark
- .sql(sqlText)
- .queryExecution
- .optimizedPlan
- .collectFirst { case relation: DataSourceV2ScanRelation => relation }
- .get
- .scan
- }
Assertions.assertEquals(getPaimonScan(sqlText), getPaimonScan(sqlText))
}
}