Hi Timo,

Okay, then the aggregate function should look like this:

>  public static class Agg extends AggregateFunction<Integer[],
> ArrayList<Integer>> {
>     @Override
>     public ArrayList<Integer> createAccumulator() {
>       return new ArrayList<>();
>     }
>     @Override
>     public Integer[] getValue(ArrayList<Integer> acc) {
>       return acc.toArray(new Integer[0]);
>     }
>     public void accumulate(ArrayList<Integer> acc, int i) {
>       acc.add(i);
>     }
>     @Override
>     public TypeInformation<Integer[]> getResultType() {
>       return OBJECT_ARRAY(Types.INT);
>     }
>   }


Now the program outputs:

> 2> +I([1, 2])


Thanks,

Dongwon

On Fri, Nov 27, 2020 at 5:38 PM Timo Walther <twal...@apache.org> wrote:

> Hi,
>
> first of all we don't support ListTypeInfo in Table API. Therefore, it
> is treated as a RAW type. The exception during exception creation is a
> bug that should be fixed in future version. But the mismatch is valid:
>
> ARRAY<INT> is not a list type info but `Types.OBJECT_ARRAY(Types.INT)`.
> Can you try this as the result type of your aggregate function.
>
> Reagrds,
> Timo
>
>
> On 26.11.20 18:13, Dongwon Kim wrote:
> > Hello,
> >
> > I'm using Flink-1.11.2.
> >
> > Let's assume that I want to store on a table the result of the following
> > UDAF:
> >
> >        public class Agg extends AggregateFunction<List<Integer>,
> >     List<Integer>> {
> >          @Override
> >          public List<Integer> createAccumulator() {
> >            return new LinkedList<>();
> >          }
> >          @Override
> >          public List<Integer> getValue(List<Integer> acc) {
> >            return acc;
> >          }
> >          public void accumulate(List<Integer> acc, int i) {
> >            acc.add(i);
> >          }
> >          @Override
> >          public TypeInformation<List<Integer>> getResultType() {
> >            return new ListTypeInfo<>(Integer.class);
> >          }
> >        }
> >
> >
> > The main program looks as follow:
> >
> >     public class TestMain {
> >        public static void main(String[] args) {
> >          EnvironmentSettings settings = EnvironmentSettings.newInstance()
> >            .inBatchMode()
> >            .build();
> >          TableEnvironment tEnv = TableEnvironment.create(settings);
> >          tEnv.executeSql(
> >            "CREATE TEMPORARY FUNCTION agg AS '" + Agg.class.getName() +
> "'"
> >          );
> >          Table t = tEnv.sqlQuery(
> >            "SELECT agg(c2)\n" +
> >              "FROM (VALUES (ROW('a',1)), (ROW('a',2))) AS T(c1,c2)\n" +
> >              "GROUP BY c1"
> >          );
> >          tEnv.executeSql(
> >            "CREATE TABLE output (a ARRAY<INT>) WITH ('connector' =
> 'print')"
> >          );
> >          /**
> >           * root
> >           *  |-- EXPR$0: RAW('java.util.List', ?)
> >           */
> >          t.printSchema();
> >          t.executeInsert("output" );
> >        }
> >     }
> >
> >
> > This program fails with the following exception:
> >
> >     Exception in thread "main"
> >     org.apache.flink.table.api.TableException: A raw type backed by type
> >     information has no serializable string representation. It needs to
> >     be resolved into a proper raw type.
> >     at
> >
>  
> org.apache.flink.table.types.logical.TypeInformationRawType.asSerializableString(TypeInformationRawType.java:101)
> >     at
> >
>  
> org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$2.apply(TableSinkUtils.scala:92)
> >     at
> >
>  
> org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$2.apply(TableSinkUtils.scala:92)
> >     at
> >
>  
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> >     at
> >
>  
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> >     at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> >     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> >     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> >     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> >     at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> >     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> >     at
> >
>  
> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:92)
> >     at
> >
>  
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:229)
> >     at
> >
>  
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:204)
> >     at scala.Option.map(Option.scala:146)
> >     at
> >
>  
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
> >     at
> >
>  
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
> >     at
> >
>  
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
> >     at
> >
>  
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> >     at
> >
>  
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> >     at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> >     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> >     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> >     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> >     at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> >     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> >     at
> >
>  
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
> >     at
> >
>  
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
> >     at
> >
>  
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
> >     at
> >
>  
> org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:565)
> >     at
> >
>  
> org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:549)
> >     at my.TestMain.main(TestMain.java:62)
> >
> >
> > I found that two types do not match:
> > - queryLogicalType : ROW<`EXPR$0` RAW('java.util.List', ?)>
> > - sinkLogicalType : ROW<`a` ARRAY<INT>>
> >
> > Why does the queryLogicalType contain 'RAW' instead of 'ARRAY'?
> > Is there no way for UDAF to return java.lang.List<T> and store it as
> ARRAY?
> >
> > Thanks in advance,
> >
> > Dongwon
>
>

Reply via email to