spark git commit: [SPARK-6117] [SQL] add describe function to DataFrame for summary statis...
Repository: spark Updated Branches: refs/heads/branch-1.3 aa2d157c6 -> 84735c363 [SPARK-6117] [SQL] add describe function to DataFrame for summary statis... Please review my solution for SPARK-6117 Author: azagrebin azagre...@gmail.com Closes #5073 from azagrebin/SPARK-6117 and squashes the following commits: f9056ac [azagrebin] [SPARK-6117] [SQL] create one aggregation and split it locally into resulting DF, colocate test data with test case ddb3950 [azagrebin] [SPARK-6117] [SQL] simplify implementation, add test for DF without numeric columns 9daf31e [azagrebin] [SPARK-6117] [SQL] add describe function to DataFrame for summary statistics (cherry picked from commit 5bbcd1304cfebba31ec6857a80d3825a40d02e83) Signed-off-by: Reynold Xin r...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/84735c36 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/84735c36 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/84735c36 Branch: refs/heads/branch-1.3 Commit: 84735c363e220baf2cc39dcef5f040812e23c086 Parents: aa2d157 Author: azagrebin azagre...@gmail.com Authored: Thu Mar 26 00:25:04 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Thu Mar 26 12:25:48 2015 -0700 -- .../scala/org/apache/spark/sql/DataFrame.scala | 53 +++- .../org/apache/spark/sql/DataFrameSuite.scala | 45 + 2 files changed, 97 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/84735c36/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 5aece16..db56182 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.{EvaluatePython, 
ExplainCommand, LogicalRDD} import org.apache.spark.sql.jdbc.JDBCWriteDetails import org.apache.spark.sql.json.JsonRDD -import org.apache.spark.sql.types.{NumericType, StructType} +import org.apache.spark.sql.types.{NumericType, StructType, StructField, StringType} import org.apache.spark.sql.sources.{ResolvedDataSource, CreateTableUsingAsSelect} import org.apache.spark.util.Utils @@ -752,6 +752,57 @@ class DataFrame private[sql]( } /** + * Compute numerical statistics for given columns of this [[DataFrame]]: + * count, mean (avg), stddev (standard deviation), min, max. + * Each row of the resulting [[DataFrame]] contains column with statistic name + * and columns with statistic results for each given column. + * If no columns are given then computes for all numerical columns. + * + * {{{ + * df.describe("age", "height") + * + * // summary age height + * // count 10.0 10.0 + * // mean 53.3 178.05 + * // stddev 11.6 15.7 + * // min 18.0 163.0 + * // max 92.0 192.0 + * }}} + */ + @scala.annotation.varargs + def describe(cols: String*): DataFrame = { + +def stddevExpr(expr: Expression) = + Sqrt(Subtract(Average(Multiply(expr, expr)), Multiply(Average(expr), Average(expr)))) + +val statistics = List[(String, Expression => Expression)]( + "count" -> Count, + "mean" -> Average, + "stddev" -> stddevExpr, + "min" -> Min, + "max" -> Max) + +val aggCols = (if (cols.isEmpty) numericColumns.map(_.prettyString) else cols).toList + +val localAgg = if (aggCols.nonEmpty) { + val aggExprs = statistics.flatMap { case (_, colToAgg) => +aggCols.map(c => Column(colToAgg(Column(c).expr)).as(c)) + } + + agg(aggExprs.head, aggExprs.tail: _*).head().toSeq +.grouped(aggCols.size).toSeq.zip(statistics).map { case (aggregation, (statistic, _)) => +Row(statistic :: aggregation.toList: _*) + } +} else { + statistics.map { case (name, _) => Row(name) } +} + +val schema = StructType(("summary" :: aggCols).map(StructField(_, StringType))) +val rowRdd = sqlContext.sparkContext.parallelize(localAgg) 
+sqlContext.createDataFrame(rowRdd, schema) + } + + /** * Returns the first `n` rows. * @group action */ http://git-wip-us.apache.org/repos/asf/spark/blob/84735c36/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index c30ed69..afbedd1 100644 ---
spark git commit: [SPARK-6117] [SQL] add describe function to DataFrame for summary statis...
Repository: spark Updated Branches: refs/heads/master f53580297 -> 5bbcd1304 [SPARK-6117] [SQL] add describe function to DataFrame for summary statis... Please review my solution for SPARK-6117 Author: azagrebin azagre...@gmail.com Closes #5073 from azagrebin/SPARK-6117 and squashes the following commits: f9056ac [azagrebin] [SPARK-6117] [SQL] create one aggregation and split it locally into resulting DF, colocate test data with test case ddb3950 [azagrebin] [SPARK-6117] [SQL] simplify implementation, add test for DF without numeric columns 9daf31e [azagrebin] [SPARK-6117] [SQL] add describe function to DataFrame for summary statistics Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5bbcd130 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5bbcd130 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5bbcd130 Branch: refs/heads/master Commit: 5bbcd1304cfebba31ec6857a80d3825a40d02e83 Parents: f535802 Author: azagrebin azagre...@gmail.com Authored: Thu Mar 26 00:25:04 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Thu Mar 26 00:25:04 2015 -0700 -- .../scala/org/apache/spark/sql/DataFrame.scala | 53 +++- .../org/apache/spark/sql/DataFrameSuite.scala | 45 + 2 files changed, 97 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5bbcd130/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 5aece16..db56182 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, LogicalRDD} import org.apache.spark.sql.jdbc.JDBCWriteDetails import org.apache.spark.sql.json.JsonRDD -import 
org.apache.spark.sql.types.{NumericType, StructType} +import org.apache.spark.sql.types.{NumericType, StructType, StructField, StringType} import org.apache.spark.sql.sources.{ResolvedDataSource, CreateTableUsingAsSelect} import org.apache.spark.util.Utils @@ -752,6 +752,57 @@ class DataFrame private[sql]( } /** + * Compute numerical statistics for given columns of this [[DataFrame]]: + * count, mean (avg), stddev (standard deviation), min, max. + * Each row of the resulting [[DataFrame]] contains column with statistic name + * and columns with statistic results for each given column. + * If no columns are given then computes for all numerical columns. + * + * {{{ + * df.describe("age", "height") + * + * // summary age height + * // count 10.0 10.0 + * // mean 53.3 178.05 + * // stddev 11.6 15.7 + * // min 18.0 163.0 + * // max 92.0 192.0 + * }}} + */ + @scala.annotation.varargs + def describe(cols: String*): DataFrame = { + +def stddevExpr(expr: Expression) = + Sqrt(Subtract(Average(Multiply(expr, expr)), Multiply(Average(expr), Average(expr)))) + +val statistics = List[(String, Expression => Expression)]( + "count" -> Count, + "mean" -> Average, + "stddev" -> stddevExpr, + "min" -> Min, + "max" -> Max) + +val aggCols = (if (cols.isEmpty) numericColumns.map(_.prettyString) else cols).toList + +val localAgg = if (aggCols.nonEmpty) { + val aggExprs = statistics.flatMap { case (_, colToAgg) => +aggCols.map(c => Column(colToAgg(Column(c).expr)).as(c)) + } + + agg(aggExprs.head, aggExprs.tail: _*).head().toSeq +.grouped(aggCols.size).toSeq.zip(statistics).map { case (aggregation, (statistic, _)) => +Row(statistic :: aggregation.toList: _*) + } +} else { + statistics.map { case (name, _) => Row(name) } +} + +val schema = StructType(("summary" :: aggCols).map(StructField(_, StringType))) +val rowRdd = sqlContext.sparkContext.parallelize(localAgg) +sqlContext.createDataFrame(rowRdd, schema) + } + + /** + * Returns the first `n` rows. 
* @group action */ http://git-wip-us.apache.org/repos/asf/spark/blob/5bbcd130/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index c30ed69..afbedd1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -443,6 +443,51 @@ class DataFrameSuite