Awesome! Thanks a lot for the memory profile. A couple of remarks:
a) I can see that there are about 378k keys and each of them sets a timer.
b) Based on the settings for DeduplicatePerKey you posted, you will keep
track of all keys of the last 30 minutes.
Unless you have far fewer keys, this behavior is to be expected. The
memory sizes for the timer maps do not look particularly high (~12 MB).
How much memory did you reserve for the task managers?*
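A quick back-of-the-envelope check (the per-timer size below is a rough assumption, not a measurement) shows why ~378k timers landing around ~12 MB is plausible:

```python
# Rough estimate of the heap held by the timer maps.
# bytes_per_timer is an assumption (timestamp + key/namespace
# references + heap-index entry), not a measured value.
keys = 378_000
bytes_per_timer = 32
timer_map_mb = keys * bytes_per_timer / (1024 * 1024)
print(f"~{timer_map_mb:.0f} MB")  # roughly matches the ~12 MB in the profile
```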
-Max
*The image links give me a "504 error".
On 14.08.20 23:29, Catlyn Kong wrote:
Hi!
We're indeed using the rocksdb state backend, so that might be part of
the reason. Due to some security concerns, we might not be able to
provide the full heap dump since we have some custom code path. But
here's a screenshot from JProfiler:
[screenshot: Screen Shot 2020-08-14 at 9.10.07 AM.png]
Looks like TimerHeapInternalTimer (instantiated in InternalTimerServiceImpl
<https://github.com/apache/flink/blob/5125b1123dfcfff73b5070401dfccb162959080c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L46>)
isn't getting garbage collected. As David mentioned, the pipeline
uses DeduplicatePerKey
<https://beam.apache.org/releases/pydoc/2.22.0/_modules/apache_beam/transforms/deduplicate.html#DeduplicatePerKey> from
Beam 2.22; ProcessConnectionEventFn is a simple stateless DoFn that just
does some logging and emits the events. Is there any possibility that
the timer logic, or the way it's used in the dedupe ParDo, can cause this
leak?
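For what it's worth, here is a pure-Python toy model of the behavior DeduplicatePerKey documents (this is not Beam's actual implementation; the class and names are made up for illustration): each first-seen key holds state plus an expiry timer, and the timer firing is the only thing that releases that state, so every distinct key seen within the TTL stays resident.

```python
class ToyDeduper:
    """Toy model of per-key dedup with expiry timers (hypothetical,
    not Beam's real code): state for a key lives until its timer fires."""

    def __init__(self, ttl_seconds):
        self.ttl = ttl_seconds
        self.seen = {}  # key -> expiry timestamp ("state" + "timer")

    def process(self, key, now):
        self._fire_expired(now)
        if key in self.seen:             # duplicate within the TTL: drop it
            return None
        self.seen[key] = now + self.ttl  # first sighting: keep state + timer
        return key

    def _fire_expired(self, now):
        # Timers firing is the only path that frees per-key state.
        for k in [k for k, exp in self.seen.items() if exp <= now]:
            del self.seen[k]

d = ToyDeduper(ttl_seconds=30 * 60)   # mirrors the posted 30-minute setting
assert d.process("m1", now=0) == "m1"  # first sighting is emitted
assert d.process("m1", now=10) is None  # duplicate suppressed
assert len(d.seen) == 1                 # state held until the timer fires
d.process("m2", now=30 * 60 + 1)        # m1's timer has expired by now
assert "m1" not in d.seen
```

With ~378k distinct keys per 30 minutes, that is ~378k live timers at steady state, which matches the profile rather than indicating a leak.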
Thanks,
Catlyn
On Tue, Aug 11, 2020 at 7:58 AM Maximilian Michels <m...@apache.org> wrote:
Hi!
Looks like a potential leak, caused either by your code or by Beam itself.
Would you be able to supply a heap dump from one of the task managers?
That would greatly help debugging this issue.
-Max
On 07.08.20 00:19, David Gogokhiya wrote:
> Hi,
>
> We recently started using Apache Beam version 2.20.0 running on Flink
> version 1.9, deployed on Kubernetes, to process unbounded streams of
> data. However, we noticed that the memory consumed by stateful Beam is
> steadily increasing over time with no drops, no matter what the current
> bandwidth is. We were wondering if this is expected and, if not, what
> would be the best way to resolve it.
>
>
> More Context
>
> We have the following pipeline that consumes messages from the
> unbounded stream of data. Later we deduplicate the messages based on
> unique message id using the deduplicate function
> <https://beam.apache.org/releases/pydoc/2.22.0/_modules/apache_beam/transforms/deduplicate.html#DeduplicatePerKey>.
> Since we are using Beam version 2.20.0, we copied the source code of
> the deduplicate function
> <https://beam.apache.org/releases/pydoc/2.22.0/_modules/apache_beam/transforms/deduplicate.html#DeduplicatePerKey>
> from version 2.22.0. After that we unmap the tuple, retrieve the
> necessary data from the message payload and dump the corresponding
> data into the log.
>
>
> Pipeline: [image not included]
>
> Flink configuration: [image not included]
>
> As we mentioned before, we noticed that the memory usage of the
> jobmanager and taskmanager pods is steadily increasing with no drops,
> no matter what the current bandwidth is. We tried allocating more
> memory, but it seems that no matter how much memory we allocate, it
> eventually reaches its limit and then tries to restart itself.
>
>
> Sincerely, David
>
>
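For readers skimming the thread, the pipeline described above (deduplicate on a unique message id, unwrap the tuple, log the payload) can be sketched in plain Python; all names here are illustrative, not from the actual job. The unbounded `seen` set deliberately mirrors the behavior in question: without an expiry mechanism, per-key dedup state grows with the number of distinct keys ever observed.

```python
import logging

def run_pipeline(messages):
    """Illustrative stand-in for the Beam job (names are made up):
    deduplicate by message id, unwrap the (id, payload) tuple, log payload."""
    seen = set()
    emitted = []
    for msg_id, payload in messages:
        if msg_id in seen:   # duplicate message id: drop
            continue
        seen.add(msg_id)     # grows without bound absent an expiry timer,
                             # mirroring the memory growth described above
        logging.info("payload: %s", payload)
        emitted.append(payload)
    return emitted

msgs = [("a", 1), ("b", 2), ("a", 3)]
assert run_pipeline(msgs) == [1, 2]
```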