boyuanzz commented on a change in pull request #11756:
URL: https://github.com/apache/beam/pull/11756#discussion_r428438971



##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -962,16 +971,25 @@ private Progress getProgress() {
             .build());
   }
 
-  private <K> void processTimer(String timerId, TimeDomain timeDomain, 
Timer<K> timer) {
+  private <K> void processTimer(
+      String timerIdOrTimerFamilyId, TimeDomain timeDomain, Timer<K> timer) {
     currentTimer = timer;
     currentTimeDomain = timeDomain;
     onTimerContext = new OnTimerContext<>(timer.getUserKey());
+    String timerId =
+        timerIdOrTimerFamilyId.startsWith(TimerFamilyDeclaration.PREFIX)

Review comment:
       Ack. Thanks!

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1207,15 +1224,51 @@ private void output(Instant scheduledTime) {
     }
   }
 
-  private static class FnApiTimerMap implements TimerMap {
-    FnApiTimerMap() {}
+  private class FnApiTimerMap<K> implements TimerMap {
+    private final String timerFamilyId;
+    private final K userKey;
+    private final TimeDomain timeDomain;
+    private final Instant elementTimestampOrTimerHoldTimestamp;
+    private final Instant elementTimestampOrTimerFireTimestamp;
+    private final BoundedWindow boundedWindow;
+    private final PaneInfo paneInfo;
+
+    FnApiTimerMap(
+        String timerFamilyId,
+        K userKey,
+        BoundedWindow boundedWindow,
+        Instant elementTimestampOrTimerHoldTimestamp,
+        Instant elementTimestampOrTimerFireTimestamp,
+        PaneInfo paneInfo) {
+      this.timerFamilyId = timerFamilyId;
+      this.userKey = userKey;
+      this.elementTimestampOrTimerHoldTimestamp = 
elementTimestampOrTimerHoldTimestamp;
+      this.elementTimestampOrTimerFireTimestamp = 
elementTimestampOrTimerFireTimestamp;
+      this.boundedWindow = boundedWindow;
+      this.paneInfo = paneInfo;
+
+      TimerFamilyDeclaration timerFamilyDeclaration =
+          doFnSignature.timerFamilyDeclarations().get(timerFamilyId);
+      this.timeDomain =
+          DoFnSignatures.getTimerFamilySpecOrThrow(timerFamilyDeclaration, 
doFn).getTimeDomain();
+    }
 
     @Override
-    public void set(String timerId, Instant absoluteTime) {}
+    public void set(String dynamicTimerTag, Instant absoluteTime) {

Review comment:
       I thought it would be good time to revisit the dynamic timer API design 
but it's not in the scope of this PR. Let's leave it as now. Thanks!

##########
File path: 
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
##########
@@ -947,49 +910,213 @@ public void testTimers() throws Exception {
             timerInGlobalWindow("A", new Instant(1600L), new Instant(10012L)),
             timerInGlobalWindow("X", new Instant(1700L), new Instant(10022L)),
             timerInGlobalWindow("C", new Instant(1800L), new Instant(10022L)),
-            timerInGlobalWindow("B", new Instant(1900L), new 
Instant(10022L))));
+            timerInGlobalWindow("B", new Instant(1900L), new Instant(10022L)),
+            timerInGlobalWindow("B", new Instant(2000L), new Instant(10032L)),
+            timerInGlobalWindow("Y", new Instant(2100L), new 
Instant(10042L))));
+    assertThat(
+        fakeTimerClient.getTimers(eventFamilyTimer),
+        contains(
+            timerInGlobalWindow("X", "event-timer1", new Instant(1000L), new 
Instant(1003L)),
+            timerInGlobalWindow("Y", "event-timer1", new Instant(1100L), new 
Instant(1103L)),
+            timerInGlobalWindow("X", "event-timer1", new Instant(1200L), new 
Instant(1203L)),
+            timerInGlobalWindow("Y", "event-timer1", new Instant(1300L), new 
Instant(1303L)),
+            timerInGlobalWindow("A", "event-timer1", new Instant(1400L), new 
Instant(2413L)),
+            timerInGlobalWindow("B", "event-timer1", new Instant(1500L), new 
Instant(2513L)),
+            timerInGlobalWindow("A", "event-timer1", new Instant(1600L), new 
Instant(2613L)),
+            timerInGlobalWindow("X", "event-timer1", new Instant(1700L), new 
Instant(1723L)),
+            timerInGlobalWindow("C", "event-timer1", new Instant(1800L), new 
Instant(1823L)),
+            timerInGlobalWindow("B", "event-timer1", new Instant(1900L), new 
Instant(1923L)),
+            timerInGlobalWindow("B", "event-timer1", new Instant(2000L), new 
Instant(2033L)),
+            timerInGlobalWindow("Y", "event-timer1", new Instant(2100L), new 
Instant(2143L))));
+    assertThat(
+        fakeTimerClient.getTimers(processingFamilyTimer),
+        contains(
+            timerInGlobalWindow("X", "processing-timer1", new Instant(1000L), 
new Instant(10004L)),
+            timerInGlobalWindow("Y", "processing-timer1", new Instant(1100L), 
new Instant(10004L)),
+            timerInGlobalWindow("X", "processing-timer1", new Instant(1200L), 
new Instant(10004L)),
+            timerInGlobalWindow("Y", "processing-timer1", new Instant(1300L), 
new Instant(10004L)),
+            timerInGlobalWindow("A", "processing-timer1", new Instant(1400L), 
new Instant(10014L)),
+            timerInGlobalWindow("B", "processing-timer1", new Instant(1500L), 
new Instant(10014L)),
+            timerInGlobalWindow("A", "processing-timer1", new Instant(1600L), 
new Instant(10014L)),
+            timerInGlobalWindow("X", "processing-timer1", new Instant(1700L), 
new Instant(10024L)),
+            timerInGlobalWindow("C", "processing-timer1", new Instant(1800L), 
new Instant(10024L)),
+            timerInGlobalWindow("B", "processing-timer1", new Instant(1900L), 
new Instant(10024L)),
+            timerInGlobalWindow("B", "processing-timer1", new Instant(2000L), 
new Instant(10034L)),
+            timerInGlobalWindow(
+                "Y", "processing-timer1", new Instant(2100L), new 
Instant(10044L))));
     mainOutputValues.clear();
 
     assertFalse(fakeTimerClient.isOutboundClosed(eventTimer));
     assertFalse(fakeTimerClient.isOutboundClosed(processingTimer));
+    assertFalse(fakeTimerClient.isOutboundClosed(eventFamilyTimer));
+    assertFalse(fakeTimerClient.isOutboundClosed(processingFamilyTimer));
     fakeTimerClient.closeInbound(eventTimer);
     fakeTimerClient.closeInbound(processingTimer);
+    fakeTimerClient.closeInbound(eventFamilyTimer);
+    fakeTimerClient.closeInbound(processingFamilyTimer);
 
     Iterables.getOnlyElement(finishFunctionRegistry.getFunctions()).run();
     assertThat(mainOutputValues, empty());
 
     assertTrue(fakeTimerClient.isOutboundClosed(eventTimer));
     assertTrue(fakeTimerClient.isOutboundClosed(processingTimer));
+    assertTrue(fakeTimerClient.isOutboundClosed(eventFamilyTimer));
+    assertTrue(fakeTimerClient.isOutboundClosed(processingFamilyTimer));
 
     Iterables.getOnlyElement(teardownFunctions).run();
     assertThat(mainOutputValues, empty());
 
     assertEquals(
         ImmutableMap.<StateKey, ByteString>builder()
             .put(bagUserStateKey("bag", "X"), encode("X0", "X1", "X2", 
"processing"))
-            .put(bagUserStateKey("bag", "Y"), encode("Y1", "Y2"))
+            .put(bagUserStateKey("bag", "Y"), encode("Y1", "Y2", 
"processing-family"))
             .put(bagUserStateKey("bag", "A"), encode("A0", "event", "event"))
-            .put(bagUserStateKey("bag", "B"), encode("event", "processing"))
+            .put(bagUserStateKey("bag", "B"), encode("event", "processing", 
"event-family"))
             .put(bagUserStateKey("bag", "C"), encode("C0", "processing"))
             .build(),
         fakeStateClient.getData());
   }
 
+  private <K> org.apache.beam.runners.core.construction.Timer<K> 
timerInGlobalWindow(
+      K userKey, Instant holdTimestamp, Instant fireTimestamp) {
+    return timerInGlobalWindow(userKey, "", holdTimestamp, fireTimestamp);
+  }
+
   private <T> WindowedValue<T> valueInWindow(T value, BoundedWindow window) {
     return WindowedValue.of(value, window.maxTimestamp(), window, 
PaneInfo.NO_FIRING);
   }
 
   private <K> org.apache.beam.runners.core.construction.Timer<K> 
timerInGlobalWindow(

Review comment:
       dynamicTimerInGlobalWindow?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to