[ 
https://issues.apache.org/jira/browse/BEAM-5110?focusedWorklogId=132765&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-132765
 ]

ASF GitHub Bot logged work on BEAM-5110:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 08/Aug/18 23:16
            Start Date: 08/Aug/18 23:16
    Worklog Time Spent: 10m 
      Work Description: bsidhom commented on a change in pull request #6189: 
[BEAM-5110] Explicitly count the references for 
BatchFlinkExecutableStageContext …
URL: https://github.com/apache/beam/pull/6189#discussion_r208764425
 
 

 ##########
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/BatchFlinkExecutableStageContext.java
 ##########
 @@ -69,32 +75,132 @@ public StateRequestHandler getStateRequestHandler(
   }
 
   @Override
-  protected void finalize() throws Exception {
+  public void close() throws Exception {
     jobBundleFactory.close();
   }
 
   enum BatchFactory implements Factory {
-    INSTANCE;
-
-    @SuppressWarnings("Immutable") // observably immutable
-    private final LoadingCache<JobInfo, BatchFlinkExecutableStageContext> 
cachedContexts;
-
-    BatchFactory() {
-      cachedContexts =
-          CacheBuilder.newBuilder()
-              .weakValues()
-              .build(
-                  new CacheLoader<JobInfo, BatchFlinkExecutableStageContext>() 
{
-                    @Override
-                    public BatchFlinkExecutableStageContext load(JobInfo 
jobInfo) throws Exception {
-                      return create(jobInfo);
+    REFERENCE_COUNTING {
+      private static final int TTL_IN_SECONDS = 30;
+      int MAX_RETRY = 3;
+
+      @SuppressWarnings("Immutable") // observably immutable
 
 Review comment:
   Is this suppression still necessary with the new constant-specific 
implementation? Why use a constant-specific here? Do we intend to create 
multiple implementations? Note that the enum only exists in the first place to 
make the singleton serialization-safe.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 132765)
    Time Spent: 40m  (was: 0.5h)

> Reconile Flink JVM singleton management with deployment
> -------------------------------------------------------
>
>                 Key: BEAM-5110
>                 URL: https://issues.apache.org/jira/browse/BEAM-5110
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Ben Sidhom
>            Assignee: Ben Sidhom
>            Priority: Major
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> [~angoenka] noticed through debugging that multiple instances of 
> BatchFlinkExecutableStageContext.BatchFactory are loaded for a given job when 
> executing in standalone cluster mode. This context factory is responsible for 
> maintaining singleton state across a TaskManager (JVM) in order to share SDK 
> Environments across workers in a given job. The multiple-loading breaks 
> singleton semantics and results in an indeterminate number of Environments 
> being created.
> It turns out that the [Flink classloading 
> mechanism|https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/debugging_classloading.html]
>  is determined by deployment mode. Note that "user code" as referenced by 
> this link is actually the Flink job server jar. Actual end-user code lives 
> inside of the SDK Environment and uploaded artifacts.
> In order to maintain singletons without resorting to IPC (for example, using 
> file locks and/or additional gRPC servers), we need to force non-dynamic 
> classloading. For example, this happens when jobs are submitted to YARN for 
> one-off deployments via `flink run`. However, connecting to an existing 
> (Flink standalone) deployment results in dynamic classloading.
> We should investigate this behavior and either document (and attempt to 
> enforce) deployment modes that are consistent with our requirements, or (if 
> possible) create a custom classloader that enforces singleton loading.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to