Repository: flink Updated Branches: refs/heads/table-retraction 07a59ae0e -> 5ba0f02a6
[FLINK-6090] [table] Add RetractionRules for annotating AccMode to DataStreamRel nodes. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5ba0f02a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5ba0f02a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5ba0f02a Branch: refs/heads/table-retraction Commit: 5ba0f02a6e574daf206a9ee764153782f6401c97 Parents: 07a59ae Author: hequn.chq <hequn....@alibaba-inc.com> Authored: Fri Apr 7 13:12:04 2017 +0800 Committer: Fabian Hueske <fhue...@apache.org> Committed: Mon Apr 17 21:56:50 2017 +0200 ---------------------------------------------------------------------- .../nodes/datastream/DataStreamCorrelate.scala | 6 +- .../datastream/DataStreamGroupAggregate.scala | 6 + .../DataStreamGroupWindowAggregate.scala | 4 + .../datastream/DataStreamOverAggregate.scala | 4 + .../plan/nodes/datastream/DataStreamRel.scala | 17 + .../datastream/DataStreamRetractionRules.scala | 247 ++++++++++++++ .../nodes/datastream/retractionTraitDefs.scala | 81 +++++ .../nodes/datastream/retractionTraits.scala | 100 ++++++ .../flink/table/plan/rules/FlinkRuleSets.scala | 5 + .../flink/table/CalciteConfigBuilderTest.scala | 31 +- .../table/plan/rules/RetractionRulesTest.scala | 334 +++++++++++++++++++ 11 files changed, 817 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5ba0f02a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala index dd799e6..9ea413a 100644 --- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala @@ -36,14 +36,14 @@ import org.apache.flink.types.Row class DataStreamCorrelate( cluster: RelOptCluster, traitSet: RelTraitSet, - inputNode: RelNode, + input: RelNode, scan: LogicalTableFunctionScan, condition: Option[RexNode], relRowType: RelDataType, joinRowType: RelDataType, joinType: SemiJoinType, ruleDescription: String) - extends SingleRel(cluster, traitSet, inputNode) + extends SingleRel(cluster, traitSet, input) with CommonCorrelate with DataStreamRel { @@ -84,7 +84,7 @@ class DataStreamCorrelate( val config = tableEnv.getConfig // we do not need to specify input type - val inputDS = inputNode.asInstanceOf[DataStreamRel].translateToPlan(tableEnv) + val inputDS = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv) val funcRel = scan.asInstanceOf[LogicalTableFunctionScan] val rexCall = funcRel.getCall.asInstanceOf[RexCall] http://git-wip-us.apache.org/repos/asf/flink/blob/5ba0f02a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala index c2d4fb7..d4aa33c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala @@ -57,6 +57,12 @@ class DataStreamGroupAggregate( override def deriveRowType() = rowRelDataType + override def 
needsUpdatesAsRetraction = true + + override def producesUpdates = true + + override def consumesRetractions = true + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { new DataStreamGroupAggregate( cluster, http://git-wip-us.apache.org/repos/asf/flink/blob/5ba0f02a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala index a0c1dec..c78e8bd 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala @@ -54,6 +54,10 @@ class DataStreamGroupWindowAggregate( override def deriveRowType(): RelDataType = rowRelDataType + override def needsUpdatesAsRetraction = true + + override def consumesRetractions = true + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { new DataStreamGroupWindowAggregate( window, http://git-wip-us.apache.org/repos/asf/flink/blob/5ba0f02a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala index 2224752..031d533 100644 --- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala @@ -51,6 +51,10 @@ class DataStreamOverAggregate( override def deriveRowType(): RelDataType = rowRelDataType + override def needsUpdatesAsRetraction = true + + override def consumesRetractions = true + override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = { new DataStreamOverAggregate( logicWindow, http://git-wip-us.apache.org/repos/asf/flink/blob/5ba0f02a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala index 6f20831..1451a1b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala @@ -34,5 +34,22 @@ trait DataStreamRel extends RelNode with FlinkRel { */ def translateToPlan(tableEnv: StreamTableEnvironment) : DataStream[Row] + /** + * Whether the [[DataStreamRel]] requires that update and delete changes are sent with retraction + * messages. + */ + def needsUpdatesAsRetraction: Boolean = false + + /** + * Whether the [[DataStreamRel]] produces update and delete changes. + */ + def producesUpdates: Boolean = false + + /** + * Whether the [[DataStreamRel]] consumes retraction messages instead of forwarding them. + * The node might or might not produce new retraction messages. 
+ */ + def consumesRetractions: Boolean = false + } http://git-wip-us.apache.org/repos/asf/flink/blob/5ba0f02a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRules.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRules.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRules.scala new file mode 100644 index 0000000..432158d --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRules.scala @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.nodes.datastream + +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.plan.RelOptRule._ +import org.apache.calcite.plan.hep.HepRelVertex +import org.apache.calcite.rel.RelNode + +import scala.collection.JavaConverters._ + +/** + * Collection of rules to annotate [[DataStreamRel]] nodes with retraction information. 
+ * + * The rules have to be applied in the following order: + * - [[DataStreamRetractionRules.DEFAULT_RETRACTION_INSTANCE]] + * - [[DataStreamRetractionRules.UPDATES_AS_RETRACTION_INSTANCE]] + * - [[DataStreamRetractionRules.ACCMODE_INSTANCE]] + * + * The rules will assign an [[AccModeTrait]] to each [[DataStreamRel]] node of the plan. The + * trait defines the [[AccMode]] of a node. + * - [[AccMode.Acc]] defines that the node produces only accumulate messages, i.e., all types of + * modifications (insert, update, delete) are encoded as accumulate messages. + * - [[AccMode.AccRetract]] defines that the node produces accumulate and retraction messages. + * Insert modifications are encoded as accumulate message, delete modifications as retraction + * message, and update modifications as a pair of accumulate and retraction messages. + * + */ +object DataStreamRetractionRules { + + /** + * Rule instance that assigns default retraction to [[DataStreamRel]] nodes. + */ + val DEFAULT_RETRACTION_INSTANCE = new AssignDefaultRetractionRule() + + /** + * Rule instance that checks if [[DataStreamRel]] nodes need to ship updates as retractions. + */ + val UPDATES_AS_RETRACTION_INSTANCE = new SetUpdatesAsRetractionRule() + + /** + * Rule instance that assigns the [[AccMode]] to [[DataStreamRel]] nodes. + */ + val ACCMODE_INSTANCE = new SetAccModeRule() + + /** + * Get all children RelNodes of a RelNode. + * + * @param parent The parent RelNode + * @return All child nodes + */ + def getChildRelNodes(parent: RelNode): Seq[RelNode] = { + parent.getInputs.asScala.map(_.asInstanceOf[HepRelVertex].getCurrentRel) + } + + /** + * Checks if a [[RelNode]] ships updates as retractions. + * + * @param node The node to check. + * @return True if the node ships updates as retractions, false otherwise. 
+ */ + def sendsUpdatesAsRetraction(node: RelNode): Boolean = { + val retractionTrait = node.getTraitSet.getTrait(UpdateAsRetractionTraitDef.INSTANCE) + retractionTrait != null && retractionTrait.sendsUpdatesAsRetractions + } + + /** + * Rule that assigns the default retraction information to [[DataStreamRel]] nodes. + * The default is to not publish updates as retraction messages and [[AccMode.Acc]]. + */ + class AssignDefaultRetractionRule extends RelOptRule( + operand( + classOf[DataStreamRel], none()), + "AssignDefaultRetractionRule") { + + override def onMatch(call: RelOptRuleCall): Unit = { + val rel = call.rel(0).asInstanceOf[DataStreamRel] + val traits = rel.getTraitSet + + val traitsWithUpdateAsRetrac = if (!traits.contains(UpdateAsRetractionTraitDef.INSTANCE)) { + traits.plus(UpdateAsRetractionTrait.DEFAULT) + } else { + traits + } + val traitsWithAccMode = + if (!traitsWithUpdateAsRetrac.contains(AccModeTraitDef.INSTANCE)) { + traitsWithUpdateAsRetrac.plus(AccModeTrait.DEFAULT) + } else { + traitsWithUpdateAsRetrac + } + + if (traits != traitsWithAccMode) { + call.transformTo(rel.copy(traitsWithAccMode, rel.getInputs)) + } + } + } + + /** + * Rule that annotates all [[DataStreamRel]] nodes that need to send out update and delete + * changes as retraction messages. + */ + class SetUpdatesAsRetractionRule extends RelOptRule( + operand( + classOf[DataStreamRel], none()), + "SetUpdatesAsRetractionRule") { + + /** + * Checks if a [[RelNode]] requires that update and delete changes are sent with retraction + * messages. + */ + def needsUpdatesAsRetraction(node: RelNode): Boolean = { + node match { + case _ if sendsUpdatesAsRetraction(node) => true + case dsr: DataStreamRel => dsr.needsUpdatesAsRetraction + } + } + + /** + * Annotates a [[RelNode]] to send out update and delete changes as retraction messages. 
+ */ + def setUpdatesAsRetraction(relNode: RelNode): RelNode = { + val traitSet = relNode.getTraitSet + relNode.copy(traitSet.plus(new UpdateAsRetractionTrait(true)), relNode.getInputs) + } + + /** + * Annotates the children of a parent node with the information that they need to forward + * update and delete modifications as retraction messages. + * + * A child needs to produce retraction messages, if + * + * 1. its parent requires retraction messages by itself because it is a certain type + * of operator, such as a [[DataStreamGroupAggregate]] or [[DataStreamOverAggregate]], or + * 2. its parent requires retraction because its own parent requires retraction + * (transitive requirement). + * + */ + override def onMatch(call: RelOptRuleCall): Unit = { + val parent = call.rel(0).asInstanceOf[DataStreamRel] + + val children = getChildRelNodes(parent) + // check if children need to send out retraction messages + val newChildren = for (c <- children) yield { + if (needsUpdatesAsRetraction(parent) && !sendsUpdatesAsRetraction(c)) { + setUpdatesAsRetraction(c) + } else { + c + } + } + + // update parent if a child was updated + if (children != newChildren) { + call.transformTo(parent.copy(parent.getTraitSet, newChildren.asJava)) + } + } + } + + /** + * Sets the [[AccMode]] of [[DataStreamRel]] nodes. + */ + class SetAccModeRule extends RelOptRule( + operand( + classOf[DataStreamRel], none()), + "SetAccModeRule") { + + /** + * Checks if a [[RelNode]] produces update and delete changes. + */ + def producesUpdates(relNode: RelNode): Boolean = { + relNode match { + case dsr: DataStreamRel => dsr.producesUpdates + } + } + + /** + * Checks if a [[RelNode]] is in [[AccMode.AccRetract]] mode. + */ + def isAccRetract(node: RelNode): Boolean = { + val accModeTrait = node.getTraitSet.getTrait(AccModeTraitDef.INSTANCE) + null != accModeTrait && accModeTrait.getAccMode == AccMode.AccRetract + } + + /** + * Set [[AccMode.AccRetract]] to a [[RelNode]]. 
+ */ + def setAccRetract(relNode: RelNode): RelNode = { + val traitSet = relNode.getTraitSet + relNode.copy(traitSet.plus(new AccModeTrait(AccMode.AccRetract)), relNode.getInputs) + } + + /** + * Checks if a [[RelNode]] consumes retraction messages instead of forwarding them. + * The node might or might not produce new retraction messages. + * This is checked by [[producesRetractions()]]. + */ + def consumesRetractions(relNode: RelNode): Boolean = { + relNode match { + case dsr: DataStreamRel => dsr.consumesRetractions + } + } + + /** + * Checks if a [[RelNode]] produces retraction messages. + */ + def producesRetractions(node: RelNode): Boolean = { + sendsUpdatesAsRetraction(node) && producesUpdates(node) + } + + /** + * Checks if a [[RelNode]] forwards retraction messages from its children. + */ + def forwardsRetractions(parent: RelNode, children: Seq[RelNode]): Boolean = { + children.exists(c => isAccRetract(c)) && !consumesRetractions(parent) + } + + /** + * Updates the [[AccMode]] of a [[RelNode]] and its children if necessary. 
+ */ + override def onMatch(call: RelOptRuleCall): Unit = { + val parent = call.rel(0).asInstanceOf[DataStreamRel] + val children = getChildRelNodes(parent) + + // check if the AccMode of the parent needs to be updated + if (!isAccRetract(parent) && + (producesRetractions(parent) || forwardsRetractions(parent, children))) { + call.transformTo(setAccRetract(parent)) + } + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/5ba0f02a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/retractionTraitDefs.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/retractionTraitDefs.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/retractionTraitDefs.scala new file mode 100644 index 0000000..c43d951 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/retractionTraitDefs.scala @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.datastream + +import org.apache.calcite.plan.{RelOptPlanner, RelTraitDef} +import org.apache.calcite.rel.RelNode + +/** + * Definition of the [[UpdateAsRetractionTrait]]. + */ +class UpdateAsRetractionTraitDef extends RelTraitDef[UpdateAsRetractionTrait] { + override def convert( + planner: RelOptPlanner, + rel: RelNode, + toTrait: UpdateAsRetractionTrait, + allowInfiniteCostConverters: Boolean): RelNode = { + + rel.copy(rel.getTraitSet.plus(toTrait), rel.getInputs) + } + + override def canConvert( + planner: RelOptPlanner, + fromTrait: UpdateAsRetractionTrait, + toTrait: UpdateAsRetractionTrait): Boolean = true + + override def getTraitClass: Class[UpdateAsRetractionTrait] = classOf[UpdateAsRetractionTrait] + + override def getSimpleName: String = this.getClass.getSimpleName + + override def getDefault: UpdateAsRetractionTrait = UpdateAsRetractionTrait.DEFAULT +} + +object UpdateAsRetractionTraitDef { + val INSTANCE = new UpdateAsRetractionTraitDef +} + +/** + * Definition of the [[AccModeTrait]]. 
+ */ +class AccModeTraitDef extends RelTraitDef[AccModeTrait] { + + override def convert( + planner: RelOptPlanner, + rel: RelNode, + toTrait: AccModeTrait, + allowInfiniteCostConverters: Boolean): RelNode = { + + rel.copy(rel.getTraitSet.plus(toTrait), rel.getInputs) + } + + override def canConvert( + planner: RelOptPlanner, + fromTrait: AccModeTrait, + toTrait: AccModeTrait): Boolean = true + + override def getTraitClass: Class[AccModeTrait] = classOf[AccModeTrait] + + override def getSimpleName: String = this.getClass.getSimpleName + + override def getDefault: AccModeTrait = AccModeTrait.DEFAULT +} + +object AccModeTraitDef { + val INSTANCE = new AccModeTraitDef +} http://git-wip-us.apache.org/repos/asf/flink/blob/5ba0f02a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/retractionTraits.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/retractionTraits.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/retractionTraits.scala new file mode 100644 index 0000000..c3b43ba --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/retractionTraits.scala @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.nodes.datastream + +import org.apache.calcite.plan.{RelOptPlanner, RelTrait, RelTraitDef} +import org.apache.flink.table.plan.nodes.datastream.AccMode.AccMode + +/** Tracks if a [[org.apache.calcite.rel.RelNode]] needs to send update and delete changes as + * retraction messages. + */ +class UpdateAsRetractionTrait extends RelTrait { + + /** + * Defines whether the [[org.apache.calcite.rel.RelNode]] needs to send update and delete + * changes as retraction messages. + */ + private var updateAsRetraction: Boolean = false + + def this(updateAsRetraction: Boolean) { + this() + this.updateAsRetraction = updateAsRetraction + } + + def sendsUpdatesAsRetractions: Boolean = updateAsRetraction + + override def register(planner: RelOptPlanner): Unit = { } + + override def getTraitDef: RelTraitDef[_ <: RelTrait] = UpdateAsRetractionTraitDef.INSTANCE + + override def satisfies(`trait`: RelTrait): Boolean = this.equals(`trait`) + + override def toString: String = updateAsRetraction.toString + +} + +object UpdateAsRetractionTrait { + val DEFAULT = new UpdateAsRetractionTrait(false) +} + +/** + * Tracks the AccMode of a [[org.apache.calcite.rel.RelNode]]. + */ +class AccModeTrait extends RelTrait { + + /** Defines the accumulating mode for an operator. 
*/ + private var accMode = AccMode.Acc + + def this(accMode: AccMode) { + this() + this.accMode = accMode + } + + def getAccMode: AccMode = accMode + + override def register(planner: RelOptPlanner): Unit = { } + + override def getTraitDef: RelTraitDef[_ <: RelTrait] = AccModeTraitDef.INSTANCE + + override def satisfies(`trait`: RelTrait): Boolean = this.equals(`trait`) + + override def toString: String = accMode.toString +} + +object AccModeTrait { + val DEFAULT = new AccModeTrait(AccMode.Acc) +} + +/** + * The AccMode indicates which kinds of messages a [[org.apache.calcite.rel.RelNode]] might + * produce. + * In [[AccMode.Acc]] the node only emits accumulate messages. + * In [[AccMode.AccRetract]], the node produces accumulate messages for insert changes, + * retraction messages for delete changes, and accumulate and retraction messages + * for update changes. + */ +object AccMode extends Enumeration { + type AccMode = Value + + val Acc = Value // Operator produces only accumulate (insert) messages + val AccRetract = Value // Operator produces accumulate (insert, update) and + // retraction (delete, update) messages +} + + http://git-wip-us.apache.org/repos/asf/flink/blob/5ba0f02a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala index 6b22b4a..a2d346c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala @@ -21,6 +21,7 @@ package org.apache.flink.table.plan.rules import org.apache.calcite.rel.rules._ import org.apache.calcite.tools.{RuleSet, RuleSets} import 
org.apache.flink.table.calcite.rules.FlinkAggregateExpandDistinctAggregatesRule +import org.apache.flink.table.plan.nodes.datastream.DataStreamRetractionRules import org.apache.flink.table.plan.rules.dataSet._ import org.apache.flink.table.plan.rules.datastream._ @@ -195,6 +196,10 @@ object FlinkRuleSets { val DATASTREAM_DECO_RULES: RuleSet = RuleSets.ofList( // rules + // retraction rules + DataStreamRetractionRules.DEFAULT_RETRACTION_INSTANCE, + DataStreamRetractionRules.UPDATES_AS_RETRACTION_INSTANCE, + DataStreamRetractionRules.ACCMODE_INSTANCE ) } http://git-wip-us.apache.org/repos/asf/flink/blob/5ba0f02a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala index d0de8fa..d21dd2a 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala @@ -22,6 +22,7 @@ import org.apache.calcite.rel.rules._ import org.apache.calcite.sql.fun.{OracleSqlOperatorTable, SqlStdOperatorTable} import org.apache.calcite.tools.RuleSets import org.apache.flink.table.calcite.{CalciteConfig, CalciteConfigBuilder} +import org.apache.flink.table.plan.nodes.datastream.DataStreamRetractionRules import org.junit.Assert._ import org.junit.Test @@ -50,7 +51,7 @@ class CalciteConfigBuilderTest { val cc: CalciteConfig = new CalciteConfigBuilder() .addNormRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE)) .replaceOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE)) - .replaceDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE)) + 
.replaceDecoRuleSet(RuleSets.ofList(DataStreamRetractionRules.DEFAULT_RETRACTION_INSTANCE)) .build() assertFalse(cc.replacesNormRuleSet) @@ -191,62 +192,62 @@ class CalciteConfigBuilderTest { def testReplaceDecorationRules(): Unit = { val cc: CalciteConfig = new CalciteConfigBuilder() - .replaceDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE)) + .replaceDecoRuleSet(RuleSets.ofList(DataStreamRetractionRules.DEFAULT_RETRACTION_INSTANCE)) .build() assertEquals(true, cc.replacesDecoRuleSet) assertTrue(cc.getDecoRuleSet.isDefined) val cSet = cc.getDecoRuleSet.get.iterator().asScala.toSet assertEquals(1, cSet.size) - assertTrue(cSet.contains(ReduceExpressionsRule.FILTER_INSTANCE)) + assertTrue(cSet.contains(DataStreamRetractionRules.DEFAULT_RETRACTION_INSTANCE)) } @Test def testReplaceDecorationAddRules(): Unit = { val cc: CalciteConfig = new CalciteConfigBuilder() - .replaceDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE)) - .addDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.PROJECT_INSTANCE)) + .replaceDecoRuleSet(RuleSets.ofList(DataStreamRetractionRules.DEFAULT_RETRACTION_INSTANCE)) + .addDecoRuleSet(RuleSets.ofList(DataStreamRetractionRules.UPDATES_AS_RETRACTION_INSTANCE)) .build() assertEquals(true, cc.replacesDecoRuleSet) assertTrue(cc.getDecoRuleSet.isDefined) val cSet = cc.getDecoRuleSet.get.iterator().asScala.toSet assertEquals(2, cSet.size) - assertTrue(cSet.contains(ReduceExpressionsRule.FILTER_INSTANCE)) - assertTrue(cSet.contains(ReduceExpressionsRule.PROJECT_INSTANCE)) + assertTrue(cSet.contains(DataStreamRetractionRules.DEFAULT_RETRACTION_INSTANCE)) + assertTrue(cSet.contains(DataStreamRetractionRules.UPDATES_AS_RETRACTION_INSTANCE)) } @Test def testAddDecorationRules(): Unit = { val cc: CalciteConfig = new CalciteConfigBuilder() - .addDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE)) + .addDecoRuleSet(RuleSets.ofList(DataStreamRetractionRules.DEFAULT_RETRACTION_INSTANCE)) .build() 
assertEquals(false, cc.replacesDecoRuleSet) assertTrue(cc.getDecoRuleSet.isDefined) val cSet = cc.getDecoRuleSet.get.iterator().asScala.toSet assertEquals(1, cSet.size) - assertTrue(cSet.contains(ReduceExpressionsRule.FILTER_INSTANCE)) + assertTrue(cSet.contains(DataStreamRetractionRules.DEFAULT_RETRACTION_INSTANCE)) } @Test def testAddAddDecorationRules(): Unit = { val cc: CalciteConfig = new CalciteConfigBuilder() - .addDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE)) - .addDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.PROJECT_INSTANCE, - ReduceExpressionsRule.CALC_INSTANCE)) + .addDecoRuleSet(RuleSets.ofList(DataStreamRetractionRules.DEFAULT_RETRACTION_INSTANCE)) + .addDecoRuleSet(RuleSets.ofList(DataStreamRetractionRules.UPDATES_AS_RETRACTION_INSTANCE, + DataStreamRetractionRules.ACCMODE_INSTANCE)) .build() assertEquals(false, cc.replacesDecoRuleSet) assertTrue(cc.getDecoRuleSet.isDefined) val cList = cc.getDecoRuleSet.get.iterator().asScala.toList assertEquals(3, cList.size) - assertEquals(cList.head, ReduceExpressionsRule.FILTER_INSTANCE) - assertEquals(cList(1), ReduceExpressionsRule.PROJECT_INSTANCE) - assertEquals(cList(2), ReduceExpressionsRule.CALC_INSTANCE) + assertEquals(cList.head, DataStreamRetractionRules.DEFAULT_RETRACTION_INSTANCE) + assertEquals(cList(1), DataStreamRetractionRules.UPDATES_AS_RETRACTION_INSTANCE) + assertEquals(cList(2), DataStreamRetractionRules.ACCMODE_INSTANCE) } @Test http://git-wip-us.apache.org/repos/asf/flink/blob/5ba0f02a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/RetractionRulesTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/RetractionRulesTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/RetractionRulesTest.scala new file mode 100644 index 0000000..da3ee99 --- /dev/null +++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/RetractionRulesTest.scala @@ -0,0 +1,334 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.plan.rules

