Github user huawei-flink commented on the issue:
https://github.com/apache/flink/pull/3574
Hi @fhueske, @sunjincheng121 ,
let me try to explain my perspective on this specific case (row based, proc
time). This is for the purpose of discussion, to show that we have been
thinking about this topic for a while now.
In the case of the row range, the "serialization savings" coming from MapState
exist only up to the point at which the "buffer" is filled. After that, we
need to start retracting to keep the value correct, and to do that we need to
deserialize all the objects. As @rtudoran mentioned, we implemented a version
using a Queue object.
This has many advantages:
- the object is removed from the buffer at the right moment, freeing memory on
the go (without any iteration over the key set)
- it has an O(1) data access pattern, without any "key resolution costs"
and no list iteration
- it keeps the natural processing order by design, without the need to
index objects with timestamps
- the experiments we ran show no difference for windows up
to 100k elements, and beyond that the queue seems to be more efficient (as
the key resolution does not come for free).
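To make the queue idea concrete, here is a minimal sketch in plain Java of a
row-count window that keeps a running sum and retracts the oldest row once the
buffer is full. It is only an illustration under assumptions: the class name
QueueRowWindowSum and its methods are hypothetical, it uses java.util.ArrayDeque
instead of any Flink state primitive, and it is not the code from the PR.

```java
import java.util.ArrayDeque;

// Hypothetical illustration class; not part of Flink and not the PR's code.
class QueueRowWindowSum {
    // Rows in arrival order: head = oldest row, tail = newest row.
    private final ArrayDeque<Long> buffer = new ArrayDeque<>();
    private final int windowSize;
    private long sum = 0L;

    QueueRowWindowSum(int windowSize) {
        if (windowSize <= 0) {
            throw new IllegalArgumentException("windowSize must be positive");
        }
        this.windowSize = windowSize;
    }

    // Adds one row and returns the sum over the last `windowSize` rows.
    long add(long value) {
        if (buffer.size() == windowSize) {
            // Retraction: the oldest row is at the head, so no key lookup
            // and no iteration is needed to find it.
            sum -= buffer.removeFirst();
        }
        buffer.addLast(value);
        sum += value;
        return sum;
    }

    // Number of rows currently held; never exceeds windowSize.
    int bufferedRows() {
        return buffer.size();
    }
}
```

Both the eviction and the append touch only the ends of the queue, and the
buffer never grows beyond the window size, which is the "natural buffer
cleansing" meant above.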
The map state may have a slight advantage in the early stages, when the
window is not yet filled, but after that it just introduces useless operations.
Furthermore, having to index objects with a creation timestamp (more memory
wasted) and to deal with sequential access (List) to get the most recent
object, when you can actually just use the natural arrival order, seems a
useless complication. Applying Occam's razor, there should be no doubt about
which solution we should select first. The serialization optimization while
the window gets filled sounds like a premature optimization that is not worth
it in the long run. The further implementation of SQL operators (e.g. LIMIT,
OFFSET etc.) can just benefit from the fact that the state is already sorted,
whereas the map would need to be sorted every time.
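As a rough illustration of the LIMIT/OFFSET point, the sketch below contrasts
reading a slice from a queue that is already in arrival order with reading the
same slice from a map keyed by timestamp. Everything here is an assumption for
the sake of discussion: the class and method names are hypothetical, a plain
HashMap stands in for the map state, and no Flink API is involved.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical illustration class; not part of Flink and not the PR's code.
class OrderedStateSketch {

    // Queue in arrival order: OFFSET/LIMIT is a plain skip and limit.
    static List<String> limitOffsetFromQueue(ArrayDeque<String> queue,
                                             int offset, int limit) {
        return queue.stream()
                .skip(offset)
                .limit(limit)
                .collect(Collectors.toList());
    }

    // Map keyed by timestamp: entries must be sorted by key before slicing.
    static List<String> limitOffsetFromMap(Map<Long, String> byTimestamp,
                                           int offset, int limit) {
        return byTimestamp.entrySet().stream()
                .sorted(Map.Entry.comparingByKey())
                .skip(offset)
                .limit(limit)
                .map(Map.Entry::getValue)
                .collect(Collectors.toList());
    }
}
```

Both methods return the same rows for the same input; the difference is the
extra sort the map variant pays on every read.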
Of course I am talking specifically about the procTime semantic operations;
eventTime is another story anyway. The map state has minor advantages in the
beginning (as the serialization costs are small anyway), while the queue state
has advantages in executions running steadily, because of its access pattern
and natural buffer cleansing.
These are my two cents on the discussion.