Hi Chesnay Schepler, 

Thank you for your reply.
I found the problem just now.

My code modifies the key obtained from the KeySelector by updating its RowKind.
Some key selectors, such as BinaryRowDataKeySelector, return a copy of the
key [1], but EmptyRowDataKeySelector always returns the same object [2].
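To make the difference concrete, here is a minimal, self-contained sketch. `Key`, `Selector`, `CopyingSelector` and `SharedEmptySelector` are hypothetical stand-ins, not Flink classes. One selector builds a fresh key per call (like the copy in [1]). The other hands out one static instance (like [2]).

```java
class AliasingDemo {
    // Hypothetical mutable key (not Flink's BinaryRowData): byte 0 holds a row-kind tag.
    static final class Key {
        final byte[] bytes;
        Key(byte[] bytes) { this.bytes = bytes; }
        Key copy() { return new Key(bytes.clone()); }
    }

    // Hypothetical selector interface, not Flink's KeySelector.
    interface Selector {
        Key getKey(String record);
    }

    // Builds a fresh key per call, so callers may mutate the result freely.
    static final class CopyingSelector implements Selector {
        private final Key template = new Key(new byte[8]);
        public Key getKey(String record) { return template.copy(); }
    }

    // Hands out one static instance, so a change made for one record
    // is visible through every other record's key.
    static final class SharedEmptySelector implements Selector {
        static final Key EMPTY = new Key(new byte[8]);
        public Key getKey(String record) { return EMPTY; }
    }

    // Tags the first record's key with 3 (Flink's byte value for DELETE),
    // then reports the tag carried by the second record's key.
    static byte tagSeenBySecondRecord(Selector selector) {
        Key k1 = selector.getKey("r1");
        Key k2 = selector.getKey("r2");
        k1.bytes[0] = 3;
        return k2.bytes[0];
    }

    public static void main(String[] args) {
        System.out.println(tagSeenBySecondRecord(new CopyingSelector()));     // 0
        System.out.println(tagSeenBySecondRecord(new SharedEmptySelector())); // 3
    }
}
```

With the copying selector the second record still sees 0. With the shared selector it sees 3, because both "keys" are the same object.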

The test case AggregateITCase.testGroupBySingleValue, with the SQL query
"SELECT * FROM T2 WHERE T2.a < (SELECT count(*) * 0.3 FROM T1)", is indeed a
global join without a key. So when I call mykey.setRowKind(RowKind.DELETE), I
actually modify the shared BinaryRowDataUtil.EMPTY_ROW object, and all records
with an empty key end up with the wrong key. This also explains why the first
byte of EMPTY_ROW changed from 0 to 3: 3 is the byte value of RowKind.DELETE.
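The memory change seen in the debugger can be modelled with a plain byte array. In BinaryRowData the row kind is stored in the header byte at offset 0, and a row with no fields occupies 8 bytes. `HeaderOnlyRow` and the `Kind` enum below are hypothetical simplifications (the real classes write through MemorySegments). Only the RowKind byte values are taken from Flink.

```java
import java.util.Arrays;

class HeaderByteDemo {
    // Byte values mirror Flink's RowKind encoding (INSERT=0 ... DELETE=3).
    enum Kind {
        INSERT((byte) 0), UPDATE_BEFORE((byte) 1), UPDATE_AFTER((byte) 2), DELETE((byte) 3);
        final byte value;
        Kind(byte value) { this.value = value; }
    }

    // Hypothetical zero-field row: 8 bytes of header/null-bit space, header byte at offset 0.
    static final class HeaderOnlyRow {
        final byte[] memory = new byte[8];
        void setRowKind(Kind kind) { memory[0] = kind.value; }
    }

    // Stand-in for the shared constant that every empty key points to.
    static final HeaderOnlyRow EMPTY_ROW = new HeaderOnlyRow();

    public static void main(String[] args) {
        HeaderOnlyRow myKey = EMPTY_ROW; // the selector handed out the shared instance
        System.out.println(Arrays.toString(EMPTY_ROW.memory)); // [0, 0, 0, 0, 0, 0, 0, 0]
        myKey.setRowKind(Kind.DELETE);   // meant for one record only
        System.out.println(Arrays.toString(EMPTY_ROW.memory)); // [3, 0, 0, 0, 0, 0, 0, 0]
    }
}
```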

Should EmptyRowDataKeySelector also return a copy of
BinaryRowDataUtil.EMPTY_ROW? Otherwise, the returned key must never be
modified, because the same object is shared by all other records.
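Both options can be sketched with hypothetical types (a stand-in `Key` with a byte-array body, not Flink's BinaryRowData). In the first option, the selector returns a fresh copy of the empty row. In the second, the caller copies the key before changing its kind. In Flink terms, the first option would presumably mean EmptyRowDataKeySelector returning something like `BinaryRowDataUtil.EMPTY_ROW.copy()`. That is an untested assumption, not an existing change.

```java
class DefensiveCopyDemo {
    // Hypothetical key type; byte 0 holds the row-kind tag.
    static final class Key {
        final byte[] bytes;
        Key(byte[] bytes) { this.bytes = bytes; }
        Key copy() { return new Key(bytes.clone()); }
    }

    // Shared constant, analogous to an EMPTY_ROW singleton.
    static final Key EMPTY = new Key(new byte[8]);

    // Option 1: the selector never exposes the shared instance.
    static Key selectKeyWithCopy() { return EMPTY.copy(); }

    // Option 2: the selector may return the shared instance,
    // so the caller must copy before mutating.
    static Key selectKeyShared() { return EMPTY; }

    static Key markDeleted(Key key) {
        Key own = key.copy();
        own.bytes[0] = 3; // DELETE tag, applied to a private copy only
        return own;
    }

    public static void main(String[] args) {
        Key a = selectKeyWithCopy();
        a.bytes[0] = 3;                         // safe: a is a private copy
        Key b = markDeleted(selectKeyShared()); // safe: markDeleted copies first
        System.out.println(b.bytes[0]);         // 3
        System.out.println(EMPTY.bytes[0]);     // 0, the shared constant is untouched
    }
}
```

Option 1 pays one allocation per record in the selector. Option 2 keeps that cost on the code path that actually mutates the key.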

Smile

[1] https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/keyselector/BinaryRowDataKeySelector.java#L49
[2] https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/keyselector/EmptyRowDataKeySelector.java#L36



Chesnay Schepler wrote
> This is a bit concerning. Could you re-run your test with assertions
> enabled and/or modify BinaryRowData#assertIndexIsValid to always
> throw an error if one of the 2 assertions is not met?
> 
> On 5/11/2021 9:37 AM, Smile wrote:
>> Hi all,
>>
>> I'm trying to add mini-batch optimizations for the Regular Join
>> (flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperator.java)
>> in the Blink planner, and some test cases fail, such as
>> AggregateITCase.testGroupBySingleValue.
>>
>> After debugging, I found that the heap memory backing
>> BinaryRowDataUtil.EMPTY_ROW was changed unexpectedly, from
>> [0,0,0,0,0,0,0,0]
>> to [3,0,0,0,0,0,0,0], which led to some records being assigned the wrong key.
>>
>> However, my mini-batch code doesn't perform any low-level MemorySegment
>> operations. I only buffer some records (RowData) in a Map, just like
>> AbstractMapBundleOperator does. Object reuse was also disabled with
>> env.getConfig.disableObjectReuse(). It looks like something goes wrong
>> when StreamOneInputProcessor.processInput changes memory segments that
>> do not belong to it (they belong to BinaryRowDataUtil.EMPTY_ROW instead).
>> A screenshot of the debugging session with more information is attached.
>>
>> I'm not familiar with org.apache.flink.core.memory.MemorySegment or
>> sun.misc.Unsafe, so I'd like to ask the mailing list for help. Do you have
>> any ideas about why this happens or where to look next?
>>
>> Thank you.
>>
>> Smile
>>
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2787/EmptyRowDebug-20210511-1.png>
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
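Chesnay's suggestion works because Java `assert` statements are skipped unless the JVM runs with `-ea`. Below is a minimal sketch of an index check that fires regardless of JVM flags. `checkIndex` is a hypothetical helper. It is modelled on what BinaryRowData#assertIndexIsValid presumably verifies (index >= 0 and index < arity) and is not its actual source.

```java
class IndexCheckDemo {
    // Always-on replacement for two `assert` statements:
    // throws even when the JVM runs without -ea.
    static void checkIndex(int index, int arity) {
        if (index < 0) {
            throw new IndexOutOfBoundsException("index (" + index + ") should be >= 0");
        }
        if (index >= arity) {
            throw new IndexOutOfBoundsException(
                    "index (" + index + ") should be < arity (" + arity + ")");
        }
    }

    public static void main(String[] args) {
        checkIndex(0, 1); // valid: passes silently
        try {
            checkIndex(0, 0); // zero-field row: every index is out of range
        } catch (IndexOutOfBoundsException e) {
            System.out.println(e.getMessage()); // index (0) should be < arity (0)
        }
    }
}
```

On Java 9 and later, `java.util.Objects.checkIndex(index, arity)` performs the same two checks with a standard exception message.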
