[ 
https://issues.apache.org/jira/browse/FLINK-16451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jinfeng updated FLINK-16451:
----------------------------
    Description: 
When I use listagg with distinct in an over window:
{code:java}
//代码占位符
"select listagg(distinct product, '|') over(partition by user order by proctime 
rows between 200 preceding and current row) as product, user from " + testTable
{code}
I got the following exception:
{code:java}
//代码占位符

Exception in thread "main" java.lang.IndexOutOfBoundsException: Index: 3, Size: 
3 at java.util.ArrayList.rangeCheck(ArrayList.java:657) at 
java.util.ArrayList.get(ArrayList.java:433) at 
java.util.Collections$UnmodifiableList.get(Collections.java:1311) at 
org.apache.flink.table.types.logical.RowType.getTypeAt(RowType.java:174) at 
org.apache.flink.table.planner.codegen.GenerateUtils$.generateFieldAccess(GenerateUtils.scala:635)
 at 
org.apache.flink.table.planner.codegen.GenerateUtils$.generateFieldAccess(GenerateUtils.scala:620)
 at 
org.apache.flink.table.planner.codegen.GenerateUtils$.generateInputAccess(GenerateUtils.scala:524)
 at 
org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen$$anonfun$10.apply(DistinctAggCodeGen.scala:374)
 at 
org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen$$anonfun$10.apply(DistinctAggCodeGen.scala:374)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:234) at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:234) at 
org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen.generateKeyExpression(DistinctAggCodeGen.scala:374)
 at 
org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen.accumulate(DistinctAggCodeGen.scala:192)
 at 
org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$12.apply(AggsHandlerCodeGenerator.scala:871)
 at 
org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$12.apply(AggsHandlerCodeGenerator.scala:871)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) at 
org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.genAccumulate(AggsHandlerCodeGenerator.scala:871)
 at 
org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.generateAggsHandler(AggsHandlerCodeGenerator.scala:329)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.createBoundedOverProcessFunction(StreamExecOverAggregate.scala:425)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:255)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:56)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlan(StreamExecOverAggregate.scala:56)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
{code}
But it worked with:
{code:java}
//代码占位符
select listagg(distinct product) over(partition by user order by proctime rows 
between 200 preceding and current row) as product, user from " + testTable
{code}
 

The exception is thrown in the code below.
{code:java}
//代码占位符
private def generateKeyExpression(
    ctx: CodeGeneratorContext,
    generator: ExprCodeGenerator): GeneratedExpression = {
  val fieldExprs = distinctInfo.argIndexes.map(generateInputAccess(
    ctx,
    generator.input1Type,
    generator.input1Term,
    _,
    nullableInput = false,
    deepCopy = inputFieldCopy))
{code}
 

The distinctInfo.argIndexes is [1, 3]. But index 3 is a logical index: it
will be replaced by '|', so no input access should be generated for index 3.

  was:
When I use lisgagg with distinct and over window.
{code:java}
//代码占位符
"select listagg(distinct product, '|') over(partition by user order by proctime 
rows between 200 preceding and current row) as product, user from " + testTable
{code}
I got the follwing exception
{code:java}
//代码占位符

Exception in thread "main" java.lang.IndexOutOfBoundsException: Index: 3, Size: 
3 at java.util.ArrayList.rangeCheck(ArrayList.java:657) at 
java.util.ArrayList.get(ArrayList.java:433) at 
java.util.Collections$UnmodifiableList.get(Collections.java:1311) at 
org.apache.flink.table.types.logical.RowType.getTypeAt(RowType.java:174) at 
org.apache.flink.table.planner.codegen.GenerateUtils$.generateFieldAccess(GenerateUtils.scala:635)
 at 
org.apache.flink.table.planner.codegen.GenerateUtils$.generateFieldAccess(GenerateUtils.scala:620)
 at 
org.apache.flink.table.planner.codegen.GenerateUtils$.generateInputAccess(GenerateUtils.scala:524)
 at 
org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen$$anonfun$10.apply(DistinctAggCodeGen.scala:374)
 at 
org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen$$anonfun$10.apply(DistinctAggCodeGen.scala:374)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:234) at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:234) at 
org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen.generateKeyExpression(DistinctAggCodeGen.scala:374)
 at 
org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen.accumulate(DistinctAggCodeGen.scala:192)
 at 
org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$12.apply(AggsHandlerCodeGenerator.scala:871)
 at 
