This is an automated email from the ASF dual-hosted git repository.
yaooqinn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git
The following commit(s) were added to refs/heads/main by this push:
new c69a7c28e1 [VL] Add cross-config / cross-build-cycle invariant tests
for ColumnarCachedBatchSerializer (#12124)
c69a7c28e1 is described below
commit c69a7c28e10907f6c58901fb5ae18b9e6c23b5c0
Author: Kent Yao <[email protected]>
AuthorDate: Fri May 22 12:03:51 2026 +0800
[VL] Add cross-config / cross-build-cycle invariant tests for
ColumnarCachedBatchSerializer (#12124)
Extend ColumnarCachedBatchE2ESuite with three lifecycle invariant tests that
exercise the cached-batch wire format across SQLConf transitions:
1. cross-config: build with stats=true, read with stats=false
-- wire format is build-time-decided; v2-with-stats payload must
survive a reader-time downgrade and prune must still engage.
2. cross-config (reverse): build with stats=false, read with stats=true
-- legacy v1 payload (stats=null) at build time; reader must NOT
fabricate stats and must fall back to full scan.
3. cross-build-cycle: same logical query rebuilt twice with different
stats settings (round 1 stats=true, round 2 stats=false). Round 2
must re-honor stats=false; the serializer must not reuse stale gate
state from round 1.
Each test asserts (R-correct) the result row count, (R-path) that the cached
batches are served by ColumnarCachedBatchSerializer (not vanilla
DefaultCachedBatchSerializer), and (R-prune, when expectPrune=true) that
InMemoryTableScanExec.numOutputRows stays within (N / P) * 2, i.e. reflects
partition pruning.
The shared assertion logic is factored into a private helper,
`assertGlutenCachedPlanAndPrune(df, expectPrune)`. Its doc comment explains
an intentional asymmetry: the reverse "no prune" direction is not observable
via numOutputRows on the Gluten native path. The existing baseline test
"numOutputRows reflects post-filter row count" already notes that outRows may
legitimately be 0 even with full pruning, because the surviving row is
delivered through the native scan metrics path. The expectPrune=false branch
therefore intentionally performs path-only verification.
All 15 suite tests pass locally (Spark 3.3, Velox backend).
Generated-by: Claude claude-opus-4.7
---
.../execution/ColumnarCachedBatchE2ESuite.scala | 103 +++++++++++++++++++++
1 file changed, 103 insertions(+)
diff --git a/backends-velox/src/test/scala/org/apache/spark/sql/execution/ColumnarCachedBatchE2ESuite.scala b/backends-velox/src/test/scala/org/apache/spark/sql/execution/ColumnarCachedBatchE2ESuite.scala
index 34998c887d..fe261b3e3f 100644
--- a/backends-velox/src/test/scala/org/apache/spark/sql/execution/ColumnarCachedBatchE2ESuite.scala
+++ b/backends-velox/src/test/scala/org/apache/spark/sql/execution/ColumnarCachedBatchE2ESuite.scala
@@ -74,6 +74,26 @@ class ColumnarCachedBatchE2ESuite
.cache()
}
+  /**
+   * Asserts that `df` is served from a cache built by ColumnarCachedBatchSerializer
+   * and, when `expectPrune` is true, that the cached scan's numOutputRows shows
+   * partition pruning.
+   *
+   * The caller must already have executed `df` itself (e.g. `df.collect()`).
+   * Actions such as `count()` run a separate, derived physical plan. In that case
+   * the InMemoryTableScanExec found in `df.queryExecution.executedPlan` never ran,
+   * its numOutputRows stays at 0, and the prune check passes vacuously.
+   *
+   * `expectPrune = false` is path-only. Per the baseline "numOutputRows reflects
+   * post-filter row count" test, numOutputRows is not a reliable signal for
+   * "no prune" on the Gluten native path. So only the serializer is checked.
+   *
+   * @param df          an already-executed DataFrame that reads a cached relation
+   * @param expectPrune when true, require numOutputRows <= (N / P) * 2
+   *                    (roughly two partitions' worth of rows, assuming cacheRange
+   *                    spreads N rows over P partitions)
+   */
+  private def assertGlutenCachedPlanAndPrune(df: DataFrame, expectPrune: Boolean): Unit = {
+    val plan = df.queryExecution.executedPlan
+    // Fail with the plan text instead of a bare NoSuchElementException from `.get`.
+    val ims = find(plan) {
+      case _: InMemoryTableScanExec => true
+      case _ => false
+    }.collect { case scan: InMemoryTableScanExec => scan }
+      .getOrElse(fail(s"No InMemoryTableScanExec found in executed plan:\n$plan"))
+    val serName = ims.relation.cacheBuilder.serializer.getClass.getSimpleName
+    assert(serName == "ColumnarCachedBatchSerializer", s"got $serName")
+    if (expectPrune) {
+      val outRows = ims.metrics("numOutputRows").value
+      val upperBound = (N / P) * 2
+      assert(outRows <= upperBound, s"numOutputRows=$outRows > $upperBound (N=$N, P=$P)")
+    }
+  }
+
test("e2e cache + equality filter: no crash + correct result") {
val cached = cacheRange()
try {
@@ -385,4 +405,87 @@ class ColumnarCachedBatchE2ESuite
}
}
}
+
+  // Cross-config: build with stats enabled, read with stats disabled.
+  // The wire format is decided at build time, so the reader-side SQLConf must not
+  // prevent pruning.
+  test("cross-config: build with stats enabled, read with stats disabled") {
+    var cached: DataFrame = null
+    var filtered: DataFrame = null
+    var result: Long = -1L
+    try {
+      withSQLConf(
+        GlutenConfig.COLUMNAR_TABLE_CACHE_PARTITION_STATS_ENABLED.key -> "true") {
+        // cacheRange() already registers the cache (it ends in .cache()). Keep the
+        // reference before materializing so `finally` unpersists it even if
+        // count() fails.
+        cached = cacheRange()
+        cached.count()
+      }
+      withSQLConf(
+        GlutenConfig.COLUMNAR_TABLE_CACHE_PARTITION_STATS_ENABLED.key -> "false") {
+        filtered = cached.filter(col("k") === pivot)
+        // Use collect(), not count(). count() executes a derived aggregate plan.
+        // collect() plans and executes `filtered`'s own physical plan under this
+        // reader-side config, so the scan inspected below has populated metrics.
+        result = filtered.collect().length.toLong
+      }
+      assert(result == 1L, s"got $result")
+      assertGlutenCachedPlanAndPrune(filtered, expectPrune = true)
+    } finally {
+      if (cached != null) {
+        cached.unpersist(blocking = true)
+      }
+    }
+  }
+
+  // Reverse direction: the cache is built with stats disabled (legacy v1 payload,
+  // stats=null), so the reader cannot fabricate stats even when it enables them.
+  // Unlike the same-config legacy test, this exercises a build/read config mismatch.
+  test("cross-config: build with stats disabled, read with stats enabled") {
+    var cached: DataFrame = null
+    var filtered: DataFrame = null
+    var result: Long = -1L
+    try {
+      withSQLConf(
+        GlutenConfig.COLUMNAR_TABLE_CACHE_PARTITION_STATS_ENABLED.key -> "false") {
+        // Keep the reference before materializing so `finally` unpersists the
+        // cache even if count() fails.
+        cached = cacheRange()
+        cached.count()
+      }
+      withSQLConf(
+        GlutenConfig.COLUMNAR_TABLE_CACHE_PARTITION_STATS_ENABLED.key -> "true") {
+        filtered = cached.filter(col("k") === pivot)
+        // collect() plans and executes `filtered`'s own physical plan under this
+        // reader-side config. count() would run a separate derived plan.
+        result = filtered.collect().length.toLong
+      }
+      assert(result == 1L, s"got $result")
+      assertGlutenCachedPlanAndPrune(filtered, expectPrune = false)
+    } finally {
+      if (cached != null) {
+        cached.unpersist(blocking = true)
+      }
+    }
+  }
+
+  // Round 2 must re-honor the new SQLConf rather than reuse a stale gate decision
+  // or payload from round 1.
+  test("cross-build-cycle: unpersist + toggle stats config + rebuild same query") {
+    // Builds, materializes and queries the cache with the stats flag set to
+    // `statsEnabled`. Always unpersists it, so the next round rebuilds from scratch.
+    def runRound(round: Int, statsEnabled: Boolean, expectPrune: Boolean): Unit = {
+      withSQLConf(
+        GlutenConfig.COLUMNAR_TABLE_CACHE_PARTITION_STATS_ENABLED.key ->
+          statsEnabled.toString) {
+        val df = cacheRange()
+        try {
+          df.count()
+          val filtered = df.filter(col("k") === pivot)
+          // collect() (not count()) executes `filtered`'s own physical plan, so
+          // its scan metrics are populated for the prune check.
+          val result = filtered.collect().length.toLong
+          assert(result == 1L, s"round $round: got $result")
+          assertGlutenCachedPlanAndPrune(filtered, expectPrune = expectPrune)
+        } finally {
+          df.unpersist(blocking = true)
+        }
+      }
+    }
+
+    runRound(round = 1, statsEnabled = true, expectPrune = true)
+    runRound(round = 2, statsEnabled = false, expectPrune = false)
+  }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]