[FLINK-3225] Implemented optimization of Table API queries via Calcite

- added logical Flink nodes and translation rules
- added stubs for DataSet translation rules
- ported DataSetNodes to Scala
- reactivated tests and added expected NotImplementedError


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7ecb7010
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7ecb7010
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7ecb7010

Branch: refs/heads/tableOnCalcite
Commit: 7ecb70105254052514494d2aae482f335f7c6835
Parents: 364928e
Author: Fabian Hueske <fhue...@apache.org>
Authored: Tue Jan 26 13:22:38 2016 +0100
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Fri Jan 29 22:29:12 2016 +0100

----------------------------------------------------------------------
 .../apache/flink/api/table/package-info.java    |  33 -----
 .../api/table/sql/calcite/DataSetRelNode.java   |  29 -----
 .../table/sql/calcite/node/DataSetExchange.java |  60 ----------
 .../table/sql/calcite/node/DataSetFlatMap.java  |  56 ---------
 .../api/table/sql/calcite/node/DataSetJoin.java |  80 -------------
 .../api/table/sql/calcite/node/DataSetMap.java  |  58 ---------
 .../table/sql/calcite/node/DataSetReduce.java   |  58 ---------
 .../sql/calcite/node/DataSetReduceGroup.java    |  62 ----------
 .../api/table/sql/calcite/node/DataSetSort.java |  59 ---------
 .../table/sql/calcite/node/DataSetSource.java   |  55 ---------
 .../table/sql/calcite/node/DataSetUnion.java    |  51 --------
 .../api/java/table/JavaBatchTranslator.scala    |  68 +++++++----
 .../api/table/plan/TranslationContext.scala     |  79 ++++++++++++
 .../plan/nodes/dataset/DataSetConvention.scala  |  42 +++++++
 .../plan/nodes/dataset/DataSetExchange.scala    |  63 ++++++++++
 .../plan/nodes/dataset/DataSetFlatMap.scala     |  62 ++++++++++
 .../plan/nodes/dataset/DataSetGroupReduce.scala |  63 ++++++++++
 .../table/plan/nodes/dataset/DataSetJoin.scala  |  73 +++++++++++
 .../table/plan/nodes/dataset/DataSetMap.scala   |  63 ++++++++++
 .../plan/nodes/dataset/DataSetReduce.scala      |  63 ++++++++++
 .../table/plan/nodes/dataset/DataSetRel.scala   |  33 +++++
 .../table/plan/nodes/dataset/DataSetSort.scala  |  62 ++++++++++
 .../plan/nodes/dataset/DataSetSource.scala      |  55 +++++++++
 .../table/plan/nodes/dataset/DataSetUnion.scala |  62 ++++++++++
 .../plan/nodes/logical/FlinkAggregate.scala     |  76 ++++++++++++
 .../table/plan/nodes/logical/FlinkCalc.scala    |  37 ++++++
 .../plan/nodes/logical/FlinkConvention.scala    |  42 +++++++
 .../table/plan/nodes/logical/FlinkFilter.scala  |  42 +++++++
 .../table/plan/nodes/logical/FlinkJoin.scala    |  46 +++++++
 .../table/plan/nodes/logical/FlinkProject.scala |  45 +++++++
 .../api/table/plan/nodes/logical/FlinkRel.scala |  25 ++++
 .../table/plan/nodes/logical/FlinkScan.scala    |  31 +++++
 .../table/plan/nodes/logical/FlinkUnion.scala   |  38 ++++++
 .../api/table/plan/operators/DataSetTable.scala |  66 ----------
 .../api/table/plan/rules/FlinkRuleSets.scala    | 120 +++++++++++++++++++
 .../rules/dataset/DataSetAggregateRule.scala    |  53 ++++++++
 .../plan/rules/dataset/DataSetCalcRule.scala    |  52 ++++++++
 .../plan/rules/dataset/DataSetFilterRule.scala  |  52 ++++++++
 .../plan/rules/dataset/DataSetJoinRule.scala    |  59 +++++++++
 .../plan/rules/dataset/DataSetProjectRule.scala |  52 ++++++++
 .../plan/rules/dataset/DataSetScanRule.scala    |  53 ++++++++
 .../plan/rules/dataset/DataSetUnionRule.scala   |  53 ++++++++
 .../plan/rules/logical/FlinkAggregateRule.scala |  53 ++++++++
 .../plan/rules/logical/FlinkCalcRule.scala      |  50 ++++++++
 .../plan/rules/logical/FlinkFilterRule.scala    |  50 ++++++++
 .../plan/rules/logical/FlinkJoinRule.scala      |  54 +++++++++
 .../plan/rules/logical/FlinkProjectRule.scala   |  51 ++++++++
 .../plan/rules/logical/FlinkScanRule.scala      |  53 ++++++++
 .../plan/rules/logical/FlinkUnionRule.scala     |  54 +++++++++
 .../api/table/plan/schema/DataSetTable.scala    |  89 ++++++++++++++
 .../org/apache/flink/api/table/table.scala      |   1 -
 .../api/java/table/test/AggregationsITCase.java |  72 +++++------
 .../flink/api/java/table/test/AsITCase.java     |  62 +++++-----
 .../api/java/table/test/CastingITCase.java      |  53 ++++----
 .../api/java/table/test/ExpressionsITCase.java  |  32 ++---
 .../flink/api/java/table/test/FilterITCase.java |  69 +++++------
 .../table/test/GroupedAggregationsITCase.java   |  40 +++----
 .../flink/api/java/table/test/JoinITCase.java   |  70 ++++++-----
 .../api/java/table/test/PojoGroupingITCase.java |  19 +--
 .../flink/api/java/table/test/SelectITCase.java |  68 +++++------
 .../table/test/StringExpressionsITCase.java     |  40 ++++---
 .../flink/api/java/table/test/UnionITCase.java  |  47 ++++----
 .../scala/table/test/AggregationsITCase.scala   |  59 +++++----
 .../flink/api/scala/table/test/AsITCase.scala   |  50 ++++----
 .../api/scala/table/test/CastingITCase.scala    |  44 +++----
 .../scala/table/test/ExpressionsITCase.scala    |  40 +++----
 .../table/test/GroupedAggregationsITCase.scala  |  33 +++--
 .../flink/api/scala/table/test/JoinITCase.scala |  54 ++++-----
 .../api/scala/table/test/SelectITCase.scala     |  72 +++++------
 .../table/test/StringExpressionsITCase.scala    |  28 ++---
 .../api/scala/table/test/UnionITCase.scala      |  36 +++---
 71 files changed, 2596 insertions(+), 1178 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/package-info.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/package-info.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/package-info.java
