[jira] [Commented] (FLINK-32426) Fix adaptive local hash agg can't work when auxGrouping exist
[ https://issues.apache.org/jira/browse/FLINK-32426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17738543#comment-17738543 ]

Jark Wu commented on FLINK-32426:
---------------------------------

[~lsy], do we need to fix this in 1.17.2 as well? If yes, could you help to create a PR for the release-1.17 branch?

> Fix adaptive local hash agg can't work when auxGrouping exist
> -------------------------------------------------------------
>
>                 Key: FLINK-32426
>                 URL: https://issues.apache.org/jira/browse/FLINK-32426
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.18.0, 1.17.1
>            Reporter: dalongliu
>            Assignee: dalongliu
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.18.0
>
>
> In the following case, the field {{a}} is the primary key; we select from
> {{AuxGroupingTable}} and group by a, b. Since {{a}} is the primary key, it also
> guarantees uniqueness, so the planner extracts {{b}} as an auxGrouping field.
> {code:java}
> registerCollection(
>   "AuxGroupingTable",
>   data2,
>   type2,
>   "a, b, c, d, e",
>   nullablesOfData2,
>   FlinkStatistic.builder().uniqueKeys(Set(Set("a").asJava).asJava).build())
>
> checkResult(
>   "SELECT a, b, COUNT(c) FROM AuxGroupingTable GROUP BY a, b",
>   Seq(
>     row(1, 1, 1),
>     row(2, 3, 2),
>     row(3, 4, 3),
>     row(4, 10, 4),
>     row(5, 11, 5)
>   )
> ){code}
> Because the generated code does not read the auxGrouping fields from the input
> RowData and set them into the aggBuffer, the aggBuffer RowData is missing some
> fields, and an index exception is thrown when one of those fields is accessed.
> The stack trace is as follows:
> {code:java}
> Caused by: java.lang.AssertionError: index (1) should < 1
>     at org.apache.flink.table.data.binary.BinaryRowData.assertIndexIsValid(BinaryRowData.java:127)
>     at org.apache.flink.table.data.binary.BinaryRowData.isNullAt(BinaryRowData.java:156)
>     at org.apache.flink.table.data.utils.JoinedRowData.isNullAt(JoinedRowData.java:113)
>     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.toBinaryRow(RowDataSerializer.java:201)
>     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:103)
>     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:48)
>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:165)
>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:43)
>     at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.serializeRecord(RecordWriter.java:141)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
>     at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:55)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:134)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collectAndCheckIfChained(RecordWriterOutput.java:114)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:95)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:48)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:59)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:31)
>     at LocalHashAggregateWithKeys$39.processElement_split2(Unknown Source)
>     at LocalHashAggregateWithKeys$39.processElement(Unknown Source)
>     at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:108)
>     at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:77)
>     at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
>     at BatchExecCalc$10.processElement(Unknown Source)
>     at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:108)
>     at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:77)
>     at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
>     at SourceConversion$6.processElement(Unknown Source)
>     at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:108)
>     at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:77)
>     at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
>     at org.apache.flink.
> {code}
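[Editor's note] The failure mode above can be illustrated with a minimal sketch. The classes below (`FixedArityRow`, `AuxGroupingBufferSketch`) are hypothetical stand-ins written for this note, not Flink's actual `BinaryRowData` or generated operator code: the local agg buffer is created with one field (the COUNT accumulator) while the downstream serializer expects two (the auxGrouping field plus the accumulator), so reading index 1 trips the arity assertion seen in the trace.

```java
// Illustrative only: a fixed-arity row that mimics the arity check in
// BinaryRowData.assertIndexIsValid; not Flink's real implementation.
final class FixedArityRow {
    private final Object[] fields;

    FixedArityRow(int arity) {
        this.fields = new Object[arity];
    }

    void setField(int index, Object value) {
        assertIndexIsValid(index);
        fields[index] = value;
    }

    boolean isNullAt(int index) {
        assertIndexIsValid(index);
        return fields[index] == null;
    }

    private void assertIndexIsValid(int index) {
        if (index < 0 || index >= fields.length) {
            // Mirrors the "index (1) should < 1" assertion from the stack trace.
            throw new AssertionError("index (" + index + ") should < " + fields.length);
        }
    }
}

public class AuxGroupingBufferSketch {
    // Builds an agg buffer with the given arity and reads field 1, the slot
    // where the serializer would expect the auxGrouping field.
    public static String accessSecondField(int bufferArity) {
        FixedArityRow aggBuffer = new FixedArityRow(bufferArity);
        aggBuffer.setField(0, 1L); // COUNT(c) accumulator in slot 0
        try {
            aggBuffer.isNullAt(1);
            return "ok";
        } catch (AssertionError e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Buggy buffer: auxGrouping field b was never copied in, arity is 1.
        System.out.println(accessSecondField(1)); // index (1) should < 1
        // Expected buffer: auxGrouping field plus accumulator, arity 2.
        System.out.println(accessSecondField(2)); // ok
    }
}
```

The sketch shows only the arity mismatch; the actual fix is in the code generator, which must copy the auxGrouping fields from the input row into the buffer so its arity matches the serializer's row type.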
[jira] [Commented] (FLINK-32426) Fix adaptive local hash agg can't work when auxGrouping exist
[ https://issues.apache.org/jira/browse/FLINK-32426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17781704#comment-17781704 ]

dalongliu commented on FLINK-32426:
-----------------------------------

Fixed in 1.17: a461e949cb3068b5609b0c8921647e0cbf5e5e9e