Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4355#discussion_r130104980
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
    @@ -307,6 +312,119 @@ object UserDefinedFunctionUtils {
       // 
----------------------------------------------------------------------------------------------
     
       /**
    +    * get data view type information from accumulator constructor.
    +    *
    +    * @param aggFun aggregate function
    +    * @return the data view specification
    +    */
    +  def getDataViewTypeInfoFromConstructor(
    +    aggFun: AggregateFunction[_, _])
    +  : mutable.HashMap[String, TypeInformation[_]] = {
    +
    +    val resultMap = new mutable.HashMap[String, TypeInformation[_]]
    +    val acc = aggFun.createAccumulator()
    +    val fields: util.List[Field] = 
TypeExtractor.getAllDeclaredFields(acc.getClass, true)
    +    for (i <- 0 until fields.size()) {
    +      val field = fields.get(i)
    +      field.setAccessible(true)
    +      if (classOf[DataView].isAssignableFrom(field.getType)) {
    +        if (field.getType == classOf[MapView[_, _]]) {
    +          val mapView = field.get(acc)
    +          val keyTypeInfo = getFieldValue(classOf[MapView[_, _]], mapView, 
"keyTypeInfo")
    +          val valueTypeInfo = getFieldValue(classOf[MapView[_, _]], 
mapView, "valueTypeInfo")
    +          if (keyTypeInfo != null && valueTypeInfo != null) {
    +              resultMap.put(field.getName, new MapViewTypeInfo(
    +                keyTypeInfo.asInstanceOf[TypeInformation[_]],
    +                valueTypeInfo.asInstanceOf[TypeInformation[_]]))
    +          }
    +        } else if (field.getType == classOf[ListView[_]]) {
    +          val listView = field.get(acc)
    +          val elementTypeInfo = getFieldValue(classOf[ListView[_]], 
listView, "elementTypeInfo")
    +          if (elementTypeInfo != null) {
    +            resultMap.put(field.getName,
    +              new 
ListViewTypeInfo(elementTypeInfo.asInstanceOf[TypeInformation[_]]))
    +          }
    +        }
    +      }
    +    }
    +
    +    resultMap
    +  }
    +
    +
    +  /**
    +    * Extract data view specification.
    +    *
    +    * @param index aggregate function index
    +    * @param aggFun aggregate function
    +    * @param accType accumulator type information
    +    * @param dataViewTypes data view fields types
    +    * @param isUseState is use state
    +    * @return the data view specification
    +    */
    +  def extractDataViewTypeInfo(
    --- End diff --
    
    This method should get some inline comments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to