Hi all,

I'm trying to add a mini-batch optimization for Regular Join
(flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperator.java)
in the Blink planner. With my change, some test cases fail, such as
AggregateITCase.testGroupBySingleValue.

After debugging, I found that the heap memory backing
BinaryRowDataUtil.EMPTY_ROW was changed unexpectedly, from [0,0,0,0,0,0,0,0]
to [3,0,0,0,0,0,0,0], which leads to some records being assigned a wrong key.
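
One observation that may or may not be relevant: the first byte of a
BinaryRowData is its RowKind header, and RowKind.DELETE.toByteValue() happens
to be 3. The snippet below is not my code path, only a standalone
illustration of how the symptom could arise when two BinaryRowData instances
point at the same memory segment (imports may differ slightly depending on
the Flink version):

    import org.apache.flink.core.memory.MemorySegment;
    import org.apache.flink.core.memory.MemorySegmentFactory;
    import org.apache.flink.table.data.binary.BinaryRowData;
    import org.apache.flink.types.RowKind;

    public class SharedSegmentDemo {
        public static void main(String[] args) {
            // an 8-byte segment, like the one backing BinaryRowDataUtil.EMPTY_ROW
            MemorySegment shared = MemorySegmentFactory.wrap(new byte[8]);

            BinaryRowData emptyRow = new BinaryRowData(0);
            emptyRow.pointTo(shared, 0, 8);   // stands in for EMPTY_ROW

            BinaryRowData other = new BinaryRowData(0);
            other.pointTo(shared, 0, 8);      // another row sharing the same segment

            // writing the row kind mutates the header byte of the shared segment:
            // the backing bytes go from [0,0,0,0,0,0,0,0] to [3,0,0,0,0,0,0,0]
            other.setRowKind(RowKind.DELETE);

            System.out.println(emptyRow.getRowKind());  // prints DELETE, not INSERT
        }
    }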

However, my mini-batch code doesn't do any low-level work with
MemorySegment. I only buffer some records (RowData) in a Map, just like
AbstractMapBundleOperator does (a simplified sketch of that buffering is
included below). Object reuse is also disabled via
env.getConfig.disableObjectReuse(). It looks as if
StreamOneInputProcessor.processInput modified memory segments that do not
belong to it (they belong to BinaryRowDataUtil.EMPTY_ROW instead). A
debugging screenshot with more information is attached at the end of this
mail.
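
For reference, the buffering looks roughly like this (class and field names
are simplified for this mail; the real code lives inside the join operator
and flushes the bundle into the existing join logic):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.flink.table.data.RowData;

    // Sketch of the mini-batch buffering only, not the real operator.
    public class MiniBatchBufferSketch {

        private final int miniBatchSize;

        // records buffered per join key, similar to AbstractMapBundleOperator
        private final Map<RowData, List<RowData>> bundle = new HashMap<>();

        public MiniBatchBufferSketch(int miniBatchSize) {
            this.miniBatchSize = miniBatchSize;
        }

        // called from processElement with the current key and the input record;
        // the RowData objects are stored as-is, no explicit copy is made
        public void addRecord(RowData key, RowData record) {
            bundle.computeIfAbsent(key, k -> new ArrayList<>()).add(record);
            if (bundle.size() >= miniBatchSize) {
                flushBundle();
            }
        }

        private void flushBundle() {
            // ... feed the buffered records into the regular join logic ...
            bundle.clear();
        }
    }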

I'm not familiar with org.apache.flink.core.memory.MemorySegment or
sun.misc.Unsafe, so I'd like to ask maillist for help. Do you have any ideas
about why it happens or where to check next?

Thank you.

Smile 

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2787/EmptyRowDebug-20210511-1.png>
 


