Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5555#discussion_r184075230
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala ---
    @@ -481,14 +622,34 @@ class AggregationCodeGenerator(
                |    org.apache.flink.types.Row b)
                """.stripMargin
           val merge: String = {
    -        for (i <- aggs.indices) yield
    -          j"""
    -             |    ${accTypes(i)} aAcc$i = (${accTypes(i)}) a.getField($i);
    -             |    ${accTypes(i)} bAcc$i = (${accTypes(i)}) b.getField(${mapping(i)});
    -             |    accIt$i.setElement(bAcc$i);
    -             |    ${aggs(i)}.merge(aAcc$i, accIt$i);
    -             |    a.setField($i, aAcc$i);
    -          """.stripMargin
    +        for (i <- aggs.indices) yield {
    +          if (isDistinctAggs(i)) {
    +            j"""
    +               |    $distinctAccType aDistinctAcc$i = ($distinctAccType) a.getField($i);
    +               |    $distinctAccType bDistinctAcc$i = ($distinctAccType) b.getField(${mapping(i)});
    +               |    java.util.Iterator<java.util.Map.Entry> mergeIt$i =
    +               |        bDistinctAcc$i.elements().iterator();
    +               |    while (mergeIt$i.hasNext()) {
    +               |      java.util.Map.Entry entry = (java.util.Map.Entry) mergeIt$i.next();
    +               |      Object k = entry.getKey();
    +               |      Long v = (Long) entry.getValue();
    +               |      if (aDistinctAcc$i.add(k, v)) {
    +               |        ${accTypes(i)} aAcc$i = (${accTypes(i)}) aDistinctAcc$i.getRealAcc();
    --- End diff --
    
    Move this declaration of `aAcc$i` out of the while loop?


---

Reply via email to