This is an automated email from the ASF dual-hosted git repository.

zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new fea47053c [GLUTEN-5618][CH] Fix 'Position x is out of bound in Block' 
error when executing count distinct (#5619)
fea47053c is described below

commit fea47053c9716ad1cc2433403312b348889ca959
Author: Zhichao Zhang <zhan...@apache.org>
AuthorDate: Tue May 7 09:43:12 2024 +0800

    [GLUTEN-5618][CH] Fix 'Position x is out of bound in Block' error when 
executing count distinct (#5619)
    
    When excuting count distinct, the group by keys are also in the count 
distinct expression, it will throw 'Position x is out of bound in Block' error 
or core dump.
    
    RC:
    CH backend will remove the duplicated column when executing pipeline.
    
    Close #5618.
---
 .../clickhouse/CHSparkPlanExecApi.scala            |  7 +-
 .../GlutenClickhouseCountDistinctSuite.scala       | 98 ++++++++++++++++++++++
 .../extension/CountDistinctWithoutExpand.scala     |  6 +-
 3 files changed, 107 insertions(+), 4 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index ee8b7dd45..64090af28 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -194,12 +194,13 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
       child: SparkPlan): HashAggregateExecBaseTransformer =
     CHHashAggregateExecTransformer(
       requiredChildDistributionExpressions,
-      groupingExpressions,
+      groupingExpressions.distinct,
       aggregateExpressions,
       aggregateAttributes,
       initialInputBufferOffset,
-      resultExpressions,
-      child)
+      resultExpressions.distinct,
+      child
+    )
 
   /** Generate HashAggregateExecPullOutHelper */
   override def genHashAggregateExecPullOutHelper(
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickhouseCountDistinctSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickhouseCountDistinctSuite.scala
index b12f886e5..1b954df22 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickhouseCountDistinctSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickhouseCountDistinctSuite.scala
@@ -115,4 +115,102 @@ class GlutenClickhouseCountDistinctSuite extends 
GlutenClickHouseWholeStageTrans
       "values (0, null,1), (0,null,1), (1, 1,1), (2, 2, 1) ,(2,2,2),(3,3,3) as 
data(a,b,c)"
     compareResultsAgainstVanillaSpark(sql, true, { _ => })
   }
+
+  test(
+    "Gluten-5618: [CH] Fix 'Position x is out of bound in Block' error " +
+      "when executing count distinct") {
+
+    withSQLConf(("spark.gluten.sql.countDistinctWithoutExpand", "false")) {
+      val sql =
+        """
+          |select count(distinct a, b, c)  from
+          |values (0, null, 1), (1, 1, 1), (2, 2, 1), (1, 2, 1) ,(2, 2, 2) as 
data(a,b,c) group by c
+          |""".stripMargin
+
+      compareResultsAgainstVanillaSpark(
+        sql,
+        true,
+        {
+          df =>
+            {
+
+              val planExecs = df.queryExecution.executedPlan.collect {
+                case aggTransformer: HashAggregateExecBaseTransformer => 
aggTransformer
+              }
+
+              planExecs.head.aggregateExpressions.foreach {
+                expr => assert(expr.toString().startsWith("count("))
+              }
+              planExecs(1).aggregateExpressions.foreach {
+                expr => assert(expr.toString().startsWith("partial_count("))
+              }
+            }
+        }
+      )
+    }
+
+    val sql =
+      """
+        |select count(distinct a1, a2, a3, a4, a5, a6, a7, a8, a9, a10)
+        |from values
+        |(0, null, 1, 0, null, 1, 0, 5, 1, 0),
+        |(null, 1, 1, null, 1, 1, null, 1, 1, 3),
+        |(2, 2, 1, 2, 2, 1, 2, 2, 1, 2),
+        |(1, 2, null, 1, 2, null, 1, 2, 3, 1),
+        |(2, 2, 2, 2, 2, 2, 2, 2, 2, 2)
+        |as data(a1, a2, a3, a4, a5, a6, a7, a8, a9, a10)
+        |group by a10
+        |""".stripMargin
+
+    compareResultsAgainstVanillaSpark(
+      sql,
+      true,
+      {
+        df =>
+          {
+
+            val planExecs = df.queryExecution.executedPlan.collect {
+              case aggTransformer: HashAggregateExecBaseTransformer => 
aggTransformer
+            }
+
+            planExecs.head.aggregateExpressions.foreach {
+              expr => assert(expr.toString().startsWith("count("))
+            }
+            planExecs(1).aggregateExpressions.foreach {
+              expr => assert(expr.toString().startsWith("partial_count("))
+            }
+          }
+      }
+    )
+
+    val sql1 =
+      """
+        |select count(distinct a, b, c)
+        |from
+        |values (0, null, 1), (1, 1, 1), (null, 2, 1), (1, 2, 1) ,(2, 2, null)
+        |as data(a,b,c)
+        |group by c
+        |""".stripMargin
+
+    compareResultsAgainstVanillaSpark(
+      sql1,
+      true,
+      {
+        df =>
+          {
+
+            val planExecs = df.queryExecution.executedPlan.collect {
+              case aggTransformer: HashAggregateExecBaseTransformer => 
aggTransformer
+            }
+
+            planExecs.head.aggregateExpressions.foreach {
+              expr => assert(expr.toString().startsWith("countdistinct("))
+            }
+            planExecs(1).aggregateExpressions.foreach {
+              expr => 
assert(expr.toString().startsWith("partial_countdistinct("))
+            }
+          }
+      }
+    )
+  }
 }
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/CountDistinctWithoutExpand.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/CountDistinctWithoutExpand.scala
index 43cc68ead..82051baee 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/CountDistinctWithoutExpand.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/CountDistinctWithoutExpand.scala
@@ -36,7 +36,11 @@ object CountDistinctWithoutExpand extends Rule[LogicalPlan] {
       GlutenConfig.getConf.enableGluten && 
GlutenConfig.getConf.enableCountDistinctWithoutExpand
     ) {
       
plan.transformAllExpressionsWithPruning(_.containsPattern(AGGREGATE_EXPRESSION))
 {
-        case ae: AggregateExpression if ae.isDistinct && 
ae.aggregateFunction.isInstanceOf[Count] =>
+        case ae: AggregateExpression
+            if ae.isDistinct && ae.aggregateFunction.isInstanceOf[Count] &&
+              // The maximum number of arguments for aggregate function with 
Nullable types in CH
+              // backend is 8
+              ae.aggregateFunction.children.size <= 8 =>
           ae.copy(
             aggregateFunction =
               
CountDistinct.apply(ae.aggregateFunction.asInstanceOf[Count].children),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@gluten.apache.org
For additional commands, e-mail: commits-h...@gluten.apache.org

Reply via email to