Thanks for the clarification.

Here are a few potential options to address the issue, based on the discussion so far:

1) Optionally skip the cleanup timer for the global window (user-controlled via a pipeline option).
2) Instead of setting a cleanup timer for every key, handle all keys for a given window with a single timer (see the sketch after the references below). This would be runner specific and depend on if/how a given runner supports key enumeration. Flink's keyed state backend supports enumerating keys for a namespace (Beam window) and state tag. [1]
3) Set the cleanup timer only when there is actually state associated with a key. This could be accomplished by intercepting append and clear in BagUserStateHandler [2] and adding/removing the timer appropriately.
4) See if TTL support in the runner is applicable; for Flink see [3].
[1] https://github.com/apache/flink/blob/release-1.10/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java#L76
[2] https://github.com/apache/beam/blob/release-2.23.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java#L315
[3] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl
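As a rough illustration of option 2 on the Flink runner side (a sketch only: the class below is made up, and the mapping from Beam state tags to Flink state descriptors is omitted), a single per-window timer could enumerate keys via KeyedStateBackend#getKeys [1] when it fires:

import java.nio.ByteBuffer;
import java.util.stream.Stream;
import org.apache.flink.runtime.state.KeyedStateBackend;

// Illustrative sketch (not existing Beam code): one cleanup timer per window
// that, when it fires, enumerates all keys holding state for that window
// namespace and clears them, instead of one timer per key.
class WindowCleanupSketch {
  private final KeyedStateBackend<ByteBuffer> stateBackend;

  WindowCleanupSketch(KeyedStateBackend<ByteBuffer> stateBackend) {
    this.stateBackend = stateBackend;
  }

  /** Invoked once when the window's GC time is reached. */
  void cleanupWindow(String stateName, String windowNamespace) {
    // getKeys returns all keys that have state registered under the given
    // state name and namespace (the Beam window, encoded as a String in the
    // Flink runner).
    try (Stream<ByteBuffer> keys = stateBackend.getKeys(stateName, windowNamespace)) {
      keys.forEach(
          key -> {
            stateBackend.setCurrentKey(key);
            // ... clear the Beam user state registered under stateName for
            // this key (details depend on how the runner wires state tags).
          });
    }
  }
}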
On Thu, Aug 20, 2020 at 8:08 AM Reuven Lax <re...@google.com> wrote:

Also +1 to what Jan said. Streaming pipelines can process bounded PCollections on some paths, so the global window will terminate for those paths. This is also true for the direct runner tests where PCollections pretend to be unbounded, but we then advance the watermark to +inf to terminate the pipeline.
On Thu, Aug 20, 2020 at 8:06 AM Reuven Lax <re...@google.com> wrote:

It is not Dataflow specific, but I think Dataflow is the only runner that currently implements Drain: https://docs.google.com/document/d/1NExwHlj-2q2WUGhSO4jTu8XGhDPmm3cllSN8IMmWci8/edit

When a pipeline is drained, all windows (including global windows) end, and the windows are processed (i.e. as if they were fixed windows that terminated). Currently the easiest way to ensure that is to rely on the end-of-window timers for the global window (alternatives are possible, like issuing a full-state scan when a pipeline is drained, but that would be quite a bit more complicated). This is not specifically the GC timer, but rather the end-of-window timer that is needed.
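To make the end-of-window point concrete, a small illustration in Beam Java (a sketch, not Dataflow's drain implementation): the global window has a finite maximum timestamp, so advancing the watermark to the end of time, which is effectively what drain or a finished bounded input does, makes its end-of-window timer eligible to fire.

import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.joda.time.Instant;

public class DrainIllustration {
  public static void main(String[] args) {
    // The global window is not truly unbounded: it has a max timestamp that
    // lies before the final watermark value.
    Instant endOfGlobalWindow = GlobalWindow.INSTANCE.maxTimestamp();
    Instant finalWatermark = BoundedWindow.TIMESTAMP_MAX_VALUE;
    // When the watermark is advanced to the end of time (drain, or a bounded
    // input finishing), it passes the global window's max timestamp, so the
    // end-of-window and GC timers for the global window become eligible to fire.
    System.out.println(finalWatermark.isAfter(endOfGlobalWindow)); // expected: true
  }
}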
I believe that right now we don't have a way of deleting timers if there are no elements buffered for a key (e.g. a key that received a few elements that were processed in a trigger and then never received any more elements). This might be part of the problem - large numbers of empty keys with noop timers set. It would be nice if there were a way to detect this and at least remove the timers for those empty keys.

Reuven
On Wed, Aug 19, 2020 at 9:20 PM Thomas Weise <t...@apache.org> wrote:

On Wed, Aug 19, 2020 at 9:49 AM Reuven Lax <re...@google.com> wrote:

Skipping the cleanup timer for the global window will break any sort of drain functionality, which relies on having those timers there. It's also necessary for bounded inputs, for the same reason.

Can you say a bit more about why this will break drain functionality and bounded inputs? Is this Dataflow specific? Is it because the state would be reused by a subsequent instance of the pipeline?

For Flink, the GC timers would be triggered by the final watermark and that will be the end of the streaming job. Launching the same pipeline again will either be a cold start with no previous state or a start from savepoint/checkpoint.

It sounds like for Dataflow there may be a need for the user to influence the behavior, while for Flink the GC timers in a global window are not required.
On Wed, Aug 19, 2020 at 10:31 AM Reuven Lax <re...@google.com> wrote:

On Wed, Aug 19, 2020 at 9:53 AM Steve Niemitz <sniem...@apache.org> wrote:

for what it's worth, dataflow has the same problem here as well. We've also worked around it by (optionally) disabling the cleanup timer in global windows. But I agree, having drain then be an unsafe operation is not great.

Dataflow does not require the timers to be in memory though, so unless the numbers get very large (to the point where you run out of disk storage storing the timers), it will not cause your pipelines to fail.

I think for batch it's less of an issue since basically everything is in the global window anyways, and batch pipelines run for a fixed amount of time on a fixed input source. For streaming pipelines, it's much easier to run into this.
On Wed, Aug 19, 2020 at 12:50 PM Reuven Lax <re...@google.com> wrote:

@OnWindowExpiration is a per-key callback.

On Wed, Aug 19, 2020 at 9:48 AM Luke Cwik <lc...@google.com> wrote:

With the addition of @OnWindowExpiration, a single timer across keys optimization would still make sense.
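For context, here is a minimal Beam Java sketch of a per-key @OnWindowExpiration callback (the DoFn and its sum-of-values logic are invented for illustration): each key's buffered state is flushed independently when its window expires, which is why a single cross-key timer would still need to dispatch to per-key callbacks.

import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.OnWindowExpiration;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import org.apache.beam.sdk.transforms.DoFn.StateId;
import org.apache.beam.sdk.values.KV;

// Hypothetical DoFn: buffers values per key and emits their sum when the
// window expires. Only meant to show the shape of @OnWindowExpiration.
class SumOnWindowExpiration extends DoFn<KV<String, Long>, Long> {

  @StateId("buffer")
  private final StateSpec<BagState<Long>> bufferSpec = StateSpecs.bag();

  @ProcessElement
  public void processElement(
      ProcessContext context, @StateId("buffer") BagState<Long> buffer) {
    buffer.add(context.element().getValue());
  }

  @OnWindowExpiration
  public void onWindowExpiration(
      @StateId("buffer") BagState<Long> buffer, OutputReceiver<Long> output) {
    long sum = 0;
    for (Long value : buffer.read()) {
      sum += value;
    }
    output.output(sum);
    buffer.clear();
  }
}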
On Wed, Aug 19, 2020 at 8:51 AM Thomas Weise <t...@apache.org> wrote:

https://issues.apache.org/jira/browse/BEAM-10760

I confirmed that skipping the cleanup timers resolves the state leak that we observe in the pipeline that uses a global window.

@Luke the GC is key partitioned and relies on StateInternals. That makes it impractical to have a single timer that performs cleanup for multiple keys, at least in a runner agnostic way.

I would like to take a look at whether there is a need to have the GC timer for a global window to start with. Since the pipeline terminates, the runner discards all state anyways - at least in the case of Flink.

Thomas
On Mon, Aug 17, 2020 at 9:46 AM Luke Cwik <lc...@google.com> wrote:

For the cleanup timer.

On Mon, Aug 17, 2020 at 9:45 AM Luke Cwik <lc...@google.com> wrote:

Replacing a timer for each key with just one timer for all keys would make sense for the global window.

On Sun, Aug 16, 2020 at 5:54 PM Thomas Weise <t...@apache.org> wrote:
Thanks Jan. We observe a similar issue with state size growth in the global window (with the portable runner). We don't see this issue with non-global windows; there does not appear to be any residual. I will take a look at skipping the cleanup timers for the global window and see if that resolves the issue. These timers lead to potentially unbounded state growth and don't really serve a purpose.

Thomas
On Sun, Aug 16, 2020 at 1:16 AM Jan Lukavský <je...@seznam.cz> wrote:
Hi Catlyn,

if you use the global window to perform the deduplication, then it should be expected to have as many timers as there are unique keys + one timer for each key that arrived during the last 30 minutes (because there is a timer set to clear the state in the deduplication function). The reason for that is that Beam creates a timer for the window garbage collection time to clear state (see [1]). If it is the global window, then each key will have an associated timer forever (it might be an open question whether that makes sense in this case, or if Beam can do any better).

As I wrote before, it would probably help to use two deduplications in two successive fixed windows of length 30 minutes, shifted by 15 minutes (FixedWindows.of(30 minutes).withOffset(15 minutes)), so that the two windows overlap and catch duplicates that would appear near the boundary of the first window.
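For illustration, the two-stage windowing described above could look roughly like this in Beam Java (the thread's pipeline is Python; the input collection, element types, and transform names here are assumptions, and Deduplicate.keyedValues() stands in for the copied DeduplicatePerKey, with its deduplication duration settings omitted):

import org.apache.beam.sdk.transforms.Deduplicate;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

class TwoStageDedup {
  // "keyedEvents" is an assumed input of (message id, payload) pairs.
  static PCollection<KV<String, String>> dedup(PCollection<KV<String, String>> keyedEvents) {
    return keyedEvents
        // First pass: 30-minute fixed windows aligned at :00 and :30.
        .apply("Window30m",
            Window.into(FixedWindows.of(Duration.standardMinutes(30))))
        .apply("Dedup1", Deduplicate.<String, String>keyedValues())
        // Second pass: same window length, shifted by 15 minutes so the two
        // windowings overlap and duplicates near a boundary of the first
        // windowing fall into a single window of the second.
        .apply("Window30mOffset15m",
            Window.into(FixedWindows.of(Duration.standardMinutes(30))
                .withOffset(Duration.standardMinutes(15))))
        .apply("Dedup2", Deduplicate.<String, String>keyedValues());
  }
}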
@Max, do you think it would be possible to schedule the cleanup timer only when there is actually data in state for a given key? The timer would be cleared on a call to `clear()`, but would have to be set on every write. Or would it make sense not to schedule the cleanup timer for the global window at all?

Jan

[1] https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java#L334
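A minimal sketch of the idea raised above (purely illustrative, not existing Beam or Flink runner code): wrap the per-key state so that the cleanup timer is armed on the first write and removed again when the state is cleared.

import org.apache.beam.sdk.state.BagState;

// Hypothetical wrapper: keeps the window-GC cleanup timer alive only while
// the key actually holds state.
class CleanupAwareBag<T> {
  private final BagState<T> delegate;
  private final Runnable setCleanupTimer;    // e.g. arms a timer at window GC time
  private final Runnable deleteCleanupTimer; // removes that timer again
  private boolean maybeHasState;

  CleanupAwareBag(
      BagState<T> delegate, Runnable setCleanupTimer, Runnable deleteCleanupTimer) {
    this.delegate = delegate;
    this.setCleanupTimer = setCleanupTimer;
    this.deleteCleanupTimer = deleteCleanupTimer;
    this.maybeHasState = false;
  }

  void add(T value) {
    if (!maybeHasState) {
      // First write for this key and window: arm the cleanup timer.
      setCleanupTimer.run();
      maybeHasState = true;
    }
    delegate.add(value);
  }

  void clear() {
    delegate.clear();
    // No state left for this key: the cleanup timer is no longer needed.
    deleteCleanupTimer.run();
    maybeHasState = false;
  }
}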
On 8/15/20 5:47 PM, Catlyn Kong wrote:
Hi!

Thanks for the explanation! The screenshot actually shows all the new instances between marking the heap and taking a heap dump, so sorry if that's a little confusing. Here's what the full heap looks like:

Screen Shot 2020-08-15 at 8.31.42 AM.png
Our input stream has roughly 50 messages per second and the pipeline has been running for about 24 hours. Even assuming all the messages are unique, 5.5 million timers is still very surprising. We're allocating 11G for the taskmanager JVM heap, but it eventually gets filled up (after a couple of days) and the cluster ends up in a bad state. Here's a screenshot of the heap size over the past 24h:

Screen Shot 2020-08-15 at 8.41.48 AM.png
Could it be that the timers never get cleared out, or maybe the pipeline is creating more timer instances than expected?
On Sat, Aug 15, 2020 at 4:07 AM Maximilian Michels <m...@apache.org> wrote:
Awesome! Thanks a lot for the memory profile. Couple remarks:

a) I can see that there are about 378k keys and each of them sets a timer.

b) Based on the settings for DeduplicatePerKey you posted, you will keep track of all keys of the last 30 minutes.

Unless you have much fewer keys, the behavior is to be expected. The memory sizes for the timer maps do not look particularly high (~12Mb). How much memory did you reserve for the task managers?*

-Max

*The image links give me a "504 error".
On 14.08.20 23:29, Catlyn Kong wrote:
> Hi!
>
> We're indeed using the rocksdb state backend, so that might be part of the reason. Due to some security concerns, we might not be able to provide the full heap dump since we have some custom code path. But here's a screenshot from JProfiler:
>
> Screen Shot 2020-08-14 at 9.10.07 AM.png
>
> Looks like TimerHeapInternalTimer (initiated in InternalTimerServiceImpl <https://github.com/apache/flink/blob/5125b1123dfcfff73b5070401dfccb162959080c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L46>) isn't getting garbage collected? As David has mentioned, the pipeline uses DeduplicatePerKey <https://beam.apache.org/releases/pydoc/2.22.0/_modules/apache_beam/transforms/deduplicate.html#DeduplicatePerKey> in Beam 2.22; ProcessConnectionEventFn is a simple stateless DoFn that just does some logging and emits the events. Is there any possibility that the timer logic or the way it's used in the dedupe ParDo can cause this leak?
>
> Thanks,
> Catlyn
>
> On Tue, Aug 11, 2020 at 7:58 AM Maximilian Michels <m...@apache.org> wrote:
>
> Hi!
>
> Looks like a potential leak, caused by your code or by Beam itself. Would you be able to supply a heap dump from one of the task managers? That would greatly help debugging this issue.
>
> -Max
>
> On 07.08.20 00:19, David Gogokhiya wrote:
> > Hi,
> >
> > We recently started using Apache Beam version 2.20.0 running on Flink version 1.9 deployed on kubernetes to process unbounded streams of data. However, we noticed that the memory consumed by stateful Beam is steadily increasing over time with no drops no matter what the current bandwidth is. We were wondering if this is expected and if not what would be the best way to resolve it.
> >
> > More Context
> >
> > We have the following pipeline that consumes messages from the unbounded stream of data. Later we deduplicate the messages based on unique message id using the deduplicate function <https://beam.apache.org/releases/pydoc/2.22.0/_modules/apache_beam/transforms/deduplicate.html#DeduplicatePerKey>. Since we are using Beam version 2.20.0, we copied the source code of the deduplicate function <https://beam.apache.org/releases/pydoc/2.22.0/_modules/apache_beam/transforms/deduplicate.html#DeduplicatePerKey> from version 2.22.0. After that we unmap the tuple, retrieve the necessary data from the message payload and dump the corresponding data into the log.
> >
> > Pipeline:
> >
> > Flink configuration:
> >
> > As we mentioned before, we noticed that the memory usage of the jobmanager and taskmanager pods is steadily increasing with no drops no matter what the current bandwidth is. We tried allocating more memory but it seems like no matter how much memory we allocate it eventually reaches its limit and then it tries to restart itself.
> >
> > Sincerely,
> > David