Hi David,

What state backend do you use for Flink? The default would probably be FsStateBackend, which stores the whole state in the TaskManager's memory. That could explain the behavior you are seeing, because the deduplication has to store all seen keys in memory. I'm afraid that even though a key is cleared after the timeout, it still leaves a record on the TaskManager's heap (this is the case for the non-portable runner; there are timers associated with each key, set for the end of the window, and this _might_ need fixing, as it seems suboptimal). You can confirm this by taking a heap dump of the TaskManager and looking for timer-related objects.
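As a quick check (the config path and the process id below are placeholders; they depend on how your Flink image and pods are set up):

    # which state backend is configured; no state.backend entry means
    # Flink's default heap-based backend is used
    grep 'state.backend' /opt/flink/conf/flink-conf.yaml

    # take a heap dump of the TaskManager JVM with the standard JDK jmap
    # tool and inspect it for timer-related objects
    jmap -dump:live,format=b,file=/tmp/taskmanager.hprof <taskmanager-pid>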

You can try a different state backend, RocksDBStateBackend, which stores the data on disk. However, this state backend has known memory-related issues when running on Kubernetes; those were fixed in Flink 1.10.0, so you would probably have to use at least that version (or better, 1.10.1). Another consideration is that storing state in RocksDB has performance implications, because the data lives on disk.
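Roughly, switching the backend would look like this in flink-conf.yaml (the checkpoint directory is just a placeholder, and the managed memory option only exists in Flink 1.10+):

    state.backend: rocksdb
    # any durable filesystem works here; the path is a placeholder
    state.checkpoints.dir: s3://<your-bucket>/flink-checkpoints
    # Flink 1.10+: keep RocksDB's native memory within Flink's managed memory budget
    state.backend.rocksdb.memory.managed: true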

Another (sort of hackish, but maybe actually useful) solution could be to apply the deduplication in two successive fixed windows that overlap (e.g. two 1-minute windows, shifted by 30 seconds). When a window expires, its timers should be cleared, which would limit the number of keys actually held in memory. There need to be two deduplications, because events on the boundary of the first window would not be deduplicated by it alone.
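A minimal sketch of the idea in the Python SDK, assuming the input is already keyed by the message id (names and window sizes are illustrative, and this is not a drop-in replacement for the Deduplicate transform):

    import apache_beam as beam
    from apache_beam.transforms.window import FixedWindows

    def _keep_one(values):
        # keep an arbitrary single message per key within the window
        return next(iter(values))

    deduped = (
        keyed_messages  # PCollection of (message_id, message) pairs
        | 'Window1m' >> beam.WindowInto(FixedWindows(60))
        | 'DedupWithinWindow' >> beam.CombinePerKey(_keep_one)
        | 'Window1mShifted' >> beam.WindowInto(FixedWindows(60, offset=30))
        | 'DedupAcrossBoundary' >> beam.CombinePerKey(_keep_one)
    )

The second, shifted window catches duplicates that fall on opposite sides of a boundary of the first window (as long as they arrive within the window size of each other), and once a window expires its keys and timers can be dropped.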

Hope this helps,

Jan

On 8/10/20 10:14 PM, David Gogokhiya wrote:
It takes roughly 5 days to reach the memory limit. It looks like Beam's last operator stops producing any events (image link <https://pasteboard.co/JlLuG5T.png>) once the taskmanager's memory usage hits its limit (image link <https://pasteboard.co/JlLvok4s.png>). After that, Beam is stuck in this degraded state, unable to produce any events. It's worth noting that a regular cluster restart that keeps the previous state doesn't help: immediately after the restart, the taskmanager's memory usage goes back to its pre-restart value, and Beam still doesn't produce any events. The only thing that helps is restarting the cluster and dropping the previously saved state; only then does Beam start functioning as expected.

I am still trying to understand whether the taskmanager's ever-growing memory usage is expected behavior or not.

Sincerely,
David

On Thu, Aug 6, 2020 at 3:19 PM David Gogokhiya <david...@yelp.com> wrote:

    Hi,
    We recently started using Apache Beam version 2.20.0 running on
    Flink version 1.9, deployed on Kubernetes, to process unbounded
    streams of data. However, we noticed that the memory consumed by
    the stateful Beam pipeline is steadily increasing over time, with
    no drops, no matter what the current bandwidth is. We were
    wondering whether this is expected and, if not, what would be the
    best way to resolve it.


    More Context

    We have the following pipeline that consumes messages from an
    unbounded stream of data. We then deduplicate the messages based
    on a unique message id using the deduplicate function
    <https://beam.apache.org/releases/pydoc/2.22.0/_modules/apache_beam/transforms/deduplicate.html#DeduplicatePerKey>.
    Since we are using Beam version 2.20.0, we copied the source code
    of the deduplicate function
    <https://beam.apache.org/releases/pydoc/2.22.0/_modules/apache_beam/transforms/deduplicate.html#DeduplicatePerKey>
    from version 2.22.0. After that, we unmap the tuple, retrieve the
    necessary data from the message payload and dump the corresponding
    data into the log.
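
    Roughly, the pipeline has the following shape (a simplified
    sketch, not the actual code; the source, module name and field
    names are placeholders):

        import logging

        import apache_beam as beam
        from apache_beam.utils.timestamp import Duration

        # DeduplicatePerKey copied from Beam 2.22.0 into our codebase;
        # the module name is a placeholder
        from our_project.deduplicate import DeduplicatePerKey

        def key_by_message_id(message):
            # key each message by its unique id for the deduplication
            return (message['id'], message)

        def log_payload(message):
            # retrieve the necessary data from the payload and dump it
            # into the log
            logging.info('payload: %s', message['payload'])
            return message

        with beam.Pipeline() as p:
            (
                p
                # stand-in for the real unbounded source
                | 'Read' >> beam.Create([{'id': 'a', 'payload': 'x'}])
                | 'KeyByMessageId' >> beam.Map(key_by_message_id)
                | 'Deduplicate' >> DeduplicatePerKey(
                    processing_time_duration=Duration(seconds=10 * 60))
                | 'Unmap' >> beam.Values()
                | 'LogPayload' >> beam.Map(log_payload)
            )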


    Pipeline:


    Flink configuration:


    As we mentioned before, we noticed that the memory usage of the
    jobmanager and taskmanager pods is steadily increasing, with no
    drops, no matter what the current bandwidth is. We tried
    allocating more memory, but it seems that no matter how much
    memory we allocate, it eventually reaches its limit and then tries
    to restart itself.

    Sincerely, David

