[ 
https://issues.apache.org/jira/browse/FLINK-1001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14053832#comment-14053832
 ] 

Stephan Ewen commented on FLINK-1001:
-------------------------------------

Actually, although I think the aggregators are susceptible to mutable-object 
bugs, the problem here is in your code:

The reduce function you wrote is a "minBy(0)", while the aggregation is a 
"min(0)". The first selects the whole tuple whose field 0 is smallest; the 
second computes only the minimum value of field 0 (as in SQL). Just as in SQL, 
the values of the other fields, which are neither grouped nor aggregated, are 
non-deterministic.
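
To make the difference concrete, here is a minimal plain-Java sketch. It does 
not use the Flink API: the Pair class and both method names are hypothetical 
and only mimic the two semantics described above. In particular, taking the 
remaining field from the first record is just one arbitrary choice that a 
"min(0)"-style aggregation is allowed to make.

```java
import java.util.Arrays;
import java.util.List;

public class MinVsMinBy {

    /** Hypothetical two-field stand-in for a tuple: field 0 = similarity, field 1 = id. */
    static final class Pair {
        final double similarity;
        final String id;

        Pair(double similarity, String id) {
            this.similarity = similarity;
            this.id = id;
        }
    }

    /** "minBy(0)"-style: return the whole record whose field 0 is smallest. */
    static Pair minByField0(List<Pair> records) {
        Pair best = records.get(0);
        for (Pair p : records) {
            if (p.similarity < best.similarity) {
                best = p;
            }
        }
        return best;
    }

    /**
     * "min(0)"-style: only field 0 is aggregated. Field 1 is not defined by the
     * aggregation; this sketch arbitrarily keeps it from the first record.
     */
    static Pair minField0(List<Pair> records) {
        Pair first = records.get(0);
        double min = first.similarity;
        for (Pair p : records) {
            min = Math.min(min, p.similarity);
        }
        return new Pair(min, first.id);
    }

    public static void main(String[] args) {
        List<Pair> records = Arrays.asList(
                new Pair(0.9, "a"), new Pair(0.2, "b"), new Pair(0.5, "c"));

        Pair selected = minByField0(records);
        Pair aggregated = minField0(records);

        System.out.println(selected.similarity + " " + selected.id);     // prints "0.2 b"
        System.out.println(aggregated.similarity + " " + aggregated.id); // prints "0.2 a"
    }
}
```

Both results carry the same minimum in field 0, but only the "minBy"-style 
result is guaranteed to be a record that actually exists in the input.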

> Aggregate Min/Max return unexpected values.
> -------------------------------------------
>
>                 Key: FLINK-1001
>                 URL: https://issues.apache.org/jira/browse/FLINK-1001
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Bastian Köcher
>         Attachments: src.zip
>
>
> I wanted to replace my simple MinReducer
>       public static class MinSimilarityReducer extends ReduceFunction<ClusterPair> {
>               @Override
>               public ClusterPair reduce(ClusterPair value1, ClusterPair value2) throws Exception {
>                       if (value1.getSimilarity().doubleValue() < value2.getSimilarity().doubleValue()) {
>                               return value1;
>                       }
>                       return value2;
>               }
>       }
> with Aggregate.Min, but Aggregate.Min delivers a value which should already 
> have been removed from my WorkSet.
> If I use my own Reducer it works as expected, but with an Aggregate it 
> doesn't work anymore :/ 



--
This message was sent by Atlassian JIRA
(v6.2#6252)
