This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 2226cfaba fix: Refactor CometScanRule and fix bugs (#1483)
2226cfaba is described below
commit 2226cfaba480333c5857dc70bd792be8dca9e47c
Author: Andy Grove <[email protected]>
AuthorDate: Wed Mar 19 14:18:13 2025 -0600
fix: Refactor CometScanRule and fix bugs (#1483)
---
.../apache/comet/CometSparkSessionExtensions.scala | 126 ++++++++++-----------
.../comet/exec/CometColumnarShuffleSuite.scala | 25 ++++
.../comet/exec/CometNativeShuffleSuite.scala | 2 +
3 files changed, 85 insertions(+), 68 deletions(-)
diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index 800cab832..105144e55 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -21,6 +21,8 @@ package org.apache.comet
import java.nio.ByteOrder
+import scala.collection.mutable.ListBuffer
+
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.network.util.ByteUnit
@@ -100,9 +102,6 @@ class CometSparkSessionExtensions
plan
} else {
- def isDynamicPruningFilter(e: Expression): Boolean =
- e.exists(_.isInstanceOf[PlanExpression[_]])
-
def hasMetadataCol(plan: SparkPlan): Boolean = {
plan.expressions.exists(_.exists {
case a: Attribute =>
@@ -116,11 +115,9 @@ class CometSparkSessionExtensions
withInfo(scan, "Metadata column is not supported")
scan
- case scanExec: FileSourceScanExec
- if COMET_DPP_FALLBACK_ENABLED.get() &&
- scanExec.partitionFilters.exists(isDynamicPruningFilter) =>
- withInfo(scanExec, "DPP not supported")
- scanExec
+ // data source V1
+ case scanExec: FileSourceScanExec =>
+ transformV1Scan(scanExec)
// data source V2
case scanExec: BatchScanExec
@@ -188,69 +185,62 @@ class CometSparkSessionExtensions
scanExec
}
- // data source V1
- case scanExec @ FileSourceScanExec(
- HadoopFsRelation(_, partitionSchema, _, _, fileFormat, _),
- _: Seq[_],
- requiredSchema,
- _,
- _,
- _,
- _,
- _,
- _)
- if CometScanExec.isFileFormatSupported(fileFormat)
- && CometNativeScanExec.isSchemaSupported(requiredSchema)
- && CometNativeScanExec.isSchemaSupported(partitionSchema)
- // TODO we only enable full native scan if COMET_EXEC_ENABLED is enabled
- // but this is not really what we want .. we currently insert `CometScanExec`
- // here and then it gets replaced with `CometNativeScanExec` in `CometExecRule`
- // but that only happens if `COMET_EXEC_ENABLED` is enabled
- && COMET_EXEC_ENABLED.get()
- && COMET_NATIVE_SCAN_IMPL.get() == CometConf.SCAN_NATIVE_DATAFUSION =>
- logInfo("Comet extension enabled for v1 full native Scan")
- CometScanExec(scanExec, session)
+ }
+ }
+ }
- // data source V1
- case scanExec @ FileSourceScanExec(
- HadoopFsRelation(_, partitionSchema, _, _, fileFormat, _),
- _: Seq[_],
- requiredSchema,
- _,
- _,
- _,
- _,
- _,
- _)
- if CometScanExec.isFileFormatSupported(fileFormat)
- && CometScanExec.isSchemaSupported(requiredSchema)
- && CometScanExec.isSchemaSupported(partitionSchema) =>
- logInfo("Comet extension enabled for v1 Scan")
- CometScanExec(scanExec, session)
+ private def isDynamicPruningFilter(e: Expression): Boolean =
+ e.exists(_.isInstanceOf[PlanExpression[_]])
- // data source v1 not supported case
- case scanExec @ FileSourceScanExec(
- HadoopFsRelation(_, partitionSchema, _, _, fileFormat, _),
- _: Seq[_],
- requiredSchema,
- _,
- _,
- _,
- _,
- _,
- _) =>
- val info1 = createMessage(
- !CometScanExec.isFileFormatSupported(fileFormat),
- s"File format $fileFormat is not supported")
- val info2 = createMessage(
- !CometScanExec.isSchemaSupported(requiredSchema),
- s"Schema $requiredSchema is not supported")
- val info3 = createMessage(
- !CometScanExec.isSchemaSupported(partitionSchema),
- s"Partition schema $partitionSchema is not supported")
- withInfo(scanExec, Seq(info1, info2, info3).flatten.mkString(","))
+ private def transformV1Scan(scanExec: FileSourceScanExec): SparkPlan = {
+
+ if (COMET_DPP_FALLBACK_ENABLED.get() &&
+ scanExec.partitionFilters.exists(isDynamicPruningFilter)) {
+ withInfo(scanExec, "DPP not supported")
+ return scanExec
+ }
+
+ scanExec.relation match {
+ case r: HadoopFsRelation =>
+ val fallbackReasons = new ListBuffer[String]()
+ if (!CometScanExec.isFileFormatSupported(r.fileFormat)) {
+ fallbackReasons += s"Unsupported file format ${r.fileFormat}"
+ }
+
+ val scanImpl = COMET_NATIVE_SCAN_IMPL.get()
+ if (scanImpl == CometConf.SCAN_NATIVE_DATAFUSION && !COMET_EXEC_ENABLED.get()) {
+ fallbackReasons +=
+ s"Full native scan disabled because ${COMET_EXEC_ENABLED.key} disabled"
+ }
+
+ val (schemaSupported, partitionSchemaSupported) = scanImpl match {
+ case CometConf.SCAN_NATIVE_DATAFUSION =>
+ (
+ CometNativeScanExec.isSchemaSupported(scanExec.requiredSchema),
+ CometNativeScanExec.isSchemaSupported(r.partitionSchema))
+ case CometConf.SCAN_NATIVE_COMET | SCAN_NATIVE_ICEBERG_COMPAT =>
+ (
+ CometScanExec.isSchemaSupported(scanExec.requiredSchema),
+ CometScanExec.isSchemaSupported(r.partitionSchema))
+ }
+
+ if (!schemaSupported) {
+ fallbackReasons += s"Unsupported schema ${scanExec.requiredSchema} for $scanImpl"
+ }
+ if (!partitionSchemaSupported) {
+ fallbackReasons += s"Unsupported partitioning schema ${r.partitionSchema} for $scanImpl"
+ }
+
+ if (fallbackReasons.isEmpty) {
+ CometScanExec(scanExec, session)
+ } else {
+ withInfo(scanExec, fallbackReasons.mkString(", "))
scanExec
- }
+ }
+
+ case _ =>
+ withInfo(scanExec, s"Unsupported relation ${scanExec.relation}")
+ scanExec
}
}
}
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
index 1df865290..45134b8b4 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
@@ -110,6 +110,8 @@ abstract class CometColumnarShuffleSuite extends
CometTestBase with AdaptiveSpar
}
test("columnar shuffle on nested struct including nulls") {
+ // https://github.com/apache/datafusion-comet/issues/1538
+ assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() !=
CometConf.SCAN_NATIVE_DATAFUSION)
Seq(10, 201).foreach { numPartitions =>
Seq("1.0", "10.0").foreach { ratio =>
withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key ->
ratio) {
@@ -247,6 +249,9 @@ abstract class CometColumnarShuffleSuite extends
CometTestBase with AdaptiveSpar
}
test("columnar shuffle on map") {
+ // https://github.com/apache/datafusion-comet/issues/1538
+ assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() !=
CometConf.SCAN_NATIVE_DATAFUSION)
+
def genTuples[K](num: Int, keys: Seq[K]): Seq[(
Int,
Map[K, Boolean],
@@ -580,6 +585,9 @@ abstract class CometColumnarShuffleSuite extends
CometTestBase with AdaptiveSpar
}
test("columnar shuffle on array") {
+ // https://github.com/apache/datafusion-comet/issues/1538
+ assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() !=
CometConf.SCAN_NATIVE_DATAFUSION)
+
Seq(10, 201).foreach { numPartitions =>
Seq("1.0", "10.0").foreach { ratio =>
withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key ->
ratio) {
@@ -678,6 +686,8 @@ abstract class CometColumnarShuffleSuite extends
CometTestBase with AdaptiveSpar
}
test("fix: Dictionary field should have distinct dict_id") {
+ // https://github.com/apache/datafusion-comet/issues/1538
+ assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() !=
CometConf.SCAN_NATIVE_DATAFUSION)
Seq(10, 201).foreach { numPartitions =>
withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key ->
"2.0") {
withParquetTable(
@@ -695,6 +705,8 @@ abstract class CometColumnarShuffleSuite extends
CometTestBase with AdaptiveSpar
}
test("dictionary shuffle") {
+ // https://github.com/apache/datafusion-comet/issues/1538
+ assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() !=
CometConf.SCAN_NATIVE_DATAFUSION)
Seq(10, 201).foreach { numPartitions =>
withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key ->
"2.0") {
withParquetTable((0 until 10000).map(i => (1.toString, (i +
1).toLong)), "tbl") {
@@ -710,6 +722,8 @@ abstract class CometColumnarShuffleSuite extends
CometTestBase with AdaptiveSpar
}
test("dictionary shuffle: fallback to string") {
+ // https://github.com/apache/datafusion-comet/issues/1538
+ assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() !=
CometConf.SCAN_NATIVE_DATAFUSION)
Seq(10, 201).foreach { numPartitions =>
withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key ->
"1000000000.0") {
withParquetTable((0 until 10000).map(i => (1.toString, (i +
1).toLong)), "tbl") {
@@ -725,6 +739,8 @@ abstract class CometColumnarShuffleSuite extends
CometTestBase with AdaptiveSpar
}
test("fix: inMemSorter should be reset after spilling") {
+ // https://github.com/apache/datafusion-comet/issues/1538
+ assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() !=
CometConf.SCAN_NATIVE_DATAFUSION)
withParquetTable((0 until 10000).map(i => (1, (i + 1).toLong)), "tbl") {
assert(
sql("SELECT * FROM tbl").repartition(201, $"_1").count() ==
sql("SELECT * FROM tbl")
@@ -733,6 +749,8 @@ abstract class CometColumnarShuffleSuite extends
CometTestBase with AdaptiveSpar
}
test("fix: native Unsafe row accessors return incorrect results") {
+ // https://github.com/apache/datafusion-comet/issues/1538
+ assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() !=
CometConf.SCAN_NATIVE_DATAFUSION)
Seq(10, 201).foreach { numPartitions =>
withTempDir { dir =>
val path = new Path(dir.toURI.toString, "test.parquet")
@@ -854,6 +872,8 @@ abstract class CometColumnarShuffleSuite extends
CometTestBase with AdaptiveSpar
}
test("Columnar shuffle for large shuffle partition number") {
+ // https://github.com/apache/datafusion-comet/issues/1538
+ assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() !=
CometConf.SCAN_NATIVE_DATAFUSION)
Seq(10, 200, 201).foreach { numPartitions =>
withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
val df = sql("SELECT * FROM tbl")
@@ -872,6 +892,8 @@ abstract class CometColumnarShuffleSuite extends
CometTestBase with AdaptiveSpar
}
test("hash-based columnar shuffle") {
+ // https://github.com/apache/datafusion-comet/issues/1538
+ assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() !=
CometConf.SCAN_NATIVE_DATAFUSION)
Seq(10, 200, 201).foreach { numPartitions =>
withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
val df = sql("SELECT * FROM tbl")
@@ -900,6 +922,9 @@ abstract class CometColumnarShuffleSuite extends
CometTestBase with AdaptiveSpar
}
test("columnar shuffle: different data type") {
+ // https://github.com/apache/datafusion-comet/issues/1538
+ assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() !=
CometConf.SCAN_NATIVE_DATAFUSION)
+
Seq(true, false).foreach { dictionaryEnabled =>
withTempDir { dir =>
val path = new Path(dir.toURI.toString, "test.parquet")
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala
index 5b58befcd..9d40a0cdf 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala
@@ -57,6 +57,8 @@ class CometNativeShuffleSuite extends CometTestBase with
AdaptiveSparkPlanHelper
}
test("native shuffle: different data type") {
+ // https://github.com/apache/datafusion-comet/issues/1538
+ assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() !=
CometConf.SCAN_NATIVE_DATAFUSION)
Seq(true, false).foreach { execEnabled =>
Seq(true, false).foreach { dictionaryEnabled =>
withTempDir { dir =>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]