[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16140794#comment-16140794 ]
ASF GitHub Bot commented on FLINK-7206:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4355#discussion_r135134452

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala ---
    @@ -161,13 +182,108 @@ class AggregationCodeGenerator(
           }
         }

    +  /**
    +    * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] to the open, cleanup,
    +    * close and member area of the generated function.
    +    *
    +    */
    +  def addReusableDataViews: Unit = {
    +    if (accConfig != null && accConfig.isStateBackedDataViews) {
    +      val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.accSpecs
    +        .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
    +        .toMap[String, StateDescriptor[_, _]]
    +
    +      for (i <- aggs.indices) yield {
    +        for (spec <- accConfig.accSpecs(i)) yield {
    +          val dataViewField = spec.field
    +          val dataViewTypeTerm = dataViewField.getType.getCanonicalName
    +          val desc = descMapping.getOrElse(spec.id,
    +            throw new CodeGenException(s"Can not find ListView in accumulator by id: ${spec.id}"))
    +
    +          // define the DataView variables
    +          val serializedData = AggregateUtil.serialize(desc)
    --- End diff --

    Why do we need to serialize and deserialize state descriptors? Can't we just generate the code to instantiate them? IMO, that would be more straightforward and easier to debug.


> Implementation of DataView to support state access for UDAGG
> ------------------------------------------------------------
>
>                 Key: FLINK-7206
>                 URL: https://issues.apache.org/jira/browse/FLINK-7206
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Kaibo Zhou
>            Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
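
For illustration only, the two options raised in the review comment above can be contrasted in a small, self-contained Java sketch (Java because the code generator emits Java source). FakeListDescriptor, the Base64 encoding, and every method name below are hypothetical stand-ins chosen for this sketch; they are not Flink's StateDescriptor, and they are not claimed to match what AggregateUtil.serialize does in the pull request.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Base64;

public class DescriptorCodegenSketch {

    // Hypothetical stand-in for a state descriptor; NOT the real Flink class.
    static final class FakeListDescriptor implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;
        final String elementType;

        FakeListDescriptor(String name, String elementType) {
            this.name = name;
            this.elementType = elementType;
        }
    }

    // Option A helper: Java-serialize an object and encode it as Base64 text
    // (the encoding is an assumption made for this sketch).
    static String serializeToBase64(Serializable obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return Base64.getEncoder().encodeToString(bytes.toByteArray());
    }

    // Option A helper: what the generated code would have to call at runtime.
    static Object deserializeFromBase64(String data)
            throws IOException, ClassNotFoundException {
        byte[] raw = Base64.getDecoder().decode(data);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(raw))) {
            return in.readObject();
        }
    }

    // Option A: the generated source carries an opaque blob.
    static String generateWithSerializedDescriptor(FakeListDescriptor desc)
            throws IOException {
        return "FakeListDescriptor desc = (FakeListDescriptor) deserializeFromBase64(\""
            + serializeToBase64(desc) + "\");";
    }

    // Option B: the generated source instantiates the descriptor directly.
    // Assumes the field values contain no characters that need escaping.
    static String generateWithConstructorCall(FakeListDescriptor desc) {
        return "FakeListDescriptor desc = new FakeListDescriptor(\""
            + desc.name + "\", \"" + desc.elementType + "\");";
    }

    public static void main(String[] args) throws Exception {
        FakeListDescriptor desc = new FakeListDescriptor("acc_0_list", "java.lang.String");
        System.out.println(generateWithSerializedDescriptor(desc));
        System.out.println(generateWithConstructorCall(desc));
    }
}
```

Printing both generated lines side by side shows the trade-off the comment points at: the constructor call is readable when inspecting generated code, while the serialized form is opaque but works for any serializable descriptor without the generator knowing its constructor arguments.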