http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/operators/DataSetTable.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/operators/DataSetTable.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/operators/DataSetTable.scala
deleted file mode 100644
index 65b97fb..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/operators/DataSetTable.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.operators
-
-import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
-import org.apache.calcite.schema.impl.AbstractTable
-import org.apache.calcite.sql.`type`.SqlTypeName
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.plan.TypeConverter
-
-class DataSetTable[T](
-  val dataSet: DataSet[T],
-  val fieldNames: Array[String]) extends AbstractTable {
-
-  // check uniquenss of field names
-  if (fieldNames.length != fieldNames.toSet.size) {
-    throw new scala.IllegalArgumentException(
-      "Table field names must be unique.")
-  }
-
-  val dataSetType: CompositeType[T] =
-    dataSet.getType match {
-      case cType: CompositeType[T] =>
-        cType
-      case _ =>
-        throw new scala.IllegalArgumentException(
-          "DataSet must have a composite type.")
-    }
-
-  val fieldTypes: Array[SqlTypeName] =
-    if (fieldNames.length == dataSetType.getArity) {
-      (0 until dataSetType.getArity)
-        .map(i => dataSetType.getTypeAt(i))
-        .map(TypeConverter.typeInfoToSqlType)
-        .toArray
-    }
-    else {
-      throw new IllegalArgumentException(
-        "Arity of DataSet type not equal to number of field names.")
-    }
-
-  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
-    val builder = typeFactory.builder
-    fieldNames.zip(fieldTypes)
-      .foreach( f => builder.add(f._1, f._2).nullable(true) )
-    builder.build
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
new file mode 100644
index 0000000..97e8b32
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules
+
+import org.apache.calcite.rel.rules._
+import org.apache.calcite.tools.{RuleSets, RuleSet}
+import org.apache.flink.api.table.plan.rules.logical._
+import org.apache.flink.api.table.plan.rules.dataset._
+
+object FlinkRuleSets {
+
+  /**
+    * RuleSet to optimize plans for batch / DataSet exeuction
+    */
+  val DATASET_OPT_RULES: RuleSet = RuleSets.ofList(
+
+    // filter rules
+    FilterJoinRule.FILTER_ON_JOIN,
+    FilterJoinRule.JOIN,
+    FilterMergeRule.INSTANCE,
+    FilterAggregateTransposeRule.INSTANCE,
+
+    // push and merge projection rules
+    AggregateProjectMergeRule.INSTANCE,
+    ProjectMergeRule.INSTANCE,
+    ProjectFilterTransposeRule.INSTANCE,
+    FilterProjectTransposeRule.INSTANCE,
+    AggregateProjectPullUpConstantsRule.INSTANCE,
+    JoinPushExpressionsRule.INSTANCE,
+    ProjectJoinTransposeRule.INSTANCE,
+    ProjectRemoveRule.INSTANCE,
+    SortProjectTransposeRule.INSTANCE,
+    ProjectSortTransposeRule.INSTANCE,
+
+    // merge and push unions rules
+    // TODO: Add a rule to enforce binary unions
+    UnionEliminatorRule.INSTANCE,
+    JoinUnionTransposeRule.LEFT_UNION,
+    JoinUnionTransposeRule.RIGHT_UNION,
+    // non-all Union to all-union + distinct
+    UnionToDistinctRule.INSTANCE,
+
+    // aggregation rules
+    AggregateRemoveRule.INSTANCE,
+    AggregateJoinTransposeRule.EXTENDED,
+    AggregateUnionAggregateRule.INSTANCE,
+    AggregateReduceFunctionsRule.INSTANCE,
+    AggregateExpandDistinctAggregatesRule.INSTANCE,
+
+    // remove unnecessary sort rule
+    SortRemoveRule.INSTANCE,
+
+    // join reordering rules
+    JoinPushThroughJoinRule.LEFT,
+    JoinPushThroughJoinRule.RIGHT,
+    JoinAssociateRule.INSTANCE,
+    JoinCommuteRule.INSTANCE,
+    JoinCommuteRule.SWAP_OUTER,
+
+    // simplify expressions rules
+    ReduceExpressionsRule.CALC_INSTANCE,
+    ReduceExpressionsRule.FILTER_INSTANCE,
+    ReduceExpressionsRule.JOIN_INSTANCE,
+    ReduceExpressionsRule.PROJECT_INSTANCE,
+
+    // prune empty results rules
+    PruneEmptyRules.AGGREGATE_INSTANCE,
+    PruneEmptyRules.FILTER_INSTANCE,
+    PruneEmptyRules.JOIN_LEFT_INSTANCE,
+    PruneEmptyRules.JOIN_RIGHT_INSTANCE,
+    PruneEmptyRules.PROJECT_INSTANCE,
+    PruneEmptyRules.SORT_INSTANCE,
+    PruneEmptyRules.UNION_INSTANCE,
+
+    // calc rules
+    FilterCalcMergeRule.INSTANCE,
+    ProjectCalcMergeRule.INSTANCE,
+    FilterToCalcRule.INSTANCE,
+    ProjectToCalcRule.INSTANCE,
+    CalcMergeRule.INSTANCE,
+
+    // translate to logical Flink nodes
+    FlinkAggregateRule.INSTANCE,
+    FlinkCalcRule.INSTANCE,
+    FlinkFilterRule.INSTANCE,
+    FlinkJoinRule.INSTANCE,
+    FlinkProjectRule.INSTANCE,
+    FlinkScanRule.INSTANCE,
+    FlinkUnionRule.INSTANCE
+  )
+
+  val DATASET_TRANS_RULES: RuleSet = RuleSets.ofList(
+  
+    // translate to DataSet nodes
+    DataSetAggregateRule.INSTANCE,
+    DataSetCalcRule.INSTANCE,
+    DataSetFilterRule.INSTANCE,
+    DataSetJoinRule.INSTANCE,
+    DataSetProjectRule.INSTANCE,
+    DataSetScanRule.INSTANCE,
+    DataSetUnionRule.INSTANCE
+  )
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetAggregateRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetAggregateRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetAggregateRule.scala
new file mode 100644
index 0000000..1d17d63
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetAggregateRule.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataset
+
+import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, 
DataSetReduce}
+import org.apache.flink.api.table.plan.nodes.logical.{FlinkAggregate, 
FlinkConvention}
+
+class DataSetAggregateRule
+  extends ConverterRule(
+    classOf[FlinkAggregate],
+    FlinkConvention.INSTANCE,
+    DataSetConvention.INSTANCE,
+    "DataSetAggregateRule")
+{
+
+  def convert(rel: RelNode): RelNode = {
+    val agg: FlinkAggregate = rel.asInstanceOf[FlinkAggregate]
+    val traitSet: RelTraitSet = 
rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+    val convInput: RelNode = RelOptRule.convert(agg.getInput, 
DataSetConvention.INSTANCE)
+
+    new DataSetReduce(
+      rel.getCluster,
+      traitSet,
+      convInput,
+      rel.getRowType,
+      agg.toString,
+      Array[Int](),
+      null)
+  }
+}
+
+object DataSetAggregateRule {
+  val INSTANCE: RelOptRule = new DataSetAggregateRule
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetCalcRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetCalcRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetCalcRule.scala
new file mode 100644
index 0000000..85e090c
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetCalcRule.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataset
+
+import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, 
DataSetFlatMap}
+import org.apache.flink.api.table.plan.nodes.logical.{FlinkCalc, 
FlinkConvention}
+
+class DataSetCalcRule
+  extends ConverterRule(
+    classOf[FlinkCalc],
+    FlinkConvention.INSTANCE,
+    DataSetConvention.INSTANCE,
+    "DataSetCalcRule")
+{
+
+  def convert(rel: RelNode): RelNode = {
+    val calc: FlinkCalc = rel.asInstanceOf[FlinkCalc]
+    val traitSet: RelTraitSet = 
rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+    val convInput: RelNode = RelOptRule.convert(calc.getInput, 
DataSetConvention.INSTANCE)
+
+    new DataSetFlatMap(
+      rel.getCluster,
+      traitSet,
+      convInput,
+      rel.getRowType,
+      calc.toString,
+      null)
+  }
+}
+
+object DataSetCalcRule {
+  val INSTANCE: RelOptRule = new DataSetCalcRule
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetFilterRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetFilterRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetFilterRule.scala
new file mode 100644
index 0000000..383c965
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetFilterRule.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataset
+
+import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, 
DataSetFlatMap}
+import org.apache.flink.api.table.plan.nodes.logical.{FlinkFilter, 
FlinkConvention}
+
+class DataSetFilterRule
+  extends ConverterRule(
+    classOf[FlinkFilter],
+    FlinkConvention.INSTANCE,
+    DataSetConvention.INSTANCE,
+    "DataSetFilterRule")
+{
+
+  def convert(rel: RelNode): RelNode = {
+    val filter: FlinkFilter = rel.asInstanceOf[FlinkFilter]
+    val traitSet: RelTraitSet = 
rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+    val convInput: RelNode = RelOptRule.convert(filter.getInput, 
DataSetConvention.INSTANCE)
+
+    new DataSetFlatMap(
+      rel.getCluster,
+      traitSet,
+      convInput,
+      rel.getRowType,
+      filter.toString,
+      null)
+  }
+}
+
+object DataSetFilterRule {
+  val INSTANCE: RelOptRule = new DataSetFilterRule
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala
new file mode 100644
index 0000000..3d2117d
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataset
+
+import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.flink.api.java.operators.join.JoinType
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, 
DataSetJoin}
+import org.apache.flink.api.table.plan.nodes.logical.{FlinkJoin, 
FlinkConvention}
+
+class DataSetJoinRule
+  extends ConverterRule(
+    classOf[FlinkJoin],
+    FlinkConvention.INSTANCE,
+    DataSetConvention.INSTANCE,
+    "DataSetJoinRule")
+{
+
+  def convert(rel: RelNode): RelNode = {
+    val join: FlinkJoin = rel.asInstanceOf[FlinkJoin]
+    val traitSet: RelTraitSet = 
rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+    val convLeft: RelNode = RelOptRule.convert(join.getInput(0), 
DataSetConvention.INSTANCE)
+    val convRight: RelNode = RelOptRule.convert(join.getInput(1), 
DataSetConvention.INSTANCE)
+
+    new DataSetJoin(
+      rel.getCluster,
+      traitSet,
+      convLeft,
+      convRight,
+      rel.getRowType,
+      join.toString,
+      Array[Int](),
+      Array[Int](),
+      JoinType.INNER,
+      null,
+      null)
+  }
+}
+
+object DataSetJoinRule {
+  val INSTANCE: RelOptRule = new DataSetJoinRule
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetProjectRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetProjectRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetProjectRule.scala
new file mode 100644
index 0000000..7796d66
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetProjectRule.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataset
+
+import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, 
DataSetMap}
+import org.apache.flink.api.table.plan.nodes.logical.{FlinkProject, 
FlinkConvention}
+
+class DataSetProjectRule
+  extends ConverterRule(
+    classOf[FlinkProject],
+    FlinkConvention.INSTANCE,
+    DataSetConvention.INSTANCE,
+    "DataSetProjectRule")
+{
+
+  def convert(rel: RelNode): RelNode = {
+    val proj: FlinkProject = rel.asInstanceOf[FlinkProject]
+    val traitSet: RelTraitSet = 
rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+    val convInput: RelNode = RelOptRule.convert(proj.getInput, 
DataSetConvention.INSTANCE)
+
+    new DataSetMap(
+      rel.getCluster,
+      traitSet,
+      convInput,
+      rel.getRowType,
+      proj.toString,
+      null)
+  }
+}
+
+object DataSetProjectRule {
+  val INSTANCE: RelOptRule = new DataSetProjectRule
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetScanRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetScanRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetScanRule.scala
new file mode 100644
index 0000000..937f3e2
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetScanRule.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataset
+
+import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, 
DataSetSource}
+import org.apache.flink.api.table.plan.nodes.logical.{FlinkScan, 
FlinkConvention}
+import org.apache.flink.api.table.plan.schema.DataSetTable
+
+class DataSetScanRule
+  extends ConverterRule(
+    classOf[FlinkScan],
+    FlinkConvention.INSTANCE,
+    DataSetConvention.INSTANCE,
+    "DataSetScanRule")
+{
+  def convert(rel: RelNode): RelNode = {
+    val scan: FlinkScan = rel.asInstanceOf[FlinkScan]
+    val traitSet: RelTraitSet = 
rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+    val dataSet: DataSet[_] = 
scan.getTable().unwrap(classOf[DataSetTable[_]]).dataSet
+
+    new DataSetSource(
+      rel.getCluster,
+      traitSet,
+      scan.getTable,
+      rel.getRowType,
+      dataSet
+    )
+  }
+}
+
+object DataSetScanRule {
+  val INSTANCE: RelOptRule = new DataSetScanRule
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetUnionRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetUnionRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetUnionRule.scala
new file mode 100644
index 0000000..a390374
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetUnionRule.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataset
+
+import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, 
DataSetUnion}
+import org.apache.flink.api.table.plan.nodes.logical.{FlinkUnion, 
FlinkConvention}
+
+class DataSetUnionRule
+  extends ConverterRule(
+    classOf[FlinkUnion],
+    FlinkConvention.INSTANCE,
+    DataSetConvention.INSTANCE,
+    "DataSetUnionRule")
+{
+
+  def convert(rel: RelNode): RelNode = {
+    val union: FlinkUnion = rel.asInstanceOf[FlinkUnion]
+    val traitSet: RelTraitSet = 
rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+    val convLeft: RelNode = RelOptRule.convert(union.getInput(0), 
DataSetConvention.INSTANCE)
+    val convRight: RelNode = RelOptRule.convert(union.getInput(1), 
DataSetConvention.INSTANCE)
+
+    new DataSetUnion(
+      rel.getCluster,
+      traitSet,
+      convLeft,
+      convRight,
+      rel.getRowType,
+      union.toString)
+  }
+}
+
+object DataSetUnionRule {
+  val INSTANCE: RelOptRule = new DataSetUnionRule
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkAggregateRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkAggregateRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkAggregateRule.scala
new file mode 100644
index 0000000..a5bfeb6
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkAggregateRule.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.logical
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalAggregate
+import org.apache.flink.api.table.plan.nodes.logical.{FlinkAggregate, 
FlinkConvention}
+
+class FlinkAggregateRule
+  extends ConverterRule(
+      classOf[LogicalAggregate],
+      Convention.NONE,
+      FlinkConvention.INSTANCE,
+      "FlinkAggregateRule")
+  {
+
+    def convert(rel: RelNode): RelNode = {
+      val agg: LogicalAggregate = rel.asInstanceOf[LogicalAggregate]
+      val traitSet: RelTraitSet = 
rel.getTraitSet.replace(FlinkConvention.INSTANCE)
+      val convInput: RelNode = RelOptRule.convert(agg.getInput, 
FlinkConvention.INSTANCE)
+
+      new FlinkAggregate(
+        rel.getCluster,
+        traitSet,
+        convInput,
+        agg.indicator,
+        agg.getGroupSet,
+        agg.getGroupSets,
+        agg.getAggCallList)
+    }
+  }
+
+object FlinkAggregateRule {
+  val INSTANCE: RelOptRule = new FlinkAggregateRule
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkCalcRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkCalcRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkCalcRule.scala
new file mode 100644
index 0000000..f40b04d
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkCalcRule.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.logical
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalCalc
+import org.apache.flink.api.table.plan.nodes.logical.{FlinkCalc, 
FlinkConvention}
+
+class FlinkCalcRule
+  extends ConverterRule(
+      classOf[LogicalCalc],
+      Convention.NONE,
+      FlinkConvention.INSTANCE,
+      "FlinkCalcRule")
+  {
+
+    def convert(rel: RelNode): RelNode = {
+      val calc: LogicalCalc = rel.asInstanceOf[LogicalCalc]
+      val traitSet: RelTraitSet = 
rel.getTraitSet.replace(FlinkConvention.INSTANCE)
+      val convInput: RelNode = RelOptRule.convert(calc.getInput, 
FlinkConvention.INSTANCE)
+
+      new FlinkCalc(
+        rel.getCluster,
+        traitSet,
+        convInput,
+        calc.getProgram)
+    }
+  }
+
+object FlinkCalcRule {
+  val INSTANCE: RelOptRule = new FlinkCalcRule
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkFilterRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkFilterRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkFilterRule.scala
new file mode 100644
index 0000000..25df8e8
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkFilterRule.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.logical
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalFilter
+import org.apache.flink.api.table.plan.nodes.logical.{FlinkFilter, 
FlinkConvention}
+
+class FlinkFilterRule
+  extends ConverterRule(
+      classOf[LogicalFilter],
+      Convention.NONE,
+      FlinkConvention.INSTANCE,
+      "FlinkFilterRule")
+  {
+
+    def convert(rel: RelNode): RelNode = {
+      val filter: LogicalFilter = rel.asInstanceOf[LogicalFilter]
+      val traitSet: RelTraitSet = 
rel.getTraitSet.replace(FlinkConvention.INSTANCE)
+      val convInput: RelNode = RelOptRule.convert(filter.getInput, 
FlinkConvention.INSTANCE)
+
+      new FlinkFilter(
+        rel.getCluster,
+        traitSet,
+        convInput,
+        filter.getCondition)
+    }
+  }
+
+object FlinkFilterRule {
+  val INSTANCE: RelOptRule = new FlinkFilterRule
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinRule.scala
new file mode 100644
index 0000000..3826c9a
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinRule.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.logical
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalJoin
+import org.apache.flink.api.table.plan.nodes.logical.{FlinkJoin, 
FlinkConvention}
+
+class FlinkJoinRule
+  extends ConverterRule(
+      classOf[LogicalJoin],
+      Convention.NONE,
+      FlinkConvention.INSTANCE,
+      "FlinkJoinRule")
+  {
+
+    def convert(rel: RelNode): RelNode = {
+      val join: LogicalJoin = rel.asInstanceOf[LogicalJoin]
+      val traitSet: RelTraitSet = 
rel.getTraitSet.replace(FlinkConvention.INSTANCE)
+      val convLeft: RelNode = RelOptRule.convert(join.getInput(0), 
FlinkConvention.INSTANCE)
+      val convRight: RelNode = RelOptRule.convert(join.getInput(1), 
FlinkConvention.INSTANCE)
+
+      new FlinkJoin(
+        rel.getCluster,
+        traitSet,
+        convLeft,
+        convRight,
+        join.getCondition,
+        join.getJoinType,
+        join.getVariablesStopped)
+    }
+  }
+
+object FlinkJoinRule {
+  val INSTANCE: RelOptRule = new FlinkJoinRule
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkProjectRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkProjectRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkProjectRule.scala
new file mode 100644
index 0000000..d0e1410
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkProjectRule.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.logical
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.flink.api.table.plan.nodes.logical.{FlinkProject, 
FlinkConvention}
+
+class FlinkProjectRule
+  extends ConverterRule(
+      classOf[LogicalProject],
+      Convention.NONE,
+      FlinkConvention.INSTANCE,
+      "FlinkProjectRule")
+  {
+
+    def convert(rel: RelNode): RelNode = {
+      val proj: LogicalProject = rel.asInstanceOf[LogicalProject]
+      val traitSet: RelTraitSet = 
rel.getTraitSet.replace(FlinkConvention.INSTANCE)
+      val convInput: RelNode = RelOptRule.convert(proj.getInput, 
FlinkConvention.INSTANCE)
+
+      new FlinkProject(
+        rel.getCluster,
+        traitSet,
+        convInput,
+        proj.getProjects,
+        proj.getRowType)
+    }
+  }
+
+object FlinkProjectRule {
+  val INSTANCE: RelOptRule = new FlinkProjectRule
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkScanRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkScanRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkScanRule.scala
new file mode 100644
index 0000000..d789770
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkScanRule.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.logical
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, 
DataSetSource}
+import org.apache.flink.api.table.plan.nodes.logical.{FlinkScan, 
FlinkConvention}
+import org.apache.flink.api.table.plan.schema.DataSetTable
+
+class FlinkScanRule
+  extends ConverterRule(
+      classOf[LogicalTableScan],
+      Convention.NONE,
+      FlinkConvention.INSTANCE,
+      "FlinkScanRule")
+  {
+    def convert(rel: RelNode): RelNode = {
+      val scan: TableScan = rel.asInstanceOf[TableScan]
+      val traitSet: RelTraitSet = 
rel.getTraitSet.replace(FlinkConvention.INSTANCE)
+      val dataSet: DataSet[_] = 
scan.getTable().unwrap(classOf[DataSetTable[_]]).dataSet
+
+      new FlinkScan(
+        rel.getCluster,
+        traitSet,
+        scan.getTable
+      )
+    }
+  }
+
+object FlinkScanRule {
+  val INSTANCE: RelOptRule = new FlinkScanRule
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkUnionRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkUnionRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkUnionRule.scala
new file mode 100644
index 0000000..d9869f8
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkUnionRule.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.logical
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalUnion
+import org.apache.flink.api.table.plan.nodes.logical.{FlinkUnion, 
FlinkConvention}
+
+import scala.collection.JavaConversions._
+
+class FlinkUnionRule
+  extends ConverterRule(
+      classOf[LogicalUnion],
+      Convention.NONE,
+      FlinkConvention.INSTANCE,
+      "FlinkUnionRule")
+  {
+
+    def convert(rel: RelNode): RelNode = {
+      val union: LogicalUnion = rel.asInstanceOf[LogicalUnion]
+      val traitSet: RelTraitSet = 
rel.getTraitSet.replace(FlinkConvention.INSTANCE)
+      val convInputs = union.getInputs.toList.map(
+        RelOptRule.convert(_, FlinkConvention.INSTANCE)
+      )
+
+      new FlinkUnion(
+        rel.getCluster,
+        traitSet,
+        convInputs,
+        union.all)
+    }
+  }
+
+object FlinkUnionRule {
+  val INSTANCE: RelOptRule = new FlinkUnionRule
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataSetTable.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataSetTable.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataSetTable.scala
new file mode 100644
index 0000000..e6aecab
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataSetTable.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.schema
+
+import java.lang.Double
+import java.util
+import java.util.Collections
+
+import org.apache.calcite.rel.{RelCollation, RelDistribution}
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.schema.Statistic
+import org.apache.calcite.schema.impl.AbstractTable
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.util.ImmutableBitSet
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.plan.TypeConverter
+
+class DataSetTable[T](
+    val dataSet: DataSet[T],
+    val fieldNames: Array[String])
+  extends AbstractTable {
+
+  // check uniquenss of field names
+  if (fieldNames.length != fieldNames.toSet.size) {
+    throw new scala.IllegalArgumentException(
+      "Table field names must be unique.")
+  }
+
+  val dataSetType: CompositeType[T] =
+    dataSet.getType match {
+      case cType: CompositeType[T] =>
+        cType
+      case _ =>
+        throw new scala.IllegalArgumentException(
+          "DataSet must have a composite type.")
+    }
+
+  val fieldTypes: Array[SqlTypeName] =
+    if (fieldNames.length == dataSetType.getArity) {
+      (0 until dataSetType.getArity)
+        .map(i => dataSetType.getTypeAt(i))
+        .map(TypeConverter.typeInfoToSqlType)
+        .toArray
+    }
+    else {
+      throw new IllegalArgumentException(
+        "Arity of DataSet type not equal to number of field names.")
+    }
+
+  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
+    val builder = typeFactory.builder
+    fieldNames.zip(fieldTypes)
+      .foreach( f => builder.add(f._1, f._2).nullable(true) )
+    builder.build
+  }
+
+//  override def getStatistic: Statistic = {
+//    new DefaultDataSetStatistic
+//  }
+
+}
+
+class DefaultDataSetStatistic extends Statistic {
+
+  override def getRowCount: Double = 1000d
+
+  override def getCollations: util.List[RelCollation] = Collections.emptyList()
+
+  override def isKey(columns: ImmutableBitSet): Boolean = false
+
+  override def getDistribution: RelDistribution = null
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
index 271aa99..138cd70 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
@@ -270,7 +270,6 @@ class Table(
 
     relBuilder.join(JoinRelType.INNER, relBuilder.literal(true))
     val join = relBuilder.build()
-    val rowT = join.getRowType()
     new Table(join, relBuilder)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
index 447acad..a3d5edc 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
@@ -35,7 +35,9 @@ package org.apache.flink.api.java.table.test;
  * limitations under the License.
  */
 
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.table.ExpressionException;
+import org.apache.flink.api.table.Row;
 import org.apache.flink.api.table.Table;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.table.TableEnvironment;
@@ -48,6 +50,9 @@ import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import scala.NotImplementedError;
+
+import java.util.List;
 
 @RunWith(Parameterized.class)
 public class AggregationsITCase extends MultipleProgramsTestBase {
@@ -57,7 +62,7 @@ public class AggregationsITCase extends 
MultipleProgramsTestBase {
                super(mode);
        }
 
-       @Test
+       @Test(expected = NotImplementedError.class)
        public void testAggregationTypes() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();
@@ -66,10 +71,10 @@ public class AggregationsITCase extends 
MultipleProgramsTestBase {
 
                Table result = table.select("f0.sum, f0.min, f0.max, f0.count, 
f0.avg");
 
-//             DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-//             List<Row> results = ds.collect();
-//             String expected = "231,1,21,21,11";
-//             compareResultAsText(results, expected);
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "231,1,21,21,11";
+               compareResultAsText(results, expected);
        }
 
        @Test(expected = IllegalArgumentException.class)
@@ -83,13 +88,13 @@ public class AggregationsITCase extends 
MultipleProgramsTestBase {
                Table result =
                                table.select("foo.avg");
 
-//             DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-//             List<Row> results = ds.collect();
-//             String expected = "";
-//             compareResultAsText(results, expected);
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "";
+               compareResultAsText(results, expected);
        }
 
-       @Test
+       @Test(expected = NotImplementedError.class)
        public void testWorkingAggregationDataTypes() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();
@@ -105,13 +110,13 @@ public class AggregationsITCase extends 
MultipleProgramsTestBase {
                Table result =
                                table.select("f0.avg, f1.avg, f2.avg, f3.avg, 
f4.avg, f5.avg, f6.count");
 
-//             DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-//             List<Row> results = ds.collect();
-//             String expected = "1,1,1,1,1.5,1.5,2";
-//             compareResultAsText(results, expected);
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "1,1,1,1,1.5,1.5,2";
+               compareResultAsText(results, expected);
        }
 
-       @Test
+       @Test(expected = NotImplementedError.class)
        public void testAggregationWithArithmetic() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();
@@ -128,13 +133,13 @@ public class AggregationsITCase extends 
MultipleProgramsTestBase {
                                table.select("(f0 + 2).avg + 2, f1.count + 5");
 
 
-//             DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-//             List<Row> results = ds.collect();
-//             String expected = "5.5,7";
-//             compareResultAsText(results, expected);
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "5.5,7";
+               compareResultAsText(results, expected);
        }
 
-       @Test
+       @Test(expected = NotImplementedError.class)
        public void testAggregationWithTwoCount() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();
@@ -150,11 +155,10 @@ public class AggregationsITCase extends 
MultipleProgramsTestBase {
                Table result =
                        table.select("f0.count, f1.count");
 
-
-//             DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-//             List<Row> results = ds.collect();
-//             String expected = "2,2";
-//             compareResultAsText(results, expected);
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "2,2";
+               compareResultAsText(results, expected);
        }
 
        // Calcite does not eagerly check type compatibility
@@ -172,11 +176,10 @@ public class AggregationsITCase extends 
MultipleProgramsTestBase {
                Table result =
                                table.select("f1.sum");
 
-
-//             DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-//             List<Row> results = ds.collect();
-//             String expected = "";
-//             compareResultAsText(results, expected);
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "";
+               compareResultAsText(results, expected);
        }
 
        @Test(expected = ExpressionException.class)
@@ -192,11 +195,10 @@ public class AggregationsITCase extends 
MultipleProgramsTestBase {
                Table result =
                                table.select("f0.sum.sum");
 
-
-//             DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-//             List<Row> results = ds.collect();
-//             String expected = "";
-//             compareResultAsText(results, expected);
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "";
+               compareResultAsText(results, expected);
        }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
index f706b48..f257c32 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.api.java.table.test;
 
-import org.apache.flink.api.table.ExpressionException;
 import org.apache.flink.api.table.Table;
 import org.apache.flink.api.table.Row;
 import org.apache.flink.api.java.DataSet;
@@ -29,6 +28,7 @@ import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import scala.NotImplementedError;
 
 import java.util.List;
 
@@ -40,7 +40,7 @@ public class AsITCase extends MultipleProgramsTestBase {
                super(mode);
        }
 
-       @Test
+       @Test(expected = NotImplementedError.class)
        public void testAs() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();
@@ -48,15 +48,15 @@ public class AsITCase extends MultipleProgramsTestBase {
                Table table =
                                
tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c");
 
-//             DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
-//             List<Row> results = ds.collect();
-//             String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello 
world\n" + "4,3,Hello world, " +
-//                             "how are you?\n" + "5,3,I am fine.\n" + 
"6,3,Luke Skywalker\n" + "7,4," +
-//                             "Comment#1\n" + "8,4,Comment#2\n" + 
"9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
-//                             "Comment#5\n" + "12,5,Comment#6\n" + 
"13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
-//                             "Comment#9\n" + "16,6,Comment#10\n" + 
"17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
-//                             "6,Comment#13\n" + "20,6,Comment#14\n" + 
"21,6,Comment#15\n";
-//             compareResultAsText(results, expected);
+               DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello 
world\n" + "4,3,Hello world, " +
+                               "how are you?\n" + "5,3,I am fine.\n" + 
"6,3,Luke Skywalker\n" + "7,4," +
+                               "Comment#1\n" + "8,4,Comment#2\n" + 
"9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
+                               "Comment#5\n" + "12,5,Comment#6\n" + 
"13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
+                               "Comment#9\n" + "16,6,Comment#10\n" + 
"17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
+                               "6,Comment#13\n" + "20,6,Comment#14\n" + 
"21,6,Comment#15\n";
+               compareResultAsText(results, expected);
        }
 
        @Test(expected = IllegalArgumentException.class)
@@ -67,10 +67,10 @@ public class AsITCase extends MultipleProgramsTestBase {
                Table table =
                                
tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b");
 
-//             DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
-//             List<Row> results = ds.collect();
-//             String expected = "";
-//             compareResultAsText(results, expected);
+               DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "";
+               compareResultAsText(results, expected);
        }
 
        @Test(expected = IllegalArgumentException.class)
@@ -81,10 +81,10 @@ public class AsITCase extends MultipleProgramsTestBase {
                Table table =
                                
tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c, d");
 
-//             DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
-//             List<Row> results = ds.collect();
-//             String expected = "";
-//             compareResultAsText(results, expected);
+               DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "";
+               compareResultAsText(results, expected);
        }
 
        @Test(expected = IllegalArgumentException.class)
@@ -95,10 +95,10 @@ public class AsITCase extends MultipleProgramsTestBase {
                Table table =
                                
tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, b");
 
-//             DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
-//             List<Row> results = ds.collect();
-//             String expected = "";
-//             compareResultAsText(results, expected);
+               DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "";
+               compareResultAsText(results, expected);
        }
 
        @Test(expected = IllegalArgumentException.class)
@@ -109,10 +109,10 @@ public class AsITCase extends MultipleProgramsTestBase {
                Table table =
                                
tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a + 1, b, c");
 
-//             DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
-//             List<Row> results = ds.collect();
-//             String expected = "";
-//             compareResultAsText(results, expected);
+               DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "";
+               compareResultAsText(results, expected);
        }
 
        @Test(expected = IllegalArgumentException.class)
@@ -124,10 +124,10 @@ public class AsITCase extends MultipleProgramsTestBase {
                                
tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a as foo, b," +
                                                " c");
 
-//             DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
-//             List<Row> results = ds.collect();
-//             String expected = "";
-//             compareResultAsText(results, expected);
+               DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "";
+               compareResultAsText(results, expected);
        }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
index 5e99fd9..cde78ce 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
@@ -32,6 +32,7 @@ import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import scala.NotImplementedError;
 
 import java.util.List;
 
@@ -43,7 +44,7 @@ public class CastingITCase extends MultipleProgramsTestBase {
                super(mode);
        }
 
-       @Test
+       @Test(expected = NotImplementedError.class)
        public void testNumericAutocastInArithmetic() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();
@@ -57,13 +58,13 @@ public class CastingITCase extends MultipleProgramsTestBase 
{
                Table result = table.select("f0 + 1, f1 +" +
                                " 1, f2 + 1L, f3 + 1.0f, f4 + 1.0d, f5 + 1");
 
-//             DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-//             List<Row> results = ds.collect();
-//             String expected = "2,2,2,2.0,2.0,2.0";
-//             compareResultAsText(results, expected);
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "2,2,2,2.0,2.0,2.0";
+               compareResultAsText(results, expected);
        }
 
-       @Test
+       @Test(expected = NotImplementedError.class)
        public void testNumericAutocastInComparison() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();
@@ -79,13 +80,13 @@ public class CastingITCase extends MultipleProgramsTestBase 
{
                Table result = table
                                .filter("a > 1 && b > 1 && c > 1L && d > 1.0f 
&& e > 1.0d && f > 1");
 
-//             DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-//             List<Row> results = ds.collect();
-//             String expected = "2,2,2,2,2.0,2.0,Hello";
-//             compareResultAsText(results, expected);
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "2,2,2,2,2.0,2.0,Hello";
+               compareResultAsText(results, expected);
        }
 
-       @Test
+       @Test(expected = NotImplementedError.class)
        public void testCastFromString() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();
@@ -99,13 +100,13 @@ public class CastingITCase extends 
MultipleProgramsTestBase {
                Table result = table.select(
                                "f0.cast(BYTE), f0.cast(SHORT), f0.cast(INT), 
f0.cast(LONG), f2.cast(DOUBLE), f2.cast(FLOAT), f1.cast(BOOL)");
 
-//             DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-//             List<Row> results = ds.collect();
-//             String expected = "1,1,1,1,2.0,2.0,true\n";
-//             compareResultAsText(results, expected);
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "1,1,1,1,2.0,2.0,true\n";
+               compareResultAsText(results, expected);
        }
 
-       @Test
+       @Test(expected = NotImplementedError.class)
        public void testCastDateFromString() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();
@@ -120,14 +121,14 @@ public class CastingITCase extends 
MultipleProgramsTestBase {
                                .select("f0.cast(DATE) AS f0, f1.cast(DATE) AS 
f1, f2.cast(DATE) AS f2, f3.cast(DATE) AS f3")
                                .select("f0.cast(STRING), f1.cast(STRING), 
f2.cast(STRING), f3.cast(STRING)");
 
-//             DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-//             List<Row> results = ds.collect();
-//             String expected = "2011-05-03 00:00:00.000,1970-01-01 
15:51:36.000,2011-05-03 15:51:36.000," +
-//                             "1970-01-17 17:47:53.775\n";
-//             compareResultAsText(results, expected);
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "2011-05-03 00:00:00.000,1970-01-01 
15:51:36.000,2011-05-03 15:51:36.000," +
+                               "1970-01-17 17:47:53.775\n";
+               compareResultAsText(results, expected);
        }
 
-       @Test
+       @Test(expected = NotImplementedError.class)
        public void testCastDateToStringAndLong() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();
@@ -142,10 +143,10 @@ public class CastingITCase extends 
MultipleProgramsTestBase {
                        .select("f0.cast(DATE) AS f0, f1.cast(DATE) AS f1")
                        .select("f0.cast(STRING), f0.cast(LONG), 
f1.cast(STRING), f1.cast(LONG)");
 
-//             DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-//             List<Row> results = ds.collect();
-//             String expected = "2011-05-03 
15:51:36.000,1304437896000,2011-05-03 15:51:36.000,1304437896000\n";
-//             compareResultAsText(results, expected);
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "2011-05-03 
15:51:36.000,1304437896000,2011-05-03 15:51:36.000,1304437896000\n";
+               compareResultAsText(results, expected);
        }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
index 3bbc120..2a17087 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.api.java.table.test;
 
-import org.apache.flink.api.table.ExpressionException;
 import org.apache.flink.api.table.Table;
 import org.apache.flink.api.table.Row;
 import org.apache.flink.api.java.DataSet;
@@ -31,6 +30,7 @@ import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import scala.NotImplementedError;
 
 import java.util.List;
 
@@ -42,7 +42,7 @@ public class ExpressionsITCase extends 
MultipleProgramsTestBase {
                super(mode);
        }
 
-       @Test
+       @Test(expected = NotImplementedError.class)
        public void testArithmetic() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();
@@ -56,13 +56,13 @@ public class ExpressionsITCase extends 
MultipleProgramsTestBase {
                Table result = table.select(
                                "a - 5, a + 5, a / 2, a * 2, a % 2, -a");
 
-//             DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-//             List<Row> results = ds.collect();
-//             String expected = "0,10,2,10,1,-5";
-//             compareResultAsText(results, expected);
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "0,10,2,10,1,-5";
+               compareResultAsText(results, expected);
        }
 
-       @Test
+       @Test(expected = NotImplementedError.class)
        public void testLogic() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();
@@ -76,13 +76,13 @@ public class ExpressionsITCase extends 
MultipleProgramsTestBase {
                Table result = table.select(
                                "b && true, b && false, b || false, !b");
 
-//             DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-//             List<Row> results = ds.collect();
-//             String expected = "true,false,true,false";
-//             compareResultAsText(results, expected);
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "true,false,true,false";
+               compareResultAsText(results, expected);
        }
 
-       @Test
+       @Test(expected = NotImplementedError.class)
        public void testComparisons() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();
@@ -96,10 +96,10 @@ public class ExpressionsITCase extends 
MultipleProgramsTestBase {
                Table result = table.select(
                                "a > c, a >= b, a < c, a.isNull, a.isNotNull");
 
-//             DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-//             List<Row> results = ds.collect();
-//             String expected = "true,true,false,false,true";
-//             compareResultAsText(results, expected);
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "true,true,false,false,true";
+               compareResultAsText(results, expected);
        }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
index 68925fb..a3ab10f 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import scala.NotImplementedError;
 
 import java.util.List;
 
@@ -40,7 +41,7 @@ public class FilterITCase extends MultipleProgramsTestBase {
                super(mode);
        }
 
-       @Test
+       @Test(expected = NotImplementedError.class)
        public void testAllRejectingFilter() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();
@@ -53,13 +54,13 @@ public class FilterITCase extends MultipleProgramsTestBase {
                Table result = table
                                .filter("false");
 
-//             DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-//             List<Row> results = ds.collect();
-//             String expected = "\n";
-//             compareResultAsText(results, expected);
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "\n";
+               compareResultAsText(results, expected);
        }
 
-       @Test
+       @Test(expected = NotImplementedError.class)
        public void testAllPassingFilter() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();
@@ -72,18 +73,18 @@ public class FilterITCase extends MultipleProgramsTestBase {
                Table result = table
                                .filter("true");
 
-//             DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-//             List<Row> results = ds.collect();
-//             String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello 
world\n" + "4,3,Hello world, " +
-//                             "how are you?\n" + "5,3,I am fine.\n" + 
"6,3,Luke Skywalker\n" + "7,4," +
-//                             "Comment#1\n" + "8,4,Comment#2\n" + 
"9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
-//                             "Comment#5\n" + "12,5,Comment#6\n" + 
"13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
-//                             "Comment#9\n" + "16,6,Comment#10\n" + 
"17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
-//                             "6,Comment#13\n" + "20,6,Comment#14\n" + 
"21,6,Comment#15\n";
-//             compareResultAsText(results, expected);
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello 
world\n" + "4,3,Hello world, " +
+                               "how are you?\n" + "5,3,I am fine.\n" + 
"6,3,Luke Skywalker\n" + "7,4," +
+                               "Comment#1\n" + "8,4,Comment#2\n" + 
"9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
+                               "Comment#5\n" + "12,5,Comment#6\n" + 
"13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
+                               "Comment#9\n" + "16,6,Comment#10\n" + 
"17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
+                               "6,Comment#13\n" + "20,6,Comment#14\n" + 
"21,6,Comment#15\n";
+               compareResultAsText(results, expected);
        }
 
-       @Test
+       @Test(expected = NotImplementedError.class)
        public void testFilterOnIntegerTupleField() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();
@@ -96,15 +97,15 @@ public class FilterITCase extends MultipleProgramsTestBase {
                Table result = table
                                .filter(" a % 2 = 0 ");
 
-//             DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-//             List<Row> results = ds.collect();
-//             String expected = "2,2,Hello\n" + "4,3,Hello world, how are 
you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
-//                             "Comment#2\n" + "10,4,Comment#4\n" + 
"12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
-//                             "Comment#10\n" + "18,6,Comment#12\n" + 
"20,6,Comment#14\n";
-//             compareResultAsText(results, expected);
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "2,2,Hello\n" + "4,3,Hello world, how are 
you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
+                               "Comment#2\n" + "10,4,Comment#4\n" + 
"12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
+                               "Comment#10\n" + "18,6,Comment#12\n" + 
"20,6,Comment#14\n";
+               compareResultAsText(results, expected);
        }
 
-       @Test
+       @Test(expected = NotImplementedError.class)
        public void testNotEquals() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();
@@ -117,15 +118,15 @@ public class FilterITCase extends 
MultipleProgramsTestBase {
                Table result = table
                                .filter("!( a % 2 <> 0 ) ");
 
-//             DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-//             List<Row> results = ds.collect();
-//             String expected = "2,2,Hello\n" + "4,3,Hello world, how are 
you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
-//                             "Comment#2\n" + "10,4,Comment#4\n" + 
"12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
-//                             "Comment#10\n" + "18,6,Comment#12\n" + 
"20,6,Comment#14\n";
-//             compareResultAsText(results, expected);
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "2,2,Hello\n" + "4,3,Hello world, how are 
you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
+                               "Comment#2\n" + "10,4,Comment#4\n" + 
"12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
+                               "Comment#10\n" + "18,6,Comment#12\n" + 
"20,6,Comment#14\n";
+               compareResultAsText(results, expected);
        }
 
-       @Test
+       @Test(expected = NotImplementedError.class)
        public void testIntegerBiggerThan128() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();
@@ -136,10 +137,10 @@ public class FilterITCase extends 
MultipleProgramsTestBase {
 
                Table result = table.filter("a = 300 ");
 
-//             DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-//             List<Row> results = ds.collect();
-//             String expected = "300,1,Hello\n";
-//             compareResultAsText(results, expected);
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "300,1,Hello\n";
+               compareResultAsText(results, expected);
        }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java
index 2add694..d5dc56a 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.api.java.table.test;
 
-import org.apache.flink.api.table.ExpressionException;
 import org.apache.flink.api.table.Table;
 import org.apache.flink.api.table.Row;
 import org.apache.flink.api.java.DataSet;
@@ -30,6 +29,7 @@ import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import scala.NotImplementedError;
 
 import java.util.List;
 
@@ -54,13 +54,13 @@ public class GroupedAggregationsITCase extends 
MultipleProgramsTestBase {
                Table result = table
                                .groupBy("foo").select("a.avg");
 
-//             DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-//             List<Row> results = ds.collect();
-//             String expected = "";
-//             compareResultAsText(results, expected);
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "";
+               compareResultAsText(results, expected);
        }
 
-       @Test
+       @Test(expected = NotImplementedError.class)
        public void testGroupedAggregate() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();
@@ -73,13 +73,13 @@ public class GroupedAggregationsITCase extends 
MultipleProgramsTestBase {
                Table result = table
                                .groupBy("b").select("b, a.sum");
 
-//             DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-//             List<Row> results = ds.collect();
-//             String expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + 
"5,65\n" + "6,111\n";
-//             compareResultAsText(results, expected);
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + 
"5,65\n" + "6,111\n";
+               compareResultAsText(results, expected);
        }
 
-       @Test
+       @Test(expected = NotImplementedError.class)
        public void testGroupingKeyForwardIfNotUsed() throws Exception {
 
                // the grouping key needs to be forwarded to the intermediate 
DataSet, even
@@ -96,13 +96,13 @@ public class GroupedAggregationsITCase extends 
MultipleProgramsTestBase {
                Table result = table
                                .groupBy("b").select("a.sum");
 
-//             DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-//             List<Row> results = ds.collect();
-//             String expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + 
"111\n";
-//             compareResultAsText(results, expected);
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + 
"111\n";
+               compareResultAsText(results, expected);
        }
 
-       @Test
+       @Test(expected = NotImplementedError.class)
        public void testGroupNoAggregation() throws Exception {
 
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
@@ -116,10 +116,10 @@ public class GroupedAggregationsITCase extends 
MultipleProgramsTestBase {
                Table result = table
                        .groupBy("b").select("a.sum as d, b").groupBy("b, 
d").select("b");
 
-//             DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-//             String expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n";
-//             List<Row> results = ds.collect();
-//             compareResultAsText(results, expected);
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               String expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n";
+               List<Row> results = ds.collect();
+               compareResultAsText(results, expected);
        }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java
index 2b44a87..2e1f4a7 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.table.test;
 
+import org.apache.flink.api.table.Row;
 import org.apache.flink.api.table.Table;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -30,6 +31,9 @@ import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import scala.NotImplementedError;
+
+import java.util.List;
 
 
 @RunWith(Parameterized.class)
@@ -40,7 +44,7 @@ public class JoinITCase extends MultipleProgramsTestBase {
                super(mode);
        }
 
-       @Test
+       @Test(expected = NotImplementedError.class)
        public void testJoin() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();
@@ -53,13 +57,13 @@ public class JoinITCase extends MultipleProgramsTestBase {
 
                Table result = in1.join(in2).where("b === e").select("c, g");
 
-//             DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-//             List<Row> results = ds.collect();
-//             String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello 
world,Hallo Welt\n";
-//             compareResultAsText(results, expected);
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello 
world,Hallo Welt\n";
+               compareResultAsText(results, expected);
        }
 
-       @Test
+       @Test(expected = NotImplementedError.class)
        public void testJoinWithFilter() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();
@@ -72,13 +76,13 @@ public class JoinITCase extends MultipleProgramsTestBase {
 
                Table result = in1.join(in2).where("b === e && b < 
2").select("c, g");
 
-//             DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-//             List<Row> results = ds.collect();
-//             String expected = "Hi,Hallo\n";
-//             compareResultAsText(results, expected);
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "Hi,Hallo\n";
+               compareResultAsText(results, expected);
        }
 
-       @Test
+       @Test(expected = NotImplementedError.class)
        public void testJoinWithMultipleKeys() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();
@@ -91,11 +95,11 @@ public class JoinITCase extends MultipleProgramsTestBase {
 
                Table result = in1.join(in2).where("a === d && b === 
h").select("c, g");
 
-//             DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-//             List<Row> results = ds.collect();
-//             String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello 
world,Hallo Welt wie gehts?\n" +
-//                             "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I 
am fine.,IJK\n";
-//             compareResultAsText(results, expected);
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello 
world,Hallo Welt wie gehts?\n" +
+                               "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I 
am fine.,IJK\n";
+               compareResultAsText(results, expected);
        }
 
        @Test(expected = IllegalArgumentException.class)
@@ -111,10 +115,10 @@ public class JoinITCase extends MultipleProgramsTestBase {
 
                Table result = in1.join(in2).where("foo === e").select("c, g");
 
-//             DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-//             List<Row> results = ds.collect();
-//             String expected = "";
-//             compareResultAsText(results, expected);
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "";
+               compareResultAsText(results, expected);
        }
 
        // Calcite does not eagerly check the compatibility of compared types
@@ -133,10 +137,10 @@ public class JoinITCase extends MultipleProgramsTestBase {
                Table result = in1
                                .join(in2).where("a === g").select("c, g");
 
-//             DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-//             List<Row> results = ds.collect();
-//             String expected = "";
-//             compareResultAsText(results, expected);
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "";
+               compareResultAsText(results, expected);
        }
 
        @Test(expected = IllegalArgumentException.class)
@@ -153,13 +157,13 @@ public class JoinITCase extends MultipleProgramsTestBase {
                Table result = in1
                                .join(in2).where("a === d").select("c, g");
 
-//             DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-//             List<Row> results = ds.collect();
-//             String expected = "";
-//             compareResultAsText(results, expected);
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "";
+               compareResultAsText(results, expected);
        }
 
-       @Test
+       @Test(expected = NotImplementedError.class)
        public void testJoinWithAggregation() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();
@@ -173,10 +177,10 @@ public class JoinITCase extends MultipleProgramsTestBase {
                Table result = in1
                                .join(in2).where("a === d").select("g.count");
 
-//             DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-//             List<Row> results = ds.collect();
-//             String expected = "6";
-//             compareResultAsText(results, expected);
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "6";
+               compareResultAsText(results, expected);
        }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java
index f19b8c1..5ef0235 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java
@@ -30,6 +30,7 @@ import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import scala.NotImplementedError;
 
 @RunWith(Parameterized.class)
 public class PojoGroupingITCase extends MultipleProgramsTestBase {
@@ -38,7 +39,7 @@ public class PojoGroupingITCase extends 
MultipleProgramsTestBase {
                super(mode);
        }
 
-       @Test
+       @Test(expected = NotImplementedError.class)
        public void testPojoGrouping() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
@@ -54,14 +55,14 @@ public class PojoGroupingITCase extends 
MultipleProgramsTestBase {
                        .select("groupMe, value, name")
                        .where("groupMe != 'B'");
 
-//             DataSet<MyPojo> myPojos = tableEnv.toDataSet(table, 
MyPojo.class);
-//
-//             DataSet<MyPojo> result = myPojos.groupBy("groupMe")
-//                     .sortGroup("value", Order.DESCENDING)
-//                     .first(1);
-//
-//             List<MyPojo> resultList = result.collect();
-//             compareResultAsText(resultList, "A,24.0,Y");
+               DataSet<MyPojo> myPojos = tableEnv.toDataSet(table, 
MyPojo.class);
+
+               DataSet<MyPojo> result = myPojos.groupBy("groupMe")
+                       .sortGroup("value", Order.DESCENDING)
+                       .first(1);
+
+               List<MyPojo> resultList = result.collect();
+               compareResultAsText(resultList, "A,24.0,Y");
        }
 
        public static class MyPojo implements Serializable {

Reply via email to