queryLogicalType != sinkLogicalType when UDAF returns List
Hello,

I'm using Flink 1.11.2. Let's assume that I want to store in a table the result of the following UDAF:

> public class Agg extends AggregateFunction<List<Integer>, List<Integer>> {
>   @Override
>   public List<Integer> createAccumulator() {
>     return new LinkedList<>();
>   }
>   @Override
>   public List<Integer> getValue(List<Integer> acc) {
>     return acc;
>   }
>   public void accumulate(List<Integer> acc, int i) {
>     acc.add(i);
>   }
>   @Override
>   public TypeInformation<List<Integer>> getResultType() {
>     return new ListTypeInfo<>(Integer.class);
>   }
> }

The main program looks as follows:

> public class TestMain {
>   public static void main(String[] args) {
>     EnvironmentSettings settings = EnvironmentSettings.newInstance()
>         .inBatchMode()
>         .build();
>     TableEnvironment tEnv = TableEnvironment.create(settings);
>     tEnv.executeSql(
>         "CREATE TEMPORARY FUNCTION agg AS '" + Agg.class.getName() + "'"
>     );
>     Table t = tEnv.sqlQuery(
>         "SELECT agg(c2)\n" +
>         "FROM (VALUES (ROW('a',1)), (ROW('a',2))) AS T(c1,c2)\n" +
>         "GROUP BY c1"
>     );
>     tEnv.executeSql(
>         "CREATE TABLE output (a ARRAY<INT>) WITH ('connector' = 'print')"
>     );
>     /**
>      * root
>      *  |-- EXPR$0: RAW('java.util.List', ?)
>      */
>     t.printSchema();
>     t.executeInsert("output");
>   }
> }

This program fails with the following exception:

> Exception in thread "main" org.apache.flink.table.api.TableException: A
> raw type backed by type information has no serializable string
> representation. It needs to be resolved into a proper raw type.
> at org.apache.flink.table.types.logical.TypeInformationRawType.asSerializableString(TypeInformationRawType.java:101)
> at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$2.apply(TableSinkUtils.scala:92)
> at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$2.apply(TableSinkUtils.scala:92)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:92)
> at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:229)
> at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:204)
> at scala.Option.map(Option.scala:146)
> at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
> at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
> at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
> at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:565)
> at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:549)
> at my.TestMain.main(TestMain.java:62)

I found that the two types do not match:
- queryLogicalType : ROW<`EXPR$0` RAW('java.util.List', ?)>
- sinkLogicalType : ROW<`a` ARRAY<INT>>

Why does the queryLogicalType contain 'RAW' instead of 'ARRAY'? Is there no way for a UDAF to return java.util.List and store it as ARRAY<INT>?

Thanks in advance,

Dongwon
Re: queryLogicalType != sinkLogicalType when UDAF returns List
Hi,

first of all, we don't support ListTypeInfo in the Table API. Therefore, it is treated as a RAW type. The exception during exception creation is a bug that should be fixed in a future version. But the mismatch is valid:

ARRAY<INT> is not a list type info but `Types.OBJECT_ARRAY(Types.INT)`. Can you try this as the result type of your aggregate function?

Regards,
Timo

On 26.11.20 18:13, Dongwon Kim wrote:
> Hello, I'm using Flink 1.11.2. [quoted message trimmed]
Re: queryLogicalType != sinkLogicalType when UDAF returns List
Hi Timo,

Okay, then the aggregate function should look like this:

> public static class Agg extends AggregateFunction<Integer[], ArrayList<Integer>> {
>   @Override
>   public ArrayList<Integer> createAccumulator() {
>     return new ArrayList<>();
>   }
>   @Override
>   public Integer[] getValue(ArrayList<Integer> acc) {
>     return acc.toArray(new Integer[0]);
>   }
>   public void accumulate(ArrayList<Integer> acc, int i) {
>     acc.add(i);
>   }
>   @Override
>   public TypeInformation<Integer[]> getResultType() {
>     return OBJECT_ARRAY(Types.INT);
>   }
> }

Now the program outputs:

> 2> +I([1, 2])

Thanks,
Dongwon

On Fri, Nov 27, 2020 at 5:38 PM Timo Walther wrote:
> Hi,
>
> first of all we don't support ListTypeInfo in the Table API. [quoted message trimmed]
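The core of the fix above is that getValue must hand back a boxed Integer[] (which maps to ARRAY<INT>) instead of a raw java.util.List. Below is a dependency-free sketch of that accumulate/getValue logic in plain Java, with no Flink imports; the class name AggSketch and the static-method form are illustrative only and not part of the thread's code:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Dependency-free sketch of the accumulator logic from the thread's Agg
// function: ints are collected into an ArrayList accumulator, and the
// final value is materialized as a boxed Integer[] (what an ARRAY<INT>
// column expects) rather than the accumulator list itself.
public class AggSketch {
    public static ArrayList<Integer> createAccumulator() {
        return new ArrayList<>();
    }

    public static void accumulate(List<Integer> acc, int i) {
        acc.add(i);
    }

    public static Integer[] getValue(List<Integer> acc) {
        // toArray(new Integer[0]) produces an Integer[] of exactly the
        // accumulator's size, boxing the elements along the way.
        return acc.toArray(new Integer[0]);
    }

    public static void main(String[] args) {
        ArrayList<Integer> acc = createAccumulator();
        accumulate(acc, 1);
        accumulate(acc, 2);
        System.out.println(Arrays.toString(getValue(acc))); // prints [1, 2]
    }
}
```

This mirrors the shape of the corrected aggregate function: the accumulator stays a mutable list internally, and only the result type exposed to the planner changes.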