[ https://issues.apache.org/jira/browse/FLINK-20405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17240461#comment-17240461 ]
Benchao Li commented on FLINK-20405:
------------------------------------

Is this a duplicate of FLINK-19449?

> The LAG function in over window is not implemented correctly
> ------------------------------------------------------------
>
>                 Key: FLINK-20405
>                 URL: https://issues.apache.org/jira/browse/FLINK-20405
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.12.0
>            Reporter: Leonard Xu
>            Priority: Major
>
> The LAG(input, offset, default) function in an over window always returns the
> current row's input, no matter how the offset is set.
> After looking at the generated code for the function, I think the
> implementation is incorrect and needs to be fixed.
> {code:java}
> // the offset and default value are never used
> public UnboundedOverAggregateHelper$24(java.lang.Object[] references) throws Exception {
>   constant$14 = ((int) 1);
>   constant$14isNull = false;
>   constant$15 = ((org.apache.flink.table.data.binary.BinaryStringData) str$13);
>   constant$15isNull = false;
>   typeSerializer$19 = (((org.apache.flink.table.runtime.typeutils.StringDataSerializer) references[0]));
> }
>
> public void accumulate(org.apache.flink.table.data.RowData accInput) throws Exception {
>   org.apache.flink.table.data.binary.BinaryStringData field$21;
>   boolean isNull$21;
>   org.apache.flink.table.data.binary.BinaryStringData field$22;
>   isNull$21 = accInput.isNullAt(2);
>   field$21 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
>   if (!isNull$21) {
>     field$21 = ((org.apache.flink.table.data.binary.BinaryStringData) accInput.getString(2));
>   }
>   field$22 = field$21;
>   if (!isNull$21) {
>     field$22 = (org.apache.flink.table.data.binary.BinaryStringData) (typeSerializer$19.copy(field$22));
>   }
>   if (agg0_leadlag != field$22) {
>     agg0_leadlag = ((org.apache.flink.table.data.binary.BinaryStringData) typeSerializer$19.copy(field$22));
>   }
>   ;
>   agg0_leadlagIsNull = isNull$21;
> }
> {code}
>
> The question comes from the user mailing list [1].
>
> [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/FlinkSQL-kafka-gt-dedup-gt-kafka-td39335.html
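
For reference, the behaviour the report expects from LAG(input, offset, default) can be sketched in plain Java. This is a minimal illustration and not Flink code: the class LagSketch and its lag method are hypothetical names, and the rows are assumed to be already in window order within a single partition.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Flink: reference semantics for
// LAG(input, offset, default) over rows that are already in window order.
final class LagSketch {

    static <T> List<T> lag(List<T> inputs, int offset, T defaultValue) {
        List<T> result = new ArrayList<>(inputs.size());
        for (int i = 0; i < inputs.size(); i++) {
            int source = i - offset;
            // Rows without a row `offset` positions earlier get the default value.
            boolean inRange = source >= 0 && source < inputs.size();
            result.add(inRange ? inputs.get(source) : defaultValue);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("a", "b", "c", "d");
        System.out.println(lag(rows, 1, "n/a")); // [n/a, a, b, c]
        System.out.println(lag(rows, 2, "n/a")); // [n/a, n/a, a, b]
    }
}
```

With an offset of 0 the sketch returns the current row's input for every row, which matches the output the report describes for every offset.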