This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7719708  Merge pull request #10804: [BEAM-2535] Fix timer map
7719708 is described below

commit 7719708a04d5d0eff3048dbd58ac1337889f8ba5
Author: reuvenlax <re...@google.com>
AuthorDate: Sat Feb 8 22:57:17 2020 -0800

    Merge pull request #10804: [BEAM-2535] Fix timer map
---
 .../beam/runners/core/SimpleDoFnRunnerTest.java    |   2 +-
 runners/google-cloud-dataflow-java/build.gradle    |   1 -
 .../runners/dataflow/worker/SimpleParDoFn.java     |   3 +-
 .../translation/batch/ParDoTranslatorBatch.java    |   4 +-
 .../spark/translation/TranslationUtils.java        |   3 +-
 .../java/org/apache/beam/sdk/transforms/ParDo.java |   3 +-
 .../reflect/ByteBuddyDoFnInvokerFactory.java       | 114 ++++++---------------
 .../sdk/transforms/reflect/DoFnSignatures.java     |   3 +
 .../org/apache/beam/sdk/transforms/ParDoTest.java  |  91 +++++++++++++++-
 .../sdk/transforms/reflect/DoFnInvokersTest.java   |   6 +-
 .../apache/beam/fn/harness/FnApiDoFnRunner.java    |   2 +-
 11 files changed, 133 insertions(+), 99 deletions(-)

diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
index f3f1628..64a2d4d 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
@@ -120,7 +120,7 @@ public class SimpleDoFnRunnerTest {
 
     runner.onTimer(
         ThrowingDoFn.TIMER_ID,
-        ThrowingDoFn.TIMER_ID,
+        "",
         GlobalWindow.INSTANCE,
         new Instant(0),
         new Instant(0),
diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle
index 8eb9808..3a16f7a 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -137,7 +137,6 @@ def commonExcludeCategories = [
   'org.apache.beam.sdk.testing.UsesGaugeMetrics',
   'org.apache.beam.sdk.testing.UsesSetState',
   'org.apache.beam.sdk.testing.UsesMapState',
-  'org.apache.beam.sdk.testing.UsesTimerMap',
   'org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs',
   'org.apache.beam.sdk.testing.UsesUnboundedPCollections',
   'org.apache.beam.sdk.testing.UsesTestStream',
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
index 4c011ed..76ea9d0 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
@@ -355,7 +355,8 @@ public class SimpleParDoFn<InputT, OutputT> implements ParDoFn {
   }
 
   private void processUserTimer(TimerData timer) throws Exception {
-    if (fnSignature.timerDeclarations().containsKey(timer.getTimerId())) {
+    if (fnSignature.timerDeclarations().containsKey(timer.getTimerId())
+        || fnSignature.timerFamilyDeclarations().containsKey(timer.getTimerFamilyId())) {
       BoundedWindow window = ((WindowNamespace) timer.getNamespace()).getWindow();
       fnRunner.onTimer(
           timer.getTimerId(),
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
index b403248..3509126 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
@@ -79,7 +79,9 @@ class ParDoTranslatorBatch<InputT, OutputT>
     // TODO: add support of states and timers
     DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
     boolean stateful =
-        signature.stateDeclarations().size() > 0 || signature.timerDeclarations().size() > 0;
+        signature.stateDeclarations().size() > 0
+            || signature.timerDeclarations().size() > 0
+            || signature.timerFamilyDeclarations().size() > 0;
     checkState(!stateful, "States and timers are not supported for the moment.");
 
     DoFnSchemaInformation doFnSchemaInformation =
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
index 16a4ca9..2b15639 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
@@ -273,7 +273,8 @@ public final class TranslationUtils {
               SparkRunner.class.getSimpleName()));
     }
 
-    if (signature.timerDeclarations().size() > 0) {
+    if (signature.timerDeclarations().size() > 0
+        || signature.timerFamilyDeclarations().size() > 0) {
       throw new UnsupportedOperationException(
           String.format(
               "Found %s annotations on %s, but %s cannot yet be used with timers in the %s.",
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index bca2eb4..29f4daf 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -583,7 +583,8 @@ public class ParDo {
     }
 
     // Timers are semantically incompatible with splitting
-    if (!signature.timerDeclarations().isEmpty() && signature.processElement().isSplittable()) {
+    if ((!signature.timerDeclarations().isEmpty() || !signature.timerFamilyDeclarations().isEmpty())
+        && signature.processElement().isSplittable()) {
       throw new UnsupportedOperationException(
           String.format(
               "%s is splittable and uses timers, but these are not compatible",
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
index 5fd2a9a..1e0916b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
@@ -24,7 +24,6 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -96,6 +95,7 @@ import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.jar.asm.MethodVisit
 import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.jar.asm.Opcodes;
 import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.jar.asm.Type;
 import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.matcher.ElementMatchers;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Primitives;
 
 /** Dynamically generates a {@link DoFnInvoker} instances for invoking a {@link DoFn}. */
@@ -172,7 +172,8 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
       implements DoFnInvoker<InputT, OutputT> {
     protected DoFnT delegate;
 
-    private Map<String, OnTimerInvoker> onTimerInvokers = new HashMap<>();
+    private Map<String, OnTimerInvoker> onTimerInvokers = Maps.newHashMap();
+    private Map<String, OnTimerInvoker> onTimerFamilyInvokers = Maps.newHashMap();
 
     public DoFnInvokerBase(DoFnT delegate) {
       this.delegate = delegate;
@@ -191,49 +192,6 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
       this.onTimerInvokers.put(timerId, onTimerInvoker);
     }
 
-    @Override
-    public void invokeOnTimer(
-        String timerId,
-        String timerFamilyId,
-        DoFnInvoker.ArgumentProvider<InputT, OutputT> arguments) {
-      @Nullable OnTimerInvoker onTimerInvoker = onTimerInvokers.get(timerId);
-
-      if (onTimerInvoker != null) {
-        onTimerInvoker.invokeOnTimer(arguments);
-      } else {
-        throw new IllegalArgumentException(
-            String.format(
-                "Attempted to invoke timer %s on %s, but that timer is not registered."
-                    + " This is the responsibility of the runner, which must only deliver"
-                    + " registered timers.",
-                timerId, delegate.getClass().getName()));
-      }
-    }
-
-    @Override
-    public DoFn<InputT, OutputT> getFn() {
-      return delegate;
-    }
-  }
-
-  /**
-   * Internal base class for generated {@link DoFnInvoker} instances.
-   *
-   * <p>This class should <i>not</i> be extended directly, or by Beam users. It must be public for
-   * generated instances to have adequate access, as they are generated "inside" the invoked {@link
-   * DoFn} class.
-   */
-  public abstract static class DoFnInvokerTimerFamily<
-          InputT, OutputT, DoFnT extends DoFn<InputT, OutputT>>
-      implements DoFnInvoker<InputT, OutputT> {
-    protected DoFnT delegate;
-
-    private Map<String, OnTimerInvoker> onTimerInvokers = new HashMap<>();
-
-    public DoFnInvokerTimerFamily(DoFnT delegate) {
-      this.delegate = delegate;
-    }
-
     /**
      * Associates the given timerFamily ID with the given {@link OnTimerInvoker}.
      *
@@ -241,7 +199,7 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
      * timer ID rather than a generated conditional branch to choose which OnTimerInvoker to invoke.
      */
     void addOnTimerFamilyInvoker(String timerFamilyId, OnTimerInvoker onTimerInvoker) {
-      this.onTimerInvokers.put(timerFamilyId, onTimerInvoker);
+      this.onTimerFamilyInvokers.put(timerFamilyId, onTimerInvoker);
     }
 
     @Override
@@ -249,17 +207,21 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
         String timerId,
         String timerFamilyId,
         DoFnInvoker.ArgumentProvider<InputT, OutputT> arguments) {
-      @Nullable OnTimerInvoker onTimerInvoker = onTimerInvokers.get(timerFamilyId);
+      @Nullable
+      OnTimerInvoker onTimerInvoker =
+          (timerFamilyId.isEmpty())
+              ? onTimerInvokers.get(timerId)
+              : onTimerFamilyInvokers.get(timerFamilyId);
 
       if (onTimerInvoker != null) {
         onTimerInvoker.invokeOnTimer(arguments);
       } else {
         throw new IllegalArgumentException(
             String.format(
-                "Attempted to invoke timerFamily %s on %s, but that timerFamily is not registered."
+                "Attempted to invoke timer %s on %s, but that timer is not registered."
                     + " This is the responsibility of the runner, which must only deliver"
                     + " registered timers.",
-                timerFamilyId, delegate.getClass().getName()));
+                timerId, delegate.getClass().getName()));
       }
     }
 
@@ -279,33 +241,21 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
         fn.getClass());
 
     try {
-      if (signature.timerFamilyDeclarations().size() > 0) {
-        @SuppressWarnings("unchecked")
-        DoFnInvokerTimerFamily<InputT, OutputT, DoFn<InputT, OutputT>> invoker =
-            (DoFnInvokerTimerFamily<InputT, OutputT, DoFn<InputT, OutputT>>)
-                getByteBuddyInvokerConstructor(signature).newInstance(fn);
-
-        for (DoFnSignature.OnTimerFamilyMethod onTimerFamilyMethod :
-            signature.onTimerFamilyMethods().values()) {
-          invoker.addOnTimerFamilyInvoker(
-              onTimerFamilyMethod.id(),
-              OnTimerInvokers.forTimerFamily(fn, onTimerFamilyMethod.id()));
-        }
-        return invoker;
-      } else {
-
-        @SuppressWarnings("unchecked")
-        DoFnInvokerBase<InputT, OutputT, DoFn<InputT, OutputT>> invoker =
-            (DoFnInvokerBase<InputT, OutputT, DoFn<InputT, OutputT>>)
-                getByteBuddyInvokerConstructor(signature).newInstance(fn);
-
-        for (OnTimerMethod onTimerMethod : signature.onTimerMethods().values()) {
-          invoker.addOnTimerInvoker(
-              onTimerMethod.id(), OnTimerInvokers.forTimer(fn, onTimerMethod.id()));
-        }
-        return invoker;
+      @SuppressWarnings("unchecked")
+      DoFnInvokerBase<InputT, OutputT, DoFn<InputT, OutputT>> invoker =
+          (DoFnInvokerBase<InputT, OutputT, DoFn<InputT, OutputT>>)
+              getByteBuddyInvokerConstructor(signature).newInstance(fn);
+
+      for (OnTimerMethod onTimerMethod : signature.onTimerMethods().values()) {
+        invoker.addOnTimerInvoker(
+            onTimerMethod.id(), OnTimerInvokers.forTimer(fn, onTimerMethod.id()));
       }
-
+      for (DoFnSignature.OnTimerFamilyMethod onTimerFamilyMethod :
+          signature.onTimerFamilyMethods().values()) {
+        invoker.addOnTimerFamilyInvoker(
+            onTimerFamilyMethod.id(), OnTimerInvokers.forTimerFamily(fn, onTimerFamilyMethod.id()));
+      }
+      return invoker;
     } catch (InstantiationException
         | IllegalAccessException
         | IllegalArgumentException
@@ -325,12 +275,7 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
     Class<? extends DoFn<?, ?>> fnClass = signature.fnClass();
     Constructor<?> constructor = byteBuddyInvokerConstructorCache.get(fnClass);
     if (constructor == null) {
-      Class<? extends DoFnInvoker<?, ?>> invokerClass =
-          generateInvokerClass(
-              signature,
-              signature.timerFamilyDeclarations().size() > 0
-                  ? DoFnInvokerTimerFamily.class
-                  : DoFnInvokerBase.class);
+      Class<? extends DoFnInvoker<?, ?>> invokerClass = generateInvokerClass(signature);
       try {
         constructor = invokerClass.getConstructor(fnClass);
       } catch (IllegalArgumentException | NoSuchMethodException | SecurityException e) {
@@ -391,8 +336,7 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
   }
 
   /** Generates a {@link DoFnInvoker} class for the given {@link DoFnSignature}. */
-  private static Class<? extends DoFnInvoker<?, ?>> generateInvokerClass(
-      DoFnSignature signature, Class<? extends DoFnInvoker> clazz) {
+  private static Class<? extends DoFnInvoker<?, ?>> generateInvokerClass(DoFnSignature signature) {
     Class<? extends DoFn<?, ?>> fnClass = signature.fnClass();
 
     final TypeDescription clazzDescription = new TypeDescription.ForLoadedType(fnClass);
@@ -406,12 +350,12 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
                     .withSuffix(DoFnInvoker.class.getSimpleName()))
 
             // class <invoker class> extends DoFnInvokerBase {
-            .subclass(clazz, ConstructorStrategy.Default.NO_CONSTRUCTORS)
+            .subclass(DoFnInvokerBase.class, ConstructorStrategy.Default.NO_CONSTRUCTORS)
 
             //   public <invoker class>(<fn class> delegate) { this.delegate = delegate; }
             .defineConstructor(Visibility.PUBLIC)
             .withParameter(fnClass)
-            .intercept(new InvokerConstructor(clazz))
+            .intercept(new InvokerConstructor(DoFnInvokerBase.class))
 
             //   public invokeProcessElement(ProcessContext, ExtraContextFactory) {
             //     delegate.<@ProcessElement>(... pass just the right args ...);
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index 23c7209..48f5adf 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -1559,6 +1559,9 @@ public class DoFnSignatures {
       Map<String, TimerFamilyDeclaration> declarations,
       String id,
       Field field) {
+    if (id.isEmpty()) {
+      errors.throwIllegalArgument("TimerFamily id must not be empty");
+    }
 
     if (declarations.containsKey(id)) {
       errors.throwIllegalArgument(
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index deca9b7..65f7003 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -118,6 +118,7 @@ import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
@@ -4482,7 +4483,17 @@ public class ParDoTest implements Serializable {
 
     @Test
     @Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesTimerMap.class})
-    public void testTimerFamilyEventTime() throws Exception {
+    public void testTimerFamilyEventTimeBounded() throws Exception {
+      runTestTimerFamilyEventTime(false);
+    }
+
+    @Test
+    @Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesTimerMap.class})
+    public void testTimerFamilyEventTimeUnbounded() throws Exception {
+      runTestTimerFamilyEventTime(true);
+    }
+
+    public void runTestTimerFamilyEventTime(boolean useStreaming) {
       final String timerFamilyId = "foo";
 
       DoFn<KV<String, Integer>, String> fn =
@@ -4512,14 +4523,27 @@ public class ParDoTest implements Serializable {
           };
 
       PCollection<String> output =
-          pipeline.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn));
+          pipeline
+              .apply(Create.of(KV.of("hello", 37)))
+              .setIsBoundedInternal(useStreaming ? IsBounded.UNBOUNDED : IsBounded.BOUNDED)
+              .apply(ParDo.of(fn));
       PAssert.that(output).containsInAnyOrder("process", "timer1", "timer2");
       pipeline.run();
     }
 
     @Test
     @Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesTimerMap.class})
-    public void testTimerWithMultipleTimerFamily() throws Exception {
+    public void testTimerWithMultipleTimerFamilyBounded() throws Exception {
+      runTestTimerWithMultipleTimerFamily(false);
+    }
+
+    @Test
+    @Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesTimerMap.class})
+    public void testTimerWithMultipleTimerFamilyUnbounded() throws Exception {
+      runTestTimerWithMultipleTimerFamily(true);
+    }
+
+    public void runTestTimerWithMultipleTimerFamily(boolean useStreaming) throws Exception {
       final String timerFamilyId1 = "foo";
       final String timerFamilyId2 = "bar";
 
@@ -4556,12 +4580,71 @@ public class ParDoTest implements Serializable {
           };
 
       PCollection<String> output =
-          pipeline.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn));
+          pipeline
+              .apply(Create.of(KV.of("hello", 37)))
+              .setIsBoundedInternal(useStreaming ? IsBounded.UNBOUNDED : IsBounded.BOUNDED)
+              .apply(ParDo.of(fn));
       PAssert.that(output).containsInAnyOrder("process", "timer", "timer");
       pipeline.run();
     }
 
     @Test
+    @Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesTimerMap.class})
+    public void testTimerFamilyAndTimerBounded() throws Exception {
+      runTestTimerFamilyAndTimer(false);
+    }
+
+    @Test
+    @Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesTimerMap.class})
+    public void testTimerFamilyAndTimerUnbounded() throws Exception {
+      runTestTimerFamilyAndTimer(true);
+    }
+
+    public void runTestTimerFamilyAndTimer(boolean useStreaming) throws Exception {
+      final String timerFamilyId = "foo";
+      final String timerId = "timer";
+
+      DoFn<KV<String, Integer>, String> fn =
+          new DoFn<KV<String, Integer>, String>() {
+
+            @TimerFamily(timerFamilyId)
+            private final TimerSpec spec1 = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
+
+            @TimerId(timerId)
+            private final TimerSpec spec2 = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+            @ProcessElement
+            public void processElement(
+                @TimerFamily(timerFamilyId) TimerMap timerMap,
+                @TimerId(timerId) Timer timer,
+                OutputReceiver<String> r) {
+              timerMap.set("timer", new Instant(1));
+              timer.set(new Instant(2));
+              r.output("process");
+            }
+
+            @OnTimerFamily(timerFamilyId)
+            public void onTimer1(
+                @TimerId String timerId, @Timestamp Instant ts, OutputReceiver<String> r) {
+              r.output("family:" + timerFamilyId + ":" + timerId);
+            }
+
+            @OnTimer(timerId)
+            public void onTimer2(@Timestamp Instant ts, OutputReceiver<String> r) {
+              r.output(timerId);
+            }
+          };
+
+      PCollection<String> output =
+          pipeline
+              .apply(Create.of(KV.of("hello", 37)))
+              .setIsBoundedInternal(useStreaming ? IsBounded.UNBOUNDED : IsBounded.BOUNDED)
+              .apply(ParDo.of(fn));
+      PAssert.that(output).containsInAnyOrder("process", "family:foo:timer", "timer");
+      pipeline.run();
+    }
+
+    @Test
     @Category({
       ValidatesRunner.class,
       UsesTimersInParDo.class,
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index fa45c20..51d0d173 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -114,7 +114,7 @@ public class DoFnInvokersTest {
   }
 
   private void invokeOnTimer(String timerId, DoFn<String, String> fn) {
-    DoFnInvokers.invokerFor(fn).invokeOnTimer(timerId, timerId, mockArgumentProvider);
+    DoFnInvokers.invokerFor(fn).invokeOnTimer(timerId, "", mockArgumentProvider);
   }
 
   @Test
@@ -889,7 +889,7 @@ public class DoFnInvokersTest {
     SimpleTimerDoFn fn = new SimpleTimerDoFn();
 
     DoFnInvoker<String, String> invoker = DoFnInvokers.invokerFor(fn);
-    invoker.invokeOnTimer(timerId, timerId, mockArgumentProvider);
+    invoker.invokeOnTimer(timerId, "", mockArgumentProvider);
     assertThat(fn.status, equalTo("OK now"));
   }
 
@@ -918,7 +918,7 @@ public class DoFnInvokersTest {
     SimpleTimerDoFn fn = new SimpleTimerDoFn();
 
     DoFnInvoker<String, String> invoker = DoFnInvokers.invokerFor(fn);
-    invoker.invokeOnTimer(timerId, timerId, mockArgumentProvider);
+    invoker.invokeOnTimer(timerId, "", mockArgumentProvider);
     assertThat(fn.window, equalTo(testWindow));
   }
 
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index d8198e7..70d0d0a 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -773,7 +773,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, OutputT> {
           (Iterator<BoundedWindow>) timer.getWindows().iterator();
       while (windowIterator.hasNext()) {
         currentWindow = windowIterator.next();
-        doFnInvoker.invokeOnTimer(timerId, timerId, onTimerContext);
+        doFnInvoker.invokeOnTimer(timerId, "", onTimerContext);
       }
     } finally {
       currentTimer = null;

Reply via email to