Improve teardown behavior in DoFnLifecycleManager. Use Cache invalidation hooks to tear down DoFns that are no longer in the cache. Ensure that remove() and removeAll() report exceptions thrown during teardown, even though the LoadingCache itself does not propagate those exceptions.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7239ebb0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7239ebb0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7239ebb0 Branch: refs/heads/gearpump-runner Commit: 7239ebb0c76f539f476cea0b44b1070e765cca41 Parents: 79bb2c2 Author: Thomas Groh <tg...@google.com> Authored: Mon Oct 24 13:43:43 2016 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Tue Oct 25 10:46:43 2016 -0700 ---------------------------------------------------------------------- .../runners/direct/DoFnLifecycleManager.java | 56 +++++++++------ .../direct/DoFnLifecycleManagerTest.java | 74 ++++++++++++++++++-- 2 files changed, 104 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7239ebb0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java index 23460b6..472b28b 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java @@ -21,17 +21,17 @@ package org.apache.beam.runners.direct; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; -import java.util.ArrayList; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; import java.util.Collection; -import java.util.Iterator; +import java.util.concurrent.ConcurrentHashMap; +import 
java.util.concurrent.ConcurrentMap; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.Setup; import org.apache.beam.sdk.transforms.DoFn.Teardown; import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; import org.apache.beam.sdk.util.SerializableUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Manages {@link DoFn} setup, teardown, and serialization. @@ -42,16 +42,18 @@ import org.slf4j.LoggerFactory; * clearing all cached {@link DoFn DoFns}. */ class DoFnLifecycleManager { - private static final Logger LOG = LoggerFactory.getLogger(DoFnLifecycleManager.class); - public static DoFnLifecycleManager of(DoFn<?, ?> original) { return new DoFnLifecycleManager(original); } private final LoadingCache<Thread, DoFn<?, ?>> outstanding; + private final ConcurrentMap<Thread, Exception> thrownOnTeardown; private DoFnLifecycleManager(DoFn<?, ?> original) { - this.outstanding = CacheBuilder.newBuilder().build(new DeserializingCacheLoader(original)); + this.outstanding = CacheBuilder.newBuilder() + .removalListener(new TeardownRemovedFnListener()) + .build(new DeserializingCacheLoader(original)); + thrownOnTeardown = new ConcurrentHashMap<>(); } public DoFn<?, ?> get() throws Exception { @@ -61,8 +63,15 @@ class DoFnLifecycleManager { public void remove() throws Exception { Thread currentThread = Thread.currentThread(); - DoFn<?, ?> fn = outstanding.asMap().remove(currentThread); - DoFnInvokers.INSTANCE.invokerFor(fn).invokeTeardown(); + outstanding.invalidate(currentThread); + // Block until the invalidate is fully completed + outstanding.cleanUp(); + // Remove to try too avoid reporting the same teardown exception twice. May still double-report, + // but the second will be suppressed. 
+ Exception thrown = thrownOnTeardown.remove(currentThread); + if (thrown != null) { + throw thrown; + } } /** @@ -73,21 +82,13 @@ class DoFnLifecycleManager { * DoFn.Teardown @Teardown} method, and the {@link PipelineRunner} should throw an exception. */ public Collection<Exception> removeAll() throws Exception { - Iterator<DoFn<?, ?>> fns = outstanding.asMap().values().iterator(); - Collection<Exception> thrown = new ArrayList<>(); - while (fns.hasNext()) { - DoFn<?, ?> fn = fns.next(); - fns.remove(); - try { - DoFnInvokers.INSTANCE.invokerFor(fn).invokeTeardown(); - } catch (Exception e) { - thrown.add(e); - } - } - return thrown; + outstanding.invalidateAll(); + // Make sure all of the teardowns are run + outstanding.cleanUp(); + return thrownOnTeardown.values(); } - private class DeserializingCacheLoader extends CacheLoader<Thread, DoFn<?, ?>> { + private static class DeserializingCacheLoader extends CacheLoader<Thread, DoFn<?, ?>> { private final byte[] original; public DeserializingCacheLoader(DoFn<?, ?> original) { @@ -102,4 +103,15 @@ class DoFnLifecycleManager { return fn; } } + + private class TeardownRemovedFnListener implements RemovalListener<Thread, DoFn<?, ?>> { + @Override + public void onRemoval(RemovalNotification<Thread, DoFn<?, ?>> notification) { + try { + DoFnInvokers.INSTANCE.newByteBuddyInvoker(notification.getValue()).invokeTeardown(); + } catch (Exception e) { + thrownOnTeardown.put(notification.getKey(), e); + } + } + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7239ebb0/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerTest.java index aef9d29..59e1e16 100644 --- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerTest.java @@ -21,6 +21,7 @@ package org.apache.beam.runners.direct; import static com.google.common.base.Preconditions.checkState; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.isA; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.theInstance; import static org.junit.Assert.assertThat; @@ -34,8 +35,11 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.util.UserCodeException; import org.hamcrest.Matchers; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -44,6 +48,8 @@ import org.junit.runners.JUnit4; */ @RunWith(JUnit4.class) public class DoFnLifecycleManagerTest { + @Rule public ExpectedException thrown = ExpectedException.none(); + private TestFn fn = new TestFn(); private DoFnLifecycleManager mgr = DoFnLifecycleManager.of(fn); @@ -105,6 +111,17 @@ public class DoFnLifecycleManagerTest { } @Test + public void teardownThrowsRemoveThrows() throws Exception { + TestFn obtained = (TestFn) mgr.get(); + obtained.teardown(); + + thrown.expect(UserCodeException.class); + thrown.expectCause(isA(IllegalStateException.class)); + thrown.expectMessage("Cannot call teardown: already torn down"); + mgr.remove(); + } + + @Test public void teardownAllOnRemoveAll() throws Exception { CountDownLatch startSignal = new CountDownLatch(1); ExecutorService executor = Executors.newCachedThreadPool(); @@ -125,6 +142,38 @@ public class DoFnLifecycleManagerTest { } } + @Test + public void removeAndRemoveAllConcurrent() throws Exception { + CountDownLatch 
startSignal = new CountDownLatch(1); + ExecutorService executor = Executors.newCachedThreadPool(); + List<Future<TestFn>> futures = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + futures.add(executor.submit(new GetFnCallable(mgr, startSignal))); + } + startSignal.countDown(); + List<TestFn> fns = new ArrayList<>(); + for (Future<TestFn> future : futures) { + fns.add(future.get(1L, TimeUnit.SECONDS)); + } + CountDownLatch removeSignal = new CountDownLatch(1); + List<Future<Void>> removeFutures = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + // These will reuse the threads used in the GetFns + removeFutures.add(executor.submit(new TeardownFnCallable(mgr, removeSignal))); + } + removeSignal.countDown(); + assertThat(mgr.removeAll(), Matchers.<Exception>emptyIterable()); + for (Future<Void> removed : removeFutures) { + // Should not have thrown an exception. + removed.get(); + } + + for (TestFn fn : fns) { + assertThat(fn.setupCalled, is(true)); + assertThat(fn.teardownCalled, is(true)); + } + } + private static class GetFnCallable implements Callable<TestFn> { private final DoFnLifecycleManager mgr; private final CountDownLatch startSignal; @@ -141,6 +190,23 @@ public class DoFnLifecycleManagerTest { } } + private static class TeardownFnCallable implements Callable<Void> { + private final DoFnLifecycleManager mgr; + private final CountDownLatch startSignal; + + private TeardownFnCallable(DoFnLifecycleManager mgr, CountDownLatch startSignal) { + this.mgr = mgr; + this.startSignal = startSignal; + } + + @Override + public Void call() throws Exception { + startSignal.await(); + // Will throw an exception if the TestFn has already been removed from this thread + mgr.remove(); + return null; + } + } private static class TestFn extends DoFn<Object, Object> { boolean setupCalled = false; @@ -148,8 +214,8 @@ public class DoFnLifecycleManagerTest { @Setup public void setup() { - checkState(!setupCalled); - checkState(!teardownCalled); + checkState(!setupCalled, 
"Cannot call setup: already set up"); + checkState(!teardownCalled, "Cannot call setup: already torn down"); setupCalled = true; } @@ -160,8 +226,8 @@ public class DoFnLifecycleManagerTest { @Teardown public void teardown() { - checkState(setupCalled); - checkState(!teardownCalled); + checkState(setupCalled, "Cannot call teardown: not set up"); + checkState(!teardownCalled, "Cannot call teardown: already torn down"); teardownCalled = true; }