import org.apache.calcite.rel.RelNode
import org.apache.flink.table.api.Table
import org.apache.flink.table.plan.nodes.datastream._
import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
import org.apache.flink.table.utils.TableTestUtil._
import org.junit.Assert._
import org.junit.Test
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._

/**
  * Checks the two retraction-related traits carried by every node of an optimized
  * streaming plan.
  *
  * Expected plans are written with `unaryNode` / `binaryNode` and compared against the
  * rendering produced by [[TraitUtil]], which prints one node per line as
  * `NodeClass(updateAsRetraction, accMode)`: the first value is the node's trait for
  * `UpdateAsRetractionTraitDef`, the second its trait for `AccModeTraitDef`.
  */
class RetractionRulesTest extends TableTestBase {

  /**
    * Trait pair expected on the upper nodes of the plans asserted in this class.
    * NOTE(review): presumably the state of a node for which nothing downstream asks for
    * retractions -- confirm against the retraction rules.
    */
  private val defaultStatus = "false, Acc"

  /** Creates a fresh test utility whose verify methods compare node traits. */
  def streamTestForRetractionUtil(): StreamTableTestForRetractionUtil = {
    new StreamTableTestForRetractionUtil()
  }

  /** A plain projection: the optimized plan is a single scan carrying `false, Acc`. */
  @Test
  def testSelect(): Unit = {
    val util = streamTestForRetractionUtil()
    val table = util.addTable[(String, Int)]('word, 'number)

    val resultTable = table.select('word, 'number)

    val expected = s"DataStreamScan($defaultStatus)"

    util.verifyTableTrait(resultTable, expected)
  }

