Wang Qilong created FLINK-36897:
-----------------------------------
Summary: Error in calling processElement for AbstractAsynchronous
StateStreamOperator
Key: FLINK-36897
URL: https://issues.apache.org/jira/browse/FLINK-36897
Project: Flink
Issue Type: Bug
Components: Table SQL / Runtime
Affects Versions: 2.0.0
Reporter: Wang Qilong
When I created the AbstractAsynchronous StateMapBundleOperator and inherited it
from the AbstractAsynchronous StateStreamOperator, there was an error in the
data passed into the element by the processElement of the AbstractAsynchronous
StateMapBundleOperator itself
The inheritance relationship between asynchronous synchronization and two
classes is:
AbstractMapBundleOperator->AbstractStreamOperator
AbstractAsyncStateMapBundleOperator->AbstractAsyncStateStreamOperator->AbstractStreamOperator
The reason for creating this class is to enable KeyedMapBundleOperator to
support asynchronous running capability
Example of incorrect information: For example, the original data format was:
val data = new mutable.MutableList[(String, Long)]
data.+=(("x", 1L))
data.+=(("x", 2L))
data.+=(("x", 3L))
data.+=(("y", 1L))
data.+=(("y", 2L))
data.+=(("z", 3L))
So the result of data transmission becomes:
val data = new mutable.MutableList[(String, Long)]
data.+=(("x", 1L))
data.+=(("x", 2L))
data.+=(("x", 3L))
data.+=(("x", 1L))
data.+=(("x", 2L))
data.+=(("x", 3L))
How to reproduce:
Run testOverloadedAccumulator in SQL/AggregateITCase.jva in [1]
[1] https://github.com/Au-Miner/flink/tree/FLINK-36882
--
This message was sent by Atlassian Jira
(v8.20.10#820010)