GitHub user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5555#discussion_r184036079
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala ---
    @@ -151,8 +157,36 @@ class AggregationCodeGenerator(
           }
         }
     
    -    // initialize and create data views
    -    addReusableDataViews()
    +    // get distinct filter of acc fields for each aggregate functions
    +    val distinctAccType = s"${classOf[DistinctAccumulator[_, _]].getName}"
    +
    +    val distinctAggs: Array[Seq[DataViewSpec[_]]] = 
isDistinctAggs.zipWithIndex.map {
    +      case (isDistinctAgg, idx) => if (isDistinctAgg) {
    +        val fieldIndex: Int = aggFields(idx)(0)
    --- End diff --
    
    Please add a safety check that `aggFields(idx).length == 1` before accessing `aggFields(idx)(0)`.


---

Reply via email to