[ https://issues.apache.org/jira/browse/FLINK-32426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17781704#comment-17781704 ]
dalongliu commented on FLINK-32426: ----------------------------------- Fixed in 1.17: a461e949cb3068b5609b0c8921647e0cbf5e5e9e > Fix adaptive local hash agg can't work when auxGrouping exist > ------------------------------------------------------------- > > Key: FLINK-32426 > URL: https://issues.apache.org/jira/browse/FLINK-32426 > Project: Flink > Issue Type: Bug > Affects Versions: 1.18.0, 1.17.1 > Reporter: dalongliu > Assignee: dalongliu > Priority: Critical > Labels: pull-request-available > Fix For: 1.18.0 > > > For the following case, the field `a` is the primary key, and we select from > `AuxGroupingTable` and group by a, b. Since a is the primary key, it also > guarantees uniqueness, so the planner will extract b as the auxGrouping field. > {code:java} > registerCollection( > "AuxGroupingTable", > data2, > type2, > "a, b, c, d, e", > nullablesOfData2, > FlinkStatistic.builder().uniqueKeys(Set(Set("a").asJava).asJava).build()) > checkResult( > "SELECT a, b, COUNT(c) FROM AuxGroupingTable GROUP BY a, b", > Seq( > row(1, 1, 1), > row(2, 3, 2), > row(3, 4, 3), > row(4, 10, 4), > row(5, 11, 5) > ) > ) {code} > > Because the generated code doesn't get the auxGrouping fields from the input RowData > and then set them into the aggBuffer, the aggBuffer RowData loses some fields, > and it will throw an index exception when getting a field from it. 
As following: > {code:java} > Caused by: java.lang.AssertionError: index (1) should < 1 > at > org.apache.flink.table.data.binary.BinaryRowData.assertIndexIsValid(BinaryRowData.java:127) > at > org.apache.flink.table.data.binary.BinaryRowData.isNullAt(BinaryRowData.java:156) > at > org.apache.flink.table.data.utils.JoinedRowData.isNullAt(JoinedRowData.java:113) > at > org.apache.flink.table.runtime.typeutils.RowDataSerializer.toBinaryRow(RowDataSerializer.java:201) > at > org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:103) > at > org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:48) > at > org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:165) > at > org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:43) > at > org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.serializeRecord(RecordWriter.java:141) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107) > at > org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:55) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:134) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collectAndCheckIfChained(RecordWriterOutput.java:114) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:95) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:48) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:59) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:31) > at 
LocalHashAggregateWithKeys$39.processElement_split2(Unknown Source) > at LocalHashAggregateWithKeys$39.processElement(Unknown Source) > at > org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:108) > at > org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:77) > at > org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39) > at BatchExecCalc$10.processElement(Unknown Source) > at > org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:108) > at > org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:77) > at > org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39) > at SourceConversion$6.processElement(Unknown Source) > at > org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:108) > at > org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:77) > at > org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:418) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:513) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:103) > at > org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:92) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:333) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)