Hi David,
which state backend do you use for Flink? The default would probably
be FsStateBackend, which stores the whole state in the TaskManager's
memory. That could explain the behavior you are seeing, as the
deduplication has to store all seen keys in memory. I'm afraid that
although the key is cleared after the timeout, it still leaves a record
on the TaskManager's heap (this is the case for the non-portable
runner; there are timers associated with each key, set for the end of
the window - this _might_ need fixing, as it seems suboptimal). You can
confirm this by taking a heap dump of the TaskManager and looking for
timer-related objects.
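For example (assuming you can exec into the TaskManager pod; the file
path and pid below are illustrative):

    jmap -dump:live,format=b,file=/tmp/taskmanager.hprof <taskmanager-pid>

You can then open the dump in a tool like Eclipse MAT or VisualVM and
search for timer-related classes.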
You can try a different state backend - RocksDBStateBackend - which
stores the data on disk. However, this state backend has known
(memory-related) issues when running on k8s; these were fixed in Flink
1.10.0, so you would probably have to use at least that version (or
better, 1.10.1). Another consideration is that storing state in
RocksDB has performance implications, because the data is located on disk.
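If you want to try it, switching the backend is a change in
flink-conf.yaml on the cluster (the checkpoint directory below is just
an example - point it at whatever durable storage you use):

    state.backend: rocksdb
    state.checkpoints.dir: s3://my-bucket/flink/checkpoints

On Flink 1.10+, RocksDB's memory is by default capped by Flink's
managed memory budget (state.backend.rocksdb.memory.managed: true),
which is part of the fix I mentioned above.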
Another (sort of hackish, but maybe actually useful) solution could be
to apply the deduplication in two successive, overlapping fixed windows
(e.g. two 1-minute windows, shifted by 30 seconds), because when a
window expires, its timers should be cleared, and that could limit the
number of keys actually held in memory. There need to be two
deduplications, because events on the boundary of the first window
would not be deduplicated; see the sketch below.
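A minimal sketch of that idea, assuming a PCollection of messages that
compare equal when they are duplicates (names are illustrative, and you
could plug your copied DeduplicatePerKey in place of Distinct):

    import apache_beam as beam
    from apache_beam.transforms import window

    deduped = (
        messages  # assumed unbounded PCollection of messages
        # First pass: deduplicate within aligned 1-minute windows.
        | 'Window1' >> beam.WindowInto(window.FixedWindows(60))
        | 'Dedup1' >> beam.Distinct()
        # Second pass: same window size shifted by 30 seconds, to catch
        # duplicates that straddle a boundary of the first windowing.
        | 'Window2' >> beam.WindowInto(window.FixedWindows(60, offset=30))
        | 'Dedup2' >> beam.Distinct())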
Hope this helps,
Jan
On 8/10/20 10:14 PM, David Gogokhiya wrote:
It roughly takes multiple days (~5 days) to reach the memory limit. It
looks like Beam's last operator stops producing any events (image link
<https://pasteboard.co/JlLuG5T.png>) once the taskmanager's memory
usage hits its limit (image link
<https://pasteboard.co/JlLvok4s.png>). After that, Beam is stuck in
this degraded state, unable to produce any events.
It's worth noting that a regular cluster restart that keeps the
previous state doesn't help: immediately after the restart, the
taskmanager's memory usage goes back to its pre-restart value, and
Beam still doesn't produce any events. The only thing that helps is
restarting the cluster and dropping the previously saved state; only
then does Beam start functioning as expected.
I am still trying to understand whether the infinitely growing
taskmanager memory usage is expected behavior or not.
Sincerely,
David
On Thu, Aug 6, 2020 at 3:19 PM David Gogokhiya <david...@yelp.com> wrote:
Hi,
We recently started using Apache Beam version 2.20.0 running on Flink
version 1.9, deployed on Kubernetes, to process unbounded streams of
data. However, we noticed that the memory consumed by stateful Beam is
steadily increasing over time, with no drops, no matter what the
current throughput is. We were wondering if this is expected, and if
not, what the best way to resolve it would be.
More Context
We have the following pipeline that consumes messages from an
unbounded stream of data. We then deduplicate the messages based on a
unique message id using the deduplicate function
<https://beam.apache.org/releases/pydoc/2.22.0/_modules/apache_beam/transforms/deduplicate.html#DeduplicatePerKey>.
Since we are using Beam version 2.20.0, we copied the source code of
the deduplicate function
<https://beam.apache.org/releases/pydoc/2.22.0/_modules/apache_beam/transforms/deduplicate.html#DeduplicatePerKey>
from version 2.22.0. After that we unmap the tuple, retrieve the
necessary data from the message payload, and dump the corresponding
data into the log.
Pipeline:
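In rough outline (simplified, with illustrative names; DeduplicatePerKey
is our copy from Beam 2.22.0, and the source, duration, and extraction
logic are placeholders, not our exact code):

    import logging
    import apache_beam as beam
    from deduplicate import DeduplicatePerKey  # copied from Beam 2.22.0

    with beam.Pipeline(options=pipeline_options) as p:
        (p
         | 'Read' >> ReadFromUnboundedSource()  # our message source
         | 'KeyById' >> beam.Map(lambda msg: (msg.id, msg))
         | 'Dedup' >> DeduplicatePerKey(processing_time_duration=600)  # illustrative duration
         | 'Unmap' >> beam.Values()
         | 'Extract' >> beam.Map(extract_fields)  # pull fields from payload
         | 'Log' >> beam.Map(logging.info))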
Flink configuration:
As we mentioned before, we noticed that the memory usage of the
jobmanager and taskmanager pods is steadily increasing, with no drops,
no matter what the current throughput is. We tried allocating more
memory, but it seems that no matter how much memory we allocate, it
eventually reaches its limit and then tries to restart itself.
Sincerely, David