http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala deleted file mode 100644 index 7133773..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala +++ /dev/null @@ -1,358 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.nodes.datastream - -import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} -import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.rel.core.AggregateCall -import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.tuple.Tuple -import org.apache.flink.api.table.FlinkRelBuilder.NamedWindowProperty -import org.apache.flink.api.table.expressions._ -import org.apache.flink.api.table.plan.logical._ -import org.apache.flink.api.table.plan.nodes.FlinkAggregate -import org.apache.flink.api.table.plan.nodes.datastream.DataStreamAggregate._ -import org.apache.flink.api.table.runtime.aggregate.AggregateUtil._ -import org.apache.flink.api.table.runtime.aggregate.{Aggregate, _} -import org.apache.flink.api.table.typeutils.TypeCheckUtils.isTimeInterval -import org.apache.flink.api.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo, TypeConverter} -import org.apache.flink.api.java.typeutils.RowTypeInfo -import org.apache.flink.api.table.{FlinkTypeFactory, StreamTableEnvironment} -import org.apache.flink.types.Row -import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream} -import org.apache.flink.streaming.api.windowing.assigners._ -import org.apache.flink.streaming.api.windowing.time.Time -import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow} - -import scala.collection.JavaConverters._ - -class DataStreamAggregate( - window: LogicalWindow, - namedProperties: Seq[NamedWindowProperty], - cluster: RelOptCluster, - traitSet: RelTraitSet, - inputNode: RelNode, - namedAggregates: Seq[CalcitePair[AggregateCall, String]], - rowRelDataType: RelDataType, - inputType: RelDataType, - grouping: Array[Int]) - extends SingleRel(cluster, traitSet, inputNode) - with FlinkAggregate - with DataStreamRel { - - override def deriveRowType() = rowRelDataType - - override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { - new DataStreamAggregate( - window, - namedProperties, - cluster, - traitSet, - inputs.get(0), - namedAggregates, - getRowType, - inputType, - grouping) - } - - override def toString: String = { - s"Aggregate(${ - if (!grouping.isEmpty) { - s"groupBy: (${groupingToString(inputType, grouping)}), " - } else { - "" - } - }window: ($window), " + - s"select: (${ - aggregationToString( - inputType, - grouping, - getRowType, - namedAggregates, - namedProperties) - }))" - } - - override def explainTerms(pw: RelWriter): RelWriter = { - super.explainTerms(pw) - .itemIf("groupBy", groupingToString(inputType, grouping), !grouping.isEmpty) - .item("window", window) - .item("select", aggregationToString( - inputType, - grouping, - getRowType, - namedAggregates, - namedProperties)) - } - - override def translateToPlan( - tableEnv: StreamTableEnvironment, - expectedType: Option[TypeInformation[Any]]): DataStream[Any] = { - - val config = tableEnv.getConfig - val groupingKeys = grouping.indices.toArray - val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan( - tableEnv, - // tell the input operator that this operator currently only supports Rows as input - Some(TypeConverter.DEFAULT_ROW_TYPE)) - - // get the output types - val fieldTypes: Array[TypeInformation[_]] = - getRowType.getFieldList.asScala - .map(field => FlinkTypeFactory.toTypeInfo(field.getType)) - .toArray - - val rowTypeInfo = new RowTypeInfo(fieldTypes: _*) - - val aggString = aggregationToString( - inputType, - grouping, - getRowType, - namedAggregates, - namedProperties) - - val prepareOpName = s"prepare select: ($aggString)" - val keyedAggOpName = s"groupBy: (${groupingToString(inputType, grouping)}), " + - s"window: ($window), " + - s"select: ($aggString)" - val nonKeyedAggOpName = s"window: ($window), select: ($aggString)" - - val mapFunction = AggregateUtil.createPrepareMapFunction( - namedAggregates, - grouping, - inputType) - - val mappedInput = inputDS.map(mapFunction).name(prepareOpName) - - val result: DataStream[Any] = { - // check whether all aggregates support partial aggregate - if (AggregateUtil.doAllSupportPartialAggregation( - namedAggregates.map(_.getKey), - inputType, - grouping.length)) { - // do Incremental Aggregation - val reduceFunction = AggregateUtil.createIncrementalAggregateReduceFunction( - namedAggregates, - inputType, - getRowType, - grouping) - // grouped / keyed aggregation - if (groupingKeys.length > 0) { - val windowFunction = AggregateUtil.createWindowIncrementalAggregationFunction( - window, - namedAggregates, - inputType, - rowRelDataType, - grouping, - namedProperties) - - val keyedStream = mappedInput.keyBy(groupingKeys: _*) - val windowedStream = - createKeyedWindowedStream(window, keyedStream) - .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]] - - windowedStream - .apply(reduceFunction, windowFunction) - .returns(rowTypeInfo) - .name(keyedAggOpName) - .asInstanceOf[DataStream[Any]] - } - // global / non-keyed aggregation - else { - val windowFunction = AggregateUtil.createAllWindowIncrementalAggregationFunction( - window, - namedAggregates, - inputType, - rowRelDataType, - grouping, - namedProperties) - - val windowedStream = - createNonKeyedWindowedStream(window, mappedInput) - .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]] - - windowedStream - .apply(reduceFunction, windowFunction) - .returns(rowTypeInfo) - .name(nonKeyedAggOpName) - .asInstanceOf[DataStream[Any]] - } - } - else { - // do non-Incremental Aggregation - // grouped / keyed aggregation - if (groupingKeys.length > 0) { - - val windowFunction = AggregateUtil.createWindowAggregationFunction( - window, - namedAggregates, - inputType, - rowRelDataType, - grouping, - namedProperties) - - val keyedStream = mappedInput.keyBy(groupingKeys: _*) - val windowedStream = - createKeyedWindowedStream(window, keyedStream) - .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]] - - windowedStream - .apply(windowFunction) - .returns(rowTypeInfo) - .name(keyedAggOpName) - .asInstanceOf[DataStream[Any]] - } - // global / non-keyed aggregation - else { - val windowFunction = AggregateUtil.createAllWindowAggregationFunction( - window, - namedAggregates, - inputType, - rowRelDataType, - grouping, - namedProperties) - - val windowedStream = - createNonKeyedWindowedStream(window, mappedInput) - .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]] - - windowedStream - .apply(windowFunction) - .returns(rowTypeInfo) - .name(nonKeyedAggOpName) - .asInstanceOf[DataStream[Any]] - } - } - } - // if the expected type is not a Row, inject a mapper to convert to the expected type - expectedType match { - case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] => - val mapName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})" - result.map(getConversionMapper( - config = config, - nullableInput = false, - inputType = rowTypeInfo.asInstanceOf[TypeInformation[Any]], - expectedType = expectedType.get, - conversionOperatorName = "DataStreamAggregateConversion", - fieldNames = getRowType.getFieldNames.asScala - )) - .name(mapName) - case _ => result - } - } -} -object DataStreamAggregate { - - - private def createKeyedWindowedStream(groupWindow: LogicalWindow, stream: KeyedStream[Row, Tuple]) - : WindowedStream[Row, Tuple, _ <: DataStreamWindow] = groupWindow match { - - case ProcessingTimeTumblingGroupWindow(_, size) if isTimeInterval(size.resultType) => - stream.window(TumblingProcessingTimeWindows.of(asTime(size))) - - case ProcessingTimeTumblingGroupWindow(_, size) => - stream.countWindow(asCount(size)) - - case EventTimeTumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) => - stream.window(TumblingEventTimeWindows.of(asTime(size))) - - case EventTimeTumblingGroupWindow(_, _, size) => - // TODO: EventTimeTumblingGroupWindow should sort the stream on event time - // before applying the windowing logic. Otherwise, this would be the same as a - // ProcessingTimeTumblingGroupWindow - throw new UnsupportedOperationException("Event-time grouping windows on row intervals are " + - "currently not supported.") - - case ProcessingTimeSlidingGroupWindow(_, size, slide) if isTimeInterval(size.resultType) => - stream.window(SlidingProcessingTimeWindows.of(asTime(size), asTime(slide))) - - case ProcessingTimeSlidingGroupWindow(_, size, slide) => - stream.countWindow(asCount(size), asCount(slide)) - - case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) => - stream.window(SlidingEventTimeWindows.of(asTime(size), asTime(slide))) - - case EventTimeSlidingGroupWindow(_, _, size, slide) => - // TODO: EventTimeTumblingGroupWindow should sort the stream on event time - // before applying the windowing logic. Otherwise, this would be the same as a - // ProcessingTimeTumblingGroupWindow - throw new UnsupportedOperationException("Event-time grouping windows on row intervals are " + - "currently not supported.") - - case ProcessingTimeSessionGroupWindow(_, gap: Expression) => - stream.window(ProcessingTimeSessionWindows.withGap(asTime(gap))) - - case EventTimeSessionGroupWindow(_, _, gap) => - stream.window(EventTimeSessionWindows.withGap(asTime(gap))) - } - - private def createNonKeyedWindowedStream(groupWindow: LogicalWindow, stream: DataStream[Row]) - : AllWindowedStream[Row, _ <: DataStreamWindow] = groupWindow match { - - case ProcessingTimeTumblingGroupWindow(_, size) if isTimeInterval(size.resultType) => - stream.windowAll(TumblingProcessingTimeWindows.of(asTime(size))) - - case ProcessingTimeTumblingGroupWindow(_, size) => - stream.countWindowAll(asCount(size)) - - case EventTimeTumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) => - stream.windowAll(TumblingEventTimeWindows.of(asTime(size))) - - case EventTimeTumblingGroupWindow(_, _, size) => - // TODO: EventTimeTumblingGroupWindow should sort the stream on event time - // before applying the windowing logic. Otherwise, this would be the same as a - // ProcessingTimeTumblingGroupWindow - throw new UnsupportedOperationException("Event-time grouping windows on row intervals are " + - "currently not supported.") - - case ProcessingTimeSlidingGroupWindow(_, size, slide) if isTimeInterval(size.resultType) => - stream.windowAll(SlidingProcessingTimeWindows.of(asTime(size), asTime(slide))) - - case ProcessingTimeSlidingGroupWindow(_, size, slide) => - stream.countWindowAll(asCount(size), asCount(slide)) - - case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) => - stream.windowAll(SlidingEventTimeWindows.of(asTime(size), asTime(slide))) - - case EventTimeSlidingGroupWindow(_, _, size, slide) => - // TODO: EventTimeTumblingGroupWindow should sort the stream on event time - // before applying the windowing logic. Otherwise, this would be the same as a - // ProcessingTimeTumblingGroupWindow - throw new UnsupportedOperationException("Event-time grouping windows on row intervals are " + - "currently not supported.") - - case ProcessingTimeSessionGroupWindow(_, gap) => - stream.windowAll(ProcessingTimeSessionWindows.withGap(asTime(gap))) - - case EventTimeSessionGroupWindow(_, _, gap) => - stream.windowAll(EventTimeSessionWindows.withGap(asTime(gap))) - } - - def asTime(expr: Expression): Time = expr match { - case Literal(value: Long, TimeIntervalTypeInfo.INTERVAL_MILLIS) => Time.milliseconds(value) - case _ => throw new IllegalArgumentException() - } - - def asCount(expr: Expression): Long = expr match { - case Literal(value: Long, RowIntervalTypeInfo.INTERVAL_ROWS) => value - case _ => throw new IllegalArgumentException() - } -} -
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala deleted file mode 100644 index 5312a5f..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.nodes.datastream - -import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} -import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} -import org.apache.calcite.rex.RexProgram -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.table.StreamTableEnvironment -import org.apache.flink.api.table.codegen.CodeGenerator -import org.apache.flink.api.table.plan.nodes.FlinkCalc -import org.apache.flink.api.table.typeutils.TypeConverter._ -import org.apache.flink.api.common.functions.FlatMapFunction -import org.apache.flink.streaming.api.datastream.DataStream - -/** - * Flink RelNode which matches along with FlatMapOperator. - * - */ -class DataStreamCalc( - cluster: RelOptCluster, - traitSet: RelTraitSet, - input: RelNode, - rowRelDataType: RelDataType, - calcProgram: RexProgram, - ruleDescription: String) - extends SingleRel(cluster, traitSet, input) - with FlinkCalc - with DataStreamRel { - - override def deriveRowType() = rowRelDataType - - override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { - new DataStreamCalc( - cluster, - traitSet, - inputs.get(0), - getRowType, - calcProgram, - ruleDescription - ) - } - - override def toString: String = calcToString(calcProgram, getExpressionString) - - override def explainTerms(pw: RelWriter): RelWriter = { - super.explainTerms(pw) - .item("select", selectionToString(calcProgram, getExpressionString)) - .itemIf("where", - conditionToString(calcProgram, getExpressionString), - calcProgram.getCondition != null) - } - - override def translateToPlan( - tableEnv: StreamTableEnvironment, - expectedType: Option[TypeInformation[Any]]): DataStream[Any] = { - - val config = tableEnv.getConfig - - val inputDataStream = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv) - - val returnType = determineReturnType( - getRowType, - expectedType, - config.getNullCheck, - config.getEfficientTypeUsage) - - val generator = new CodeGenerator(config, false, inputDataStream.getType) - - val body = functionBody( - generator, - inputDataStream.getType, - getRowType, - calcProgram, - config, - expectedType) - - val genFunction = generator.generateFunction( - ruleDescription, - classOf[FlatMapFunction[Any, Any]], - body, - returnType) - - val mapFunc = calcMapFunction(genFunction) - inputDataStream.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString)) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamConvention.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamConvention.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamConvention.scala deleted file mode 100644 index 3b6a653..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamConvention.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.nodes.datastream - -import org.apache.calcite.plan._ - -class DataStreamConvention extends Convention { - - override def toString: String = getName - - override def useAbstractConvertersForConversion( - fromTraits: RelTraitSet, - toTraits: RelTraitSet): Boolean = false - - override def canConvertConvention(toConvention: Convention): Boolean = false - - def getInterface: Class[_] = classOf[DataStreamRel] - - def getName: String = "DATASTREAM" - - def getTraitDef: RelTraitDef[_ <: RelTrait] = ConventionTraitDef.INSTANCE - - def satisfies(`trait`: RelTrait): Boolean = this eq `trait` - - def register(planner: RelOptPlanner): Unit = { } -} - -object DataStreamConvention { - - val INSTANCE = new DataStreamConvention -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCorrelate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCorrelate.scala deleted file mode 100644 index 3bfa6e2..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCorrelate.scala +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.plan.nodes.datastream - -import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} -import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.rel.logical.LogicalTableFunctionScan -import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} -import org.apache.calcite.rex.{RexCall, RexNode} -import org.apache.calcite.sql.SemiJoinType -import org.apache.flink.api.common.functions.FlatMapFunction -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.table.StreamTableEnvironment -import org.apache.flink.api.table.codegen.CodeGenerator -import org.apache.flink.api.table.functions.utils.TableSqlFunction -import org.apache.flink.api.table.plan.nodes.FlinkCorrelate -import org.apache.flink.api.table.typeutils.TypeConverter._ -import org.apache.flink.streaming.api.datastream.DataStream - -/** - * Flink RelNode which matches along with join a user defined table function. - */ -class DataStreamCorrelate( - cluster: RelOptCluster, - traitSet: RelTraitSet, - inputNode: RelNode, - scan: LogicalTableFunctionScan, - condition: Option[RexNode], - relRowType: RelDataType, - joinRowType: RelDataType, - joinType: SemiJoinType, - ruleDescription: String) - extends SingleRel(cluster, traitSet, inputNode) - with FlinkCorrelate - with DataStreamRel { - - override def deriveRowType() = relRowType - - override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { - new DataStreamCorrelate( - cluster, - traitSet, - inputs.get(0), - scan, - condition, - relRowType, - joinRowType, - joinType, - ruleDescription) - } - - override def toString: String = { - val rexCall = scan.getCall.asInstanceOf[RexCall] - val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction] - correlateToString(rexCall, sqlFunction) - } - - override def explainTerms(pw: RelWriter): RelWriter = { - val rexCall = scan.getCall.asInstanceOf[RexCall] - val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction] - super.explainTerms(pw) - .item("invocation", scan.getCall) - .item("function", sqlFunction.getTableFunction.getClass.getCanonicalName) - .item("rowType", relRowType) - .item("joinType", joinType) - .itemIf("condition", condition.orNull, condition.isDefined) - } - - override def translateToPlan( - tableEnv: StreamTableEnvironment, - expectedType: Option[TypeInformation[Any]]) - : DataStream[Any] = { - - val config = tableEnv.getConfig - val returnType = determineReturnType( - getRowType, - expectedType, - config.getNullCheck, - config.getEfficientTypeUsage) - - // we do not need to specify input type - val inputDS = inputNode.asInstanceOf[DataStreamRel].translateToPlan(tableEnv) - - val funcRel = scan.asInstanceOf[LogicalTableFunctionScan] - val rexCall = funcRel.getCall.asInstanceOf[RexCall] - val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction] - val pojoFieldMapping = sqlFunction.getPojoFieldMapping - val udtfTypeInfo = sqlFunction.getRowTypeInfo.asInstanceOf[TypeInformation[Any]] - - val generator = new CodeGenerator( - config, - false, - inputDS.getType, - Some(udtfTypeInfo), - None, - Some(pojoFieldMapping)) - - val body = functionBody( - generator, - udtfTypeInfo, - getRowType, - rexCall, - condition, - config, - joinType, - expectedType) - - val genFunction = generator.generateFunction( - ruleDescription, - classOf[FlatMapFunction[Any, Any]], - body, - returnType) - - val mapFunc = correlateMapFunction(genFunction) - - inputDS.flatMap(mapFunc).name(correlateOpName(rexCall, sqlFunction, relRowType)) - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamRel.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamRel.scala deleted file mode 100644 index 6cf13a5..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamRel.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.nodes.datastream - -import org.apache.calcite.rel.RelNode -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.table.{StreamTableEnvironment, TableConfig} -import org.apache.flink.api.table.plan.nodes.FlinkRel -import org.apache.flink.streaming.api.datastream.DataStream - -trait DataStreamRel extends RelNode with FlinkRel { - - /** - * Translates the FlinkRelNode into a Flink operator. - * - * @param tableEnv The [[StreamTableEnvironment]] of the translated Table. - * @param expectedType specifies the type the Flink operator should return. The type must - * have the same arity as the result. For instance, if the - * expected type is a RowTypeInfo this method will return a DataSet of - * type Row. If the expected type is Tuple2, the operator will return - * a Tuple2 if possible. Row otherwise. - * @return DataStream of type expectedType or RowTypeInfo - */ - def translateToPlan( - tableEnv: StreamTableEnvironment, - expectedType: Option[TypeInformation[Any]] = None) : DataStream[Any] - -} - http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamScan.scala deleted file mode 100644 index da83b64..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamScan.scala +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.nodes.datastream - -import org.apache.calcite.plan._ -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.`type`.RelDataType -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.table.StreamTableEnvironment -import org.apache.flink.api.table.plan.schema.DataStreamTable -import org.apache.flink.streaming.api.datastream.DataStream - -/** - * Flink RelNode which matches along with DataStreamSource. - * It ensures that types without deterministic field order (e.g. POJOs) are not part of - * the plan translation. - */ -class DataStreamScan( - cluster: RelOptCluster, - traitSet: RelTraitSet, - table: RelOptTable, - rowRelDataType: RelDataType) - extends StreamScan(cluster, traitSet, table) { - - val dataStreamTable: DataStreamTable[Any] = getTable.unwrap(classOf[DataStreamTable[Any]]) - - override def deriveRowType() = rowRelDataType - - override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { - new DataStreamScan( - cluster, - traitSet, - getTable, - getRowType - ) - } - - override def translateToPlan( - tableEnv: StreamTableEnvironment, - expectedType: Option[TypeInformation[Any]]): DataStream[Any] = { - - val config = tableEnv.getConfig - val inputDataStream: DataStream[Any] = dataStreamTable.dataStream - - convertToExpectedType(inputDataStream, dataStreamTable, expectedType, config) - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamUnion.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamUnion.scala deleted file mode 100644 index f490d31..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamUnion.scala +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.nodes.datastream - -import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} -import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.rel.{RelNode, RelWriter, BiRel} -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.table.StreamTableEnvironment -import org.apache.flink.streaming.api.datastream.DataStream - -import scala.collection.JavaConverters._ - -/** - * Flink RelNode which matches along with Union. - * - */ -class DataStreamUnion( - cluster: RelOptCluster, - traitSet: RelTraitSet, - leftNode: RelNode, - rightNode: RelNode, - rowRelDataType: RelDataType) - extends BiRel(cluster, traitSet, leftNode, rightNode) - with DataStreamRel { - - override def deriveRowType() = rowRelDataType - - override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { - new DataStreamUnion( - cluster, - traitSet, - inputs.get(0), - inputs.get(1), - getRowType - ) - } - - override def explainTerms(pw: RelWriter): RelWriter = { - super.explainTerms(pw).item("union", unionSelectionToString) - } - - override def toString = { - s"Union(union: (${getRowType.getFieldNames.asScala.toList.mkString(", ")}))" - } - - override def translateToPlan( - tableEnv: StreamTableEnvironment, - expectedType: Option[TypeInformation[Any]]): DataStream[Any] = { - - val leftDataSet = left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv) - val rightDataSet = right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv) - leftDataSet.union(rightDataSet) - } - - private def unionSelectionToString: String = { - getRowType.getFieldNames.asScala.toList.mkString(", ") - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala deleted file mode 100644 index 3b98653..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.nodes.datastream - -import com.google.common.collect.ImmutableList -import org.apache.calcite.plan._ -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.rel.core.Values -import org.apache.calcite.rex.RexLiteral -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.table.StreamTableEnvironment -import org.apache.flink.api.table.codegen.CodeGenerator -import org.apache.flink.api.table.runtime.io.ValuesInputFormat -import org.apache.flink.api.table.typeutils.TypeConverter._ -import org.apache.flink.streaming.api.datastream.DataStream - -import scala.collection.JavaConverters._ - -/** - * DataStream RelNode for LogicalValues. - */ -class DataStreamValues( - cluster: RelOptCluster, - traitSet: RelTraitSet, - rowRelDataType: RelDataType, - tuples: ImmutableList[ImmutableList[RexLiteral]], - ruleDescription: String) - extends Values(cluster, rowRelDataType, tuples, traitSet) - with DataStreamRel { - - override def deriveRowType() = rowRelDataType - - override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { - new DataStreamValues( - cluster, - traitSet, - getRowType, - getTuples, - ruleDescription - ) - } - - override def translateToPlan( - tableEnv: StreamTableEnvironment, - expectedType: Option[TypeInformation[Any]]) - : DataStream[Any] = { - - val config = tableEnv.getConfig - - val returnType = determineReturnType( - getRowType, - expectedType, - config.getNullCheck, - config.getEfficientTypeUsage) - - val generator = new CodeGenerator(config) - - // generate code for every record - val generatedRecords = getTuples.asScala.map { r => - generator.generateResultExpression( - returnType, - getRowType.getFieldNames.asScala, - r.asScala) - } - - // generate input format - val generatedFunction = generator.generateValuesInputFormat( - ruleDescription, - generatedRecords.map(_.code), - returnType) - - val inputFormat = new ValuesInputFormat[Any]( - generatedFunction.name, - generatedFunction.code, - generatedFunction.returnType) - - tableEnv.execEnv.createInput(inputFormat, returnType).asInstanceOf[DataStream[Any]] - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamScan.scala deleted file mode 100644 index b13770e..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamScan.scala +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.nodes.datastream - -import org.apache.calcite.plan._ -import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.rel.core.TableScan -import org.apache.flink.api.common.functions.MapFunction -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.typeutils.PojoTypeInfo -import org.apache.flink.api.table.TableConfig -import org.apache.flink.api.table.codegen.CodeGenerator -import org.apache.flink.api.table.plan.schema.FlinkTable -import org.apache.flink.api.table.runtime.MapRunner -import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType -import org.apache.flink.streaming.api.datastream.DataStream - -import scala.collection.JavaConversions._ -import scala.collection.JavaConverters._ - -abstract class StreamScan( - cluster: RelOptCluster, - traitSet: RelTraitSet, - table: RelOptTable) - extends TableScan(cluster, traitSet, table) - with DataStreamRel { - - protected def convertToExpectedType( - input: DataStream[Any], - flinkTable: FlinkTable[_], - expectedType: Option[TypeInformation[Any]], - config: TableConfig): DataStream[Any] = { - - val inputType = input.getType - - expectedType match { - - // special case: - // if efficient type usage is enabled and no expected type is set - // we can simply forward the DataSet to the next operator. - // however, we cannot forward PojoTypes as their fields don't have an order - case None if config.getEfficientTypeUsage && !inputType.isInstanceOf[PojoTypeInfo[_]] => - input - - case _ => - val determinedType = determineReturnType( - getRowType, - expectedType, - config.getNullCheck, - config.getEfficientTypeUsage) - - // conversion - if (determinedType != inputType) { - val generator = new CodeGenerator( - config, - nullableInput = false, - input.getType, - flinkTable.fieldIndexes) - - val conversion = generator.generateConverterResultExpression( - determinedType, - getRowType.getFieldNames) - - val body = - s""" - |${conversion.code} - |return ${conversion.resultTerm}; - |""".stripMargin - - val genFunction = generator.generateFunction( - "DataSetSourceConversion", - classOf[MapFunction[Any, Any]], - body, - determinedType) - - val mapFunc = new MapRunner[Any, Any]( - genFunction.name, - genFunction.code, - genFunction.returnType) - - val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})" - - input.map(mapFunc).name(opName) - } - // no conversion necessary, forward - else { - input - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamTableSourceScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamTableSourceScan.scala deleted file mode 100644 index 8201070..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamTableSourceScan.scala +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.nodes.datastream - -import org.apache.calcite.plan._ -import org.apache.calcite.rel.RelNode -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.table.plan.schema.TableSourceTable -import org.apache.flink.api.table.sources.StreamTableSource -import org.apache.flink.api.table.{FlinkTypeFactory, StreamTableEnvironment} -import org.apache.flink.streaming.api.datastream.DataStream - -/** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. */ -class StreamTableSourceScan( - cluster: RelOptCluster, - traitSet: RelTraitSet, - table: RelOptTable, - tableSource: StreamTableSource[_]) - extends StreamScan(cluster, traitSet, table) { - - override def deriveRowType() = { - val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory] - flinkTypeFactory.buildRowDataType(tableSource.getFieldsNames, tableSource.getFieldTypes) - } - - override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { - new StreamTableSourceScan( - cluster, - traitSet, - getTable, - tableSource - ) - } - - override def translateToPlan( - tableEnv: StreamTableEnvironment, - expectedType: Option[TypeInformation[Any]]): DataStream[Any] = { - - val config = tableEnv.getConfig - val inputDataStream: DataStream[Any] = tableSource - .getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]] - - convertToExpectedType(inputDataStream, new TableSourceTable(tableSource), expectedType, config) - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/EnumerableToLogicalTableScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/EnumerableToLogicalTableScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/EnumerableToLogicalTableScan.scala deleted file mode 100644 index ee515c9..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/EnumerableToLogicalTableScan.scala +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.rules - -import org.apache.calcite.plan.RelOptRule.{any, operand} -import org.apache.calcite.adapter.enumerable.EnumerableTableScan -import org.apache.calcite.plan.{RelOptRuleCall, RelOptRule, RelOptRuleOperand} -import org.apache.calcite.rel.logical.LogicalTableScan - -/** - * Rule that converts an EnumerableTableScan into a LogicalTableScan. - * We need this rule because Calcite creates an EnumerableTableScan - * when parsing a SQL query. We convert it into a LogicalTableScan - * so we can merge the optimization process with any plan that might be created - * by the Table API. - */ -class EnumerableToLogicalTableScan( - operand: RelOptRuleOperand, - description: String) extends RelOptRule(operand, description) { - - override def onMatch(call: RelOptRuleCall): Unit = { - val oldRel = call.rel(0).asInstanceOf[EnumerableTableScan] - val table = oldRel.getTable - val newRel = LogicalTableScan.create(oldRel.getCluster, table) - call.transformTo(newRel) - } -} - -object EnumerableToLogicalTableScan { - val INSTANCE = new EnumerableToLogicalTableScan( - operand(classOf[EnumerableTableScan], any), - "EnumerableToLogicalTableScan") -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala deleted file mode 100644 index 183065c..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.rules - -import org.apache.calcite.rel.rules._ -import org.apache.calcite.tools.{RuleSets, RuleSet} -import org.apache.flink.api.table.plan.rules.dataSet._ -import org.apache.flink.api.table.plan.rules.datastream._ -import org.apache.flink.api.table.plan.rules.datastream.{DataStreamCalcRule, DataStreamScanRule, DataStreamUnionRule} - -object FlinkRuleSets { - - /** - * RuleSet to optimize plans for batch / DataSet execution - */ - val DATASET_OPT_RULES: RuleSet = RuleSets.ofList( - - // convert a logical table scan to a relational expression - TableScanRule.INSTANCE, - EnumerableToLogicalTableScan.INSTANCE, - - // push a filter into a join - FilterJoinRule.FILTER_ON_JOIN, - // push filter into the children of a join - FilterJoinRule.JOIN, - // push filter through an aggregation - FilterAggregateTransposeRule.INSTANCE, - - // aggregation and projection rules - AggregateProjectMergeRule.INSTANCE, - AggregateProjectPullUpConstantsRule.INSTANCE, - // push a projection past a filter or vice versa - ProjectFilterTransposeRule.INSTANCE, - FilterProjectTransposeRule.INSTANCE, - // push a projection to the children of a join - ProjectJoinTransposeRule.INSTANCE, - // remove identity project - ProjectRemoveRule.INSTANCE, - // reorder sort and projection - SortProjectTransposeRule.INSTANCE, - ProjectSortTransposeRule.INSTANCE, - - // join rules - JoinPushExpressionsRule.INSTANCE, - - // remove union with only a single child - UnionEliminatorRule.INSTANCE, - // convert non-all union into all-union + distinct - UnionToDistinctRule.INSTANCE, - - // remove aggregation if it does not aggregate and input is already distinct - AggregateRemoveRule.INSTANCE, - // push aggregate through join - AggregateJoinTransposeRule.EXTENDED, - // aggregate union rule - AggregateUnionAggregateRule.INSTANCE, - - // remove unnecessary sort rule - SortRemoveRule.INSTANCE, - - // simplify expressions rules - ReduceExpressionsRule.FILTER_INSTANCE, - ReduceExpressionsRule.PROJECT_INSTANCE, - ReduceExpressionsRule.CALC_INSTANCE, - ReduceExpressionsRule.JOIN_INSTANCE, - - // prune empty results rules - PruneEmptyRules.AGGREGATE_INSTANCE, - PruneEmptyRules.FILTER_INSTANCE, - PruneEmptyRules.JOIN_LEFT_INSTANCE, - PruneEmptyRules.JOIN_RIGHT_INSTANCE, - PruneEmptyRules.PROJECT_INSTANCE, - PruneEmptyRules.SORT_INSTANCE, - PruneEmptyRules.UNION_INSTANCE, - - // calc rules - FilterCalcMergeRule.INSTANCE, - ProjectCalcMergeRule.INSTANCE, - FilterToCalcRule.INSTANCE, - ProjectToCalcRule.INSTANCE, - CalcMergeRule.INSTANCE, - - // translate to Flink DataSet nodes - DataSetAggregateRule.INSTANCE, - DataSetAggregateWithNullValuesRule.INSTANCE, - DataSetCalcRule.INSTANCE, - DataSetJoinRule.INSTANCE, - DataSetSingleRowJoinRule.INSTANCE, - DataSetScanRule.INSTANCE, - DataSetUnionRule.INSTANCE, - DataSetIntersectRule.INSTANCE, - DataSetMinusRule.INSTANCE, - DataSetSortRule.INSTANCE, - DataSetValuesRule.INSTANCE, - DataSetCorrelateRule.INSTANCE, - BatchTableSourceScanRule.INSTANCE, - // project pushdown optimization - PushProjectIntoBatchTableSourceScanRule.INSTANCE - ) - - /** - * RuleSet to optimize plans for stream / DataStream execution - */ - val DATASTREAM_OPT_RULES: RuleSet = RuleSets.ofList( - - // convert a logical table scan to a relational expression - TableScanRule.INSTANCE, - EnumerableToLogicalTableScan.INSTANCE, - - // calc rules - FilterToCalcRule.INSTANCE, - ProjectToCalcRule.INSTANCE, - FilterCalcMergeRule.INSTANCE, - ProjectCalcMergeRule.INSTANCE, - CalcMergeRule.INSTANCE, - - // prune empty results rules - PruneEmptyRules.FILTER_INSTANCE, - PruneEmptyRules.PROJECT_INSTANCE, - PruneEmptyRules.UNION_INSTANCE, - - // push and merge projection rules - ProjectFilterTransposeRule.INSTANCE, - FilterProjectTransposeRule.INSTANCE, - ProjectRemoveRule.INSTANCE, - - // simplify expressions rules - ReduceExpressionsRule.FILTER_INSTANCE, - ReduceExpressionsRule.PROJECT_INSTANCE, - ReduceExpressionsRule.CALC_INSTANCE, - - // merge and push unions rules - UnionEliminatorRule.INSTANCE, - - // translate to DataStream nodes - DataStreamAggregateRule.INSTANCE, - DataStreamCalcRule.INSTANCE, - DataStreamScanRule.INSTANCE, - DataStreamUnionRule.INSTANCE, - DataStreamValuesRule.INSTANCE, - DataStreamCorrelateRule.INSTANCE, - StreamTableSourceScanRule.INSTANCE - ) - -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchTableSourceScanRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchTableSourceScanRule.scala deleted file mode 100644 index 8e3d8bb..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchTableSourceScanRule.scala +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.rules.dataSet - -import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet} -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.convert.ConverterRule -import org.apache.calcite.rel.core.TableScan -import org.apache.calcite.rel.logical.LogicalTableScan -import org.apache.flink.api.table.plan.nodes.dataset.{BatchTableSourceScan, DataSetConvention} -import org.apache.flink.api.table.plan.schema.TableSourceTable -import org.apache.flink.api.table.sources.BatchTableSource - -/** Rule to convert a [[LogicalTableScan]] into a [[BatchTableSourceScan]]. */ -class BatchTableSourceScanRule - extends ConverterRule( - classOf[LogicalTableScan], - Convention.NONE, - DataSetConvention.INSTANCE, - "BatchTableSourceScanRule") - { - - /** Rule must only match if TableScan targets a [[BatchTableSource]] */ - override def matches(call: RelOptRuleCall): Boolean = { - val scan: TableScan = call.rel(0).asInstanceOf[TableScan] - val dataSetTable = scan.getTable.unwrap(classOf[TableSourceTable]) - dataSetTable match { - case tst: TableSourceTable => - tst.tableSource match { - case _: BatchTableSource[_] => - true - case _ => - false - } - case _ => - false - } - } - - def convert(rel: RelNode): RelNode = { - val scan: TableScan = rel.asInstanceOf[TableScan] - val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) - - val tableSource = scan.getTable.unwrap(classOf[TableSourceTable]).tableSource - .asInstanceOf[BatchTableSource[_]] - new BatchTableSourceScan( - rel.getCluster, - traitSet, - scan.getTable, - tableSource - ) - } -} - -object BatchTableSourceScanRule { - val INSTANCE: RelOptRule = new BatchTableSourceScanRule -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala deleted file mode 100644 index 0311c48..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.rules.dataSet - -import org.apache.calcite.plan.{RelOptRuleCall, Convention, RelOptRule, RelTraitSet} -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.convert.ConverterRule -import org.apache.calcite.rel.logical.LogicalAggregate -import org.apache.flink.api.table.TableException -import org.apache.flink.api.table.plan.nodes.dataset.{DataSetAggregate, DataSetConvention} -import scala.collection.JavaConversions._ - -class DataSetAggregateRule - extends ConverterRule( - classOf[LogicalAggregate], - Convention.NONE, - DataSetConvention.INSTANCE, - "DataSetAggregateRule") - { - - override def matches(call: RelOptRuleCall): Boolean = { - val agg: LogicalAggregate = call.rel(0).asInstanceOf[LogicalAggregate] - - //for non grouped agg sets should attach null row to source data - //need apply DataSetAggregateWithNullValuesRule - if (agg.getGroupSet.isEmpty) { - return false - } - - // check if we have distinct aggregates - val distinctAggs = agg.getAggCallList.exists(_.isDistinct) - if (distinctAggs) { - throw TableException("DISTINCT aggregates are currently not supported.") - } - - // check if we have grouping sets - val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != agg.getGroupSet - if (groupSets || agg.indicator) { - throw TableException("GROUPING SETS are currently not supported.") - } - - !distinctAggs && !groupSets && !agg.indicator - } - - override def convert(rel: RelNode): RelNode = { - val agg: LogicalAggregate = rel.asInstanceOf[LogicalAggregate] - val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) - val convInput: RelNode = RelOptRule.convert(agg.getInput, DataSetConvention.INSTANCE) - - new DataSetAggregate( - rel.getCluster, - traitSet, - convInput, - agg.getNamedAggCalls, - rel.getRowType, - agg.getInput.getRowType, - agg.getGroupSet.toArray) - } - } - -object DataSetAggregateRule { - val INSTANCE: RelOptRule = new DataSetAggregateRule -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala deleted file mode 100644 index 3bf3e0c..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.plan.rules.dataSet - -import org.apache.calcite.plan._ -import scala.collection.JavaConversions._ -import com.google.common.collect.ImmutableList -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.convert.ConverterRule -import org.apache.calcite.rel.logical.{LogicalValues, LogicalUnion, LogicalAggregate} -import org.apache.calcite.rex.RexLiteral -import org.apache.flink.api.table._ -import org.apache.flink.types.Row -import org.apache.flink.api.table.plan.nodes.dataset.{DataSetAggregate, DataSetConvention} - -/** - * Rule for insert [[Row]] with null records into a [[DataSetAggregate]] - * Rule apply for non grouped aggregate query - */ -class DataSetAggregateWithNullValuesRule - extends ConverterRule( - classOf[LogicalAggregate], - Convention.NONE, - DataSetConvention.INSTANCE, - "DataSetAggregateWithNullValuesRule") -{ - - override def matches(call: RelOptRuleCall): Boolean = { - val agg: LogicalAggregate = call.rel(0).asInstanceOf[LogicalAggregate] - - //for grouped agg sets shouldn't attach of null row - //need apply other rules. e.g. [[DataSetAggregateRule]] - if (!agg.getGroupSet.isEmpty) { - return false - } - - // check if we have distinct aggregates - val distinctAggs = agg.getAggCallList.exists(_.isDistinct) - if (distinctAggs) { - throw TableException("DISTINCT aggregates are currently not supported.") - } - - // check if we have grouping sets - val groupSets = agg.getGroupSets.size() == 0 || agg.getGroupSets.get(0) != agg.getGroupSet - if (groupSets || agg.indicator) { - throw TableException("GROUPING SETS are currently not supported.") - } - !distinctAggs && !groupSets && !agg.indicator - } - - override def convert(rel: RelNode): RelNode = { - val agg: LogicalAggregate = rel.asInstanceOf[LogicalAggregate] - val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) - val cluster: RelOptCluster = rel.getCluster - - val fieldTypes = agg.getInput.getRowType.getFieldList.map(_.getType) - val nullLiterals: ImmutableList[ImmutableList[RexLiteral]] = - ImmutableList.of(ImmutableList.copyOf[RexLiteral]( - for (fieldType <- fieldTypes) - yield { - cluster.getRexBuilder. - makeLiteral(null, fieldType, false).asInstanceOf[RexLiteral] - })) - - val logicalValues = LogicalValues.create(cluster, agg.getInput.getRowType, nullLiterals) - val logicalUnion = LogicalUnion.create(List(logicalValues, agg.getInput), true) - - new DataSetAggregate( - cluster, - traitSet, - RelOptRule.convert(logicalUnion, DataSetConvention.INSTANCE), - agg.getNamedAggCalls, - rel.getRowType, - agg.getInput.getRowType, - agg.getGroupSet.toArray - ) - } -} - -object DataSetAggregateWithNullValuesRule { - val INSTANCE: RelOptRule = new DataSetAggregateWithNullValuesRule -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCalcRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCalcRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCalcRule.scala deleted file mode 100644 index 88e74a9..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCalcRule.scala +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.rules.dataSet - -import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet} -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.convert.ConverterRule -import org.apache.calcite.rel.logical.LogicalCalc -import org.apache.flink.api.table.plan.nodes.dataset.{DataSetCalc, DataSetConvention} - -class DataSetCalcRule - extends ConverterRule( - classOf[LogicalCalc], - Convention.NONE, - DataSetConvention.INSTANCE, - "DataSetCalcRule") - { - - def convert(rel: RelNode): RelNode = { - val calc: LogicalCalc = rel.asInstanceOf[LogicalCalc] - val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) - val convInput: RelNode = RelOptRule.convert(calc.getInput, DataSetConvention.INSTANCE) - - new DataSetCalc( - rel.getCluster, - traitSet, - convInput, - rel.getRowType, - calc.getProgram, - description) - } - } - -object DataSetCalcRule { - val INSTANCE: RelOptRule = new DataSetCalcRule -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCorrelateRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCorrelateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCorrelateRule.scala deleted file mode 100644 index 39756be..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCorrelateRule.scala +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.plan.rules.dataSet - -import org.apache.calcite.plan.volcano.RelSubset -import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet} -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.convert.ConverterRule -import org.apache.calcite.rel.logical.{LogicalFilter, LogicalCorrelate, LogicalTableFunctionScan} -import org.apache.calcite.rex.RexNode -import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetCorrelate} - -/** - * Rule to convert a LogicalCorrelate into a DataSetCorrelate. - */ -class DataSetCorrelateRule - extends ConverterRule( - classOf[LogicalCorrelate], - Convention.NONE, - DataSetConvention.INSTANCE, - "DataSetCorrelateRule") { - - override def matches(call: RelOptRuleCall): Boolean = { - val join: LogicalCorrelate = call.rel(0).asInstanceOf[LogicalCorrelate] - val right = join.getRight.asInstanceOf[RelSubset].getOriginal - - - right match { - // right node is a table function - case scan: LogicalTableFunctionScan => true - // a filter is pushed above the table function - case filter: LogicalFilter => - filter - .getInput.asInstanceOf[RelSubset] - .getOriginal - .isInstanceOf[LogicalTableFunctionScan] - case _ => false - } - } - - override def convert(rel: RelNode): RelNode = { - val join: LogicalCorrelate = rel.asInstanceOf[LogicalCorrelate] - val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) - val convInput: RelNode = RelOptRule.convert(join.getInput(0), DataSetConvention.INSTANCE) - val right: RelNode = join.getInput(1) - - def convertToCorrelate(relNode: RelNode, condition: Option[RexNode]): DataSetCorrelate = { - relNode match { - case rel: RelSubset => - convertToCorrelate(rel.getRelList.get(0), condition) - - case filter: LogicalFilter => - convertToCorrelate( - filter.getInput.asInstanceOf[RelSubset].getOriginal, - Some(filter.getCondition)) - - case scan: LogicalTableFunctionScan => - new DataSetCorrelate( - rel.getCluster, - traitSet, - convInput, - scan, - condition, - rel.getRowType, - join.getRowType, - join.getJoinType, - description) - } - } - convertToCorrelate(right, None) - } - } - -object DataSetCorrelateRule { - val INSTANCE: RelOptRule = new DataSetCorrelateRule -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetIntersectRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetIntersectRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetIntersectRule.scala deleted file mode 100644 index c0e3269..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetIntersectRule.scala +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.rules.dataSet - -import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet} -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.convert.ConverterRule -import org.apache.calcite.rel.logical.LogicalIntersect -import org.apache.flink.api.table.plan.nodes.dataset.{DataSetIntersect, DataSetConvention} - -class DataSetIntersectRule - extends ConverterRule( - classOf[LogicalIntersect], - Convention.NONE, - DataSetConvention.INSTANCE, - "DataSetIntersectRule") -{ - - def convert(rel: RelNode): RelNode = { - - val intersect: LogicalIntersect = rel.asInstanceOf[LogicalIntersect] - val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) - val convLeft: RelNode = RelOptRule.convert(intersect.getInput(0), DataSetConvention.INSTANCE) - val convRight: RelNode = RelOptRule.convert(intersect.getInput(1), DataSetConvention.INSTANCE) - - new DataSetIntersect( - rel.getCluster, - traitSet, - convLeft, - convRight, - rel.getRowType, - intersect.all) - } -} - -object DataSetIntersectRule { - val INSTANCE: RelOptRule = new DataSetIntersectRule -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala deleted file mode 100644 index 3fab8bf..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.rules.dataSet - -import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet} -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.convert.ConverterRule -import org.apache.calcite.rel.logical.LogicalJoin - -import org.apache.flink.api.table.plan.nodes.dataset.{DataSetJoin, DataSetConvention} - -import scala.collection.JavaConversions._ - -class DataSetJoinRule - extends ConverterRule( - classOf[LogicalJoin], - Convention.NONE, - DataSetConvention.INSTANCE, - "DataSetJoinRule") { - - override def matches(call: RelOptRuleCall): Boolean = { - val join: LogicalJoin = call.rel(0).asInstanceOf[LogicalJoin] - - val joinInfo = join.analyzeCondition - - // joins require an equi-condition or a conjunctive predicate with at least one equi-condition - !joinInfo.pairs().isEmpty - } - - override def convert(rel: RelNode): RelNode = { - - val join: LogicalJoin = rel.asInstanceOf[LogicalJoin] - val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) - val convLeft: RelNode = RelOptRule.convert(join.getInput(0), DataSetConvention.INSTANCE) - val convRight: RelNode = RelOptRule.convert(join.getInput(1), DataSetConvention.INSTANCE) - val joinInfo = join.analyzeCondition - - new DataSetJoin( - rel.getCluster, - traitSet, - convLeft, - convRight, - rel.getRowType, - join.getCondition, - join.getRowType, - joinInfo, - joinInfo.pairs.toList, - join.getJoinType, - null, - description) - } - -} - -object DataSetJoinRule { - val INSTANCE: RelOptRule = new DataSetJoinRule -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetMinusRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetMinusRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetMinusRule.scala deleted file mode 100644 index 44bead0..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetMinusRule.scala +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.rules.dataSet - -import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet} -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.convert.ConverterRule -import org.apache.calcite.rel.logical.LogicalMinus -import org.apache.calcite.rel.rules.UnionToDistinctRule -import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetMinus} - -class DataSetMinusRule - extends ConverterRule( - classOf[LogicalMinus], - Convention.NONE, - DataSetConvention.INSTANCE, - "DataSetMinusRule") -{ - - def convert(rel: RelNode): RelNode = { - - val minus: LogicalMinus = rel.asInstanceOf[LogicalMinus] - val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) - val convLeft: RelNode = RelOptRule.convert(minus.getInput(0), DataSetConvention.INSTANCE) - val convRight: RelNode = RelOptRule.convert(minus.getInput(1), DataSetConvention.INSTANCE) - - new DataSetMinus( - rel.getCluster, - traitSet, - convLeft, - convRight, - rel.getRowType, - minus.all) - } -} - -object DataSetMinusRule { - val INSTANCE: RelOptRule = new DataSetMinusRule -} - http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetScanRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetScanRule.scala deleted file mode 100644 index 7477690..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetScanRule.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.rules.dataSet - -import org.apache.calcite.plan.{RelOptRuleCall, Convention, RelOptRule, RelTraitSet} -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.convert.ConverterRule -import org.apache.calcite.rel.core.TableScan -import org.apache.calcite.rel.logical.LogicalTableScan -import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetScan} -import org.apache.flink.api.table.plan.schema.DataSetTable - -class DataSetScanRule - extends ConverterRule( - classOf[LogicalTableScan], - Convention.NONE, - DataSetConvention.INSTANCE, - "DataSetScanRule") - { - - /** - * If the input is not a DataSetTable, we want the TableScanRule to match instead - */ - override def matches(call: RelOptRuleCall): Boolean = { - val scan: TableScan = call.rel(0).asInstanceOf[TableScan] - val dataSetTable = scan.getTable.unwrap(classOf[DataSetTable[Any]]) - dataSetTable match { - case _: DataSetTable[Any] => - true - case _ => - false - } - } - - def convert(rel: RelNode): RelNode = { - val scan: TableScan = rel.asInstanceOf[TableScan] - val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) - - new DataSetScan( - rel.getCluster, - traitSet, - scan.getTable, - rel.getRowType - ) - } -} - -object DataSetScanRule { - val INSTANCE: RelOptRule = new DataSetScanRule -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSingleRowJoinRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSingleRowJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSingleRowJoinRule.scala deleted file mode 100644 index 8109fcf..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSingleRowJoinRule.scala +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.rules.dataSet - -import org.apache.calcite.plan.volcano.RelSubset -import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall} -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.convert.ConverterRule -import org.apache.calcite.rel.core.JoinRelType -import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalJoin} -import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetSingleRowJoin} - -class DataSetSingleRowJoinRule - extends ConverterRule( - classOf[LogicalJoin], - Convention.NONE, - DataSetConvention.INSTANCE, - "DataSetSingleRowCrossRule") { - - override def matches(call: RelOptRuleCall): Boolean = { - val join = call.rel(0).asInstanceOf[LogicalJoin] - - if (isInnerJoin(join)) { - isGlobalAggregation(join.getRight.asInstanceOf[RelSubset].getOriginal) || - isGlobalAggregation(join.getLeft.asInstanceOf[RelSubset].getOriginal) - } else { - false - } - } - - private def isInnerJoin(join: LogicalJoin) = { - join.getJoinType == JoinRelType.INNER - } - - private def isGlobalAggregation(node: RelNode) = { - node.isInstanceOf[LogicalAggregate] && - isSingleRow(node.asInstanceOf[LogicalAggregate]) - } - - private def isSingleRow(agg: LogicalAggregate) = { - agg.getGroupSet.isEmpty - } - - override def convert(rel: RelNode): RelNode = { - val join = rel.asInstanceOf[LogicalJoin] - val traitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) - val dataSetLeftNode = RelOptRule.convert(join.getLeft, DataSetConvention.INSTANCE) - val dataSetRightNode = RelOptRule.convert(join.getRight, DataSetConvention.INSTANCE) - val leftIsSingle = isGlobalAggregation(join.getLeft.asInstanceOf[RelSubset].getOriginal) - - new DataSetSingleRowJoin( - rel.getCluster, - traitSet, - dataSetLeftNode, - dataSetRightNode, - leftIsSingle, - rel.getRowType, - join.getCondition, - join.getRowType, - description) - } -} - -object DataSetSingleRowJoinRule { - val INSTANCE: RelOptRule = new DataSetSingleRowJoinRule -}