[
https://issues.apache.org/jira/browse/FLINK-36897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Wang Qilong updated FLINK-36897:
--------------------------------
Description:
When I created the AbstractAsynchronous StateMapBundleOperator and inherited it
from the AbstractAsynchronous StateStreamOperator, there was an error in the
data passed into the element by the processElement of the AbstractAsynchronous
StateMapBundleOperator itself
The inheritance relationship between asynchronous synchronization and two
classes is:
AbstractMapBundleOperator->AbstractStreamOperator
AbstractAsyncStateMapBundleOperator->AbstractAsyncStateStreamOperator->AbstractStreamOperator
The reason for creating this class is to enable KeyedMapBundleOperator to
support asynchronous running capability
Example of incorrect information: For example, the original data format was:
val data = new mutable.MutableList[(String, Long)]
data.+=(("x", 1L))
data.+=(("x", 2L))
data.+=(("x", 3L))
data.+=(("y", 1L))
data.+=(("y", 2L))
data.+=(("z", 3L))
So the result of data transmission becomes:
val data = new mutable.MutableList[(String, Long)]
data.+=(("x", 1L))
data.+=(("x", 2L))
data.+=(("x", 3L))
data.+=(("x", 1L))
data.+=(("x", 2L))
data.+=(("x", 3L))
How to reproduce:
Run testOverloadedAccumulator in sql/AggregateITCase.java in [1]
[1] [https://github.com/Au-Miner/flink/tree/FLINK-36882]
was:
When I created the AbstractAsynchronous StateMapBundleOperator and inherited it
from the AbstractAsynchronous StateStreamOperator, there was an error in the
data passed into the element by the processElement of the AbstractAsynchronous
StateMapBundleOperator itself
The inheritance relationship between asynchronous synchronization and two
classes is:
AbstractMapBundleOperator->AbstractStreamOperator
AbstractAsyncStateMapBundleOperator->AbstractAsyncStateStreamOperator->AbstractStreamOperator
The reason for creating this class is to enable KeyedMapBundleOperator to
support asynchronous running capability
Example of incorrect information: For example, the original data format was:
val data = new mutable.MutableList[(String, Long)]
data.+=(("x", 1L))
data.+=(("x", 2L))
data.+=(("x", 3L))
data.+=(("y", 1L))
data.+=(("y", 2L))
data.+=(("z", 3L))
So the result of data transmission becomes:
val data = new mutable.MutableList[(String, Long)]
data.+=(("x", 1L))
data.+=(("x", 2L))
data.+=(("x", 3L))
data.+=(("x", 1L))
data.+=(("x", 2L))
data.+=(("x", 3L))
How to reproduce:
Run testOverloadedAccumulator in SQL/AggregateITCase.jva in [1]
[1] https://github.com/Au-Miner/flink/tree/FLINK-36882
> Error executing processElement when inheriting from
> AbstractAsyncStateStreamOperator
> ------------------------------------------------------------------------------------
>
> Key: FLINK-36897
> URL: https://issues.apache.org/jira/browse/FLINK-36897
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 2.0.0
> Reporter: Wang Qilong
> Priority: Major
>
> When I created the AbstractAsynchronous StateMapBundleOperator and inherited
> it from the AbstractAsynchronous StateStreamOperator, there was an error in
> the data passed into the element by the processElement of the
> AbstractAsynchronous StateMapBundleOperator itself
> The inheritance relationship between asynchronous synchronization and two
> classes is:
> AbstractMapBundleOperator->AbstractStreamOperator
> AbstractAsyncStateMapBundleOperator->AbstractAsyncStateStreamOperator->AbstractStreamOperator
> The reason for creating this class is to enable KeyedMapBundleOperator to
> support asynchronous running capability
> Example of incorrect information: For example, the original data format was:
> val data = new mutable.MutableList[(String, Long)]
> data.+=(("x", 1L))
> data.+=(("x", 2L))
> data.+=(("x", 3L))
> data.+=(("y", 1L))
> data.+=(("y", 2L))
> data.+=(("z", 3L))
> So the result of data transmission becomes:
> val data = new mutable.MutableList[(String, Long)]
> data.+=(("x", 1L))
> data.+=(("x", 2L))
> data.+=(("x", 3L))
> data.+=(("x", 1L))
> data.+=(("x", 2L))
> data.+=(("x", 3L))
> How to reproduce:
> Run testOverloadedAccumulator in sql/AggregateITCase.java in [1]
>
> [1] [https://github.com/Au-Miner/flink/tree/FLINK-36882]
--
This message was sent by Atlassian Jira
(v8.20.10#820010)