Repository: spark
Updated Branches:
  refs/heads/master 750ed64cd -> bb1fda01f


[SPARK-13826][SQL] Addendum: update documentation for Datasets

## What changes were proposed in this pull request?
This patch updates the documentation for Datasets. I also updated some internal
documentation for exchange/broadcast.

## How was this patch tested?
Just documentation/api stability update.

Author: Reynold Xin <r...@databricks.com>

Closes #11814 from rxin/dataset-docs.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bb1fda01
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bb1fda01
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bb1fda01

Branch: refs/heads/master
Commit: bb1fda01fe620422c442b305f5ceeb552871a490
Parents: 750ed64
Author: Reynold Xin <r...@databricks.com>
Authored: Fri Mar 18 00:57:23 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Fri Mar 18 00:57:23 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/DataFrameWriter.scala  | 11 ++-
 .../scala/org/apache/spark/sql/Dataset.scala    | 81 +++++++++++++-------
 .../execution/exchange/BroadcastExchange.scala  |  3 +-
 .../spark/sql/execution/exchange/Exchange.scala |  6 +-
 4 files changed, 70 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bb1fda01/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 9951f0f..7ed1c51 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -138,7 +138,16 @@ final class DataFrameWriter private[sql](df: DataFrame) {
 
   /**
    * Partitions the output by the given columns on the file system. If specified, the output is
-   * laid out on the file system similar to Hive's partitioning scheme.
+   * laid out on the file system similar to Hive's partitioning scheme. As an example, when we
+   * partition a dataset by year and then month, the directory layout would look like:
+   *
+   *   - year=2016/month=01/
+   *   - year=2016/month=02/
+   *
+   * Partitioning is one of the most widely used techniques to optimize physical data layout.
+   * It provides a coarse-grained index for skipping unnecessary data reads when queries have
+   * predicates on the partitioned columns. In order for partitioning to work well, the number
+   * of distinct values in each column should typically be less than tens of thousands.
    *
    * This was initially applicable for Parquet but in 1.5+ covers JSON, text, ORC and avro as well.
    *
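
Editorial illustration (not part of the commit): the directory layout described in the
partitionBy documentation above can be reproduced with a small local job. This is a minimal
sketch that assumes the Spark 2.x-or-later `SparkSession` API; the object name
`PartitionBySketch`, the helper `writePartitioned`, the sample rows, and the temporary output
path are all hypothetical.

```scala
import java.nio.file.{Files, Path}
import org.apache.spark.sql.SparkSession

object PartitionBySketch {
  /** Writes a tiny dataset partitioned by year, then month, and returns the
    * partition directories (relative to `out`) created by the write.
    * `out` must not exist yet. */
  def writePartitioned(spark: SparkSession, out: Path): List[String] = {
    import spark.implicits._

    // String-typed partition values, so the directory names keep the leading zero.
    val events = Seq(("2016", "01", "a"), ("2016", "02", "b"))
      .toDF("year", "month", "payload")
    events.write.partitionBy("year", "month").parquet(out.toString)

    // List the year=.../month=... directories under the output path.
    val yearDirs = out.toFile.listFiles()
      .filter(d => d.isDirectory && d.getName.startsWith("year="))
    yearDirs.toList.flatMap { y =>
      y.listFiles()
        .filter(d => d.isDirectory && d.getName.startsWith("month="))
        .map(m => s"${y.getName}/${m.getName}")
    }.sorted
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]").appName("partitionBy-sketch").getOrCreate()
    val out = Files.createTempDirectory("partitionBy-sketch").resolve("events")
    writePartitioned(spark, out).foreach(println)

    // A predicate on the partition columns lets Spark skip the other directories;
    // explain() prints the plan so the partition filter can be inspected.
    spark.read.parquet(out.toString).where("year = 2016 AND month = 1").explain()
    spark.stop()
  }
}
```

Keeping the number of distinct values per partition column small, as the documentation
advises, keeps the number of directories (and files) manageable.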

http://git-wip-us.apache.org/repos/asf/spark/blob/bb1fda01/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 209bac3..39f7f35 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -61,36 +61,48 @@ private[sql] object Dataset {
 }
 
 /**
- * A [[Dataset]] is a strongly typed collection of objects that can be transformed in parallel
- * using functional or relational operations.
+ * A [[Dataset]] is a strongly typed collection of domain-specific objects that can be transformed
+ * in parallel using functional or relational operations. Each Dataset also has an untyped view
+ * called a [[DataFrame]], which is a Dataset of [[Row]].
  *
- * A [[Dataset]] differs from an [[RDD]] in the following ways:
+ * Operations available on Datasets are divided into transformations and actions. Transformations
+ * are the ones that produce new Datasets, and actions are the ones that trigger computation and
+ * return results. Example transformations include map, filter, select, and aggregate (groupBy).
+ * Example actions include count, show, and writing data out to file systems.
  *
- *  - Internally, a [[Dataset]] is represented by a Catalyst logical plan and the data is stored
- *    in the encoded form.  This representation allows for additional logical operations and
- *    enables many operations (sorting, shuffling, etc.) to be performed without deserializing to
- *    an object.
- *  - The creation of a [[Dataset]] requires the presence of an explicit [[Encoder]] that can be
- *    used to serialize the object into a binary format.  Encoders are also capable of mapping the
- *    schema of a given object to the Spark SQL type system.  In contrast, RDDs rely on runtime
- *    reflection based serialization. Operations that change the type of object stored in the
- *    dataset also need an encoder for the new type.
+ * Datasets are "lazy", i.e. computations are only triggered when an action is invoked. Internally,
+ * a Dataset represents a logical plan that describes the computation required to produce the data.
+ * When an action is invoked, Spark's query optimizer optimizes the logical plan and generates a
+ * physical plan for efficient execution in a parallel or distributed manner. To explore the
+ * logical plan as well as the optimized physical plan, use the `explain` function.
  *
- * Different from DataFrame in Spark 1.6.0 and earlier versions, a [[Dataset]] can be thought of as
- * a specialized DataFrame, where the elements map to a specific JVM object type, instead of to a
- * generic [[Row]] container.  Since Spark 2.0.0, DataFrame is simply a type alias of
- * `Dataset[Row]`.
+ * To efficiently support domain-specific objects, an [[Encoder]] is required. The encoder maps
+ * the domain-specific type T to Spark's internal type system. For example, given a class Person
+ * with two fields, name (string) and age (int), an encoder is used to tell Spark to generate code
+ * at runtime to serialize the Person object into a binary structure. This binary structure often
+ * has a much lower memory footprint and is optimized for efficiency in data processing
+ * (e.g. in a columnar format). To understand the internal binary representation for data, use the
+ * `schema` function.
  *
- * The following example creates a `Dataset[Row]` by pointing Spark SQL to a Parquet data set.
+ * There are typically two ways to create a Dataset. The most common way is by pointing Spark
+ * to some files on storage systems, using the `read` function available on a `SparkSession`.
  * {{{
- *   val people = sqlContext.read.parquet("...")  // in Scala
- *   Dataset<Row> people = sqlContext.read().parquet("...")  // in Java
+ *   val people = session.read.parquet("...").as[Person]  // Scala
+ *   Dataset<Person> people = session.read().parquet("...").as(Encoders.bean(Person.class))  // Java
  * }}}
  *
- * Once created, it can be manipulated using the various domain-specific-language (DSL) functions
- * defined in: [[Dataset]] (this class), [[Column]], and [[functions]].
+ * Datasets can also be created through transformations available on existing Datasets. For example,
+ * the following creates a new Dataset by applying a map to the existing one:
+ * {{{
+ *   val names = people.map(_.name)  // in Scala; names is a Dataset[String]
+ *   Dataset<String> names = people.map((Person p) -> p.name, Encoders.STRING())  // in Java 8
+ * }}}
  *
- * To select a column from the data frame, use `apply` method in Scala and `col` in Java.
+ * Dataset operations can also be untyped, through the various domain-specific-language (DSL)
+ * functions defined in: [[Dataset]] (this class), [[Column]], and [[functions]]. These operations
+ * are very similar to the operations available in the data frame abstraction in R or Python.
+ *
+ * To select a column from the Dataset, use `apply` method in Scala and `col` in Java.
  * {{{
  *   val ageCol = people("age")  // in Scala
  *   Column ageCol = people.col("age")  // in Java
@@ -241,7 +253,6 @@ class Dataset[T] private[sql](
   }
 
   /**
-   * :: Experimental ::
    * Converts this strongly typed collection of data to generic Dataframe.  In contrast to the
    * strongly typed objects that Dataset operations work on, a Dataframe returns generic [[Row]]
    * objects that allow fields to be accessed by ordinal or name.
@@ -251,7 +262,6 @@ class Dataset[T] private[sql](
    */
   // This is declared with parentheses to prevent the Scala compiler from treating
   // `ds.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.
-  @Experimental
   def toDF(): DataFrame = new Dataset[Row](sqlContext, queryExecution, RowEncoder(schema))
 
   /**
@@ -1094,7 +1104,7 @@ class Dataset[T] private[sql](
   def cube(cols: Column*): GroupedData = GroupedData(toDF(), cols.map(_.expr), GroupedData.CubeType)
 
   /**
-   * Groups the [[Dataset]] 2.0.0
+   * Groups the [[Dataset]] using the specified columns, so we can run aggregation on them.
    * See [[GroupedData]] for all the available aggregate functions.
    *
    * This is a variant of groupBy that can only group by existing columns using column names
@@ -1314,7 +1324,8 @@ class Dataset[T] private[sql](
 
   /**
    * Returns a new [[Dataset]] by taking the first `n` rows. The difference between this function
-   * and `head` is that `head` returns an array while `limit` returns a new [[Dataset]].
+   * and `head` is that `head` is an action and returns an array (by triggering query execution)
+   * while `limit` returns a new [[Dataset]].
    *
    * @group typedrel
    * @since 2.0.0
@@ -1327,6 +1338,9 @@ class Dataset[T] private[sql](
    * Returns a new [[Dataset]] containing union of rows in this frame and another frame.
    * This is equivalent to `UNION ALL` in SQL.
    *
+   * To do a SQL-style set union (that does deduplication of elements), use this function followed
+   * by a [[distinct]].
+   *
    * @group typedrel
    * @since 2.0.0
    */
@@ -1349,6 +1363,9 @@ class Dataset[T] private[sql](
    * Returns a new [[Dataset]] containing rows only in both this frame and another frame.
    * This is equivalent to `INTERSECT` in SQL.
    *
+   * Note that equality checking is performed directly on the encoded representation of the data
+   * and thus is not affected by a custom `equals` function defined on `T`.
+   *
    * @group typedrel
    * @since 1.6.0
    */
@@ -1360,6 +1377,9 @@ class Dataset[T] private[sql](
    * Returns a new [[Dataset]] containing rows in this frame but not in another frame.
    * This is equivalent to `EXCEPT` in SQL.
    *
+   * Note that equality checking is performed directly on the encoded representation of the data
+   * and thus is not affected by a custom `equals` function defined on `T`.
+   *
    * @group typedrel
    * @since 2.0.0
    */
@@ -1448,6 +1468,7 @@ class Dataset[T] private[sql](
   }
 
   /**
+   * :: Experimental ::
    * (Scala-specific) Returns a new [[Dataset]] where each row has been expanded to zero or more
    * rows by the provided function.  This is similar to a `LATERAL VIEW` in HiveQL. The columns of
    * the input row are implicitly joined with each row that is output by the function.
@@ -1470,6 +1491,7 @@ class Dataset[T] private[sql](
    * @group untypedrel
    * @since 2.0.0
    */
+  @Experimental
   def explode[A <: Product : TypeTag](input: Column*)(f: Row => TraversableOnce[A]): DataFrame = {
     val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
 
@@ -1489,6 +1511,7 @@ class Dataset[T] private[sql](
   }
 
   /**
+   * :: Experimental ::
    * (Scala-specific) Returns a new [[Dataset]] where a single column has been expanded to zero
    * or more rows by the provided function.  This is similar to a `LATERAL VIEW` in HiveQL. All
    * columns of the input row are implicitly joined with each value that is output by the function.
@@ -1500,6 +1523,7 @@ class Dataset[T] private[sql](
    * @group untypedrel
    * @since 2.0.0
    */
+  @Experimental
   def explode[A, B : TypeTag](inputColumn: String, outputColumn: String)(f: A => TraversableOnce[B])
     : DataFrame = {
     val dataType = ScalaReflection.schemaFor[B].dataType
@@ -1770,7 +1794,7 @@ class Dataset[T] private[sql](
   /**
    * Concise syntax for chaining custom transformations.
    * {{{
-   *   def featurize(ds: Dataset[T]) = ...
+   *   def featurize(ds: Dataset[T]): Dataset[U] = ...
    *
    *   ds
    *     .transform(featurize)
@@ -2051,6 +2075,9 @@ class Dataset[T] private[sql](
    * Returns a new [[Dataset]] that contains only the unique rows from this [[Dataset]].
    * This is an alias for `dropDuplicates`.
    *
+   * Note that equality checking is performed directly on the encoded representation of the data
+   * and thus is not affected by a custom `equals` function defined on `T`.
+   *
    * @group typedrel
    * @since 2.0.0
    */
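
Editorial illustration (not part of the commit): the notes above on `union`, `distinct`, and
equality on the encoded representation can be sketched as follows. This assumes the Spark
2.x-or-later `SparkSession` API. The `Person` class, its deliberately case-insensitive `equals`,
the object name `DatasetSketch`, and the sample rows are hypothetical and exist only to make
the behaviour visible.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical domain class whose custom equals ignores case and age.
case class Person(name: String, age: Int) {
  override def equals(other: Any): Boolean = other match {
    case p: Person => p.name.equalsIgnoreCase(name)
    case _         => false
  }
  override def hashCode: Int = name.toLowerCase.hashCode
}

object DatasetSketch {
  /** Returns (number of names, rows after union, rows after union + distinct). */
  def run(spark: SparkSession): (Long, Long, Long) = {
    import spark.implicits._ // supplies the Encoders for Person and String

    val a = Seq(Person("Ann", 30), Person("Bob", 25)).toDS()
    val b = Seq(Person("Ann", 30), Person("ANN", 30)).toDS()

    // Transformation: nothing runs until an action such as count() is invoked.
    val names: Dataset[String] = a.map(_.name)

    // union keeps duplicates (UNION ALL); following it with distinct gives a set union.
    val all = a.union(b)
    val deduped = all.distinct()

    // distinct compares encoded column values, so "Ann" and "ANN" stay separate
    // even though Person.equals would treat them as equal.
    (names.count(), all.count(), deduped.count())
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]").appName("dataset-sketch").getOrCreate()
    println(run(spark)) // expected: (2,4,3)
    spark.stop()
  }
}
```

For comparison, calling `distinct` on the equivalent local Scala `Seq[Person]` would use the
custom `equals` and keep only two elements.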

http://git-wip-us.apache.org/repos/asf/spark/blob/bb1fda01/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala
index 1a5c6a6..102a935 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala
@@ -23,9 +23,8 @@ import scala.concurrent.duration._
 import org.apache.spark.broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, BroadcastPartitioning, Partitioning}
-import org.apache.spark.sql.execution.{SparkPlan, SQLExecution, UnaryNode}
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
 import org.apache.spark.util.ThreadUtils
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/bb1fda01/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
index 9eaadea..df7ad48 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
@@ -30,7 +30,11 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 
 /**
- * An interface for exchanges.
+ * Base class for operators that exchange data among multiple threads or processes.
+ *
+ * Exchanges are the key class of operators that enable parallelism. Although the implementation
+ * differs significantly, the concept is similar to the exchange operator described in
+ * "Volcano -- An Extensible and Parallel Query Evaluation System" by Goetz Graefe.
  */
 abstract class Exchange extends UnaryNode {
   override def output: Seq[Attribute] = child.output

