Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4355#discussion_r134137525
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
    @@ -1646,4 +1649,86 @@ abstract class CodeGenerator(
     
         fieldTerm
       }
    +
    +  /**
    +    * Adds a reusable class to the member area of the generated 
[[Function]].
    +    */
    +  def addReusableClass(clazz: Class[_], fieldTerm: String): Unit = {
    +    val field =
    +      s"""
    +         |transient ${clazz.getCanonicalName} $fieldTerm = null;
    +         |""".stripMargin
    +    reusableMemberStatements.add(field)
    +  }
    +
    +  /**
    +    * Adds a reusable [[DataViewConfig]] to the member area of the 
generated [[Function]].
    +    *
    +    * @param indices indices of aggregate functions.
    +    * @param ctxTerm field name of runtime context.
    +    * @param accConfig data view config which contains id, field and 
StateDescriptors.
    +    * @return statements to create [[MapView]] or [[ListView]].
    +    */
    +  def addReusableDataViewConfig(
    +      indices: Range,
    +      ctxTerm: String,
    +      accConfig: Option[DataViewConfig])
    +    : String = {
    +    if (accConfig.isDefined && accConfig.get.isStateBackedDataViews) {
    +      val initDataViews = new StringBuilder
    +      val descMapping: Map[String, StateDescriptor[_, _]] = 
accConfig.get.accSpecs
    +        .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
    +        .toMap[String, StateDescriptor[_, _]]
    +
    +      for (i <- indices) yield {
    +        for (spec <- accConfig.get.accSpecs(i)) yield {
    +          val dataViewField = spec.field
    +          val dataViewTypeTerm = dataViewField.getType.getCanonicalName
    +          val desc = descMapping.getOrElse(spec.id,
    +            throw new CodeGenException(s"Can not find ListView in 
accumulator by id: ${spec.id}"))
    +
    +          val serializedData = AggregateUtil.serialize(desc)
    +          val dataViewFieldTerm = 
s"acc${i}_${dataViewField.getName}_dataview"
    +          val field =
    +            s"""
    +               |transient $dataViewTypeTerm $dataViewFieldTerm = null;
    +               |""".stripMargin
    +          reusableMemberStatements.add(field)
    +
    +          val descFieldTerm = s"${dataViewFieldTerm}_desc"
    +          val descClassQualifier = classOf[StateDescriptor[_, 
_]].getCanonicalName
    +          val descDeserialize =
    +            s"""
    +               |    $descClassQualifier $descFieldTerm = 
($descClassQualifier)
    +               |      ${AggregateUtil.getClass.getName.stripSuffix("$")}
    +               |      .deserialize("$serializedData");
    +             """.stripMargin
    +
    +          val init = if (dataViewField.getType == classOf[MapView[_, _]]) {
    +            s"""
    +               |    $descDeserialize
    +               |    $dataViewFieldTerm =
    +               |      
org.apache.flink.table.dataview.StateViewUtils.createMapView($descFieldTerm,
    --- End diff --
    
    I think we do not need the `StateViewUtils` here; we can create a MapView 
using code gen directly, because we already have the RuntimeContext and 
StateDescriptor.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to