[FLINK-3225] Implemented optimization of Table API queries via Calcite - added logical Flink nodes and translation rules - added stubs for DataSet translation rules - ported DataSetNodes to Scala - reactivated tests and added expected NotImplementedError
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7ecb7010 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7ecb7010 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7ecb7010 Branch: refs/heads/tableOnCalcite Commit: 7ecb70105254052514494d2aae482f335f7c6835 Parents: 364928e Author: Fabian Hueske <fhue...@apache.org> Authored: Tue Jan 26 13:22:38 2016 +0100 Committer: Fabian Hueske <fhue...@apache.org> Committed: Fri Jan 29 22:29:12 2016 +0100 ---------------------------------------------------------------------- .../apache/flink/api/table/package-info.java | 33 ----- .../api/table/sql/calcite/DataSetRelNode.java | 29 ----- .../table/sql/calcite/node/DataSetExchange.java | 60 ---------- .../table/sql/calcite/node/DataSetFlatMap.java | 56 --------- .../api/table/sql/calcite/node/DataSetJoin.java | 80 ------------- .../api/table/sql/calcite/node/DataSetMap.java | 58 --------- .../table/sql/calcite/node/DataSetReduce.java | 58 --------- .../sql/calcite/node/DataSetReduceGroup.java | 62 ---------- .../api/table/sql/calcite/node/DataSetSort.java | 59 --------- .../table/sql/calcite/node/DataSetSource.java | 55 --------- .../table/sql/calcite/node/DataSetUnion.java | 51 -------- .../api/java/table/JavaBatchTranslator.scala | 68 +++++++---- .../api/table/plan/TranslationContext.scala | 79 ++++++++++++ .../plan/nodes/dataset/DataSetConvention.scala | 42 +++++++ .../plan/nodes/dataset/DataSetExchange.scala | 63 ++++++++++ .../plan/nodes/dataset/DataSetFlatMap.scala | 62 ++++++++++ .../plan/nodes/dataset/DataSetGroupReduce.scala | 63 ++++++++++ .../table/plan/nodes/dataset/DataSetJoin.scala | 73 +++++++++++ .../table/plan/nodes/dataset/DataSetMap.scala | 63 ++++++++++ .../plan/nodes/dataset/DataSetReduce.scala | 63 ++++++++++ .../table/plan/nodes/dataset/DataSetRel.scala | 33 +++++ .../table/plan/nodes/dataset/DataSetSort.scala | 62 ++++++++++ .../plan/nodes/dataset/DataSetSource.scala | 55 +++++++++ .../table/plan/nodes/dataset/DataSetUnion.scala | 62 ++++++++++ .../plan/nodes/logical/FlinkAggregate.scala | 76 ++++++++++++ .../table/plan/nodes/logical/FlinkCalc.scala | 37 ++++++ .../plan/nodes/logical/FlinkConvention.scala | 42 +++++++ .../table/plan/nodes/logical/FlinkFilter.scala | 42 +++++++ .../table/plan/nodes/logical/FlinkJoin.scala | 46 +++++++ .../table/plan/nodes/logical/FlinkProject.scala | 45 +++++++ .../api/table/plan/nodes/logical/FlinkRel.scala | 25 ++++ .../table/plan/nodes/logical/FlinkScan.scala | 31 +++++ .../table/plan/nodes/logical/FlinkUnion.scala | 38 ++++++ .../api/table/plan/operators/DataSetTable.scala | 66 ---------- .../api/table/plan/rules/FlinkRuleSets.scala | 120 +++++++++++++++++++ .../rules/dataset/DataSetAggregateRule.scala | 53 ++++++++ .../plan/rules/dataset/DataSetCalcRule.scala | 52 ++++++++ .../plan/rules/dataset/DataSetFilterRule.scala | 52 ++++++++ .../plan/rules/dataset/DataSetJoinRule.scala | 59 +++++++++ .../plan/rules/dataset/DataSetProjectRule.scala | 52 ++++++++ .../plan/rules/dataset/DataSetScanRule.scala | 53 ++++++++ .../plan/rules/dataset/DataSetUnionRule.scala | 53 ++++++++ .../plan/rules/logical/FlinkAggregateRule.scala | 53 ++++++++ .../plan/rules/logical/FlinkCalcRule.scala | 50 ++++++++ .../plan/rules/logical/FlinkFilterRule.scala | 50 ++++++++ .../plan/rules/logical/FlinkJoinRule.scala | 54 +++++++++ .../plan/rules/logical/FlinkProjectRule.scala | 51 ++++++++ .../plan/rules/logical/FlinkScanRule.scala | 53 ++++++++ .../plan/rules/logical/FlinkUnionRule.scala | 54 +++++++++ .../api/table/plan/schema/DataSetTable.scala | 89 ++++++++++++++ .../org/apache/flink/api/table/table.scala | 1 - .../api/java/table/test/AggregationsITCase.java | 72 +++++------ .../flink/api/java/table/test/AsITCase.java | 62 +++++----- .../api/java/table/test/CastingITCase.java | 53 ++++---- .../api/java/table/test/ExpressionsITCase.java | 32 ++--- .../flink/api/java/table/test/FilterITCase.java | 69 +++++------ .../table/test/GroupedAggregationsITCase.java | 40 +++---- .../flink/api/java/table/test/JoinITCase.java | 70 ++++++----- .../api/java/table/test/PojoGroupingITCase.java | 19 +-- .../flink/api/java/table/test/SelectITCase.java | 68 +++++------ .../table/test/StringExpressionsITCase.java | 40 ++++--- .../flink/api/java/table/test/UnionITCase.java | 47 ++++---- .../scala/table/test/AggregationsITCase.scala | 59 +++++---- .../flink/api/scala/table/test/AsITCase.scala | 50 ++++---- .../api/scala/table/test/CastingITCase.scala | 44 +++---- .../scala/table/test/ExpressionsITCase.scala | 40 +++---- .../table/test/GroupedAggregationsITCase.scala | 33 +++-- .../flink/api/scala/table/test/JoinITCase.scala | 54 ++++----- .../api/scala/table/test/SelectITCase.scala | 72 +++++------ .../table/test/StringExpressionsITCase.scala | 28 ++--- .../api/scala/table/test/UnionITCase.scala | 36 +++--- 71 files changed, 2596 insertions(+), 1178 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/package-info.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/package-info.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/package-info.java deleted file mode 100644 index d7fbc8e..0000000 --- a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/package-info.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * <strong>Table API</strong><br> - * - * This package contains the generic part of the Table API. It can be used with Flink Streaming - * and Flink Batch. From Scala as well as from Java. - * - * When using the Table API, as user creates a [[org.apache.flink.api.table.Table]] from - * a DataSet or DataStream. On this relational operations can be performed. A table can also - * be converted back to a DataSet or DataStream. - * - * Packages [[org.apache.flink.api.scala.table]] and [[org.apache.flink.api.java.table]] contain - * the language specific part of the API. Refer to these packages for documentation on how - * the Table API can be used in Java and Scala. - */ -package org.apache.flink.api.table; http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/DataSetRelNode.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/DataSetRelNode.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/DataSetRelNode.java deleted file mode 100644 index df0ebc0..0000000 --- a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/DataSetRelNode.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.sql.calcite; - -import org.apache.calcite.rel.RelNode; -import org.apache.flink.api.java.DataSet; - -public interface DataSetRelNode<T> extends RelNode { - - /** - * Translate the FlinkRelNode into Flink operator. - */ - DataSet<T> translateToPlan(); -} http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetExchange.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetExchange.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetExchange.java deleted file mode 100644 index 1ddd884..0000000 --- a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetExchange.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.sql.calcite.node; - -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.SingleRel; -import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.table.sql.calcite.DataSetRelNode; - -/** - * Flink RelNode which matches along with PartitionOperator. - * - * @param <T> - */ -public class DataSetExchange<T> extends SingleRel implements DataSetRelNode<T> { - - public DataSetExchange(RelOptCluster cluster, RelTraitSet traits, RelNode input) { - super(cluster, traits, input); - } - - private TypeInformation<T> getType() { - return null; - } - - private String getName() { - return null; - } - - private PartitionMethod getPartitionMethod() { - return null; - } - - private int[] getPartitionKey() { - return null; - } - - @Override - public DataSet<T> translateToPlan() { - return null; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetFlatMap.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetFlatMap.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetFlatMap.java deleted file mode 100644 index cf597b2..0000000 --- a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetFlatMap.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.sql.calcite.node; - -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.SingleRel; -import org.apache.flink.api.common.functions.RichFlatMapFunction; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.table.sql.calcite.DataSetRelNode; - -/** - * Flink RelNode which matches along with FlatMapOperator. - * - * @param <T> - */ -public class DataSetFlatMap<T> extends SingleRel implements DataSetRelNode<T> { - - protected DataSetFlatMap(RelOptCluster cluster, RelTraitSet traits, RelNode input) { - super(cluster, traits, input); - } - - private TypeInformation<T> getType() { - return null; - } - - private String getName() { - return null; - } - - private RichFlatMapFunction<T, T> getFlatMapFunction() { - return null; - } - - @Override - public DataSet<T> translateToPlan() { - return null; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetJoin.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetJoin.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetJoin.java deleted file mode 100644 index 3d1146e..0000000 --- a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetJoin.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.sql.calcite.node; - -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.rel.BiRel; -import org.apache.calcite.rel.RelNode; -import org.apache.flink.api.common.functions.JoinFunction; -import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.operators.join.JoinType; -import org.apache.flink.api.table.sql.calcite.DataSetRelNode; - -/** - * Flink RelNode which matches along with JoinOperator and its related operations. - */ -public class DataSetJoin<L, R, OUT> extends BiRel implements DataSetRelNode<OUT> { - - public DataSetJoin(RelOptCluster cluster, RelTraitSet traitSet, RelNode left, RelNode right) { - super(cluster, traitSet, left, right); - } - - private TypeInformation<L> getLeftInputType() { - return null; - } - - private TypeInformation<R> getRightInputType() { - return null; - } - - private TypeInformation<OUT> getType() { - return null; - } - - private String getName() { - return null; - } - - private JoinType getJoinType() { - return null; - } - - private JoinHint getJoinHint() { - return null; - } - - private int[] getLeftJoinKey() { - return null; - } - - private int[] getRightJoinKey() { - return null; - } - - private JoinFunction<L, R, OUT> getJoinFunction() { - return null; - } - - @Override - public DataSet<OUT> translateToPlan() { - return null; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetMap.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetMap.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetMap.java deleted file mode 100644 index 3098cb3..0000000 --- a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetMap.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.sql.calcite.node; - -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.SingleRel; -import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.table.sql.calcite.DataSetRelNode; - -/** - * Flink RelNode which matches along with MapOperator. - */ -public class DataSetMap<IN, OUT> extends SingleRel implements DataSetRelNode<OUT> { - - protected DataSetMap(RelOptCluster cluster, RelTraitSet traits, RelNode input) { - super(cluster, traits, input); - } - - private TypeInformation<IN> getInputType() { - return null; - } - - private TypeInformation<OUT> getType() { - return null; - } - - private String getName() { - return null; - } - - private RichMapFunction<IN, OUT> getMapFunction() { - return null; - } - - @Override - public DataSet<OUT> translateToPlan() { - return null; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetReduce.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetReduce.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetReduce.java deleted file mode 100644 index 4aa2846..0000000 --- a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetReduce.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.sql.calcite.node; - -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.SingleRel; -import org.apache.flink.api.common.functions.RichReduceFunction; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.table.sql.calcite.DataSetRelNode; - -/** - * Flink RelNode which matches along with ReduceOperator. - */ -public class DataSetReduce<T> extends SingleRel implements DataSetRelNode<T> { - - public DataSetReduce(RelOptCluster cluster, RelTraitSet traits, RelNode input) { - super(cluster, traits, input); - } - - private TypeInformation<T> getType() { - return null; - } - - private String getName() { - return null; - } - - private RichReduceFunction<T> getReduceFunction() { - return null; - } - - private int[] getGroupingKeys() { - return null; - } - - @Override - public DataSet<T> translateToPlan() { - return null; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetReduceGroup.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetReduceGroup.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetReduceGroup.java deleted file mode 100644 index 5467c56..0000000 --- a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetReduceGroup.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.sql.calcite.node; - -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.SingleRel; -import org.apache.flink.api.common.functions.RichGroupReduceFunction; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.table.sql.calcite.DataSetRelNode; - -/** - * Flink RelNode which matches along with ReduceGroupOperator. - */ -public class DataSetReduceGroup<IN, OUT> extends SingleRel implements DataSetRelNode<OUT> { - - public DataSetReduceGroup(RelOptCluster cluster, RelTraitSet traits, RelNode input) { - super(cluster, traits, input); - } - - private TypeInformation<IN> getInputType() { - return null; - } - - private TypeInformation<OUT> getType() { - return null; - } - - private String getName() { - return null; - } - - private RichGroupReduceFunction<IN, OUT> getGroupReduceFunction() { - return null; - } - - private int[] getGroupingKeys() { - return null; - } - - @Override - public DataSet<OUT> translateToPlan() { - return null; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetSort.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetSort.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetSort.java deleted file mode 100644 index 7fa1c53..0000000 --- a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetSort.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.sql.calcite.node; - -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.SingleRel; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.table.sql.calcite.DataSetRelNode; - -/** - * Flink RelNode which matches along with SortPartitionOperator. - * - * @param <T> - */ -public class DataSetSort<T> extends SingleRel implements DataSetRelNode<T> { - - public DataSetSort(RelOptCluster cluster, RelTraitSet traits, RelNode input) { - super(cluster, traits, input); - } - - private TypeInformation<T> getType() { - return null; - } - - private String getName() { - return null; - } - - private int[] getSortKey() { - return null; - } - - private boolean[] getSortKeyOrder() { - return null; - } - - @Override - public DataSet<T> translateToPlan() { - return null; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetSource.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetSource.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetSource.java deleted file mode 100644 index f24ee79..0000000 --- a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetSource.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.sql.calcite.node; - -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelOptTable; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.rel.core.TableScan; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.table.sql.calcite.DataSetRelNode; - -/** - * Flink RelNode which matches along with DataSource. - * - * @param <T> - */ -public class DataSetSource<T> extends TableScan implements DataSetRelNode<T> { - - public DataSetSource(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table) { - super(cluster, traitSet, table); - } - - private TypeInformation<T> getType() { - return null; - } - - private String getName() { - return null; - } - - private DataSet<T> getDatSource() { - return null; - } - - @Override - public DataSet<T> translateToPlan() { - return null; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetUnion.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetUnion.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetUnion.java deleted file mode 100644 index 8b435e9..0000000 --- a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetUnion.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.sql.calcite.node; - -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.rel.BiRel; -import org.apache.calcite.rel.RelNode; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.table.sql.calcite.DataSetRelNode; - -/** - * Flink RelNode which matches along with UnionOperator. - * - * @param <T> - */ -public class DataSetUnion<T> extends BiRel implements DataSetRelNode<T> { - - public DataSetUnion(RelOptCluster cluster, RelTraitSet traitSet, RelNode left, RelNode right) { - super(cluster, traitSet, left, right); - } - - private TypeInformation<T> getType() { - return null; - } - - private String getName() { - return null; - } - - @Override - public DataSet<T> translateToPlan() { - return null; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala index f3f4e9d..66bfbe7 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala @@ -18,14 +18,17 @@ package org.apache.flink.api.java.table -import org.apache.calcite.plan.RelOptUtil +import org.apache.calcite.plan.{RelTraitSet, RelOptUtil} import org.apache.calcite.rel.RelNode -import org.apache.calcite.tools.{RelBuilder, Frameworks} +import org.apache.calcite.sql2rel.RelDecorrelator +import org.apache.calcite.tools.Programs import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.{DataSet => JavaDataSet} import org.apache.flink.api.table.plan._ -import org.apache.flink.api.table.plan.operators.DataSetTable import org.apache.flink.api.table.Table +import org.apache.flink.api.table.plan.nodes.dataset.DataSetRel +import org.apache.flink.api.table.plan.rules.FlinkRuleSets +import org.apache.flink.api.table.plan.schema.DataSetTable /** * [[PlanTranslator]] for creating [[Table]]s from Java [[org.apache.flink.api.java.DataSet]]s and @@ -41,37 +44,58 @@ class JavaBatchTranslator extends PlanTranslator { // create table representation from DataSet val dataSetTable = new DataSetTable[A]( - repr.asInstanceOf[JavaDataSet[A]], - fieldNames + repr.asInstanceOf[JavaDataSet[A]], + fieldNames ) - // register table in Cascading schema - val schema = Frameworks.createRootSchema(true) - val tableName = repr.hashCode().toString - schema.add(tableName, dataSetTable) - - // initialize RelBuilder - val frameworkConfig = Frameworks - .newConfigBuilder - .defaultSchema(schema) - .build - val relBuilder = RelBuilder.create(frameworkConfig) + val tabName = TranslationContext.addDataSet(dataSetTable) + val relBuilder = TranslationContext.getRelBuilder // create table scan operator - relBuilder.scan(tableName) + relBuilder.scan(tabName) new Table(relBuilder.build(), relBuilder) } override def translate[A](lPlan: RelNode)(implicit tpe: TypeInformation[A]): JavaDataSet[A] = { + // get the planner for the plan + val planner = lPlan.getCluster.getPlanner + + // we do not have any special requirements for the output + val outputProps = RelTraitSet.createEmpty() + + println("-----------") + println("Input Plan:") + println("-----------") println(RelOptUtil.toString(lPlan)) - // TODO: optimize & translate: - // - optimize RelNode plan - // - translate to Flink RelNode plan - // - generate DataSet program + // decorrelate + val decorPlan = RelDecorrelator.decorrelateQuery(lPlan) + + // optimize the logical Flink plan + val optProgram = Programs.ofRules(FlinkRuleSets.DATASET_OPT_RULES) + val optPlan = optProgram.run(planner, decorPlan, outputProps) + + println("---------------") + println("Optimized Plan:") + println("---------------") + println(RelOptUtil.toString(optPlan)) + + // optimize the logical Flink plan + val dataSetProgram = Programs.ofRules(FlinkRuleSets.DATASET_TRANS_RULES) + val dataSetPlan = dataSetProgram.run(planner, optPlan, outputProps) + + println("-------------") + println("DataSet Plan:") + println("-------------") + println(RelOptUtil.toString(dataSetPlan)) + + dataSetPlan match { + case node: DataSetRel => + node.translateToPlan.asInstanceOf[JavaDataSet[A]] + case _ => ??? + } - null } } http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala new file mode 100644 index 0000000..b2b0c2b --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan + +import java.util.concurrent.atomic.AtomicInteger + +import org.apache.calcite.plan.ConventionTraitDef +import org.apache.calcite.schema.impl.AbstractTable +import org.apache.calcite.schema.SchemaPlus +import org.apache.calcite.tools.{Frameworks, RelBuilder} +import org.apache.flink.api.table.plan.schema.DataSetTable + +object TranslationContext { + + private var relBuilder: RelBuilder = null + private var tables: SchemaPlus = null + private var tabNames: Map[AbstractTable, String] = null + private val nameCntr: AtomicInteger = new AtomicInteger(0) + + reset() + + def reset(): Unit = { + + // register table in Cascading schema + tables = Frameworks.createRootSchema(true) + + // initialize RelBuilder + val frameworkConfig = Frameworks + .newConfigBuilder + .defaultSchema(tables) + .traitDefs(ConventionTraitDef.INSTANCE) + .build + + tabNames = Map[AbstractTable, String]() + + relBuilder = RelBuilder.create(frameworkConfig) + + } + + def addDataSet(newTable: DataSetTable[_]): String = { + + // look up name + val tabName = tabNames.get(newTable) + + tabName match { + case Some(name) => + name + case None => + val tabName = "DataSetTable_" + nameCntr.getAndIncrement() + tabNames += (newTable -> tabName) + tables.add(tabName, newTable) + tabName + } + + } + + def getRelBuilder: RelBuilder = { + relBuilder + } + +} + + http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetConvention.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetConvention.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetConvention.scala new file mode 100644 index 0000000..cbacd16 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetConvention.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.nodes.dataset + +import org.apache.calcite.plan._ + +class DataSetConvention extends Convention { + + override def toString: String = getName + + def getInterface: Class[_] = classOf[DataSetRel] + + def getName: String = "DATASET" + + def getTraitDef: RelTraitDef[_ <: RelTrait] = ConventionTraitDef.INSTANCE + + def satisfies(`trait`: RelTrait): Boolean = this eq `trait` + + def register(planner: RelOptPlanner): Unit = { } + +} + +object DataSetConvention { + + val INSTANCE = new DataSetConvention +} http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetExchange.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetExchange.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetExchange.scala new file mode 100644 index 0000000..ec5805a --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetExchange.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.nodes.dataset + +import org.apache.calcite.plan.{RelTraitSet, RelOptCluster} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.{RelWriter, RelNode, SingleRel} +import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.Row + +/** + * Flink RelNode which matches along with PartitionOperator. + */ +class DataSetExchange( + cluster: RelOptCluster, + traitSet: RelTraitSet, + input: RelNode, + rowType: RelDataType, + opName: String, + partitionKey: Array[Int], + partitionMethod: PartitionMethod) + extends SingleRel(cluster, traitSet, input) + with DataSetRel { + + override def deriveRowType() = rowType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { + new DataSetExchange( + cluster, + traitSet, + inputs.get(0), + rowType, + opName, + partitionKey, + partitionMethod + ) + } + + override def explainTerms(pw: RelWriter): RelWriter = { + super.explainTerms(pw).item("name", opName) + } + + override def translateToPlan: DataSet[Any] = { + ??? + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetFlatMap.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetFlatMap.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetFlatMap.scala new file mode 100644 index 0000000..913cca0 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetFlatMap.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.nodes.dataset + +import org.apache.calcite.plan.{RelOptCost, RelOptPlanner, RelTraitSet, RelOptCluster} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.{RelWriter, RelNode, SingleRel} +import org.apache.flink.api.common.functions.FlatMapFunction +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.Row + +/** + * Flink RelNode which matches along with FlatMapOperator. + * + */ +class DataSetFlatMap( + cluster: RelOptCluster, + traitSet: RelTraitSet, + input: RelNode, + rowType: RelDataType, + opName: String, + func: FlatMapFunction[Row, Row]) + extends SingleRel(cluster, traitSet, input) + with DataSetRel { + + override def deriveRowType() = rowType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { + new DataSetFlatMap( + cluster, + traitSet, + inputs.get(0), + rowType, + opName, + func + ) + } + + override def explainTerms(pw: RelWriter): RelWriter = { + super.explainTerms(pw).item("name", opName) + } + + override def translateToPlan: DataSet[Any] = { + ??? + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala new file mode 100644 index 0000000..11bb160 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.nodes.dataset + +import org.apache.calcite.plan.{RelTraitSet, RelOptCluster} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.{RelWriter, RelNode, SingleRel} +import org.apache.flink.api.common.functions.GroupReduceFunction +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.Row + +/** + * Flink RelNode which matches along with ReduceGroupOperator. + */ +class DataSetGroupReduce( + cluster: RelOptCluster, + traitSet: RelTraitSet, + input: RelNode, + rowType: RelDataType, + opName: String, + groupingKeys: Array[Int], + func: GroupReduceFunction[Row, Row]) + extends SingleRel(cluster, traitSet, input) + with DataSetRel { + + override def deriveRowType() = rowType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { + new DataSetGroupReduce( + cluster, + traitSet, + inputs.get(0), + rowType, + opName, + groupingKeys, + func + ) + } + + override def explainTerms(pw: RelWriter): RelWriter = { + super.explainTerms(pw).item("name", opName) + } + + override def translateToPlan: DataSet[Any] = { + ??? + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala new file mode 100644 index 0000000..c20cdc5 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.nodes.dataset + +import org.apache.calcite.plan.{RelTraitSet, RelOptCluster} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.{RelWriter, BiRel, RelNode} +import org.apache.flink.api.common.functions.JoinFunction +import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.java.operators.join.JoinType +import org.apache.flink.api.table.Row + +/** + * Flink RelNode which matches along with JoinOperator and its related operations. + */ +class DataSetJoin( + cluster: RelOptCluster, + traitSet: RelTraitSet, + left: RelNode, + right: RelNode, + rowType: RelDataType, + opName: String, + joinKeysLeft: Array[Int], + joinKeysRight: Array[Int], + joinType: JoinType, + joinHint: JoinHint, + func: JoinFunction[Row, Row, Row]) + extends BiRel(cluster, traitSet, left, right) + with DataSetRel { + + override def deriveRowType() = rowType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { + new DataSetJoin( + cluster, + traitSet, + inputs.get(0), + inputs.get(1), + rowType, + opName, + joinKeysLeft, + joinKeysRight, + joinType, + joinHint, + func + ) + } + + override def explainTerms(pw: RelWriter): RelWriter = { + super.explainTerms(pw).item("name", opName) + } + + override def translateToPlan: DataSet[Any] = { + ??? + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMap.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMap.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMap.scala new file mode 100644 index 0000000..be8bd9d --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMap.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.nodes.dataset + +import org.apache.calcite.plan.{RelTraitSet, RelOptCluster} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.{RelWriter, RelNode, SingleRel} +import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.Row + +/** + * Flink RelNode which matches along with MapOperator. + */ +class DataSetMap( + cluster: RelOptCluster, + traitSet: RelTraitSet, + input: RelNode, + rowType: RelDataType, + opName: String, + func: MapFunction[Row, Row]) + extends SingleRel(cluster, traitSet, input) + with DataSetRel { + + override def deriveRowType() = rowType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { + new DataSetMap( + cluster, + traitSet, + inputs.get(0), + rowType, + opName, + func + ) + } + + override def explainTerms(pw: RelWriter): RelWriter = { + super.explainTerms(pw).item("name", opName) + } + + override def toString() = opName + + override def translateToPlan: DataSet[Any] = { + ??? + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetReduce.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetReduce.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetReduce.scala new file mode 100644 index 0000000..567a91c --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetReduce.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.nodes.dataset + +import org.apache.calcite.plan.{RelTraitSet, RelOptCluster} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.{RelWriter, RelNode, SingleRel} +import org.apache.flink.api.common.functions.ReduceFunction +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.Row + +/** + * Flink RelNode which matches along with ReduceOperator. + */ +class DataSetReduce( + cluster: RelOptCluster, + traits: RelTraitSet, + input: RelNode, + rowType: RelDataType, + opName: String, + groupingKeys: Array[Int], + func: ReduceFunction[Row]) + extends SingleRel(cluster, traits, input) + with DataSetRel { + + override def deriveRowType() = rowType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { + new DataSetReduce( + cluster, + traitSet, + inputs.get(0), + rowType, + opName, + groupingKeys, + func + ) + } + + override def explainTerms(pw: RelWriter): RelWriter = { + super.explainTerms(pw).item("name", opName) + } + + override def translateToPlan: DataSet[Any] = { + ??? + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala new file mode 100644 index 0000000..20677b3 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.nodes.dataset + +import org.apache.calcite.rel.RelNode +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.Row + +trait DataSetRel extends RelNode { + + /** + * Translate the FlinkRelNode into Flink operator. + */ + def translateToPlan: DataSet[Any] + +} + http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala new file mode 100644 index 0000000..df5301d --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.nodes.dataset + +import org.apache.calcite.plan.{RelTraitSet, RelOptCluster} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.{RelWriter, RelNode, SingleRel} +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.Row + +/** + * Flink RelNode which matches along with SortPartitionOperator. + */ +class DataSetSort( + cluster: RelOptCluster, + traitSet: RelTraitSet, + input: RelNode, + rowType: RelDataType, + opName: String, + sortKey: Array[Int], + sortOrder: Array[Boolean]) + extends SingleRel(cluster, traitSet, input) + with DataSetRel { + + override def deriveRowType() = rowType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { + new DataSetSort( + cluster, + traitSet, + inputs.get(0), + rowType, + opName, + sortKey, + sortOrder + ) + } + + override def explainTerms(pw: RelWriter): RelWriter = { + super.explainTerms(pw).item("name", opName) + } + + override def translateToPlan: DataSet[Any] = { + ??? + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala new file mode 100644 index 0000000..effaf1a --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.nodes.dataset + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.{RelWriter, RelNode} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.TableScan +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.Row + +/** + * Flink RelNode which matches along with DataSource. + */ +class DataSetSource( + cluster: RelOptCluster, + traitSet: RelTraitSet, + table: RelOptTable, + rowType: RelDataType, + inputDataSet: DataSet[_]) + extends TableScan(cluster, traitSet, table) + with DataSetRel { + + override def deriveRowType() = rowType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { + new DataSetSource( + cluster, + traitSet, + table, + rowType, + inputDataSet + ) + } + + override def translateToPlan: DataSet[Any] = { + ??? + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala new file mode 100644 index 0000000..a510fc9 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.nodes.dataset + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.{RelWriter, BiRel, RelNode} +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.Row + +/** +* Flink RelNode which matches along with UnionOperator. +* +*/ +class DataSetUnion( + cluster: RelOptCluster, + traitSet: RelTraitSet, + left: RelNode, + right: RelNode, + rowType: RelDataType, + opName: String) + extends BiRel(cluster, traitSet, left, right) + with DataSetRel { + + override def deriveRowType() = rowType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { + new DataSetUnion( + cluster, + traitSet, + inputs.get(0), + inputs.get(1), + rowType, + opName + ) + } + + override def explainTerms(pw: RelWriter): RelWriter = { + super.explainTerms(pw).item("name", opName) + } + + override def translateToPlan: DataSet[Any] = { + ??? + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkAggregate.scala new file mode 100644 index 0000000..f66cb71 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkAggregate.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.nodes.logical + +import java.util + +import org.apache.calcite.plan.{RelOptCost, RelOptPlanner, RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core.{AggregateCall, Aggregate} +import org.apache.calcite.sql.fun.SqlAvgAggFunction +import org.apache.calcite.util.ImmutableBitSet + +import scala.collection.JavaConversions._ + +class FlinkAggregate( + cluster: RelOptCluster, + traitSet: RelTraitSet, + input: RelNode, + indicator: Boolean, + groupSet: ImmutableBitSet, + groupSets: java.util.List[ImmutableBitSet], + aggCalls: java.util.List[AggregateCall]) + extends Aggregate(cluster, traitSet, input, indicator, groupSet, groupSets, aggCalls) + with FlinkRel { + + override def copy( + traitSet: RelTraitSet, + input: RelNode, + indicator: Boolean, + groupSet: ImmutableBitSet, + groupSets: util.List[ImmutableBitSet], + aggCalls: util.List[AggregateCall]): Aggregate = { + + new FlinkAggregate( + cluster, + traitSet, + input, + indicator, + groupSet, + groupSets, + aggCalls + ) + } + + override def computeSelfCost (planner: RelOptPlanner): RelOptCost = { + + val origCosts = super.computeSelfCost(planner) + val deltaCost = planner.getCostFactory.makeHugeCost() + + // only prefer aggregations with transformed Avg + aggCalls.toList.foldLeft[RelOptCost](origCosts){ + (c: RelOptCost, a: AggregateCall) => + if (a.getAggregation.isInstanceOf[SqlAvgAggFunction]) { + c.plus(deltaCost) + } else { + c + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkCalc.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkCalc.scala new file mode 100644 index 0000000..bcfe8d7 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkCalc.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.nodes.logical + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core.Calc +import org.apache.calcite.rex.RexProgram + +class FlinkCalc( + cluster: RelOptCluster, + traitSet: RelTraitSet, + input: RelNode, + program: RexProgram) + extends Calc(cluster, traitSet, input, program) + with FlinkRel { + + override def copy(traitSet: RelTraitSet, child: RelNode, program: RexProgram): Calc = { + new FlinkCalc(cluster, traitSet, child, program) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkConvention.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkConvention.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkConvention.scala new file mode 100644 index 0000000..80137f2 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkConvention.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.nodes.logical + +import org.apache.calcite.plan._ + +class FlinkConvention extends Convention { + + override def toString: String = getName + + def getInterface: Class[_] = classOf[FlinkRel] + + def getName: String = "FLINK" + + def getTraitDef: RelTraitDef[_ <: RelTrait] = ConventionTraitDef.INSTANCE + + def satisfies(`trait`: RelTrait): Boolean = this eq `trait` + + def register(planner: RelOptPlanner): Unit = { } + +} + +object FlinkConvention { + + val INSTANCE = new FlinkConvention +} http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkFilter.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkFilter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkFilter.scala new file mode 100644 index 0000000..9f0bf30 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkFilter.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.nodes.logical + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core.Filter +import org.apache.calcite.rex.RexNode + +class FlinkFilter( + cluster: RelOptCluster, + traitSet: RelTraitSet, + input: RelNode, + condition: RexNode) + extends Filter(cluster, traitSet, input, condition) + with FlinkRel { + + override def copy(traitSet: RelTraitSet, input: RelNode, condition: RexNode): Filter = { + new FlinkFilter( + cluster, + traitSet, + input, + condition + ) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkJoin.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkJoin.scala new file mode 100644 index 0000000..8b04b50 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkJoin.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.nodes.logical + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core.{JoinRelType, Join} +import org.apache.calcite.rex.RexNode + +class FlinkJoin( + cluster: RelOptCluster, + traitSet: RelTraitSet, + left: RelNode, + right: RelNode, + condition: RexNode, + joinType: JoinRelType, + variablesStopped: java.util.Set[String]) + extends Join(cluster, traitSet, left, right, condition, joinType, variablesStopped) + with FlinkRel { + + override def copy( + traitSet: RelTraitSet, + condition: RexNode, + left: RelNode, + right: RelNode, + joinType: JoinRelType, + semiJoinDone: Boolean): Join = { + new FlinkJoin(cluster, traitSet, left, right, condition, joinType, getVariablesStopped) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkProject.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkProject.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkProject.scala new file mode 100644 index 0000000..1d93036 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkProject.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.nodes.logical + +import java.util + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.Project +import org.apache.calcite.rex.RexNode + +class FlinkProject( + cluster: RelOptCluster, + traitSet: RelTraitSet, + input: RelNode, + projects: java.util.List[RexNode], + rowType: RelDataType) + extends Project(cluster, traitSet, input, projects, rowType) + with FlinkRel { + + override def copy( + traitSet: RelTraitSet, + input: RelNode, + projects: util.List[RexNode], + rowType: RelDataType): Project = { + new FlinkProject(cluster, traitSet, input, projects, rowType) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkRel.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkRel.scala new file mode 100644 index 0000000..9ebd7e4 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkRel.scala @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.nodes.logical + +import org.apache.calcite.rel.RelNode + +trait FlinkRel extends RelNode { + +} http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkScan.scala new file mode 100644 index 0000000..6d53a75 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkScan.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.nodes.logical + +import org.apache.calcite.plan.{RelOptTable, RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.core.TableScan + +class FlinkScan( + cluster: RelOptCluster, + traitSet: RelTraitSet, + table: RelOptTable) + extends TableScan(cluster, traitSet, table) + with FlinkRel { + +} http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkUnion.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkUnion.scala new file mode 100644 index 0000000..fd791d3 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkUnion.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.nodes.logical + +import java.util + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core.{SetOp, Union} + +class FlinkUnion( + cluster: RelOptCluster, + traitSet: RelTraitSet, + inputs: java.util.List[RelNode], + all: Boolean) + extends Union(cluster, traitSet, inputs, all) + with FlinkRel { + + override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode], all: Boolean): SetOp = { + new FlinkUnion(cluster, traitSet, inputs, all) + } +}