godfreyhe commented on code in PR #21219: URL: https://github.com/apache/flink/pull/21219#discussion_r1052849162
########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/EventTimeTemporalJoinRewriteRule.java: ########## @@ -0,0 +1,484 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalJoin; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRel; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSnapshot; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWatermarkAssigner; +import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType; +import org.apache.flink.table.planner.plan.utils.TemporalTableJoinUtil; + +import org.apache.flink.shaded.curator5.org.apache.curator.shaded.com.google.common.collect.Lists; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.plan.hep.HepRelVertex; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexNode; + +/** + * Traverses an event time temporal table join {@link RelNode} tree and update the right child to + * set {@link FlinkLogicalTableSourceScan}'s eventTimeSnapshot property to true which will prevent + * it generating a new StreamPhysicalChangelogNormalize later. + * + * <p>above is the match patterns(8 variants, the three `Calc` nodes are all optional): + * + * <pre>{@code + * Join (event time temporal) + * / \ + * RelNode [Calc] + * \ + * Snapshot + * \ + * [Calc] + * \ + * WatermarkAssigner + * \ + * [Calc] + * \ + * TableScan + * }</pre> + * + * <p>Note: This rule can only be used in a separate {@link org.apache.calcite.plan.hep.HepProgram} + * after `LOGICAL_REWRITE` rule sets are applied for now. + */ +public class EventTimeTemporalJoinRewriteRule + extends RelRule<EventTimeTemporalJoinRewriteRule.Config> { + + public static final RelOptRule JOIN_CALC_SNAPSHOT_CALC_WMA_CALC_TS = + Config.JOIN_CALC_SNAPSHOT_CALC_WMA_CALC_TS.toRule(); + + public static final RelOptRule JOIN_CALC_SNAPSHOT_CALC_WMA_TS = + Config.JOIN_CALC_SNAPSHOT_CALC_WMA_TS.toRule(); + + public static final RelOptRule JOIN_CALC_SNAPSHOT_WMA_CALC_TS = + Config.JOIN_CALC_SNAPSHOT_WMA_CALC_TS.toRule(); + + public static final RelOptRule JOIN_CALC_SNAPSHOT_WMA_TS = + Config.JOIN_CALC_SNAPSHOT_WMA_TS.toRule(); + + public static final RelOptRule JOIN_SNAPSHOT_CALC_WMA_CALC_TS = + Config.JOIN_SNAPSHOT_CALC_WMA_CALC_TS.toRule(); + + public static final RelOptRule JOIN_SNAPSHOT_CALC_WMA_TS = + Config.JOIN_SNAPSHOT_CALC_WMA_TS.toRule(); + + public static final RelOptRule JOIN_SNAPSHOT_WMA_CALC_TS = + Config.JOIN_SNAPSHOT_WMA_CALC_TS.toRule(); + + public static final RelOptRule JOIN_SNAPSHOT_WMA_TS = Config.JOIN_SNAPSHOT_WMA_TS.toRule(); + Review Comment: It's better we can define them in a `RuleSet`, and FlinkStreamRuleSets just needs to refer the `RuleSet` ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/EventTimeTemporalJoinRewriteRule.java: ########## @@ -0,0 +1,484 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalJoin; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRel; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSnapshot; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWatermarkAssigner; +import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType; +import org.apache.flink.table.planner.plan.utils.TemporalTableJoinUtil; + +import org.apache.flink.shaded.curator5.org.apache.curator.shaded.com.google.common.collect.Lists; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.plan.hep.HepRelVertex; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexNode; + +/** + * Traverses an event time temporal table join {@link RelNode} tree and update the right child to + * set {@link FlinkLogicalTableSourceScan}'s eventTimeSnapshot property to true which will prevent + * it generating a new StreamPhysicalChangelogNormalize later. + * + * <p>above is the match patterns(8 variants, the three `Calc` nodes are all optional): Review Comment: above -> the following ? ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/EventTimeTemporalJoinRewriteRule.java: ########## @@ -0,0 +1,484 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalJoin; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRel; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSnapshot; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWatermarkAssigner; +import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType; +import org.apache.flink.table.planner.plan.utils.TemporalTableJoinUtil; + +import org.apache.flink.shaded.curator5.org.apache.curator.shaded.com.google.common.collect.Lists; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.plan.hep.HepRelVertex; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexNode; + +/** + * Traverses an event time temporal table join {@link RelNode} tree and update the right child to + * set {@link FlinkLogicalTableSourceScan}'s eventTimeSnapshot property to true which will prevent + * it generating a new StreamPhysicalChangelogNormalize later. + * + * <p>above is the match patterns(8 variants, the three `Calc` nodes are all optional): + * + * <pre>{@code + * Join (event time temporal) + * / \ + * RelNode [Calc] + * \ + * Snapshot + * \ + * [Calc] + * \ + * WatermarkAssigner + * \ + * [Calc] + * \ + * TableScan + * }</pre> + * + * <p>Note: This rule can only be used in a separate {@link org.apache.calcite.plan.hep.HepProgram} + * after `LOGICAL_REWRITE` rule sets are applied for now. + */ +public class EventTimeTemporalJoinRewriteRule + extends RelRule<EventTimeTemporalJoinRewriteRule.Config> { + + public static final RelOptRule JOIN_CALC_SNAPSHOT_CALC_WMA_CALC_TS = + Config.JOIN_CALC_SNAPSHOT_CALC_WMA_CALC_TS.toRule(); + + public static final RelOptRule JOIN_CALC_SNAPSHOT_CALC_WMA_TS = + Config.JOIN_CALC_SNAPSHOT_CALC_WMA_TS.toRule(); + + public static final RelOptRule JOIN_CALC_SNAPSHOT_WMA_CALC_TS = + Config.JOIN_CALC_SNAPSHOT_WMA_CALC_TS.toRule(); + + public static final RelOptRule JOIN_CALC_SNAPSHOT_WMA_TS = + Config.JOIN_CALC_SNAPSHOT_WMA_TS.toRule(); + + public static final RelOptRule JOIN_SNAPSHOT_CALC_WMA_CALC_TS = + Config.JOIN_SNAPSHOT_CALC_WMA_CALC_TS.toRule(); + + public static final RelOptRule JOIN_SNAPSHOT_CALC_WMA_TS = + Config.JOIN_SNAPSHOT_CALC_WMA_TS.toRule(); + + public static final RelOptRule JOIN_SNAPSHOT_WMA_CALC_TS = + Config.JOIN_SNAPSHOT_WMA_CALC_TS.toRule(); + + public static final RelOptRule JOIN_SNAPSHOT_WMA_TS = Config.JOIN_SNAPSHOT_WMA_TS.toRule(); + + public EventTimeTemporalJoinRewriteRule(Config config) { + super(config); + } + + @Override + public boolean matches(RelOptRuleCall call) { + FlinkLogicalJoin join = call.rel(0); + RexNode joinCondition = join.getCondition(); + // only matches event time temporal join + return joinCondition != null + && TemporalTableJoinUtil.isEventTimeTemporalJoin(joinCondition); + } + + @Override + public void onMatch(RelOptRuleCall call) { + FlinkLogicalJoin join = call.rel(0); + FlinkLogicalRel joinRightChild = call.rel(2); + RelNode newRight = transmitSnapshotRequirement(joinRightChild); + call.transformTo( + join.copy(join.getTraitSet(), Lists.newArrayList(join.getLeft(), newRight))); + } + + private RelNode transmitSnapshotRequirement(RelNode node) { + if (node instanceof FlinkLogicalCalc) { + final FlinkLogicalCalc calc = (FlinkLogicalCalc) node; + // filter is not allowed because it will corrupt the version table + if (null != calc.getProgram().getCondition()) { + throw new TableException( + "Filter is not allowed for right changelog input of event time temporal join," + + " it will corrupt the versioning of data. Please consider removing the filter before joining."); + } + + final RelNode child = calc.getInput(); + final RelNode newChild = transmitSnapshotRequirement(child); + if (newChild != child) { + return calc.copy(calc.getTraitSet(), newChild, calc.getProgram()); + } + return calc; + } + if (node instanceof FlinkLogicalSnapshot) { + final FlinkLogicalSnapshot snapshot = (FlinkLogicalSnapshot) node; + assert isEventTime(snapshot.getPeriod().getType()); + final RelNode child = snapshot.getInput(); + final RelNode newChild = transmitSnapshotRequirement(child); + if (newChild != child) { + return snapshot.copy(snapshot.getTraitSet(), newChild, snapshot.getPeriod()); + } + return snapshot; + } + if (node instanceof HepRelVertex) { + return transmitSnapshotRequirement(((HepRelVertex) node).getCurrentRel()); + } + if (node instanceof FlinkLogicalWatermarkAssigner) { + final FlinkLogicalWatermarkAssigner wma = (FlinkLogicalWatermarkAssigner) node; + final RelNode child = wma.getInput(); + final RelNode newChild = transmitSnapshotRequirement(child); + if (newChild != child) { + return wma.copy( + wma.getTraitSet(), newChild, wma.rowtimeFieldIndex(), wma.watermarkExpr()); + } + return wma; + } + if (node instanceof FlinkLogicalTableSourceScan) { + final FlinkLogicalTableSourceScan ts = (FlinkLogicalTableSourceScan) node; + // update eventTimeSnapshot to true Review Comment: eventTimeSnapshotRequired ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/EventTimeTemporalJoinRewriteRule.java: ########## @@ -0,0 +1,484 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalJoin; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRel; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSnapshot; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWatermarkAssigner; +import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType; +import org.apache.flink.table.planner.plan.utils.TemporalTableJoinUtil; + +import org.apache.flink.shaded.curator5.org.apache.curator.shaded.com.google.common.collect.Lists; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.plan.hep.HepRelVertex; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexNode; + +/** + * Traverses an event time temporal table join {@link RelNode} tree and update the right child to + * set {@link FlinkLogicalTableSourceScan}'s eventTimeSnapshot property to true which will prevent + * it generating a new StreamPhysicalChangelogNormalize later. + * + * <p>above is the match patterns(8 variants, the three `Calc` nodes are all optional): + * + * <pre>{@code + * Join (event time temporal) + * / \ + * RelNode [Calc] + * \ + * Snapshot + * \ + * [Calc] + * \ + * WatermarkAssigner + * \ + * [Calc] + * \ + * TableScan + * }</pre> + * + * <p>Note: This rule can only be used in a separate {@link org.apache.calcite.plan.hep.HepProgram} + * after `LOGICAL_REWRITE` rule sets are applied for now. + */ +public class EventTimeTemporalJoinRewriteRule + extends RelRule<EventTimeTemporalJoinRewriteRule.Config> { + + public static final RelOptRule JOIN_CALC_SNAPSHOT_CALC_WMA_CALC_TS = + Config.JOIN_CALC_SNAPSHOT_CALC_WMA_CALC_TS.toRule(); + + public static final RelOptRule JOIN_CALC_SNAPSHOT_CALC_WMA_TS = + Config.JOIN_CALC_SNAPSHOT_CALC_WMA_TS.toRule(); + + public static final RelOptRule JOIN_CALC_SNAPSHOT_WMA_CALC_TS = + Config.JOIN_CALC_SNAPSHOT_WMA_CALC_TS.toRule(); + + public static final RelOptRule JOIN_CALC_SNAPSHOT_WMA_TS = + Config.JOIN_CALC_SNAPSHOT_WMA_TS.toRule(); + + public static final RelOptRule JOIN_SNAPSHOT_CALC_WMA_CALC_TS = + Config.JOIN_SNAPSHOT_CALC_WMA_CALC_TS.toRule(); + + public static final RelOptRule JOIN_SNAPSHOT_CALC_WMA_TS = + Config.JOIN_SNAPSHOT_CALC_WMA_TS.toRule(); + + public static final RelOptRule JOIN_SNAPSHOT_WMA_CALC_TS = + Config.JOIN_SNAPSHOT_WMA_CALC_TS.toRule(); + + public static final RelOptRule JOIN_SNAPSHOT_WMA_TS = Config.JOIN_SNAPSHOT_WMA_TS.toRule(); + + public EventTimeTemporalJoinRewriteRule(Config config) { + super(config); + } + + @Override + public boolean matches(RelOptRuleCall call) { + FlinkLogicalJoin join = call.rel(0); + RexNode joinCondition = join.getCondition(); + // only matches event time temporal join + return joinCondition != null + && TemporalTableJoinUtil.isEventTimeTemporalJoin(joinCondition); + } + + @Override + public void onMatch(RelOptRuleCall call) { + FlinkLogicalJoin join = call.rel(0); + FlinkLogicalRel joinRightChild = call.rel(2); + RelNode newRight = transmitSnapshotRequirement(joinRightChild); + call.transformTo( + join.copy(join.getTraitSet(), Lists.newArrayList(join.getLeft(), newRight))); + } + + private RelNode transmitSnapshotRequirement(RelNode node) { + if (node instanceof FlinkLogicalCalc) { + final FlinkLogicalCalc calc = (FlinkLogicalCalc) node; + // filter is not allowed because it will corrupt the version table + if (null != calc.getProgram().getCondition()) { + throw new TableException( + "Filter is not allowed for right changelog input of event time temporal join," + + " it will corrupt the versioning of data. Please consider removing the filter before joining."); + } + + final RelNode child = calc.getInput(); + final RelNode newChild = transmitSnapshotRequirement(child); + if (newChild != child) { + return calc.copy(calc.getTraitSet(), newChild, calc.getProgram()); + } + return calc; + } + if (node instanceof FlinkLogicalSnapshot) { + final FlinkLogicalSnapshot snapshot = (FlinkLogicalSnapshot) node; + assert isEventTime(snapshot.getPeriod().getType()); + final RelNode child = snapshot.getInput(); + final RelNode newChild = transmitSnapshotRequirement(child); + if (newChild != child) { + return snapshot.copy(snapshot.getTraitSet(), newChild, snapshot.getPeriod()); + } + return snapshot; + } + if (node instanceof HepRelVertex) { + return transmitSnapshotRequirement(((HepRelVertex) node).getCurrentRel()); + } + if (node instanceof FlinkLogicalWatermarkAssigner) { + final FlinkLogicalWatermarkAssigner wma = (FlinkLogicalWatermarkAssigner) node; + final RelNode child = wma.getInput(); + final RelNode newChild = transmitSnapshotRequirement(child); + if (newChild != child) { + return wma.copy( + wma.getTraitSet(), newChild, wma.rowtimeFieldIndex(), wma.watermarkExpr()); + } + return wma; + } + if (node instanceof FlinkLogicalTableSourceScan) { + final FlinkLogicalTableSourceScan ts = (FlinkLogicalTableSourceScan) node; + // update eventTimeSnapshot to true + return ts.copy(ts.getTraitSet(), ts.relOptTable(), true); + } + return node; + } + + private boolean isEventTime(RelDataType period) { + if (period instanceof TimeIndicatorRelDataType) { + return ((TimeIndicatorRelDataType) period).isEventTime(); + } + return false; + } + + /** + * Configuration for {@link EventTimeTemporalJoinRewriteRule}. + * + * <p>Operator tree: + * + * <pre>{@code + * Join (event time temporal) + * / \ + * RelNode [Calc] + * \ + * Snapshot + * \ + * [Calc] + * \ + * WatermarkAssigner + * \ + * [Calc] + * \ + * TableScan + * }</pre> + * + * <p>8 variants: + * + * <ul> + * <li>JOIN_CALC_SNAPSHOT_CALC_WMA_CALC_TS + * <li>JOIN_CALC_SNAPSHOT_CALC_WMA_TS + * <li>JOIN_CALC_SNAPSHOT_WMA_CALC_TS + * <li>JOIN_CALC_SNAPSHOT_WMA_TS + * <li>JOIN_SNAPSHOT_CALC_WMA_CALC_TS + * <li>JOIN_SNAPSHOT_CALC_WMA_TS + * <li>JOIN_SNAPSHOT_WMA_CALC_TS + * <li>JOIN_SNAPSHOT_WMA_TS + * </ul> + */ + public interface Config extends RelRule.Config { + RelRule.Config JOIN_CALC_SNAPSHOT_CALC_WMA_CALC_TS = + EMPTY.withDescription("EventTimeTemporalJoinRewriteRule") Review Comment: the description should be different! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org