http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala deleted file mode 100644 index 5c1fb53..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.rules.dataSet - -import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet} -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.convert.ConverterRule -import org.apache.calcite.rel.logical.{LogicalJoin, LogicalSort} -import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetSort} - -class DataSetSortRule - extends ConverterRule( - classOf[LogicalSort], - Convention.NONE, - DataSetConvention.INSTANCE, - "DataSetSortRule") { - - override def convert(rel: RelNode): RelNode = { - - val sort: LogicalSort = rel.asInstanceOf[LogicalSort] - val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) - val convInput: RelNode = RelOptRule.convert(sort.getInput, DataSetConvention.INSTANCE) - - new DataSetSort( - rel.getCluster, - traitSet, - convInput, - sort.getCollation, - rel.getRowType, - sort.offset, - sort.fetch - ) - } -} - -object DataSetSortRule { - val INSTANCE: RelOptRule = new DataSetSortRule -}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala deleted file mode 100644 index ea35637..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.rules.dataSet - -import org.apache.calcite.plan.{RelOptRuleCall, Convention, RelOptRule, RelTraitSet} -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.convert.ConverterRule -import org.apache.calcite.rel.logical.LogicalUnion -import org.apache.calcite.rel.rules.UnionToDistinctRule -import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetUnion} - -class DataSetUnionRule - extends ConverterRule( - classOf[LogicalUnion], - Convention.NONE, - DataSetConvention.INSTANCE, - "DataSetUnionRule") - { - - /** - * Only translate UNION ALL. - * Note: A distinct Union are translated into - * an Aggregate on top of a UNION ALL by [[UnionToDistinctRule]] - */ - override def matches(call: RelOptRuleCall): Boolean = { - val union: LogicalUnion = call.rel(0).asInstanceOf[LogicalUnion] - union.all - } - - def convert(rel: RelNode): RelNode = { - - val union: LogicalUnion = rel.asInstanceOf[LogicalUnion] - val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) - val convLeft: RelNode = RelOptRule.convert(union.getInput(0), DataSetConvention.INSTANCE) - val convRight: RelNode = RelOptRule.convert(union.getInput(1), DataSetConvention.INSTANCE) - - new DataSetUnion( - rel.getCluster, - traitSet, - convLeft, - convRight, - rel.getRowType) - } -} - -object DataSetUnionRule { - val INSTANCE: RelOptRule = new DataSetUnionRule -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetValuesRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetValuesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetValuesRule.scala deleted file mode 100644 index 3d6c0de..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetValuesRule.scala +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.rules.dataSet - -import org.apache.calcite.plan.{RelOptRule, RelTraitSet, Convention} -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.convert.ConverterRule -import org.apache.calcite.rel.logical.LogicalValues -import org.apache.flink.api.table.plan.nodes.dataset.{DataSetValues, DataSetConvention} - -class DataSetValuesRule - extends ConverterRule( - classOf[LogicalValues], - Convention.NONE, - DataSetConvention.INSTANCE, - "DataSetValuesRule") -{ - - def convert(rel: RelNode): RelNode = { - - val values: LogicalValues = rel.asInstanceOf[LogicalValues] - val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) - - new DataSetValues( - rel.getCluster, - traitSet, - rel.getRowType, - values.getTuples, - description) - } -} - -object DataSetValuesRule { - val INSTANCE: RelOptRule = new DataSetValuesRule -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala deleted file mode 100644 index 301a45b..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.rules.dataSet - -import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} -import org.apache.calcite.plan.RelOptRule.{none, operand} -import org.apache.flink.api.table.plan.nodes.dataset.{BatchTableSourceScan, DataSetCalc} -import org.apache.flink.api.table.plan.rules.util.RexProgramProjectExtractor._ -import org.apache.flink.api.table.sources.{BatchTableSource, ProjectableTableSource} - -/** - * This rule tries to push projections into a BatchTableSourceScan. - */ -class PushProjectIntoBatchTableSourceScanRule extends RelOptRule( - operand(classOf[DataSetCalc], - operand(classOf[BatchTableSourceScan], none)), - "PushProjectIntoBatchTableSourceScanRule") { - - override def matches(call: RelOptRuleCall) = { - val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan] - scan.tableSource match { - case _: ProjectableTableSource[_] => true - case _ => false - } - } - - override def onMatch(call: RelOptRuleCall) { - val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc] - val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan] - - val usedFields: Array[Int] = extractRefInputFields(calc.calcProgram) - - // if no fields can be projected, there is no need to transform subtree - if (scan.tableSource.getNumberOfFields != usedFields.length) { - val originTableSource = scan.tableSource.asInstanceOf[ProjectableTableSource[_]] - val newTableSource = originTableSource.projectFields(usedFields) - val newScan = new BatchTableSourceScan( - scan.getCluster, - scan.getTraitSet, - scan.getTable, - newTableSource.asInstanceOf[BatchTableSource[_]]) - - val newCalcProgram = rewriteRexProgram( - calc.calcProgram, - newScan.getRowType, - usedFields, - calc.getCluster.getRexBuilder) - - // if project merely returns its input and doesn't exist filter, remove datasetCalc nodes - if (newCalcProgram.isTrivial) { - call.transformTo(newScan) - } else { - val newCalc = new DataSetCalc( - calc.getCluster, - calc.getTraitSet, - newScan, - calc.getRowType, - newCalcProgram, - description) - call.transformTo(newCalc) - } - } - } -} - -object PushProjectIntoBatchTableSourceScanRule { - val INSTANCE: RelOptRule = new PushProjectIntoBatchTableSourceScanRule -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamAggregateRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamAggregateRule.scala deleted file mode 100644 index dff2adc..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamAggregateRule.scala +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.rules.datastream - -import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet} -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.convert.ConverterRule -import org.apache.flink.api.table.TableException -import org.apache.flink.api.table.expressions.Alias -import org.apache.flink.api.table.plan.logical.rel.LogicalWindowAggregate -import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamAggregate, DataStreamConvention} - -import scala.collection.JavaConversions._ - -class DataStreamAggregateRule - extends ConverterRule( - classOf[LogicalWindowAggregate], - Convention.NONE, - DataStreamConvention.INSTANCE, - "DataStreamAggregateRule") - { - - override def matches(call: RelOptRuleCall): Boolean = { - val agg: LogicalWindowAggregate = call.rel(0).asInstanceOf[LogicalWindowAggregate] - - // check if we have distinct aggregates - val distinctAggs = agg.getAggCallList.exists(_.isDistinct) - if (distinctAggs) { - throw TableException("DISTINCT aggregates are currently not supported.") - } - - // check if we have grouping sets - val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != agg.getGroupSet - if (groupSets || agg.indicator) { - throw TableException("GROUPING SETS are currently not supported.") - } - - !distinctAggs && !groupSets && !agg.indicator - } - - override def convert(rel: RelNode): RelNode = { - val agg: LogicalWindowAggregate = rel.asInstanceOf[LogicalWindowAggregate] - val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE) - val convInput: RelNode = RelOptRule.convert(agg.getInput, DataStreamConvention.INSTANCE) - - new DataStreamAggregate( - agg.getWindow, - agg.getNamedProperties, - rel.getCluster, - traitSet, - convInput, - agg.getNamedAggCalls, - rel.getRowType, - agg.getInput.getRowType, - agg.getGroupSet.toArray) - } - } - -object DataStreamAggregateRule { - val INSTANCE: RelOptRule = new DataStreamAggregateRule -} - http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamCalcRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamCalcRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamCalcRule.scala deleted file mode 100644 index b62967a..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamCalcRule.scala +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.rules.datastream - -import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet} -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.convert.ConverterRule -import org.apache.calcite.rel.logical.LogicalCalc -import org.apache.flink.api.table.plan.nodes.datastream.DataStreamCalc -import org.apache.flink.api.table.plan.nodes.datastream.DataStreamConvention - -class DataStreamCalcRule - extends ConverterRule( - classOf[LogicalCalc], - Convention.NONE, - DataStreamConvention.INSTANCE, - "DataStreamCalcRule") -{ - - def convert(rel: RelNode): RelNode = { - val calc: LogicalCalc = rel.asInstanceOf[LogicalCalc] - val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE) - val convInput: RelNode = RelOptRule.convert(calc.getInput, DataStreamConvention.INSTANCE) - - new DataStreamCalc( - rel.getCluster, - traitSet, - convInput, - rel.getRowType, - calc.getProgram, - description) - } -} - -object DataStreamCalcRule { - val INSTANCE: RelOptRule = new DataStreamCalcRule -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamCorrelateRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamCorrelateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamCorrelateRule.scala deleted file mode 100644 index 554c6c1..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamCorrelateRule.scala +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.plan.rules.datastream - -import org.apache.calcite.plan.volcano.RelSubset -import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet} -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.convert.ConverterRule -import org.apache.calcite.rel.logical.{LogicalFilter, LogicalCorrelate, LogicalTableFunctionScan} -import org.apache.calcite.rex.RexNode -import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamCorrelate, DataStreamConvention} - -/** - * Rule to convert a LogicalCorrelate into a DataStreamCorrelate. - */ -class DataStreamCorrelateRule - extends ConverterRule( - classOf[LogicalCorrelate], - Convention.NONE, - DataStreamConvention.INSTANCE, - "DataStreamCorrelateRule") { - - override def matches(call: RelOptRuleCall): Boolean = { - val join: LogicalCorrelate = call.rel(0).asInstanceOf[LogicalCorrelate] - val right = join.getRight.asInstanceOf[RelSubset].getOriginal - - right match { - // right node is a table function - case scan: LogicalTableFunctionScan => true - // a filter is pushed above the table function - case filter: LogicalFilter => - filter - .getInput.asInstanceOf[RelSubset] - .getOriginal - .isInstanceOf[LogicalTableFunctionScan] - case _ => false - } - } - - override def convert(rel: RelNode): RelNode = { - val join: LogicalCorrelate = rel.asInstanceOf[LogicalCorrelate] - val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE) - val convInput: RelNode = RelOptRule.convert(join.getInput(0), DataStreamConvention.INSTANCE) - val right: RelNode = join.getInput(1) - - def convertToCorrelate(relNode: RelNode, condition: Option[RexNode]): DataStreamCorrelate = { - relNode match { - case rel: RelSubset => - convertToCorrelate(rel.getRelList.get(0), condition) - - case filter: LogicalFilter => - convertToCorrelate( - filter.getInput.asInstanceOf[RelSubset].getOriginal, - Some(filter.getCondition)) - - case scan: LogicalTableFunctionScan => - new DataStreamCorrelate( - rel.getCluster, - traitSet, - convInput, - scan, - condition, - rel.getRowType, - join.getRowType, - join.getJoinType, - description) - } - } - convertToCorrelate(right, None) - } - -} - -object DataStreamCorrelateRule { - val INSTANCE: RelOptRule = new DataStreamCorrelateRule -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamScanRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamScanRule.scala deleted file mode 100644 index 62638bc..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamScanRule.scala +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.rules.datastream - -import org.apache.calcite.plan.{RelOptRuleCall, Convention, RelOptRule, RelTraitSet} -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.convert.ConverterRule -import org.apache.calcite.rel.core.TableScan -import org.apache.calcite.rel.logical.LogicalTableScan -import org.apache.flink.api.table.plan.nodes.datastream.DataStreamConvention -import org.apache.flink.api.table.plan.nodes.datastream.DataStreamScan -import org.apache.flink.api.table.plan.schema.DataStreamTable - -class DataStreamScanRule - extends ConverterRule( - classOf[LogicalTableScan], - Convention.NONE, - DataStreamConvention.INSTANCE, - "DataStreamScanRule") -{ - - override def matches(call: RelOptRuleCall): Boolean = { - val scan: TableScan = call.rel(0).asInstanceOf[TableScan] - val dataSetTable = scan.getTable.unwrap(classOf[DataStreamTable[Any]]) - dataSetTable match { - case _: DataStreamTable[Any] => - true - case _ => - false - } - } - - def convert(rel: RelNode): RelNode = { - val scan: LogicalTableScan = rel.asInstanceOf[LogicalTableScan] - val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE) - - new DataStreamScan( - rel.getCluster, - traitSet, - scan.getTable, - rel.getRowType - ) - } -} - -object DataStreamScanRule { - val INSTANCE: RelOptRule = new DataStreamScanRule -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamUnionRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamUnionRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamUnionRule.scala deleted file mode 100644 index 78a5486..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamUnionRule.scala +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.rules.datastream - -import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet} -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.convert.ConverterRule -import org.apache.calcite.rel.logical.LogicalUnion -import org.apache.flink.api.table.plan.nodes.datastream.DataStreamConvention -import org.apache.flink.api.table.plan.nodes.datastream.DataStreamUnion - -class DataStreamUnionRule - extends ConverterRule( - classOf[LogicalUnion], - Convention.NONE, - DataStreamConvention.INSTANCE, - "DataStreamUnionRule") -{ - - def convert(rel: RelNode): RelNode = { - val union: LogicalUnion = rel.asInstanceOf[LogicalUnion] - val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE) - val convLeft: RelNode = RelOptRule.convert(union.getInput(0), DataStreamConvention.INSTANCE) - val convRight: RelNode = RelOptRule.convert(union.getInput(1), DataStreamConvention.INSTANCE) - - new DataStreamUnion( - rel.getCluster, - traitSet, - convLeft, - convRight, - rel.getRowType) - } -} - -object DataStreamUnionRule { - val INSTANCE: RelOptRule = new DataStreamUnionRule -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamValuesRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamValuesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamValuesRule.scala deleted file mode 100644 index 738642d..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamValuesRule.scala +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.rules.datastream - -import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet} -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.convert.ConverterRule -import org.apache.calcite.rel.logical.LogicalValues -import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamValues, DataStreamConvention} - -class DataStreamValuesRule - extends ConverterRule( - classOf[LogicalValues], - Convention.NONE, - DataStreamConvention.INSTANCE, - "DataStreamValuesRule") -{ - - def convert(rel: RelNode): RelNode = { - - val values: LogicalValues = rel.asInstanceOf[LogicalValues] - val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE) - - new DataStreamValues( - rel.getCluster, - traitSet, - rel.getRowType, - values.getTuples, - description) - } -} - -object DataStreamValuesRule { - val INSTANCE: RelOptRule = new DataStreamValuesRule -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamTableSourceScanRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamTableSourceScanRule.scala deleted file mode 100644 index 91dd255..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamTableSourceScanRule.scala +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.rules.datastream - -import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet} -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.convert.ConverterRule -import org.apache.calcite.rel.core.TableScan -import org.apache.calcite.rel.logical.LogicalTableScan -import org.apache.flink.api.table.plan.nodes.datastream. - {StreamTableSourceScan, DataStreamConvention} -import org.apache.flink.api.table.plan.schema.TableSourceTable -import org.apache.flink.api.table.sources.StreamTableSource - -/** Rule to convert a [[LogicalTableScan]] into a [[StreamTableSourceScan]]. */ -class StreamTableSourceScanRule - extends ConverterRule( - classOf[LogicalTableScan], - Convention.NONE, - DataStreamConvention.INSTANCE, - "StreamTableSourceScanRule") -{ - - /** Rule must only match if TableScan targets a [[StreamTableSource]] */ - override def matches(call: RelOptRuleCall): Boolean = { - val scan: TableScan = call.rel(0).asInstanceOf[TableScan] - val dataSetTable = scan.getTable.unwrap(classOf[TableSourceTable]) - dataSetTable match { - case tst: TableSourceTable => - tst.tableSource match { - case _: StreamTableSource[_] => - true - case _ => - false - } - case _ => - false - } - } - - def convert(rel: RelNode): RelNode = { - val scan: LogicalTableScan = rel.asInstanceOf[LogicalTableScan] - val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE) - - // The original registered table source - val table: TableSourceTable = scan.getTable.unwrap(classOf[TableSourceTable]) - val tableSource: StreamTableSource[_] = table.tableSource.asInstanceOf[StreamTableSource[_]] - - new StreamTableSourceScan( - rel.getCluster, - traitSet, - scan.getTable, - tableSource - ) - } -} - -object StreamTableSourceScanRule { - val INSTANCE: RelOptRule = new StreamTableSourceScanRule -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/util/RexProgramProjectExtractor.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/util/RexProgramProjectExtractor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/util/RexProgramProjectExtractor.scala deleted file mode 100644 index d78e07f..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/util/RexProgramProjectExtractor.scala +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.rules.util - -import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.rex._ - -import scala.collection.JavaConversions._ -import scala.collection.mutable -import scala.collection.JavaConverters._ - -object RexProgramProjectExtractor { - - /** - * Extracts the indexes of input fields accessed by the RexProgram. - * - * @param rexProgram The RexProgram to analyze - * @return The indexes of accessed input fields - */ - def extractRefInputFields(rexProgram: RexProgram): Array[Int] = { - val visitor = new RefFieldsVisitor - // extract input fields from project expressions - rexProgram.getProjectList.foreach(exp => rexProgram.expandLocalRef(exp).accept(visitor)) - val condition = rexProgram.getCondition - // extract input fields from condition expression - if (condition != null) { - rexProgram.expandLocalRef(condition).accept(visitor) - } - visitor.getFields - } - - /** - * Generates a new RexProgram based on mapped input fields. - * - * @param rexProgram original RexProgram - * @param inputRowType input row type - * @param usedInputFields indexes of used input fields - * @param rexBuilder builder for Rex expressions - * - * @return A RexProgram with mapped input field expressions. - */ - def rewriteRexProgram( - rexProgram: RexProgram, - inputRowType: RelDataType, - usedInputFields: Array[Int], - rexBuilder: RexBuilder): RexProgram = { - - val inputRewriter = new InputRewriter(usedInputFields) - val newProjectExpressions = rexProgram.getProjectList.map( - exp => rexProgram.expandLocalRef(exp).accept(inputRewriter) - ).toList.asJava - - val oldCondition = rexProgram.getCondition - val newConditionExpression = { - oldCondition match { - case ref: RexLocalRef => rexProgram.expandLocalRef(ref).accept(inputRewriter) - case _ => null // null does not match any type - } - } - RexProgram.create( - inputRowType, - newProjectExpressions, - newConditionExpression, - rexProgram.getOutputRowType, - rexBuilder - ) - } -} - -/** - * A RexVisitor to extract used input fields - */ -class RefFieldsVisitor extends RexVisitorImpl[Unit](true) { - private var fields = mutable.LinkedHashSet[Int]() - - def getFields: Array[Int] = fields.toArray - - override def visitInputRef(inputRef: RexInputRef): Unit = fields += inputRef.getIndex - - override def visitCall(call: RexCall): Unit = - call.operands.foreach(operand => operand.accept(this)) -} - -/** - * A RexShuttle to rewrite field accesses of a RexProgram. - * - * @param fields fields mapping - */ -class InputRewriter(fields: Array[Int]) extends RexShuttle { - - /** old input fields ref index -> new input fields ref index mappings */ - private val fieldMap: Map[Int, Int] = - fields.zipWithIndex.toMap - - override def visitInputRef(inputRef: RexInputRef): RexNode = - new RexInputRef(relNodeIndex(inputRef), inputRef.getType) - - override def visitLocalRef(localRef: RexLocalRef): RexNode = - new RexInputRef(relNodeIndex(localRef), localRef.getType) - - private def relNodeIndex(ref: RexSlot): Int = - fieldMap.getOrElse(ref.getIndex, - throw new IllegalArgumentException("input field contains invalid index")) -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/ArrayRelDataType.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/ArrayRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/ArrayRelDataType.scala deleted file mode 100644 index 92fcb83..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/ArrayRelDataType.scala +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.schema - -import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.sql.`type`.ArraySqlType -import org.apache.flink.api.common.typeinfo.TypeInformation - -/** - * Flink distinguishes between primitive arrays (int[], double[], ...) and - * object arrays (Integer[], MyPojo[], ...). This custom type supports both cases. - */ -class ArrayRelDataType( - val typeInfo: TypeInformation[_], - elementType: RelDataType, - isNullable: Boolean) - extends ArraySqlType( - elementType, - isNullable) { - - override def toString = s"ARRAY($typeInfo)" - - def canEqual(other: Any): Boolean = other.isInstanceOf[ArrayRelDataType] - - override def equals(other: Any): Boolean = other match { - case that: ArrayRelDataType => - super.equals(that) && - (that canEqual this) && - typeInfo == that.typeInfo - case _ => false - } - - override def hashCode(): Int = { - typeInfo.hashCode() - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/CompositeRelDataType.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/CompositeRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/CompositeRelDataType.scala deleted file mode 100644 index b9ceff0..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/CompositeRelDataType.scala +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.schema - -import java.util - -import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataTypeFieldImpl, RelRecordType} -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.table.FlinkTypeFactory -import org.apache.flink.api.table.plan.schema.CompositeRelDataType.createFieldList - -import scala.collection.JavaConverters._ - -/** - * Composite type for encapsulating Flink's [[CompositeType]]. - * - * @param compositeType CompositeType to encapsulate - * @param typeFactory Flink's type factory - */ -class CompositeRelDataType( - val compositeType: CompositeType[_], - typeFactory: FlinkTypeFactory) - extends RelRecordType(createFieldList(compositeType, typeFactory)) { - - override def toString = s"COMPOSITE($compositeType)" - - def canEqual(other: Any): Boolean = other.isInstanceOf[CompositeRelDataType] - - override def equals(other: Any): Boolean = other match { - case that: CompositeRelDataType => - super.equals(that) && - (that canEqual this) && - compositeType == that.compositeType - case _ => false - } - - override def hashCode(): Int = { - compositeType.hashCode() - } - -} - -object CompositeRelDataType { - - /** - * Converts the fields of a composite type to list of [[RelDataTypeField]]. - */ - private def createFieldList( - compositeType: CompositeType[_], - typeFactory: FlinkTypeFactory) - : util.List[RelDataTypeField] = { - - compositeType - .getFieldNames - .zipWithIndex - .map { case (name, index) => - new RelDataTypeFieldImpl( - name, - index, - typeFactory.createTypeFromTypeInfo(compositeType.getTypeAt(index))) - .asInstanceOf[RelDataTypeField] - } - .toList - .asJava - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataSetTable.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataSetTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataSetTable.scala deleted file mode 100644 index bbcba13..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataSetTable.scala +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.schema - -import java.lang.Double -import java.util -import java.util.Collections - -import org.apache.calcite.rel.{RelCollation, RelDistribution} -import org.apache.calcite.schema.Statistic -import org.apache.calcite.util.ImmutableBitSet -import org.apache.flink.api.java.DataSet - -class DataSetTable[T]( - val dataSet: DataSet[T], - override val fieldIndexes: Array[Int], - override val fieldNames: Array[String]) - extends FlinkTable[T](dataSet.getType, fieldIndexes, fieldNames) { - - override def getStatistic: Statistic = { - new DefaultDataSetStatistic - } - -} - -class DefaultDataSetStatistic extends Statistic { - - override def getRowCount: Double = 1000d - - override def getCollations: util.List[RelCollation] = Collections.emptyList() - - override def isKey(columns: ImmutableBitSet): Boolean = false - - override def getDistribution: RelDistribution = null -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala deleted file mode 100644 index 570d723..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.schema - -import org.apache.flink.streaming.api.datastream.DataStream - -class DataStreamTable[T]( - val dataStream: DataStream[T], - override val fieldIndexes: Array[Int], - override val fieldNames: Array[String]) - extends FlinkTable[T](dataStream.getType, fieldIndexes, fieldNames) { -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTable.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTable.scala deleted file mode 100644 index 84d6d7e..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTable.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.schema - -import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} -import org.apache.calcite.schema.impl.AbstractTable -import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.table.{FlinkTypeFactory, TableException} - -abstract class FlinkTable[T]( - val typeInfo: TypeInformation[T], - val fieldIndexes: Array[Int], - val fieldNames: Array[String]) - extends AbstractTable { - - if (fieldIndexes.length != fieldNames.length) { - throw new TableException( - "Number of field indexes and field names must be equal.") - } - - // check uniqueness of field names - if (fieldNames.length != fieldNames.toSet.size) { - throw new TableException( - "Table field names must be unique.") - } - - val fieldTypes: Array[TypeInformation[_]] = - typeInfo match { - case cType: CompositeType[T] => - if (fieldNames.length != cType.getArity) { - throw new TableException( - s"Arity of type (" + cType.getFieldNames.deep + ") " + - "not equal to number of field names " + fieldNames.deep + ".") - } - fieldIndexes.map(cType.getTypeAt(_).asInstanceOf[TypeInformation[_]]) - case aType: AtomicType[T] => - if (fieldIndexes.length != 1 || fieldIndexes(0) != 0) { - throw new TableException( - "Non-composite input type may have only a single field and its index must be 0.") - } - Array(aType) - } - - override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = { - val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory] - flinkTypeFactory.buildRowDataType(fieldNames, fieldTypes) - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTableFunctionImpl.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTableFunctionImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTableFunctionImpl.scala deleted file mode 100644 index 540a5c8..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTableFunctionImpl.scala +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.plan.schema - -import java.lang.reflect.{Method, Type} -import java.util - -import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} -import org.apache.calcite.schema.TableFunction -import org.apache.calcite.schema.impl.ReflectiveFunctionBase -import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.table.{FlinkTypeFactory, TableException} - -/** - * This is heavily inspired by Calcite's [[org.apache.calcite.schema.impl.TableFunctionImpl]]. - * We need it in order to create a [[org.apache.flink.api.table.functions.utils.TableSqlFunction]]. - * The main difference is that we override the [[getRowType()]] and [[getElementType()]]. - */ -class FlinkTableFunctionImpl[T]( - val typeInfo: TypeInformation[T], - val fieldIndexes: Array[Int], - val fieldNames: Array[String], - val evalMethod: Method) - extends ReflectiveFunctionBase(evalMethod) - with TableFunction { - - if (fieldIndexes.length != fieldNames.length) { - throw new TableException( - "Number of field indexes and field names must be equal.") - } - - // check uniqueness of field names - if (fieldNames.length != fieldNames.toSet.size) { - throw new TableException( - "Table field names must be unique.") - } - - val fieldTypes: Array[TypeInformation[_]] = - typeInfo match { - case cType: CompositeType[T] => - if (fieldNames.length != cType.getArity) { - throw new TableException( - s"Arity of type (" + cType.getFieldNames.deep + ") " + - "not equal to number of field names " + fieldNames.deep + ".") - } - fieldIndexes.map(cType.getTypeAt(_).asInstanceOf[TypeInformation[_]]) - case aType: AtomicType[T] => - if (fieldIndexes.length != 1 || fieldIndexes(0) != 0) { - throw new TableException( - "Non-composite input type may have only a single field and its index must be 0.") - } - Array(aType) - } - - override def getElementType(arguments: util.List[AnyRef]): Type = classOf[Array[Object]] - - override def getRowType(typeFactory: RelDataTypeFactory, - arguments: util.List[AnyRef]): RelDataType = { - val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory] - val builder = flinkTypeFactory.builder - fieldNames - .zip(fieldTypes) - .foreach { f => - builder.add(f._1, flinkTypeFactory.createTypeFromTypeInfo(f._2)).nullable(true) - } - builder.build - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/GenericRelDataType.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/GenericRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/GenericRelDataType.scala deleted file mode 100644 index a3012d1..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/GenericRelDataType.scala +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.schema - -import org.apache.calcite.sql.`type`.{BasicSqlType, SqlTypeName} -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.table.FlinkTypeSystem - -/** - * Generic type for encapsulating Flink's [[TypeInformation]]. - * - * @param typeInfo TypeInformation to encapsulate - * @param typeSystem Flink's type system - */ -class GenericRelDataType( - val typeInfo: TypeInformation[_], - typeSystem: FlinkTypeSystem) - extends BasicSqlType( - typeSystem, - SqlTypeName.ANY) { - - override def toString = s"ANY($typeInfo)" - - def canEqual(other: Any): Boolean = other.isInstanceOf[GenericRelDataType] - - override def equals(other: Any): Boolean = other match { - case that: GenericRelDataType => - super.equals(that) && - (that canEqual this) && - typeInfo == that.typeInfo - case _ => false - } - - override def hashCode(): Int = { - typeInfo.hashCode() - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/RelTable.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/RelTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/RelTable.scala deleted file mode 100644 index f952d83..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/RelTable.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.schema - -import org.apache.calcite.plan.RelOptTable -import org.apache.calcite.plan.RelOptTable.ToRelContext -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} -import org.apache.calcite.schema.Schema.TableType -import org.apache.calcite.schema.impl.AbstractTable -import org.apache.calcite.schema.TranslatableTable - -/** - * A [[org.apache.calcite.schema.Table]] implementation for registering - * Table API Tables in the Calcite schema to be used by Flink SQL. - * It implements [[TranslatableTable]] so that its logical scan - * can be converted to a relational expression. - * - * @see [[DataSetTable]] - */ -class RelTable(relNode: RelNode) extends AbstractTable with TranslatableTable { - - override def getJdbcTableType: TableType = ??? - - override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = relNode.getRowType - - override def toRel(context: ToRelContext, relOptTable: RelOptTable): RelNode = { - relNode - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableSourceTable.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableSourceTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableSourceTable.scala deleted file mode 100644 index 72be00c..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableSourceTable.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.schema - -import org.apache.flink.types.Row -import org.apache.flink.api.table.sources.TableSource -import org.apache.flink.api.java.typeutils.RowTypeInfo - -/** Table which defines an external table via a [[TableSource]] */ -class TableSourceTable(val tableSource: TableSource[_]) - extends FlinkTable[Row]( - typeInfo = new RowTypeInfo(tableSource.getFieldTypes: _*), - fieldIndexes = 0.until(tableSource.getNumberOfFields).toArray, - fieldNames = tableSource.getFieldsNames) http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala deleted file mode 100644 index 5896f4c..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.runtime - -import java.lang.Iterable - -import org.apache.flink.api.common.functions.RichMapPartitionFunction -import org.apache.flink.util.Collector - -class CountPartitionFunction[IN] extends RichMapPartitionFunction[IN, (Int, Long)] { - - override def mapPartition(value: Iterable[IN], out: Collector[(Int, Long)]): Unit = { - val partitionIndex = getRuntimeContext.getIndexOfThisSubtask - var elementCount = 0L - val iterator = value.iterator() - while (iterator.hasNext) { - if (elementCount != Long.MaxValue) { // prevent overflow - elementCount += 1L - } - iterator.next() - } - out.collect(partitionIndex, elementCount) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatJoinRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatJoinRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatJoinRunner.scala deleted file mode 100644 index 2e57a0f..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatJoinRunner.scala +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.runtime - -import org.apache.flink.api.common.functions.{FlatJoinFunction, RichFlatJoinFunction} -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.typeutils.ResultTypeQueryable -import org.apache.flink.api.table.codegen.Compiler -import org.apache.flink.configuration.Configuration -import org.apache.flink.util.Collector -import org.slf4j.LoggerFactory - -class FlatJoinRunner[IN1, IN2, OUT]( - name: String, - code: String, - @transient returnType: TypeInformation[OUT]) - extends RichFlatJoinFunction[IN1, IN2, OUT] - with ResultTypeQueryable[OUT] - with Compiler[FlatJoinFunction[IN1, IN2, OUT]] { - - val LOG = LoggerFactory.getLogger(this.getClass) - - private var function: FlatJoinFunction[IN1, IN2, OUT] = null - - override def open(parameters: Configuration): Unit = { - LOG.debug(s"Compiling FlatJoinFunction: $name \n\n Code:\n$code") - val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code) - LOG.debug("Instantiating FlatJoinFunction.") - function = clazz.newInstance() - } - - override def join(first: IN1, second: IN2, out: Collector[OUT]): Unit = - function.join(first, second, out) - - override def getProducedType: TypeInformation[OUT] = returnType -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala deleted file mode 100644 index e228e2b..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.runtime - -import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction} -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.typeutils.ResultTypeQueryable -import org.apache.flink.api.table.codegen.Compiler -import org.apache.flink.configuration.Configuration -import org.apache.flink.util.Collector -import org.slf4j.LoggerFactory - -class FlatMapRunner[IN, OUT]( - name: String, - code: String, - @transient returnType: TypeInformation[OUT]) - extends RichFlatMapFunction[IN, OUT] - with ResultTypeQueryable[OUT] - with Compiler[FlatMapFunction[IN, OUT]] { - - val LOG = LoggerFactory.getLogger(this.getClass) - - private var function: FlatMapFunction[IN, OUT] = _ - - override def open(parameters: Configuration): Unit = { - LOG.debug(s"Compiling FlatMapFunction: $name \n\n Code:\n$code") - val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code) - LOG.debug("Instantiating FlatMapFunction.") - function = clazz.newInstance() - } - - override def flatMap(in: IN, out: Collector[OUT]): Unit = - function.flatMap(in, out) - - override def getProducedType: TypeInformation[OUT] = returnType -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/IntersectCoGroupFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/IntersectCoGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/IntersectCoGroupFunction.scala deleted file mode 100644 index 9930811..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/IntersectCoGroupFunction.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.runtime - -import java.lang.{Iterable => JIterable} - -import org.apache.flink.api.common.functions.CoGroupFunction -import org.apache.flink.util.Collector - -class IntersectCoGroupFunction[T](all: Boolean) extends CoGroupFunction[T, T, T]{ - override def coGroup(first: JIterable[T], second: JIterable[T], out: Collector[T]): Unit = { - if (first == null || second == null) return - val leftIter = first.iterator() - val rightIter = second.iterator() - if (all) { - while (leftIter.hasNext && rightIter.hasNext) { - out.collect(leftIter.next) - rightIter.next - } - } else { - if (leftIter.hasNext && rightIter.hasNext) { - out.collect(leftIter.next) - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala deleted file mode 100644 index 5ec9035..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.runtime - -import org.apache.flink.api.common.functions.RichFilterFunction -import org.apache.flink.configuration.Configuration - -import scala.collection.JavaConverters._ - - -class LimitFilterFunction[T]( - limitStart: Long, - limitEnd: Long, - broadcastName: String) - extends RichFilterFunction[T] { - - var partitionIndex: Int = _ - var elementCount: Long = _ - var countList: Array[Long] = _ - - override def open(config: Configuration) { - partitionIndex = getRuntimeContext.getIndexOfThisSubtask - - val countPartitionResult = getRuntimeContext - .getBroadcastVariable[(Int, Long)](broadcastName) - .asScala - - // sort by partition index, extract number per partition, sum with intermediate results - countList = countPartitionResult.sortWith(_._1 < _._1).map(_._2).scanLeft(0L) { case (a, b) => - val sum = a + b - if (sum < 0L) { // prevent overflow - Long.MaxValue - } - sum - }.toArray - - elementCount = 0 - } - - override def filter(value: T): Boolean = { - if (elementCount != Long.MaxValue) { // prevent overflow - elementCount += 1L - } - // we filter out records that are not within the limit (Long.MaxValue is unlimited) - limitStart - countList(partitionIndex) < elementCount && - (limitEnd == Long.MaxValue || limitEnd - countList(partitionIndex) >= elementCount) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapJoinLeftRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapJoinLeftRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapJoinLeftRunner.scala deleted file mode 100644 index 76650c2..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapJoinLeftRunner.scala +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.runtime - -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.util.Collector - -class MapJoinLeftRunner[IN1, IN2, OUT]( - name: String, - code: String, - @transient returnType: TypeInformation[OUT], - broadcastSetName: String) - extends MapSideJoinRunner[IN1, IN2, IN2, IN1, OUT](name, code, returnType, broadcastSetName) { - - override def flatMap(multiInput: IN1, out: Collector[OUT]): Unit = - function.join(multiInput, singleInput, out) -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapJoinRightRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapJoinRightRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapJoinRightRunner.scala deleted file mode 100644 index 52b01cf..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapJoinRightRunner.scala +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.runtime - -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.util.Collector - -class MapJoinRightRunner[IN1, IN2, OUT]( - name: String, - code: String, - @transient returnType: TypeInformation[OUT], - broadcastSetName: String) - extends MapSideJoinRunner[IN1, IN2, IN1, IN2, OUT](name, code, returnType, broadcastSetName) { - - override def flatMap(multiInput: IN2, out: Collector[OUT]): Unit = - function.join(singleInput, multiInput, out) -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala deleted file mode 100644 index 9fd1876..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.runtime - -import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.typeutils.ResultTypeQueryable -import org.apache.flink.api.table.codegen.Compiler -import org.apache.flink.configuration.Configuration -import org.slf4j.LoggerFactory - -class MapRunner[IN, OUT]( - name: String, - code: String, - @transient returnType: TypeInformation[OUT]) - extends RichMapFunction[IN, OUT] - with ResultTypeQueryable[OUT] - with Compiler[MapFunction[IN, OUT]] { - - val LOG = LoggerFactory.getLogger(this.getClass) - - private var function: MapFunction[IN, OUT] = null - - override def open(parameters: Configuration): Unit = { - LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code") - val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code) - LOG.debug("Instantiating MapFunction.") - function = clazz.newInstance() - } - - override def map(in: IN): OUT = - function.map(in) - - override def getProducedType: TypeInformation[OUT] = returnType -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapSideJoinRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapSideJoinRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapSideJoinRunner.scala deleted file mode 100644 index b355d49..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapSideJoinRunner.scala +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.runtime - -import org.apache.flink.api.common.functions.{FlatJoinFunction, RichFlatMapFunction} -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.typeutils.ResultTypeQueryable -import org.apache.flink.api.table.codegen.Compiler -import org.apache.flink.configuration.Configuration -import org.slf4j.LoggerFactory - -abstract class MapSideJoinRunner[IN1, IN2, SINGLE_IN, MULTI_IN, OUT]( - name: String, - code: String, - @transient returnType: TypeInformation[OUT], - broadcastSetName: String) - extends RichFlatMapFunction[MULTI_IN, OUT] - with ResultTypeQueryable[OUT] - with Compiler[FlatJoinFunction[IN1, IN2, OUT]] { - - val LOG = LoggerFactory.getLogger(this.getClass) - - protected var function: FlatJoinFunction[IN1, IN2, OUT] = _ - protected var singleInput: SINGLE_IN = _ - - override def open(parameters: Configuration): Unit = { - LOG.debug(s"Compiling FlatJoinFunction: $name \n\n Code:\n$code") - val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code) - LOG.debug("Instantiating FlatJoinFunction.") - function = clazz.newInstance() - singleInput = getRuntimeContext.getBroadcastVariable(broadcastSetName).get(0) - } - - override def getProducedType: TypeInformation[OUT] = returnType -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MinusCoGroupFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MinusCoGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MinusCoGroupFunction.scala deleted file mode 100644 index cac4fe6..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MinusCoGroupFunction.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.runtime - -import java.lang.Iterable - -import org.apache.flink.api.common.functions.CoGroupFunction -import org.apache.flink.util.Collector - -class MinusCoGroupFunction[T](all: Boolean) extends CoGroupFunction[T, T, T] { - override def coGroup(first: Iterable[T], second: Iterable[T], out: Collector[T]): Unit = { - if (first == null || second == null) return - val leftIter = first.iterator - val rightIter = second.iterator - - if (all) { - while (rightIter.hasNext && leftIter.hasNext) { - leftIter.next() - rightIter.next() - } - - while (leftIter.hasNext) { - out.collect(leftIter.next()) - } - } else { - if (!rightIter.hasNext && leftIter.hasNext) { - out.collect(leftIter.next()) - } - } - } -} -