Repository: spark
Updated Branches:
  refs/heads/master 1d04dc95c -> f6fcb4874


[SPARK-11477] [SQL] support create Dataset from RDD

Author: Wenchen Fan <wenc...@databricks.com>

Closes #9434 from cloud-fan/rdd2ds and squashes the following commits:

0892d72 [Wenchen Fan] support create Dataset from RDD


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f6fcb487
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f6fcb487
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f6fcb487

Branch: refs/heads/master
Commit: f6fcb4874ce20a1daa91b7434cf9c0254a89e979
Parents: 1d04dc9
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Wed Nov 4 00:15:50 2015 +0100
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Wed Nov 4 00:15:50 2015 +0100

----------------------------------------------------------------------
 .../src/main/scala/org/apache/spark/sql/SQLContext.scala    | 9 +++++++++
 .../src/main/scala/org/apache/spark/sql/SQLImplicits.scala  | 4 ++++
 .../src/test/scala/org/apache/spark/sql/DatasetSuite.scala  | 7 +++++++
 3 files changed, 20 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f6fcb487/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 2cb9443..5ad3871 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -499,6 +499,15 @@ class SQLContext private[sql](
     new Dataset[T](this, plan)
   }
 
+  def createDataset[T : Encoder](data: RDD[T]): Dataset[T] = {
+    val enc = encoderFor[T]
+    val attributes = enc.schema.toAttributes
+    val encoded = data.map(d => enc.toRow(d))
+    val plan = LogicalRDD(attributes, encoded)(self)
+
+    new Dataset[T](this, plan)
+  }
+
   /**
   * Creates a DataFrame from an RDD[Row]. User can specify whether the input rows should be
    * converted to Catalyst rows.

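For context, a minimal usage sketch of the new overload (illustrative only; the local-mode setup and sample data below are assumptions, not part of the patch):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    // Hypothetical local setup for illustration.
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("rdd2ds"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._  // brings the flat primitive Encoders into scope

    // The new overload encodes each RDD element into a Catalyst row and
    // wraps the encoded RDD in a LogicalRDD plan behind a Dataset[Int].
    val ds = sqlContext.createDataset(sc.parallelize(Seq(1, 2, 3)))
    println(ds.collect().toSeq)  // expected: WrappedArray(1, 2, 3)
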
http://git-wip-us.apache.org/repos/asf/spark/blob/f6fcb487/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
index f460a86..f2904e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
@@ -48,6 +48,10 @@ abstract class SQLImplicits {
   implicit def newBooleanEncoder: Encoder[Boolean] = ExpressionEncoder[Boolean](flat = true)
   implicit def newStringEncoder: Encoder[String] = ExpressionEncoder[String](flat = true)
 
+  implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]): DatasetHolder[T] = {
+    DatasetHolder(_sqlContext.createDataset(rdd))
+  }
+
   implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): DatasetHolder[T] = {
     DatasetHolder(_sqlContext.createDataset(s))
   }

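The implicit conversion gives RDDs the same .toDS() syntax that local Seqs already have; a short sketch, reusing the sqlContext and implicits from the previous example:

    // rddToDatasetHolder wraps the RDD in a DatasetHolder[String];
    // its toDS() returns the Dataset built via createDataset above.
    val words = sc.parallelize(Seq("a", "b", "c"))
    val wordsDs = words.toDS()  // Dataset[String], using newStringEncoder
    println(wordsDs.collect().toSeq)  // expected: WrappedArray(a, b, c)
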
http://git-wip-us.apache.org/repos/asf/spark/blob/f6fcb487/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 5973fa7..3e9b621 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -34,6 +34,13 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
       data: _*)
   }
 
+  test("toDS with RDD") {
+    val ds = sparkContext.makeRDD(Seq("a", "b", "c"), 3).toDS()
+    checkAnswer(
+      ds.mapPartitions(_ => Iterator(1)),
+      1, 1, 1)
+  }
+
   test("as tuple") {
     val data = Seq(("a", 1), ("b", 2)).toDF("a", "b")
     checkAnswer(

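A note on the expected answer in the new test: makeRDD(Seq("a", "b", "c"), 3) spreads the three elements across three partitions, and mapPartitions(_ => Iterator(1)) emits a single 1 per partition, so checkAnswer sees exactly three 1s.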
