[jira] [Work logged] (BEAM-5332) SDK harness containers are not eventually shut down after job ends

2018-09-07 Thread ASF GitHub Bot (JIRA)


 [ https://issues.apache.org/jira/browse/BEAM-5332?focusedWorklogId=142159&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-142159 ]

ASF GitHub Bot logged work on BEAM-5332:


Author: ASF GitHub Bot
Created on: 07/Sep/18 12:18
Start Date: 07/Sep/18 12:18
Worklog Time Spent: 10m 
  Work Description: tweise closed pull request #6342: [BEAM-5332] Do not attempt to evict cache after shutdown
URL: https://github.com/apache/beam/pull/6342
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java
index 565e0b9771b..988a94826fb 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java
@@ -24,7 +24,6 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
@@ -42,7 +41,6 @@
 implements FlinkExecutableStageContext.Factory {
   private static final Logger LOG =
       LoggerFactory.getLogger(ReferenceCountingFlinkExecutableStageContextFactory.class);
-  private static final int TTL_IN_SECONDS = 30;
   private static final int MAX_RETRY = 3;
 
   private final Creator creator;
@@ -106,8 +104,9 @@ private void scheduleRelease(JobInfo jobInfo) {
 WrappedContext wrapper = getCache().get(jobInfo.jobId());
 Preconditions.checkState(
 wrapper != null, "Releasing context for unknown job: " + jobInfo.jobId());
-// Schedule task to clean the container later.
-getExecutor().schedule(() -> release(wrapper), TTL_IN_SECONDS, TimeUnit.SECONDS);
+// Do not release this asynchronously, as the releasing could fail due to the classloader not being
+// available anymore after the tasks have been removed from the execution engine.
+release(wrapper);
   }
 
   private ConcurrentHashMap getCache() {
@@ -148,8 +147,8 @@ void release(FlinkExecutableStageContext context) {
 if (getCache().remove(wrapper.jobInfo.jobId(), wrapper)) {
   try {
 wrapper.closeActual();
-  } catch (Exception e) {
-LOG.error("Unable to close.", e);
+  } catch (Throwable t) {
+LOG.error("Unable to close FlinkExecutableStageContext.", t);
   }
 }
   }
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactoryTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactoryTest.java
index cf758647307..06a43c81b0c 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactoryTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactoryTest.java
@@ -17,10 +17,16 @@
  */
 package org.apache.beam.runners.flink.translation.functions;
 
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
 import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import com.google.common.base.Charsets;
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
 import org.apache.beam.runners.flink.translation.functions.ReferenceCountingFlinkExecutableStageContextFactory.Creator;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
 import org.junit.Assert;
@@ -77,4 +83,33 @@ public void testCreateReuseReleaseCreate() throws Exception {
 Assert.assertNotSame("We should get a new instance.", ac2B, ac4B);
 factory.release(ac4B); // 0 open jobB
   }
+
+  @Test
+  public void testCatchThrowablesAndLogThem() throws Exception {
+PrintStream oldErr = System.err;
+oldErr.flush();
+ByteArrayOutputStream baos = new ByteArrayOutputStream();
+PrintStream newErr = new PrintStream(bao
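
The second hunk widens the handler in `release()` from `catch (Exception e)` to `catch (Throwable t)`, and the new test `testCatchThrowablesAndLogThem` exercises that path. In Java, `NoClassDefFoundError` (the error shown in the PR description) extends `LinkageError` and therefore `Error`, not `Exception`, so only the wider handler is guaranteed to log it instead of letting it propagate. The self-contained sketch below uses no Beam classes; `CatchThrowableSketch` and its methods are hypothetical, and its `closeActual()` merely throws the error to simulate a class that can no longer be loaded.

```java
// Hypothetical sketch (no Beam classes): catch (Throwable) vs catch (Exception).
public class CatchThrowableSketch {

  // Stand-in for WrappedContext.closeActual(): fails the way a lazily loaded
  // class does once its classloader can no longer load it.
  static void closeActual() {
    throw new NoClassDefFoundError("com/example/RemovalCause");
  }

  // Old shape of the handler: NoClassDefFoundError is an Error, so it is not caught here.
  static String releaseCatchingException() {
    try {
      closeActual();
      return "closed";
    } catch (Exception e) {
      return "logged: " + e;
    }
  }

  // New shape of the handler: Throwable covers both Exceptions and Errors.
  static String releaseCatchingThrowable() {
    try {
      closeActual();
      return "closed";
    } catch (Throwable t) {
      return "logged: " + t;
    }
  }

  public static void main(String[] args) {
    System.out.println(releaseCatchingThrowable());
    try {
      releaseCatchingException();
    } catch (NoClassDefFoundError e) {
      System.out.println("escaped: " + e);
    }
  }
}
```

Running `main` prints `logged: java.lang.NoClassDefFoundError: com/example/RemovalCause` from the `Throwable` handler, followed by `escaped: java.lang.NoClassDefFoundError: com/example/RemovalCause`, because the `Exception` handler lets the error through to the caller.
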

[jira] [Work logged] (BEAM-5332) SDK harness containers are not eventually shut down after job ends

2018-09-06 Thread ASF GitHub Bot (JIRA)


 [ https://issues.apache.org/jira/browse/BEAM-5332?focusedWorklogId=141776&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-141776 ]

ASF GitHub Bot logged work on BEAM-5332:


Author: ASF GitHub Bot
Created on: 06/Sep/18 14:23
Start Date: 06/Sep/18 14:23
Worklog Time Spent: 10m 
  Work Description: mxm opened a new pull request #6342: [BEAM-5332] Do not attempt to evict cache after shutdown
URL: https://github.com/apache/beam/pull/6342
 
 
   When the job shuts down, the user code classloader is cleared, which removes the
   possibility of loading new classes. The LoadingCache in JobBundleFactory attempts
   to load the RemovalCause class after job shutdown to evict the cache. This
   results in the following exception, which prevents cleanup of Docker containers:
   
   ```
   2018-09-06 15:37:07,996 ERROR org.apache.beam.runners.flink.translation.functions.ReferenceCountingFlinkExecutableStageContextFactory  - Unable to close.
   java.lang.NoClassDefFoundError: org/apache/beam/repackaged/beam_runners_java_fn_execution/com/google/common/cache/RemovalCause
   at org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.cache.LocalCache$Segment.clear(LocalCache.java:3290)
   at org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.cache.LocalCache.clear(LocalCache.java:4322)
   at org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.cache.LocalCache$LocalManualCache.invalidateAll(LocalCache.java:4937)
   at org.apache.beam.runners.fnexecution.control.JobBundleFactoryBase.close(JobBundleFactoryBase.java:186)
   at org.apache.beam.runners.flink.translation.functions.FlinkBatchExecutableStageContext.close(FlinkBatchExecutableStageContext.java:68)
   at org.apache.beam.runners.flink.translation.functions.ReferenceCountingFlinkExecutableStageContextFactory$WrappedContext.closeActual(ReferenceCountingFlinkExecutableStageContextFactory.java:186)
   at org.apache.beam.runners.flink.translation.functions.ReferenceCountingFlinkExecutableStageContextFactory$WrappedContext.access$200(ReferenceCountingFlinkExecutableStageContextFactory.java:162)
   at org.apache.beam.runners.flink.translation.functions.ReferenceCountingFlinkExecutableStageContextFactory.release(ReferenceCountingFlinkExecutableStageContextFactory.java:150)
   at org.apache.beam.runners.flink.translation.functions.ReferenceCountingFlinkExecutableStageContextFactory.lambda$scheduleRelease$1(ReferenceCountingFlinkExecutableStageContextFactory.java:110)
   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
   at java.lang.Thread.run(Thread.java:745)
   Caused by: java.lang.ClassNotFoundException: org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.cache.RemovalCause
   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
   at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:129)
   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
   ... 16 more
   ```
   
   The solution for now is to attempt to clean up the cache synchronously when
   closing the JobBundleFactory, rather than in a task scheduled to run later.
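   The merged diff drops the `TTL_IN_SECONDS` delay and calls `release(wrapper)` directly, so the last release closes the context on the calling thread instead of on a `ScheduledExecutorService` thread 30 seconds later (the `lambda$scheduleRelease$1` frame in the trace above). The sketch below is a simplified, hypothetical model of that reference-counting pattern, not Beam's `ReferenceCountingFlinkExecutableStageContextFactory`: the class `RefCountedContexts`, its `acquire`/`release`/`isCached` methods and the `closeHook` callback (standing in for `closeActual()`) are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical, simplified model of a reference-counted per-job context cache.
public class RefCountedContexts {

  // Shared per-job context; refs counts the operators currently using it.
  static final class Context {
    final String jobId;
    int refs;

    Context(String jobId) {
      this.jobId = jobId;
    }
  }

  private final Map<String, Context> cache = new HashMap<>();
  // Stand-in for closeActual(): shuts down whatever the context owns.
  private final Consumer<Context> closeHook;

  RefCountedContexts(Consumer<Context> closeHook) {
    this.closeHook = closeHook;
  }

  // Returns the shared context for a job, creating it on first use.
  synchronized Context acquire(String jobId) {
    Context ctx = cache.computeIfAbsent(jobId, Context::new);
    ctx.refs++;
    return ctx;
  }

  // Drops one reference. The last release removes the context from the cache and
  // closes it right away on the calling thread; nothing is deferred to an executor.
  synchronized void release(Context ctx) {
    if (--ctx.refs == 0 && cache.remove(ctx.jobId, ctx)) {
      closeHook.accept(ctx);
    }
  }

  synchronized boolean isCached(String jobId) {
    return cache.containsKey(jobId);
  }

  public static void main(String[] args) {
    RefCountedContexts factory =
        new RefCountedContexts(ctx -> System.out.println("closed " + ctx.jobId));
    Context first = factory.acquire("jobA");
    Context second = factory.acquire("jobA"); // same shared instance
    factory.release(first); // one reference left, nothing is closed
    factory.release(second); // prints "closed jobA" before returning
  }
}
```

   Because the close happens inside the final `release` call, it runs while the releasing task is still alive, which, per the comment added in the diff, is when the classloader needed for cleanup is still available.
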
   
   CC @angoenka @tweise 
   