Awesome! Thanks a lot for the memory profile. A couple of remarks:

a) I can see that there are about 378k keys, each of which sets a timer.
b) Based on the DeduplicatePerKey settings you posted, you will keep track of all keys seen in the last 30 minutes.

Unless you have far fewer keys, this behavior is to be expected. The memory sizes for the timer maps do not look particularly high (~12 MB, which works out to roughly 30 bytes per timer across ~378k keys).
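
For illustration, here is roughly how I picture the dedup step on your side. This is only a minimal sketch; the 30-minute processing-time duration and all names are my assumptions based on what you posted:

    # Sketch only: assumes a 30-minute processing-time dedup window and a
    # keyed PCollection of (message_id, payload) pairs.
    import apache_beam as beam
    from apache_beam.transforms.deduplicate import DeduplicatePerKey
    from apache_beam.utils.timestamp import Duration

    with beam.Pipeline() as p:
        keyed = p | beam.Create([('id-1', 'a'), ('id-1', 'a'), ('id-2', 'b')])
        deduped = keyed | 'Dedup' >> DeduplicatePerKey(
            processing_time_duration=Duration.of(30 * 60))  # 30 minutes

With a setting like that, every distinct key holds its "seen" state plus one timer for the full 30 minutes before both are released, so ~378k live timers is consistent with ~378k distinct keys seen within that window.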

How much memory did you reserve for the task managers?*

-Max

*The image links give me a "504 error".

On 14.08.20 23:29, Catlyn Kong wrote:
Hi!

We're indeed using the RocksDB state backend, so that might be part of the reason. Due to some security concerns we might not be able to provide the full heap dump, since it includes some custom code paths. But here's a screenshot from JProfiler:
[Attached screenshot: Screen Shot 2020-08-14 at 9.10.07 AM.png]
Looks like TimerHeapInternalTimer (instantiated in InternalTimerServiceImpl <https://github.com/apache/flink/blob/5125b1123dfcfff73b5070401dfccb162959080c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L46>) isn't getting garbage collected. As David mentioned, the pipeline uses DeduplicatePerKey <https://beam.apache.org/releases/pydoc/2.22.0/_modules/apache_beam/transforms/deduplicate.html#DeduplicatePerKey> from Beam 2.22; ProcessConnectionEventFn is a simple stateless DoFn that just does some logging and emits the events. Is there any possibility that the timer logic, or the way it's used in the dedupe ParDo, is causing this leak?
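
For reference, my reading of the copied 2.22 deduplicate code is roughly the following. This is a simplified paraphrase with shortened names (DedupFn and duration_secs are mine), not the exact source: one "seen" state entry and one processing-time timer get set per key, and both are only released when the timer fires after the configured duration.

    # Simplified paraphrase of what the 2.22 DeduplicatePerKey DoFn does;
    # see the linked deduplicate.py for the real implementation.
    import apache_beam as beam
    from apache_beam.coders import BooleanCoder
    from apache_beam.transforms import userstate
    from apache_beam.transforms.timeutil import TimeDomain
    from apache_beam.utils import timestamp

    class DedupFn(beam.DoFn):
        SEEN = userstate.BagStateSpec('seen', BooleanCoder())
        EXPIRY = userstate.TimerSpec('expiry', TimeDomain.REAL_TIME)

        def __init__(self, duration_secs):
            self.duration_secs = duration_secs  # e.g. 30 * 60

        def process(self, kv,
                    seen=beam.DoFn.StateParam(SEEN),
                    expiry=beam.DoFn.TimerParam(EXPIRY)):
            if True in seen.read():
                return  # duplicate within the window: drop it
            # One timer per key, armed for the full dedup duration.
            expiry.set(timestamp.Timestamp.now() + self.duration_secs)
            seen.add(True)
            yield kv

        @userstate.on_timer(EXPIRY)
        def on_expiry(self, seen=beam.DoFn.StateParam(SEEN)):
            seen.clear()  # per-key state is only released here

If that reading is right, one live timer per key until the dedup duration expires is expected; the question is whether Flink actually releases them once they fire.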

Thanks,
Catlyn

On Tue, Aug 11, 2020 at 7:58 AM Maximilian Michels <m...@apache.org> wrote:

    Hi!

    Looks like a potential leak, caused by your code or by Beam itself.
    Would you be able to supply a heap dump from one of the task managers?
    That would greatly help debugging this issue.

    -Max

    On 07.08.20 00:19, David Gogokhiya wrote:
     > Hi,
     >
     > We recently started using Apache Beam version 2.20.0 running on
     > Flink version 1.9 deployed on Kubernetes to process unbounded
     > streams of data. However, we noticed that the memory consumed by
     > stateful Beam is steadily increasing over time with no drops, no
     > matter what the current bandwidth is. We were wondering if this
     > is expected and, if not, what would be the best way to resolve it.
     >
     >
     > More Context
     >
     > We have the following pipeline that consumes messages from the
     > unbounded stream of data. Later we deduplicate the messages based
     > on a unique message id using the deduplicate function
     > <https://beam.apache.org/releases/pydoc/2.22.0/_modules/apache_beam/transforms/deduplicate.html#DeduplicatePerKey>.
     > Since we are using Beam version 2.20.0, we copied the source code
     > of the deduplicate function from version 2.22.0. After that we
     > unmap the tuple, retrieve the necessary data from the message
     > payload and dump the corresponding data into the log.
     >
     >
     > Pipeline:
     >
     >
     > Flink configuration:
     >
     >
     > As we mentioned before, we noticed that the memory usage of the
     > jobmanager and taskmanager pods is steadily increasing with no
     > drops, no matter what the current bandwidth is. We tried
     > allocating more memory, but it seems like no matter how much
     > memory we allocate, it eventually reaches its limit and then
     > tries to restart itself.
     >
     >
     > Sincerely, David
     >
     >
