[ https://issues.apache.org/jira/browse/SPARK-26752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guilherme Beltramini updated SPARK-26752:
-----------------------------------------
    Description: 
The agg function in [org.apache.spark.sql.RelationalGroupedDataset|https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.RelationalGroupedDataset] accepts as input:
 * Column*
 * Map[String, String]
 * (String, String)*

I'm proposing to add Map[String, Seq[String]], where the keys are the columns to aggregate and the values are the aggregation functions to apply to each column. Here is a similar question: http://apache-spark-user-list.1001560.n3.nabble.com/DataFrame-multiple-agg-on-the-same-column-td29541.html.
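
To make the proposal concrete, here is a minimal sketch of the intended semantics, written as a standalone helper. The name aggAll is hypothetical and this is not actual Spark API; it simply flattens the map into (column, function) pairs and delegates to the existing (String, String)* overload of agg:
{code:scala}
import org.apache.spark.sql.{DataFrame, RelationalGroupedDataset}

// Hypothetical helper (not part of Spark): the proposed agg(Map[String, Seq[String]])
// would behave like this, applying every listed function to its column.
def aggAll(grouped: RelationalGroupedDataset, exprs: Map[String, Seq[String]]): DataFrame = {
  val flat = exprs.toSeq.flatMap { case (col, fns) => fns.map(fn => col -> fn) }
  grouped.agg(flat.head, flat.tail: _*) // assumes exprs is non-empty
}
{code}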

In the example below (run in spark-shell with Spark 2.4.0), I show a workaround. What I'm proposing is that agg should accept aggMap directly as input:
{code:java}
scala> val df = Seq(("a", 1), ("a", 2), ("a", 3), ("a", 4), ("b", 10), ("b", 20), ("c", 100)).toDF("col1", "col2")
df: org.apache.spark.sql.DataFrame = [col1: string, col2: int]

scala> df.show
+----+----+
|col1|col2|
+----+----+
|   a|   1|
|   a|   2|
|   a|   3|
|   a|   4|
|   b|  10|
|   b|  20|
|   c| 100|
+----+----+

scala> val aggMap = Map("col1" -> Seq("count"), "col2" -> Seq("min", "max", "mean"))
aggMap: scala.collection.immutable.Map[String,Seq[String]] = Map(col1 -> List(count), col2 -> List(min, max, mean))

scala> val aggSeq = aggMap.toSeq.flatMap{ case (c: String, fns: Seq[String]) => Seq(c).zipAll(fns, c, "") }
aggSeq: Seq[(String, String)] = ArrayBuffer((col1,count), (col2,min), (col2,max), (col2,mean))

scala> val dfAgg = df.groupBy("col1").agg(aggSeq.head, aggSeq.tail: _*)
dfAgg: org.apache.spark.sql.DataFrame = [col1: string, count(col1): bigint ... 3 more fields]

scala> dfAgg.orderBy("col1").show
+----+-----------+---------+---------+---------+
|col1|count(col1)|min(col2)|max(col2)|avg(col2)|
+----+-----------+---------+---------+---------+
|   a|          4|        1|        4|      2.5|
|   b|          2|       10|       20|     15.0|
|   c|          1|      100|      100|    100.0|
+----+-----------+---------+---------+---------+
{code}
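
The zipAll call above is just a compact way of pairing the single column name with each of its function names; a plain map does the same and reads more directly. For completeness, the equivalent workaround through the existing Column* overload looks like this (reusing df and aggMap from the session above; this is not new API):
{code:scala}
import org.apache.spark.sql.functions.expr

// One Column per (column, function) pair, e.g. expr("min(col2)").
val aggCols = aggMap.toSeq.flatMap { case (c, fns) => fns.map(fn => expr(s"$fn($c)")) }
val dfAgg2 = df.groupBy("col1").agg(aggCols.head, aggCols.tail: _*)
// dfAgg2 has the same schema and contents as dfAgg above.
{code}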
 

 


> Multiple aggregate methods in the same column in DataFrame
> ----------------------------------------------------------
>
>                 Key: SPARK-26752
>                 URL: https://issues.apache.org/jira/browse/SPARK-26752
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.4.0
>            Reporter: Guilherme Beltramini
>            Priority: Minor
>

