spark git commit: [SPARK-6117] [SQL] add describe function to DataFrame for summary statis...

2015-03-26 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 aa2d157c6 -> 84735c363


[SPARK-6117] [SQL] add describe function to DataFrame for summary statis...

Please review my solution for SPARK-6117

Author: azagrebin <azagre...@gmail.com>

Closes #5073 from azagrebin/SPARK-6117 and squashes the following commits:

f9056ac [azagrebin] [SPARK-6117] [SQL] create one aggregation and split it 
locally into resulting DF, colocate test data with test case
ddb3950 [azagrebin] [SPARK-6117] [SQL] simplify implementation, add test for DF 
without numeric columns
9daf31e [azagrebin] [SPARK-6117] [SQL] add describe function to DataFrame for 
summary statistics

(cherry picked from commit 5bbcd1304cfebba31ec6857a80d3825a40d02e83)
Signed-off-by: Reynold Xin <r...@databricks.com>
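
Commit f9056ac above refers to running one flat aggregation and splitting its result locally into the rows of the summary DataFrame. A stand-alone sketch of that regrouping step, using hypothetical values rather than code from this patch:

  val statNames = Seq("count", "mean", "stddev", "min", "max")
  // Flat aggregation output, statistic-major: 5 statistics x 2 columns.
  val flatResults = Seq(10.0, 10.0, 53.3, 178.05, 11.6, 15.7, 18.0, 163.0, 92.0, 192.0)
  val numCols = 2

  // One group of `numCols` values per statistic; prepend the statistic's
  // name to turn each group into a row of the summary table.
  val rows = flatResults.grouped(numCols).toSeq.zip(statNames).map {
    case (values, name) => name +: values
  }

  rows.foreach(println)  // List(count, 10.0, 10.0), List(mean, 53.3, 178.05), ...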


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/84735c36
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/84735c36
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/84735c36

Branch: refs/heads/branch-1.3
Commit: 84735c363e220baf2cc39dcef5f040812e23c086
Parents: aa2d157
Author: azagrebin <azagre...@gmail.com>
Authored: Thu Mar 26 00:25:04 2015 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Thu Mar 26 12:25:48 2015 -0700

--
 .../scala/org/apache/spark/sql/DataFrame.scala  | 53 +++-
 .../org/apache/spark/sql/DataFrameSuite.scala   | 45 +
 2 files changed, 97 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/84735c36/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 5aece16..db56182 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, LogicalRDD}
 import org.apache.spark.sql.jdbc.JDBCWriteDetails
 import org.apache.spark.sql.json.JsonRDD
-import org.apache.spark.sql.types.{NumericType, StructType}
+import org.apache.spark.sql.types.{NumericType, StructType, StructField, StringType}
 import org.apache.spark.sql.sources.{ResolvedDataSource, CreateTableUsingAsSelect}
 import org.apache.spark.util.Utils
 
@@ -752,6 +752,57 @@ class DataFrame private[sql](
   }
 
   /**
+   * Compute numerical statistics for the given columns of this [[DataFrame]]:
+   * count, mean (avg), stddev (standard deviation), min, and max.
+   * Each row of the resulting [[DataFrame]] holds one column with the statistic's
+   * name and one column with that statistic's result for each input column.
+   * If no columns are given, statistics are computed for all numerical columns.
+   *
+   * {{{
+   *   df.describe("age", "height")
+   *
+   *   // summary age   height
+   *   // count   10.0  10.0
+   *   // mean    53.3  178.05
+   *   // stddev  11.6  15.7
+   *   // min     18.0  163.0
+   *   // max     92.0  192.0
+   * }}}
+   */
+  @scala.annotation.varargs
+  def describe(cols: String*): DataFrame = {
+
+    def stddevExpr(expr: Expression) =
+      Sqrt(Subtract(Average(Multiply(expr, expr)), Multiply(Average(expr), Average(expr))))
+
+    val statistics = List[(String, Expression => Expression)](
+      "count" -> Count,
+      "mean" -> Average,
+      "stddev" -> stddevExpr,
+      "min" -> Min,
+      "max" -> Max)
+
+    val aggCols = (if (cols.isEmpty) numericColumns.map(_.prettyString) else cols).toList
+
+    val localAgg = if (aggCols.nonEmpty) {
+      val aggExprs = statistics.flatMap { case (_, colToAgg) =>
+        aggCols.map(c => Column(colToAgg(Column(c).expr)).as(c))
+      }
+
+      agg(aggExprs.head, aggExprs.tail: _*).head().toSeq
+        .grouped(aggCols.size).toSeq.zip(statistics).map { case (aggregation, (statistic, _)) =>
+          Row(statistic :: aggregation.toList: _*)
+        }
+    } else {
+      statistics.map { case (name, _) => Row(name) }
+    }
+
+    val schema = StructType(("summary" :: aggCols).map(StructField(_, StringType)))
+    val rowRdd = sqlContext.sparkContext.parallelize(localAgg)
+    sqlContext.createDataFrame(rowRdd, schema)
+  }
+
+  /**
    * Returns the first `n` rows.
    * @group action
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/84735c36/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index c30ed69..afbedd1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -443,6 +443,51 @@ class DataFrameSuite

spark git commit: [SPARK-6117] [SQL] add describe function to DataFrame for summary statis...

2015-03-26 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master f53580297 -> 5bbcd1304


[SPARK-6117] [SQL] add describe function to DataFrame for summary statis...

Please review my solution for SPARK-6117

Author: azagrebin <azagre...@gmail.com>

Closes #5073 from azagrebin/SPARK-6117 and squashes the following commits:

f9056ac [azagrebin] [SPARK-6117] [SQL] create one aggregation and split it 
locally into resulting DF, colocate test data with test case
ddb3950 [azagrebin] [SPARK-6117] [SQL] simplify implementation, add test for DF 
without numeric columns
9daf31e [azagrebin] [SPARK-6117] [SQL] add describe function to DataFrame for 
summary statistics


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5bbcd130
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5bbcd130
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5bbcd130

Branch: refs/heads/master
Commit: 5bbcd1304cfebba31ec6857a80d3825a40d02e83
Parents: f535802
Author: azagrebin <azagre...@gmail.com>
Authored: Thu Mar 26 00:25:04 2015 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Thu Mar 26 00:25:04 2015 -0700

--
 .../scala/org/apache/spark/sql/DataFrame.scala  | 53 +++-
 .../org/apache/spark/sql/DataFrameSuite.scala   | 45 +
 2 files changed, 97 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5bbcd130/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 5aece16..db56182 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, LogicalRDD}
 import org.apache.spark.sql.jdbc.JDBCWriteDetails
 import org.apache.spark.sql.json.JsonRDD
-import org.apache.spark.sql.types.{NumericType, StructType}
+import org.apache.spark.sql.types.{NumericType, StructType, StructField, StringType}
 import org.apache.spark.sql.sources.{ResolvedDataSource, CreateTableUsingAsSelect}
 import org.apache.spark.util.Utils
 
@@ -752,6 +752,57 @@ class DataFrame private[sql](
   }
 
   /**
+   * Compute numerical statistics for the given columns of this [[DataFrame]]:
+   * count, mean (avg), stddev (standard deviation), min, and max.
+   * Each row of the resulting [[DataFrame]] holds one column with the statistic's
+   * name and one column with that statistic's result for each input column.
+   * If no columns are given, statistics are computed for all numerical columns.
+   *
+   * {{{
+   *   df.describe("age", "height")
+   *
+   *   // summary age   height
+   *   // count   10.0  10.0
+   *   // mean    53.3  178.05
+   *   // stddev  11.6  15.7
+   *   // min     18.0  163.0
+   *   // max     92.0  192.0
+   * }}}
+   */
+  @scala.annotation.varargs
+  def describe(cols: String*): DataFrame = {
+
+    def stddevExpr(expr: Expression) =
+      Sqrt(Subtract(Average(Multiply(expr, expr)), Multiply(Average(expr), Average(expr))))
+
+    val statistics = List[(String, Expression => Expression)](
+      "count" -> Count,
+      "mean" -> Average,
+      "stddev" -> stddevExpr,
+      "min" -> Min,
+      "max" -> Max)
+
+    val aggCols = (if (cols.isEmpty) numericColumns.map(_.prettyString) else cols).toList
+
+    val localAgg = if (aggCols.nonEmpty) {
+      val aggExprs = statistics.flatMap { case (_, colToAgg) =>
+        aggCols.map(c => Column(colToAgg(Column(c).expr)).as(c))
+      }
+
+      agg(aggExprs.head, aggExprs.tail: _*).head().toSeq
+        .grouped(aggCols.size).toSeq.zip(statistics).map { case (aggregation, (statistic, _)) =>
+          Row(statistic :: aggregation.toList: _*)
+        }
+    } else {
+      statistics.map { case (name, _) => Row(name) }
+    }
+
+    val schema = StructType(("summary" :: aggCols).map(StructField(_, StringType)))
+    val rowRdd = sqlContext.sparkContext.parallelize(localAgg)
+    sqlContext.createDataFrame(rowRdd, schema)
+  }
+
+  /**
    * Returns the first `n` rows.
    * @group action
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/5bbcd130/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index c30ed69..afbedd1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -443,6 +443,51 @@ class DataFrameSuite