org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$12.apply(AggsHandlerCodeGenerator.scala:871)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) at 
org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.genAccumulate(AggsHandlerCodeGenerator.scala:871)
 at 
org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.generateAggsHandler(AggsHandlerCodeGenerator.scala:329)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.createBoundedOverProcessFunction(StreamExecOverAggregate.scala:425)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:255)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:56)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlan(StreamExecOverAggregate.scala:56)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
{code}
But It worked with 
{code:java}
//代码占位符
select listagg(distinct product) over(partition by user order by proctime rows 
between 200 preceding and current row) as product, user from " + testTable
{code}
 
{code:java}
//代码占位符
private def generateKeyExpression(
    ctx: CodeGeneratorContext,
    generator: ExprCodeGenerator): GeneratedExpression = {
  val fieldExprs = distinctInfo.argIndexes.map(generateInputAccess(
    ctx,
    generator.input1Type,
    generator.input1Term,
    _,
    nullableInput = false,
    deepCopy = inputFieldCopy))
{code}
The exception will be throw  at the below code.

The distinctInfo.argIndexs is  [1, 3] .  But the index 3 is a logical index. It 
will be replaced by  '|' . And should not  generate Input Access for  index 3 


> listagg with distinct for over window  codegen error
> ----------------------------------------------------
>
>                 Key: FLINK-16451
>                 URL: https://issues.apache.org/jira/browse/FLINK-16451
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.9.2, 1.10.0
>            Reporter: jinfeng
>            Priority: Major
>
> When I use listagg with distinct in an over window:
> {code:java}
> //代码占位符
> "select listagg(distinct product, '|') over(partition by user order by 
> proctime rows between 200 preceding and current row) as product, user from " 
> + testTable
> {code}
> I got the following exception:
> {code:java}
> //代码占位符
> Exception in thread "main" java.lang.IndexOutOfBoundsException: Index: 3, 
> Size: 3 at java.util.ArrayList.rangeCheck(ArrayList.java:657) at 
> java.util.ArrayList.get(ArrayList.java:433) at 
> java.util.Collections$UnmodifiableList.get(Collections.java:1311) at 
> org.apache.flink.table.types.logical.RowType.getTypeAt(RowType.java:174) at 
> org.apache.flink.table.planner.codegen.GenerateUtils$.generateFieldAccess(GenerateUtils.scala:635)
>  at 
> org.apache.flink.table.planner.codegen.GenerateUtils$.generateFieldAccess(GenerateUtils.scala:620)
>  at 
> org.apache.flink.table.planner.codegen.GenerateUtils$.generateInputAccess(GenerateUtils.scala:524)
>  at 
> org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen$$anonfun$10.apply(DistinctAggCodeGen.scala:374)
>  at 
> org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen$$anonfun$10.apply(DistinctAggCodeGen.scala:374)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>  at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:234) at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
> scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:234) at 
> org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen.generateKeyExpression(DistinctAggCodeGen.scala:374)
>  at 
> org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen.accumulate(DistinctAggCodeGen.scala:192)
>  at 
> org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$12.apply(AggsHandlerCodeGenerator.scala:871)
>  at 
> org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$12.apply(AggsHandlerCodeGenerator.scala:871)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
> scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) at 
> org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.genAccumulate(AggsHandlerCodeGenerator.scala:871)
>  at 
> org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.generateAggsHandler(AggsHandlerCodeGenerator.scala:329)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.createBoundedOverProcessFunction(StreamExecOverAggregate.scala:425)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:255)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:56)
>  at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlan(StreamExecOverAggregate.scala:56)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
> {code}
> But it worked with:
> {code:java}
> //代码占位符
> select listagg(distinct product) over(partition by user order by proctime 
> rows between 200 preceding and current row) as product, user from " + 
> testTable
> {code}
>  
> The exception is thrown in the code below.
> {code:java}
> //代码占位符
> private def generateKeyExpression(
>     ctx: CodeGeneratorContext,
>     generator: ExprCodeGenerator): GeneratedExpression = {
>   val fieldExprs = distinctInfo.argIndexes.map(generateInputAccess(
>     ctx,
>     generator.input1Type,
>     generator.input1Term,
>     _,
>     nullableInput = false,
>     deepCopy = inputFieldCopy))
> {code}
>  
> The distinctInfo.argIndexes is [1, 3]. But index 3 is a logical index: it
> will be replaced by '|', so no input access should be generated for
> index 3.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to