http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/operators/DataSetTable.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/operators/DataSetTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/operators/DataSetTable.scala deleted file mode 100644 index 65b97fb..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/operators/DataSetTable.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.operators - -import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} -import org.apache.calcite.schema.impl.AbstractTable -import org.apache.calcite.sql.`type`.SqlTypeName -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.java.DataSet -import org.apache.flink.api.table.plan.TypeConverter - -class DataSetTable[T]( - val dataSet: DataSet[T], - val fieldNames: Array[String]) extends AbstractTable { - - // check uniquenss of field names - if (fieldNames.length != fieldNames.toSet.size) { - throw new scala.IllegalArgumentException( - "Table field names must be unique.") - } - - val dataSetType: CompositeType[T] = - dataSet.getType match { - case cType: CompositeType[T] => - cType - case _ => - throw new scala.IllegalArgumentException( - "DataSet must have a composite type.") - } - - val fieldTypes: Array[SqlTypeName] = - if (fieldNames.length == dataSetType.getArity) { - (0 until dataSetType.getArity) - .map(i => dataSetType.getTypeAt(i)) - .map(TypeConverter.typeInfoToSqlType) - .toArray - } - else { - throw new IllegalArgumentException( - "Arity of DataSet type not equal to number of field names.") - } - - override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = { - val builder = typeFactory.builder - fieldNames.zip(fieldTypes) - .foreach( f => builder.add(f._1, f._2).nullable(true) ) - builder.build - } - -}
http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala new file mode 100644 index 0000000..97e8b32 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.rules + +import org.apache.calcite.rel.rules._ +import org.apache.calcite.tools.{RuleSets, RuleSet} +import org.apache.flink.api.table.plan.rules.logical._ +import org.apache.flink.api.table.plan.rules.dataset._ + +object FlinkRuleSets { + + /** + * RuleSet to optimize plans for batch / DataSet exeuction + */ + val DATASET_OPT_RULES: RuleSet = RuleSets.ofList( + + // filter rules + FilterJoinRule.FILTER_ON_JOIN, + FilterJoinRule.JOIN, + FilterMergeRule.INSTANCE, + FilterAggregateTransposeRule.INSTANCE, + + // push and merge projection rules + AggregateProjectMergeRule.INSTANCE, + ProjectMergeRule.INSTANCE, + ProjectFilterTransposeRule.INSTANCE, + FilterProjectTransposeRule.INSTANCE, + AggregateProjectPullUpConstantsRule.INSTANCE, + JoinPushExpressionsRule.INSTANCE, + ProjectJoinTransposeRule.INSTANCE, + ProjectRemoveRule.INSTANCE, + SortProjectTransposeRule.INSTANCE, + ProjectSortTransposeRule.INSTANCE, + + // merge and push unions rules + // TODO: Add a rule to enforce binary unions + UnionEliminatorRule.INSTANCE, + JoinUnionTransposeRule.LEFT_UNION, + JoinUnionTransposeRule.RIGHT_UNION, + // non-all Union to all-union + distinct + UnionToDistinctRule.INSTANCE, + + // aggregation rules + AggregateRemoveRule.INSTANCE, + AggregateJoinTransposeRule.EXTENDED, + AggregateUnionAggregateRule.INSTANCE, + AggregateReduceFunctionsRule.INSTANCE, + AggregateExpandDistinctAggregatesRule.INSTANCE, + + // remove unnecessary sort rule + SortRemoveRule.INSTANCE, + + // join reordering rules + JoinPushThroughJoinRule.LEFT, + JoinPushThroughJoinRule.RIGHT, + JoinAssociateRule.INSTANCE, + JoinCommuteRule.INSTANCE, + JoinCommuteRule.SWAP_OUTER, + + // simplify expressions rules + ReduceExpressionsRule.CALC_INSTANCE, + ReduceExpressionsRule.FILTER_INSTANCE, + ReduceExpressionsRule.JOIN_INSTANCE, + ReduceExpressionsRule.PROJECT_INSTANCE, + + // prune empty results rules + PruneEmptyRules.AGGREGATE_INSTANCE, + PruneEmptyRules.FILTER_INSTANCE, + PruneEmptyRules.JOIN_LEFT_INSTANCE, + PruneEmptyRules.JOIN_RIGHT_INSTANCE, + PruneEmptyRules.PROJECT_INSTANCE, + PruneEmptyRules.SORT_INSTANCE, + PruneEmptyRules.UNION_INSTANCE, + + // calc rules + FilterCalcMergeRule.INSTANCE, + ProjectCalcMergeRule.INSTANCE, + FilterToCalcRule.INSTANCE, + ProjectToCalcRule.INSTANCE, + CalcMergeRule.INSTANCE, + + // translate to logical Flink nodes + FlinkAggregateRule.INSTANCE, + FlinkCalcRule.INSTANCE, + FlinkFilterRule.INSTANCE, + FlinkJoinRule.INSTANCE, + FlinkProjectRule.INSTANCE, + FlinkScanRule.INSTANCE, + FlinkUnionRule.INSTANCE + ) + + val DATASET_TRANS_RULES: RuleSet = RuleSets.ofList( + + // translate to DataSet nodes + DataSetAggregateRule.INSTANCE, + DataSetCalcRule.INSTANCE, + DataSetFilterRule.INSTANCE, + DataSetJoinRule.INSTANCE, + DataSetProjectRule.INSTANCE, + DataSetScanRule.INSTANCE, + DataSetUnionRule.INSTANCE + ) + +} http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetAggregateRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetAggregateRule.scala new file mode 100644 index 0000000..1d17d63 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetAggregateRule.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.rules.dataset + +import org.apache.calcite.plan.{RelOptRule, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetReduce} +import org.apache.flink.api.table.plan.nodes.logical.{FlinkAggregate, FlinkConvention} + +class DataSetAggregateRule + extends ConverterRule( + classOf[FlinkAggregate], + FlinkConvention.INSTANCE, + DataSetConvention.INSTANCE, + "DataSetAggregateRule") +{ + + def convert(rel: RelNode): RelNode = { + val agg: FlinkAggregate = rel.asInstanceOf[FlinkAggregate] + val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) + val convInput: RelNode = RelOptRule.convert(agg.getInput, DataSetConvention.INSTANCE) + + new DataSetReduce( + rel.getCluster, + traitSet, + convInput, + rel.getRowType, + agg.toString, + Array[Int](), + null) + } +} + +object DataSetAggregateRule { + val INSTANCE: RelOptRule = new DataSetAggregateRule +} http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetCalcRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetCalcRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetCalcRule.scala new file mode 100644 index 0000000..85e090c --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetCalcRule.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.rules.dataset + +import org.apache.calcite.plan.{RelOptRule, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetFlatMap} +import org.apache.flink.api.table.plan.nodes.logical.{FlinkCalc, FlinkConvention} + +class DataSetCalcRule + extends ConverterRule( + classOf[FlinkCalc], + FlinkConvention.INSTANCE, + DataSetConvention.INSTANCE, + "DataSetCalcRule") +{ + + def convert(rel: RelNode): RelNode = { + val calc: FlinkCalc = rel.asInstanceOf[FlinkCalc] + val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) + val convInput: RelNode = RelOptRule.convert(calc.getInput, DataSetConvention.INSTANCE) + + new DataSetFlatMap( + rel.getCluster, + traitSet, + convInput, + rel.getRowType, + calc.toString, + null) + } +} + +object DataSetCalcRule { + val INSTANCE: RelOptRule = new DataSetCalcRule +} http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetFilterRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetFilterRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetFilterRule.scala new file mode 100644 index 0000000..383c965 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetFilterRule.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.rules.dataset + +import org.apache.calcite.plan.{RelOptRule, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetFlatMap} +import org.apache.flink.api.table.plan.nodes.logical.{FlinkFilter, FlinkConvention} + +class DataSetFilterRule + extends ConverterRule( + classOf[FlinkFilter], + FlinkConvention.INSTANCE, + DataSetConvention.INSTANCE, + "DataSetFilterRule") +{ + + def convert(rel: RelNode): RelNode = { + val filter: FlinkFilter = rel.asInstanceOf[FlinkFilter] + val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) + val convInput: RelNode = RelOptRule.convert(filter.getInput, DataSetConvention.INSTANCE) + + new DataSetFlatMap( + rel.getCluster, + traitSet, + convInput, + rel.getRowType, + filter.toString, + null) + } +} + +object DataSetFilterRule { + val INSTANCE: RelOptRule = new DataSetFilterRule +} http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala new file mode 100644 index 0000000..3d2117d --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.rules.dataset + +import org.apache.calcite.plan.{RelOptRule, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.flink.api.java.operators.join.JoinType +import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetJoin} +import org.apache.flink.api.table.plan.nodes.logical.{FlinkJoin, FlinkConvention} + +class DataSetJoinRule + extends ConverterRule( + classOf[FlinkJoin], + FlinkConvention.INSTANCE, + DataSetConvention.INSTANCE, + "DataSetJoinRule") +{ + + def convert(rel: RelNode): RelNode = { + val join: FlinkJoin = rel.asInstanceOf[FlinkJoin] + val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) + val convLeft: RelNode = RelOptRule.convert(join.getInput(0), DataSetConvention.INSTANCE) + val convRight: RelNode = RelOptRule.convert(join.getInput(1), DataSetConvention.INSTANCE) + + new DataSetJoin( + rel.getCluster, + traitSet, + convLeft, + convRight, + rel.getRowType, + join.toString, + Array[Int](), + Array[Int](), + JoinType.INNER, + null, + null) + } +} + +object DataSetJoinRule { + val INSTANCE: RelOptRule = new DataSetJoinRule +} http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetProjectRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetProjectRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetProjectRule.scala new file mode 100644 index 0000000..7796d66 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetProjectRule.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.rules.dataset + +import org.apache.calcite.plan.{RelOptRule, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetMap} +import org.apache.flink.api.table.plan.nodes.logical.{FlinkProject, FlinkConvention} + +class DataSetProjectRule + extends ConverterRule( + classOf[FlinkProject], + FlinkConvention.INSTANCE, + DataSetConvention.INSTANCE, + "DataSetProjectRule") +{ + + def convert(rel: RelNode): RelNode = { + val proj: FlinkProject = rel.asInstanceOf[FlinkProject] + val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) + val convInput: RelNode = RelOptRule.convert(proj.getInput, DataSetConvention.INSTANCE) + + new DataSetMap( + rel.getCluster, + traitSet, + convInput, + rel.getRowType, + proj.toString, + null) + } +} + +object DataSetProjectRule { + val INSTANCE: RelOptRule = new DataSetProjectRule +} http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetScanRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetScanRule.scala new file mode 100644 index 0000000..937f3e2 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetScanRule.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.rules.dataset + +import org.apache.calcite.plan.{RelOptRule, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetSource} +import org.apache.flink.api.table.plan.nodes.logical.{FlinkScan, FlinkConvention} +import org.apache.flink.api.table.plan.schema.DataSetTable + +class DataSetScanRule + extends ConverterRule( + classOf[FlinkScan], + FlinkConvention.INSTANCE, + DataSetConvention.INSTANCE, + "DataSetScanRule") +{ + def convert(rel: RelNode): RelNode = { + val scan: FlinkScan = rel.asInstanceOf[FlinkScan] + val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) + val dataSet: DataSet[_] = scan.getTable().unwrap(classOf[DataSetTable[_]]).dataSet + + new DataSetSource( + rel.getCluster, + traitSet, + scan.getTable, + rel.getRowType, + dataSet + ) + } +} + +object DataSetScanRule { + val INSTANCE: RelOptRule = new DataSetScanRule +} http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetUnionRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetUnionRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetUnionRule.scala new file mode 100644 index 0000000..a390374 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetUnionRule.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.rules.dataset + +import org.apache.calcite.plan.{RelOptRule, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetUnion} +import org.apache.flink.api.table.plan.nodes.logical.{FlinkUnion, FlinkConvention} + +class DataSetUnionRule + extends ConverterRule( + classOf[FlinkUnion], + FlinkConvention.INSTANCE, + DataSetConvention.INSTANCE, + "DataSetUnionRule") +{ + + def convert(rel: RelNode): RelNode = { + val union: FlinkUnion = rel.asInstanceOf[FlinkUnion] + val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) + val convLeft: RelNode = RelOptRule.convert(union.getInput(0), DataSetConvention.INSTANCE) + val convRight: RelNode = RelOptRule.convert(union.getInput(1), DataSetConvention.INSTANCE) + + new DataSetUnion( + rel.getCluster, + traitSet, + convLeft, + convRight, + rel.getRowType, + union.toString) + } +} + +object DataSetUnionRule { + val INSTANCE: RelOptRule = new DataSetUnionRule +} http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkAggregateRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkAggregateRule.scala new file mode 100644 index 0000000..a5bfeb6 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkAggregateRule.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.rules.logical + +import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.logical.LogicalAggregate +import org.apache.flink.api.table.plan.nodes.logical.{FlinkAggregate, FlinkConvention} + +class FlinkAggregateRule + extends ConverterRule( + classOf[LogicalAggregate], + Convention.NONE, + FlinkConvention.INSTANCE, + "FlinkAggregateRule") + { + + def convert(rel: RelNode): RelNode = { + val agg: LogicalAggregate = rel.asInstanceOf[LogicalAggregate] + val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConvention.INSTANCE) + val convInput: RelNode = RelOptRule.convert(agg.getInput, FlinkConvention.INSTANCE) + + new FlinkAggregate( + rel.getCluster, + traitSet, + convInput, + agg.indicator, + agg.getGroupSet, + agg.getGroupSets, + agg.getAggCallList) + } + } + +object FlinkAggregateRule { + val INSTANCE: RelOptRule = new FlinkAggregateRule +} http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkCalcRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkCalcRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkCalcRule.scala new file mode 100644 index 0000000..f40b04d --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkCalcRule.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.rules.logical + +import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.logical.LogicalCalc +import org.apache.flink.api.table.plan.nodes.logical.{FlinkCalc, FlinkConvention} + +class FlinkCalcRule + extends ConverterRule( + classOf[LogicalCalc], + Convention.NONE, + FlinkConvention.INSTANCE, + "FlinkCalcRule") + { + + def convert(rel: RelNode): RelNode = { + val calc: LogicalCalc = rel.asInstanceOf[LogicalCalc] + val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConvention.INSTANCE) + val convInput: RelNode = RelOptRule.convert(calc.getInput, FlinkConvention.INSTANCE) + + new FlinkCalc( + rel.getCluster, + traitSet, + convInput, + calc.getProgram) + } + } + +object FlinkCalcRule { + val INSTANCE: RelOptRule = new FlinkCalcRule +} http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkFilterRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkFilterRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkFilterRule.scala new file mode 100644 index 0000000..25df8e8 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkFilterRule.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.rules.logical + +import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.logical.LogicalFilter +import org.apache.flink.api.table.plan.nodes.logical.{FlinkFilter, FlinkConvention} + +class FlinkFilterRule + extends ConverterRule( + classOf[LogicalFilter], + Convention.NONE, + FlinkConvention.INSTANCE, + "FlinkFilterRule") + { + + def convert(rel: RelNode): RelNode = { + val filter: LogicalFilter = rel.asInstanceOf[LogicalFilter] + val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConvention.INSTANCE) + val convInput: RelNode = RelOptRule.convert(filter.getInput, FlinkConvention.INSTANCE) + + new FlinkFilter( + rel.getCluster, + traitSet, + convInput, + filter.getCondition) + } + } + +object FlinkFilterRule { + val INSTANCE: RelOptRule = new FlinkFilterRule +} http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinRule.scala new file mode 100644 index 0000000..3826c9a --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinRule.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.rules.logical + +import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.logical.LogicalJoin +import org.apache.flink.api.table.plan.nodes.logical.{FlinkJoin, FlinkConvention} + +class FlinkJoinRule + extends ConverterRule( + classOf[LogicalJoin], + Convention.NONE, + FlinkConvention.INSTANCE, + "FlinkJoinRule") + { + + def convert(rel: RelNode): RelNode = { + val join: LogicalJoin = rel.asInstanceOf[LogicalJoin] + val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConvention.INSTANCE) + val convLeft: RelNode = RelOptRule.convert(join.getInput(0), FlinkConvention.INSTANCE) + val convRight: RelNode = RelOptRule.convert(join.getInput(1), FlinkConvention.INSTANCE) + + new FlinkJoin( + rel.getCluster, + traitSet, + convLeft, + convRight, + join.getCondition, + join.getJoinType, + join.getVariablesStopped) + } + } + +object FlinkJoinRule { + val INSTANCE: RelOptRule = new FlinkJoinRule +} http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkProjectRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkProjectRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkProjectRule.scala new file mode 100644 index 0000000..d0e1410 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkProjectRule.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.rules.logical + +import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.logical.LogicalProject +import org.apache.flink.api.table.plan.nodes.logical.{FlinkProject, FlinkConvention} + +class FlinkProjectRule + extends ConverterRule( + classOf[LogicalProject], + Convention.NONE, + FlinkConvention.INSTANCE, + "FlinkProjectRule") + { + + def convert(rel: RelNode): RelNode = { + val proj: LogicalProject = rel.asInstanceOf[LogicalProject] + val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConvention.INSTANCE) + val convInput: RelNode = RelOptRule.convert(proj.getInput, FlinkConvention.INSTANCE) + + new FlinkProject( + rel.getCluster, + traitSet, + convInput, + proj.getProjects, + proj.getRowType) + } + } + +object FlinkProjectRule { + val INSTANCE: RelOptRule = new FlinkProjectRule +} http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkScanRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkScanRule.scala new file mode 100644 index 0000000..d789770 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkScanRule.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.rules.logical + +import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.core.TableScan +import org.apache.calcite.rel.logical.LogicalTableScan +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetSource} +import org.apache.flink.api.table.plan.nodes.logical.{FlinkScan, FlinkConvention} +import org.apache.flink.api.table.plan.schema.DataSetTable + +class FlinkScanRule + extends ConverterRule( + classOf[LogicalTableScan], + Convention.NONE, + FlinkConvention.INSTANCE, + "FlinkScanRule") + { + def convert(rel: RelNode): RelNode = { + val scan: TableScan = rel.asInstanceOf[TableScan] + val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConvention.INSTANCE) + val dataSet: DataSet[_] = scan.getTable().unwrap(classOf[DataSetTable[_]]).dataSet + + new FlinkScan( + rel.getCluster, + traitSet, + scan.getTable + ) + } + } + +object FlinkScanRule { + val INSTANCE: RelOptRule = new FlinkScanRule +} http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkUnionRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkUnionRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkUnionRule.scala new file mode 100644 index 0000000..d9869f8 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkUnionRule.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.rules.logical + +import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.logical.LogicalUnion +import org.apache.flink.api.table.plan.nodes.logical.{FlinkUnion, FlinkConvention} + +import scala.collection.JavaConversions._ + +class FlinkUnionRule + extends ConverterRule( + classOf[LogicalUnion], + Convention.NONE, + FlinkConvention.INSTANCE, + "FlinkUnionRule") + { + + def convert(rel: RelNode): RelNode = { + val union: LogicalUnion = rel.asInstanceOf[LogicalUnion] + val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConvention.INSTANCE) + val convInputs = union.getInputs.toList.map( + RelOptRule.convert(_, FlinkConvention.INSTANCE) + ) + + new FlinkUnion( + rel.getCluster, + traitSet, + convInputs, + union.all) + } + } + +object FlinkUnionRule { + val INSTANCE: RelOptRule = new FlinkUnionRule +} http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataSetTable.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataSetTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataSetTable.scala new file mode 100644 index 0000000..e6aecab --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataSetTable.scala @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.schema + +import java.lang.Double +import java.util +import java.util.Collections + +import org.apache.calcite.rel.{RelCollation, RelDistribution} +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} +import org.apache.calcite.schema.Statistic +import org.apache.calcite.schema.impl.AbstractTable +import org.apache.calcite.sql.`type`.SqlTypeName +import org.apache.calcite.util.ImmutableBitSet +import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.plan.TypeConverter + +class DataSetTable[T]( + val dataSet: DataSet[T], + val fieldNames: Array[String]) + extends AbstractTable { + + // check uniquenss of field names + if (fieldNames.length != fieldNames.toSet.size) { + throw new scala.IllegalArgumentException( + "Table field names must be unique.") + } + + val dataSetType: CompositeType[T] = + dataSet.getType match { + case cType: CompositeType[T] => + cType + case _ => + throw new scala.IllegalArgumentException( + "DataSet must have a composite type.") + } + + val fieldTypes: Array[SqlTypeName] = + if (fieldNames.length == dataSetType.getArity) { + (0 until dataSetType.getArity) + .map(i => dataSetType.getTypeAt(i)) + .map(TypeConverter.typeInfoToSqlType) + .toArray + } + else { + throw new IllegalArgumentException( + "Arity of DataSet type not equal to number of field names.") + } + + override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = { + val builder = typeFactory.builder + fieldNames.zip(fieldTypes) + .foreach( f => builder.add(f._1, f._2).nullable(true) ) + builder.build + } + +// override def getStatistic: Statistic = { +// new DefaultDataSetStatistic +// } + +} + +class DefaultDataSetStatistic extends Statistic { + + override def getRowCount: Double = 1000d + + override def getCollations: util.List[RelCollation] = Collections.emptyList() + + override def isKey(columns: ImmutableBitSet): Boolean = false + + override def getDistribution: RelDistribution = null +} http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala index 271aa99..138cd70 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala @@ -270,7 +270,6 @@ class Table( relBuilder.join(JoinRelType.INNER, relBuilder.literal(true)) val join = relBuilder.build() - val rowT = join.getRowType() new Table(join, relBuilder) } http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java index 447acad..a3d5edc 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java @@ -35,7 +35,9 @@ package org.apache.flink.api.java.table.test; * limitations under the License. */ +import org.apache.flink.api.java.DataSet; import org.apache.flink.api.table.ExpressionException; +import org.apache.flink.api.table.Row; import org.apache.flink.api.table.Table; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.table.TableEnvironment; @@ -48,6 +50,9 @@ import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import scala.NotImplementedError; + +import java.util.List; @RunWith(Parameterized.class) public class AggregationsITCase extends MultipleProgramsTestBase { @@ -57,7 +62,7 @@ public class AggregationsITCase extends MultipleProgramsTestBase { super(mode); } - @Test + @Test(expected = NotImplementedError.class) public void testAggregationTypes() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); @@ -66,10 +71,10 @@ public class AggregationsITCase extends MultipleProgramsTestBase { Table result = table.select("f0.sum, f0.min, f0.max, f0.count, f0.avg"); -// DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); -// List<Row> results = ds.collect(); -// String expected = "231,1,21,21,11"; -// compareResultAsText(results, expected); + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = "231,1,21,21,11"; + compareResultAsText(results, expected); } @Test(expected = IllegalArgumentException.class) @@ -83,13 +88,13 @@ public class AggregationsITCase extends MultipleProgramsTestBase { Table result = table.select("foo.avg"); -// DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); -// List<Row> results = ds.collect(); -// String expected = ""; -// compareResultAsText(results, expected); + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = ""; + compareResultAsText(results, expected); } - @Test + @Test(expected = NotImplementedError.class) public void testWorkingAggregationDataTypes() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); @@ -105,13 +110,13 @@ public class AggregationsITCase extends MultipleProgramsTestBase { Table result = table.select("f0.avg, f1.avg, f2.avg, f3.avg, f4.avg, f5.avg, f6.count"); -// DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); -// List<Row> results = ds.collect(); -// String expected = "1,1,1,1,1.5,1.5,2"; -// compareResultAsText(results, expected); + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = "1,1,1,1,1.5,1.5,2"; + compareResultAsText(results, expected); } - @Test + @Test(expected = NotImplementedError.class) public void testAggregationWithArithmetic() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); @@ -128,13 +133,13 @@ public class AggregationsITCase extends MultipleProgramsTestBase { table.select("(f0 + 2).avg + 2, f1.count + 5"); -// DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); -// List<Row> results = ds.collect(); -// String expected = "5.5,7"; -// compareResultAsText(results, expected); + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = "5.5,7"; + compareResultAsText(results, expected); } - @Test + @Test(expected = NotImplementedError.class) public void testAggregationWithTwoCount() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); @@ -150,11 +155,10 @@ public class AggregationsITCase extends MultipleProgramsTestBase { Table result = table.select("f0.count, f1.count"); - -// DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); -// List<Row> results = ds.collect(); -// String expected = "2,2"; -// compareResultAsText(results, expected); + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = "2,2"; + compareResultAsText(results, expected); } // Calcite does not eagerly check type compatibility @@ -172,11 +176,10 @@ public class AggregationsITCase extends MultipleProgramsTestBase { Table result = table.select("f1.sum"); - -// DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); -// List<Row> results = ds.collect(); -// String expected = ""; -// compareResultAsText(results, expected); + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = ""; + compareResultAsText(results, expected); } @Test(expected = ExpressionException.class) @@ -192,11 +195,10 @@ public class AggregationsITCase extends MultipleProgramsTestBase { Table result = table.select("f0.sum.sum"); - -// DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); -// List<Row> results = ds.collect(); -// String expected = ""; -// compareResultAsText(results, expected); + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = ""; + compareResultAsText(results, expected); } } http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java index f706b48..f257c32 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java @@ -18,7 +18,6 @@ package org.apache.flink.api.java.table.test; -import org.apache.flink.api.table.ExpressionException; import org.apache.flink.api.table.Table; import org.apache.flink.api.table.Row; import org.apache.flink.api.java.DataSet; @@ -29,6 +28,7 @@ import org.apache.flink.test.util.MultipleProgramsTestBase; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import scala.NotImplementedError; import java.util.List; @@ -40,7 +40,7 @@ public class AsITCase extends MultipleProgramsTestBase { super(mode); } - @Test + @Test(expected = NotImplementedError.class) public void testAs() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); @@ -48,15 +48,15 @@ public class AsITCase extends MultipleProgramsTestBase { Table table = tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c"); -// DataSet<Row> ds = tableEnv.toDataSet(table, Row.class); -// List<Row> results = ds.collect(); -// String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + -// "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + -// "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + -// "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + -// "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + -// "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"; -// compareResultAsText(results, expected); + DataSet<Row> ds = tableEnv.toDataSet(table, Row.class); + List<Row> results = ds.collect(); + String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + + "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + + "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + + "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + + "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + + "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"; + compareResultAsText(results, expected); } @Test(expected = IllegalArgumentException.class) @@ -67,10 +67,10 @@ public class AsITCase extends MultipleProgramsTestBase { Table table = tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b"); -// DataSet<Row> ds = tableEnv.toDataSet(table, Row.class); -// List<Row> results = ds.collect(); -// String expected = ""; -// compareResultAsText(results, expected); + DataSet<Row> ds = tableEnv.toDataSet(table, Row.class); + List<Row> results = ds.collect(); + String expected = ""; + compareResultAsText(results, expected); } @Test(expected = IllegalArgumentException.class) @@ -81,10 +81,10 @@ public class AsITCase extends MultipleProgramsTestBase { Table table = tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c, d"); -// DataSet<Row> ds = tableEnv.toDataSet(table, Row.class); -// List<Row> results = ds.collect(); -// String expected = ""; -// compareResultAsText(results, expected); + DataSet<Row> ds = tableEnv.toDataSet(table, Row.class); + List<Row> results = ds.collect(); + String expected = ""; + compareResultAsText(results, expected); } @Test(expected = IllegalArgumentException.class) @@ -95,10 +95,10 @@ public class AsITCase extends MultipleProgramsTestBase { Table table = tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, b"); -// DataSet<Row> ds = tableEnv.toDataSet(table, Row.class); -// List<Row> results = ds.collect(); -// String expected = ""; -// compareResultAsText(results, expected); + DataSet<Row> ds = tableEnv.toDataSet(table, Row.class); + List<Row> results = ds.collect(); + String expected = ""; + compareResultAsText(results, expected); } @Test(expected = IllegalArgumentException.class) @@ -109,10 +109,10 @@ public class AsITCase extends MultipleProgramsTestBase { Table table = tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a + 1, b, c"); -// DataSet<Row> ds = tableEnv.toDataSet(table, Row.class); -// List<Row> results = ds.collect(); -// String expected = ""; -// compareResultAsText(results, expected); + DataSet<Row> ds = tableEnv.toDataSet(table, Row.class); + List<Row> results = ds.collect(); + String expected = ""; + compareResultAsText(results, expected); } @Test(expected = IllegalArgumentException.class) @@ -124,10 +124,10 @@ public class AsITCase extends MultipleProgramsTestBase { tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a as foo, b," + " c"); -// DataSet<Row> ds = tableEnv.toDataSet(table, Row.class); -// List<Row> results = ds.collect(); -// String expected = ""; -// compareResultAsText(results, expected); + DataSet<Row> ds = tableEnv.toDataSet(table, Row.class); + List<Row> results = ds.collect(); + String expected = ""; + compareResultAsText(results, expected); } } http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java index 5e99fd9..cde78ce 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java @@ -32,6 +32,7 @@ import org.apache.flink.test.util.MultipleProgramsTestBase; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import scala.NotImplementedError; import java.util.List; @@ -43,7 +44,7 @@ public class CastingITCase extends MultipleProgramsTestBase { super(mode); } - @Test + @Test(expected = NotImplementedError.class) public void testNumericAutocastInArithmetic() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); @@ -57,13 +58,13 @@ public class CastingITCase extends MultipleProgramsTestBase { Table result = table.select("f0 + 1, f1 +" + " 1, f2 + 1L, f3 + 1.0f, f4 + 1.0d, f5 + 1"); -// DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); -// List<Row> results = ds.collect(); -// String expected = "2,2,2,2.0,2.0,2.0"; -// compareResultAsText(results, expected); + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = "2,2,2,2.0,2.0,2.0"; + compareResultAsText(results, expected); } - @Test + @Test(expected = NotImplementedError.class) public void testNumericAutocastInComparison() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); @@ -79,13 +80,13 @@ public class CastingITCase extends MultipleProgramsTestBase { Table result = table .filter("a > 1 && b > 1 && c > 1L && d > 1.0f && e > 1.0d && f > 1"); -// DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); -// List<Row> results = ds.collect(); -// String expected = "2,2,2,2,2.0,2.0,Hello"; -// compareResultAsText(results, expected); + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = "2,2,2,2,2.0,2.0,Hello"; + compareResultAsText(results, expected); } - @Test + @Test(expected = NotImplementedError.class) public void testCastFromString() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); @@ -99,13 +100,13 @@ public class CastingITCase extends MultipleProgramsTestBase { Table result = table.select( "f0.cast(BYTE), f0.cast(SHORT), f0.cast(INT), f0.cast(LONG), f2.cast(DOUBLE), f2.cast(FLOAT), f1.cast(BOOL)"); -// DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); -// List<Row> results = ds.collect(); -// String expected = "1,1,1,1,2.0,2.0,true\n"; -// compareResultAsText(results, expected); + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = "1,1,1,1,2.0,2.0,true\n"; + compareResultAsText(results, expected); } - @Test + @Test(expected = NotImplementedError.class) public void testCastDateFromString() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); @@ -120,14 +121,14 @@ public class CastingITCase extends MultipleProgramsTestBase { .select("f0.cast(DATE) AS f0, f1.cast(DATE) AS f1, f2.cast(DATE) AS f2, f3.cast(DATE) AS f3") .select("f0.cast(STRING), f1.cast(STRING), f2.cast(STRING), f3.cast(STRING)"); -// DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); -// List<Row> results = ds.collect(); -// String expected = "2011-05-03 00:00:00.000,1970-01-01 15:51:36.000,2011-05-03 15:51:36.000," + -// "1970-01-17 17:47:53.775\n"; -// compareResultAsText(results, expected); + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = "2011-05-03 00:00:00.000,1970-01-01 15:51:36.000,2011-05-03 15:51:36.000," + + "1970-01-17 17:47:53.775\n"; + compareResultAsText(results, expected); } - @Test + @Test(expected = NotImplementedError.class) public void testCastDateToStringAndLong() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); @@ -142,10 +143,10 @@ public class CastingITCase extends MultipleProgramsTestBase { .select("f0.cast(DATE) AS f0, f1.cast(DATE) AS f1") .select("f0.cast(STRING), f0.cast(LONG), f1.cast(STRING), f1.cast(LONG)"); -// DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); -// List<Row> results = ds.collect(); -// String expected = "2011-05-03 15:51:36.000,1304437896000,2011-05-03 15:51:36.000,1304437896000\n"; -// compareResultAsText(results, expected); + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = "2011-05-03 15:51:36.000,1304437896000,2011-05-03 15:51:36.000,1304437896000\n"; + compareResultAsText(results, expected); } } http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java index 3bbc120..2a17087 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java @@ -18,7 +18,6 @@ package org.apache.flink.api.java.table.test; -import org.apache.flink.api.table.ExpressionException; import org.apache.flink.api.table.Table; import org.apache.flink.api.table.Row; import org.apache.flink.api.java.DataSet; @@ -31,6 +30,7 @@ import org.apache.flink.test.util.MultipleProgramsTestBase; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import scala.NotImplementedError; import java.util.List; @@ -42,7 +42,7 @@ public class ExpressionsITCase extends MultipleProgramsTestBase { super(mode); } - @Test + @Test(expected = NotImplementedError.class) public void testArithmetic() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); @@ -56,13 +56,13 @@ public class ExpressionsITCase extends MultipleProgramsTestBase { Table result = table.select( "a - 5, a + 5, a / 2, a * 2, a % 2, -a"); -// DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); -// List<Row> results = ds.collect(); -// String expected = "0,10,2,10,1,-5"; -// compareResultAsText(results, expected); + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = "0,10,2,10,1,-5"; + compareResultAsText(results, expected); } - @Test + @Test(expected = NotImplementedError.class) public void testLogic() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); @@ -76,13 +76,13 @@ public class ExpressionsITCase extends MultipleProgramsTestBase { Table result = table.select( "b && true, b && false, b || false, !b"); -// DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); -// List<Row> results = ds.collect(); -// String expected = "true,false,true,false"; -// compareResultAsText(results, expected); + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = "true,false,true,false"; + compareResultAsText(results, expected); } - @Test + @Test(expected = NotImplementedError.class) public void testComparisons() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); @@ -96,10 +96,10 @@ public class ExpressionsITCase extends MultipleProgramsTestBase { Table result = table.select( "a > c, a >= b, a < c, a.isNull, a.isNotNull"); -// DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); -// List<Row> results = ds.collect(); -// String expected = "true,true,false,false,true"; -// compareResultAsText(results, expected); + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = "true,true,false,false,true"; + compareResultAsText(results, expected); } } http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java index 68925fb..a3ab10f 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java @@ -29,6 +29,7 @@ import org.apache.flink.test.util.MultipleProgramsTestBase; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import scala.NotImplementedError; import java.util.List; @@ -40,7 +41,7 @@ public class FilterITCase extends MultipleProgramsTestBase { super(mode); } - @Test + @Test(expected = NotImplementedError.class) public void testAllRejectingFilter() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); @@ -53,13 +54,13 @@ public class FilterITCase extends MultipleProgramsTestBase { Table result = table .filter("false"); -// DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); -// List<Row> results = ds.collect(); -// String expected = "\n"; -// compareResultAsText(results, expected); + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = "\n"; + compareResultAsText(results, expected); } - @Test + @Test(expected = NotImplementedError.class) public void testAllPassingFilter() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); @@ -72,18 +73,18 @@ public class FilterITCase extends MultipleProgramsTestBase { Table result = table .filter("true"); -// DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); -// List<Row> results = ds.collect(); -// String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + -// "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + -// "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + -// "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + -// "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + -// "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"; -// compareResultAsText(results, expected); + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + + "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + + "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + + "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + + "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + + "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"; + compareResultAsText(results, expected); } - @Test + @Test(expected = NotImplementedError.class) public void testFilterOnIntegerTupleField() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); @@ -96,15 +97,15 @@ public class FilterITCase extends MultipleProgramsTestBase { Table result = table .filter(" a % 2 = 0 "); -// DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); -// List<Row> results = ds.collect(); -// String expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," + -// "Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," + -// "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n"; -// compareResultAsText(results, expected); + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," + + "Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," + + "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n"; + compareResultAsText(results, expected); } - @Test + @Test(expected = NotImplementedError.class) public void testNotEquals() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); @@ -117,15 +118,15 @@ public class FilterITCase extends MultipleProgramsTestBase { Table result = table .filter("!( a % 2 <> 0 ) "); -// DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); -// List<Row> results = ds.collect(); -// String expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," + -// "Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," + -// "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n"; -// compareResultAsText(results, expected); + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," + + "Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," + + "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n"; + compareResultAsText(results, expected); } - @Test + @Test(expected = NotImplementedError.class) public void testIntegerBiggerThan128() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); @@ -136,10 +137,10 @@ public class FilterITCase extends MultipleProgramsTestBase { Table result = table.filter("a = 300 "); -// DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); -// List<Row> results = ds.collect(); -// String expected = "300,1,Hello\n"; -// compareResultAsText(results, expected); + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = "300,1,Hello\n"; + compareResultAsText(results, expected); } } http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java index 2add694..d5dc56a 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java @@ -18,7 +18,6 @@ package org.apache.flink.api.java.table.test; -import org.apache.flink.api.table.ExpressionException; import org.apache.flink.api.table.Table; import org.apache.flink.api.table.Row; import org.apache.flink.api.java.DataSet; @@ -30,6 +29,7 @@ import org.apache.flink.test.util.MultipleProgramsTestBase; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import scala.NotImplementedError; import java.util.List; @@ -54,13 +54,13 @@ public class GroupedAggregationsITCase extends MultipleProgramsTestBase { Table result = table .groupBy("foo").select("a.avg"); -// DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); -// List<Row> results = ds.collect(); -// String expected = ""; -// compareResultAsText(results, expected); + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = ""; + compareResultAsText(results, expected); } - @Test + @Test(expected = NotImplementedError.class) public void testGroupedAggregate() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); @@ -73,13 +73,13 @@ public class GroupedAggregationsITCase extends MultipleProgramsTestBase { Table result = table .groupBy("b").select("b, a.sum"); -// DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); -// List<Row> results = ds.collect(); -// String expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n"; -// compareResultAsText(results, expected); + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n"; + compareResultAsText(results, expected); } - @Test + @Test(expected = NotImplementedError.class) public void testGroupingKeyForwardIfNotUsed() throws Exception { // the grouping key needs to be forwarded to the intermediate DataSet, even @@ -96,13 +96,13 @@ public class GroupedAggregationsITCase extends MultipleProgramsTestBase { Table result = table .groupBy("b").select("a.sum"); -// DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); -// List<Row> results = ds.collect(); -// String expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n"; -// compareResultAsText(results, expected); + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n"; + compareResultAsText(results, expected); } - @Test + @Test(expected = NotImplementedError.class) public void testGroupNoAggregation() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); @@ -116,10 +116,10 @@ public class GroupedAggregationsITCase extends MultipleProgramsTestBase { Table result = table .groupBy("b").select("a.sum as d, b").groupBy("b, d").select("b"); -// DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); -// String expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n"; -// List<Row> results = ds.collect(); -// compareResultAsText(results, expected); + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + String expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n"; + List<Row> results = ds.collect(); + compareResultAsText(results, expected); } } http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java index 2b44a87..2e1f4a7 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java @@ -18,6 +18,7 @@ package org.apache.flink.api.java.table.test; +import org.apache.flink.api.table.Row; import org.apache.flink.api.table.Table; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; @@ -30,6 +31,9 @@ import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import scala.NotImplementedError; + +import java.util.List; @RunWith(Parameterized.class) @@ -40,7 +44,7 @@ public class JoinITCase extends MultipleProgramsTestBase { super(mode); } - @Test + @Test(expected = NotImplementedError.class) public void testJoin() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); @@ -53,13 +57,13 @@ public class JoinITCase extends MultipleProgramsTestBase { Table result = in1.join(in2).where("b === e").select("c, g"); -// DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); -// List<Row> results = ds.collect(); -// String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n"; -// compareResultAsText(results, expected); + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n"; + compareResultAsText(results, expected); } - @Test + @Test(expected = NotImplementedError.class) public void testJoinWithFilter() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); @@ -72,13 +76,13 @@ public class JoinITCase extends MultipleProgramsTestBase { Table result = in1.join(in2).where("b === e && b < 2").select("c, g"); -// DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); -// List<Row> results = ds.collect(); -// String expected = "Hi,Hallo\n"; -// compareResultAsText(results, expected); + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = "Hi,Hallo\n"; + compareResultAsText(results, expected); } - @Test + @Test(expected = NotImplementedError.class) public void testJoinWithMultipleKeys() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); @@ -91,11 +95,11 @@ public class JoinITCase extends MultipleProgramsTestBase { Table result = in1.join(in2).where("a === d && b === h").select("c, g"); -// DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); -// List<Row> results = ds.collect(); -// String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" + -// "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n"; -// compareResultAsText(results, expected); + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" + + "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n"; + compareResultAsText(results, expected); } @Test(expected = IllegalArgumentException.class) @@ -111,10 +115,10 @@ public class JoinITCase extends MultipleProgramsTestBase { Table result = in1.join(in2).where("foo === e").select("c, g"); -// DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); -// List<Row> results = ds.collect(); -// String expected = ""; -// compareResultAsText(results, expected); + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = ""; + compareResultAsText(results, expected); } // Calcite does not eagerly check the compatibility of compared types @@ -133,10 +137,10 @@ public class JoinITCase extends MultipleProgramsTestBase { Table result = in1 .join(in2).where("a === g").select("c, g"); -// DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); -// List<Row> results = ds.collect(); -// String expected = ""; -// compareResultAsText(results, expected); + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = ""; + compareResultAsText(results, expected); } @Test(expected = IllegalArgumentException.class) @@ -153,13 +157,13 @@ public class JoinITCase extends MultipleProgramsTestBase { Table result = in1 .join(in2).where("a === d").select("c, g"); -// DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); -// List<Row> results = ds.collect(); -// String expected = ""; -// compareResultAsText(results, expected); + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = ""; + compareResultAsText(results, expected); } - @Test + @Test(expected = NotImplementedError.class) public void testJoinWithAggregation() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); @@ -173,10 +177,10 @@ public class JoinITCase extends MultipleProgramsTestBase { Table result = in1 .join(in2).where("a === d").select("g.count"); -// DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); -// List<Row> results = ds.collect(); -// String expected = "6"; -// compareResultAsText(results, expected); + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = "6"; + compareResultAsText(results, expected); } } http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java index f19b8c1..5ef0235 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java @@ -30,6 +30,7 @@ import org.apache.flink.test.util.MultipleProgramsTestBase; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import scala.NotImplementedError; @RunWith(Parameterized.class) public class PojoGroupingITCase extends MultipleProgramsTestBase { @@ -38,7 +39,7 @@ public class PojoGroupingITCase extends MultipleProgramsTestBase { super(mode); } - @Test + @Test(expected = NotImplementedError.class) public void testPojoGrouping() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); @@ -54,14 +55,14 @@ public class PojoGroupingITCase extends MultipleProgramsTestBase { .select("groupMe, value, name") .where("groupMe != 'B'"); -// DataSet<MyPojo> myPojos = tableEnv.toDataSet(table, MyPojo.class); -// -// DataSet<MyPojo> result = myPojos.groupBy("groupMe") -// .sortGroup("value", Order.DESCENDING) -// .first(1); -// -// List<MyPojo> resultList = result.collect(); -// compareResultAsText(resultList, "A,24.0,Y"); + DataSet<MyPojo> myPojos = tableEnv.toDataSet(table, MyPojo.class); + + DataSet<MyPojo> result = myPojos.groupBy("groupMe") + .sortGroup("value", Order.DESCENDING) + .first(1); + + List<MyPojo> resultList = result.collect(); + compareResultAsText(resultList, "A,24.0,Y"); } public static class MyPojo implements Serializable {