This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new ecda35bf53 [GLUTEN-8497][VL] A bad test case that fails columnar table 
cache query (#8498)
ecda35bf53 is described below

commit ecda35bf5314c9d15515133309747ebe69f000e5
Author: Hongze Zhang <[email protected]>
AuthorDate: Mon Jan 13 10:02:44 2025 +0800

    [GLUTEN-8497][VL] A bad test case that fails columnar table cache query 
(#8498)
---
 .../gluten/execution/VeloxColumnarCacheSuite.scala | 43 ++++++++++++++++++++--
 .../columnar/transition/TransitionGraph.scala      |  1 +
 2 files changed, 41 insertions(+), 3 deletions(-)

diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala
 
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala
index e9151ad84a..8c7be883bb 100644
--- 
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala
@@ -24,8 +24,11 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
+import org.apache.spark.sql.types.{LongType, Metadata, MetadataBuilder, 
StructType}
 import org.apache.spark.storage.StorageLevel
 
+import scala.collection.JavaConverters._
+
 class VeloxColumnarCacheSuite extends VeloxWholeStageTransformerSuite with 
AdaptiveSparkPlanHelper {
   override protected val resourcePath: String = "/tpch-data-parquet"
   override protected val fileFormat: String = "parquet"
@@ -55,7 +58,7 @@ class VeloxColumnarCacheSuite extends 
VeloxWholeStageTransformerSuite with Adapt
     )
   }
 
-  test("input columnar batch") {
+  test("Input columnar batch") {
     TPCHTables.map(_.name).foreach {
       table =>
         runQueryAndCompare(s"SELECT * FROM $table", cache = true) {
@@ -64,7 +67,7 @@ class VeloxColumnarCacheSuite extends 
VeloxWholeStageTransformerSuite with Adapt
     }
   }
 
-  test("input columnar batch and column pruning") {
+  test("Input columnar batch and column pruning") {
     val expected = sql("SELECT l_partkey FROM lineitem").collect()
     val cached = sql("SELECT * FROM lineitem").cache()
     try {
@@ -85,7 +88,7 @@ class VeloxColumnarCacheSuite extends 
VeloxWholeStageTransformerSuite with Adapt
     }
   }
 
-  test("input vanilla Spark columnar batch") {
+  test("Input vanilla Spark columnar batch") {
     withSQLConf(GlutenConfig.COLUMNAR_FILESCAN_ENABLED.key -> "false") {
       val df = spark.table("lineitem")
       val expected = df.collect()
@@ -98,6 +101,40 @@ class VeloxColumnarCacheSuite extends 
VeloxWholeStageTransformerSuite with Adapt
     }
   }
 
+  // TODO: Fix this case. See 
https://github.com/apache/incubator-gluten/issues/8497.
+  testWithSpecifiedSparkVersion("Input fallen back vanilla Spark columnar 
scan", Some("3.3")) {
+    def withId(id: Int): Metadata =
+      new MetadataBuilder().putLong("parquet.field.id", id).build()
+
+    withTempDir {
+      dir =>
+        val readSchema =
+          new StructType()
+            .add("l_orderkey_read", LongType, true, withId(1))
+        val writeSchema =
+          new StructType()
+            .add("l_orderkey_write", LongType, true, withId(1))
+        withSQLConf("spark.sql.parquet.fieldId.read.enabled" -> "true") {
+          // Write a table with metadata information that Gluten Velox backend 
doesn't support,
+          // to emulate the scenario that a Spark columnar scan is not 
offload-able so fallen back,
+          // then user tries to cache it.
+          spark
+            .createDataFrame(
+              spark.sql("select l_orderkey from 
lineitem").collect().toList.asJava,
+              writeSchema)
+            .write
+            .mode("overwrite")
+            .parquet(dir.getCanonicalPath)
+          val df = spark.read.schema(readSchema).parquet(dir.getCanonicalPath)
+          df.cache()
+          // FIXME: The following call will throw since 
ColumnarCachedBatchSerializer will be
+          //  confused by the input vanilla Parquet scan when its 
#convertColumnarBatchToCachedBatch
+          //  method is called.
+          assertThrows[Exception](df.collect())
+        }
+    }
+  }
+
   test("CachedColumnarBatch serialize and deserialize") {
     val df = spark.table("lineitem")
     val expected = df.collect()
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/TransitionGraph.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/TransitionGraph.scala
index 2733ed9f4f..ef08a34d56 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/TransitionGraph.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/TransitionGraph.scala
@@ -77,6 +77,7 @@ object TransitionGraph {
     }
   }
 
+  // TODO: Consolidate transition graph's cost model with RAS cost model.
   private object TransitionCostModel extends 
FloydWarshallGraph.CostModel[Transition] {
     override def zero(): TransitionCost = TransitionCost(0, Nil)
     override def costOf(transition: Transition): TransitionCost = {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to