[ 
https://issues.apache.org/jira/browse/SPARK-17758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15543947#comment-15543947
 ] 

Herman van Hovell commented on SPARK-17758:
-------------------------------------------

You could write your own UDAF.

Partitions can become empty for various reasons:
- Data skew.
- A filter applied to the partition can make it empty. Spark pipelines 
operators, so a filter could effectively remove every row in the partition, 
but we would still aggregate it.
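
To illustrate why an empty partition matters for an aggregate like LAST, here 
is a minimal sketch in plain Python. It is a model of partial aggregation, not 
Spark's API or its actual implementation: the names LastBuffer, partial_last, 
naive_merge and safe_merge are hypothetical, and the assumption that a merge 
step blindly takes the right-hand buffer is only one plausible way to end up 
with the reported null. A custom UDAF could follow the safe_merge idea by 
carrying a "value was set" flag in its buffer.

```python
from dataclasses import dataclass
from typing import Any, Iterable, List, Optional


@dataclass
class LastBuffer:
    """Partial aggregation state for one partition (hypothetical class)."""
    value: Optional[Any] = None
    value_set: bool = False  # True once at least one row has been seen


def partial_last(rows: Iterable[Any]) -> LastBuffer:
    """Fold one partition into a buffer; an empty partition never sets the flag."""
    buf = LastBuffer()
    for row in rows:
        buf = LastBuffer(value=row, value_set=True)
    return buf


def naive_merge(buffers: List[LastBuffer]) -> Optional[Any]:
    """Always take the right-hand buffer, so a trailing empty partition wins."""
    result = None
    for buf in buffers:
        result = buf.value
    return result


def safe_merge(buffers: List[LastBuffer]) -> Optional[Any]:
    """Ignore buffers that never saw a row."""
    result = LastBuffer()
    for buf in buffers:
        if buf.value_set:
            result = buf
    return result.value


# Partition sizes 2, 1 and 0, matching the layout described in the report.
partitions = [["a", "b"], ["c"], []]
buffers = [partial_last(p) for p in partitions]

print(naive_merge(buffers))  # None
print(safe_merge(buffers))   # c
```

The flag, rather than a null check on the value, lets the merge distinguish 
"no rows seen" from "the last row really was null".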

> Spark Aggregate function  LAST returns null on an empty partition 
> ------------------------------------------------------------------
>
>                 Key: SPARK-17758
>                 URL: https://issues.apache.org/jira/browse/SPARK-17758
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output
>    Affects Versions: 2.0.0
>         Environment: Spark 2.0.0
>            Reporter: Franck Tago
>
> My environment: Spark 2.0.0.
> I have included the physical plan of my application below.
> Issue description:
> The result of a query that uses the LAST function is incorrect. 
> The output obtained for the column that corresponds to the LAST function is 
> null.
> My input data contains 3 rows.
> The application ran in 2 stages.
> The first stage consisted of 3 tasks:
> The first task/partition contains 2 rows.
> The second task/partition contains 1 row.
> The last task/partition contains 0 rows.
> The result of the query for the LAST column is NULL, which I believe is due 
> to the PARTIAL_LAST on the last partition.
> I believe that this behavior is incorrect. The PARTIAL_LAST call on an empty 
> partition should not return null.
> {noformat}
> == Physical Plan ==
> InsertIntoHiveTable MetastoreRelation default, bdm_3449_tgt20, true, false
> +- *Project [last(C3_1)#51 AS field#102, cast(round(max(C3_0)#50, 0) as int) 
> AS field1#103, cast(round(max(C3_0)#50, 0) as int) AS field2#104]
>    +- SortAggregate(key=[], functions=[max(C3_0#40),last(C3_1#41, false)], 
> output=[max(C3_0)#50,last(C3_1)#51])
>       +- SortAggregate(key=[], 
> functions=[partial_max(C3_0#40),partial_last(C3_1#41, false)], 
> output=[max#91,last#92])
>          +- *Project [CAST(sum(C1_0) AS DOUBLE)#27 AS C3_0#40, last(C1_1)#28 
> AS C3_1#41]
>             +- SortAggregate(key=[], functions=[sum(cast(C1_0#17 as 
> bigint)),last(C1_1#18, false)], output=[CAST(sum(C1_0) AS 
> DOUBLE)#27,last(C1_1)#28])
>                +- Exchange SinglePartition
>                   +- SortAggregate(key=[], 
> functions=[partial_sum(cast(C1_0#17 as bigint)),partial_last(C1_1#18, 
> false)], output=[sum#95L,last#96])
>                      +- *Project [field1#7 AS C1_0#17, field#6 AS C1_1#18]
>                         +- HiveTableScan [field1#7, field#6], 
> MetastoreRelation default, bdm_3449_src, alias
> {noformat}


