[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16140800#comment-16140800 ]
ASF GitHub Bot commented on FLINK-7206: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135117112 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -307,6 +312,139 @@ object UserDefinedFunctionUtils { // ---------------------------------------------------------------------------------------------- /** + * Analyze the constructor to get the type information of the MapView or ListView type variables + * inside the accumulate. + * + * @param aggFun aggregate function + * @return the data view specification + */ + def getDataViewTypeInfoFromConstructor( + aggFun: AggregateFunction[_, _]) + : mutable.HashMap[String, TypeInformation[_]] = { + + val resultMap = new mutable.HashMap[String, TypeInformation[_]] + val acc = aggFun.createAccumulator() + val fields: util.List[Field] = TypeExtractor.getAllDeclaredFields(acc.getClass, true) + for (i <- 0 until fields.size()) { + val field = fields.get(i) + field.setAccessible(true) + if (classOf[DataView].isAssignableFrom(field.getType)) { + if (field.getType == classOf[MapView[_, _]]) { + val mapView = field.get(acc).asInstanceOf[MapView[_, _]] + if (mapView != null) { + val keyTypeInfo = mapView.keyTypeInfo + val valueTypeInfo = mapView.valueTypeInfo + + if (keyTypeInfo != null && valueTypeInfo != null) { + resultMap.put(field.getName, new MapViewTypeInfo(keyTypeInfo, valueTypeInfo)) + } + } else { + resultMap.put(field.getName, null) + } + } else if (field.getType == classOf[ListView[_]]) { + val listView = field.get(acc).asInstanceOf[ListView[_]] + val elementTypeInfo = listView.elementTypeInfo + + if (elementTypeInfo != null) { + resultMap.put(field.getName, new ListViewTypeInfo(elementTypeInfo)) + } + } + } + } + + resultMap + } + + /** + * Remove StateView fields from accumulator type information. 
+ * + * @param index index of aggregate function + * @param aggFun aggregate function + * @param accType accumulator type information, only support pojo type + * @param isStateBackedDataViews is data views use state backend + * @return mapping of accumulator type information and data view config which contains id, + * field name and state descriptor + */ + def removeStateViewFieldsFromAccTypeInfo( + index: Int, + aggFun: AggregateFunction[_, _], + accType: TypeInformation[_], + isStateBackedDataViews: Boolean) + : (TypeInformation[_], Option[Seq[DataViewSpec[_]]]) = { + + var hasDataView = false + val acc = aggFun.createAccumulator() + accType match { + case pojoType: PojoTypeInfo[_] if pojoType.getArity > 0 => + val arity = pojoType.getArity + val newPojoFields = new util.ArrayList[PojoField]() + val accumulatorSpecs = new mutable.ArrayBuffer[DataViewSpec[_]] + for (i <- 0 until arity) { + val pojoField = pojoType.getPojoFieldAt(i) + val field = pojoField.getField + val fieldName = field.getName + field.setAccessible(true) + + pojoField.getTypeInformation match { + case map: MapViewTypeInfo[Any, Any] => + val mapView = field.get(acc).asInstanceOf[MapView[_, _]] + if (mapView != null) { + val keyTypeInfo = mapView.keyTypeInfo + val valueTypeInfo = mapView.valueTypeInfo + val newTypeInfo = if (keyTypeInfo != null && valueTypeInfo != null) { + hasDataView = true + new MapViewTypeInfo(keyTypeInfo, valueTypeInfo) + } else { + map + } + + var spec = MapViewSpec( + "agg" + index + "$" + fieldName, // generate unique name to be used as state name + field, + newTypeInfo) + + accumulatorSpecs += spec + if (!isStateBackedDataViews) { // add data view field which not use state backend + newPojoFields.add(new PojoField(field, newTypeInfo)) + } + } + + case list: ListViewTypeInfo[Any] => + val listView = field.get(acc).asInstanceOf[ListView[_]] + if (listView != null) { + val elementTypeInfo = listView.elementTypeInfo + val newTypeInfo = if (elementTypeInfo != null) { + 
hasDataView = true
+                  new ListViewTypeInfo(elementTypeInfo)
+                } else {
+                  list
+                }
+
+                var spec = ListViewSpec(
+                  "agg" + index + "$" + fieldName, // generate unique name to be used as state name
+                  field,
+                  newTypeInfo)
+
+                accumulatorSpecs += spec
+                if (!isStateBackedDataViews) { // add data view field which not use state backend
+                  newPojoFields.add(new PojoField(field, newTypeInfo))
+                }
+              }
+
+            case _ => newPojoFields.add(pojoField)
+          }
+        }
+        (new PojoTypeInfo(accType.getTypeClass, newPojoFields), Some(accumulatorSpecs))
+
+      case _ => if (!hasDataView) {
--- End diff --

change to

```
case _ if (!hasDataView) => (accType, None)
case _ => throw new TableException(...)
```

> Implementation of DataView to support state access for UDAGG
> ------------------------------------------------------------
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Kaibo Zhou
> Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)