This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c3238c441 DRILL-8545: Disable HashAgg for collect_to_list_varchar due to ordering requirements (#3042)
3c3238c441 is described below

commit 3c3238c44159f1e9960eb1cbeb5847036bdb87c9
Author: Maksym Rymar <[email protected]>
AuthorDate: Tue Mar 31 00:36:10 2026 +0300

    DRILL-8545: Disable HashAgg for collect_to_list_varchar due to ordering requirements (#3042)
---
 ...tion.java => CollectToListVarcharFunction.java} | 59 ++++++++++++----------
 .../drill/exec/planner/physical/HashAggPrule.java  | 37 ++++++++++++++
 .../drill/exec/fn/impl/TestAggregateFunctions.java | 30 +++++++----
 3 files changed, 89 insertions(+), 37 deletions(-)

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/CollectToListVarcharAggFunction.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/CollectToListVarcharFunction.java
similarity index 55%
rename from 
exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/CollectToListVarcharAggFunction.java
rename to 
exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/CollectToListVarcharFunction.java
index 7a42aeeae1..4e2791924c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/CollectToListVarcharAggFunction.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/CollectToListVarcharFunction.java
@@ -29,40 +29,47 @@ import 
org.apache.drill.exec.vector.complex.writer.BaseWriter;
 /**
  * Aggregate function which collects incoming VarChar column values into the 
list.
  */
-@FunctionTemplate(name = "collect_to_list_varchar",
-                  scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE,
-                  isInternal = true)
-public class CollectToListVarcharAggFunction implements DrillAggFunc {
+public class CollectToListVarcharFunction {
+  public static final String NAME = "collect_to_list_varchar";
 
-  @Param NullableVarCharHolder input;
-  @Output BaseWriter.ComplexWriter writer;
-  @Workspace ObjectHolder writerHolder;
+  @FunctionTemplate(name = NAME,
+      scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE,
+      isInternal = true)
+  public static class CollectToListVarcharAggFunction implements DrillAggFunc {
 
-  @Override
-  public void setup() {
-    writerHolder = new ObjectHolder();
-  }
+    @Param NullableVarCharHolder input;
+    @Output BaseWriter.ComplexWriter writer;
+    @Workspace ObjectHolder writerHolder;
+
+    private CollectToListVarcharAggFunction() {
+    }
 
-  @Override
-  public void add() {
-    org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter 
listWriter;
-    if (writerHolder.obj == null) {
-      writerHolder.obj = writer.rootAsList();
+    @Override
+    public void setup() {
+      writerHolder = new ObjectHolder();
     }
 
-    listWriter = 
(org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter) 
writerHolder.obj;
+    @Override
+    public void add() {
+      org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter 
listWriter;
+      if (writerHolder.obj == null) {
+        writerHolder.obj = writer.rootAsList();
+      }
+
+      listWriter = 
(org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter) 
writerHolder.obj;
 
-    if (input.isSet > 0) {
-      listWriter.varChar().writeVarChar(input.start, input.end, input.buffer);
+      if (input.isSet > 0) {
+        listWriter.varChar().writeVarChar(input.start, input.end, 
input.buffer);
+      }
     }
-  }
 
-  @Override
-  public void output() {
-  }
+    @Override
+    public void output() {
+    }
 
-  @Override
-  public void reset() {
-    writerHolder.obj = null;
+    @Override
+    public void reset() {
+      writerHolder.obj = null;
+    }
   }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
index 1b3805e5bd..da26e1a27c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
@@ -19,11 +19,13 @@ package org.apache.drill.exec.planner.physical;
 
 import com.google.common.collect.Lists;
 import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.drill.exec.expr.fn.impl.CollectToListVarcharFunction;
 import org.apache.drill.exec.planner.logical.DrillAggregateRel;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
 import org.apache.drill.exec.planner.physical.AggPrelBase.OperatorPhase;
 import org.apache.calcite.rel.InvalidRelException;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelTrait;
@@ -58,6 +60,10 @@ public class HashAggPrule extends AggPruleBase {
     final DrillAggregateRel aggregate = call.rel(0);
     final RelNode input = call.rel(1);
 
+    if (hasIncompatibleAggCalls(aggregate)) {
+      return;
+    }
+
     if (aggregate.containsDistinctCall() || aggregate.getGroupCount() == 0) {
       // currently, don't use HashAggregate if any of the logical aggrs 
contains DISTINCT or
       // if there are no grouping keys
@@ -168,4 +174,35 @@ public class HashAggPrule extends AggPruleBase {
     call.transformTo(newAgg);
   }
 
+  /**
+   * Evaluates the logical aggregate expressions to determine if any are
+   * incompatible with the Hash Aggregate physical operator.
+   * <p>
+   * While Hash Aggregate is generally performant for many aggregation types,
+   * certain functions (such as {@code collect_to_list_varchar}) may require specific
+   * data ordering or memory management patterns that the Hash Aggregate
+   * implementation does not provide.
+   * </p>
+   * <p>
+   * <b>Current Incompatibilities:</b>
+   * <ul>
+   * <li>{@link org.apache.drill.exec.expr.fn.impl.CollectToListVarcharFunction}: Excluded from
+   * HashAgg because it requires ordered input and cannot be processed efficiently in an unordered
+   * fashion. {@code SortAggPrule} is intended to handle this, ensuring deterministic results.
+   * </li>
+   * </ul>
+   *
+   * @param aggregate The logical aggregate relational node containing the
+   * list of {@link AggregateCall}s to inspect.
+   * @return {@code true} if at least one aggregation call is incompatible
+   * with HashAgg; {@code false} otherwise.
+   */
+  private boolean hasIncompatibleAggCalls(DrillAggregateRel aggregate) {
+    for (AggregateCall aggCall : aggregate.getAggCallList()) {
+      if 
(CollectToListVarcharFunction.NAME.equalsIgnoreCase(aggCall.getAggregation().getName()))
 {
+        return true;
+      }
+    }
+    return false;
+  }
 }
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java
index f8fa2221ea..f052ed2d8f 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java
@@ -1101,29 +1101,37 @@ public class TestAggregateFunctions extends ClusterTest 
{
   public void testCollectToListVarcharStreamAgg() throws Exception {
     try {
       client.alterSession(PlannerSettings.HASHAGG.getOptionName(), false);
+      client.alterSession(PlannerSettings.STREAMAGG.getOptionName(), true);
       testBuilder()
-          .sqlQuery("select collect_to_list_varchar(`date`) as l from " +
-              "(select * from cp.`store/json/clicks.json` limit 2)")
+          .sqlQuery("select collect_to_list_varchar(`clicks`.`trans_id`) as 
ids from " +
+              "cp.`store/json/clicks.json` clicks group by 
`clicks`.`user_info`.`device`")
           .unOrdered()
-          .baselineColumns("l")
-          .baselineValues(listOf("2014-04-26", "2014-04-20"))
+          .baselineColumns("ids")
+          .baselineValues(listOf("31920", "32383", "32359"))
+          .baselineValues(listOf("31026"))
+          .baselineValues(listOf("33848"))
           .go();
     } finally {
       client.resetSession(PlannerSettings.HASHAGG.getOptionName());
+      client.resetSession(PlannerSettings.STREAMAGG.getOptionName());
     }
   }
 
+  /**
+   * The current implementation of {@link org.apache.drill.exec.expr.fn.impl.CollectToListVarcharFunction}
+   * requires ordered input data. Because the Hash Aggregator does not maintain input order,
+   * there is no efficient way for the function to process these values correctly within that
+   * operator, so HashAgg is not used for it and planning fails when StreamAgg is also disabled.
+   * {@code SortAggPrule} is intended to handle this to ensure deterministic results.
+   */
   @Test
   public void testCollectToListVarcharHashAgg() throws Exception {
     try {
+      thrown.expect(UserRemoteException.class);
+      thrown.expectMessage(containsString("SYSTEM ERROR: 
CannotPlanException"));
       client.alterSession(PlannerSettings.STREAMAGG.getOptionName(), false);
-      testBuilder()
-          .sqlQuery("select collect_to_list_varchar(`date`) as l from " +
-              "(select * from cp.`store/json/clicks.json` limit 2) group by 
'a'")
-          .unOrdered()
-          .baselineColumns("l")
-          .baselineValues(listOf("2014-04-26", "2014-04-20"))
-          .go();
+      run("select collect_to_list_varchar(`clicks`.`trans_id`) as ids from" +
+        " cp.`store/json/clicks.json` clicks group by 
`clicks`.`user_info`.`device`");
     } finally {
       client.resetSession(PlannerSettings.STREAMAGG.getOptionName());
     }

Reply via email to