Repository: spark
Updated Branches:
  refs/heads/master 188ea348f -> 53e83a3a7


[SPARK-11116][SQL] First Draft of Dataset API

*This PR adds a new experimental API to Spark, tentatively named Datasets.*

A `Dataset` is a strongly-typed collection of objects that can be transformed 
in parallel using functional or relational operations.  Example usage is as 
follows:

### Functional
```scala
scala> val ds: Dataset[Int] = Seq(1, 2, 3).toDS()
scala> ds.filter(_ % 1 == 0).collect()
res1: Array[Int] = Array(1, 2, 3)
```

### Relational
```scala
scala> ds.toDF().show()
+-----+
|value|
+-----+
|    1|
|    2|
|    3|
+-----+

scala> ds.select(expr("value + 1").as[Int]).collect()
res11: Array[Int] = Array(2, 3, 4)
```

## Comparison to RDDs
A `Dataset` differs from an `RDD` in the following ways:
  - The creation of a `Dataset` requires the presence of an explicit `Encoder` that can be
    used to serialize the object into a binary format.  Encoders are also capable of mapping the
    schema of a given object to the Spark SQL type system.  In contrast, RDDs rely on runtime
    reflection-based serialization.
  - Internally, a `Dataset` is represented by a Catalyst logical plan and the data is stored
    in the encoded form.  This representation allows for additional logical operations and
    enables many operations (sorting, shuffling, etc.) to be performed without deserializing to
    an object.

A `Dataset` can be converted to an `RDD` by calling the `.rdd` method.
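
To make the encoder points above more concrete, here is a minimal, Spark-free sketch of the idea. `ToyEncoder`, `Person`, and the `Vector[Any]` row are hypothetical stand-ins introduced only for illustration; the `Encoder` in this PR works on Catalyst `InternalRow`s instead. The sketch shows an encoder that supplies a schema plus `toRow`/`fromRow`, and a sort that runs on the encoded rows without rebuilding any objects.

```scala
// Toy model only: ToyEncoder, Person and the Vector-based "row" are
// hypothetical stand-ins, not Spark classes.
object EncoderSketch {
  // An encoder knows the schema of T and how to convert T to and from a flat row.
  trait ToyEncoder[T] {
    def schema: Seq[(String, String)] // (field name, type name)
    def toRow(t: T): Vector[Any]
    def fromRow(row: Vector[Any]): T
  }

  case class Person(name: String, age: Int)

  // The encoder is explicit: it must be available wherever Person rows are built.
  implicit val personEncoder: ToyEncoder[Person] = new ToyEncoder[Person] {
    val schema: Seq[(String, String)] = Seq("name" -> "string", "age" -> "int")
    def toRow(p: Person): Vector[Any] = Vector(p.name, p.age)
    def fromRow(row: Vector[Any]): Person =
      Person(row(0).asInstanceOf[String], row(1).asInstanceOf[Int])
  }

  // The data stays in encoded form; sorting by a column never rebuilds a Person.
  def sortByAge(rows: Seq[Vector[Any]]): Seq[Vector[Any]] =
    rows.sortBy(row => row(1).asInstanceOf[Int])

  def main(args: Array[String]): Unit = {
    val enc = implicitly[ToyEncoder[Person]]
    val encoded = Seq(Person("b", 30), Person("a", 20)).map(enc.toRow)
    println(sortByAge(encoded).map(enc.fromRow))
  }
}
```

Running `EncoderSketch.main` sorts the encoded rows by the `age` column and only then decodes them back into `Person` objects.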

## Comparison to DataFrames

A `Dataset` can be thought of as a specialized DataFrame, where the elements map to a specific
JVM object type, instead of to a generic `Row` container. A DataFrame can be transformed into
a specific Dataset by calling `df.as[ElementType]`.  Similarly, you can transform a strongly-typed
`Dataset` to a generic DataFrame by calling `ds.toDF()`.

## Implementation Status and TODOs

This is a rough cut at the least controversial parts of the API.  The primary 
purpose here is to get something committed so that we can better parallelize 
further work and get early feedback on the API.  The following is being 
deferred to future PRs:
 - Joins and Aggregations (prototype here 
https://github.com/apache/spark/commit/f11f91e6f08c8cf389b8388b626cd29eec32d937)
 - Support for Java

Additionally, binding an encoder to a given schema is currently done in a 
fairly ad-hoc fashion.  This is an internal detail, and what we are doing 
today works for the cases we care about.  However, as we add more APIs we'll 
probably need to do this in a more principled way (i.e. separate resolution 
from binding as we do in DataFrames).
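
As a rough illustration of what binding involves, the sketch below models the ordinal remapping that `rebind` performs in this PR: each bound position is looked up in the old schema and translated to that attribute's position in the new schema. It is a simplification under stated assumptions: attributes are plain strings rather than Catalyst `Attribute`s, and `RebindSketch` and its `rebind` helper are hypothetical names, not Spark APIs.

```scala
// Hypothetical, Spark-free model of ordinal rebinding.
object RebindSketch {
  // Assumption: an attribute is identified by its name (Catalyst uses an ExprId).
  def rebind(ordinals: Seq[Int], oldSchema: Seq[String], newSchema: Seq[String]): Seq[Int] = {
    // ordinal -> attribute, for the schema the encoder was originally bound to
    val positionToAttribute: Map[Int, String] =
      oldSchema.zipWithIndex.map { case (a, i) => i -> a }.toMap
    // attribute -> ordinal, for the schema the encoder should now read from
    val attributeToNewPosition: Map[String, Int] = newSchema.zipWithIndex.toMap
    ordinals.map(o => attributeToNewPosition(positionToAttribute(o)))
  }

  def main(args: Array[String]): Unit = {
    // A key encoder that read column 2 of (a, b, key) now reads column 0 of (key).
    println(rebind(Seq(2), Seq("a", "b", "key"), Seq("key")))
  }
}
```

This corresponds to the grouping-key case described in the `rebind` documentation: once only the key expressions have been projected out, the key encoder's positions have to be shifted to match the narrower row.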

## COMPATIBILITY NOTE
Long term we plan to make `DataFrame` extend `Dataset[Row]`.  However,
making this change to the class hierarchy would break the function signatures for the existing
functional operations (map, flatMap, etc.).  As such, this class should be considered a preview
of the final API.  Changes will be made to the interface after Spark 1.6.

Author: Michael Armbrust <mich...@databricks.com>

Closes #9190 from marmbrus/dataset-infra.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/53e83a3a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/53e83a3a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/53e83a3a

Branch: refs/heads/master
Commit: 53e83a3a77cafc2ccd0764ecdb8b3ba735bc51fc
Parents: 188ea34
Author: Michael Armbrust <mich...@databricks.com>
Authored: Thu Oct 22 15:20:17 2015 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Thu Oct 22 15:20:17 2015 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/ScalaReflection.scala    |   8 +-
 .../sql/catalyst/encoders/ClassEncoder.scala    |  38 +-
 .../spark/sql/catalyst/encoders/Encoder.scala   |  19 +-
 .../sql/catalyst/encoders/ProductEncoder.scala  |  12 +-
 .../sql/catalyst/encoders/primitiveTypes.scala  | 100 +++++
 .../spark/sql/catalyst/encoders/tuples.scala    | 173 ++++++++
 .../sql/catalyst/expressions/AttributeMap.scala |   7 +
 .../sql/catalyst/expressions/AttributeSet.scala |   4 +
 .../expressions/complexTypeCreator.scala        |   8 +
 .../sql/catalyst/expressions/package.scala      |  12 +
 .../catalyst/plans/logical/basicOperators.scala |  72 +++-
 .../encoders/PrimitiveEncoderSuite.scala        |  43 ++
 .../catalyst/encoders/ProductEncoderSuite.scala |  21 +-
 .../scala/org/apache/spark/sql/Column.scala     |  15 +
 .../scala/org/apache/spark/sql/DataFrame.scala  |  11 +
 .../scala/org/apache/spark/sql/Dataset.scala    | 392 +++++++++++++++++++
 .../org/apache/spark/sql/DatasetHolder.scala    |  30 ++
 .../org/apache/spark/sql/GroupedDataset.scala   |  68 ++++
 .../scala/org/apache/spark/sql/SQLContext.scala |  12 +
 .../org/apache/spark/sql/SQLImplicits.scala     |  16 +-
 .../spark/sql/execution/GroupedIterator.scala   | 141 +++++++
 .../spark/sql/execution/SparkStrategies.scala   |   8 +
 .../spark/sql/execution/basicOperators.scala    |  79 ++++
 .../spark/sql/DatasetPrimitiveSuite.scala       | 103 +++++
 .../org/apache/spark/sql/DatasetSuite.scala     | 124 ++++++
 .../scala/org/apache/spark/sql/QueryTest.scala  |   8 +
 26 files changed, 1501 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/53e83a3a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 27c96f4..713c6b5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -411,9 +411,9 @@ trait ScalaReflection {
   }
 
   /** Returns expressions for extracting all the fields from the given type. */
-  def extractorsFor[T : TypeTag](inputObject: Expression): Seq[Expression] = {
+  def extractorsFor[T : TypeTag](inputObject: Expression): CreateNamedStruct = {
     ScalaReflectionLock.synchronized {
-      extractorFor(inputObject, typeTag[T].tpe).asInstanceOf[CreateStruct].children
+      extractorFor(inputObject, typeTag[T].tpe).asInstanceOf[CreateNamedStruct]
     }
   }
 
@@ -497,11 +497,11 @@ trait ScalaReflection {
             }
           }
 
-          CreateStruct(params.head.map { p =>
+          CreateNamedStruct(params.head.flatMap { p =>
             val fieldName = p.name.toString
             val fieldType = p.typeSignature.substituteTypes(formalTypeArgs, actualTypeArgs)
             val fieldValue = Invoke(inputObject, fieldName, dataTypeFor(fieldType))
-            extractorFor(fieldValue, fieldType)
+            expressions.Literal(fieldName) :: extractorFor(fieldValue, fieldType) :: Nil
           })
 
         case t if t <:< localTypeOf[Array[_]] =>

http://git-wip-us.apache.org/repos/asf/spark/blob/53e83a3a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ClassEncoder.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ClassEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ClassEncoder.scala
index 54096f1..b484b8f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ClassEncoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ClassEncoder.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.encoders
 import scala.reflect.ClassTag
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, SimpleAnalyzer}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateSafeProjection, GenerateUnsafeProjection}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
@@ -41,9 +41,11 @@ case class ClassEncoder[T](
     clsTag: ClassTag[T])
   extends Encoder[T] {
 
-  private val extractProjection = GenerateUnsafeProjection.generate(extractExpressions)
+  @transient
+  private lazy val extractProjection = GenerateUnsafeProjection.generate(extractExpressions)
   private val inputRow = new GenericMutableRow(1)
 
+  @transient
   private lazy val constructProjection = GenerateSafeProjection.generate(constructExpression :: Nil)
   private val dataType = ObjectType(clsTag.runtimeClass)
 
@@ -64,4 +66,36 @@ case class ClassEncoder[T](
 
     copy(constructExpression = boundExpression)
   }
+
+  override def rebind(oldSchema: Seq[Attribute], newSchema: Seq[Attribute]): ClassEncoder[T] = {
+    val positionToAttribute = AttributeMap.toIndex(oldSchema)
+    val attributeToNewPosition = AttributeMap.byIndex(newSchema)
+    copy(constructExpression = constructExpression transform {
+      case r: BoundReference =>
+        r.copy(ordinal = attributeToNewPosition(positionToAttribute(r.ordinal)))
+    })
+  }
+
+  override def bindOrdinals(schema: Seq[Attribute]): ClassEncoder[T] = {
+    var remaining = schema
+    copy(constructExpression = constructExpression transform {
+      case u: UnresolvedAttribute =>
+        val pos = remaining.head
+        remaining = remaining.drop(1)
+        pos
+    })
+  }
+
+  protected val attrs = extractExpressions.map(_.collect {
+    case a: Attribute => s"#${a.exprId}"
+    case b: BoundReference => s"[${b.ordinal}]"
+  }.headOption.getOrElse(""))
+
+
+  protected val schemaString =
+    schema
+      .zip(attrs)
+      .map { case(f, a) => s"${f.name}$a: ${f.dataType.simpleString}"}.mkString(", ")
+
+  override def toString: String = s"class[$schemaString]"
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/53e83a3a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/Encoder.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/Encoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/Encoder.scala
index bdb1c09..efb872d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/Encoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/Encoder.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.types.StructType
  * and reuse internal buffers to improve performance.
  */
 trait Encoder[T] {
+
   /** Returns the schema of encoding this type of object as a Row. */
   def schema: StructType
 
@@ -46,13 +47,27 @@ trait Encoder[T] {
 
   /**
    * Returns an object of type `T`, extracting the required values from the provided row.  Note that
-   * you must bind the encoder to a specific schema before you can call this function.
+   * you must `bind` an encoder to a specific schema before you can call this function.
    */
   def fromRow(row: InternalRow): T
 
   /**
    * Returns a new copy of this encoder, where the expressions used by `fromRow` are bound to the
-   * given schema
+   * given schema.
    */
   def bind(schema: Seq[Attribute]): Encoder[T]
+
+  /**
+   * Binds this encoder to the given schema positionally.  In this binding, the first reference to
+   * any input is mapped to `schema(0)`, and so on for each input that is encountered.
+   */
+  def bindOrdinals(schema: Seq[Attribute]): Encoder[T]
+
+  /**
+   * Given an encoder that has already been bound to a given schema, returns a new encoder
+   * where the positions are mapped from `oldSchema` to `newSchema`.  This can be used, for example,
+   * when you are trying to use an encoder on grouping keys that were originally part of a larger
+   * row, but now you have projected out only the key expressions.
+   */
+  def rebind(oldSchema: Seq[Attribute], newSchema: Seq[Attribute]): Encoder[T]
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/53e83a3a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ProductEncoder.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ProductEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ProductEncoder.scala
index 4f7ce45..34f5e6c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ProductEncoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ProductEncoder.scala
@@ -31,15 +31,17 @@ import org.apache.spark.sql.types.{ObjectType, StructType}
 object ProductEncoder {
   def apply[T <: Product : TypeTag]: ClassEncoder[T] = {
     // We convert the not-serializable TypeTag into StructType and ClassTag.
-    val schema = ScalaReflection.schemaFor[T].dataType.asInstanceOf[StructType]
     val mirror = typeTag[T].mirror
     val cls = mirror.runtimeClass(typeTag[T].tpe)
 
     val inputObject = BoundReference(0, ObjectType(cls), nullable = true)
-    val extractExpressions = ScalaReflection.extractorsFor[T](inputObject)
+    val extractExpression = ScalaReflection.extractorsFor[T](inputObject)
     val constructExpression = ScalaReflection.constructorFor[T]
-    new ClassEncoder[T](schema, extractExpressions, constructExpression, ClassTag[T](cls))
-  }
-
 
+    new ClassEncoder[T](
+      extractExpression.dataType,
+      extractExpression.flatten,
+      constructExpression,
+      ClassTag[T](cls))
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/53e83a3a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/primitiveTypes.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/primitiveTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/primitiveTypes.scala
new file mode 100644
index 0000000..a93f2d7
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/primitiveTypes.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.encoders
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.sql.types._
+
+/** An encoder for primitive Long types. */
+case class LongEncoder(fieldName: String = "value", ordinal: Int = 0) extends Encoder[Long] {
+  private val row = UnsafeRow.createFromByteArray(64, 1)
+
+  override def clsTag: ClassTag[Long] = ClassTag.Long
+  override def schema: StructType =
+    StructType(StructField(fieldName, LongType) :: Nil)
+
+  override def fromRow(row: InternalRow): Long = row.getLong(ordinal)
+
+  override def toRow(t: Long): InternalRow = {
+    row.setLong(ordinal, t)
+    row
+  }
+
+  override def bindOrdinals(schema: Seq[Attribute]): Encoder[Long] = this
+  override def bind(schema: Seq[Attribute]): Encoder[Long] = this
+  override def rebind(oldSchema: Seq[Attribute], newSchema: Seq[Attribute]): Encoder[Long] = this
+}
+
+/** An encoder for primitive Integer types. */
+case class IntEncoder(fieldName: String = "value", ordinal: Int = 0) extends Encoder[Int] {
+  private val row = UnsafeRow.createFromByteArray(64, 1)
+
+  override def clsTag: ClassTag[Int] = ClassTag.Int
+  override def schema: StructType =
+    StructType(StructField(fieldName, IntegerType) :: Nil)
+
+  override def fromRow(row: InternalRow): Int = row.getInt(ordinal)
+
+  override def toRow(t: Int): InternalRow = {
+    row.setInt(ordinal, t)
+    row
+  }
+
+  override def bindOrdinals(schema: Seq[Attribute]): Encoder[Int] = this
+  override def bind(schema: Seq[Attribute]): Encoder[Int] = this
+  override def rebind(oldSchema: Seq[Attribute], newSchema: Seq[Attribute]): Encoder[Int] = this
+}
+
+/** An encoder for String types. */
+case class StringEncoder(
+    fieldName: String = "value",
+    ordinal: Int = 0) extends Encoder[String] {
+
+  val record = new SpecificMutableRow(StringType :: Nil)
+
+  @transient
+  lazy val projection =
+    GenerateUnsafeProjection.generate(BoundReference(0, StringType, true) :: Nil)
+
+  override def schema: StructType =
+    StructType(
+      StructField("value", StringType, nullable = false) :: Nil)
+
+  override def clsTag: ClassTag[String] = scala.reflect.classTag[String]
+
+
+  override final def fromRow(row: InternalRow): String = {
+    row.getString(ordinal)
+  }
+
+  override final def toRow(value: String): InternalRow = {
+    val utf8String = UTF8String.fromString(value)
+    record(0) = utf8String
+    // TODO: this is a bit of a hack to produce UnsafeRows
+    projection(record)
+  }
+
+  override def bindOrdinals(schema: Seq[Attribute]): Encoder[String] = this
+  override def bind(schema: Seq[Attribute]): Encoder[String] = this
+  override def rebind(oldSchema: Seq[Attribute], newSchema: Seq[Attribute]): Encoder[String] = this
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/53e83a3a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/tuples.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/tuples.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/tuples.scala
new file mode 100644
index 0000000..a48eeda
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/tuples.scala
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.encoders
+
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.types.{StructField, StructType}
+
+// Most of this file is codegen.
+// scalastyle:off
+
+/**
+ * A set of composite encoders that take sub encoders and map each of their objects to a
+ * Scala tuple.  Note that currently the implementation is fairly limited and only supports going
+ * from an internal row to a tuple.
+ */
+object TupleEncoder {
+
+  /** Code generator for composite tuple encoders. */
+  def main(args: Array[String]): Unit = {
+    (2 to 5).foreach { i =>
+      val types = (1 to i).map(t => s"T$t").mkString(", ")
+      val tupleType = s"($types)"
+      val args = (1 to i).map(t => s"e$t: Encoder[T$t]").mkString(", ")
+      val fields = (1 to i).map(t => s"""StructField("_$t", e$t.schema)""").mkString(", ")
+      val fromRow = (1 to i).map(t => s"e$t.fromRow(row)").mkString(", ")
+
+      println(
+        s"""
+          |class Tuple${i}Encoder[$types]($args) extends Encoder[$tupleType] {
+          |  val schema = StructType(Array($fields))
+          |
+          |  def clsTag: ClassTag[$tupleType] = scala.reflect.classTag[$tupleType]
+          |
+          |  def fromRow(row: InternalRow): $tupleType = {
+          |    ($fromRow)
+          |  }
+          |
+          |  override def toRow(t: $tupleType): InternalRow =
+          |    throw new UnsupportedOperationException("Tuple Encoders only support fromRow.")
+          |
+          |  override def bind(schema: Seq[Attribute]): Encoder[$tupleType] = {
+          |    this
+          |  }
+          |
+          |  override def rebind(oldSchema: Seq[Attribute], newSchema: Seq[Attribute]): Encoder[$tupleType] =
+          |    throw new UnsupportedOperationException("Tuple Encoders only support bind.")
+          |
+          |
+          |  override def bindOrdinals(schema: Seq[Attribute]): Encoder[$tupleType] =
+          |    throw new UnsupportedOperationException("Tuple Encoders only support bind.")
+          |}
+        """.stripMargin)
+    }
+  }
+}
+
+class Tuple2Encoder[T1, T2](e1: Encoder[T1], e2: Encoder[T2]) extends Encoder[(T1, T2)] {
+  val schema = StructType(Array(StructField("_1", e1.schema), StructField("_2", e2.schema)))
+
+  def clsTag: ClassTag[(T1, T2)] = scala.reflect.classTag[(T1, T2)]
+
+  def fromRow(row: InternalRow): (T1, T2) = {
+    (e1.fromRow(row), e2.fromRow(row))
+  }
+
+  override def toRow(t: (T1, T2)): InternalRow =
+    throw new UnsupportedOperationException("Tuple Encoders only support fromRow.")
+
+  override def bind(schema: Seq[Attribute]): Encoder[(T1, T2)] = {
+    this
+  }
+
+  override def rebind(oldSchema: Seq[Attribute], newSchema: Seq[Attribute]): Encoder[(T1, T2)] =
+    throw new UnsupportedOperationException("Tuple Encoders only support bind.")
+
+
+  override def bindOrdinals(schema: Seq[Attribute]): Encoder[(T1, T2)] =
+    throw new UnsupportedOperationException("Tuple Encoders only support bind.")
+}
+
+
+class Tuple3Encoder[T1, T2, T3](e1: Encoder[T1], e2: Encoder[T2], e3: Encoder[T3]) extends Encoder[(T1, T2, T3)] {
+  val schema = StructType(Array(StructField("_1", e1.schema), StructField("_2", e2.schema), StructField("_3", e3.schema)))
+
+  def clsTag: ClassTag[(T1, T2, T3)] = scala.reflect.classTag[(T1, T2, T3)]
+
+  def fromRow(row: InternalRow): (T1, T2, T3) = {
+    (e1.fromRow(row), e2.fromRow(row), e3.fromRow(row))
+  }
+
+  override def toRow(t: (T1, T2, T3)): InternalRow =
+    throw new UnsupportedOperationException("Tuple Encoders only support fromRow.")
+
+  override def bind(schema: Seq[Attribute]): Encoder[(T1, T2, T3)] = {
+    this
+  }
+
+  override def rebind(oldSchema: Seq[Attribute], newSchema: Seq[Attribute]): Encoder[(T1, T2, T3)] =
+    throw new UnsupportedOperationException("Tuple Encoders only support bind.")
+
+
+  override def bindOrdinals(schema: Seq[Attribute]): Encoder[(T1, T2, T3)] =
+    throw new UnsupportedOperationException("Tuple Encoders only support bind.")
+}
+
+
+class Tuple4Encoder[T1, T2, T3, T4](e1: Encoder[T1], e2: Encoder[T2], e3: Encoder[T3], e4: Encoder[T4]) extends Encoder[(T1, T2, T3, T4)] {
+  val schema = StructType(Array(StructField("_1", e1.schema), StructField("_2", e2.schema), StructField("_3", e3.schema), StructField("_4", e4.schema)))
+
+  def clsTag: ClassTag[(T1, T2, T3, T4)] = scala.reflect.classTag[(T1, T2, T3, T4)]
+
+  def fromRow(row: InternalRow): (T1, T2, T3, T4) = {
+    (e1.fromRow(row), e2.fromRow(row), e3.fromRow(row), e4.fromRow(row))
+  }
+
+  override def toRow(t: (T1, T2, T3, T4)): InternalRow =
+    throw new UnsupportedOperationException("Tuple Encoders only support fromRow.")
+
+  override def bind(schema: Seq[Attribute]): Encoder[(T1, T2, T3, T4)] = {
+    this
+  }
+
+  override def rebind(oldSchema: Seq[Attribute], newSchema: Seq[Attribute]): Encoder[(T1, T2, T3, T4)] =
+    throw new UnsupportedOperationException("Tuple Encoders only support bind.")
+
+
+  override def bindOrdinals(schema: Seq[Attribute]): Encoder[(T1, T2, T3, T4)] =
+    throw new UnsupportedOperationException("Tuple Encoders only support bind.")
+}
+
+
+class Tuple5Encoder[T1, T2, T3, T4, T5](e1: Encoder[T1], e2: Encoder[T2], e3: Encoder[T3], e4: Encoder[T4], e5: Encoder[T5]) extends Encoder[(T1, T2, T3, T4, T5)] {
+  val schema = StructType(Array(StructField("_1", e1.schema), StructField("_2", e2.schema), StructField("_3", e3.schema), StructField("_4", e4.schema), StructField("_5", e5.schema)))
+
+  def clsTag: ClassTag[(T1, T2, T3, T4, T5)] = scala.reflect.classTag[(T1, T2, T3, T4, T5)]
+
+  def fromRow(row: InternalRow): (T1, T2, T3, T4, T5) = {
+    (e1.fromRow(row), e2.fromRow(row), e3.fromRow(row), e4.fromRow(row), e5.fromRow(row))
+  }
+
+  override def toRow(t: (T1, T2, T3, T4, T5)): InternalRow =
+    throw new UnsupportedOperationException("Tuple Encoders only support fromRow.")
+
+  override def bind(schema: Seq[Attribute]): Encoder[(T1, T2, T3, T4, T5)] = {
+    this
+  }
+
+  override def rebind(oldSchema: Seq[Attribute], newSchema: Seq[Attribute]): Encoder[(T1, T2, T3, T4, T5)] =
+    throw new UnsupportedOperationException("Tuple Encoders only support bind.")
+
+
+  override def bindOrdinals(schema: Seq[Attribute]): Encoder[(T1, T2, T3, T4, T5)] =
+    throw new UnsupportedOperationException("Tuple Encoders only support bind.")
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/53e83a3a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala
index 96a11e3..ef3cc55 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala
@@ -26,6 +26,13 @@ object AttributeMap {
   def apply[A](kvs: Seq[(Attribute, A)]): AttributeMap[A] = {
     new AttributeMap(kvs.map(kv => (kv._1.exprId, kv)).toMap)
   }
+
+  /** Given a schema, constructs an [[AttributeMap]] from [[Attribute]] to ordinal */
+  def byIndex(schema: Seq[Attribute]): AttributeMap[Int] = apply(schema.zipWithIndex)
+
+  /** Given a schema, constructs a map from ordinal to Attribute. */
+  def toIndex(schema: Seq[Attribute]): Map[Int, Attribute] =
+    schema.zipWithIndex.map { case (a, i) => i -> a }.toMap
 }
 
 class AttributeMap[A](baseMap: Map[ExprId, (Attribute, A)])

http://git-wip-us.apache.org/repos/asf/spark/blob/53e83a3a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala
index 5345696..3831535 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala
@@ -31,6 +31,10 @@ protected class AttributeEquals(val a: Attribute) {
 }
 
 object AttributeSet {
+  /** Returns an empty [[AttributeSet]]. */
+  val empty = apply(Iterable.empty)
+
+  /** Constructs a new [[AttributeSet]] that contains a single [[Attribute]]. */
   def apply(a: Attribute): AttributeSet = new AttributeSet(Set(new AttributeEquals(a)))
 
   /** Constructs a new [[AttributeSet]] given a sequence of [[Expression Expressions]]. */

http://git-wip-us.apache.org/repos/asf/spark/blob/53e83a3a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
index a5f02e2..059e45b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
@@ -125,6 +125,14 @@ case class CreateStruct(children: Seq[Expression]) extends Expression {
  */
 case class CreateNamedStruct(children: Seq[Expression]) extends Expression {
 
+  /**
+   * Returns Aliased [[Expressions]] that could be used to construct a flattened version of this
+   * StructType.
+   */
+  def flatten: Seq[NamedExpression] = valExprs.zip(names).map {
+    case (v, n) => Alias(v, n.toString)()
+  }
+
   private lazy val (nameExprs, valExprs) =
     children.grouped(2).map { case Seq(name, value) => (name, value) }.toList.unzip
 

http://git-wip-us.apache.org/repos/asf/spark/blob/53e83a3a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
index 30b7f8d..f1fa13d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst
 
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types.{StructField, StructType}
 
 /**
  * A set of classes that can be used to represent trees of relational expressions.  A key goal of
@@ -80,4 +81,15 @@ package object expressions  {
     /** Uses the given row to store the output of the projection. */
     def target(row: MutableRow): MutableProjection
   }
+
+
+  /**
+   * Helper functions for working with `Seq[Attribute]`.
+   */
+  implicit class AttributeSeq(attrs: Seq[Attribute]) {
+    /** Creates a StructType with a schema matching this `Seq[Attribute]`. */
+    def toStructType: StructType = {
+      StructType(attrs.map(a => StructField(a.name, a.dataType, a.nullable)))
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/53e83a3a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index ae9482c..21a55a5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.plans.logical
 
+import org.apache.spark.sql.catalyst.encoders.Encoder
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.Utils
 import org.apache.spark.sql.catalyst.plans._
@@ -417,7 +418,7 @@ case class Distinct(child: LogicalPlan) extends UnaryNode {
 }
 
 /**
- * Return a new RDD that has exactly `numPartitions` partitions. Differs from
+ * Returns a new RDD that has exactly `numPartitions` partitions. Differs from
  * [[RepartitionByExpression]] as this method is called directly by 
DataFrame's, because the user
  * asked for `coalesce` or `repartition`. [[RepartitionByExpression]] is used 
when the consumer
  * of the output requires some specific ordering or distribution of the data.
@@ -443,3 +444,72 @@ case object OneRowRelation extends LeafNode {
   override def statistics: Statistics = Statistics(sizeInBytes = 1)
 }
 
+/**
+ * A relation produced by applying `func` to each partition of the `child`. tEncoder/uEncoder are
+ * used respectively to decode/encode from the JVM object representation expected by `func`.
+ */
+case class MapPartitions[T, U](
+    func: Iterator[T] => Iterator[U],
+    tEncoder: Encoder[T],
+    uEncoder: Encoder[U],
+    output: Seq[Attribute],
+    child: LogicalPlan) extends UnaryNode {
+  override def missingInput: AttributeSet = AttributeSet.empty
+}
+
+/** Factory for constructing new `AppendColumn` nodes. */
+object AppendColumn {
+  def apply[T : Encoder, U : Encoder](func: T => U, child: LogicalPlan): 
AppendColumn[T, U] = {
+    val attrs = implicitly[Encoder[U]].schema.toAttributes
+    new AppendColumn[T, U](func, implicitly[Encoder[T]], 
implicitly[Encoder[U]], attrs, child)
+  }
+}
+
+/**
+ * A relation produced by applying `func` to each element of the `child`, concatenating the
+ * resulting columns at the end of the input row. tEncoder/uEncoder are used respectively to
+ * decode/encode from the JVM object representation expected by `func`.
+ */
+case class AppendColumn[T, U](
+    func: T => U,
+    tEncoder: Encoder[T],
+    uEncoder: Encoder[U],
+    newColumns: Seq[Attribute],
+    child: LogicalPlan) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output ++ newColumns
+  override def missingInput: AttributeSet = super.missingInput -- newColumns
+}
+
+/** Factory for constructing new `MapGroups` nodes. */
+object MapGroups {
+  def apply[K : Encoder, T : Encoder, U : Encoder](
+      func: (K, Iterator[T]) => Iterator[U],
+      groupingAttributes: Seq[Attribute],
+      child: LogicalPlan): MapGroups[K, T, U] = {
+    new MapGroups(
+      func,
+      implicitly[Encoder[K]],
+      implicitly[Encoder[T]],
+      implicitly[Encoder[U]],
+      groupingAttributes,
+      implicitly[Encoder[U]].schema.toAttributes,
+      child)
+  }
+}
+
+/**
+ * Applies func to each unique group in `child`, based on the evaluation of `groupingAttributes`.
+ * Func is invoked with an object representation of the grouping key and an iterator containing the
+ * object representation of all the rows with that key.
+ */
+case class MapGroups[K, T, U](
+    func: (K, Iterator[T]) => Iterator[U],
+    kEncoder: Encoder[K],
+    tEncoder: Encoder[T],
+    uEncoder: Encoder[U],
+    groupingAttributes: Seq[Attribute],
+    output: Seq[Attribute],
+    child: LogicalPlan) extends UnaryNode {
+  override def missingInput: AttributeSet = AttributeSet.empty
+}
+
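
The semantics of `AppendColumn` and `MapGroups` described in the doc comments above can be modelled on ordinary Scala collections. This is only a sketch of the intended behaviour under that simplification: `appendColumn` and `mapGroups` below are hypothetical helpers, and nothing here touches Catalyst plans or encoders.

```scala
object GroupingSketch {
  // Models AppendColumn: pair each input row with the value computed by `func`.
  def appendColumn[T, U](rows: Seq[T])(func: T => U): Seq[(T, U)] =
    rows.map(r => (r, func(r)))

  // Models MapGroups: call `func` once per distinct key, passing an iterator
  // over the rows that share that key.
  def mapGroups[K, T, U](keyed: Seq[(T, K)])(func: (K, Iterator[T]) => Iterator[U]): Seq[U] =
    keyed.groupBy(_._2).toSeq.flatMap { case (k, group) => func(k, group.iterator.map(_._1)) }

  val words = Seq("apple", "avocado", "banana")

  // Step 1: append the grouping key (the first letter) to every row.
  val keyed: Seq[(String, Char)] = appendColumn(words)(_.head)

  // Step 2: count the rows in each group; sorted so the order is deterministic.
  val counts: Seq[(Char, Int)] =
    mapGroups(keyed)((k, it) => Iterator((k, it.size))).sortBy(_._1)

  def main(args: Array[String]): Unit = println(counts)
}
```

Splitting the work into "append key" and "map groups" mirrors how `groupBy` in this patch first plans an `AppendColumn` and only later a `MapGroups` over the appended attributes.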

http://git-wip-us.apache.org/repos/asf/spark/blob/53e83a3a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/PrimitiveEncoderSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/PrimitiveEncoderSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/PrimitiveEncoderSuite.scala
new file mode 100644
index 0000000..52f8383
--- /dev/null
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/PrimitiveEncoderSuite.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.encoders
+
+import org.apache.spark.SparkFunSuite
+
+class PrimitiveEncoderSuite extends SparkFunSuite {
+  test("long encoder") {
+    val enc = new LongEncoder()
+    val row = enc.toRow(10)
+    assert(row.getLong(0) == 10)
+    assert(enc.fromRow(row) == 10)
+  }
+
+  test("int encoder") {
+    val enc = new IntEncoder()
+    val row = enc.toRow(10)
+    assert(row.getInt(0) == 10)
+    assert(enc.fromRow(row) == 10)
+  }
+
+  test("string encoder") {
+    val enc = new StringEncoder()
+    val row = enc.toRow("test")
+    assert(row.getString(0) == "test")
+    assert(enc.fromRow(row) == "test")
+  }
+}
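
The suite above checks that a value survives `toRow` followed by `fromRow`. The same round-trip property can be sketched without Spark; `SimpleEncoder` and its `Seq[Any]` row are hypothetical simplifications and do not reflect the real `Encoder` interface or `InternalRow`.

```scala
object EncoderRoundTripSketch {
  // Hypothetical, simplified encoder: a "row" here is just a Seq[Any].
  trait SimpleEncoder[T] {
    def toRow(value: T): Seq[Any]
    def fromRow(row: Seq[Any]): T
  }

  implicit val intEncoder: SimpleEncoder[Int] = new SimpleEncoder[Int] {
    def toRow(value: Int): Seq[Any] = Seq(value)
    def fromRow(row: Seq[Any]): Int = row.head.asInstanceOf[Int]
  }

  implicit val stringEncoder: SimpleEncoder[String] = new SimpleEncoder[String] {
    def toRow(value: String): Seq[Any] = Seq(value)
    def fromRow(row: Seq[Any]): String = row.head.asInstanceOf[String]
  }

  // Encodes and then decodes a value with whichever encoder is implicitly available for T.
  def roundTrip[T](value: T)(implicit enc: SimpleEncoder[T]): T = enc.fromRow(enc.toRow(value))

  def main(args: Array[String]): Unit = {
    assert(roundTrip(10) == 10)
    assert(roundTrip("test") == "test")
  }
}
```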

http://git-wip-us.apache.org/repos/asf/spark/blob/53e83a3a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ProductEncoderSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ProductEncoderSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ProductEncoderSuite.scala
index 02e43dd..7735acb 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ProductEncoderSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ProductEncoderSuite.scala
@@ -248,12 +248,16 @@ class ProductEncoderSuite extends SparkFunSuite {
         val types =
           convertedBack.productIterator.filter(_ != 
null).map(_.getClass.getName).mkString(",")
 
-        val encodedData = 
convertedData.toSeq(encoder.schema).zip(encoder.schema).map {
-          case (a: ArrayData, StructField(_, at: ArrayType, _, _)) =>
-            a.toArray[Any](at.elementType).toSeq
-          case (other, _) =>
-            other
-        }.mkString("[", ",", "]")
+        val encodedData = try {
+          convertedData.toSeq(encoder.schema).zip(encoder.schema).map {
+            case (a: ArrayData, StructField(_, at: ArrayType, _, _)) =>
+              a.toArray[Any](at.elementType).toSeq
+            case (other, _) =>
+              other
+          }.mkString("[", ",", "]")
+        } catch {
+          case e: Throwable => s"Failed to toSeq: $e"
+        }
 
         fail(
           s"""Encoded/Decoded data does not match input data
@@ -272,8 +276,9 @@ class ProductEncoderSuite extends SparkFunSuite {
              |Construct Expressions:
              |${boundEncoder.constructExpression.treeString}
              |
-           """.stripMargin)
+         """.stripMargin)
+        }
       }
-    }
+
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/53e83a3a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
index 37d559c..de11a16 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -17,11 +17,13 @@
 
 package org.apache.spark.sql
 
+
 import scala.language.implicitConversions
 
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.Logging
 import org.apache.spark.sql.functions.lit
+import org.apache.spark.sql.catalyst.encoders.Encoder
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.types._
@@ -36,6 +38,11 @@ private[sql] object Column {
   def unapply(col: Column): Option[Expression] = Some(col.expr)
 }
 
+/**
+ * A [[Column]] where an [[Encoder]] has been given for the expected return 
type.
+ * @since 1.6.0
+ */
+class TypedColumn[T](expr: Expression)(implicit val encoder: Encoder[T]) 
extends Column(expr)
 
 /**
  * :: Experimental ::
@@ -70,6 +77,14 @@ class Column(protected[sql] val expr: Expression) extends 
Logging {
   override def hashCode: Int = this.expr.hashCode
 
   /**
+   * Provides a type hint about the expected return value of this column.  
This information can
+   * be used by operations such as `select` on a [[Dataset]] to automatically 
convert the
+   * results into the correct JVM types.
+   * @since 1.6.0
+   */
+  def as[T : Encoder]: TypedColumn[T] = new TypedColumn[T](expr)
+
+  /**
    * Extracts a value or values from a complex type.
    * The following types of extraction are supported:
    * - Given an Array, an integer ordinal can be used to retrieve a single 
value.
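
`Column.as[T : Encoder]` above uses a context bound to pick up an implicit encoder at the call site and store it in the returned `TypedColumn`. A rough standalone sketch of that mechanism follows; `Enc`, `Col`, and `TypedCol` are hypothetical stand-ins, and the "expression" is just a string.

```scala
object TypedColumnSketch {
  // Hypothetical stand-in for Encoder: only records the name of the target type.
  final case class Enc[T](typeName: String)
  implicit val intEnc: Enc[Int] = Enc[Int]("int")
  implicit val stringEnc: Enc[String] = Enc[String]("string")

  // Untyped column: just an expression string in this sketch.
  class Col(val expr: String) {
    // The context bound `T : Enc` requires an implicit Enc[T] at the call site
    // and forwards it to the TypedCol constructor.
    def as[T : Enc]: TypedCol[T] = new TypedCol[T](expr)
  }

  // Typed column: same expression, plus the encoder captured when `as` was called.
  class TypedCol[T](expr: String)(implicit val encoder: Enc[T]) extends Col(expr)

  val typed: TypedCol[Int] = new Col("value + 1").as[Int]

  def main(args: Array[String]): Unit = println(typed.encoder.typeName)
}
```

Because the encoder travels with the column, a consumer such as `select` can read `c1.encoder` instead of asking the caller for it again.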

http://git-wip-us.apache.org/repos/asf/spark/blob/53e83a3a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 2f10aa9..bf25bcd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -34,6 +34,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.encoders.Encoder
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, 
SqlParser}
@@ -259,6 +260,16 @@ class DataFrame private[sql](
   def toDF(): DataFrame = this
 
   /**
+   * :: Experimental ::
+   * Converts this [[DataFrame]] to a strongly-typed [[Dataset]] containing 
objects of the
+   * specified type, `U`.
+   * @group basic
+   * @since 1.6.0
+   */
+  @Experimental
+  def as[U : Encoder]: Dataset[U] = new Dataset[U](sqlContext, queryExecution)
+
+  /**
    * Returns a new [[DataFrame]] with columns renamed. This can be quite 
convenient in conversion
    * from a RDD of tuples into a [[DataFrame]] with meaningful names. For 
example:
    * {{{

http://git-wip-us.apache.org/repos/asf/spark/blob/53e83a3a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
new file mode 100644
index 0000000..96213c7
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.encoders._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A [[Dataset]] is a strongly typed collection of objects that can be 
transformed in parallel
+ * using functional or relational operations.
+ *
+ * A [[Dataset]] differs from an [[RDD]] in the following ways:
+ *  - Internally, a [[Dataset]] is represented by a Catalyst logical plan and 
the data is stored
+ *    in the encoded form.  This representation allows for additional logical 
operations and
+ *    enables many operations (sorting, shuffling, etc.) to be performed 
without deserializing to
+ *    an object.
+ *  - The creation of a [[Dataset]] requires the presence of an explicit 
[[Encoder]] that can be
+ *    used to serialize the object into a binary format.  Encoders are also 
capable of mapping the
+ *    schema of a given object to the Spark SQL type system.  In contrast, 
RDDs rely on runtime
+ *    reflection based serialization. Operations that change the type of 
object stored in the
+ *    dataset also need an encoder for the new type.
+ *
+ * A [[Dataset]] can be thought of as a specialized DataFrame, where the elements map to a specific
+ * JVM object type, instead of to a generic [[Row]] container. A DataFrame can be transformed into
+ * a specific Dataset by calling `df.as[ElementType]`.  Similarly you can transform a strongly-typed
+ * [[Dataset]] to a generic DataFrame by calling `ds.toDF()`.
+ *
+ * COMPATIBILITY NOTE: Long term we plan to make [[DataFrame]] extend 
`Dataset[Row]`.  However,
+ * making this change to the class hierarchy would break the function 
signatures for the existing
+ * functional operations (map, flatMap, etc).  As such, this class should be 
considered a preview
+ * of the final API.  Changes will be made to the interface after Spark 1.6.
+ *
+ * @since 1.6.0
+ */
+@Experimental
+class Dataset[T] private[sql](
+    @transient val sqlContext: SQLContext,
+    @transient val queryExecution: QueryExecution)(
+    implicit val encoder: Encoder[T]) extends Serializable {
+
+  private implicit def classTag = encoder.clsTag
+
+  private[sql] def this(sqlContext: SQLContext, plan: LogicalPlan)(implicit 
encoder: Encoder[T]) =
+    this(sqlContext, new QueryExecution(sqlContext, plan))
+
+  /** Returns the schema of the encoded form of the objects in this 
[[Dataset]]. */
+  def schema: StructType = encoder.schema
+
+  /* ************* *
+   *  Conversions  *
+   * ************* */
+
+  /**
+   * Returns a new `Dataset` where each record has been mapped on to the 
specified type.
+   * TODO: should bind here...
+   * TODO: document binding rules
+   * @since 1.6.0
+   */
+  def as[U : Encoder]: Dataset[U] = new Dataset(sqlContext, 
queryExecution)(implicitly[Encoder[U]])
+
+  /**
+   * Applies a logical alias to this [[Dataset]] that can be used to 
disambiguate columns that have
+   * the same name after two Datasets have been joined.
+   */
+  def as(alias: String): Dataset[T] = withPlan(Subquery(alias, _))
+
+  /**
+   * Converts this strongly typed collection of data to a generic DataFrame.  In contrast to the
+   * strongly typed objects that Dataset operations work on, a DataFrame returns generic [[Row]]
+   * objects that allow fields to be accessed by ordinal or name.
+   */
+  def toDF(): DataFrame = DataFrame(sqlContext, logicalPlan)
+
+
+  /**
+   * Returns this Dataset.
+   * @since 1.6.0
+   */
+  def toDS(): Dataset[T] = this
+
+  /**
+   * Converts this Dataset to an RDD.
+   * @since 1.6.0
+   */
+  def rdd: RDD[T] = {
+    val tEnc = implicitly[Encoder[T]]
+    val input = queryExecution.analyzed.output
+    queryExecution.toRdd.mapPartitions { iter =>
+      val bound = tEnc.bind(input)
+      iter.map(bound.fromRow)
+    }
+  }
+
+  /* *********************** *
+   *  Functional Operations  *
+   * *********************** */
+
+  /**
+   * Concise syntax for chaining custom transformations.
+   * {{{
+   *   def featurize(ds: Dataset[T]) = ...
+   *
+   *   dataset
+   *     .transform(featurize)
+   *     .transform(...)
+   * }}}
+   *
+   * @since 1.6.0
+   */
+  def transform[U](t: Dataset[T] => Dataset[U]): Dataset[U] = t(this)
+
+  /**
+   * Returns a new [[Dataset]] that only contains elements where `func` 
returns `true`.
+   * @since 1.6.0
+   */
+  def filter(func: T => Boolean): Dataset[T] = mapPartitions(_.filter(func))
+
+  /**
+   * Returns a new [[Dataset]] that contains the result of applying `func` to 
each element.
+   * @since 1.6.0
+   */
+  def map[U : Encoder](func: T => U): Dataset[U] = mapPartitions(_.map(func))
+
+  /**
+   * Returns a new [[Dataset]] that contains the result of applying `func` to each partition.
+   * @since 1.6.0
+   */
+  def mapPartitions[U : Encoder](func: Iterator[T] => Iterator[U]): Dataset[U] 
= {
+    new Dataset(
+      sqlContext,
+      MapPartitions[T, U](
+        func,
+        implicitly[Encoder[T]],
+        implicitly[Encoder[U]],
+        implicitly[Encoder[U]].schema.toAttributes,
+        logicalPlan))
+  }
+
+  def flatMap[U : Encoder](func: T => TraversableOnce[U]): Dataset[U] =
+    mapPartitions(_.flatMap(func))
+
+  /* ************** *
+   *  Side effects  *
+   * ************** */
+
+  /**
+   * Runs `func` on each element of this Dataset.
+   * @since 1.6.0
+   */
+  def foreach(func: T => Unit): Unit = rdd.foreach(func)
+
+  /**
+   * Runs `func` on each partition of this Dataset.
+   * @since 1.6.0
+   */
+  def foreachPartition(func: Iterator[T] => Unit): Unit = 
rdd.foreachPartition(func)
+
+  /* ************* *
+   *  Aggregation  *
+   * ************* */
+
+  /**
+   * Reduces the elements of this Dataset using the specified binary function.  The given function
+   * must be commutative and associative or the result may be non-deterministic.
+   * @since 1.6.0
+   */
+  def reduce(func: (T, T) => T): T = rdd.reduce(func)
+
+  /**
+   * Aggregates the elements of each partition, and then the results for all 
the partitions, using a
+   * given associative and commutative function and a neutral "zero value".
+   *
+   * This behaves somewhat differently than the fold operations implemented 
for non-distributed
+   * collections in functional languages like Scala. This fold operation may 
be applied to
+   * partitions individually, and then those results will be folded into the 
final result.
+   * If op is not commutative, then the result may differ from that of a fold 
applied to a
+   * non-distributed collection.
+   * @since 1.6.0
+   */
+  def fold(zeroValue: T)(op: (T, T) => T): T = rdd.fold(zeroValue)(op)
+
+  /**
+   * Returns a [[GroupedDataset]] where the data is grouped by the given key 
function.
+   * @since 1.6.0
+   */
+  def groupBy[K : Encoder](func: T => K): GroupedDataset[K, T] = {
+    val inputPlan = queryExecution.analyzed
+    val withGroupingKey = AppendColumn(func, inputPlan)
+    val executed = sqlContext.executePlan(withGroupingKey)
+
+    new GroupedDataset(
+      implicitly[Encoder[K]].bindOrdinals(withGroupingKey.newColumns),
+      implicitly[Encoder[T]].bind(inputPlan.output),
+      executed,
+      inputPlan.output,
+      withGroupingKey.newColumns)
+  }
+
+  /* ****************** *
+   *  Typed Relational  *
+   * ****************** */
+
+  /**
+   * Returns a new [[Dataset]] by computing the given [[Column]] expression 
for each element.
+   *
+   * {{{
+   *   val ds = Seq(1, 2, 3).toDS()
+   *   val newDS = ds.select(expr("value + 1").as[Int])
+   * }}}
+   * @since 1.6.0
+   */
+  def select[U1: Encoder](c1: TypedColumn[U1]): Dataset[U1] = {
+    new Dataset[U1](sqlContext, Project(Alias(c1.expr, "_1")() :: Nil, 
logicalPlan))
+  }
+
+  // Codegen
+  // scalastyle:off
+
+  /** sbt scalaShell; println(Seq(1).toDS().genSelect) */
+  private def genSelect: String = {
+    (2 to 5).map { n =>
+      val types = (1 to n).map(i => s"U$i").mkString(", ")
+      val args = (1 to n).map(i => s"c$i: TypedColumn[U$i]").mkString(", ")
+      val encoders = (1 to n).map(i => s"c$i.encoder").mkString(", ")
+      val schema = (1 to n).map(i => s"""Alias(c$i.expr, 
"_$i")()""").mkString(" :: ")
+      s"""
+         |/**
+         | * Returns a new [[Dataset]] by computing the given [[Column]] 
expressions for each element.
+         | * @since 1.6.0
+         | */
+         |def select[$types]($args): Dataset[($types)] = {
+         |  implicit val te = new Tuple${n}Encoder($encoders)
+         |  new Dataset[($types)](sqlContext,
+         |    Project(
+         |      $schema :: Nil,
+         |      logicalPlan))
+         |}
+         |
+       """.stripMargin
+    }.mkString("\n")
+  }
+
+  /**
+   * Returns a new [[Dataset]] by computing the given [[Column]] expressions 
for each element.
+   * @since 1.6.0
+   */
+  def select[U1, U2](c1: TypedColumn[U1], c2: TypedColumn[U2]): Dataset[(U1, 
U2)] = {
+    implicit val te = new Tuple2Encoder(c1.encoder, c2.encoder)
+    new Dataset[(U1, U2)](sqlContext,
+      Project(
+        Alias(c1.expr, "_1")() :: Alias(c2.expr, "_2")() :: Nil,
+        logicalPlan))
+  }
+
+
+
+  /**
+   * Returns a new [[Dataset]] by computing the given [[Column]] expressions 
for each element.
+   * @since 1.6.0
+   */
+  def select[U1, U2, U3](c1: TypedColumn[U1], c2: TypedColumn[U2], c3: 
TypedColumn[U3]): Dataset[(U1, U2, U3)] = {
+    implicit val te = new Tuple3Encoder(c1.encoder, c2.encoder, c3.encoder)
+    new Dataset[(U1, U2, U3)](sqlContext,
+      Project(
+        Alias(c1.expr, "_1")() :: Alias(c2.expr, "_2")() :: Alias(c3.expr, 
"_3")() :: Nil,
+        logicalPlan))
+  }
+
+
+
+  /**
+   * Returns a new [[Dataset]] by computing the given [[Column]] expressions 
for each element.
+   * @since 1.6.0
+   */
+  def select[U1, U2, U3, U4](c1: TypedColumn[U1], c2: TypedColumn[U2], c3: 
TypedColumn[U3], c4: TypedColumn[U4]): Dataset[(U1, U2, U3, U4)] = {
+    implicit val te = new Tuple4Encoder(c1.encoder, c2.encoder, c3.encoder, 
c4.encoder)
+    new Dataset[(U1, U2, U3, U4)](sqlContext,
+      Project(
+        Alias(c1.expr, "_1")() :: Alias(c2.expr, "_2")() :: Alias(c3.expr, 
"_3")() :: Alias(c4.expr, "_4")() :: Nil,
+        logicalPlan))
+  }
+
+
+
+  /**
+   * Returns a new [[Dataset]] by computing the given [[Column]] expressions 
for each element.
+   * @since 1.6.0
+   */
+  def select[U1, U2, U3, U4, U5](c1: TypedColumn[U1], c2: TypedColumn[U2], c3: 
TypedColumn[U3], c4: TypedColumn[U4], c5: TypedColumn[U5]): Dataset[(U1, U2, 
U3, U4, U5)] = {
+    implicit val te = new Tuple5Encoder(c1.encoder, c2.encoder, c3.encoder, 
c4.encoder, c5.encoder)
+    new Dataset[(U1, U2, U3, U4, U5)](sqlContext,
+      Project(
+        Alias(c1.expr, "_1")() :: Alias(c2.expr, "_2")() :: Alias(c3.expr, 
"_3")() :: Alias(c4.expr, "_4")() :: Alias(c5.expr, "_5")() :: Nil,
+        logicalPlan))
+  }
+
+  // scalastyle:on
+
+  /* **************** *
+   *  Set operations  *
+   * **************** */
+
+  /**
+   * Returns a new [[Dataset]] that contains only the unique elements of this 
[[Dataset]].
+   *
+   * Note that, equality checking is performed directly on the encoded 
representation of the data
+   * and thus is not affected by a custom `equals` function defined on `T`.
+   * @since 1.6.0
+   */
+  def distinct: Dataset[T] = withPlan(Distinct)
+
+  /**
+   * Returns a new [[Dataset]] that contains only the elements of this 
[[Dataset]] that are also
+   * present in `other`.
+   *
+   * Note that, equality checking is performed directly on the encoded 
representation of the data
+   * and thus is not affected by a custom `equals` function defined on `T`.
+   * @since 1.6.0
+   */
+  def intersect(other: Dataset[T]): Dataset[T] =
+    withPlan[T](other)(Intersect)
+
+  /**
+   * Returns a new [[Dataset]] that contains the elements of both this and the 
`other` [[Dataset]]
+   * combined.
+   *
+   * Note that, this function is not a typical set union operation, in that it does not eliminate
+   * duplicate items.  As such, it is analogous to `UNION ALL` in SQL.
+   * @since 1.6.0
+   */
+  def union(other: Dataset[T]): Dataset[T] =
+    withPlan[T](other)(Union)
+
+  /**
+   * Returns a new [[Dataset]] where any elements present in `other` have been 
removed.
+   *
+   * Note that, equality checking is performed directly on the encoded 
representation of the data
+   * and thus is not affected by a custom `equals` function defined on `T`.
+   * @since 1.6.0
+   */
+  def subtract(other: Dataset[T]): Dataset[T] = withPlan[T](other)(Except)
+
+  /* ************************** *
+   *  Gather to Driver Actions  *
+   * ************************** */
+
+  /** Returns the first element in this [[Dataset]]. */
+  def first(): T = rdd.first()
+
+  /** Collects the elements to an Array. */
+  def collect(): Array[T] = rdd.collect()
+
+  /** Returns the first `num` elements of this [[Dataset]] as an Array. */
+  def take(num: Int): Array[T] = rdd.take(num)
+
+  /* ******************** *
+   *  Internal Functions  *
+   * ******************** */
+
+  private[sql] def logicalPlan = queryExecution.analyzed
+
+  private[sql] def withPlan(f: LogicalPlan => LogicalPlan): Dataset[T] =
+    new Dataset[T](sqlContext, sqlContext.executePlan(f(logicalPlan)))
+
+  private[sql] def withPlan[R : Encoder](
+      other: Dataset[_])(
+      f: (LogicalPlan, LogicalPlan) => LogicalPlan): Dataset[R] =
+    new Dataset[R](
+      sqlContext,
+      sqlContext.executePlan(
+        f(logicalPlan, other.logicalPlan)))
+}
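
The `fold` documentation in `Dataset.scala` above warns that the operation is applied per partition and then across the partition results. A small sketch of why the zero value must be neutral follows. It models partitions as nested `Seq`s and folds them sequentially, which is an assumption made for illustration; a real cluster may merge partition results in any order.

```scala
object DistributedFoldSketch {
  // Models the partition-wise fold: fold each partition starting from `zero`,
  // then fold the per-partition results, again starting from `zero`.
  def partitionedFold[T](partitions: Seq[Seq[T]], zero: T)(op: (T, T) => T): T =
    partitions.map(_.fold(zero)(op)).fold(zero)(op)

  val partitions = Seq(Seq(1, 2), Seq(3, 4))

  // Addition with the neutral element 0 gives the same answer as a local fold.
  val sum: Int = partitionedFold(partitions, 0)(_ + _)

  // With a non-neutral zero, the zero is applied once per partition and once
  // more in the final merge, so the result differs from the local fold.
  val skewed: Int = partitionedFold(partitions, 1)(_ + _)
  val local: Int = partitions.flatten.fold(1)(_ + _)

  def main(args: Array[String]): Unit = println(s"$sum $skewed $local")
}
```

With two partitions the non-neutral zero is counted three times in the partitioned fold but only once locally, which accounts for the difference between `skewed` and `local`.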

http://git-wip-us.apache.org/repos/asf/spark/blob/53e83a3a/sql/core/src/main/scala/org/apache/spark/sql/DatasetHolder.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DatasetHolder.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DatasetHolder.scala
new file mode 100644
index 0000000..17817cb
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DatasetHolder.scala
@@ -0,0 +1,30 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql
+
+/**
+ * A container for a [[Dataset]], used for implicit conversions.
+ *
+ * @since 1.6.0
+ */
+private[sql] case class DatasetHolder[T](df: Dataset[T]) {
+
+  // This is declared with parentheses to prevent the Scala compiler from treating
+  // `rdd.toDS("1")` as invoking this toDS and then apply on the returned Dataset.
+  def toDS(): Dataset[T] = df
+}
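
`DatasetHolder` exists so that an implicit conversion can add a single `toDS()` method to local collections without exposing the whole `Dataset` API on them. A minimal sketch of the holder pattern, assuming hypothetical `LocalDataset` and `Holder` types in place of the real classes:

```scala
import scala.language.implicitConversions

object HolderSketch {
  // Hypothetical stand-in for Dataset: wraps a local sequence.
  final case class LocalDataset[T](data: Seq[T])

  // Holder returned by the implicit conversion; it exposes only `toDS()`.
  final case class Holder[T](ds: LocalDataset[T]) {
    def toDS(): LocalDataset[T] = ds
  }

  // Makes `seq.toDS()` compile by converting the Seq to a Holder first.
  implicit def seqToHolder[T](s: Seq[T]): Holder[T] = Holder(LocalDataset(s))

  val ds: LocalDataset[Int] = Seq(1, 2, 3).toDS()

  def main(args: Array[String]): Unit = println(ds)
}
```

Routing the conversion through a holder keeps the set of methods injected into `Seq` to exactly one, which limits accidental name clashes with other implicits.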

http://git-wip-us.apache.org/repos/asf/spark/blob/53e83a3a/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
new file mode 100644
index 0000000..89a16dd
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.encoders.Encoder
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.QueryExecution
+
+/**
+ * A [[Dataset]] that has been logically grouped by a user specified grouping key.  Users should not
+ * construct a [[GroupedDataset]] directly, but should instead call `groupBy` on an existing
+ * [[Dataset]].
+ */
+class GroupedDataset[K, T] private[sql](
+    private val kEncoder: Encoder[K],
+    private val tEncoder: Encoder[T],
+    queryExecution: QueryExecution,
+    private val dataAttributes: Seq[Attribute],
+    private val groupingAttributes: Seq[Attribute]) extends Serializable {
+
+  private implicit def kEnc = kEncoder
+  private implicit def tEnc = tEncoder
+  private def logicalPlan = queryExecution.analyzed
+  private def sqlContext = queryExecution.sqlContext
+
+  /**
+   * Returns a [[Dataset]] that contains each unique key.
+   */
+  def keys: Dataset[K] = {
+    new Dataset[K](
+      sqlContext,
+      Distinct(
+        Project(groupingAttributes, logicalPlan)))
+  }
+
+  /**
+   * Applies the given function to each group of data.  For each unique group, 
the function will
+   * be passed the group key and an iterator that contains all of the elements 
in the group. The
+   * function can return an iterator containing elements of an arbitrary type 
which will be returned
+   * as a new [[Dataset]].
+   *
+   * Internally, the implementation will spill to disk if any given group is 
too large to fit into
+   * memory.  However, users must take care to avoid materializing the whole 
iterator for a group
+   * (for example, by calling `toList`) unless they are sure that this is 
possible given the memory
+   * constraints of their cluster.
+   */
+  def mapGroups[U : Encoder](f: (K, Iterator[T]) => Iterator[U]): Dataset[U] = 
{
+    new Dataset[U](
+      sqlContext,
+      MapGroups(f, groupingAttributes, logicalPlan))
+  }
+}
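
The `mapGroups` documentation above advises against materializing a group's iterator. One way to follow that advice is to aggregate in a single pass. The function below has the `(K, Iterator[T]) => Iterator[U]` shape that `mapGroups` expects in this patch, but it is a plain-Scala sketch: `summarize` is a hypothetical example and is not run through Spark here.

```scala
object StreamingGroupSketch {
  // Consumes the group's iterator in a single pass, keeping only a running
  // count and sum instead of materializing every element (e.g. via `toList`).
  def summarize(key: String, values: Iterator[Int]): Iterator[(String, Int, Int)] = {
    val (count, sum) = values.foldLeft((0, 0)) { case ((c, s), v) => (c + 1, s + v) }
    Iterator((key, count, sum))
  }

  val result: List[(String, Int, Int)] = summarize("a", Iterator(1, 2, 3)).toList

  def main(args: Array[String]): Unit = println(result)
}
```

Only the two accumulator values are held in memory at any time, so the memory cost does not grow with the size of the group.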

http://git-wip-us.apache.org/repos/asf/spark/blob/53e83a3a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index a107639..5e7198f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -21,6 +21,7 @@ import java.beans.{BeanInfo, Introspector}
 import java.util.Properties
 import java.util.concurrent.atomic.AtomicReference
 
+
 import scala.collection.JavaConverters._
 import scala.collection.immutable
 import scala.reflect.runtime.universe.TypeTag
@@ -33,6 +34,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.SQLConf.SQLConfEntry
 import org.apache.spark.sql.catalyst.analysis._
+import org.apache.spark.sql.catalyst.encoders.Encoder
 import org.apache.spark.sql.catalyst.errors.DialectException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer}
@@ -487,6 +489,16 @@ class SQLContext private[sql](
     DataFrame(this, logicalPlan)
   }
 
+
+  def createDataset[T : Encoder](data: Seq[T]): Dataset[T] = {
+    val enc = implicitly[Encoder[T]]
+    val attributes = enc.schema.toAttributes
+    val encoded = data.map(d => enc.toRow(d).copy())
+    val plan = new LocalRelation(attributes, encoded)
+
+    new Dataset[T](this, plan)
+  }
+
   /**
    * Creates a DataFrame from an RDD[Row]. User can specify whether the input 
rows should be
    * converted to Catalyst rows.
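
`createDataset` relies on a context bound (`T : Encoder`) and `implicitly` to obtain the encoder for the element type. The following is a minimal sketch of that pattern in plain Scala, not Spark's implementation: `SimpleEncoder`, `fieldNames`, and `LocalData.encodeAll` are hypothetical names, and rows are modelled as `Seq[Any]` rather than `InternalRow`.

```scala
// SimpleEncoder is a hypothetical stand-in for Spark's Encoder, used only to
// show how a context bound resolves an implicit instance.
trait SimpleEncoder[T] {
  def fieldNames: Seq[String]
  def toRow(value: T): Seq[Any]
}

object SimpleEncoder {
  // Found automatically because it lives in the companion object.
  implicit val intEncoder: SimpleEncoder[Int] = new SimpleEncoder[Int] {
    val fieldNames: Seq[String] = Seq("value")
    def toRow(value: Int): Seq[Any] = Seq(value)
  }
}

object LocalData {
  // `T : SimpleEncoder` requires an implicit SimpleEncoder[T] at the call site.
  def encodeAll[T : SimpleEncoder](data: Seq[T]): (Seq[String], Seq[Seq[Any]]) = {
    val enc = implicitly[SimpleEncoder[T]]
    (enc.fieldNames, data.map(enc.toRow))
  }
}
```

Calling `LocalData.encodeAll(Seq(1, 2, 3))` compiles because an encoder for `Int` is in implicit scope; a type without an instance would be rejected at compile time rather than at run time.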

http://git-wip-us.apache.org/repos/asf/spark/blob/53e83a3a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
index bf03c61..af8474d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
@@ -17,6 +17,10 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.sql.catalyst.encoders._
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+
 import scala.language.implicitConversions
 import scala.reflect.runtime.universe.TypeTag
 
@@ -30,9 +34,19 @@ import org.apache.spark.unsafe.types.UTF8String
 /**
  * A collection of implicit methods for converting common Scala objects into 
[[DataFrame]]s.
  */
-private[sql] abstract class SQLImplicits {
+abstract class SQLImplicits {
   protected def _sqlContext: SQLContext
 
+  implicit def newProductEncoder[T <: Product : TypeTag]: Encoder[T] = 
ProductEncoder[T]
+
+  implicit def newIntEncoder: Encoder[Int] = new IntEncoder()
+  implicit def newLongEncoder: Encoder[Long] = new LongEncoder()
+  implicit def newStringEncoder: Encoder[String] = new StringEncoder()
+
+  implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): 
DatasetHolder[T] = {
+    DatasetHolder(_sqlContext.createDataset(s))
+  }
+
   /**
    * An implicit conversion that turns a Scala `Symbol` into a [[Column]].
    * @since 1.3.0
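
The `localSeqToDatasetHolder` conversion follows a holder pattern: the implicit view wraps a `Seq` in a small class, presumably so that only the holder's own methods (such as the `toDS()` used in the tests) become available on the collection. A rough sketch of that pattern with plain Scala types is shown below; every name in it is hypothetical and nothing here reproduces `DatasetHolder` itself.

```scala
import scala.language.implicitConversions

// All names here are hypothetical; they mirror the shape of DatasetHolder and
// localSeqToDatasetHolder rather than reproduce them.
final case class LocalCollection[T](items: Seq[T])

final case class CollectionHolder[T](collection: LocalCollection[T]) {
  // The only method the holder exposes, so the view adds a single name to Seq.
  def toLocal(): LocalCollection[T] = collection
}

object HolderImplicits {
  implicit def seqToHolder[T](s: Seq[T]): CollectionHolder[T] =
    CollectionHolder(LocalCollection(s))
}

object HolderDemo {
  import HolderImplicits._
  // Seq has no toLocal method, so the compiler applies seqToHolder here.
  val result: LocalCollection[Int] = Seq(1, 2, 3).toLocal()
}
```

Routing the conversion through a holder keeps the set of methods added to `Seq` small, compared with converting a `Seq` directly into the richer target type.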

http://git-wip-us.apache.org/repos/asf/spark/blob/53e83a3a/sql/core/src/main/scala/org/apache/spark/sql/execution/GroupedIterator.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/GroupedIterator.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/GroupedIterator.scala
new file mode 100644
index 0000000..10742cf
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/GroupedIterator.scala
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.catalyst.InternalRow
+import 
org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, 
GenerateOrdering}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder, 
Ascending, Expression}
+
+object GroupedIterator {
+  def apply(
+      input: Iterator[InternalRow],
+      keyExpressions: Seq[Expression],
+      inputSchema: Seq[Attribute]): Iterator[(InternalRow, 
Iterator[InternalRow])] = {
+    if (input.hasNext) {
+      new GroupedIterator(input, keyExpressions, inputSchema)
+    } else {
+      Iterator.empty
+    }
+  }
+}
+
+/**
+ * Iterates over a presorted set of rows, chunking it up by the grouping 
expression.  Each call to
+ * next will return a pair containing the current group and an iterator that 
will return all the
+ * elements of that group.  Iterators for each group are lazily constructed by 
extracting rows
+ * from the input iterator.  As such, full groups are never materialized by 
this class.
+ *
+ * Example input:
+ * {{{
+ *   Input: [a, 1], [b, 2], [b, 3]
+ *   Grouping: x#1
+ *   InputSchema: x#1, y#2
+ * }}}
+ *
+ * Result:
+ * {{{
+ *   First call to next():  ([a], Iterator([a, 1]))
+ *   Second call to next(): ([b], Iterator([b, 2], [b, 3]))
+ * }}}
+ *
+ * Note, the class does not handle the case of an empty input for simplicity 
of implementation.
+ * Use the factory to construct a new instance.
+ *
+ * @param input An iterator of rows.  This iterator must be ordered by the 
groupingExpressions or
+ *              it is possible for the same group to appear more than once.
+ * @param groupingExpressions The set of expressions used to do grouping.  The 
result of evaluating
+ *                            these expressions will be returned as the first 
part of each call
+ *                            to `next()`.
+ * @param inputSchema The schema of the rows in the `input` iterator.
+ */
+class GroupedIterator private(
+    input: Iterator[InternalRow],
+    groupingExpressions: Seq[Expression],
+    inputSchema: Seq[Attribute])
+  extends Iterator[(InternalRow, Iterator[InternalRow])] {
+
+  /** Compares two input rows and returns 0 if they are in the same group. */
+  val sortOrder = groupingExpressions.map(SortOrder(_, Ascending))
+  val keyOrdering = GenerateOrdering.generate(sortOrder, inputSchema)
+
+  /** Creates a row containing only the key for a given input row. */
+  val keyProjection = GenerateUnsafeProjection.generate(groupingExpressions, 
inputSchema)
+
+  /**
+   * Holds null or the row that will be returned on next call to `next()` in 
the inner iterator.
+   */
+  var currentRow = input.next()
+
+  /** Holds a copy of an input row that is in the current group. */
+  var currentGroup = currentRow.copy()
+  var currentIterator: Iterator[InternalRow] = null
+  assert(keyOrdering.compare(currentGroup, currentRow) == 0)
+
+  // Return true if we already have the next iterator or fetching a new 
iterator is successful.
+  def hasNext: Boolean = currentIterator != null || fetchNextGroupIterator
+
+  def next(): (InternalRow, Iterator[InternalRow]) = {
+    assert(hasNext) // Ensure we have fetched the next iterator.
+    val ret = (keyProjection(currentGroup), currentIterator)
+    currentIterator = null
+    ret
+  }
+
+  def fetchNextGroupIterator(): Boolean = {
+    if (currentRow != null || input.hasNext) {
+      val inputIterator = new Iterator[InternalRow] {
+        // Return true if we have a row and it is in the current group, or if 
fetching a new row is
+        // successful.
+        def hasNext = {
+          (currentRow != null && keyOrdering.compare(currentGroup, currentRow) 
== 0) ||
+            fetchNextRowInGroup()
+        }
+
+        def fetchNextRowInGroup(): Boolean = {
+          if (currentRow != null || input.hasNext) {
+            currentRow = input.next()
+            if (keyOrdering.compare(currentGroup, currentRow) == 0) {
+              // The row is in the current group.  Continue the inner iterator.
+              true
+            } else {
+              // We got a row, but it's not in the right group.  End this inner 
iterator and prepare
+              // for the next group.
+              currentIterator = null
+              currentGroup = currentRow.copy()
+              false
+            }
+          } else {
+            // There is no more input so we are done.
+            false
+          }
+        }
+
+        def next(): InternalRow = {
+          assert(hasNext) // Ensure we have fetched the next row.
+          val res = currentRow
+          currentRow = null
+          res
+        }
+      }
+      currentIterator = inputIterator
+      true
+    } else {
+      false
+    }
+  }
+}
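
The chunking behaviour described in the `GroupedIterator` doc comment can be sketched for ordinary Scala values. This is a simplified, hypothetical analogue and not the class above: it uses a key function and `==` instead of generated orderings and projections, and, unlike the original, it drains any rows the caller left unread before moving on to the next group. As with the original, the input is assumed to be sorted (or at least clustered) by key.

```scala
// A simplified, hypothetical analogue of GroupedIterator for plain Scala values.
// It assumes the input is already sorted (or at least clustered) by key.
class SortedGroups[K, T](input: Iterator[T], key: T => K)
  extends Iterator[(K, Iterator[T])] {

  private val rows = input.buffered
  private var current: Iterator[T] = Iterator.empty

  def hasNext: Boolean = {
    // Drain whatever the caller left unread in the previous group.
    while (current.hasNext) current.next()
    rows.hasNext
  }

  def next(): (K, Iterator[T]) = {
    if (!hasNext) throw new NoSuchElementException("no more groups")
    val k = key(rows.head)
    // Lazily yields rows while the next row still belongs to group k.
    val group = new Iterator[T] {
      def hasNext: Boolean = rows.hasNext && key(rows.head) == k
      def next(): T = {
        if (!hasNext) throw new NoSuchElementException("end of group")
        rows.next()
      }
    }
    current = group
    (k, group)
  }
}
```

For the example input in the doc comment, `new SortedGroups[String, (String, Int)](Iterator(("a", 1), ("b", 2), ("b", 3)), _._1)` yields one group for `a` and one for `b`, and no group is ever collected into memory unless the caller does so.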

http://git-wip-us.apache.org/repos/asf/spark/blob/53e83a3a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 79bd1a4..637deff 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -372,6 +372,14 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
       case logical.Distinct(child) =>
         throw new IllegalStateException(
           "logical distinct operator should have been replaced by aggregate in 
the optimizer")
+
+      case logical.MapPartitions(f, tEnc, uEnc, output, child) =>
+        execution.MapPartitions(f, tEnc, uEnc, output, planLater(child)) :: Nil
+      case logical.AppendColumn(f, tEnc, uEnc, newCol, child) =>
+        execution.AppendColumns(f, tEnc, uEnc, newCol, planLater(child)) :: Nil
+      case logical.MapGroups(f, kEnc, tEnc, uEnc, grouping, output, child) =>
+        execution.MapGroups(f, kEnc, tEnc, uEnc, grouping, output, 
planLater(child)) :: Nil
+
       case logical.Repartition(numPartitions, shuffle, child) =>
         if (shuffle) {
           execution.Exchange(RoundRobinPartitioning(numPartitions), 
planLater(child)) :: Nil

http://git-wip-us.apache.org/repos/asf/spark/blob/53e83a3a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index dc38fe5..2bb3dba 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -20,7 +20,9 @@ package org.apache.spark.sql.execution
 import org.apache.spark.rdd.{PartitionwiseSampledRDD, RDD, ShuffledRDD}
 import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.Encoder
 import org.apache.spark.sql.catalyst.expressions._
+import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.util.MutablePair
@@ -311,3 +313,80 @@ case class OutputFaker(output: Seq[Attribute], child: 
SparkPlan) extends SparkPl
 
   protected override def doExecute(): RDD[InternalRow] = child.execute()
 }
+
+/**
+ * Applies the given function to each partition of input rows and encodes the result.
+ */
+case class MapPartitions[T, U](
+    func: Iterator[T] => Iterator[U],
+    tEncoder: Encoder[T],
+    uEncoder: Encoder[U],
+    output: Seq[Attribute],
+    child: SparkPlan) extends UnaryNode {
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    child.execute().mapPartitions { iter =>
+      val tBoundEncoder = tEncoder.bind(child.output)
+      func(iter.map(tBoundEncoder.fromRow)).map(uEncoder.toRow)
+    }
+  }
+}
+
+/**
+ * Applies the given function to each input row, appending the encoded result 
at the end of the row.
+ */
+case class AppendColumns[T, U](
+    func: T => U,
+    tEncoder: Encoder[T],
+    uEncoder: Encoder[U],
+    newColumns: Seq[Attribute],
+    child: SparkPlan) extends UnaryNode {
+
+  override def output: Seq[Attribute] = child.output ++ newColumns
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    child.execute().mapPartitions { iter =>
+      val tBoundEncoder = tEncoder.bind(child.output)
+      val combiner = GenerateUnsafeRowJoiner.create(tEncoder.schema, 
uEncoder.schema)
+      iter.map { row =>
+        val newColumns = uEncoder.toRow(func(tBoundEncoder.fromRow(row)))
+        combiner.join(row.asInstanceOf[UnsafeRow], 
newColumns.asInstanceOf[UnsafeRow]): InternalRow
+      }
+    }
+  }
+}
+
+/**
+ * Groups the input rows together and calls the function with each group and 
an iterator containing
+ * all elements in the group.  The result of this function is encoded and 
flattened before
+ * being output.
+ */
+case class MapGroups[K, T, U](
+    func: (K, Iterator[T]) => Iterator[U],
+    kEncoder: Encoder[K],
+    tEncoder: Encoder[T],
+    uEncoder: Encoder[U],
+    groupingAttributes: Seq[Attribute],
+    output: Seq[Attribute],
+    child: SparkPlan) extends UnaryNode {
+
+  override def requiredChildDistribution: Seq[Distribution] =
+    ClusteredDistribution(groupingAttributes) :: Nil
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+    Seq(groupingAttributes.map(SortOrder(_, Ascending)))
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    child.execute().mapPartitions { iter =>
+      val grouped = GroupedIterator(iter, groupingAttributes, child.output)
+      val groupKeyEncoder = kEncoder.bind(groupingAttributes)
+
+      grouped.flatMap { case (key, rowIter) =>
+        val result = func(
+          groupKeyEncoder.fromRow(key),
+          rowIter.map(tEncoder.fromRow))
+        result.map(uEncoder.toRow)
+      }
+    }
+  }
+}
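
The three physical operators above share one shape: decode each input row into a typed object, apply the user function, and encode the result back into a row. A minimal sketch of that decode/apply/encode pipeline for the `MapPartitions` case follows. It is an illustration under stated assumptions: `Codec` and `PartitionPipeline` are hypothetical names, and rows are modelled as `Seq[Any]` instead of `InternalRow`.

```scala
// Hypothetical Codec type; it stands in for a bound Encoder with fromRow/toRow.
final case class Codec[A](fromRow: Seq[Any] => A, toRow: A => Seq[Any])

object PartitionPipeline {
  // Decode each row to T, run the user function over the typed iterator,
  // then re-encode every result as a row. Everything stays lazy.
  def mapPartition[T, U](
      rows: Iterator[Seq[Any]],
      tCodec: Codec[T],
      uCodec: Codec[U])(func: Iterator[T] => Iterator[U]): Iterator[Seq[Any]] =
    func(rows.map(tCodec.fromRow)).map(uCodec.toRow)
}
```

Because both `map` calls operate on iterators, rows are decoded and encoded one at a time as the consumer pulls them, which matches the streaming style of `doExecute` in the operators above.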

http://git-wip-us.apache.org/repos/asf/spark/blob/53e83a3a/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala
new file mode 100644
index 0000000..3244355
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import scala.language.postfixOps
+
+import org.apache.spark.sql.test.SharedSQLContext
+
+case class IntClass(value: Int)
+
+class DatasetPrimitiveSuite extends QueryTest with SharedSQLContext {
+  import testImplicits._
+
+  test("toDS") {
+    val data = Seq(1, 2, 3, 4, 5, 6)
+    checkAnswer(
+      data.toDS(),
+      data: _*)
+  }
+
+  test("as case class / collect") {
+    val ds = Seq(1, 2, 3).toDS().as[IntClass]
+    checkAnswer(
+      ds,
+      IntClass(1), IntClass(2), IntClass(3))
+
+    assert(ds.collect().head == IntClass(1))
+  }
+
+  test("map") {
+    val ds = Seq(1, 2, 3).toDS()
+    checkAnswer(
+      ds.map(_ + 1),
+      2, 3, 4)
+  }
+
+  test("filter") {
+    val ds = Seq(1, 2, 3, 4).toDS()
+    checkAnswer(
+      ds.filter(_ % 2 == 0),
+      2, 4)
+  }
+
+  test("foreach") {
+    val ds = Seq(1, 2, 3).toDS()
+    val acc = sparkContext.accumulator(0)
+    ds.foreach(acc +=)
+    assert(acc.value == 6)
+  }
+
+  test("foreachPartition") {
+    val ds = Seq(1, 2, 3).toDS()
+    val acc = sparkContext.accumulator(0)
+    ds.foreachPartition(_.foreach(acc +=))
+    assert(acc.value == 6)
+  }
+
+  test("reduce") {
+    val ds = Seq(1, 2, 3).toDS()
+    assert(ds.reduce(_ + _) == 6)
+  }
+
+  test("fold") {
+    val ds = Seq(1, 2, 3).toDS()
+    assert(ds.fold(0)(_ + _) == 6)
+  }
+
+  test("groupBy function, keys") {
+    val ds = Seq(1, 2, 3, 4, 5).toDS()
+    val grouped = ds.groupBy(_ % 2)
+    checkAnswer(
+      grouped.keys,
+      0, 1)
+  }
+
+  test("groupBy function, mapGroups") {
+    val ds = Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11).toDS()
+    val grouped = ds.groupBy(_ % 2)
+    val agged = grouped.mapGroups { case (g, iter) =>
+      val name = if (g == 0) "even" else "odd"
+      Iterator((name, iter.size))
+    }
+
+    checkAnswer(
+      agged,
+      ("even", 5), ("odd", 6))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/53e83a3a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
new file mode 100644
index 0000000..0849624
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import scala.language.postfixOps
+
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.test.SharedSQLContext
+
+case class ClassData(a: String, b: Int)
+
+class DatasetSuite extends QueryTest with SharedSQLContext {
+  import testImplicits._
+
+  test("toDS") {
+    val data = Seq(("a", 1) , ("b", 2), ("c", 3))
+    checkAnswer(
+      data.toDS(),
+      data: _*)
+  }
+
+  test("as case class / collect") {
+    val ds = Seq(("a", 1) , ("b", 2), ("c", 3)).toDF("a", "b").as[ClassData]
+    checkAnswer(
+      ds,
+      ClassData("a", 1), ClassData("b", 2), ClassData("c", 3))
+    assert(ds.collect().head == ClassData("a", 1))
+  }
+
+  test("as case class - reordered fields by name") {
+    val ds = Seq((1, "a"), (2, "b"), (3, "c")).toDF("b", "a").as[ClassData]
+    assert(ds.collect() === Array(ClassData("a", 1), ClassData("b", 2), 
ClassData("c", 3)))
+  }
+
+  test("map") {
+    val ds = Seq(("a", 1) , ("b", 2), ("c", 3)).toDS()
+    checkAnswer(
+      ds.map(v => (v._1, v._2 + 1)),
+      ("a", 2), ("b", 3), ("c", 4))
+  }
+
+  test("select") {
+    val ds = Seq(("a", 1) , ("b", 2), ("c", 3)).toDS()
+    checkAnswer(
+      ds.select(expr("_2 + 1").as[Int]),
+      2, 3, 4)
+  }
+
+  test("select 3") {
+    val ds = Seq(("a", 1) , ("b", 2), ("c", 3)).toDS()
+    checkAnswer(
+      ds.select(
+        expr("_1").as[String],
+        expr("_2").as[Int],
+        expr("_2 + 1").as[Int]),
+      ("a", 1, 2), ("b", 2, 3), ("c", 3, 4))
+  }
+
+  test("filter") {
+    val ds = Seq(("a", 1) , ("b", 2), ("c", 3)).toDS()
+    checkAnswer(
+      ds.filter(_._1 == "b"),
+      ("b", 2))
+  }
+
+  test("foreach") {
+    val ds = Seq(("a", 1) , ("b", 2), ("c", 3)).toDS()
+    val acc = sparkContext.accumulator(0)
+    ds.foreach(v => acc += v._2)
+    assert(acc.value == 6)
+  }
+
+  test("foreachPartition") {
+    val ds = Seq(("a", 1) , ("b", 2), ("c", 3)).toDS()
+    val acc = sparkContext.accumulator(0)
+    ds.foreachPartition(_.foreach(v => acc += v._2))
+    assert(acc.value == 6)
+  }
+
+  test("reduce") {
+    val ds = Seq(("a", 1) , ("b", 2), ("c", 3)).toDS()
+    assert(ds.reduce((a, b) => ("sum", a._2 + b._2)) == ("sum", 6))
+  }
+
+  test("fold") {
+    val ds = Seq(("a", 1) , ("b", 2), ("c", 3)).toDS()
+    assert(ds.fold(("", 0))((a, b) => ("sum", a._2 + b._2)) == ("sum", 6))
+  }
+
+  test("groupBy function, keys") {
+    val ds = Seq(("a", 1), ("b", 1)).toDS()
+    val grouped = ds.groupBy(v => (1, v._2))
+    checkAnswer(
+      grouped.keys,
+      (1, 1))
+  }
+
+  test("groupBy function, mapGroups") {
+    val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
+    val grouped = ds.groupBy(v => (v._1, "word"))
+    val agged = grouped.mapGroups { case (g, iter) =>
+      Iterator((g._1, iter.map(_._2).sum))
+    }
+
+    checkAnswer(
+      agged,
+      ("a", 30), ("b", 3), ("c", 1))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/53e83a3a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index e3c5a42..aba5675 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -20,10 +20,12 @@ package org.apache.spark.sql
 import java.util.{Locale, TimeZone}
 
 import scala.collection.JavaConverters._
+import scala.reflect.runtime.universe._
 
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.columnar.InMemoryRelation
+import org.apache.spark.sql.catalyst.encoders.{ProductEncoder, Encoder}
 
 abstract class QueryTest extends PlanTest {
 
@@ -53,6 +55,12 @@ abstract class QueryTest extends PlanTest {
     }
   }
 
+  protected def checkAnswer[T : Encoder](ds: => Dataset[T], expectedAnswer: 
T*): Unit = {
+    checkAnswer(
+      ds.toDF(),
+      sqlContext.createDataset(expectedAnswer).toDF().collect().toSeq)
+  }
+
   /**
    * Runs the plan and makes sure the answer matches the expected result.
    * @param df the [[DataFrame]] to be executed


