Hi,

first of all we don't support ListTypeInfo in Table API. Therefore, it is treated as a RAW type. The exception during exception creation is a bug that should be fixed in future version. But the mismatch is valid:

ARRAY<INT> is not a list type info but `Types.OBJECT_ARRAY(Types.INT)`. Can you try this as the result type of your aggregate function.

Reagrds,
Timo


On 26.11.20 18:13, Dongwon Kim wrote:
Hello,

I'm using Flink-1.11.2.

Let's assume that I want to store on a table the result of the following UDAF:

       public class Agg extends AggregateFunction<List<Integer>,
    List<Integer>> {
         @Override
         public List<Integer> createAccumulator() {
           return new LinkedList<>();
         }
         @Override
         public List<Integer> getValue(List<Integer> acc) {
           return acc;
         }
         public void accumulate(List<Integer> acc, int i) {
           acc.add(i);
         }
         @Override
         public TypeInformation<List<Integer>> getResultType() {
           return new ListTypeInfo<>(Integer.class);
         }
       }


The main program looks as follow:

    public class TestMain {
       public static void main(String[] args) {
         EnvironmentSettings settings = EnvironmentSettings.newInstance()
           .inBatchMode()
           .build();
         TableEnvironment tEnv = TableEnvironment.create(settings);
         tEnv.executeSql(
           "CREATE TEMPORARY FUNCTION agg AS '" + Agg.class.getName() + "'"
         );
         Table t = tEnv.sqlQuery(
           "SELECT agg(c2)\n" +
             "FROM (VALUES (ROW('a',1)), (ROW('a',2))) AS T(c1,c2)\n" +
             "GROUP BY c1"
         );
         tEnv.executeSql(
           "CREATE TABLE output (a ARRAY<INT>) WITH ('connector' = 'print')"
         );
         /**
          * root
          *  |-- EXPR$0: RAW('java.util.List', ?)
          */
         t.printSchema();
         t.executeInsert("output" );
       }
    }


This program fails with the following exception:

    Exception in thread "main"
    org.apache.flink.table.api.TableException: A raw type backed by type
    information has no serializable string representation. It needs to
    be resolved into a proper raw type.
    at
    
org.apache.flink.table.types.logical.TypeInformationRawType.asSerializableString(TypeInformationRawType.java:101)
    at
    
org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$2.apply(TableSinkUtils.scala:92)
    at
    
org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$2.apply(TableSinkUtils.scala:92)
    at
    
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at
    
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at
    
org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:92)
    at
    
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:229)
    at
    
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:204)
    at scala.Option.map(Option.scala:146)
    at
    
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
    at
    
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
    at
    
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
    at
    
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at
    
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at
    
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
    at
    
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
    at
    
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
    at
    
org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:565)
    at
    
org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:549)
    at my.TestMain.main(TestMain.java:62)


I found that two types do not match:
- queryLogicalType : ROW<`EXPR$0` RAW('java.util.List', ?)>
- sinkLogicalType : ROW<`a` ARRAY<INT>>

Why does the queryLogicalType contain 'RAW' instead of 'ARRAY'?
Is there no way for UDAF to return java.lang.List<T> and store it as ARRAY?

Thanks in advance,

Dongwon

Reply via email to