[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16140800#comment-16140800 ]

ASF GitHub Bot commented on FLINK-7206:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4355#discussion_r135117112
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
    @@ -307,6 +312,139 @@ object UserDefinedFunctionUtils {
       // ----------------------------------------------------------------------------------------------
     
       /**
    +    * Analyze the constructor to get the type information of the MapView or ListView type variables
    +    * inside the accumulate.
    +    *
    +    * @param aggFun aggregate function
    +    * @return the data view specification
    +    */
    +  def getDataViewTypeInfoFromConstructor(
    +    aggFun: AggregateFunction[_, _])
    +  : mutable.HashMap[String, TypeInformation[_]] = {
    +
    +    val resultMap = new mutable.HashMap[String, TypeInformation[_]]
    +    val acc = aggFun.createAccumulator()
    +    val fields: util.List[Field] = TypeExtractor.getAllDeclaredFields(acc.getClass, true)
    +    for (i <- 0 until fields.size()) {
    +      val field = fields.get(i)
    +      field.setAccessible(true)
    +      if (classOf[DataView].isAssignableFrom(field.getType)) {
    +        if (field.getType == classOf[MapView[_, _]]) {
    +          val mapView = field.get(acc).asInstanceOf[MapView[_, _]]
    +          if (mapView != null) {
    +            val keyTypeInfo = mapView.keyTypeInfo
    +            val valueTypeInfo = mapView.valueTypeInfo
    +
    +            if (keyTypeInfo != null && valueTypeInfo != null) {
    +              resultMap.put(field.getName, new MapViewTypeInfo(keyTypeInfo, valueTypeInfo))
    +            }
    +          } else {
    +            resultMap.put(field.getName, null)
    +          }
    +        } else if (field.getType == classOf[ListView[_]]) {
    +          val listView = field.get(acc).asInstanceOf[ListView[_]]
    +          val elementTypeInfo = listView.elementTypeInfo
    +
    +          if (elementTypeInfo != null) {
    +            resultMap.put(field.getName, new ListViewTypeInfo(elementTypeInfo))
    +          }
    +        }
    +      }
    +    }
    +
    +    resultMap
    +  }
    +
    +  /**
    +    * Remove StateView fields from accumulator type information.
    +    *
    +    * @param index index of aggregate function
    +    * @param aggFun aggregate function
    +    * @param accType accumulator type information, only support pojo type
    +    * @param isStateBackedDataViews is data views use state backend
    +    * @return mapping of accumulator type information and data view config which contains id,
    +    *         field name and state descriptor
    +    */
    +  def removeStateViewFieldsFromAccTypeInfo(
    +    index: Int,
    +    aggFun: AggregateFunction[_, _],
    +    accType: TypeInformation[_],
    +    isStateBackedDataViews: Boolean)
    +  : (TypeInformation[_], Option[Seq[DataViewSpec[_]]]) = {
    +
    +    var hasDataView = false
    +    val acc = aggFun.createAccumulator()
    +    accType match {
    +      case pojoType: PojoTypeInfo[_] if pojoType.getArity > 0 =>
    +        val arity = pojoType.getArity
    +        val newPojoFields = new util.ArrayList[PojoField]()
    +        val accumulatorSpecs = new mutable.ArrayBuffer[DataViewSpec[_]]
    +        for (i <- 0 until arity) {
    +          val pojoField = pojoType.getPojoFieldAt(i)
    +          val field = pojoField.getField
    +          val fieldName = field.getName
    +          field.setAccessible(true)
    +
    +          pojoField.getTypeInformation match {
    +            case map: MapViewTypeInfo[Any, Any] =>
    +              val mapView = field.get(acc).asInstanceOf[MapView[_, _]]
    +              if (mapView != null) {
    +                val keyTypeInfo = mapView.keyTypeInfo
    +                val valueTypeInfo = mapView.valueTypeInfo
    +                val newTypeInfo = if (keyTypeInfo != null && valueTypeInfo != null) {
    +                  hasDataView = true
    +                  new MapViewTypeInfo(keyTypeInfo, valueTypeInfo)
    +                } else {
    +                  map
    +                }
    +
    +                var spec = MapViewSpec(
    +                  "agg" + index + "$" + fieldName, // generate unique name to be used as state name
    +                  field,
    +                  newTypeInfo)
    +
    +                accumulatorSpecs += spec
    +                if (!isStateBackedDataViews) { // add data view field which not use state backend
    +                  newPojoFields.add(new PojoField(field, newTypeInfo))
    +                }
    +              }
    +
    +            case list: ListViewTypeInfo[Any] =>
    +              val listView = field.get(acc).asInstanceOf[ListView[_]]
    +              if (listView != null) {
    +                val elementTypeInfo = listView.elementTypeInfo
    +                val newTypeInfo = if (elementTypeInfo != null) {
    +                  hasDataView = true
    +                  new ListViewTypeInfo(elementTypeInfo)
    +                } else {
    +                  list
    +                }
    +
    +                var spec = ListViewSpec(
    +                  "agg" + index + "$" + fieldName, // generate unique name to be used as state name
    +                  field,
    +                  newTypeInfo)
    +
    +                accumulatorSpecs += spec
    +                if (!isStateBackedDataViews) { // add data view field which not use state backend
    +                  newPojoFields.add(new PojoField(field, newTypeInfo))
    +                }
    +              }
    +
    +            case _ => newPojoFields.add(pojoField)
    +          }
    +        }
    +        (new PojoTypeInfo(accType.getTypeClass, newPojoFields), Some(accumulatorSpecs))
    +
    +      case _ => if (!hasDataView) {
    --- End diff --
    
    Change this to use pattern guards instead of an `if` inside the catch-all case:
    
    ```
    case _ if (!hasDataView) => (accType, None)
    case _ => throw new TableException(...)
    ```


> Implementation of DataView to support state access for UDAGG
> ------------------------------------------------------------
>
>                 Key: FLINK-7206
>                 URL: https://issues.apache.org/jira/browse/FLINK-7206
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Kaibo Zhou
>            Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to