Hi Chesnay Schepler,

Thank you for your reply. I found the problem just now.

My code modifies the key obtained from the KeySelector by updating its RowKind. Some key selectors, such as BinaryRowDataKeySelector, return a copy of the key [1], but EmptyRowDataKeySelector always returns the same object [2]. The test case AggregateITCase.testGroupBySingleValue with the SQL query "SELECT * FROM T2 WHERE T2.a < (SELECT count(*) * 0.3 FROM T1)" is indeed a global join without a key, so when I called mykey.setRowKind(RowKind.DELETE), the shared BinaryRowDataUtil.EMPTY_ROW object was modified, and every record with an empty key ended up with the wrong key.

Should EmptyRowDataKeySelector also return a copy of BinaryRowDataUtil.EMPTY_ROW? Otherwise, the key must never be modified, because the same object may also be used by other records.

Smile

[1] https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/keyselector/BinaryRowDataKeySelector.java#L49
[2] https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/keyselector/EmptyRowDataKeySelector.java#L36


Chesnay Schepler wrote
> This is a bit concerning. Could you re-run your test with enabled
> assertions and/or modify BinaryRowData#assertIndexIsValid to always
> throw an error if one of the 2 assertions is not met?
>
> On 5/11/2021 9:37 AM, Smile wrote:
>> Hi all,
>>
>> I'm trying to add mini-batch optimizations for Regular Join
>> (flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperator.java)
>> in the Blink planner, and some test cases failed, such as
>> AggregateITCase.testGroupBySingleValue.
>>
>> After debugging, I found that the heap memory backing
>> BinaryRowDataUtil.EMPTY_ROW was changed unexpectedly, from
>> [0,0,0,0,0,0,0,0] to [3,0,0,0,0,0,0,0], which led to some records being
>> assigned a wrong key.
>>
>> However, my mini-batch code doesn't have any low-level operators working
>> with MemorySegment. I only buffered some records (RowData) in a Map, just
>> like AbstractMapBundleOperator does. Object reuse was also disabled via
>> env.getConfig.disableObjectReuse(). It looks as if
>> StreamOneInputProcessor.processInput changed memory segments that do not
>> belong to it (they belong to BinaryRowDataUtil.EMPTY_ROW instead). A
>> debugging screenshot with more information is attached.
>>
>> I'm not familiar with org.apache.flink.core.memory.MemorySegment or
>> sun.misc.Unsafe, so I'd like to ask the mailing list for help. Do you
>> have any ideas about why this happens or where to check next?
>>
>> Thank you.
>>
>> Smile
>>
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2787/EmptyRowDebug-20210511-1.png>
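
To make the aliasing described above concrete, here is a minimal, Flink-free sketch. All names in it (KeyRow, sharedKeySelector, copyingKeySelector, etc.) are made up for illustration; only the singleton-vs-copy behaviour is meant to mirror EmptyRowDataKeySelector and BinaryRowDataKeySelector, and the "fix" shown is just the copy-based idea discussed above, not actual Flink code.

public class SharedKeyMutationDemo {

    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** Tiny stand-in for a key row whose RowKind can be changed in place. */
    static final class KeyRow {
        private RowKind kind = RowKind.INSERT;

        void setRowKind(RowKind kind) { this.kind = kind; }

        RowKind getRowKind() { return kind; }

        /** Defensive copy, analogous to handing each record its own key object. */
        KeyRow copy() {
            KeyRow c = new KeyRow();
            c.kind = this.kind;
            return c;
        }
    }

    /** Mirrors EmptyRowDataKeySelector: every call returns the same singleton. */
    private static final KeyRow EMPTY_KEY = new KeyRow();

    static KeyRow sharedKeySelector() {
        return EMPTY_KEY;
    }

    /** Mirrors the proposed fix: return a copy so callers may mutate it freely. */
    static KeyRow copyingKeySelector() {
        return EMPTY_KEY.copy();
    }

    public static void main(String[] args) {
        // Two records of a key-less (global) join both map to the "empty" key.
        KeyRow keyOfRecordA = sharedKeySelector();
        KeyRow keyOfRecordB = sharedKeySelector();

        // Marking record A as a retraction silently retags record B as well,
        // because both variables alias the same singleton object.
        keyOfRecordA.setRowKind(RowKind.DELETE);
        System.out.println(keyOfRecordB.getRowKind()); // DELETE (corrupted)
        System.out.println(keyOfRecordA == keyOfRecordB); // true

        // With defensive copies the mutation stays local to one record.
        KeyRow safeKeyA = copyingKeySelector();
        KeyRow safeKeyB = copyingKeySelector();
        safeKeyA.setRowKind(RowKind.DELETE);
        System.out.println(safeKeyB.getRowKind()); // INSERT (unaffected)
    }
}

The other way around would of course also work: leave the selector as it is and have the caller copy the key before calling setRowKind, which is the "the key should never be changed" option mentioned above.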
My code will modify the key got from KeySelector by updating its RowKind. Some key selectors such as BinaryRowDataKeySelector returns a copy of a key[1], but EmptyRowDataKeySelector always returns the same object[2]. The test case AggregateITCase.testGroupBySingleValue with SQL Query "SELECT * FROM T2 WHERE T2.a < (SELECT count(*) * 0.3 FROM T1)" is indeed a global join without a key, thus when I perform mykey.setRowKind(RowKind.DELETE), the object of BinaryRowDataUtil.EMPTY_ROW changed, and all those records with an empty key got the wrong key. Should EmptyRowDataKeySelector also returns a copy of BinaryRowDataUtil.EMPTY_ROW? Otherwise, the key should never be changed because it may also be used by other records. Smile [1]. https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/keyselector/BinaryRowDataKeySelector.java#L49 [2]. https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/keyselector/EmptyRowDataKeySelector.java#L36 Chesnay Schepler wrote > This is a bit concerning. Could you re-run your test with enabled > assertions and/or modify BinaryRowData#assertIndexIsValid to always > throw an error if one of the 2 assertions is not met? > > On 5/11/2021 9:37 AM, Smile wrote: >> Hi all, >> >> I'm trying to add mini-batch optimizations for Regular Join >> (flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperator.java) >> in Blink planner. And there're some test cases that failed, such as >> AggregateITCase.testGroupBySingleValue. >> >> After debugging, I found the corresponding heap memory for >> BinaryRowDataUtil.EMPTY_ROW was changed unexpectedly, from >> [0,0,0,0,0,0,0,0] >> to [3,0,0,0,0,0,0,0], and lead to some records being set to a wrong key. >> >> However, my mini-batch code doesn't have any low-level operators with >> MemorySegment. I only buffered some records (RowData) in a Map just like >> AbstractMapBundleOperator did. Object reuse was also disabled by >> env.getConfig.disableObjectReuse(). It looks like there's something wrong >> when StreamOneInputProcessor.processInput changed the memory segments >> that >> do not belong to it (belong to BinaryRowDataUtil.EMPTY_ROW instead). The >> debugging page with more information was attached. >> >> I'm not familiar with org.apache.flink.core.memory.MemorySegment or >> sun.misc.Unsafe, so I'd like to ask maillist for help. Do you have any >> ideas >> about why it happens or where to check next? >> >> Thank you. >> >> Smile >> >> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2787/EmptyRowDebug-20210511-1.png> >> >> >> >> -- >> Sent from: >> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/