zstan commented on code in PR #12815:
URL: https://github.com/apache/ignite/pull/12815#discussion_r2888109870


##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/IgniteLogicalWindowRewriteRule.java:
##########
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rule.logical;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import com.google.common.collect.ImmutableSet;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.rex.RexWindowBound;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlOperatorBinding;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.immutables.value.Value;
+
+/**
+ * Rule that rewrites a LogicalWindow into a LogicalAggregate, a LogicalJoin and a LogicalProject.
+ * This approach is valid only for unbounded frame.
+ */
+@Value.Enclosing
+public class IgniteLogicalWindowRewriteRule extends 
RelRule<IgniteLogicalWindowRewriteRule.Config> {
+    /** Rule instance. */
+    public static final RelOptRule INSTANCE = Config.DEFAULT.toRule();
+
+    /**
+     * Constructor.
+     *
+     * @param config Rule configuration.
+     */
+    private IgniteLogicalWindowRewriteRule(Config config) {
+        super(config);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onMatch(RelOptRuleCall call) {
+        LogicalWindow win = call.rel(0);
+
+        if (win.groups.size() > 1) {
+            RelNode input = win.getInput();
+            RelDataTypeFactory typeFactory = win.getCluster().getTypeFactory();
+
+            for (LogicalWindow.Group grp : win.groups) {
+                RelDataType joinRowType = buildWindowRowType(typeFactory, 
input, grp);
+
+                LogicalWindow single = LogicalWindow.create(
+                    input.getTraitSet(),
+                    input,
+                    win.getConstants(),
+                    joinRowType,
+                    List.of(grp)
+                );
+
+                input = single;
+            }
+
+            call.transformTo(input);
+
+            return;
+        }
+
+        LogicalWindow.Group grp = win.groups.get(0);
+
+        validateSupported(grp);
+
+        RelNode input = win.getInput();
+        RexBuilder rexBuilder = win.getCluster().getRexBuilder();
+        RelDataTypeFactory typeFactory = win.getCluster().getTypeFactory();
+        RelNode aggInput = appendConstants(input, win.getConstants());
+
+        ImmutableBitSet grpSet = grp.keys;
+
+        List<AggregateCall> aggCalls = new ArrayList<>(grp.aggCalls.size());
+
+        for (Window.RexWinAggCall winAggCall : grp.aggCalls) {
+            aggCalls.add(toAggregateCall(winAggCall, typeFactory, 
grpSet.cardinality()));
+        }
+
+        RelNode agg = LogicalAggregate.create(
+            aggInput,
+            grpSet,
+            null,
+            aggCalls
+        );
+
+        RexNode condition = buildPartitionJoinCondition(rexBuilder, 
typeFactory, input, agg, grpSet);
+
+        RelNode join = LogicalJoin.create(
+            input,
+            agg,
+            Collections.emptyList(),
+            condition,
+            Collections.emptySet(),
+            JoinRelType.INNER
+        );
+
+        RelNode project = LogicalProject.create(
+            join,
+            List.of(),
+            buildProjection(join, win, grp),
+            win.getRowType().getFieldNames(),
+            ImmutableSet.of()
+        );
+
+        call.transformTo(project);
+    }
+
+    /**
+     * Appends LogicalWindow constants to input as additional projection 
columns.
+     *
+     * @param input Input relation.
+     * @param constants Window constants.
+     * @return Input relation augmented with constants.
+     */
+    private static RelNode appendConstants(RelNode input, List<RexLiteral> 
constants) {
+        if (constants.isEmpty())
+            return input;
+
+        RexBuilder rexBuilder = input.getCluster().getRexBuilder();
+        int inputFieldCnt = input.getRowType().getFieldCount();
+
+        List<RexNode> projects = new ArrayList<>(inputFieldCnt + 
constants.size());
+        List<String> names = new 
ArrayList<>(input.getRowType().getFieldNames());
+
+        for (int i = 0; i < inputFieldCnt; i++)
+            projects.add(rexBuilder.makeInputRef(input, i));
+
+        projects.addAll(constants);
+
+        for (int i = 0; i < constants.size(); i++)
+            names.add("_w_const$" + i);
+
+        return LogicalProject.create(input, List.of(), projects, names, 
ImmutableSet.of());
+    }
+
+    /**
+     * Builds a row type for a window with aggregate calls.
+     *
+     * @param typeFactory Type factory.
+     * @param input Input relation.
+     * @param grp Window group.
+     * @return A row type combining the input fields and windowed aggregate 
results.
+     */
+    private static RelDataType buildWindowRowType(
+        RelDataTypeFactory typeFactory,
+        RelNode input,
+        LogicalWindow.Group grp
+    ) {
+        RelDataTypeFactory.Builder builder = typeFactory.builder();
+
+        builder.addAll(input.getRowType().getFieldList());
+
+        for (int i = 0; i < grp.aggCalls.size(); i++) {
+            Window.RexWinAggCall winAggCall = grp.aggCalls.get(i);
+
+            String name = "agg$" + i;
+
+            RelDataType type = winAggCall.getType();
+
+            builder.add(name, type);
+        }
+
+        return builder.build();
+    }
+
+    /**
+     * Builds a join condition between input and aggregate results using 
partition keys.
+     * Returns TRUE for an empty partition set (cross join).
+     *
+     * @param rexBuilder Rex builder.
+     * @param typeFactory Type factory.
+     * @param input Input relation.
+     * @param agg Aggregate relation.
+     * @param groupSet Partition keys.
+     * @return Join condition expression.
+     */
+    private static RexNode buildPartitionJoinCondition(
+        RexBuilder rexBuilder,
+        RelDataTypeFactory typeFactory,
+        RelNode input,
+        RelNode agg,
+        ImmutableBitSet groupSet
+    ) {
+        if (groupSet.isEmpty())
+            return rexBuilder.makeLiteral(true);
+
+        int inputFieldCnt = input.getRowType().getFieldCount();
+        List<Integer> keys = groupSet.asList();
+
+        RelDataType joinRowType = typeFactory.builder()
+            .addAll(input.getRowType().getFieldList())
+            .addAll(agg.getRowType().getFieldList())
+            .build();
+
+        List<RexNode> conditions = new ArrayList<>(keys.size());
+
+        for (int i = 0; i < keys.size(); i++) {
+            int keyIdx = keys.get(i);
+
+            RexNode left = rexBuilder.makeInputRef(joinRowType, keyIdx);
+            RexNode right = rexBuilder.makeInputRef(joinRowType, inputFieldCnt 
+ i);

Review Comment:
   ```suggestion
               RexNode left = rexBuilder.makeInputRef(input.getRowType(), 
keyIdx);
               RexNode right = rexBuilder.makeInputRef(agg.getRowType(), 
inputFieldCnt + i);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to