This is an automated email from the ASF dual-hosted git repository. michel pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 38bef4a [BEAM-7275] ParDoLifeCycleTest: collect lifecycle info for DoFn insta… (#8563) 38bef4a is described below commit 38bef4ae946d584421d401ce6d2632f1ead82ebf Author: Michael Luckey <25622840+adude3...@users.noreply.github.com> AuthorDate: Sun May 19 11:00:24 2019 +0200 [BEAM-7275] ParDoLifeCycleTest: collect lifecycle info for DoFn insta… (#8563) --- .../beam/sdk/transforms/ParDoLifecycleTest.java | 170 ++++++++++++++++----- 1 file changed, 130 insertions(+), 40 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java index d1cc8f7..f6815db 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java @@ -18,14 +18,23 @@ package org.apache.beam.sdk.transforms; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.anEmptyMap; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.fail; import java.io.Serializable; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.apache.beam.sdk.state.StateSpec; import org.apache.beam.sdk.state.StateSpecs; import org.apache.beam.sdk.state.ValueState; @@ -162,10 +171,7 @@ public class ParDoLifecycleTest implements Serializable { p.run(); fail("Pipeline should have failed with an exception"); } catch (Exception e) { - assertThat( - "Function should have been torn down after exception", - ExceptionThrowingFn.teardownCalled.get(), - is(true)); + validate(); } } @@ -178,10 +184,7 @@ public class ParDoLifecycleTest implements Serializable { p.run(); fail("Pipeline should have failed with an exception"); } catch (Exception e) { - assertThat( - "Function should have been torn down after exception", - ExceptionThrowingFn.teardownCalled.get(), - is(true)); + validate(); } } @@ -194,10 +197,7 @@ public class ParDoLifecycleTest implements Serializable { p.run(); fail("Pipeline should have failed with an exception"); } catch (Exception e) { - assertThat( - "Function should have been torn down after exception", - ExceptionThrowingFn.teardownCalled.get(), - is(true)); + validate(); } } @@ -210,10 +210,7 @@ public class ParDoLifecycleTest implements Serializable { p.run(); fail("Pipeline should have failed with an exception"); } catch (Exception e) { - assertThat( - "Function should have been torn down after exception", - ExceptionThrowingFn.teardownCalled.get(), - is(true)); + validate(); } } @@ -226,10 +223,7 @@ public class ParDoLifecycleTest implements Serializable { p.run(); fail("Pipeline should have failed with an exception"); } catch (Exception e) { - assertThat( - "Function should have been torn down after exception", - ExceptionThrowingFn.teardownCalled.get(), - is(true)); + validate(); } } @@ -242,10 +236,7 @@ public class ParDoLifecycleTest implements Serializable { p.run(); fail("Pipeline should have failed with an exception"); } catch (Exception e) { - assertThat( - "Function should have been torn down after exception", - ExceptionThrowingFn.teardownCalled.get(), - is(true)); + validate(); } } @@ -258,13 +249,25 @@ public class ParDoLifecycleTest implements Serializable { p.run(); fail("Pipeline should have failed with an exception"); } catch (Exception e) { - assertThat( - "Function should have been torn down after exception", - ExceptionThrowingFn.teardownCalled.get(), - is(true)); + validate(); } } + private void validate() { + assertThat(ExceptionThrowingFn.callStateMap, is(not(anEmptyMap()))); + // assert that callStateMap contains only TEARDOWN as a value. Note: We do not expect + // teardown to be called on fn itself, but on any deserialized instance on which any other + // lifecycle method was called + ExceptionThrowingFn.callStateMap + .values() + .forEach( + value -> + assertThat( + "Function should have been torn down after exception", + value.finalState(), + is(CallState.TEARDOWN))); + } + @Test @Category({ValidatesRunner.class, UsesStatefulParDo.class, UsesParDoLifecycle.class}) public void testTeardownCalledAfterExceptionInFinishBundleStateful() { @@ -274,20 +277,64 @@ public class ParDoLifecycleTest implements Serializable { p.run(); fail("Pipeline should have failed with an exception"); } catch (Exception e) { - assertThat( - "Function should have been torn down after exception", - ExceptionThrowingFn.teardownCalled.get(), - is(true)); + validate(); } } @Before public void setup() { - ExceptionThrowingFn.teardownCalled.set(false); + ExceptionThrowingFn.callStateMap = new ConcurrentHashMap<>(); + ExceptionThrowingFn.exceptionWasThrown.set(false); + } + + private static class DelayedCallStateTracker { + private CountDownLatch latch; + private AtomicReference<CallState> callState; + + private DelayedCallStateTracker(CallState setup) { + latch = new CountDownLatch(1); + callState = new AtomicReference<>(setup); + } + + DelayedCallStateTracker update(CallState val) { + CallState previous = callState.getAndSet(val); + if (previous == CallState.TEARDOWN && val != CallState.TEARDOWN) { + fail("illegal state change from " + callState + " to " + val); + } + + if (CallState.TEARDOWN == val) { + latch.countDown(); + } + + return this; + } + + @Override + public String toString() { + return "DelayedCallStateTracker{" + "latch=" + latch + ", callState=" + callState + '}'; + } + + CallState callState() { + return callState.get(); + } + + CallState finalState() { + try { + // call to tearDown might be delayed on other thread (happens on direct runner) + // so lets wait a while if not yet called to give a chance to catch up + latch.await(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return callState(); + } } private static class ExceptionThrowingFn<T> extends DoFn<T, T> { - static AtomicBoolean teardownCalled = new AtomicBoolean(false); + static Map<Integer, DelayedCallStateTracker> callStateMap = new ConcurrentHashMap<>(); + // exception is not necessarily thrown on every instance. But we expect at least + // one during tests + static AtomicBoolean exceptionWasThrown = new AtomicBoolean(false); private final MethodForException toThrow; private boolean thrown; @@ -298,41 +345,76 @@ public class ParDoLifecycleTest implements Serializable { @Setup public void before() throws Exception { - assertThat("teardown should not have been called", teardownCalled.get(), is(false)); + assertThat( + "lifecycle methods should not have been called", callStateMap.get(id()), is(nullValue())); + initCallState(); throwIfNecessary(MethodForException.SETUP); } @StartBundle public void preBundle() throws Exception { - assertThat("teardown should not have been called", teardownCalled.get(), is(false)); + assertThat( + "lifecycle method should have been called before start bundle", + getCallState(), + anyOf(equalTo(CallState.SETUP), equalTo(CallState.FINISH_BUNDLE))); + updateCallState(CallState.START_BUNDLE); throwIfNecessary(MethodForException.START_BUNDLE); } @ProcessElement public void perElement(ProcessContext c) throws Exception { - assertThat("teardown should not have been called", teardownCalled.get(), is(false)); + assertThat( + "lifecycle method should have been called before processing bundle", + getCallState(), + anyOf(equalTo(CallState.START_BUNDLE), equalTo(CallState.PROCESS_ELEMENT))); + updateCallState(CallState.PROCESS_ELEMENT); throwIfNecessary(MethodForException.PROCESS_ELEMENT); } @FinishBundle public void postBundle() throws Exception { - assertThat("teardown should not have been called", teardownCalled.get(), is(false)); + assertThat( + "processing bundle should have been called before finish bundle", + getCallState(), + is(CallState.PROCESS_ELEMENT)); + updateCallState(CallState.FINISH_BUNDLE); throwIfNecessary(MethodForException.FINISH_BUNDLE); } private void throwIfNecessary(MethodForException method) throws Exception { if (toThrow == method && !thrown) { thrown = true; + exceptionWasThrown.set(true); throw new Exception("Hasn't yet thrown"); } } @Teardown public void after() { - if (!thrown) { + if (!exceptionWasThrown.get()) { fail("Excepted to have a processing method throw an exception"); } - teardownCalled.set(true); + assertThat( + "some lifecycle method should have been called", + callStateMap.get(id()), + is(notNullValue())); + updateCallState(CallState.TEARDOWN); + } + + private void initCallState() { + callStateMap.put(id(), new DelayedCallStateTracker(CallState.SETUP)); + } + + private int id() { + return System.identityHashCode(this); + } + + private void updateCallState(CallState processElement) { + callStateMap.get(id()).update(processElement); + } + + private CallState getCallState() { + return callStateMap.get(id()).callState(); } } @@ -347,6 +429,14 @@ public class ParDoLifecycleTest implements Serializable { } } + private enum CallState { + SETUP, + START_BUNDLE, + PROCESS_ELEMENT, + FINISH_BUNDLE, + TEARDOWN + } + private enum MethodForException { SETUP, START_BUNDLE,