This is an automated email from the ASF dual-hosted git repository.
cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 3c3238c441 DRILL-8545: Disable HashAgg for collect_to_list_varchar due to ordering requirements (#3042)
3c3238c441 is described below
commit 3c3238c44159f1e9960eb1cbeb5847036bdb87c9
Author: Maksym Rymar <[email protected]>
AuthorDate: Tue Mar 31 00:36:10 2026 +0300
DRILL-8545: Disable HashAgg for collect_to_list_varchar due to ordering requirements (#3042)
---
...tion.java => CollectToListVarcharFunction.java} | 59 ++++++++++++----------
.../drill/exec/planner/physical/HashAggPrule.java | 37 ++++++++++++++
.../drill/exec/fn/impl/TestAggregateFunctions.java | 30 +++++++----
3 files changed, 89 insertions(+), 37 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/CollectToListVarcharAggFunction.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/CollectToListVarcharFunction.java
similarity index 55%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/CollectToListVarcharAggFunction.java
rename to exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/CollectToListVarcharFunction.java
index 7a42aeeae1..4e2791924c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/CollectToListVarcharAggFunction.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/CollectToListVarcharFunction.java
@@ -29,40 +29,47 @@ import org.apache.drill.exec.vector.complex.writer.BaseWriter;
/**
* Aggregate function which collects incoming VarChar column values into the
list.
*/
-@FunctionTemplate(name = "collect_to_list_varchar",
- scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE,
- isInternal = true)
-public class CollectToListVarcharAggFunction implements DrillAggFunc {
+public class CollectToListVarcharFunction {
+ public static final String NAME = "collect_to_list_varchar";
- @Param NullableVarCharHolder input;
- @Output BaseWriter.ComplexWriter writer;
- @Workspace ObjectHolder writerHolder;
+ @FunctionTemplate(name = NAME,
+ scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE,
+ isInternal = true)
+ public static class CollectToListVarcharAggFunction implements DrillAggFunc {
- @Override
- public void setup() {
- writerHolder = new ObjectHolder();
- }
+ @Param NullableVarCharHolder input;
+ @Output BaseWriter.ComplexWriter writer;
+ @Workspace ObjectHolder writerHolder;
+
+ private CollectToListVarcharAggFunction() {
+ }
- @Override
- public void add() {
- org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter
listWriter;
- if (writerHolder.obj == null) {
- writerHolder.obj = writer.rootAsList();
+ @Override
+ public void setup() {
+ writerHolder = new ObjectHolder();
}
- listWriter =
(org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter)
writerHolder.obj;
+ @Override
+ public void add() {
+ org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter
listWriter;
+ if (writerHolder.obj == null) {
+ writerHolder.obj = writer.rootAsList();
+ }
+
+ listWriter =
(org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter)
writerHolder.obj;
- if (input.isSet > 0) {
- listWriter.varChar().writeVarChar(input.start, input.end, input.buffer);
+ if (input.isSet > 0) {
+ listWriter.varChar().writeVarChar(input.start, input.end,
input.buffer);
+ }
}
- }
- @Override
- public void output() {
- }
+ @Override
+ public void output() {
+ }
- @Override
- public void reset() {
- writerHolder.obj = null;
+ @Override
+ public void reset() {
+ writerHolder.obj = null;
+ }
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
index 1b3805e5bd..da26e1a27c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
@@ -19,11 +19,13 @@ package org.apache.drill.exec.planner.physical;
import com.google.common.collect.Lists;
import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.drill.exec.expr.fn.impl.CollectToListVarcharFunction;
import org.apache.drill.exec.planner.logical.DrillAggregateRel;
import org.apache.drill.exec.planner.logical.RelOptHelper;
import org.apache.drill.exec.planner.physical.AggPrelBase.OperatorPhase;
import org.apache.calcite.rel.InvalidRelException;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelTrait;
@@ -58,6 +60,10 @@ public class HashAggPrule extends AggPruleBase {
final DrillAggregateRel aggregate = call.rel(0);
final RelNode input = call.rel(1);
+ if (hasIncompatibleAggCalls(aggregate)) {
+ return;
+ }
+
if (aggregate.containsDistinctCall() || aggregate.getGroupCount() == 0) {
// currently, don't use HashAggregate if any of the logical aggrs
contains DISTINCT or
// if there are no grouping keys
@@ -168,4 +174,35 @@ public class HashAggPrule extends AggPruleBase {
call.transformTo(newAgg);
}
+  /**
+   * Evaluates the logical aggregate expressions to determine if any are
+   * incompatible with the Hash Aggregate physical operator.
+   * <p>
+   * While Hash Aggregate is generally performant for many aggregation types,
+   * certain functions (such as {@code collect_to_list_varchar}) may require specific
+   * data ordering or memory management patterns that the Hash Aggregate
+   * implementation does not provide.
+   * </p>
+   * <p>
+   * <b>Current Incompatibilities:</b></p>
+   * <ul>
+   *   <li>{@link CollectToListVarcharFunction}: Excluded from HashAgg because it
+   *   requires data ordering and cannot be processed efficiently in an unordered
+   *   fashion. {@code SortAggPrule} is intended to handle this, ensuring deterministic results.
+   *   </li>
+   * </ul>
+   *
+   * @param aggregate the logical aggregate relational node containing the
+   *                  list of {@link AggregateCall}s to inspect
+   * @return {@code true} if at least one aggregation call is incompatible
+   *         with HashAgg; {@code false} otherwise
+   */
+  private static boolean hasIncompatibleAggCalls(DrillAggregateRel aggregate) {
+    for (AggregateCall aggCall : aggregate.getAggCallList()) {
+      if (CollectToListVarcharFunction.NAME.equalsIgnoreCase(aggCall.getAggregation().getName())) {
+        return true;
+      }
+    }
+    return false;
+  }
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java
index f8fa2221ea..f052ed2d8f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java
@@ -1101,29 +1101,37 @@ public class TestAggregateFunctions extends ClusterTest {
public void testCollectToListVarcharStreamAgg() throws Exception {
try {
client.alterSession(PlannerSettings.HASHAGG.getOptionName(), false);
+ client.alterSession(PlannerSettings.STREAMAGG.getOptionName(), true);
testBuilder()
- .sqlQuery("select collect_to_list_varchar(`date`) as l from " +
- "(select * from cp.`store/json/clicks.json` limit 2)")
+ .sqlQuery("select collect_to_list_varchar(`clicks`.`trans_id`) as
ids from " +
+ "cp.`store/json/clicks.json` clicks group by
`clicks`.`user_info`.`device`")
.unOrdered()
- .baselineColumns("l")
- .baselineValues(listOf("2014-04-26", "2014-04-20"))
+ .baselineColumns("ids")
+ .baselineValues(listOf("31920", "32383", "32359"))
+ .baselineValues(listOf("31026"))
+ .baselineValues(listOf("33848"))
.go();
} finally {
client.resetSession(PlannerSettings.HASHAGG.getOptionName());
+ client.resetSession(PlannerSettings.STREAMAGG.getOptionName());
}
}
+ /**
+ * The current implementation of {@link
org.apache.drill.exec.expr.fn.impl.CollectToListVarcharFunction}
+ * requires ordered input data. Because the Hash Aggregator does not
maintain input order,
+ * it looks like there is no efficient way to process these values correctly
within that operator
+ * by the function. {@code SortAggPrule} is intended to handle this to
ensure deterministic
+ * results.
+ */
@Test
public void testCollectToListVarcharHashAgg() throws Exception {
try {
+ thrown.expect(UserRemoteException.class);
+ thrown.expectMessage(containsString("SYSTEM ERROR:
CannotPlanException"));
client.alterSession(PlannerSettings.STREAMAGG.getOptionName(), false);
- testBuilder()
- .sqlQuery("select collect_to_list_varchar(`date`) as l from " +
- "(select * from cp.`store/json/clicks.json` limit 2) group by
'a'")
- .unOrdered()
- .baselineColumns("l")
- .baselineValues(listOf("2014-04-26", "2014-04-20"))
- .go();
+ run("select collect_to_list_varchar(`clicks`.`trans_id`) as ids from" +
+ " cp.`store/json/clicks.json` clicks group by
`clicks`.`user_info`.`device`");
} finally {
client.resetSession(PlannerSettings.STREAMAGG.getOptionName());
}