This is an automated email from the ASF dual-hosted git repository.
Mryange pushed a commit to branch groupjoin
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/groupjoin by this push:
new 061f8c64c0a thrift
061f8c64c0a is described below
commit 061f8c64c0ac41a118b87a8cdb93fb226c7cac89
Author: Mryange <[email protected]>
AuthorDate: Mon Jun 15 10:42:12 2026 +0800
thrift
---
gensrc/thrift/PlanNodes.thrift | 61 ++++++++++++++++++++++++++++++++++++++++--
1 file changed, 59 insertions(+), 2 deletions(-)
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index ddec6683318..11aaf4d113a 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -63,7 +63,8 @@ enum TPlanNodeType {
MATERIALIZATION_NODE = 34,
REC_CTE_NODE = 35,
REC_CTE_SCAN_NODE = 36,
- BUCKETED_AGGREGATION_NODE = 37
+ BUCKETED_AGGREGATION_NODE = 37,
+ GROUP_JOIN_NODE = 38
}
struct TKeyRange {
@@ -1186,6 +1187,61 @@ struct TAggregationNode {
10: optional TSortInfo agg_sort_info_by_group_key
}
+enum TGroupJoinAggSide {
+ BUILD = 0,
+ PROBE = 1,
+}
+
+enum TGroupJoinAggOutputMode {
+ // 直接输出最终聚合结果。这是 GroupJoin 的主路径:
+ // hash join 已经按照 join/group key 做过数据分布,所以不需要下游 global agg merge。
+ // 目前第一版只支持FINAL_RESULT?
+ FINAL_RESULT = 0,
+ // 输出序列化的中间聚合状态。这个模式作为 fallback 保留:
+ // 当 GroupJoin 不能证明当前 instance 拥有完整 group 时,仍然可以交给下游 global agg merge。
+ SERIALIZED_STATE = 1,
+}
+
+struct TGroupJoinAggFunction {
+ // FE 生成的聚合函数表达式。这个表达式必须能在 join 行物化前,只基于某一侧 child 执行。
+ 1: required Exprs.TExpr aggregate_function
+ // 聚合函数输入列来自哪一侧 child。
+ // BUILD 表示在消费 build child 时更新 state,drain 时按 probe 行数做 repeat。
+ // PROBE 表示在消费 probe child 时更新 state,drain 时按 build 行数做 repeat。
+ 2: required TGroupJoinAggSide input_side
+ // 当前 demo 版本不支持带 ORDER BY 的顺序敏感聚合函数,例如 group_concat(v order by t)。
+ // 因为 GroupJoin 只保存单侧局部 state 和另一侧 row count,无法恢复 join 后完整行序。
+}
+
+struct TGroupJoinNode {
+ // Join 信息。
+ // 被融合的 join 类型。当前 BE 初版只支持 INNER_JOIN。
+ 1: required TJoinOp join_op
+ // 等值 join 条件。当前 BE 初版里它们同时也是 group key 的来源:
+ // GROUP BY 表达式必须等价于这些等值条件中的某一侧。
+ 2: required list<TEqJoinCondition> eq_join_conjuncts
+ // FE 选择的 join 分布方式。GroupJoin 复用 hash join 的分布规则,
+ // 要求相同 join/group key 的行被分发到同一个 BE instance。
+ 3: optional TJoinDistributionType dist_type
+
+ // Aggregation 信息。字段号从 20 开始,和 join 信息分段。
+ // 被融合聚合的 group-by 表达式。它们描述输出 key 列,
+ // 并且必须和 eq_join_conjuncts 中的 join key 等价。
+ 20: required list<Exprs.TExpr> grouping_exprs
+ // 被融合进 GroupJoin 的聚合函数。每个 item 自己携带 input side,
+ // BE 不需要维护 aggregate function list 和 side list 的下标对齐关系。
+ 21: required list<TGroupJoinAggFunction> aggregate_functions
+ // drain 阶段如何输出聚合列。FINAL_RESULT 是 inner partitioned GroupJoin 的主路径,
+ // 因为 hash join shuffle 已经保证每个 group key 在一个 BE instance 内是完整的。
+ 22: required TGroupJoinAggOutputMode agg_output_mode
+ // 当前 GroupJoin 节点实际输出行的 tuple descriptor。agg_output_mode == FINAL_RESULT 时,
+ // 聚合 slot 使用最终结果类型;agg_output_mode == SERIALIZED_STATE 时,
+ // 聚合 slot 使用序列化 state 类型。
+ 23: required Types.TTupleId output_tuple_id
+ // GroupJoin 产生的 runtime filter 描述复用 TPlanNode.runtime_filters 字段。
+ // FE 应该把 build 侧 join key 作为 src_expr,把可下推到 probe 侧 scan 的表达式作为 target expr。
+}
+
struct TBucketedAggregationNode {
1: optional list<Exprs.TExpr> grouping_exprs
2: optional list<Exprs.TExpr> aggregate_functions
@@ -1644,7 +1700,7 @@ struct TPlanNode {
33: optional TIntersectNode intersect_node
34: optional TExceptNode except_node
35: optional TOdbcScanNode odbc_scan_node
- // Runtime filters assigned to this plan node, exist in HashJoinNode and
ScanNode
+ // Runtime filters assigned to this plan node, exist in HashJoinNode,
GroupJoinNode and ScanNode
36: optional list<TRuntimeFilterDesc> runtime_filters
37: optional TGroupCommitScanNode group_commit_scan_node
38: optional TMaterializationNode materialization_node
@@ -1673,6 +1729,7 @@ struct TPlanNode {
51: optional bool is_serial_operator
52: optional TRecCTEScanNode rec_cte_scan_node
53: optional TBucketedAggregationNode bucketed_agg_node
+ 54: optional TGroupJoinNode group_join_node
// projections is final projections, which means projecting into results and
materializing them into the output block.
101: optional list<Exprs.TExpr> projections
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]