deleted file mode 100644
index d7fbc8e..0000000
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/package-info.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * <strong>Table API</strong><br>
- *
- * This package contains the generic part of the Table API. It can be used with Flink Streaming
- * and Flink Batch. From Scala as well as from Java.
- *
- * When using the Table API, as user creates a [[org.apache.flink.api.table.Table]] from
- * a DataSet or DataStream. On this relational operations can be performed. A table can also
- * be converted back to a DataSet or DataStream.
- *
- * Packages [[org.apache.flink.api.scala.table]] and [[org.apache.flink.api.java.table]] contain
- * the language specific part of the API. Refer to these packages for documentation on how
- * the Table API can be used in Java and Scala.
- */
-package org.apache.flink.api.table;

http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/DataSetRelNode.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/DataSetRelNode.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/DataSetRelNode.java
deleted file mode 100644
index df0ebc0..0000000
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/DataSetRelNode.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.sql.calcite;
-
-import org.apache.calcite.rel.RelNode;
-import org.apache.flink.api.java.DataSet;
-
-public interface DataSetRelNode<T> extends RelNode {
-       
-       /**
-        * Translate the FlinkRelNode into Flink operator.
-        */
-       DataSet<T> translateToPlan();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetExchange.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetExchange.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetExchange.java
deleted file mode 100644
index 1ddd884..0000000
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetExchange.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.sql.calcite.node;
-
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.SingleRel;
-import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.table.sql.calcite.DataSetRelNode;
-
-/**
- * Flink RelNode which matches along with PartitionOperator.
- *
- * @param <T>
- */
-public class DataSetExchange<T> extends SingleRel implements DataSetRelNode<T> {
-       
-       public DataSetExchange(RelOptCluster cluster, RelTraitSet traits, RelNode input) {
-               super(cluster, traits, input);
-       }
-       
-       private TypeInformation<T> getType() {
-               return null;
-       }
-       
-       private String getName() {
-               return null;
-       }
-       
-       private PartitionMethod getPartitionMethod() {
-               return null;
-       }
-       
-       private int[] getPartitionKey() {
-               return null;
-       }
-       
-       @Override
-       public DataSet<T> translateToPlan() {
-               return null;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetFlatMap.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetFlatMap.java
deleted file mode 100644
index cf597b2..0000000
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetFlatMap.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.sql.calcite.node;
-
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.SingleRel;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.table.sql.calcite.DataSetRelNode;
-
-/**
- * Flink RelNode which matches along with FlatMapOperator.
- *
- * @param <T>
- */
-public class DataSetFlatMap<T> extends SingleRel implements DataSetRelNode<T> {
-       
-       protected DataSetFlatMap(RelOptCluster cluster, RelTraitSet traits, RelNode input) {
-               super(cluster, traits, input);
-       }
-       
-       private TypeInformation<T> getType() {
-               return null;
-       }
-       
-       private String getName() {
-               return null;
-       }
-       
-       private RichFlatMapFunction<T, T> getFlatMapFunction() {
-               return null;
-       }
-       
-       @Override
-       public DataSet<T> translateToPlan() {
-               return null;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetJoin.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetJoin.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetJoin.java
deleted file mode 100644
index 3d1146e..0000000
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetJoin.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.sql.calcite.node;
-
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.BiRel;
-import org.apache.calcite.rel.RelNode;
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.join.JoinType;
-import org.apache.flink.api.table.sql.calcite.DataSetRelNode;
-
-/**
- * Flink RelNode which matches along with JoinOperator and its related operations.
- */
-public class DataSetJoin<L, R, OUT> extends BiRel implements DataSetRelNode<OUT> {
-       
-       public DataSetJoin(RelOptCluster cluster, RelTraitSet traitSet, RelNode left, RelNode right) {
-               super(cluster, traitSet, left, right);
-       }
-       
-       private TypeInformation<L> getLeftInputType() {
-               return null;
-       }
-       
-       private TypeInformation<R> getRightInputType() {
-               return null;
-       }
-       
-       private TypeInformation<OUT> getType() {
-               return null;
-       }
-       
-       private String getName() {
-               return null;
-       }
-       
-       private JoinType getJoinType() {
-               return null;
-       }
-       
-       private JoinHint getJoinHint() {
-               return null;
-       }
-       
-       private int[] getLeftJoinKey() {
-               return null;
-       }
-       
-       private int[] getRightJoinKey() {
-               return null;
-       }
-       
-       private JoinFunction<L, R, OUT> getJoinFunction() {
-               return null;
-       }
-       
-       @Override
-       public DataSet<OUT> translateToPlan() {
-               return null;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetMap.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetMap.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetMap.java
deleted file mode 100644
index 3098cb3..0000000
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetMap.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.sql.calcite.node;
-
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.SingleRel;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.table.sql.calcite.DataSetRelNode;
-
-/**
- * Flink RelNode which matches along with MapOperator.
- */
-public class DataSetMap<IN, OUT> extends SingleRel implements DataSetRelNode<OUT> {
-       
-       protected DataSetMap(RelOptCluster cluster, RelTraitSet traits, RelNode input) {
-               super(cluster, traits, input);
-       }
-       
-       private TypeInformation<IN> getInputType() {
-               return null;
-       }
-       
-       private TypeInformation<OUT> getType() {
-               return null;
-       }
-       
-       private String getName() {
-               return null;
-       }
-       
-       private RichMapFunction<IN, OUT> getMapFunction() {
-               return null;
-       }
-       
-       @Override
-       public DataSet<OUT> translateToPlan() {
-               return null;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetReduce.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetReduce.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetReduce.java
deleted file mode 100644
index 4aa2846..0000000
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetReduce.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.sql.calcite.node;
-
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.SingleRel;
-import org.apache.flink.api.common.functions.RichReduceFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.table.sql.calcite.DataSetRelNode;
-
-/**
- * Flink RelNode which matches along with ReduceOperator.
- */
-public class DataSetReduce<T> extends SingleRel implements DataSetRelNode<T> {
-       
-       public DataSetReduce(RelOptCluster cluster, RelTraitSet traits, RelNode input) {
-               super(cluster, traits, input);
-       }
-       
-       private TypeInformation<T> getType() {
-               return null;
-       }
-       
-       private String getName() {
-               return null;
-       }
-       
-       private RichReduceFunction<T> getReduceFunction() {
-               return null;
-       }
-       
-       private int[] getGroupingKeys() {
-               return null;
-       }
-       
-       @Override
-       public DataSet<T> translateToPlan() {
-               return null;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetReduceGroup.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetReduceGroup.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetReduceGroup.java
deleted file mode 100644
index 5467c56..0000000
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetReduceGroup.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.sql.calcite.node;
-
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.SingleRel;
-import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.table.sql.calcite.DataSetRelNode;
-
-/**
- * Flink RelNode which matches along with ReduceGroupOperator.
- */
-public class DataSetReduceGroup<IN, OUT> extends SingleRel implements DataSetRelNode<OUT> {
-       
-       public DataSetReduceGroup(RelOptCluster cluster, RelTraitSet traits, RelNode input) {
-               super(cluster, traits, input);
-       }
-       
-       private TypeInformation<IN> getInputType() {
-               return null;
-       }
-       
-       private TypeInformation<OUT> getType() {
-               return null;
-       }
-       
-       private String getName() {
-               return null;
-       }
-       
-       private RichGroupReduceFunction<IN, OUT> getGroupReduceFunction() {
-               return null;
-       }
-       
-       private int[] getGroupingKeys() {
-               return null;
-       }
-       
-       @Override
-       public DataSet<OUT> translateToPlan() {
-               return null;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetSort.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetSort.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetSort.java
deleted file mode 100644
index 7fa1c53..0000000
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetSort.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.sql.calcite.node;
-
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.SingleRel;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.table.sql.calcite.DataSetRelNode;
-
-/**
- * Flink RelNode which matches along with SortPartitionOperator.
- *
- * @param <T>
- */
-public class DataSetSort<T> extends SingleRel implements DataSetRelNode<T> {
-       
-       public DataSetSort(RelOptCluster cluster, RelTraitSet traits, RelNode input) {
-               super(cluster, traits, input);
-       }
-       
-       private TypeInformation<T> getType() {
-               return null;
-       }
-       
-       private String getName() {
-               return null;
-       }
-       
-       private int[] getSortKey() {
-               return null;
-       }
-       
-       private boolean[] getSortKeyOrder() {
-               return null;
-       }
-       
-       @Override
-       public DataSet<T> translateToPlan() {
-               return null;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetSource.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetSource.java
 
b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetSource.java
deleted file mode 100644
index f24ee79..0000000
--- 
a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetSource.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.sql.calcite.node;
-
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.core.TableScan;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.table.sql.calcite.DataSetRelNode;
-
-/**
- * Flink RelNode which matches along with DataSource.
- *
- * @param <T>
- */
-public class DataSetSource<T> extends TableScan implements DataSetRelNode<T> {
-       
-       public DataSetSource(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table) {
-               super(cluster, traitSet, table);
-       }
-       
-       private TypeInformation<T> getType() {
-               return null;
-       }
-       
-       private String getName() {
-               return null;
-       }
-       
-       private DataSet<T> getDatSource() {
-               return null;
-       }
-       
-       @Override
-       public DataSet<T> translateToPlan() {
-               return null;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetUnion.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetUnion.java
 
b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetUnion.java
deleted file mode 100644
index 8b435e9..0000000
--- 
a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetUnion.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.sql.calcite.node;
-
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.BiRel;
-import org.apache.calcite.rel.RelNode;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.table.sql.calcite.DataSetRelNode;
-
-/**
- * Flink RelNode which matches along with UnionOperator.
- *
- * @param <T>
- */
-public class DataSetUnion<T> extends BiRel implements DataSetRelNode<T> {
-       
-       public DataSetUnion(RelOptCluster cluster, RelTraitSet traitSet, 
RelNode left, RelNode right) {
-               super(cluster, traitSet, left, right);
-       }
-       
-       private TypeInformation<T> getType() {
-               return null;
-       }
-       
-       private String getName() {
-               return null;
-       }
-       
-       @Override
-       public DataSet<T> translateToPlan() {
-               return null;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
index f3f4e9d..66bfbe7 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
@@ -18,14 +18,17 @@
 
 package org.apache.flink.api.java.table
 
-import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.plan.{RelTraitSet, RelOptUtil}
 import org.apache.calcite.rel.RelNode
-import org.apache.calcite.tools.{RelBuilder, Frameworks}
+import org.apache.calcite.sql2rel.RelDecorrelator
+import org.apache.calcite.tools.Programs
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.{DataSet => JavaDataSet}
 import org.apache.flink.api.table.plan._
-import org.apache.flink.api.table.plan.operators.DataSetTable
 import org.apache.flink.api.table.Table
+import org.apache.flink.api.table.plan.nodes.dataset.DataSetRel
+import org.apache.flink.api.table.plan.rules.FlinkRuleSets
+import org.apache.flink.api.table.plan.schema.DataSetTable
 
 /**
  * [[PlanTranslator]] for creating [[Table]]s from Java 
[[org.apache.flink.api.java.DataSet]]s and
@@ -41,37 +44,58 @@ class JavaBatchTranslator extends PlanTranslator {
 
     // create table representation from DataSet
     val dataSetTable = new DataSetTable[A](
-    repr.asInstanceOf[JavaDataSet[A]],
-    fieldNames
+      repr.asInstanceOf[JavaDataSet[A]],
+      fieldNames
     )
 
-    // register table in Cascading schema
-    val schema = Frameworks.createRootSchema(true)
-    val tableName = repr.hashCode().toString
-    schema.add(tableName, dataSetTable)
-
-    // initialize RelBuilder
-    val frameworkConfig = Frameworks
-      .newConfigBuilder
-      .defaultSchema(schema)
-      .build
-    val relBuilder = RelBuilder.create(frameworkConfig)
+    val tabName = TranslationContext.addDataSet(dataSetTable)
+    val relBuilder = TranslationContext.getRelBuilder
 
     // create table scan operator
-    relBuilder.scan(tableName)
+    relBuilder.scan(tabName)
     new Table(relBuilder.build(), relBuilder)
   }
 
   override def translate[A](lPlan: RelNode)(implicit tpe: TypeInformation[A]): 
JavaDataSet[A] = {
 
+    // get the planner for the plan
+    val planner = lPlan.getCluster.getPlanner
+
+    // we do not have any special requirements for the output
+    val outputProps = RelTraitSet.createEmpty()
+
+    println("-----------")
+    println("Input Plan:")
+    println("-----------")
     println(RelOptUtil.toString(lPlan))
 
-    // TODO: optimize & translate:
-    // - optimize RelNode plan
-    // - translate to Flink RelNode plan
-    // - generate DataSet program
+    // decorrelate
+    val decorPlan = RelDecorrelator.decorrelateQuery(lPlan)
+
+    // optimize the logical Flink plan
+    val optProgram = Programs.ofRules(FlinkRuleSets.DATASET_OPT_RULES)
+    val optPlan = optProgram.run(planner, decorPlan, outputProps)
+
+    println("---------------")
+    println("Optimized Plan:")
+    println("---------------")
+    println(RelOptUtil.toString(optPlan))
+
+    // translate the optimized plan into a DataSet plan
+    val dataSetProgram = Programs.ofRules(FlinkRuleSets.DATASET_TRANS_RULES)
+    val dataSetPlan = dataSetProgram.run(planner, optPlan, outputProps)
+
+    println("-------------")
+    println("DataSet Plan:")
+    println("-------------")
+    println(RelOptUtil.toString(dataSetPlan))
+
+    dataSetPlan match {
+      case node: DataSetRel =>
+        node.translateToPlan.asInstanceOf[JavaDataSet[A]]
+      case _ => ???
+    }
 
-    null
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
new file mode 100644
index 0000000..b2b0c2b
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.calcite.plan.ConventionTraitDef
+import org.apache.calcite.schema.impl.AbstractTable
+import org.apache.calcite.schema.SchemaPlus
+import org.apache.calcite.tools.{Frameworks, RelBuilder}
+import org.apache.flink.api.table.plan.schema.DataSetTable
+
+object TranslationContext {
+
+  private var relBuilder: RelBuilder = null
+  private var tables: SchemaPlus = null
+  private var tabNames: Map[AbstractTable, String] = null
+  private val nameCntr: AtomicInteger = new AtomicInteger(0)
+
+  reset()
+
+  def reset(): Unit = {
+
+    // create the Calcite root schema that holds the registered tables
+    tables = Frameworks.createRootSchema(true)
+
+    // initialize RelBuilder
+    val frameworkConfig = Frameworks
+      .newConfigBuilder
+      .defaultSchema(tables)
+      .traitDefs(ConventionTraitDef.INSTANCE)
+      .build
+
+    tabNames = Map[AbstractTable, String]()
+
+    relBuilder = RelBuilder.create(frameworkConfig)
+
+  }
+
+  def addDataSet(newTable: DataSetTable[_]): String = {
+
+    // look up name
+    val tabName = tabNames.get(newTable)
+
+    tabName match {
+      case Some(name) =>
+        name
+      case None =>
+        val tabName = "DataSetTable_" + nameCntr.getAndIncrement()
+        tabNames += (newTable -> tabName)
+        tables.add(tabName, newTable)
+        tabName
+    }
+
+  }
+
+  def getRelBuilder: RelBuilder = {
+    relBuilder
+  }
+
+}
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetConvention.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetConvention.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetConvention.scala
new file mode 100644
index 0000000..cbacd16
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetConvention.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+
+class DataSetConvention extends Convention {
+
+  override def toString: String = getName
+
+  def getInterface: Class[_] = classOf[DataSetRel]
+
+  def getName: String = "DATASET"
+
+  def getTraitDef: RelTraitDef[_ <: RelTrait] = ConventionTraitDef.INSTANCE
+
+  def satisfies(`trait`: RelTrait): Boolean = this eq `trait`
+
+  def register(planner: RelOptPlanner): Unit = { }
+
+}
+
+object DataSetConvention {
+
+  val INSTANCE = new DataSetConvention
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetExchange.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetExchange.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetExchange.scala
new file mode 100644
index 0000000..ec5805a
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetExchange.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan.{RelTraitSet, RelOptCluster}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.{RelWriter, RelNode, SingleRel}
+import 
org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.Row
+
+/**
+  * Flink RelNode which corresponds to the PartitionOperator.
+  */
+class DataSetExchange(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    input: RelNode,
+    rowType: RelDataType,
+    opName: String,
+    partitionKey: Array[Int],
+    partitionMethod: PartitionMethod)
+  extends SingleRel(cluster, traitSet, input)
+  with DataSetRel {
+
+  override def deriveRowType() = rowType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): 
RelNode = {
+    new DataSetExchange(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      rowType,
+      opName,
+      partitionKey,
+      partitionMethod
+    )
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw).item("name", opName)
+  }
+
+  override def translateToPlan: DataSet[Any] = {
+    ???
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetFlatMap.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetFlatMap.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetFlatMap.scala
new file mode 100644
index 0000000..913cca0
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetFlatMap.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan.{RelOptCost, RelOptPlanner, RelTraitSet, 
RelOptCluster}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.{RelWriter, RelNode, SingleRel}
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.Row
+
+/**
+  * Flink RelNode which corresponds to the FlatMapOperator.
+  *
+  */
+class DataSetFlatMap(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    input: RelNode,
+    rowType: RelDataType,
+    opName: String,
+    func: FlatMapFunction[Row, Row])
+  extends SingleRel(cluster, traitSet, input)
+  with DataSetRel {
+
+  override def deriveRowType() = rowType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): 
RelNode = {
+    new DataSetFlatMap(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      rowType,
+      opName,
+      func
+    )
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw).item("name", opName)
+  }
+
+  override def translateToPlan: DataSet[Any] = {
+    ???
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala
new file mode 100644
index 0000000..11bb160
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan.{RelTraitSet, RelOptCluster}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.{RelWriter, RelNode, SingleRel}
+import org.apache.flink.api.common.functions.GroupReduceFunction
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.Row
+
+/**
+  * Flink RelNode which corresponds to the ReduceGroupOperator.
+  */
+class DataSetGroupReduce(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    input: RelNode,
+    rowType: RelDataType,
+    opName: String,
+    groupingKeys: Array[Int],
+    func: GroupReduceFunction[Row, Row])
+  extends SingleRel(cluster, traitSet, input)
+  with DataSetRel {
+
+  override def deriveRowType() = rowType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): 
RelNode = {
+    new DataSetGroupReduce(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      rowType,
+      opName,
+      groupingKeys,
+      func
+    )
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw).item("name", opName)
+  }
+
+  override def translateToPlan: DataSet[Any] = {
+    ???
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
new file mode 100644
index 0000000..c20cdc5
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan.{RelTraitSet, RelOptCluster}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.{RelWriter, BiRel, RelNode}
+import org.apache.flink.api.common.functions.JoinFunction
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.operators.join.JoinType
+import org.apache.flink.api.table.Row
+
+/**
+  * Flink RelNode which corresponds to the JoinOperator and its related 
operations.
+  */
+class DataSetJoin(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    left: RelNode,
+    right: RelNode,
+    rowType: RelDataType,
+    opName: String,
+    joinKeysLeft: Array[Int],
+    joinKeysRight: Array[Int],
+    joinType: JoinType,
+    joinHint: JoinHint,
+    func: JoinFunction[Row, Row, Row])
+  extends BiRel(cluster, traitSet, left, right)
+  with DataSetRel {
+
+  override def deriveRowType() = rowType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): 
RelNode = {
+    new DataSetJoin(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      inputs.get(1),
+      rowType,
+      opName,
+      joinKeysLeft,
+      joinKeysRight,
+      joinType,
+      joinHint,
+      func
+    )
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw).item("name", opName)
+  }
+
+  override def translateToPlan: DataSet[Any] = {
+    ???
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMap.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMap.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMap.scala
new file mode 100644
index 0000000..be8bd9d
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMap.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan.{RelTraitSet, RelOptCluster}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.{RelWriter, RelNode, SingleRel}
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.Row
+
+/**
+  * Flink RelNode which corresponds to the MapOperator.
+  */
+class DataSetMap(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    input: RelNode,
+    rowType: RelDataType,
+    opName: String,
+    func: MapFunction[Row, Row])
+  extends SingleRel(cluster, traitSet, input)
+  with DataSetRel {
+
+  override def deriveRowType() = rowType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): 
RelNode = {
+    new DataSetMap(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      rowType,
+      opName,
+      func
+    )
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw).item("name", opName)
+  }
+
+  override def toString() = opName
+
+  override def translateToPlan: DataSet[Any] = {
+    ???
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetReduce.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetReduce.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetReduce.scala
new file mode 100644
index 0000000..567a91c
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetReduce.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan.{RelTraitSet, RelOptCluster}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.{RelWriter, RelNode, SingleRel}
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.Row
+
+/**
+  * Flink RelNode which corresponds to the ReduceOperator.
+  */
+class DataSetReduce(
+    cluster: RelOptCluster,
+    traits: RelTraitSet,
+    input: RelNode,
+    rowType: RelDataType,
+    opName: String,
+    groupingKeys: Array[Int],
+    func: ReduceFunction[Row])
+  extends SingleRel(cluster, traits, input)
+  with DataSetRel {
+
+  override def deriveRowType() = rowType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): 
RelNode = {
+    new DataSetReduce(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      rowType,
+      opName,
+      groupingKeys,
+      func
+    )
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw).item("name", opName)
+  }
+
+  override def translateToPlan: DataSet[Any] = {
+    ???
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
new file mode 100644
index 0000000..20677b3
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.rel.RelNode
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.Row
+
+trait DataSetRel extends RelNode {
+
+  /**
+    * Translates the FlinkRelNode into a Flink operator.
+    */
+  def translateToPlan: DataSet[Any]
+
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
new file mode 100644
index 0000000..df5301d
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan.{RelTraitSet, RelOptCluster}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.{RelWriter, RelNode, SingleRel}
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.Row
+
+/**
+  * Flink RelNode which corresponds to the SortPartitionOperator.
+  */
+class DataSetSort(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    input: RelNode,
+    rowType: RelDataType,
+    opName: String,
+    sortKey: Array[Int],
+    sortOrder: Array[Boolean])
+  extends SingleRel(cluster, traitSet, input)
+  with DataSetRel {
+
+  override def deriveRowType() = rowType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): 
RelNode = {
+    new DataSetSort(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      rowType,
+      opName,
+      sortKey,
+      sortOrder
+    )
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw).item("name", opName)
+  }
+
+  override def translateToPlan: DataSet[Any] = {
+    ???
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala
new file mode 100644
index 0000000..effaf1a
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.{RelWriter, RelNode}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.TableScan
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.Row
+
+/**
+  * Flink RelNode which corresponds to a DataSource.
+  *
+  * @param cluster      cluster handed to the [[TableScan]] constructor
+  * @param traitSet     traits handed to the [[TableScan]] constructor
+  * @param table        table handed to the [[TableScan]] constructor
+  * @param rowType      row type reported by [[deriveRowType]]
+  * @param inputDataSet the DataSet this node was created with; it is only passed on by
+  *                     [[copy]] in this class
+  */
+class DataSetSource(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    table: RelOptTable,
+    rowType: RelDataType,
+    inputDataSet: DataSet[_])
+  extends TableScan(cluster, traitSet, table)
+  with DataSetRel {
+
+  /** Returns the row type that was passed to the constructor. */
+  override def deriveRowType(): RelDataType = rowType
+
+  /**
+    * Creates a copy of this node that uses the given traits. The `inputs` argument is
+    * not read; every other attribute is carried over unchanged.
+    */
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode =
+    new DataSetSource(cluster, traitSet, table, rowType, inputDataSet)
+
+  /** Translation stub: `???` throws a NotImplementedError when this is called. */
+  override def translateToPlan: DataSet[Any] = ???
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
new file mode 100644
index 0000000..a510fc9
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.{RelWriter, BiRel, RelNode}
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.Row
+
+/**
+  * Flink RelNode which corresponds to a UnionOperator.
+  *
+  * @param cluster  cluster handed to the [[BiRel]] constructor
+  * @param traitSet traits handed to the [[BiRel]] constructor
+  * @param left     first input of the union
+  * @param right    second input of the union
+  * @param rowType  row type reported by [[deriveRowType]]
+  * @param opName   operator name, shown as the "name" term of the plan explanation
+  */
+class DataSetUnion(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    left: RelNode,
+    right: RelNode,
+    rowType: RelDataType,
+    opName: String)
+  extends BiRel(cluster, traitSet, left, right)
+  with DataSetRel {
+
+  /** Returns the row type that was passed to the constructor. */
+  override def deriveRowType(): RelDataType = rowType
+
+  /**
+    * Creates a copy of this node with the given traits, taking the first two entries of
+    * `inputs` as left and right input.
+    */
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    val newLeft = inputs.get(0)
+    val newRight = inputs.get(1)
+    new DataSetUnion(cluster, traitSet, newLeft, newRight, rowType, opName)
+  }
+
+  /** Adds the operator name to the terms produced by the parent class. */
+  override def explainTerms(pw: RelWriter): RelWriter =
+    super.explainTerms(pw).item("name", opName)
+
+  /** Translation stub: `???` throws a NotImplementedError when this is called. */
+  override def translateToPlan: DataSet[Any] = ???
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkAggregate.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkAggregate.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkAggregate.scala
new file mode 100644
index 0000000..f66cb71
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkAggregate.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.logical
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCost, RelOptPlanner, RelOptCluster, 
RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.{AggregateCall, Aggregate}
+import org.apache.calcite.sql.fun.SqlAvgAggFunction
+import org.apache.calcite.util.ImmutableBitSet
+
+import scala.collection.JavaConversions._
+
+/**
+  * [[Aggregate]] that carries the [[FlinkRel]] trait and adds a cost penalty for AVG
+  * aggregate calls.
+  */
+class FlinkAggregate(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    input: RelNode,
+    indicator: Boolean,
+    groupSet: ImmutableBitSet,
+    groupSets: java.util.List[ImmutableBitSet],
+    aggCalls: java.util.List[AggregateCall])
+  extends Aggregate(
+    cluster,
+    traitSet,
+    input,
+    indicator,
+    groupSet,
+    groupSets,
+    aggCalls)
+  with FlinkRel {
+
+  /** Creates a new [[FlinkAggregate]] in the same cluster from the given arguments. */
+  override def copy(
+      traitSet: RelTraitSet,
+      input: RelNode,
+      indicator: Boolean,
+      groupSet: ImmutableBitSet,
+      groupSets: util.List[ImmutableBitSet],
+      aggCalls: util.List[AggregateCall]): Aggregate =
+    new FlinkAggregate(cluster, traitSet, input, indicator, groupSet, groupSets, aggCalls)
+
+  /**
+    * Returns the cost computed by the parent class plus one huge cost for every aggregate
+    * call whose function is a [[SqlAvgAggFunction]].
+    */
+  override def computeSelfCost (planner: RelOptPlanner): RelOptCost = {
+
+    val baseCost: RelOptCost = super.computeSelfCost(planner)
+    val avgPenalty = planner.getCostFactory.makeHugeCost()
+
+    // every remaining AVG call makes this node more expensive, so the planner only
+    // prefers aggregations in which AVG has been transformed
+    aggCalls.foldLeft(baseCost) { (cost, call) =>
+      call.getAggregation match {
+        case _: SqlAvgAggFunction => cost.plus(avgPenalty)
+        case _ => cost
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkCalc.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkCalc.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkCalc.scala
new file mode 100644
index 0000000..bcfe8d7
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkCalc.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.logical
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.Calc
+import org.apache.calcite.rex.RexProgram
+
+/**
+  * [[Calc]] that carries the [[FlinkRel]] trait; it adds no behavior besides [[copy]].
+  */
+class FlinkCalc(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    input: RelNode,
+    program: RexProgram)
+  extends Calc(cluster, traitSet, input, program)
+  with FlinkRel {
+
+  /** Creates a new [[FlinkCalc]] in the same cluster from the given arguments. */
+  override def copy(
+      traitSet: RelTraitSet,
+      child: RelNode,
+      program: RexProgram): Calc =
+    new FlinkCalc(cluster, traitSet, child, program)
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkConvention.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkConvention.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkConvention.scala
new file mode 100644
index 0000000..80137f2
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkConvention.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.logical
+
+import org.apache.calcite.plan._
+
+/**
+  * [[Convention]] named "FLINK" whose interface is [[FlinkRel]].
+  */
+class FlinkConvention extends Convention {
+
+  /** Name of this convention. */
+  def getName: String = "FLINK"
+
+  /** Interface class reported for this convention. */
+  def getInterface: Class[_] = classOf[FlinkRel]
+
+  /** This convention belongs to the [[ConventionTraitDef]] singleton. */
+  def getTraitDef: RelTraitDef[_ <: RelTrait] = ConventionTraitDef.INSTANCE
+
+  /** A trait satisfies this convention only if it is this very object. */
+  def satisfies(`trait`: RelTrait): Boolean = this.eq(`trait`)
+
+  /** Does nothing. */
+  def register(planner: RelOptPlanner): Unit = ()
+
+  /** The string form is the convention name. */
+  override def toString: String = getName
+
+}
+
+/** Companion object that holds a [[FlinkConvention]] instance. */
+object FlinkConvention {
+
+  val INSTANCE: FlinkConvention = new FlinkConvention()
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkFilter.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkFilter.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkFilter.scala
new file mode 100644
index 0000000..9f0bf30
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkFilter.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.logical
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.Filter
+import org.apache.calcite.rex.RexNode
+
+/**
+  * [[Filter]] that carries the [[FlinkRel]] trait; it adds no behavior besides [[copy]].
+  */
+class FlinkFilter(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    input: RelNode,
+    condition: RexNode)
+  extends Filter(cluster, traitSet, input, condition)
+  with FlinkRel {
+
+  /** Creates a new [[FlinkFilter]] in the same cluster from the given arguments. */
+  override def copy(
+      traitSet: RelTraitSet,
+      input: RelNode,
+      condition: RexNode): Filter =
+    new FlinkFilter(cluster, traitSet, input, condition)
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkJoin.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkJoin.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkJoin.scala
new file mode 100644
index 0000000..8b04b50
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkJoin.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.logical
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.{JoinRelType, Join}
+import org.apache.calcite.rex.RexNode
+
+/**
+  * [[Join]] that carries the [[FlinkRel]] trait; it adds no behavior besides [[copy]].
+  */
+class FlinkJoin(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    left: RelNode,
+    right: RelNode,
+    condition: RexNode,
+    joinType: JoinRelType,
+    variablesStopped: java.util.Set[String])
+  extends Join(
+    cluster,
+    traitSet,
+    left,
+    right,
+    condition,
+    joinType,
+    variablesStopped)
+  with FlinkRel {
+
+  /**
+    * Creates a new [[FlinkJoin]] in the same cluster from the given arguments. The
+    * stopped variables are taken from this node; `semiJoinDone` is not read.
+    */
+  override def copy(
+      traitSet: RelTraitSet,
+      condition: RexNode,
+      left: RelNode,
+      right: RelNode,
+      joinType: JoinRelType,
+      semiJoinDone: Boolean): Join = {
+    new FlinkJoin(
+      cluster,
+      traitSet,
+      left,
+      right,
+      condition,
+      joinType,
+      getVariablesStopped
+    )
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkProject.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkProject.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkProject.scala
new file mode 100644
index 0000000..1d93036
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkProject.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.logical
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.Project
+import org.apache.calcite.rex.RexNode
+
+/**
+  * [[Project]] that carries the [[FlinkRel]] trait; it adds no behavior besides [[copy]].
+  */
+class FlinkProject(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    input: RelNode,
+    projects: java.util.List[RexNode],
+    rowType: RelDataType)
+  extends Project(cluster, traitSet, input, projects, rowType)
+  with FlinkRel {
+
+  /** Creates a new [[FlinkProject]] in the same cluster from the given arguments. */
+  override def copy(
+      traitSet: RelTraitSet,
+      input: RelNode,
+      projects: util.List[RexNode],
+      rowType: RelDataType): Project = {
+    new FlinkProject(
+      cluster,
+      traitSet,
+      input,
+      projects,
+      rowType
+    )
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkRel.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkRel.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkRel.scala
new file mode 100644
index 0000000..9ebd7e4
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkRel.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.logical
+
+import org.apache.calcite.rel.RelNode
+
+/**
+  * Trait mixed into the logical Flink nodes of this package. It declares no members of
+  * its own on top of [[RelNode]].
+  */
+trait FlinkRel extends RelNode

http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkScan.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkScan.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkScan.scala
new file mode 100644
index 0000000..6d53a75
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkScan.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.logical
+
+import org.apache.calcite.plan.{RelOptTable, RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.core.TableScan
+
+/**
+  * [[TableScan]] that carries the [[FlinkRel]] trait; it declares no members of its own.
+  */
+class FlinkScan(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    table: RelOptTable)
+  extends TableScan(cluster, traitSet, table)
+  with FlinkRel

http://git-wip-us.apache.org/repos/asf/flink/blob/7ecb7010/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkUnion.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkUnion.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkUnion.scala
new file mode 100644
index 0000000..fd791d3
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkUnion.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.logical
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.{SetOp, Union}
+
+/**
+  * [[Union]] that carries the [[FlinkRel]] trait; it adds no behavior besides [[copy]].
+  */
+class FlinkUnion(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputs: java.util.List[RelNode],
+    all: Boolean)
+  extends Union(cluster, traitSet, inputs, all)
+  with FlinkRel {
+
+  /** Creates a new [[FlinkUnion]] in the same cluster from the given arguments. */
+  override def copy(
+      traitSet: RelTraitSet,
+      inputs: util.List[RelNode],
+      all: Boolean): SetOp =
+    new FlinkUnion(cluster, traitSet, inputs, all)
+}

Reply via email to