Repository: incubator-beam Updated Branches: refs/heads/master dbbd5e448 -> dc94dbdd7
Connect generated DoFnInvoker.invokerOnTimer to OnTimerInvoker Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a945a025 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a945a025 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a945a025 Branch: refs/heads/master Commit: a945a025301ca09a4cfc160302ef3914429dc15e Parents: dbbd5e4 Author: Kenneth Knowles <k...@google.com> Authored: Tue Nov 1 21:23:48 2016 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Tue Nov 15 20:08:41 2016 -0800 ---------------------------------------------------------------------- .../reflect/ByteBuddyDoFnInvokerFactory.java | 152 ++++++++++++++----- .../reflect/ByteBuddyOnTimerInvokerFactory.java | 24 ++- .../sdk/transforms/reflect/DoFnInvoker.java | 4 + .../sdk/transforms/reflect/DoFnInvokers.java | 6 + .../sdk/transforms/reflect/OnTimerInvoker.java | 2 +- .../transforms/reflect/DoFnInvokersTest.java | 137 ++++++++++++++++- .../testhelper/DoFnInvokersTestHelper.java | 137 +++++++++++++++++ 7 files changed, 415 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a945a025/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java index c137255..bc6d8c9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java @@ -23,6 +23,7 @@ import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.ArrayList; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -31,7 +32,6 @@ import net.bytebuddy.ByteBuddy; import net.bytebuddy.NamingStrategy; import net.bytebuddy.description.field.FieldDescription; import net.bytebuddy.description.method.MethodDescription; -import net.bytebuddy.description.modifier.FieldManifestation; import net.bytebuddy.description.modifier.Visibility; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.description.type.TypeList; @@ -65,6 +65,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.ProcessElement; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.OnTimerMethod; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.Cases; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.ContextParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.InputProviderParameter; @@ -128,6 +129,54 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { return newByteBuddyInvoker(DoFnSignatures.getSignature((Class) fn.getClass()), fn); } + /** + * Internal base class for generated {@link DoFnInvoker} instances. + * + * <p>This class should <i>not</i> be extended directly, or by Beam users. It must be public for + * generated instances to have adequate access, as they are generated "inside" the invoked {@link + * DoFn} class. + */ + public abstract static class DoFnInvokerBase<InputT, OutputT, DoFnT extends DoFn<InputT, OutputT>> + implements DoFnInvoker<InputT, OutputT> { + protected DoFnT delegate; + + private Map<String, OnTimerInvoker> onTimerInvokers = new HashMap<>(); + + public DoFnInvokerBase(DoFnT delegate) { + this.delegate = delegate; + } + + /** + * Associates the given timer ID with the given {@link OnTimerInvoker}. + * + * <p>ByteBuddy does not like to generate conditional code, so we use a map + lookup + * of the timer ID rather than a generated conditional branch to choose which + * OnTimerInvoker to invoke. + * + * <p>This method has package level access as it is intended only for assembly of the + * {@link DoFnInvokerBase} not by any subclass. + */ + void addOnTimerInvoker(String timerId, OnTimerInvoker onTimerInvoker) { + this.onTimerInvokers.put(timerId, onTimerInvoker); + } + + @Override + public void invokeOnTimer(String timerId, DoFn.ArgumentProvider<InputT, OutputT> arguments) { + @Nullable OnTimerInvoker onTimerInvoker = onTimerInvokers.get(timerId); + + if (onTimerInvoker != null) { + onTimerInvoker.invokeOnTimer(arguments); + } else { + throw new IllegalArgumentException( + String.format( + "Attempted to invoke timer %s on %s, but that timer is not registered." + + " This is the responsibility of the runner, which must only deliver" + + " registered timers.", + timerId, delegate.getClass().getName())); + } + } + } + /** @return the {@link DoFnInvoker} for the given {@link DoFn}. */ public <InputT, OutputT> DoFnInvoker<InputT, OutputT> newByteBuddyInvoker( DoFnSignature signature, DoFn<InputT, OutputT> fn) { @@ -136,10 +185,18 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { "Signature is for class %s, but fn is of class %s", signature.fnClass(), fn.getClass()); + try { @SuppressWarnings("unchecked") - DoFnInvoker<InputT, OutputT> invoker = - (DoFnInvoker<InputT, OutputT>) getByteBuddyInvokerConstructor(signature).newInstance(fn); + DoFnInvokerBase<InputT, OutputT, DoFn<InputT, OutputT>> invoker = + (DoFnInvokerBase<InputT, OutputT, DoFn<InputT, OutputT>>) + getByteBuddyInvokerConstructor(signature).newInstance(fn); + + for (OnTimerMethod onTimerMethod : signature.onTimerMethods().values()) { + invoker.addOnTimerInvoker(onTimerMethod.id(), + OnTimerInvokers.forTimer(fn, onTimerMethod.id())); + } + return invoker; } catch (InstantiationException | IllegalAccessException @@ -214,31 +271,39 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { return super.name(clazzDescription); } }) - // Create a subclass of DoFnInvoker - .subclass(DoFnInvoker.class, ConstructorStrategy.Default.NO_CONSTRUCTORS) - .defineField( - FN_DELEGATE_FIELD_NAME, fnClass, Visibility.PRIVATE, FieldManifestation.FINAL) + // class <invoker class> extends DoFnInvokerBase { + .subclass(DoFnInvokerBase.class, ConstructorStrategy.Default.NO_CONSTRUCTORS) + + // public <invoker class>(<fn class> delegate) { this.delegate = delegate; } .defineConstructor(Visibility.PUBLIC) .withParameter(fnClass) .intercept(new InvokerConstructor()) + + // public invokeProcessElement(ProcessContext, ExtraContextFactory) { + // delegate.<@ProcessElement>(... pass just the right args ...); + // } .method(ElementMatchers.named("invokeProcessElement")) - .intercept(new ProcessElementDelegation(signature.processElement())) + .intercept(new ProcessElementDelegation(clazzDescription, signature.processElement())) + + // public invokeStartBundle(Context c) { delegate.<@StartBundle>(c); } + // ... etc ... .method(ElementMatchers.named("invokeStartBundle")) - .intercept(delegateOrNoop(signature.startBundle())) + .intercept(delegateOrNoop(clazzDescription, signature.startBundle())) .method(ElementMatchers.named("invokeFinishBundle")) - .intercept(delegateOrNoop(signature.finishBundle())) + .intercept(delegateOrNoop(clazzDescription, signature.finishBundle())) .method(ElementMatchers.named("invokeSetup")) - .intercept(delegateOrNoop(signature.setup())) + .intercept(delegateOrNoop(clazzDescription, signature.setup())) .method(ElementMatchers.named("invokeTeardown")) - .intercept(delegateOrNoop(signature.teardown())) + .intercept(delegateOrNoop(clazzDescription, signature.teardown())) .method(ElementMatchers.named("invokeGetInitialRestriction")) - .intercept(delegateWithDowncastOrThrow(signature.getInitialRestriction())) + .intercept( + delegateWithDowncastOrThrow(clazzDescription, signature.getInitialRestriction())) .method(ElementMatchers.named("invokeSplitRestriction")) - .intercept(splitRestrictionDelegation(signature)) + .intercept(splitRestrictionDelegation(clazzDescription, signature)) .method(ElementMatchers.named("invokeGetRestrictionCoder")) - .intercept(getRestrictionCoderDelegation(signature)) + .intercept(getRestrictionCoderDelegation(clazzDescription, signature)) .method(ElementMatchers.named("invokeNewTracker")) - .intercept(delegateWithDowncastOrThrow(signature.newTracker())); + .intercept(delegateWithDowncastOrThrow(clazzDescription, signature.newTracker())); DynamicType.Unloaded<?> unloaded = builder.make(); @@ -253,13 +318,15 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { return res; } - private static Implementation getRestrictionCoderDelegation(DoFnSignature signature) { + private static Implementation getRestrictionCoderDelegation( + TypeDescription doFnType, DoFnSignature signature) { if (signature.processElement().isSplittable()) { if (signature.getRestrictionCoder() == null) { return MethodDelegation.to( new DefaultRestrictionCoder(signature.getInitialRestriction().restrictionT())); } else { return new DowncastingParametersMethodDelegation( + doFnType, signature.getRestrictionCoder().targetMethod()); } } else { @@ -267,26 +334,30 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { } } - private static Implementation splitRestrictionDelegation(DoFnSignature signature) { + private static Implementation splitRestrictionDelegation( + TypeDescription doFnType, DoFnSignature signature) { if (signature.splitRestriction() == null) { return MethodDelegation.to(DefaultSplitRestriction.class); } else { - return new DowncastingParametersMethodDelegation(signature.splitRestriction().targetMethod()); + return new DowncastingParametersMethodDelegation( + doFnType, signature.splitRestriction().targetMethod()); } } /** Delegates to the given method if available, or does nothing. */ - private static Implementation delegateOrNoop(DoFnSignature.DoFnMethod method) { + private static Implementation delegateOrNoop(TypeDescription doFnType, DoFnSignature.DoFnMethod + method) { return (method == null) ? FixedValue.originType() - : new DoFnMethodDelegation(method.targetMethod()); + : new DoFnMethodDelegation(doFnType, method.targetMethod()); } /** Delegates to the given method if available, or throws UnsupportedOperationException. */ - private static Implementation delegateWithDowncastOrThrow(DoFnSignature.DoFnMethod method) { + private static Implementation delegateWithDowncastOrThrow( + TypeDescription doFnType, DoFnSignature.DoFnMethod method) { return (method == null) ? ExceptionMethod.throwing(UnsupportedOperationException.class) - : new DowncastingParametersMethodDelegation(method.targetMethod()); + : new DowncastingParametersMethodDelegation(doFnType, method.targetMethod()); } /** @@ -301,7 +372,10 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { protected FieldDescription delegateField; - public DoFnMethodDelegation(Method targetMethod) { + private final TypeDescription doFnType; + + public DoFnMethodDelegation(TypeDescription doFnType, Method targetMethod) { + this.doFnType = doFnType; this.targetMethod = new MethodDescription.ForLoadedMethod(targetMethod); targetHasReturn = !TypeDescription.VOID.equals(this.targetMethod.getReturnType().asErasure()); } @@ -311,6 +385,7 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { // Remember the field description of the instrumented type. delegateField = instrumentedType + .getSuperClass() // always DoFnInvokerBase .getDeclaredFields() .filter(ElementMatchers.named(FN_DELEGATE_FIELD_NAME)) .getOnly(); @@ -349,6 +424,8 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { MethodVariableAccess.REFERENCE.loadOffset(0), // Access this.delegate (DoFn on top of the stack) FieldAccess.forField(delegateField).getter(), + // Cast it to the more precise type + TypeCasting.to(doFnType), // Run the beforeDelegation manipulations. // The arguments necessary to invoke the target are on top of the stack. beforeDelegation(instrumentedMethod), @@ -400,8 +477,8 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { * to its expected type. */ private static class DowncastingParametersMethodDelegation extends DoFnMethodDelegation { - DowncastingParametersMethodDelegation(Method method) { - super(method); + DowncastingParametersMethodDelegation(TypeDescription doFnType, Method method) { + super(doFnType, method); } @Override @@ -536,8 +613,9 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { private final DoFnSignature.ProcessElementMethod signature; /** Implementation of {@link MethodDelegation} for the {@link ProcessElement} method. */ - private ProcessElementDelegation(DoFnSignature.ProcessElementMethod signature) { - super(signature.targetMethod()); + private ProcessElementDelegation(TypeDescription doFnType, DoFnSignature.ProcessElementMethod + signature) { + super(doFnType, signature.targetMethod()); this.signature = signature; } @@ -739,26 +817,16 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { new StackManipulation.Compound( // Load the this reference MethodVariableAccess.REFERENCE.loadOffset(0), + // Load the delegate argument + MethodVariableAccess.REFERENCE.loadOffset(1), // Invoke the super constructor (default constructor of Object) MethodInvocation.invoke( - new TypeDescription.ForLoadedType(Object.class) + new TypeDescription.ForLoadedType(DoFnInvokerBase.class) .getDeclaredMethods() .filter( ElementMatchers.isConstructor() - .and(ElementMatchers.takesArguments(0))) + .and(ElementMatchers.takesArguments(DoFn.class))) .getOnly()), - // Load the this reference - MethodVariableAccess.REFERENCE.loadOffset(0), - // Load the delegate argument - MethodVariableAccess.REFERENCE.loadOffset(1), - // Assign the delegate argument to the delegate field - FieldAccess.forField( - implementationTarget - .getInstrumentedType() - .getDeclaredFields() - .filter(ElementMatchers.named(FN_DELEGATE_FIELD_NAME)) - .getOnly()) - .putter(), // Return void. MethodReturn.VOID) .apply(methodVisitor, implementationContext); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a945a025/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java index 4e53757..7a39ed1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java @@ -180,7 +180,9 @@ class ByteBuddyOnTimerInvokerFactory implements OnTimerInvokerFactory { // this.delegate.<@OnTimer method>(... pass the right args ...) // } .method(ElementMatchers.named("invokeOnTimer")) - .intercept(new InvokeOnTimerDelegation(signature.onTimerMethods().get(timerId))); + .intercept( + new InvokeOnTimerDelegation( + clazzDescription, signature.onTimerMethods().get(timerId))); DynamicType.Unloaded<?> unloaded = builder.make(); @@ -203,12 +205,28 @@ class ByteBuddyOnTimerInvokerFactory implements OnTimerInvokerFactory { private final DoFnSignature.OnTimerMethod signature; - public InvokeOnTimerDelegation(DoFnSignature.OnTimerMethod signature) { - super(signature.targetMethod()); + public InvokeOnTimerDelegation( + TypeDescription clazzDescription, DoFnSignature.OnTimerMethod signature) { + super(clazzDescription, signature.targetMethod()); this.signature = signature; } @Override + public InstrumentedType prepare(InstrumentedType instrumentedType) { + // Remember the field description of the instrumented type. + // Kind of a hack to set the protected value, because the instrumentedType + // is only available to prepare, while we need this information in + // beforeDelegation + delegateField = + instrumentedType + .getDeclaredFields() // the delegate is declared on the OnTimerInvoker + .filter(ElementMatchers.named(FN_DELEGATE_FIELD_NAME)) + .getOnly(); + // Delegating the method call doesn't require any changes to the instrumented type. + return instrumentedType; + } + + @Override protected StackManipulation beforeDelegation(MethodDescription instrumentedMethod) { // Parameters of the wrapper invoker method: // DoFn.ArgumentProvider http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a945a025/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java index ce68d0b..2ae7920 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java @@ -50,6 +50,10 @@ public interface DoFnInvoker<InputT, OutputT> { */ DoFn.ProcessContinuation invokeProcessElement(DoFn.ArgumentProvider<InputT, OutputT> extra); + /** Invoke the appropriate {@link DoFn.OnTimer} method on the bound {@link DoFn}. */ + void invokeOnTimer( + String timerId, DoFn.ArgumentProvider<InputT, OutputT> arguments); + /** Invoke the {@link DoFn.GetInitialRestriction} method on the bound {@link DoFn}. */ <RestrictionT> RestrictionT invokeGetInitialRestriction(InputT element); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a945a025/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java index 9a96985..7eccaab 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java @@ -115,6 +115,12 @@ public class DoFnInvokers { } @Override + public void invokeOnTimer(String timerId, DoFn.ArgumentProvider<InputT, OutputT> arguments) { + throw new UnsupportedOperationException( + String.format("Timers are not supported for %s", OldDoFn.class.getSimpleName())); + } + + @Override public void invokeStartBundle(DoFn.Context c) { OldDoFn<InputT, OutputT>.Context oldCtx = DoFnAdapters.adaptContext(fn, c); try { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a945a025/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java index f87fa74..bfcafd0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java @@ -20,7 +20,7 @@ package org.apache.beam.sdk.transforms.reflect; import org.apache.beam.sdk.transforms.DoFn; /** Interface for invoking the {@link DoFn.OnTimer} method for a particular timer. */ -interface OnTimerInvoker<InputT, OutputT> { +public interface OnTimerInvoker<InputT, OutputT> { /** Invoke the {@link DoFn.OnTimer} method in the provided context. */ void invokeOnTimer(DoFn.ArgumentProvider<InputT, OutputT> extra); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a945a025/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java index c7b71ff..3d9e3ec 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.transforms.reflect; import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertSame; @@ -45,6 +46,7 @@ import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.Timer; @@ -55,6 +57,7 @@ import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.state.StateSpec; import org.apache.beam.sdk.util.state.StateSpecs; import org.apache.beam.sdk.util.state.ValueState; +import org.joda.time.Instant; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -95,6 +98,10 @@ public class DoFnInvokersTest { return DoFnInvokers.invokerFor(fn).invokeProcessElement(mockArgumentProvider); } + private void invokeOnTimer(String timerId, DoFn<String, String> fn) { + DoFnInvokers.invokerFor(fn).invokeOnTimer(timerId, mockArgumentProvider); + } + @Test public void testDoFnInvokersReused() throws Exception { // Ensures that we don't create a new Invoker class for every instance of the DoFn. @@ -460,7 +467,79 @@ public class DoFnInvokersTest { } // --------------------------------------------------------------------------------------- - // Tests for ability to invoke private, inner and anonymous classes. + // Tests for ability to invoke @OnTimer for private, inner and anonymous classes. + // --------------------------------------------------------------------------------------- + + private static final String TIMER_ID = "test-timer-id"; + + private static class PrivateDoFnWithTimers extends DoFn<String, String> { + @ProcessElement + public void processThis(ProcessContext c) {} + + @TimerId(TIMER_ID) + private final TimerSpec myTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); + + @OnTimer(TIMER_ID) + public void onTimer(BoundedWindow w) {} + } + + @Test + public void testLocalPrivateDoFnWithTimers() throws Exception { + PrivateDoFnWithTimers fn = mock(PrivateDoFnWithTimers.class); + invokeOnTimer(TIMER_ID, fn); + verify(fn).onTimer(mockWindow); + } + + @Test + public void testStaticPackagePrivateDoFnWithTimers() throws Exception { + DoFn<String, String> fn = + mock(DoFnInvokersTestHelper.newStaticPackagePrivateDoFnWithTimers().getClass()); + invokeOnTimer(TIMER_ID, fn); + DoFnInvokersTestHelper.verifyStaticPackagePrivateDoFnWithTimers(fn, mockWindow); + } + + @Test + public void testInnerPackagePrivateDoFnWithTimers() throws Exception { + DoFn<String, String> fn = + mock(new DoFnInvokersTestHelper().newInnerPackagePrivateDoFnWithTimers().getClass()); + invokeOnTimer(TIMER_ID, fn); + DoFnInvokersTestHelper.verifyInnerPackagePrivateDoFnWithTimers(fn, mockWindow); + } + + @Test + public void testStaticPrivateDoFnWithTimers() throws Exception { + DoFn<String, String> fn = + mock(DoFnInvokersTestHelper.newStaticPrivateDoFnWithTimers().getClass()); + invokeOnTimer(TIMER_ID, fn); + DoFnInvokersTestHelper.verifyStaticPrivateDoFnWithTimers(fn, mockWindow); + } + + @Test + public void testInnerPrivateDoFnWithTimers() throws Exception { + DoFn<String, String> fn = + mock(new DoFnInvokersTestHelper().newInnerPrivateDoFnWithTimers().getClass()); + invokeOnTimer(TIMER_ID, fn); + DoFnInvokersTestHelper.verifyInnerPrivateDoFnWithTimers(fn, mockWindow); + } + + @Test + public void testAnonymousInnerDoFnWithTimers() throws Exception { + DoFn<String, String> fn = + mock(new DoFnInvokersTestHelper().newInnerAnonymousDoFnWithTimers().getClass()); + invokeOnTimer(TIMER_ID, fn); + DoFnInvokersTestHelper.verifyInnerAnonymousDoFnWithTimers(fn, mockWindow); + } + + @Test + public void testStaticAnonymousDoFnWithTimersInOtherPackage() throws Exception { + // Can't use mockito for this one - the anonymous class is final and can't be mocked. + DoFn<String, String> fn = DoFnInvokersTestHelper.newStaticAnonymousDoFnWithTimers(); + invokeOnTimer(TIMER_ID, fn); + DoFnInvokersTestHelper.verifyStaticAnonymousDoFnWithTimersInvoked(fn, mockWindow); + } + + // --------------------------------------------------------------------------------------- + // Tests for ability to invoke @ProcessElement for private, inner and anonymous classes. // --------------------------------------------------------------------------------------- private static class PrivateDoFnClass extends DoFn<String, String> { @@ -605,6 +684,62 @@ public class DoFnInvokersTest { invoker.invokeFinishBundle(null); } + @Test + public void testOnTimerHelloWord() throws Exception { + final String timerId = "my-timer-id"; + + class SimpleTimerDoFn extends DoFn<String, String> { + + public String status = "not yet"; + + @TimerId(timerId) + private final TimerSpec myTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); + + @ProcessElement + public void process(ProcessContext c) {} + + @OnTimer(timerId) + public void onMyTimer() { + status = "OK now"; + } + } + + SimpleTimerDoFn fn = new SimpleTimerDoFn(); + + DoFnInvoker<String, String> invoker = DoFnInvokers.invokerFor(fn); + invoker.invokeOnTimer(timerId, mockArgumentProvider); + assertThat(fn.status, equalTo("OK now")); + } + + @Test + public void testOnTimerWithWindow() throws Exception { + final String timerId = "my-timer-id"; + final IntervalWindow testWindow = new IntervalWindow(new Instant(0), new Instant(15)); + when(mockArgumentProvider.window()).thenReturn(testWindow); + + class SimpleTimerDoFn extends DoFn<String, String> { + + public IntervalWindow window = null; + + @TimerId(timerId) + private final TimerSpec myTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); + + @ProcessElement + public void process(ProcessContext c) {} + + @OnTimer(timerId) + public void onMyTimer(IntervalWindow w) { + window = w; + } + } + + SimpleTimerDoFn fn = new SimpleTimerDoFn(); + + DoFnInvoker<String, String> invoker = DoFnInvokers.invokerFor(fn); + invoker.invokeOnTimer(timerId, mockArgumentProvider); + assertThat(fn.window, equalTo(testWindow)); + } + private class OldDoFnIdentity extends OldDoFn<String, String> { public void processElement(ProcessContext c) {} } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a945a025/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/testhelper/DoFnInvokersTestHelper.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/testhelper/DoFnInvokersTestHelper.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/testhelper/DoFnInvokersTestHelper.java index c20a788..95e7c49 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/testhelper/DoFnInvokersTestHelper.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/testhelper/DoFnInvokersTestHelper.java @@ -23,6 +23,10 @@ import static org.mockito.Mockito.verify; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.reflect.DoFnInvokersTest; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.TimerSpec; +import org.apache.beam.sdk.util.TimerSpecs; /** * Test helper for {@link DoFnInvokersTest}, which needs to test package-private access to DoFns in @@ -121,4 +125,137 @@ public class DoFnInvokersTestHelper { fn.getClass().getMethod("verify", DoFn.ProcessContext.class).invoke(fn, context); } + + // + // Classes for testing OnTimer methods when the DoFn does not live in the same package + // + + private static final String TIMER_ID = "test-timer-id"; + + private static class StaticPrivateDoFnWithTimers extends DoFn<String, String> { + @TimerId(TIMER_ID) + private final TimerSpec myTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); + + @OnTimer(TIMER_ID) + public void onTimer(BoundedWindow w) {} + + @ProcessElement + public void process(ProcessContext c) {} + } + + private class InnerPrivateDoFnWithTimers extends DoFn<String, String> { + @TimerId(TIMER_ID) + private final TimerSpec myTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); + + @OnTimer(TIMER_ID) + public void onTimer(BoundedWindow w) {} + + @ProcessElement + public void process(ProcessContext c) {} + } + + static class StaticPackagePrivateDoFnWithTimers extends DoFn<String, String> { + @TimerId(TIMER_ID) + private final TimerSpec myTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); + + @OnTimer(TIMER_ID) + public void onTimer(BoundedWindow w) {} + + @ProcessElement + public void process(ProcessContext c) {} + } + + class InnerPackagePrivateDoFnWithTimers extends DoFn<String, String> { + @TimerId(TIMER_ID) + private final TimerSpec myTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); + + @OnTimer(TIMER_ID) + public void onTimer(BoundedWindow w) {} + + @ProcessElement + public void process(ProcessContext c) {} + } + + public static DoFn<String, String> newStaticPackagePrivateDoFnWithTimers() { + return new StaticPackagePrivateDoFnWithTimers(); + } + + public static void verifyStaticPackagePrivateDoFnWithTimers( + DoFn<String, String> fn, BoundedWindow window) { + verify((StaticPackagePrivateDoFnWithTimers) fn).onTimer(window); + } + + public DoFn<String, String> newInnerPackagePrivateDoFnWithTimers() { + return new InnerPackagePrivateDoFnWithTimers(); + } + + public static void verifyInnerPackagePrivateDoFnWithTimers( + DoFn<String, String> fn, BoundedWindow window) { + verify((InnerPackagePrivateDoFnWithTimers) fn).onTimer(window); + } + + public static DoFn<String, String> newStaticPrivateDoFnWithTimers() { + return new StaticPrivateDoFnWithTimers(); + } + + public static void verifyStaticPrivateDoFnWithTimers( + DoFn<String, String> fn, BoundedWindow window) { + verify((StaticPrivateDoFnWithTimers) fn).onTimer(window); + } + + public DoFn<String, String> newInnerPrivateDoFnWithTimers() { + return new InnerPrivateDoFnWithTimers(); + } + + public static void verifyInnerPrivateDoFnWithTimers( + DoFn<String, String> fn, BoundedWindow window) { + verify((InnerPrivateDoFnWithTimers) fn).onTimer(window); + } + + public DoFn<String, String> newInnerAnonymousDoFnWithTimers() { + return new DoFn<String, String>() { + @TimerId(TIMER_ID) + private final TimerSpec myTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); + + @OnTimer(TIMER_ID) + public void onTimer(BoundedWindow w) {} + + @ProcessElement + public void process(ProcessContext c) {} + }; + } + + public static void verifyInnerAnonymousDoFnWithTimers( + DoFn<String, String> fn, BoundedWindow window) throws Exception { + DoFn<String, String> verifier = verify(fn); + verifier.getClass().getMethod("onTimer", BoundedWindow.class).invoke(verifier, window); + } + + public static DoFn<String, String> newStaticAnonymousDoFnWithTimers() { + return new DoFn<String, String>() { + private BoundedWindow invokedWindow; + + @ProcessElement + public void process(ProcessContext c) {} + + @TimerId(TIMER_ID) + private final TimerSpec myTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); + + @OnTimer(TIMER_ID) + public void onTimer(BoundedWindow window) { + assertNull("Should have been invoked just once", invokedWindow); + invokedWindow = window; + } + + @SuppressWarnings("unused") + public void verify(BoundedWindow window) { + assertEquals(window, invokedWindow); + } + }; + } + + public static void verifyStaticAnonymousDoFnWithTimersInvoked( + DoFn<String, String> fn, BoundedWindow window) throws Exception { + fn.getClass().getMethod("verify", BoundedWindow.class).invoke(fn, window); + } }