Hi Daniel,
I did some more debugging. I think the fix we proposed only cures the symptoms.
The cause is that your job uses Jackson, which is also a dependency of Flink.
So your job ends up using Flink's version of Jackson, which then installs classes
from your job in Jackson's type cache. Now, this wouldn't be a problem if you
shaded your version of Jackson, i.e. renamed the Jackson package.
But even without shading, the default behavior of Flink is to load user classes
first. Could you please check:
1) Is "classloader.resolve-order" set to "child-first"?
2) Do you include the Jackson library in your user jar?
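For reference, roughly how I would check both; the config file location and the
jar name below are placeholders for whatever you use:

    # flink-conf.yaml -- "child-first" is the default, "parent-first" would
    # explain your job picking up Flink's Jackson
    classloader.resolve-order: child-first

    # list Jackson classes bundled in your user jar
    jar tf your-job.jar | grep 'com/fasterxml/jackson'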
Thanks,
Max
On 18.01.19 03:40, Daniel Harper wrote:
Thanks for raising this and the PR!
In our production streaming job we're using Kinesis, so good shout on the
UnboundedSourceWrapper.
On 17/01/2019, 21:08, "Maximilian Michels" <m...@apache.org> wrote:
I'm glad that solved your GC problem. I think dispose() is a good place, it is
meant for cleanup.
In your case the DoFn is a NOOP, so the PipelineOptions are probably loaded
through your UnboundedSource. If both happen to be scheduled in the same
TaskManager that is fine. However, just as a precaution we should also include
the cache invalidation in UnboundedSourceWrapper.
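To illustrate, the cleanup boils down to something like the sketch below; the
exact method and operator we end up hooking into may differ:

    import com.fasterxml.jackson.databind.type.TypeFactory;

    @Override
    public void dispose() throws Exception {
      try {
        super.dispose();
      } finally {
        // Drop Jackson's cached types so they no longer pin the job's
        // user classloader once the operator shuts down.
        TypeFactory.defaultInstance().clearCache();
      }
    }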
This way we should be good for the streaming execution. Will try to get this
into 2.10.0.
Thanks,
Max
Issue: https://jira.apache.org/jira/browse/BEAM-6460
On 17.01.19 12:50, Daniel Harper wrote:
Max, Juan,
Just tried patching this class
https://github.com/apache/beam/blob/v2.7.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L389
and putting the clearCache call in the finally block.
Redoing the test causes the GC to kick in (see screenshot).
I'm not sure if this is the best place to put this cleanup code though. Is this
the final place where all BEAM-related stuff gets terminated?
Daniel.
On 17/01/2019, 16:18, "Maximilian Michels" <m...@apache.org> wrote:
Hi Daniel, hi Juan,
@Daniel Thanks a lot for investigating and reporting the issue.
Your analysis looks convincing; it may be that Jackson is holding on to the
classloader. Beam uses Jackson to parse the FlinkPipelineOptions.
Have you already tried to call TypeFactory.defaultInstance().clearCache() in a
catch-all block within your synthetic Beam job, before actually failing? That
way we could see if the classloader is garbage-collected after a restart.
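Something along these lines; the DoFn below is only an illustration of what I
mean by a catch-all block, the class name and element types are made up:

    import com.fasterxml.jackson.databind.type.TypeFactory;
    import org.apache.beam.sdk.transforms.DoFn;

    static class NoOpFn extends DoFn<Long, Long> {
      @ProcessElement
      public void processElement(ProcessContext c) {
        try {
          c.output(c.element());
        } catch (RuntimeException e) {
          // Clear Jackson's type cache before the failure propagates, so the
          // old user classloader can be collected after the restart.
          TypeFactory.defaultInstance().clearCache();
          throw e;
        }
      }
    }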
Let me also investigate in the meantime. We are in the process of getting the
2.10.0 release ready with a few pending issues, so it would be a good time to
fix this issue.
Thanks,
Max
On 17.01.19 09:50, Juan Carlos Garcia wrote:
Nice finding, we are also experiencing the same (Flink 1.5.4): a few jobs are
dying of OOM for the metaspace as well after multiple restarts. In our case we
have an HA Flink cluster and are not using YARN for orchestration.
Good job with the diagnosis.
JC
On Thu, Jan 17, 2019 at 3:23 PM Daniel Harper <daniel.har...@bbc.co.uk> wrote:
Environment:
BEAM 2.7.0
Flink 1.5.2
AWS EMR 5.17.0
Hadoop YARN for orchestration
We've noticed the metaspace usage increasing when our Flink job restarts,
which in turn sometimes causes YARN to kill the container for going beyond
its physical memory limits. After setting the MaxMetaspaceSize setting and
making the JVM dump its heap on OOM, we noticed quite a few instances of the
FlinkUserClassLoader class hanging around, which corresponded with the
number of restarts that happened.
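For completeness, the JVM settings boil down to flags like the following; how
you pass them (e.g. via env.java.opts in flink-conf.yaml) and the dump path are
just examples:

    -XX:MaxMetaspaceSize=125m
    -XX:+HeapDumpOnOutOfMemoryError
    -XX:HeapDumpPath=/tmp/taskmanager-oom.hprof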
Originally I posted this issue on the FLINK mailing list here:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/User-ClassLoader-leak-on-job-restart-td25547.html
After investigation I think this is related to something in the BEAM code, or
the way BEAM interacts with the Flink class loading mechanism, because I can
see the following when selecting one of the 'old' classloaders -> Path to GC
Roots using Eclipse MAT in one of the heap dumps:
This looks to me like this issue:
https://github.com/FasterXML/jackson-databind/issues/1363
It sounds like, to resolve it, user code should call
TypeFactory.defaultInstance().clearCache() when threads are shut down. I'm not
sure where in the FlinkRunner codebase this should be though.
To try and narrow it down as much as possible and reduce the number of
dependencies, I've managed to reproduce this with a really, really simple job
that just reads from a synthetic unbounded source (back-ported from the master
branch) and does nothing: https://github.com/djhworld/streaming-job.
This will run on a Flink environment.
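The job boils down to something like the sketch below; this is not the exact
code in the repo, the class name is made up and GenerateSequence stands in here
for the back-ported synthetic source:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.GenerateSequence;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;

    public class StreamingJob {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(
            PipelineOptionsFactory.fromArgs(args).withValidation().create());
        p.apply(GenerateSequence.from(0))      // unbounded: no upper bound set
         .apply(ParDo.of(new DoFn<Long, Void>() {
           @ProcessElement
           public void processElement(ProcessContext c) {
             // intentionally does nothing
           }
         }));
        p.run();
      }
    }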
To reproduce the OOM I just ran the job with MaxMetaspaceSize=125M, and then
killed a random task manager every 60 seconds, which yielded the following.
As you can see, the number of classes increases on each restart, which causes
the metaspace to grow and eventually triggers an OOM.
Is there anything we could do to fix this? I've not tested this on anything
newer than 2.7.0 because we are waiting for 2.10 to drop so we can run Flink
1.6/1.7 on EMR.
With thanks,
Daniel
--
JC