twalthr commented on a change in pull request #13507:
URL: https://github.com/apache/flink/pull/13507#discussion_r497238937



##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPythonAggregate.scala
##########
@@ -65,18 +72,69 @@ trait CommonPythonAggregate extends CommonPythonBase {
     * For streaming execution we extract the PythonFunctionInfo from 
AggregateInfo.
     */
   protected def extractPythonAggregateFunctionInfosFromAggregateInfo(
-      pythonAggregateInfo: AggregateInfo): PythonFunctionInfo = {
+      aggIndex: Int,
+      pythonAggregateInfo: AggregateInfo): (PythonFunctionInfo, 
Array[DataViewSpec]) = {
     pythonAggregateInfo.function match {
       case function: PythonFunction =>
-        new PythonFunctionInfo(
-          function,
-          pythonAggregateInfo.argIndexes.map(_.asInstanceOf[AnyRef]))
+        (
+          new PythonFunctionInfo(
+            function,
+            pythonAggregateInfo.argIndexes.map(_.asInstanceOf[AnyRef])),
+          extractDataViewSpecs(
+            aggIndex,
+            function.asInstanceOf[PythonAggregateFunction].getAccumulatorType)
+        )
       case _: Count1AggFunction =>
         // The count star will be treated specially in Python UDF worker
-        PythonFunctionInfo.DUMMY_PLACEHOLDER
+        (PythonFunctionInfo.DUMMY_PLACEHOLDER, Array())
       case _ =>
         throw new TableException(
           "Unsupported python aggregate function: " + 
pythonAggregateInfo.function)
     }
   }
+
+  protected def extractDataViewSpecs(
+      index: Int,
+      accType: TypeInformation[_]): Array[DataViewSpec] = {
+    if (!accType.isInstanceOf[CompositeType[_]]) {
+      return Array()
+    }
+
+    def includesDataView(ct: CompositeType[_]): Boolean = {
+      (0 until ct.getArity).exists(i =>
+        ct.getTypeAt(i) match {
+          case nestedCT: CompositeType[_] => includesDataView(nestedCT)
+          case t: TypeInformation[_] if t.getTypeClass == classOf[ListView[_]] 
=> true
+          case _ => false
+        }
+      )
+    }
+
+    if (includesDataView(accType.asInstanceOf[CompositeType[_]])) {
+      accType match {
+        case rowType: RowTypeInfo =>
+            (0 until rowType.getArity).flatMap(i => {
+              rowType.getFieldTypes()(i) match {
+                case ct: CompositeType[_] if includesDataView(ct) =>
+                  throw new TableException(
+                    "For Python AggregateFunction DataView only supported at 
first " +
+                      "level of accumulators of Row type.")
+                case listView: ListViewTypeInfo[_] =>

Review comment:
       Side comment: This class is deprecated. Please don't use it in new code 
anymore. Otherwise we will never be able to drop it any time soon.
   
   Same for `TypeConversions.fromLegacyInfoToDataType` ideally we should get 
rid of all these calls as quickly as possible. I'm trying to remove as many of 
these calls as possible but if new calls are added in PR this is an endless 
effort.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to