[ 
https://issues.apache.org/jira/browse/FLINK-20405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen updated FLINK-20405:
---------------------------------
    Summary: The LAG function in over window is not implemented correctly  
(was: The LAG function in over window is not implements correctly)

> The LAG function in over window is not implemented correctly
> ------------------------------------------------------------
>
>                 Key: FLINK-20405
>                 URL: https://issues.apache.org/jira/browse/FLINK-20405
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.12.0
>            Reporter: Leonard Xu
>            Priority: Major
>
> For the LAG(input, offset, default) function in an over window, the result is 
> always the current row's input, no matter how the offset is set.
> After looking at the code generated for the function, I think the 
> implementation is incorrect and needs to be fixed.
> {code:java}
> // the offset and default value are never used
> public UnboundedOverAggregateHelper$24(java.lang.Object[] references) throws Exception {
>     constant$14 = ((int) 1);
>     constant$14isNull = false;
>     constant$15 = ((org.apache.flink.table.data.binary.BinaryStringData) str$13);
>     constant$15isNull = false;
>     typeSerializer$19 =
>         (((org.apache.flink.table.runtime.typeutils.StringDataSerializer) references[0]));
> }
>
> public void accumulate(org.apache.flink.table.data.RowData accInput) throws Exception {
>     org.apache.flink.table.data.binary.BinaryStringData field$21;
>     boolean isNull$21;
>     org.apache.flink.table.data.binary.BinaryStringData field$22;
>     isNull$21 = accInput.isNullAt(2);
>     field$21 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
>     if (!isNull$21) {
>         field$21 = ((org.apache.flink.table.data.binary.BinaryStringData) accInput.getString(2));
>     }
>     field$22 = field$21;
>     if (!isNull$21) {
>         field$22 = (org.apache.flink.table.data.binary.BinaryStringData) (typeSerializer$19.copy(field$22));
>     }
>     if (agg0_leadlag != field$22) {
>         agg0_leadlag = ((org.apache.flink.table.data.binary.BinaryStringData) typeSerializer$19.copy(field$22));
>     }
>     ;
>     agg0_leadlagIsNull = isNull$21;
> }
> {code}
>  
> This issue was originally raised on the user mailing list [1].
> [1] 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/FlinkSQL-kafka-gt-dedup-gt-kafka-td39335.html
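
For comparison with the generated code quoted above, here is a minimal sketch of the behaviour one would expect from LAG(input, offset, default): the accumulator remembers the most recent rows and returns the value from `offset` rows back, falling back to the default when not enough rows have been seen. The class `LagAccumulator` and its methods are hypothetical names chosen for illustration. This is not Flink's implementation or the eventual fix, only a plain-Java model of the expected semantics.

```java
import java.util.LinkedList;

/**
 * Hypothetical model of LAG(input, offset, default) semantics.
 * Not Flink code: names and structure are assumptions for illustration.
 */
final class LagAccumulator<T> {
    private final int offset;
    private final T defaultValue;
    // LinkedList is used because it permits null elements (SQL NULL inputs).
    private final LinkedList<T> buffer = new LinkedList<>();

    LagAccumulator(int offset, T defaultValue) {
        if (offset < 0) {
            throw new IllegalArgumentException("offset must be non-negative");
        }
        this.offset = offset;
        this.defaultValue = defaultValue;
    }

    /** Records the current row's input, keeping at most offset + 1 values. */
    void accumulate(T input) {
        buffer.addLast(input);
        if (buffer.size() > offset + 1) {
            buffer.removeFirst();
        }
    }

    /** Returns the input from `offset` rows back, or the default if there is none. */
    T getValue() {
        return buffer.size() == offset + 1 ? buffer.getFirst() : defaultValue;
    }
}

final class LagSketch {
    public static void main(String[] args) {
        LagAccumulator<String> acc = new LagAccumulator<>(1, "none");
        for (String row : new String[] {"a", "b", "c"}) {
            acc.accumulate(row);
            System.out.println(row + " -> " + acc.getValue());
        }
    }
}
```

With an offset of 1 and a default of "none", the rows a, b, c yield none, a, b. The generated `accumulate` quoted in the issue instead copies the current row's field into `agg0_leadlag` on every call, which matches this sketch only when the offset is 0.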



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
