Re: queryLogicalType != sinkLogicalType when UDAF returns List

2020-11-27 Thread Dongwon Kim
Hi Timo,

Okay, then the aggregate function should look like this:

>  public static class Agg extends AggregateFunction<Integer[], ArrayList<Integer>> {
> @Override
> public ArrayList<Integer> createAccumulator() {
>   return new ArrayList<>();
> }
> @Override
> public Integer[] getValue(ArrayList<Integer> acc) {
>   return acc.toArray(new Integer[0]);
> }
> public void accumulate(ArrayList<Integer> acc, int i) {
>   acc.add(i);
> }
> @Override
> public TypeInformation<Integer[]> getResultType() {
>   return Types.OBJECT_ARRAY(Types.INT);
> }
>   }
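As a sanity check outside Flink, the accumulator lifecycle above — createAccumulator, two accumulate calls, then getValue — can be exercised in plain Java with no Table API on the classpath. This is a minimal sketch (the class name is mine); it reproduces the same [1, 2] that the print sink emits:

```java
import java.util.ArrayList;
import java.util.Arrays;

public class AggLifecycleDemo {
    // Mirrors Agg#createAccumulator: fresh, empty accumulator.
    static ArrayList<Integer> createAccumulator() {
        return new ArrayList<>();
    }

    // Mirrors Agg#accumulate: fold one input value into the accumulator.
    static void accumulate(ArrayList<Integer> acc, int i) {
        acc.add(i);
    }

    // Mirrors Agg#getValue: convert List -> Integer[] via toArray,
    // which is what lets the result map to ARRAY<INT> instead of RAW.
    static Integer[] getValue(ArrayList<Integer> acc) {
        return acc.toArray(new Integer[0]);
    }

    public static void main(String[] args) {
        ArrayList<Integer> acc = createAccumulator();
        accumulate(acc, 1);
        accumulate(acc, 2);
        Integer[] result = getValue(acc);
        System.out.println(Arrays.toString(result)); // prints [1, 2]
    }
}
```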


Now the program outputs:

> 2> +I([1, 2])


Thanks,

Dongwon

On Fri, Nov 27, 2020 at 5:38 PM Timo Walther  wrote:

> Hi,
>
> first of all, we don't support ListTypeInfo in the Table API. Therefore, it
> is treated as a RAW type. The exception thrown during creation of the error
> message is a bug that should be fixed in a future version. But the mismatch
> itself is valid:
>
> ARRAY<INT> is not a list type info but `Types.OBJECT_ARRAY(Types.INT)`.
> Can you try this as the result type of your aggregate function?
>
> Regards,
> Timo
>
>
> On 26.11.20 18:13, Dongwon Kim wrote:

Re: queryLogicalType != sinkLogicalType when UDAF returns List

2020-11-27 Thread Timo Walther

Hi,

first of all, we don't support ListTypeInfo in the Table API. Therefore, it
is treated as a RAW type. The exception thrown during creation of the error
message is a bug that should be fixed in a future version. But the mismatch
itself is valid:

ARRAY<INT> is not a list type info but `Types.OBJECT_ARRAY(Types.INT)`.
Can you try this as the result type of your aggregate function?

Regards,
Timo


On 26.11.20 18:13, Dongwon Kim wrote:


queryLogicalType != sinkLogicalType when UDAF returns List

2020-11-26 Thread Dongwon Kim
Hello,

I'm using Flink-1.11.2.

Let's assume that I want to store on a table the result of the following
UDAF:

>   public class Agg extends AggregateFunction<List<Integer>, List<Integer>>
> {
> @Override
> public List<Integer> createAccumulator() {
>   return new LinkedList<>();
> }
> @Override
> public List<Integer> getValue(List<Integer> acc) {
>   return acc;
> }
> public void accumulate(List<Integer> acc, int i) {
>   acc.add(i);
> }
> @Override
> public TypeInformation<List<Integer>> getResultType() {
>   return new ListTypeInfo<>(Integer.class);
> }
>   }
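Note that nothing is wrong with this class as plain Java — the failure is purely on the type-mapping side, because the Table API has no SQL type for java.util.List. A minimal sketch without Flink on the classpath (the class name is mine) shows the aggregation itself behaves as expected:

```java
import java.util.LinkedList;
import java.util.List;

public class ListAggDemo {
    // Same accumulator choice as the UDAF above: a LinkedList.
    static List<Integer> createAccumulator() {
        return new LinkedList<>();
    }

    static void accumulate(List<Integer> acc, int i) {
        acc.add(i);
    }

    // getValue returns the List itself -- valid Java, but the planner
    // can only describe this result as RAW('java.util.List', ?).
    static List<Integer> getValue(List<Integer> acc) {
        return acc;
    }

    public static void main(String[] args) {
        List<Integer> acc = createAccumulator();
        accumulate(acc, 1);
        accumulate(acc, 2);
        System.out.println(getValue(acc)); // prints [1, 2]
    }
}
```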


The main program looks as follows:

> public class TestMain {
>   public static void main(String[] args) {
> EnvironmentSettings settings = EnvironmentSettings.newInstance()
>   .inBatchMode()
>   .build();
> TableEnvironment tEnv = TableEnvironment.create(settings);
> tEnv.executeSql(
>   "CREATE TEMPORARY FUNCTION agg AS '" + Agg.class.getName() + "'"
> );
> Table t = tEnv.sqlQuery(
>   "SELECT agg(c2)\n" +
> "FROM (VALUES (ROW('a',1)), (ROW('a',2))) AS T(c1,c2)\n" +
> "GROUP BY c1"
> );
> tEnv.executeSql(
>   "CREATE TABLE output (a ARRAY<INT>) WITH ('connector' = 'print')"
> );
> /**
>  * root
>  *  |-- EXPR$0: RAW('java.util.List', ?)
>  */
> t.printSchema();
> t.executeInsert("output");
>   }
> }


This program fails with the following exception:

> Exception in thread "main" org.apache.flink.table.api.TableException: A
> raw type backed by type information has no serializable string
> representation. It needs to be resolved into a proper raw type.
> at
> org.apache.flink.table.types.logical.TypeInformationRawType.asSerializableString(TypeInformationRawType.java:101)
> at
> org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$2.apply(TableSinkUtils.scala:92)
> at
> org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$2.apply(TableSinkUtils.scala:92)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at
> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:92)
> at
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:229)
> at
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:204)
> at scala.Option.map(Option.scala:146)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
> at
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
> at
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
> at
> org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:565)
> at
> org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:549)
> at my.TestMain.main(TestMain.java:62)


I found that two types do not match:
- queryLogicalType : ROW<`EXPR$0` RAW('java.util.List', ?)>
- sinkLogicalType : ROW<`a` ARRAY<INT>>

Why does the queryLogicalType contain 'RAW' instead of 'ARRAY'?
Is there no way for a UDAF to return java.util.List and store it as ARRAY<INT>?

Thanks in advance,

Dongwon