[jira] [Work logged] (BEAM-5332) SDK harness containers are not eventually shut down after job ends
[ https://issues.apache.org/jira/browse/BEAM-5332?focusedWorklogId=142159&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-142159 ] ASF GitHub Bot logged work on BEAM-5332: Author: ASF GitHub Bot Created on: 07/Sep/18 12:18 Start Date: 07/Sep/18 12:18 Worklog Time Spent: 10m Work Description: tweise closed pull request #6342: [BEAM-5332] Do not attempt to evict cache after shutdown URL: https://github.com/apache/beam/pull/6342 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java index 565e0b9771b..988a94826fb 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java @@ -24,7 +24,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.beam.runners.core.construction.graph.ExecutableStage; import org.apache.beam.runners.fnexecution.control.StageBundleFactory; @@ -42,7 +41,6 @@ implements FlinkExecutableStageContext.Factory { private static final Logger LOG = LoggerFactory.getLogger(ReferenceCountingFlinkExecutableStageContextFactory.class); - private static final int TTL_IN_SECONDS = 30; private static final int MAX_RETRY = 
3; private final Creator creator; @@ -106,8 +104,9 @@ private void scheduleRelease(JobInfo jobInfo) { WrappedContext wrapper = getCache().get(jobInfo.jobId()); Preconditions.checkState( wrapper != null, "Releasing context for unknown job: " + jobInfo.jobId()); -// Schedule task to clean the container later. -getExecutor().schedule(() -> release(wrapper), TTL_IN_SECONDS, TimeUnit.SECONDS); +// Do not release this asynchronously, as the releasing could fail due to the classloader not being +// available anymore after the tasks have been removed from the execution engine. +release(wrapper); } private ConcurrentHashMap getCache() { @@ -148,8 +147,8 @@ void release(FlinkExecutableStageContext context) { if (getCache().remove(wrapper.jobInfo.jobId(), wrapper)) { try { wrapper.closeActual(); - } catch (Exception e) { -LOG.error("Unable to close.", e); + } catch (Throwable t) { +LOG.error("Unable to close FlinkExecutableStageContext.", t); } } } diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactoryTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactoryTest.java index cf758647307..06a43c81b0c 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactoryTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactoryTest.java @@ -17,10 +17,16 @@ */ package org.apache.beam.runners.flink.translation.functions; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import com.google.common.base.Charsets; +import java.io.ByteArrayOutputStream; 
+import java.io.PrintStream; import org.apache.beam.runners.flink.translation.functions.ReferenceCountingFlinkExecutableStageContextFactory.Creator; import org.apache.beam.runners.fnexecution.provisioning.JobInfo; import org.junit.Assert; @@ -77,4 +83,33 @@ public void testCreateReuseReleaseCreate() throws Exception { Assert.assertNotSame("We should get a new instance.", ac2B, ac4B); factory.release(ac4B); // 0 open jobB } + + @Test + public void testCatchThrowablesAndLogThem() throws Exception { +PrintStream oldErr = System.err; +oldErr.flush(); +ByteArrayOutputStream baos = new ByteArrayOutputStream(); +PrintStream newErr = new PrintStream(bao
[jira] [Work logged] (BEAM-5332) SDK harness containers are not eventually shut down after job ends
[ https://issues.apache.org/jira/browse/BEAM-5332?focusedWorklogId=141776&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-141776 ] ASF GitHub Bot logged work on BEAM-5332: Author: ASF GitHub Bot Created on: 06/Sep/18 14:23 Start Date: 06/Sep/18 14:23 Worklog Time Spent: 10m Work Description: mxm opened a new pull request #6342: [BEAM-5332] Do not attempt to evict cache after shutdown URL: https://github.com/apache/beam/pull/6342 When the job shuts down, the user code classloader is cleared which removes the possibility to load new classes. The LoadingCache in JobBundleFactory attempts to load the RemovalCause class after job shutdown to evict the cache. This results in the following exception which prevents cleanup of Docker containers: ``` 2018-09-06 15:37:07,996 ERROR org.apache.beam.runners.flink.translation.functions.ReferenceCountingFlinkExecutableStageContextFactory - Unable to close. java.lang.NoClassDefFoundError: org/apache/beam/repackaged/beam_runners_java_fn_execution/com/google/common/cache/RemovalCause at org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.cache.LocalCache$Segment.clear(LocalCache.java:3290) at org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.cache.LocalCache.clear(LocalCache.java:4322) at org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.cache.LocalCache$LocalManualCache.invalidateAll(LocalCache.java:4937) at org.apache.beam.runners.fnexecution.control.JobBundleFactoryBase.close(JobBundleFactoryBase.java:186) at org.apache.beam.runners.flink.translation.functions.FlinkBatchExecutableStageContext.close(FlinkBatchExecutableStageContext.java:68) at org.apache.beam.runners.flink.translation.functions.ReferenceCountingFlinkExecutableStageContextFactory$WrappedContext.closeActual(ReferenceCountingFlinkExecutableStageContextFactory.java:186) at 
org.apache.beam.runners.flink.translation.functions.ReferenceCountingFlinkExecutableStageContextFactory$WrappedContext.access$200(ReferenceCountingFlinkExecutableStageContextFactory.java:162) at org.apache.beam.runners.flink.translation.functions.ReferenceCountingFlinkExecutableStageContextFactory.release(ReferenceCountingFlinkExecutableStageContextFactory.java:150) at org.apache.beam.runners.flink.translation.functions.ReferenceCountingFlinkExecutableStageContextFactory.lambda$scheduleRelease$1(ReferenceCountingFlinkExecutableStageContextFactory.java:110) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.ClassNotFoundException: org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.cache.RemovalCause at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:129) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 16 more ``` The solution for now is to clean up the cache synchronously when closing the JobBundleFactory, instead of scheduling the release to run 30 seconds later on a background executor. This way the cleanup runs while the user code classloader is still available.
CC @angoenka @tweise Post-Commit Tests Status (on master branch) Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark --- | --- | --- | --- | --- | --- | --- | --- Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_GradleBuild/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_GradleBuild/lastCompletedBuild/) | --- | --- | --- | --- | --- | --- Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_GradleBuild/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_GradleBuild/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex_Gradle/lastComplet