Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4355#discussion_r135117112
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
    @@ -307,6 +312,139 @@ object UserDefinedFunctionUtils {
       // 
----------------------------------------------------------------------------------------------
     
       /**
    +    * Analyze the constructor to get the type information of the MapView 
or ListView type variables
    +    * inside the accumulate.
    +    *
    +    * @param aggFun aggregate function
    +    * @return the data view specification
    +    */
    +  def getDataViewTypeInfoFromConstructor(
    +    aggFun: AggregateFunction[_, _])
    +  : mutable.HashMap[String, TypeInformation[_]] = {
    +
    +    val resultMap = new mutable.HashMap[String, TypeInformation[_]]
    +    val acc = aggFun.createAccumulator()
    +    val fields: util.List[Field] = 
TypeExtractor.getAllDeclaredFields(acc.getClass, true)
    +    for (i <- 0 until fields.size()) {
    +      val field = fields.get(i)
    +      field.setAccessible(true)
    +      if (classOf[DataView].isAssignableFrom(field.getType)) {
    +        if (field.getType == classOf[MapView[_, _]]) {
    +          val mapView = field.get(acc).asInstanceOf[MapView[_, _]]
    +          if (mapView != null) {
    +            val keyTypeInfo = mapView.keyTypeInfo
    +            val valueTypeInfo = mapView.valueTypeInfo
    +
    +            if (keyTypeInfo != null && valueTypeInfo != null) {
    +              resultMap.put(field.getName, new 
MapViewTypeInfo(keyTypeInfo, valueTypeInfo))
    +            }
    +          } else {
    +            resultMap.put(field.getName, null)
    +          }
    +        } else if (field.getType == classOf[ListView[_]]) {
    +          val listView = field.get(acc).asInstanceOf[ListView[_]]
    +          val elementTypeInfo = listView.elementTypeInfo
    +
    +          if (elementTypeInfo != null) {
    +            resultMap.put(field.getName, new 
ListViewTypeInfo(elementTypeInfo))
    +          }
    +        }
    +      }
    +    }
    +
    +    resultMap
    +  }
    +
    +  /**
    +    * Remove StateView fields from accumulator type information.
    +    *
    +    * @param index index of aggregate function
    +    * @param aggFun aggregate function
    +    * @param accType accumulator type information, only support pojo type
    +    * @param isStateBackedDataViews is data views use state backend
    +    * @return mapping of accumulator type information and data view config 
which contains id,
    +    *         field name and state descriptor
    +    */
    +  def removeStateViewFieldsFromAccTypeInfo(
    +    index: Int,
    +    aggFun: AggregateFunction[_, _],
    +    accType: TypeInformation[_],
    +    isStateBackedDataViews: Boolean)
    +  : (TypeInformation[_], Option[Seq[DataViewSpec[_]]]) = {
    +
    +    var hasDataView = false
    +    val acc = aggFun.createAccumulator()
    +    accType match {
    +      case pojoType: PojoTypeInfo[_] if pojoType.getArity > 0 =>
    +        val arity = pojoType.getArity
    +        val newPojoFields = new util.ArrayList[PojoField]()
    +        val accumulatorSpecs = new mutable.ArrayBuffer[DataViewSpec[_]]
    +        for (i <- 0 until arity) {
    +          val pojoField = pojoType.getPojoFieldAt(i)
    +          val field = pojoField.getField
    +          val fieldName = field.getName
    +          field.setAccessible(true)
    +
    +          pojoField.getTypeInformation match {
    +            case map: MapViewTypeInfo[Any, Any] =>
    +              val mapView = field.get(acc).asInstanceOf[MapView[_, _]]
    +              if (mapView != null) {
    +                val keyTypeInfo = mapView.keyTypeInfo
    +                val valueTypeInfo = mapView.valueTypeInfo
    +                val newTypeInfo = if (keyTypeInfo != null && valueTypeInfo 
!= null) {
    +                  hasDataView = true
    +                  new MapViewTypeInfo(keyTypeInfo, valueTypeInfo)
    +                } else {
    +                  map
    +                }
    +
    +                var spec = MapViewSpec(
    +                  "agg" + index + "$" + fieldName, // generate unique name 
to be used as state name
    +                  field,
    +                  newTypeInfo)
    +
    +                accumulatorSpecs += spec
    +                if (!isStateBackedDataViews) { // add data view field 
which not use state backend
    +                  newPojoFields.add(new PojoField(field, newTypeInfo))
    +                }
    +              }
    +
    +            case list: ListViewTypeInfo[Any] =>
    +              val listView = field.get(acc).asInstanceOf[ListView[_]]
    +              if (listView != null) {
    +                val elementTypeInfo = listView.elementTypeInfo
    +                val newTypeInfo = if (elementTypeInfo != null) {
    +                  hasDataView = true
    +                  new ListViewTypeInfo(elementTypeInfo)
    +                } else {
    +                  list
    +                }
    +
    +                var spec = ListViewSpec(
    +                  "agg" + index + "$" + fieldName, // generate unique name 
to be used as state name
    +                  field,
    +                  newTypeInfo)
    +
    +                accumulatorSpecs += spec
    +                if (!isStateBackedDataViews) { // add data view field 
which not use state backend
    +                  newPojoFields.add(new PojoField(field, newTypeInfo))
    +                }
    +              }
    +
    +            case _ => newPojoFields.add(pojoField)
    +          }
    +        }
    +        (new PojoTypeInfo(accType.getTypeClass, newPojoFields), 
Some(accumulatorSpecs))
    +
    +      case _ => if (!hasDataView) {
    --- End diff --
    
    Change this to two separate cases, moving the `hasDataView` check into a pattern guard:
    
    ```
    case _ if (!hasDataView) => (accType, None)
    case _ => throw new TableException(...)
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to