This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new dacb848f84f [FLINK-34505][table] Migrate WindowGroupReorderRule to java
dacb848f84f is described below
commit dacb848f84f0993afb2fc809ed87be8463d42cc3
Author: Jacky Lau <[email protected]>
AuthorDate: Fri Dec 27 06:13:55 2024 +0800
[FLINK-34505][table] Migrate WindowGroupReorderRule to java
---
.../plan/rules/logical/WindowGroupReorderRule.java | 193 +++++++++++++++++++++
.../rules/logical/WindowGroupReorderRule.scala | 138 ---------------
2 files changed, 193 insertions(+), 138 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/WindowGroupReorderRule.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/WindowGroupReorderRule.java
new file mode 100644
index 00000000000..b80bcc7c5f6
--- /dev/null
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/WindowGroupReorderRule.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Window.Group;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexInputRef;
+import org.immutables.value.Value;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Planner rule that makes the over window groups which have the same shuffle
keys and order keys
+ * together.
+ */
[email protected]
+public class WindowGroupReorderRule
+ extends RelRule<WindowGroupReorderRule.WindowGroupReorderRuleConfig> {
+
+ public static final WindowGroupReorderRule INSTANCE =
+
WindowGroupReorderRule.WindowGroupReorderRuleConfig.DEFAULT.toRule();
+
+ private WindowGroupReorderRule(WindowGroupReorderRuleConfig config) {
+ super(config);
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ LogicalWindow window = call.rel(0);
+ return window.groups.size() > 1;
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ LogicalWindow window = call.rel(0);
+ RelNode input = call.rel(1);
+ List<Group> oldGroups = new ArrayList<>(window.groups);
+ List<Group> sequenceGroups = new ArrayList<>(window.groups);
+
+ sequenceGroups.sort(
+ (o1, o2) -> {
+ int keyComp = o1.keys.compareTo(o2.keys);
+ if (keyComp == 0) {
+ return compareRelCollation(o1.orderKeys, o2.orderKeys);
+ } else {
+ return keyComp;
+ }
+ });
+
+ List<Group> reverseSequenceGroups = new ArrayList<>(window.groups);
+ Collections.reverse(reverseSequenceGroups);
+ if (!sequenceGroups.equals(oldGroups) &&
!reverseSequenceGroups.equals(oldGroups)) {
+ int offset = input.getRowType().getFieldCount();
+ List<int[]> aggTypeIndexes = new ArrayList<>();
+ for (Group group : oldGroups) {
+ int aggCount = group.aggCalls.size();
+ int[] typeIndexes = new int[aggCount];
+ for (int i = 0; i < aggCount; i++) {
+ typeIndexes[i] = offset + i;
+ }
+ offset += aggCount;
+ aggTypeIndexes.add(typeIndexes);
+ }
+
+ offset = input.getRowType().getFieldCount();
+ List<Integer> mapToOldTypeIndexes =
+ IntStream.range(0,
offset).boxed().collect(Collectors.toList());
+ for (Group newGroup : sequenceGroups) {
+ int aggCount = newGroup.aggCalls.size();
+ int oldIndex = oldGroups.indexOf(newGroup);
+ offset += aggCount;
+ for (int aggIndex = 0; aggIndex < aggCount; aggIndex++) {
+
mapToOldTypeIndexes.add(aggTypeIndexes.get(oldIndex)[aggIndex]);
+ }
+ }
+
+ List<Map.Entry<String, RelDataType>> newFieldList =
+ mapToOldTypeIndexes.stream()
+ .map(index ->
window.getRowType().getFieldList().get(index))
+ .collect(Collectors.toList());
+ RelDataType intermediateRowType =
+
window.getCluster().getTypeFactory().createStructType(newFieldList);
+ LogicalWindow newLogicalWindow =
+ LogicalWindow.create(
+ window.getCluster().getPlanner().emptyTraitSet(),
+ input,
+ window.constants,
+ intermediateRowType,
+ sequenceGroups);
+
+ List<Integer> sortedIndices =
+ IntStream.range(0, mapToOldTypeIndexes.size())
+ .boxed()
+
.sorted(Comparator.comparingInt(mapToOldTypeIndexes::get))
+ .collect(Collectors.toList());
+
+ List<RexInputRef> projects =
+ sortedIndices.stream()
+ .map(
+ index ->
+ new RexInputRef(
+ index,
newFieldList.get(index).getValue()))
+ .collect(Collectors.toList());
+
+ LogicalProject project =
+ LogicalProject.create(
+ newLogicalWindow,
+ Collections.emptyList(),
+ projects,
+ window.getRowType());
+ call.transformTo(project);
+ }
+ }
+
+ private int compareRelCollation(RelCollation o1, RelCollation o2) {
+ int comp = o1.compareTo(o2);
+ if (comp == 0) {
+ List<RelFieldCollation> collations1 = o1.getFieldCollations();
+ List<RelFieldCollation> collations2 = o2.getFieldCollations();
+ for (int index = 0; index < collations1.size(); index++) {
+ RelFieldCollation collation1 = collations1.get(index);
+ RelFieldCollation collation2 = collations2.get(index);
+ int direction =
+ collation1
+ .getDirection()
+ .shortString
+
.compareTo(collation2.getDirection().shortString);
+ if (direction == 0) {
+ int nullDirection =
+ Integer.compare(
+ collation1.nullDirection.nullComparison,
+ collation2.nullDirection.nullComparison);
+ if (nullDirection != 0) {
+ return nullDirection;
+ }
+ } else {
+ return direction;
+ }
+ }
+ }
+ return comp;
+ }
+
+ /** Rule configuration. */
+ @Value.Immutable(singleton = false)
+ public interface WindowGroupReorderRuleConfig extends RelRule.Config {
+ WindowGroupReorderRule.WindowGroupReorderRuleConfig DEFAULT =
+
ImmutableWindowGroupReorderRule.WindowGroupReorderRuleConfig.builder()
+ .build()
+ .withOperandSupplier(
+ b0 ->
+ b0.operand(LogicalWindow.class)
+ .inputs(
+ b1 ->
+
b1.operand(RelNode.class)
+
.anyInputs()))
+ .withDescription("ExchangeWindowGroupRule");
+
+ @Override
+ default WindowGroupReorderRule toRule() {
+ return new WindowGroupReorderRule(this);
+ }
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/WindowGroupReorderRule.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/WindowGroupReorderRule.scala
deleted file mode 100644
index e9464c08629..00000000000
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/WindowGroupReorderRule.scala
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.plan.rules.logical
-
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
-import org.apache.calcite.plan.RelOptRule.{any, operand}
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.{RelCollation, RelNode}
-import org.apache.calcite.rel.core.Window.Group
-import org.apache.calcite.rel.hint.RelHint
-import org.apache.calcite.rel.logical.{LogicalProject, LogicalWindow}
-import org.apache.calcite.rex.RexInputRef
-
-import java.util
-import java.util.{Collections, Comparator}
-
-import scala.collection.JavaConversions._
-
-/**
- * Planner rule that makes the over window groups which have the same shuffle
keys and order keys
- * together.
- */
-class WindowGroupReorderRule
- extends RelOptRule(
- operand(classOf[LogicalWindow], operand(classOf[RelNode], any)),
- "ExchangeWindowGroupRule") {
-
- override def matches(call: RelOptRuleCall): Boolean = {
- val window: LogicalWindow = call.rel(0)
- window.groups.size() > 1
- }
-
- override def onMatch(call: RelOptRuleCall): Unit = {
- val window: LogicalWindow = call.rel(0)
- val input: RelNode = call.rel(1)
- val oldGroups: util.List[Group] = new util.ArrayList(window.groups)
- val sequenceGroups: util.List[Group] = new util.ArrayList(window.groups)
-
- sequenceGroups.sort(new Comparator[Group] {
- override def compare(o1: Group, o2: Group): Int = {
- val keyComp = o1.keys.compareTo(o2.keys)
- if (keyComp == 0) {
- compareRelCollation(o1.orderKeys, o2.orderKeys)
- } else {
- keyComp
- }
- }
- })
-
- if (!sequenceGroups.equals(oldGroups) &&
!sequenceGroups.reverse.equals(oldGroups)) {
- var offset = input.getRowType.getFieldCount
- val aggTypeIndexes = oldGroups.map {
- group =>
- val aggCount = group.aggCalls.size()
- val typeIndexes = (0 until aggCount).map(_ + offset).toArray
- offset += aggCount
- typeIndexes
- }
-
- offset = input.getRowType.getFieldCount
- val mapToOldTypeIndexes = (0 until offset).toArray ++
- sequenceGroups
- .flatMap {
- newGroup =>
- val aggCount = newGroup.aggCalls.size()
- val oldIndex = oldGroups.indexOf(newGroup)
- offset += aggCount
- (0 until aggCount).map(aggIndex =>
aggTypeIndexes(oldIndex)(aggIndex))
- }
- .toArray[Int]
-
- val oldRowTypeFields = window.getRowType.getFieldList
- val newFieldList = new util.ArrayList[util.Map.Entry[String,
RelDataType]]
- mapToOldTypeIndexes.foreach(index =>
newFieldList.add(oldRowTypeFields.get(index)))
- val intermediateRowType =
window.getCluster.getTypeFactory.createStructType(newFieldList)
- val newLogicalWindow = LogicalWindow.create(
- window.getCluster.getPlanner.emptyTraitSet(),
- input,
- window.constants,
- intermediateRowType,
- sequenceGroups)
-
- val mapToNewTypeIndexes = mapToOldTypeIndexes.zipWithIndex.sortBy(_._1)
-
- val projects = mapToNewTypeIndexes.map {
- index => new RexInputRef(index._2, newFieldList.get(index._2).getValue)
- }
- val project = LogicalProject.create(
- newLogicalWindow,
- Collections.emptyList[RelHint](),
- projects.toList,
- window.getRowType)
- call.transformTo(project)
- }
- }
-
- private def compareRelCollation(o1: RelCollation, o2: RelCollation): Int = {
- val comp = o1.compareTo(o2)
- if (comp == 0) {
- val collations1 = o1.getFieldCollations
- val collations2 = o2.getFieldCollations
- for (index <- 0 until collations1.length) {
- val collation1 = collations1(index)
- val collation2 = collations2(index)
- val direction =
collation1.direction.shortString.compareTo(collation2.direction.shortString)
- if (direction == 0) {
- val nullDirection =
-
collation1.nullDirection.nullComparison.compare(collation2.nullDirection.nullComparison)
- if (nullDirection != 0) {
- return nullDirection
- }
- } else {
- return direction
- }
- }
- }
- comp
- }
-}
-
-object WindowGroupReorderRule {
- val INSTANCE = new WindowGroupReorderRule
-}