This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 7719708 Merge pull request #10804: [BEAM-2535] Fix timer map 7719708 is described below commit 7719708a04d5d0eff3048dbd58ac1337889f8ba5 Author: reuvenlax <re...@google.com> AuthorDate: Sat Feb 8 22:57:17 2020 -0800 Merge pull request #10804: [BEAM-2535] Fix timer map --- .../beam/runners/core/SimpleDoFnRunnerTest.java | 2 +- runners/google-cloud-dataflow-java/build.gradle | 1 - .../runners/dataflow/worker/SimpleParDoFn.java | 3 +- .../translation/batch/ParDoTranslatorBatch.java | 4 +- .../spark/translation/TranslationUtils.java | 3 +- .../java/org/apache/beam/sdk/transforms/ParDo.java | 3 +- .../reflect/ByteBuddyDoFnInvokerFactory.java | 114 ++++++--------------- .../sdk/transforms/reflect/DoFnSignatures.java | 3 + .../org/apache/beam/sdk/transforms/ParDoTest.java | 91 +++++++++++++++- .../sdk/transforms/reflect/DoFnInvokersTest.java | 6 +- .../apache/beam/fn/harness/FnApiDoFnRunner.java | 2 +- 11 files changed, 133 insertions(+), 99 deletions(-) diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java index f3f1628..64a2d4d 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java @@ -120,7 +120,7 @@ public class SimpleDoFnRunnerTest { runner.onTimer( ThrowingDoFn.TIMER_ID, - ThrowingDoFn.TIMER_ID, + "", GlobalWindow.INSTANCE, new Instant(0), new Instant(0), diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 8eb9808..3a16f7a 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -137,7 +137,6 @@ def commonExcludeCategories = [ 'org.apache.beam.sdk.testing.UsesGaugeMetrics', 'org.apache.beam.sdk.testing.UsesSetState', 'org.apache.beam.sdk.testing.UsesMapState', - 'org.apache.beam.sdk.testing.UsesTimerMap', 'org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs', 'org.apache.beam.sdk.testing.UsesUnboundedPCollections', 'org.apache.beam.sdk.testing.UsesTestStream', diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java index 4c011ed..76ea9d0 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java @@ -355,7 +355,8 @@ public class SimpleParDoFn<InputT, OutputT> implements ParDoFn { } private void processUserTimer(TimerData timer) throws Exception { - if (fnSignature.timerDeclarations().containsKey(timer.getTimerId())) { + if (fnSignature.timerDeclarations().containsKey(timer.getTimerId()) + || fnSignature.timerFamilyDeclarations().containsKey(timer.getTimerFamilyId())) { BoundedWindow window = ((WindowNamespace) timer.getNamespace()).getWindow(); fnRunner.onTimer( timer.getTimerId(), diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java index b403248..3509126 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java @@ -79,7 +79,9 @@ class ParDoTranslatorBatch<InputT, OutputT> // TODO: add support of states and timers DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass()); boolean stateful = - signature.stateDeclarations().size() > 0 || signature.timerDeclarations().size() > 0; + signature.stateDeclarations().size() > 0 + || signature.timerDeclarations().size() > 0 + || signature.timerFamilyDeclarations().size() > 0; checkState(!stateful, "States and timers are not supported for the moment."); DoFnSchemaInformation doFnSchemaInformation = diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java index 16a4ca9..2b15639 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java @@ -273,7 +273,8 @@ public final class TranslationUtils { SparkRunner.class.getSimpleName())); } - if (signature.timerDeclarations().size() > 0) { + if (signature.timerDeclarations().size() > 0 + || signature.timerFamilyDeclarations().size() > 0) { throw new UnsupportedOperationException( String.format( "Found %s annotations on %s, but %s cannot yet be used with timers in the %s.", diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java index bca2eb4..29f4daf 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java @@ -583,7 +583,8 @@ public class ParDo { } // Timers are semantically incompatible with splitting - if (!signature.timerDeclarations().isEmpty() && signature.processElement().isSplittable()) { + if ((!signature.timerDeclarations().isEmpty() || !signature.timerFamilyDeclarations().isEmpty()) + && signature.processElement().isSplittable()) { throw new UnsupportedOperationException( String.format( "%s is splittable and uses timers, but these are not compatible", diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java index 5fd2a9a..1e0916b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java @@ -24,7 +24,6 @@ import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.ArrayList; -import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -96,6 +95,7 @@ import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.jar.asm.MethodVisit import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.jar.asm.Opcodes; import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.jar.asm.Type; import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.matcher.ElementMatchers; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Primitives; /** Dynamically generates a {@link DoFnInvoker} instances for invoking a {@link DoFn}. */ @@ -172,7 +172,8 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { implements DoFnInvoker<InputT, OutputT> { protected DoFnT delegate; - private Map<String, OnTimerInvoker> onTimerInvokers = new HashMap<>(); + private Map<String, OnTimerInvoker> onTimerInvokers = Maps.newHashMap(); + private Map<String, OnTimerInvoker> onTimerFamilyInvokers = Maps.newHashMap(); public DoFnInvokerBase(DoFnT delegate) { this.delegate = delegate; @@ -191,49 +192,6 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { this.onTimerInvokers.put(timerId, onTimerInvoker); } - @Override - public void invokeOnTimer( - String timerId, - String timerFamilyId, - DoFnInvoker.ArgumentProvider<InputT, OutputT> arguments) { - @Nullable OnTimerInvoker onTimerInvoker = onTimerInvokers.get(timerId); - - if (onTimerInvoker != null) { - onTimerInvoker.invokeOnTimer(arguments); - } else { - throw new IllegalArgumentException( - String.format( - "Attempted to invoke timer %s on %s, but that timer is not registered." - + " This is the responsibility of the runner, which must only deliver" - + " registered timers.", - timerId, delegate.getClass().getName())); - } - } - - @Override - public DoFn<InputT, OutputT> getFn() { - return delegate; - } - } - - /** - * Internal base class for generated {@link DoFnInvoker} instances. - * - * <p>This class should <i>not</i> be extended directly, or by Beam users. It must be public for - * generated instances to have adequate access, as they are generated "inside" the invoked {@link - * DoFn} class. - */ - public abstract static class DoFnInvokerTimerFamily< - InputT, OutputT, DoFnT extends DoFn<InputT, OutputT>> - implements DoFnInvoker<InputT, OutputT> { - protected DoFnT delegate; - - private Map<String, OnTimerInvoker> onTimerInvokers = new HashMap<>(); - - public DoFnInvokerTimerFamily(DoFnT delegate) { - this.delegate = delegate; - } - /** * Associates the given timerFamily ID with the given {@link OnTimerInvoker}. * @@ -241,7 +199,7 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { * timer ID rather than a generated conditional branch to choose which OnTimerInvoker to invoke. */ void addOnTimerFamilyInvoker(String timerFamilyId, OnTimerInvoker onTimerInvoker) { - this.onTimerInvokers.put(timerFamilyId, onTimerInvoker); + this.onTimerFamilyInvokers.put(timerFamilyId, onTimerInvoker); } @Override @@ -249,17 +207,21 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { String timerId, String timerFamilyId, DoFnInvoker.ArgumentProvider<InputT, OutputT> arguments) { - @Nullable OnTimerInvoker onTimerInvoker = onTimerInvokers.get(timerFamilyId); + @Nullable + OnTimerInvoker onTimerInvoker = + (timerFamilyId.isEmpty()) + ? onTimerInvokers.get(timerId) + : onTimerFamilyInvokers.get(timerFamilyId); if (onTimerInvoker != null) { onTimerInvoker.invokeOnTimer(arguments); } else { throw new IllegalArgumentException( String.format( - "Attempted to invoke timerFamily %s on %s, but that timerFamily is not registered." + "Attempted to invoke timer %s on %s, but that timer is not registered." + " This is the responsibility of the runner, which must only deliver" + " registered timers.", - timerFamilyId, delegate.getClass().getName())); + timerId, delegate.getClass().getName())); } } @@ -279,33 +241,21 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { fn.getClass()); try { - if (signature.timerFamilyDeclarations().size() > 0) { - @SuppressWarnings("unchecked") - DoFnInvokerTimerFamily<InputT, OutputT, DoFn<InputT, OutputT>> invoker = - (DoFnInvokerTimerFamily<InputT, OutputT, DoFn<InputT, OutputT>>) - getByteBuddyInvokerConstructor(signature).newInstance(fn); - - for (DoFnSignature.OnTimerFamilyMethod onTimerFamilyMethod : - signature.onTimerFamilyMethods().values()) { - invoker.addOnTimerFamilyInvoker( - onTimerFamilyMethod.id(), - OnTimerInvokers.forTimerFamily(fn, onTimerFamilyMethod.id())); - } - return invoker; - } else { - - @SuppressWarnings("unchecked") - DoFnInvokerBase<InputT, OutputT, DoFn<InputT, OutputT>> invoker = - (DoFnInvokerBase<InputT, OutputT, DoFn<InputT, OutputT>>) - getByteBuddyInvokerConstructor(signature).newInstance(fn); - - for (OnTimerMethod onTimerMethod : signature.onTimerMethods().values()) { - invoker.addOnTimerInvoker( - onTimerMethod.id(), OnTimerInvokers.forTimer(fn, onTimerMethod.id())); - } - return invoker; + @SuppressWarnings("unchecked") + DoFnInvokerBase<InputT, OutputT, DoFn<InputT, OutputT>> invoker = + (DoFnInvokerBase<InputT, OutputT, DoFn<InputT, OutputT>>) + getByteBuddyInvokerConstructor(signature).newInstance(fn); + + for (OnTimerMethod onTimerMethod : signature.onTimerMethods().values()) { + invoker.addOnTimerInvoker( + onTimerMethod.id(), OnTimerInvokers.forTimer(fn, onTimerMethod.id())); } - + for (DoFnSignature.OnTimerFamilyMethod onTimerFamilyMethod : + signature.onTimerFamilyMethods().values()) { + invoker.addOnTimerFamilyInvoker( + onTimerFamilyMethod.id(), OnTimerInvokers.forTimerFamily(fn, onTimerFamilyMethod.id())); + } + return invoker; } catch (InstantiationException | IllegalAccessException | IllegalArgumentException @@ -325,12 +275,7 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { Class<? extends DoFn<?, ?>> fnClass = signature.fnClass(); Constructor<?> constructor = byteBuddyInvokerConstructorCache.get(fnClass); if (constructor == null) { - Class<? extends DoFnInvoker<?, ?>> invokerClass = - generateInvokerClass( - signature, - signature.timerFamilyDeclarations().size() > 0 - ? DoFnInvokerTimerFamily.class - : DoFnInvokerBase.class); + Class<? extends DoFnInvoker<?, ?>> invokerClass = generateInvokerClass(signature); try { constructor = invokerClass.getConstructor(fnClass); } catch (IllegalArgumentException | NoSuchMethodException | SecurityException e) { @@ -391,8 +336,7 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { } /** Generates a {@link DoFnInvoker} class for the given {@link DoFnSignature}. */ - private static Class<? extends DoFnInvoker<?, ?>> generateInvokerClass( - DoFnSignature signature, Class<? extends DoFnInvoker> clazz) { + private static Class<? extends DoFnInvoker<?, ?>> generateInvokerClass(DoFnSignature signature) { Class<? extends DoFn<?, ?>> fnClass = signature.fnClass(); final TypeDescription clazzDescription = new TypeDescription.ForLoadedType(fnClass); @@ -406,12 +350,12 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { .withSuffix(DoFnInvoker.class.getSimpleName())) // class <invoker class> extends DoFnInvokerBase { - .subclass(clazz, ConstructorStrategy.Default.NO_CONSTRUCTORS) + .subclass(DoFnInvokerBase.class, ConstructorStrategy.Default.NO_CONSTRUCTORS) // public <invoker class>(<fn class> delegate) { this.delegate = delegate; } .defineConstructor(Visibility.PUBLIC) .withParameter(fnClass) - .intercept(new InvokerConstructor(clazz)) + .intercept(new InvokerConstructor(DoFnInvokerBase.class)) // public invokeProcessElement(ProcessContext, ExtraContextFactory) { // delegate.<@ProcessElement>(... pass just the right args ...); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java index 23c7209..48f5adf 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java @@ -1559,6 +1559,9 @@ public class DoFnSignatures { Map<String, TimerFamilyDeclaration> declarations, String id, Field field) { + if (id.isEmpty()) { + errors.throwIllegalArgument("TimerFamily id must not be empty"); + } if (declarations.containsKey(id)) { errors.throwIllegalArgument( diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index deca9b7..65f7003 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -118,6 +118,7 @@ import org.apache.beam.sdk.util.common.ElementByteSizeObserver; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PDone; @@ -4482,7 +4483,17 @@ public class ParDoTest implements Serializable { @Test @Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesTimerMap.class}) - public void testTimerFamilyEventTime() throws Exception { + public void testTimerFamilyEventTimeBounded() throws Exception { + runTestTimerFamilyEventTime(false); + } + + @Test + @Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesTimerMap.class}) + public void testTimerFamilyEventTimeUnbounded() throws Exception { + runTestTimerFamilyEventTime(true); + } + + public void runTestTimerFamilyEventTime(boolean useStreaming) { final String timerFamilyId = "foo"; DoFn<KV<String, Integer>, String> fn = @@ -4512,14 +4523,27 @@ public class ParDoTest implements Serializable { }; PCollection<String> output = - pipeline.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn)); + pipeline + .apply(Create.of(KV.of("hello", 37))) + .setIsBoundedInternal(useStreaming ? IsBounded.UNBOUNDED : IsBounded.BOUNDED) + .apply(ParDo.of(fn)); PAssert.that(output).containsInAnyOrder("process", "timer1", "timer2"); pipeline.run(); } @Test @Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesTimerMap.class}) - public void testTimerWithMultipleTimerFamily() throws Exception { + public void testTimerWithMultipleTimerFamilyBounded() throws Exception { + runTestTimerWithMultipleTimerFamily(false); + } + + @Test + @Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesTimerMap.class}) + public void testTimerWithMultipleTimerFamilyUnbounded() throws Exception { + runTestTimerWithMultipleTimerFamily(true); + } + + public void runTestTimerWithMultipleTimerFamily(boolean useStreaming) throws Exception { final String timerFamilyId1 = "foo"; final String timerFamilyId2 = "bar"; @@ -4556,12 +4580,71 @@ public class ParDoTest implements Serializable { }; PCollection<String> output = - pipeline.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn)); + pipeline + .apply(Create.of(KV.of("hello", 37))) + .setIsBoundedInternal(useStreaming ? IsBounded.UNBOUNDED : IsBounded.BOUNDED) + .apply(ParDo.of(fn)); PAssert.that(output).containsInAnyOrder("process", "timer", "timer"); pipeline.run(); } @Test + @Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesTimerMap.class}) + public void testTimerFamilyAndTimerBounded() throws Exception { + runTestTimerFamilyAndTimer(false); + } + + @Test + @Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesTimerMap.class}) + public void testTimerFamilyAndTimerUnbounded() throws Exception { + runTestTimerFamilyAndTimer(true); + } + + public void runTestTimerFamilyAndTimer(boolean useStreaming) throws Exception { + final String timerFamilyId = "foo"; + final String timerId = "timer"; + + DoFn<KV<String, Integer>, String> fn = + new DoFn<KV<String, Integer>, String>() { + + @TimerFamily(timerFamilyId) + private final TimerSpec spec1 = TimerSpecs.timerMap(TimeDomain.EVENT_TIME); + + @TimerId(timerId) + private final TimerSpec spec2 = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @ProcessElement + public void processElement( + @TimerFamily(timerFamilyId) TimerMap timerMap, + @TimerId(timerId) Timer timer, + OutputReceiver<String> r) { + timerMap.set("timer", new Instant(1)); + timer.set(new Instant(2)); + r.output("process"); + } + + @OnTimerFamily(timerFamilyId) + public void onTimer1( + @TimerId String timerId, @Timestamp Instant ts, OutputReceiver<String> r) { + r.output("family:" + timerFamilyId + ":" + timerId); + } + + @OnTimer(timerId) + public void onTimer2(@Timestamp Instant ts, OutputReceiver<String> r) { + r.output(timerId); + } + }; + + PCollection<String> output = + pipeline + .apply(Create.of(KV.of("hello", 37))) + .setIsBoundedInternal(useStreaming ? IsBounded.UNBOUNDED : IsBounded.BOUNDED) + .apply(ParDo.of(fn)); + PAssert.that(output).containsInAnyOrder("process", "family:foo:timer", "timer"); + pipeline.run(); + } + + @Test @Category({ ValidatesRunner.class, UsesTimersInParDo.class, diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java index fa45c20..51d0d173 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java @@ -114,7 +114,7 @@ public class DoFnInvokersTest { } private void invokeOnTimer(String timerId, DoFn<String, String> fn) { - DoFnInvokers.invokerFor(fn).invokeOnTimer(timerId, timerId, mockArgumentProvider); + DoFnInvokers.invokerFor(fn).invokeOnTimer(timerId, "", mockArgumentProvider); } @Test @@ -889,7 +889,7 @@ public class DoFnInvokersTest { SimpleTimerDoFn fn = new SimpleTimerDoFn(); DoFnInvoker<String, String> invoker = DoFnInvokers.invokerFor(fn); - invoker.invokeOnTimer(timerId, timerId, mockArgumentProvider); + invoker.invokeOnTimer(timerId, "", mockArgumentProvider); assertThat(fn.status, equalTo("OK now")); } @@ -918,7 +918,7 @@ public class DoFnInvokersTest { SimpleTimerDoFn fn = new SimpleTimerDoFn(); DoFnInvoker<String, String> invoker = DoFnInvokers.invokerFor(fn); - invoker.invokeOnTimer(timerId, timerId, mockArgumentProvider); + invoker.invokeOnTimer(timerId, "", mockArgumentProvider); assertThat(fn.window, equalTo(testWindow)); } diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java index d8198e7..70d0d0a 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java @@ -773,7 +773,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, OutputT> { (Iterator<BoundedWindow>) timer.getWindows().iterator(); while (windowIterator.hasNext()) { currentWindow = windowIterator.next(); - doFnInvoker.invokeOnTimer(timerId, timerId, onTimerContext); + doFnInvoker.invokeOnTimer(timerId, "", onTimerContext); } } finally { currentTimer = null;