This is an automated email from the ASF dual-hosted git repository.
jinchengchenghh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 09a629ace7 [GLUTEN-11980][CORE][TESTS] Add test cases for decimal-key
joins if either side of join exists native scan fallback to vanilla. (#12061)
09a629ace7 is described below
commit 09a629ace7f4212d3fe5494972f10fec210a314e
Author: Jiaan Geng <[email protected]>
AuthorDate: Fri May 29 12:22:51 2026 +0800
[GLUTEN-11980][CORE][TESTS] Add test cases for decimal-key joins if either
side of join exists native scan fallback to vanilla. (#12061)
What changes are proposed in this pull request?
This PR proposes to add test cases for decimal-key joins if either side of
join exists native scan fallback to vanilla.
Before facebookincubator/velox@472d319, there is a bug that join key
contains decimal type if either side of join exists native scan fallback to
vanilla. The bug is caused by decimal precision/scale is different from read
side and write side.
After facebookincubator/velox@472d319, the bug is fixed.
How was this patch tested?
Manual tests in our production environment.
UT.
Was this patch authored or co-authored using generative AI tooling?
'No'.
I just use AI to generate the comments.
Related issue: #11980
---
.../apache/gluten/execution/FallbackSuite.scala | 165 +++++++++++++++++++++
1 file changed, 165 insertions(+)
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
index ddc9cc923e..53af1664c8 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
@@ -23,6 +23,7 @@ import
org.apache.spark.sql.execution.{ColumnarBroadcastExchangeExec, ColumnarSh
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper,
AQEShuffleReadExec}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec,
BroadcastNestedLoopJoinExec, SortMergeJoinExec}
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.utils.GlutenSuiteUtils
class FallbackSuite extends VeloxWholeStageTransformerSuite with
AdaptiveSparkPlanHelper {
@@ -62,12 +63,53 @@ class FallbackSuite extends VeloxWholeStageTransformerSuite
with AdaptiveSparkPl
.write
.format("parquet")
.saveAsTable("tmp3")
+ // ORC files are written with DECIMAL(38, 18) (Hive's native storage
precision).
+ // tmp4/tmp5 declare DECIMAL(20, 0) pointing to the same ORC files,
+ // so the reader must handle a precision/scale mismatch.
+ spark
+ .range(100)
+ .selectExpr(
+ "cast(id as decimal(38, 18)) as c1",
+ "cast(id % 3 as int) as c2",
+ "cast(id % 9 as timestamp) as c3")
+ .write
+ .format("orc")
+ .saveAsTable("tmp4_wide")
+ spark
+ .range(100)
+ .selectExpr(
+ "cast(id as decimal(38, 18)) as c1",
+ "cast(id % 3 as int) as c2",
+ "cast(id % 5 as timestamp) as c3")
+ .write
+ .format("orc")
+ .saveAsTable("tmp5_wide")
+ val loc4 = spark
+ .sql("DESCRIBE FORMATTED tmp4_wide")
+ .filter("col_name = 'Location'")
+ .select("data_type")
+ .collect()(0)
+ .getString(0)
+ val loc5 = spark
+ .sql("DESCRIBE FORMATTED tmp5_wide")
+ .filter("col_name = 'Location'")
+ .select("data_type")
+ .collect()(0)
+ .getString(0)
+ spark.sql(
+ s"CREATE TABLE tmp4 (c1 DECIMAL(20, 0), c2 INT, c3 TIMESTAMP) USING ORC
LOCATION '$loc4'")
+ spark.sql(
+ s"CREATE TABLE tmp5 (c1 DECIMAL(20, 0), c2 INT, c3 TIMESTAMP) USING ORC
LOCATION '$loc5'")
}
override protected def afterAll(): Unit = {
spark.sql("drop table tmp1")
spark.sql("drop table tmp2")
spark.sql("drop table tmp3")
+ spark.sql("drop table tmp4_wide")
+ spark.sql("drop table tmp5_wide")
+ spark.sql("drop table tmp4")
+ spark.sql("drop table tmp5")
super.afterAll()
}
@@ -382,4 +424,127 @@ class FallbackSuite extends
VeloxWholeStageTransformerSuite with AdaptiveSparkPl
}
}
}
+
+ test("For decimal-key joins, if one side falls back to Spark, force fallback
the other side") {
+ // ORC files are written with DECIMAL(38, 18) (Hive's native storage
precision).
+ // The metastore tables tmp4/tmp5 declare DECIMAL(20, 0) and point to the
+ // same ORC files, so the reader must handle a precision/scale mismatch.
+ // Selecting only c2 (INT) -> native FileSourceScanExecTransformer.
+ // Selecting c3 (TIMESTAMP) in addition -> native validation fails ->
+ // vanilla FileSourceScanExec.
+
+ // -- SortMergeJoin
------------------------------------------------------------------
+
+ val sql1 = "SELECT /*+ MERGE(tmp4) */ tmp4.c2 AS 4c2, tmp4.c3 AS 4c3, " +
+ "tmp5.c2 AS 5c2, tmp5.c3 AS 5c3 FROM tmp4 JOIN tmp5 ON tmp4.c1 = tmp5.c1"
+ withSQLConf(
+ GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key -> "false",
+ GlutenConfig.COLUMNAR_SHUFFLED_HASH_JOIN_ENABLED.key -> "false") {
+ checkAnswer(
+ spark.sql(sql1),
+ spark.sql(
+ "SELECT tmp4_wide.c2 AS 4c2, tmp4_wide.c3 AS 4c3, " +
+ "tmp5_wide.c2 AS 5c2, tmp5_wide.c3 AS 5c3 " +
+ "FROM tmp4_wide JOIN tmp5_wide ON tmp4_wide.c1 = tmp5_wide.c1")
+ )
+ }
+
+ val sql2 = "SELECT /*+ MERGE(tmp4) */ tmp4.c2 AS 4c2, tmp4.c3 AS 4c3, " +
+ "tmp5.c2 AS 5c2 FROM tmp4 JOIN tmp5 ON tmp4.c1 = tmp5.c1"
+ withSQLConf(
+ GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key -> "false",
+ GlutenConfig.COLUMNAR_SHUFFLED_HASH_JOIN_ENABLED.key -> "false") {
+ checkAnswer(
+ spark.sql(sql2),
+ spark.sql(
+ "SELECT tmp4_wide.c2 AS 4c2, tmp4_wide.c3 AS 4c3, " +
+ "tmp5_wide.c2 AS 5c2 " +
+ "FROM tmp4_wide JOIN tmp5_wide ON tmp4_wide.c1 = tmp5_wide.c1")
+ )
+ }
+
+ val sql3 = "SELECT /*+ MERGE(tmp4) */ tmp4.c2 AS 4c2, " +
+ "tmp5.c2 AS 5c2, tmp5.c3 AS 5c3 FROM tmp4 JOIN tmp5 ON tmp4.c1 = tmp5.c1"
+ withSQLConf(
+ GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key -> "false",
+ GlutenConfig.COLUMNAR_SHUFFLED_HASH_JOIN_ENABLED.key -> "false") {
+ checkAnswer(
+ spark.sql(sql3),
+ spark.sql(
+ "SELECT tmp4_wide.c2 AS 4c2, " +
+ "tmp5_wide.c2 AS 5c2, tmp5_wide.c3 AS 5c3 " +
+ "FROM tmp4_wide JOIN tmp5_wide ON tmp4_wide.c1 = tmp5_wide.c1")
+ )
+ }
+
+ // -- ShuffledHashJoin
---------------------------------------------------------------
+
+ val sql4 = "SELECT /*+ SHUFFLE_HASH(tmp4) */ tmp4.c2 AS 4c2, tmp4.c3 AS
4c3, " +
+ "tmp5.c2 AS 5c2, tmp5.c3 AS 5c3 FROM tmp4 JOIN tmp5 ON tmp4.c1 = tmp5.c1"
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+ checkAnswer(
+ spark.sql(sql4),
+ spark.sql(
+ "SELECT tmp4_wide.c2 AS 4c2, tmp4_wide.c3 AS 4c3, " +
+ "tmp5_wide.c2 AS 5c2, tmp5_wide.c3 AS 5c3 " +
+ "FROM tmp4_wide JOIN tmp5_wide ON tmp4_wide.c1 = tmp5_wide.c1")
+ )
+ }
+
+ val sql5 = "SELECT /*+ SHUFFLE_HASH(tmp4) */ tmp4.c2 AS 4c2, tmp4.c3 AS
4c3, " +
+ "tmp5.c2 AS 5c2 FROM tmp4 JOIN tmp5 ON tmp4.c1 = tmp5.c1"
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+ checkAnswer(
+ spark.sql(sql5),
+ spark.sql(
+ "SELECT tmp4_wide.c2 AS 4c2, tmp4_wide.c3 AS 4c3, " +
+ "tmp5_wide.c2 AS 5c2 " +
+ "FROM tmp4_wide JOIN tmp5_wide ON tmp4_wide.c1 = tmp5_wide.c1")
+ )
+ }
+
+ val sql6 = "SELECT /*+ SHUFFLE_HASH(tmp4) */ tmp4.c2 AS 4c2, " +
+ "tmp5.c2 AS 5c2, tmp5.c3 AS 5c3 FROM tmp4 JOIN tmp5 ON tmp4.c1 = tmp5.c1"
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+ checkAnswer(
+ spark.sql(sql6),
+ spark.sql(
+ "SELECT tmp4_wide.c2 AS 4c2, " +
+ "tmp5_wide.c2 AS 5c2, tmp5_wide.c3 AS 5c3 " +
+ "FROM tmp4_wide JOIN tmp5_wide ON tmp4_wide.c1 = tmp5_wide.c1")
+ )
+ }
+
+ // -- BroadcastHashJoin
--------------------------------------------------------------
+
+ val sql7 = "SELECT tmp4.c2 AS 4c2, tmp4.c3 AS 4c3, " +
+ "tmp5.c2 AS 5c2, tmp5.c3 AS 5c3 FROM tmp4 JOIN tmp5 ON tmp4.c1 = tmp5.c1"
+ checkAnswer(
+ spark.sql(sql7),
+ spark.sql(
+ "SELECT tmp4_wide.c2 AS 4c2, tmp4_wide.c3 AS 4c3, " +
+ "tmp5_wide.c2 AS 5c2, tmp5_wide.c3 AS 5c3 " +
+ "FROM tmp4_wide JOIN tmp5_wide ON tmp4_wide.c1 = tmp5_wide.c1")
+ )
+
+ val sql8 = "SELECT tmp4.c2 AS 4c2, tmp4.c3 AS 4c3, " +
+ "tmp5.c2 AS 5c2 FROM tmp4 JOIN tmp5 ON tmp4.c1 = tmp5.c1"
+ checkAnswer(
+ spark.sql(sql8),
+ spark.sql(
+ "SELECT tmp4_wide.c2 AS 4c2, tmp4_wide.c3 AS 4c3, " +
+ "tmp5_wide.c2 AS 5c2 " +
+ "FROM tmp4_wide JOIN tmp5_wide ON tmp4_wide.c1 = tmp5_wide.c1")
+ )
+
+ val sql9 = "SELECT tmp4.c2 AS 4c2, " +
+ "tmp5.c2 AS 5c2, tmp5.c3 AS 5c3 FROM tmp4 JOIN tmp5 ON tmp4.c1 = tmp5.c1"
+ checkAnswer(
+ spark.sql(sql9),
+ spark.sql(
+ "SELECT tmp4_wide.c2 AS 4c2, " +
+ "tmp5_wide.c2 AS 5c2, tmp5_wide.c3 AS 5c3 " +
+ "FROM tmp4_wide JOIN tmp5_wide ON tmp4_wide.c1 = tmp5_wide.c1")
+ )
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]