Repository: flink
Updated Branches:
  refs/heads/tableOnCalcite 297564646 -> 1f3eda984


[FLINK-3226] Add DataSet scan and conversion to DataSet[Row]

This closes #1579.
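
For context, a minimal sketch of the round trip this patch enables, modeled on
the updated ITCases below (the Person case class and its data are illustrative):

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.table._
    import org.apache.flink.api.table.Row

    case class Person(name: String, age: Int)

    val env = ExecutionEnvironment.getExecutionEnvironment
    // Scan a DataSet as a Table and rename its fields with AS.
    val t = env.fromElements(Person("Peter", 28), Person("Anna", 56)).as('n, 'a)
    // Convert the Table back into a DataSet[Row].
    val rows: DataSet[Row] = t.toDataSet[Row]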


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1f3eda98
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1f3eda98
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1f3eda98

Branch: refs/heads/tableOnCalcite
Commit: 1f3eda984473be87207c94ef8118c635a74b090a
Parents: 2975646
Author: Fabian Hueske <fhue...@apache.org>
Authored: Tue Feb 2 17:15:28 2016 +0100
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Thu Feb 4 12:14:34 2016 +0100

----------------------------------------------------------------------
 .../api/java/table/JavaBatchTranslator.scala    |   2 +
 .../api/scala/table/ScalaBatchTranslator.scala  |   8 +-
 .../flink/api/table/plan/PlanTranslator.scala   |  87 ++++++++---
 .../flink/api/table/plan/TypeConverter.scala    |  15 ++
 .../plan/nodes/dataset/DataSetSource.scala      | 150 ++++++++++++++++++-
 .../plan/rules/dataset/DataSetScanRule.scala    |   8 +-
 .../api/table/plan/schema/DataSetTable.scala    |  41 ++---
 .../flink/api/java/table/test/AsITCase.java     |  63 ++++++--
 .../flink/api/java/table/test/FilterITCase.java |  15 +-
 .../flink/api/java/table/test/SelectITCase.java |  15 +-
 .../flink/api/scala/table/test/AsITCase.scala   |  40 ++++-
 .../api/scala/table/test/SelectITCase.scala     |  28 ++--
 12 files changed, 380 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1f3eda98/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
index 7e91190..f70f477 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
@@ -40,11 +40,13 @@ class JavaBatchTranslator extends PlanTranslator {
 
   override def createTable[A](
       repr: Representation[A],
+      fieldIndexes: Array[Int],
       fieldNames: Array[String]): Table = {
 
     // create table representation from DataSet
     val dataSetTable = new DataSetTable[A](
       repr.asInstanceOf[JavaDataSet[A]],
+      fieldIndexes,
       fieldNames
     )
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1f3eda98/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala
index 1c453fa..cc92c37 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala
@@ -38,8 +38,12 @@ class ScalaBatchTranslator extends PlanTranslator {
 
   type Representation[A] = DataSet[A]
 
-  override def createTable[A](repr: Representation[A], fieldNames: Array[String]): Table = {
-    javaTranslator.createTable(repr.javaSet, fieldNames)
+  override def createTable[A](
+    repr: Representation[A],
+    fieldIndexes: Array[Int],
+    fieldNames: Array[String]): Table =
+  {
+    javaTranslator.createTable(repr.javaSet, fieldIndexes, fieldNames)
   }
 
   override def translate[O](op: RelNode)(implicit tpe: TypeInformation[O]): DataSet[O] = {

http://git-wip-us.apache.org/repos/asf/flink/blob/1f3eda98/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
index 4e97f83..af22768 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
@@ -18,10 +18,12 @@
 package org.apache.flink.api.table.plan
 
 import org.apache.calcite.rel.RelNode
-import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
 import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.api.table.parser.ExpressionParser
-import org.apache.flink.api.table.expressions.{Expression, ResolvedFieldReference, UnresolvedFieldReference}
+import org.apache.flink.api.table.expressions.{Naming, Expression, UnresolvedFieldReference}
 import org.apache.flink.api.table.Table
 
 import scala.language.reflectiveCalls
@@ -42,7 +44,10 @@ abstract class PlanTranslator {
   /**
    * Creates a [[Table]] from a DataSet (the underlying representation).
    */
-  def createTable[A](repr: Representation[A], fieldNames: Array[String]): Table
+  def createTable[A](
+    repr: Representation[A],
+    fieldIndexes: Array[Int],
+    fieldNames: Array[String]): Table
 
   /**
    * Creates a [[Table]] from the given DataSet.
@@ -50,10 +55,15 @@ abstract class PlanTranslator {
   def createTable[A](repr: Representation[A]): Table = {
 
     val fieldNames: Array[String] = repr.getType() match {
-      case c: CompositeType[A] => c.getFieldNames
-      case tpe => Array() // createTable will throw an exception for this later
+      case t: TupleTypeInfo[A] => t.getFieldNames
+      case c: CaseClassTypeInfo[A] => c.getFieldNames
+      case p: PojoTypeInfo[A] => p.getFieldNames
+      case tpe =>
+        throw new IllegalArgumentException(
+          s"Type $tpe requires explicit field naming with AS.")
     }
-    createTable(repr, fieldNames)
+    val fieldIndexes = fieldNames.indices.toArray
+    createTable(repr, fieldIndexes, fieldNames)
   }
 
   /**
@@ -75,17 +85,60 @@ abstract class PlanTranslator {
     */
   def createTable[A](repr: Representation[A], exprs: Array[Expression]): Table = {
 
-    val fieldNames: Array[String] = exprs
-      .map {
-        case ResolvedFieldReference(name, _) =>
-          name
-        case UnresolvedFieldReference(name) =>
-          name
-        case _ =>
-          throw new IllegalArgumentException("Only field expressions allowed")
-      }
-
-    createTable(repr, fieldNames)
+    val inputType = repr.getType()
+
+    val indexedNames: Array[(Int, String)] = inputType match {
+      case a: AtomicType[A] =>
+        if (exprs.length != 1) {
+          throw new IllegalArgumentException("Atomic type may only have a single field.")
+        }
+        exprs.map {
+          case UnresolvedFieldReference(name) => (0, name)
+          case _ => throw new IllegalArgumentException(
+            "Field reference expression expected.")
+        }
+      case t: TupleTypeInfo[A] =>
+        exprs.zipWithIndex.map {
+          case (UnresolvedFieldReference(name), idx) => (idx, name)
+          case (Naming(UnresolvedFieldReference(origName), name), _) =>
+            val idx = t.getFieldIndex(origName)
+            if (idx < 0) {
+              throw new IllegalArgumentException(s"$origName is not a field of type $t")
+            }
+            (idx, name)
+          case _ => throw new IllegalArgumentException(
+            "Field reference expression or naming expression expected.")
+        }
+      case c: CaseClassTypeInfo[A] =>
+        exprs.zipWithIndex.map {
+          case (UnresolvedFieldReference(name), idx) => (idx, name)
+          case (Naming(UnresolvedFieldReference(origName), name), _) =>
+            val idx = c.getFieldIndex(origName)
+            if (idx < 0) {
+              throw new IllegalArgumentException(s"$origName is not a field of type $c")
+            }
+            (idx, name)
+          case _ => throw new IllegalArgumentException(
+            "Field reference expression or naming expression expected.")
+        }
+      case p: PojoTypeInfo[A] =>
+        exprs.map {
+          case Naming(UnresolvedFieldReference(origName), name) =>
+            val idx = p.getFieldIndex(origName)
+            if (idx < 0) {
+              throw new IllegalArgumentException(s"$origName is not a field of type $p")
+            }
+            (idx, name)
+          case _ => throw new IllegalArgumentException(
+            "Field naming expression expected.")
+        }
+      case tpe => throw new IllegalArgumentException(
+        s"Type $tpe cannot be converted into Table.")
+    }
+
+    val (fieldIndexes, fieldNames) = indexedNames.unzip
+
+    createTable(repr, fieldIndexes.toArray, fieldNames.toArray)
   }
 
 }
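
To illustrate the name-to-index resolution introduced above: an "origName AS
newName" expression is resolved against the input type's original field names
via getFieldIndex. A hedged sketch (Person is hypothetical; CaseClassTypeInfo
and getFieldIndex are the APIs used in the patch):

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo

    case class Person(name: String, age: Int)

    val tpe = createTypeInformation[Person].asInstanceOf[CaseClassTypeInfo[Person]]
    // "age AS a, name AS n" resolves the original names to field positions:
    val indexedNames = Array(("age", "a"), ("name", "n")).map {
      case (orig, newName) => (tpe.getFieldIndex(orig), newName) // -1 if unknown
    }
    // indexedNames == Array((1, "a"), (0, "n"))
    val (fieldIndexes, fieldNames) = indexedNames.unzip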

http://git-wip-us.apache.org/repos/asf/flink/blob/1f3eda98/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
index 30a0589..f6fe2e4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
@@ -22,19 +22,34 @@ import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.calcite.sql.`type`.SqlTypeName._
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo, GenericTypeInfo}
+import org.apache.flink.api.java.typeutils.ValueTypeInfo._
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 
 object TypeConverter {
 
   def typeInfoToSqlType(typeInfo: TypeInformation[_]): SqlTypeName = typeInfo match {
     case BOOLEAN_TYPE_INFO => BOOLEAN
+    case BOOLEAN_VALUE_TYPE_INFO => BOOLEAN
     case BYTE_TYPE_INFO => TINYINT
+    case BYTE_VALUE_TYPE_INFO => TINYINT
     case SHORT_TYPE_INFO => SMALLINT
+    case SHORT_VALUE_TYPE_INFO => SMALLINT
     case INT_TYPE_INFO => INTEGER
+    case INT_VALUE_TYPE_INFO => INTEGER
     case LONG_TYPE_INFO => BIGINT
+    case LONG_VALUE_TYPE_INFO => BIGINT
     case FLOAT_TYPE_INFO => FLOAT
+    case FLOAT_VALUE_TYPE_INFO => FLOAT
     case DOUBLE_TYPE_INFO => DOUBLE
+    case DOUBLE_VALUE_TYPE_INFO => DOUBLE
     case STRING_TYPE_INFO => VARCHAR
+    case STRING_VALUE_TYPE_INFO => VARCHAR
     case DATE_TYPE_INFO => DATE
+//    case t: TupleTypeInfo[_] => ROW
+//    case c: CaseClassTypeInfo[_] => ROW
+//    case p: PojoTypeInfo[_] => STRUCTURED
+//    case g: GenericTypeInfo[_] => OTHER
     case _ => ??? // TODO more types
     }
 

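For reference, a quick sanity check of the mapping above (a sketch; the
BasicTypeInfo constants come from flink-core):

    import org.apache.calcite.sql.`type`.SqlTypeName
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    import org.apache.flink.api.table.plan.TypeConverter

    // Basic types and their Value counterparts now map to the same SQL type.
    assert(TypeConverter.typeInfoToSqlType(BasicTypeInfo.INT_TYPE_INFO) == SqlTypeName.INTEGER)
    assert(TypeConverter.typeInfoToSqlType(BasicTypeInfo.STRING_TYPE_INFO) == SqlTypeName.VARCHAR)
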
http://git-wip-us.apache.org/repos/asf/flink/blob/1f3eda98/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala
index effaf1a..53067dc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala
@@ -18,12 +18,25 @@
 
 package org.apache.flink.api.table.plan.nodes.dataset
 
+import java.lang.reflect.Field
+
 import org.apache.calcite.plan._
-import org.apache.calcite.rel.{RelWriter, RelNode}
+import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.TableScan
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.common.typeinfo.{TypeInformation, AtomicType}
 import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.plan.TypeConverter
+import org.apache.flink.api.table.plan.schema.DataSetTable
+import org.apache.flink.api.table.typeinfo.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+
+import scala.collection.JavaConverters._
 
 /**
  * Flink RelNode which scans an underlying DataSet source.
@@ -32,11 +45,12 @@ class DataSetSource(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     table: RelOptTable,
-    rowType: RelDataType,
-    inputDataSet: DataSet[_])
+    rowType: RelDataType)
   extends TableScan(cluster, traitSet, table)
   with DataSetRel {
 
+  val dataSetTable: DataSetTable[Any] = table.unwrap(classOf[DataSetTable[Any]])
+
   override def deriveRowType() = rowType
 
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
@@ -44,12 +58,136 @@ class DataSetSource(
       cluster,
       traitSet,
       table,
-      rowType,
-      inputDataSet
+      rowType
     )
   }
 
   override def translateToPlan: DataSet[Any] = {
-    ???
+
+    val inputDataSet: DataSet[Any] = dataSetTable.dataSet
+
+    // extract Flink data types
+    val fieldTypes: Array[TypeInformation[_]] = rowType.getFieldList.asScala
+      .map(f => f.getType.getSqlTypeName)
+      .map(n => TypeConverter.sqlTypeToTypeInfo(n))
+      .toArray
+
+    val rowTypeInfo = new RowTypeInfo(fieldTypes, dataSetTable.fieldNames)
+
+    // convert input data set into row data set
+    inputDataSet.getType match {
+      case t: TupleTypeInfo[_] =>
+        val rowMapper = new TupleToRowMapper(dataSetTable.fieldIndexes)
+        inputDataSet.asInstanceOf[DataSet[Tuple]]
+          .map(rowMapper).returns(rowTypeInfo).asInstanceOf[DataSet[Any]]
+
+      case c: CaseClassTypeInfo[_] =>
+        val rowMapper = new CaseClassToRowMapper(dataSetTable.fieldIndexes)
+        inputDataSet.asInstanceOf[DataSet[Product]]
+          .map(rowMapper).returns(rowTypeInfo).asInstanceOf[DataSet[Any]]
+
+      case p: PojoTypeInfo[_] =>
+        // get pojo class
+        val typeClazz = p.getTypeClass.asInstanceOf[Class[Any]]
+        // get original field names
+        val origFieldNames = dataSetTable.fieldIndexes.map(i => p.getFieldNames()(i))
+
+        val rowMapper = new PojoToRowMapper(typeClazz, origFieldNames)
+        inputDataSet.asInstanceOf[DataSet[Any]]
+          .map(rowMapper).returns(rowTypeInfo).asInstanceOf[DataSet[Any]]
+
+      case a: AtomicType[_] =>
+        val rowMapper = new AtomicToRowMapper
+        inputDataSet.asInstanceOf[DataSet[Any]]
+          .map(rowMapper).returns(rowTypeInfo).asInstanceOf[DataSet[Any]]
+    }
+  }
+
+}
+
+class TupleToRowMapper(val fromIndexes: Array[Int])
+  extends RichMapFunction[Tuple, Row]
+{
+
+  @transient var outR: Row = null
+
+  override def open(conf: Configuration): Unit = {
+    outR = new Row(fromIndexes.length)
+  }
+
+  override def map(v: Tuple): Row = {
+
+    var i = 0
+    while (i < fromIndexes.length) {
+      outR.setField(i, v.getField(fromIndexes(i)))
+      i += 1
+    }
+    outR
+  }
+}
+
+class CaseClassToRowMapper(val fromIndexes: Array[Int])
+  extends RichMapFunction[Product, Row]
+{
+
+  @transient var outR: Row = null
+
+  override def open(conf: Configuration): Unit = {
+    outR = new Row(fromIndexes.length)
+  }
+
+  override def map(v: Product): Row = {
+
+    var i = 0
+    while (i < fromIndexes.length) {
+      outR.setField(i, v.productElement(fromIndexes(i)))
+      i += 1
+    }
+    outR
+  }
+}
+
+class PojoToRowMapper(val inClazz: Class[Any], val fieldNames: Array[String])
+  extends RichMapFunction[Any, Row]
+{
+
+  @transient var outR: Row = null
+  @transient var fields: Array[Field] = null
+
+  override def open(conf: Configuration): Unit = {
+
+    fields = fieldNames.map { n =>
+      val f = inClazz.getField(n)
+      f.setAccessible(true)
+      f
+    }
+    outR = new Row(fieldNames.length)
+  }
+
+  override def map(v: Any): Row = {
+
+    var i = 0
+    while (i < fields.length) {
+      outR.setField(i, fields(i).get(v))
+      i += 1
+    }
+    outR
+  }
+}
+
+class AtomicToRowMapper()
+  extends RichMapFunction[Any, Row]
+{
+
+  @transient var outR: Row = null
+
+  override def open(conf: Configuration): Unit = {
+    outR = new Row(1)
+  }
+
+  override def map(v: Any): Row = {
+
+    outR.setField(0, v)
+    outR
   }
 }

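Note that each mapper allocates a single Row in open() and mutates it per
record, relying on Flink's object reuse between map() calls. A self-contained
sketch of the case-class path (Person is illustrative; Row and setField are
from the patch):

    import org.apache.flink.api.table.Row

    case class Person(name: String, age: Int)

    // The same logic as CaseClassToRowMapper.map(), without the
    // RichMapFunction scaffolding.
    def toRow(v: Product, fromIndexes: Array[Int]): Row = {
      val r = new Row(fromIndexes.length)
      var i = 0
      while (i < fromIndexes.length) {
        r.setField(i, v.productElement(fromIndexes(i)))
        i += 1
      }
      r
    }

    // toRow(Person("Anna", 56), Array(1, 0)) yields a Row holding (56, "Anna").
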
http://git-wip-us.apache.org/repos/asf/flink/blob/1f3eda98/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetScanRule.scala
index 937f3e2..f995201 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetScanRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetScanRule.scala
@@ -21,10 +21,8 @@ package org.apache.flink.api.table.plan.rules.dataset
 import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetSource}
 import org.apache.flink.api.table.plan.nodes.logical.{FlinkScan, FlinkConvention}
-import org.apache.flink.api.table.plan.schema.DataSetTable
 
 class DataSetScanRule
   extends ConverterRule(
@@ -33,17 +31,17 @@ class DataSetScanRule
     DataSetConvention.INSTANCE,
     "DataSetScanRule")
 {
+
   def convert(rel: RelNode): RelNode = {
     val scan: FlinkScan = rel.asInstanceOf[FlinkScan]
     val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
-    val dataSet: DataSet[_] = scan.getTable().unwrap(classOf[DataSetTable[_]]).dataSet
+
 
     new DataSetSource(
       rel.getCluster,
       traitSet,
       scan.getTable,
-      rel.getRowType,
-      dataSet
+      rel.getRowType
     )
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1f3eda98/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataSetTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataSetTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataSetTable.scala
index e6aecab..75090a2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataSetTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataSetTable.scala
@@ -28,40 +28,45 @@ import org.apache.calcite.schema.Statistic
 import org.apache.calcite.schema.impl.AbstractTable
 import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.calcite.util.ImmutableBitSet
+import org.apache.flink.api.common.typeinfo.AtomicType
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.table.plan.TypeConverter
 
 class DataSetTable[T](
     val dataSet: DataSet[T],
+    val fieldIndexes: Array[Int],
     val fieldNames: Array[String])
   extends AbstractTable {
 
+  if (fieldIndexes.length != fieldNames.length) {
+    throw new IllegalArgumentException(
+      "Number of field indexes and field names must be equal.")
+  }
+
   // check uniqueness of field names
   if (fieldNames.length != fieldNames.toSet.size) {
-    throw new scala.IllegalArgumentException(
+    throw new IllegalArgumentException(
       "Table field names must be unique.")
   }
 
-  val dataSetType: CompositeType[T] =
+  val fieldTypes: Array[SqlTypeName] =
     dataSet.getType match {
       case cType: CompositeType[T] =>
-        cType
-      case _ =>
-        throw new scala.IllegalArgumentException(
-          "DataSet must have a composite type.")
-    }
-
-  val fieldTypes: Array[SqlTypeName] =
-    if (fieldNames.length == dataSetType.getArity) {
-      (0 until dataSetType.getArity)
-        .map(i => dataSetType.getTypeAt(i))
-        .map(TypeConverter.typeInfoToSqlType)
-        .toArray
-    }
-    else {
-      throw new IllegalArgumentException(
-        "Arity of DataSet type not equal to number of field names.")
+        if (fieldNames.length != cType.getArity) {
+          throw new IllegalArgumentException(
+          s"Arity of DataSet type (" + cType.getFieldNames.deep + ") " +
+            "not equal to number of field names " + fieldNames.deep + ".")
+        }
+        fieldIndexes
+          .map(cType.getTypeAt(_))
+          .map(TypeConverter.typeInfoToSqlType(_))
+      case aType: AtomicType[T] =>
+        if (fieldIndexes.length != 1 || fieldIndexes(0) != 0) {
+          throw new IllegalArgumentException(
+            "Non-composite input type may have only a single field and its index must be 0.")
+        }
+        Array(TypeConverter.typeInfoToSqlType(aType))
     }
 
   override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {

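A hedged construction example for the updated schema class (the tuple DataSet
and field names are illustrative; javaSet is package-private and only usable
from within Flink code, as in ScalaBatchTranslator):

    import org.apache.flink.api.scala._
    import org.apache.flink.api.table.plan.schema.DataSetTable

    val env = ExecutionEnvironment.getExecutionEnvironment
    val ds = env.fromElements((1, "Hi"), (2, "Hello"))

    // Two indexes and two names for a DataSet[(Int, String)]; mismatched
    // lengths or duplicate field names throw IllegalArgumentException.
    val table = new DataSetTable(ds.javaSet, Array(0, 1), Array("a", "b"))
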
http://git-wip-us.apache.org/repos/asf/flink/blob/1f3eda98/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
index f257c32..0f35d75 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
@@ -28,8 +28,8 @@ import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import scala.NotImplementedError;
 
+import java.util.ArrayList;
 import java.util.List;
 
 @RunWith(Parameterized.class)
@@ -40,8 +40,8 @@ public class AsITCase extends MultipleProgramsTestBase {
                super(mode);
        }
 
-       @Test(expected = NotImplementedError.class)
-       public void testAs() throws Exception {
+       @Test
+       public void testAsFromTuple() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();
 
@@ -50,12 +50,39 @@ public class AsITCase extends MultipleProgramsTestBase {
 
                DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
                List<Row> results = ds.collect();
-               String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
-                               "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
-                               "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
-                               "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
-                               "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
-                               "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n";
+               String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+                       "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+                       "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+                       "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" +
+                       "14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" +
+                       "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" +
+                       "20,6,Comment#14\n" + "21,6,Comment#15\n";
+               compareResultAsText(results, expected);
+       }
+
+       @Test
+       public void testAsFromPojo() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               List<SmallPojo> data = new ArrayList<>();
+               data.add(new SmallPojo("Peter", 28, 4000.00, "Sales"));
+               data.add(new SmallPojo("Anna", 56, 10000.00, "Engineering"));
+               data.add(new SmallPojo("Lucy", 42, 6000.00, "HR"));
+
+               Table table =
+                       tableEnv.fromDataSet(env.fromCollection(data),
+                               "department AS a, " +
+                               "age AS b, " +
+                               "salary AS c, " +
+                               "name AS d");
+
+               DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
+               List<Row> results = ds.collect();
+               String expected =
+                       "Sales,28,4000.0,Peter\n" +
+                       "Engineering,56,10000.0,Anna\n" +
+                       "HR,42,6000.0,Lucy\n";
                compareResultAsText(results, expected);
        }
 
@@ -129,5 +156,23 @@ public class AsITCase extends MultipleProgramsTestBase {
                String expected = "";
                compareResultAsText(results, expected);
        }
+
+       public static class SmallPojo {
+
+               public SmallPojo() { }
+
+               public SmallPojo(String name, int age, double salary, String department) {
+                       this.name = name;
+                       this.age = age;
+                       this.salary = salary;
+                       this.department = department;
+               }
+
+               public String name;
+               public int age;
+               public double salary;
+               public String department;
+       }
+
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1f3eda98/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
index a3ab10f..cd08879 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
@@ -60,7 +60,7 @@ public class FilterITCase extends MultipleProgramsTestBase {
                compareResultAsText(results, expected);
        }
 
-       @Test(expected = NotImplementedError.class)
+       @Test
        public void testAllPassingFilter() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();
@@ -75,12 +75,13 @@ public class FilterITCase extends MultipleProgramsTestBase {
 
                DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
                List<Row> results = ds.collect();
-               String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
-                               "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
-                               "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
-                               "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
-                               "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
-                               "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n";
+               String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+                       "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+                       "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+                       "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" +
+                       "14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" +
+                       "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" +
+                       "20,6,Comment#14\n" + "21,6,Comment#15\n";
                compareResultAsText(results, expected);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1f3eda98/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
index a3d31da..a66219c 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
@@ -41,7 +41,7 @@ public class SelectITCase extends MultipleProgramsTestBase {
                super(mode);
        }
 
-       @Test(expected = NotImplementedError.class)
+       @Test
        public void testSimpleSelectAllWithAs() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();
@@ -55,12 +55,13 @@ public class SelectITCase extends MultipleProgramsTestBase {
 
                DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
                List<Row> results = resultSet.collect();
-               String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
-                               "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
-                               "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
-                               "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
-                               "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
-                               "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n";
+               String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+                       "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+                       "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+                       "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" +
+                       "14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" +
+                       "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" +
+                       "20,6,Comment#14\n" + "21,6,Comment#15\n";
                compareResultAsText(results, expected);
 
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/1f3eda98/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
index 5ff2b82..6779d4c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
@@ -33,18 +33,38 @@ import scala.collection.JavaConverters._
 @RunWith(classOf[Parameterized])
 class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
 
-  @Test(expected = classOf[NotImplementedError])
+  @Test
   def testAs(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
 
-    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
-      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
-      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
-      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
-      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
-      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
+      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
+      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testAsFromCaseClass(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val data = List(
+      SomeCaseClass("Peter", 28, 4000.00, "Sales"),
+      SomeCaseClass("Anna", 56, 10000.00, "Engineering"),
+      SomeCaseClass("Lucy", 42, 6000.00, "HR"))
+
+    val t = env.fromCollection(data).as('a, 'b, 'c, 'd)
+
+    val expected: String =
+      "Peter,28,4000.0,Sales\n" +
+      "Anna,56,10000.0,Engineering\n" +
+      "Lucy,42,6000.0,HR\n"
     val results = t.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
@@ -105,4 +125,10 @@ class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
     val results = t.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
+
+}
+
+case class SomeCaseClass(name: String, age: Int, salary: Double, department: String) {
+
+  def this() { this("", 0, 0.0, "") }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1f3eda98/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
index 4dadfe4..3700d67 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
@@ -33,34 +33,34 @@ import scala.collection.JavaConverters._
 @RunWith(classOf[Parameterized])
 class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
 
-  @Test(expected = classOf[NotImplementedError])
+  @Test
   def testSimpleSelectAll(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val t = CollectionDataSets.get3TupleDataSet(env).toTable.select('_1, '_2, '_3)
 
-    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
-      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
-      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
-      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
-      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
-      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
+      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
+      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
     val results = t.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[NotImplementedError])
+  @Test
   def testSimpleSelectAllWithAs(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c).select('a, 'b, 'c)
 
-    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
-      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
-      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
-      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
-      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
-      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
+      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
+      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
     val results = t.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
