It takes roughly five days to reach the memory limit. It looks
like Beam's last operator stops producing events
(https://pasteboard.co/JlLuG5T.png) once the taskmanager's memory usage hits
its limit (https://pasteboard.co/JlLvok4s.png). After that, Beam stays stuck
in this degraded state and produces no events at all. It's worth noting
that a regular cluster restart that keeps the previous state doesn't help:
immediately after the restart, the taskmanager's memory usage returns to its
pre-restart value, and Beam still produces no events. The only thing that
helps is restarting the cluster and dropping the previously saved state;
only then does Beam function as expected.

I am still trying to understand whether the unbounded growth of the
taskmanager's memory usage is expected behavior.

On 2020/08/10 19:16:25, Udi Meiri <eh...@google.com> wrote: 
> Hi David,
> I'm not familiar with Flink, but assuming there aren't any memory
> management issues in the runner or SDK, try reducing
> processing_time_duration (currently 30 minutes) to 60 seconds and see how
> long it takes for memory usage to reach the limit. Could you also say how
> long it currently takes for memory to reach the limit?
> 
> On Thu, Aug 6, 2020 at 4:23 PM David Gogokhiya <david...@yelp.com> wrote:
> 
> > Hi,
> >
> > We recently started using Apache Beam 2.20.0 running on Flink 1.9,
> > deployed on Kubernetes, to process unbounded streams of data.
> > However, we noticed that the memory consumed by our stateful Beam pipeline
> > increases steadily over time, with no drops, regardless of the current
> > bandwidth. We were wondering whether this is expected and, if not, what
> > the best way to resolve it would be.
> > More Context
> >
> > We have the following pipeline that consumes messages from an unbounded
> > stream of data. We then deduplicate the messages by their unique message
> > id using the deduplicate function
> > <https://beam.apache.org/releases/pydoc/2.22.0/_modules/apache_beam/transforms/deduplicate.html#DeduplicatePerKey>.
> > Since we are using Beam 2.20.0, we copied the source code of that
> > function from version 2.22.0. After that, we unpack the tuple, retrieve
> > the necessary data from the message payload, and write the corresponding
> > data to the log.
> >
> > Pipeline:
> >
> >
> > Flink configuration:
> >
> >
> > As mentioned above, the memory usage of the jobmanager and taskmanager
> > pods increases steadily, with no drops, regardless of the current
> > bandwidth. We tried allocating more memory, but no matter how much we
> > allocate, usage eventually reaches the limit and the pod then tries to
> > restart itself.
> >
> >
> > Sincerely, David
> >
> >
> 
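The deduplication step described in the original message keeps per-key state. The sketch below is a minimal, pure-Python model (not Beam or Flink code) of how large that state should get, assuming the copied DeduplicatePerKey stores a "seen" marker for each newly seen key and clears it when a processing-time timer, set to fire processing_time_duration after the first occurrence, goes off. The function name simulate_dedup, the message ids, and the arrival rate are hypothetical.

```python
# Hypothetical pure-Python model of per-key deduplication state (not Beam code).
# Assumption: each first-seen key stores a "seen" marker and sets a
# processing-time timer for now + processing_time_duration; when the timer
# fires, the marker is cleared, so the key can be emitted again afterwards.
import heapq
from typing import Iterable, List, Tuple


def simulate_dedup(events: Iterable[Tuple[float, str]],
                   processing_time_duration: float) -> Tuple[List[str], int]:
    """Return (emitted keys, peak number of live "seen" entries).

    `events` must be ordered by processing time (seconds).
    """
    seen = set()                             # keys whose marker is still set
    timers: List[Tuple[float, str]] = []     # min-heap of (fire_time, key)
    emitted: List[str] = []
    peak = 0
    for now, key in events:
        # Fire every timer that is due; each one clears its key's marker.
        while timers and timers[0][0] <= now:
            _, expired_key = heapq.heappop(timers)
            seen.discard(expired_key)
        if key in seen:
            continue                         # duplicate inside the window: drop
        seen.add(key)
        heapq.heappush(timers, (now + processing_time_duration, key))
        emitted.append(key)
        peak = max(peak, len(seen))
    return emitted, peak


if __name__ == "__main__":
    # Hypothetical load: one new, distinct message id per second for one hour.
    stream = [(float(t), f"id-{t}") for t in range(3600)]
    for duration in (1800.0, 60.0):          # 30 minutes vs. 60 seconds
        _, peak = simulate_dedup(stream, duration)
        print(f"processing_time_duration={duration:.0f}s: peak entries={peak}")
```

Run as a script, the model levels off at 1800 live entries with a 30-minute duration and at 60 entries with 60 seconds, i.e. roughly (distinct ids per second) × (duration in seconds). In this model, state stops growing once timers start firing, which is the behavior the 60-second experiment suggested in the thread would help confirm or rule out for the real pipeline.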
