[ https://issues.apache.org/jira/browse/BEAM-5110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16580536#comment-16580536 ]

Ben Sidhom commented on BEAM-5110:
----------------------------------

Sadly, after investigating this for a while, I think the best option (for 
now) is to document, and enforce in code where possible, the situations in 
which dynamic code loading will not happen in the workers. Flink attempts to 
isolate user code with separate classloaders, and Java does not expose a way 
to force shared classloaders (or even to enumerate unconnected classloaders) 
without special instrumentation.
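
To make the failure mode concrete, here is a minimal, self-contained Java 
sketch (the classes are hypothetical demo code, not Beam or Flink code) 
showing that a "singleton" held in a static field is duplicated whenever the 
same class is defined by two sibling classloaders, which is essentially what 
Flink's per-job classloaders do:

{code:java}
import java.net.URL;
import java.net.URLClassLoader;

// Hypothetical demo: static "singleton" state is per-classloader, not per-JVM.
public class ClassLoaderSingletonDemo {

  // Stand-in for a JVM-wide singleton such as a shared environment factory.
  public static class Counter {
    private static int count = 0;
    public static void increment() { count++; }
    public static int get() { return count; }
  }

  public static void main(String[] args) throws Exception {
    // The classpath entry that contains this demo.
    URL here =
        ClassLoaderSingletonDemo.class.getProtectionDomain().getCodeSource().getLocation();

    // Two sibling loaders with a null parent: each defines its own copy of
    // Counter instead of delegating to the application classloader.
    ClassLoader a = new URLClassLoader(new URL[] {here}, null);
    ClassLoader b = new URLClassLoader(new URL[] {here}, null);
    String name = ClassLoaderSingletonDemo.class.getName() + "$Counter";
    Class<?> counterA = Class.forName(name, true, a);
    Class<?> counterB = Class.forName(name, true, b);

    counterA.getMethod("increment").invoke(null);
    counterA.getMethod("increment").invoke(null);
    counterB.getMethod("increment").invoke(null);

    System.out.println(counterA.getMethod("get").invoke(null)); // 2
    System.out.println(counterB.getMethod("get").invoke(null)); // 1, not 3
    System.out.println(counterA == counterB);                   // false
  }
}
{code}

There is no supported way for one copy of Counter to discover or merge with 
its sibling copies, which is why configuration (or deployment discipline) is 
the only practical fix.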

The following deployment modes use regular system classloading for the Beam 
job server and workers:
 * Direct YARN deployments (using bin/flink run -m yarn-cluster).
 * Standalone/Docker/Kubernetes/Mesos sessions where the job server jar has 
been dropped into the lib/ directory of the Flink distribution. In this case, 
Flink's default classloading strategy will not work because child 
(operator-scoped) classloaders take precedence over the system classloader; 
any user code (e.g., Beam environment manager code, where the job server is 
submitted to a RemoteEnvironment) will shadow Flink/system classes. To get 
around this, Flink's 
[classloader.resolve-order|https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/config.html#classloader-resolve-order]
 must be set to parent-first, _or_ classloader.parent-first-patterns-additional 
must be populated with the set of packages that require JVM singletons (see 
the config sketch after this list).
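
For the session case above, the relevant flink-conf.yaml entries would look 
roughly like the sketch below. This is hedged: the key names follow the Flink 
1.5 documentation linked above (the exact spelling of the parent-first 
patterns key varies across Flink versions), and the package pattern is only 
an illustrative placeholder, not a vetted list.

{code}
# flink-conf.yaml: resolve classes parent-first so that job server classes in
# lib/ are always loaded through the system classloader, preserving JVM
# singletons across operators in a TaskManager.
classloader.resolve-order: parent-first

# Alternative: keep the child-first default, but force only the packages that
# hold JVM singleton state to resolve parent-first. The pattern below is an
# illustrative placeholder.
# classloader.parent-first-patterns-additional: org.apache.beam.runners.flink
{code}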

In general, modes where the job server jar is sent to a new RemoteEnvironment 
will break singleton semantics. We should at the very least log a warning when 
this is detected in the job server, if not disable this mode altogether.
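
A minimal sketch of what that warning could look like. Hedged: this helper 
does not exist in Beam; the classloader comparison is plain Java, and SLF4J 
is assumed for logging.

{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical helper: detects whether a class that is expected to hold
// JVM-wide singleton state was loaded dynamically rather than by the system
// classloader.
public final class SingletonLoadingCheck {
  private static final Logger LOG = LoggerFactory.getLogger(SingletonLoadingCheck.class);

  private SingletonLoadingCheck() {}

  /** Warns if {@code clazz} was not loaded by the system classloader. */
  public static void warnIfDynamicallyLoaded(Class<?> clazz) {
    ClassLoader loader = clazz.getClassLoader();
    if (loader != null && loader != ClassLoader.getSystemClassLoader()) {
      LOG.warn(
          "{} was loaded by non-system classloader {}; JVM singleton semantics"
              + " are not guaranteed under Flink's dynamic classloading.",
          clazz.getName(),
          loader);
    }
  }
}
{code}

Calling something like 
SingletonLoadingCheck.warnIfDynamicallyLoaded(BatchFlinkExecutableStageContext.BatchFactory.class) 
from the factory's static initializer would surface the condition described 
below at job submission time.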

> Reconcile Flink JVM singleton management with deployment
> --------------------------------------------------------
>
>                 Key: BEAM-5110
>                 URL: https://issues.apache.org/jira/browse/BEAM-5110
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Ben Sidhom
>            Assignee: Ben Sidhom
>            Priority: Major
>          Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> [~angoenka] noticed through debugging that multiple instances of 
> BatchFlinkExecutableStageContext.BatchFactory are loaded for a given job when 
> executing in standalone cluster mode. This context factory is responsible for 
> maintaining singleton state across a TaskManager (JVM) in order to share SDK 
> Environments across workers in a given job. This multiple loading breaks 
> singleton semantics and results in an indeterminate number of Environments 
> being created.
> It turns out that the [Flink classloading 
> mechanism|https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/debugging_classloading.html]
>  is determined by deployment mode. Note that "user code" as referenced by 
> this link is actually the Flink job server jar. Actual end-user code lives 
> inside of the SDK Environment and uploaded artifacts.
> In order to maintain singletons without resorting to IPC (for example, using 
> file locks and/or additional gRPC servers), we need to force non-dynamic 
> classloading. For example, this happens when jobs are submitted to YARN for 
> one-off deployments via `flink run`. However, connecting to an existing 
> (Flink standalone) deployment results in dynamic classloading.
> We should investigate this behavior and either document (and attempt to 
> enforce) deployment modes that are consistent with our requirements, or (if 
> possible) create a custom classloader that enforces singleton loading.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