  /** One level of unbounded groupBy. */
  @Test
  def testGroupBy(): Unit = {
    val util = streamTestForRetractionUtil()
    val table = util.addTable[(String, Int)]('word, 'number)

    val resultTable = table
      .groupBy('word)
      .select('number.count)

    val expected =
      unaryNode(
        "DataStreamCalc",
        unaryNode(
          "DataStreamGroupAggregate",
          unaryNode(
            "DataStreamCalc",
            "DataStreamScan(true, Acc)",
            "true, Acc"
          ),
          defaultStatus
        ),
        defaultStatus
      )

    util.verifyTableTrait(resultTable, expected)
  }

  /**
    * Two levels of unbounded groupBy: the inner aggregate and the calc above it are
    * expected to carry `true, AccRetract`.
    */
  @Test
  def testTwoGroupBy(): Unit = {
    val util = streamTestForRetractionUtil()
    val table = util.addTable[(String, Int)]('word, 'number)

    val resultTable = table
      .groupBy('word)
      .select('word, 'number.count as 'count)
      .groupBy('count)
      .select('count, 'count.count as 'frequency)

    val expected =
      unaryNode(
        "DataStreamCalc",
        unaryNode(
          "DataStreamGroupAggregate",
          unaryNode(
            "DataStreamCalc",
            unaryNode(
              "DataStreamGroupAggregate",
              "DataStreamScan(true, Acc)",
              "true, AccRetract"
            ),
            "true, AccRetract"
          ),
          defaultStatus
        ),
        defaultStatus
      )

    util.verifyTableTrait(resultTable, expected)
  }

  /** A group window directly on the source table. */
  @Test
  def testGroupWindow(): Unit = {
    val util = streamTestForRetractionUtil()
    val table = util.addTable[(String, Int)]('word, 'number)

    val resultTable = table
      .window(Tumble over 50.milli as 'w)
      .groupBy('w, 'word)
      .select('word, 'number.count as 'count)

    val expected =
      unaryNode(
        "DataStreamCalc",
        unaryNode(
          "DataStreamGroupWindowAggregate",
          "DataStreamScan(true, Acc)",
          defaultStatus
        ),
        defaultStatus
      )

    util.verifyTableTrait(resultTable, expected)
  }

  /** A group window applied to the result of an unbounded groupBy. */
  @Test
  def testGroupWindowAfterGroupBy(): Unit = {
    val util = streamTestForRetractionUtil()
    val table = util.addTable[(String, Int)]('word, 'number)

    val resultTable = table
      .groupBy('word)
      .select('word, 'number.count as 'count)
      .window(Tumble over 50.milli as 'w)
      .groupBy('w, 'count)
      .select('count, 'count.count as 'frequency)

    val expected =
      unaryNode(
        "DataStreamCalc",
        unaryNode(
          "DataStreamGroupWindowAggregate",
          unaryNode(
            "DataStreamCalc",
            unaryNode(
              "DataStreamGroupAggregate",
              "DataStreamScan(true, Acc)",
              "true, AccRetract"
            ),
            "true, AccRetract"
          ),
          defaultStatus
        ),
        defaultStatus
      )

    util.verifyTableTrait(resultTable, expected)
  }

  /** An over window (SQL) directly on the source table. */
  @Test
  def testOverWindow(): Unit = {
    val util = streamTestForRetractionUtil()
    util.addTable[(String, Int)]("T1", 'word, 'number)

    val sqlQuery =
      "SELECT " +
        "word, count(number) " +
        "OVER (PARTITION BY word ORDER BY ProcTime() " +
        "ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" +
        "FROM T1"

    val expected =
      unaryNode(
        "DataStreamCalc",
        unaryNode(
          "DataStreamOverAggregate",
          unaryNode(
            "DataStreamCalc",
            "DataStreamScan(true, Acc)",
            "true, Acc"
          ),
          defaultStatus
        ),
        defaultStatus
      )

    util.verifySqlTrait(sqlQuery, expected)
  }

  /** An over window (SQL) applied to the result of an unbounded GROUP BY. */
  @Test
  def testOverWindowAfterGroupBy(): Unit = {
    val util = streamTestForRetractionUtil()
    util.addTable[(String, Int)]("T1", 'word, 'number)

    val sqlQuery =
      "SELECT " +
        "_count, count(word) " +
        "OVER (PARTITION BY _count ORDER BY ProcTime() " +
        "ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" +
        "FROM " +
        "(SELECT word, count(number) as _count FROM T1 GROUP BY word) "

    val expected =
      unaryNode(
        "DataStreamCalc",
        unaryNode(
          "DataStreamOverAggregate",
          unaryNode(
            "DataStreamCalc",
            unaryNode(
              "DataStreamGroupAggregate",
              "DataStreamScan(true, Acc)",
              "true, AccRetract"
            ),
            "true, AccRetract"
          ),
          defaultStatus
        ),
        defaultStatus
      )

    util.verifySqlTrait(sqlQuery, expected)
  }

  /**
    * A plan with a binary node: a grouped table is unioned with a plain table and the
    * union is grouped again. Both union inputs and the union itself are asserted.
    */
  @Test
  def testBinaryNode(): Unit = {
    val util = streamTestForRetractionUtil()
    val lTable = util.addTable[(String, Int)]('word, 'number)
    val rTable = util.addTable[(String, Long)]('word_r, 'count_r)

    val resultTable = lTable
      .groupBy('word)
      .select('word, 'number.count as 'count)
      .unionAll(rTable)
      .groupBy('count)
      .select('count, 'count.count as 'frequency)

    val expected =
      unaryNode(
        "DataStreamCalc",
        unaryNode(
          "DataStreamGroupAggregate",
          unaryNode(
            "DataStreamCalc",
            binaryNode(
              "DataStreamUnion",
              unaryNode(
                "DataStreamCalc",
                unaryNode(
                  "DataStreamGroupAggregate",
                  "DataStreamScan(true, Acc)",
                  "true, AccRetract"
                ),
                "true, AccRetract"
              ),
              "DataStreamScan(true, Acc)",
              "true, AccRetract"
            ),
            "true, AccRetract"
          ),
          defaultStatus
        ),
        defaultStatus
      )

    util.verifyTableTrait(resultTable, expected)
  }
}

/**
  * Stream test utility that compares the traits of an optimized plan, as rendered by
  * [[TraitUtil]], with an expected rendering.
  */
class StreamTableTestForRetractionUtil extends StreamTableTestUtil {

  /**
    * Runs `query` through `tEnv.sql` and verifies the traits of the resulting table.
    *
    * @param query    SQL text to translate into a table
    * @param expected expected trait rendering of the optimized plan
    */
  def verifySqlTrait(query: String, expected: String): Unit = {
    verifyTableTrait(tEnv.sql(query), expected)
  }

  /**
    * Optimizes the plan of `resultTable` and asserts that its trait rendering equals
    * `expected`. Leading and trailing whitespace of every line is ignored on both sides.
    *
    * @param resultTable table whose plan is optimized and inspected
    * @param expected    expected trait rendering of the optimized plan
    */
  def verifyTableTrait(resultTable: Table, expected: String): Unit = {
    val optimized = tEnv.optimize(resultTable.getRelNode)
    val actual = TraitUtil.toString(optimized)
    assertEquals(trimLines(expected), trimLines(actual))
  }

  /** Trims every line of `plan` so that indentation does not take part in the comparison. */
  private def trimLines(plan: String): String = {
    plan.split("\n").map(_.trim).mkString("\n")
  }
}

/** Renders a plan as text so that tests can assert on the traits of each node. */
object TraitUtil {

  /**
    * Renders `rel` followed, recursively, by all of its inputs in input order.
    *
    * Every node becomes one line of the form
    * `SimpleClassName(updateAsRetraction, accMode)`, built from the node's traits for
    * `UpdateAsRetractionTraitDef.INSTANCE` and `AccModeTraitDef.INSTANCE`. Inputs are not
    * indented.
    *
    * @param rel root of the (sub-)plan to render
    * @return the rendering of `rel` and its inputs, one node per line
    */
  def toString(rel: RelNode): String = {
    val className = rel.getClass.getSimpleName

    // Each rendering ends with a line break (see below), so the renderings of sibling
    // inputs can be joined without a separator.
    val inputs = (0 until rel.getInputs.size())
      .map(i => TraitUtil.toString(rel.getInput(i)))
      .mkString

    val traits = rel.getTraitSet
    val retraction = traits.getTrait(UpdateAsRetractionTraitDef.INSTANCE).toString
    val accMode = traits.getTrait(AccModeTraitDef.INSTANCE).toString

    // The template ends with an empty line and stripLineEnd removes a single line end,
    // which leaves exactly one line break after the last rendered node.
    s"""$className($retraction, $accMode)
       |$inputs
       |""".stripMargin.stripLineEnd
  }
}