GitHub user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5555#discussion_r184036079 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -151,8 +157,36 @@ class AggregationCodeGenerator( } } - // initialize and create data views - addReusableDataViews() + // get distinct filter of acc fields for each aggregate functions + val distinctAccType = s"${classOf[DistinctAccumulator[_, _]].getName}" + + val distinctAggs: Array[Seq[DataViewSpec[_]]] = isDistinctAggs.zipWithIndex.map { + case (isDistinctAgg, idx) => if (isDistinctAgg) { + val fieldIndex: Int = aggFields(idx)(0) --- End diff -- Add a safety check that `aggFields(idx).length == 1`
---