boyuanzz commented on a change in pull request #11756:
URL: https://github.com/apache/beam/pull/11756#discussion_r427720451



##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -962,16 +971,25 @@ private Progress getProgress() {
             .build());
   }
 
-  private <K> void processTimer(String timerId, TimeDomain timeDomain, 
Timer<K> timer) {
+  private <K> void processTimer(
+      String timerIdOrTimerFamilyId, TimeDomain timeDomain, Timer<K> timer) {
     currentTimer = timer;
     currentTimeDomain = timeDomain;
     onTimerContext = new OnTimerContext<>(timer.getUserKey());
+    String timerId =
+        timerIdOrTimerFamilyId.startsWith(TimerFamilyDeclaration.PREFIX)

Review comment:
       If `timerIdOrTimerFamilyId` is for a timer family, should the 
timerId be the `timer.dynamicTimerTag`?

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1207,15 +1224,51 @@ private void output(Instant scheduledTime) {
     }
   }
 
-  private static class FnApiTimerMap implements TimerMap {
-    FnApiTimerMap() {}
+  private class FnApiTimerMap<K> implements TimerMap {
+    private final String timerFamilyId;
+    private final K userKey;
+    private final TimeDomain timeDomain;
+    private final Instant elementTimestampOrTimerHoldTimestamp;
+    private final Instant elementTimestampOrTimerFireTimestamp;
+    private final BoundedWindow boundedWindow;
+    private final PaneInfo paneInfo;
+
+    FnApiTimerMap(
+        String timerFamilyId,
+        K userKey,
+        BoundedWindow boundedWindow,
+        Instant elementTimestampOrTimerHoldTimestamp,
+        Instant elementTimestampOrTimerFireTimestamp,
+        PaneInfo paneInfo) {
+      this.timerFamilyId = timerFamilyId;
+      this.userKey = userKey;
+      this.elementTimestampOrTimerHoldTimestamp = 
elementTimestampOrTimerHoldTimestamp;
+      this.elementTimestampOrTimerFireTimestamp = 
elementTimestampOrTimerFireTimestamp;
+      this.boundedWindow = boundedWindow;
+      this.paneInfo = paneInfo;
+
+      TimerFamilyDeclaration timerFamilyDeclaration =
+          doFnSignature.timerFamilyDeclarations().get(timerFamilyId);
+      this.timeDomain =

Review comment:
       Similar to `FnApiTimer` above, we should get the `timeDomain` from the 
proto, right?

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -460,14 +461,22 @@ public void accept(WindowedValue input) throws Exception {
       // Extract out relevant TimerFamilySpec information in preparation for 
execution.
       for (Map.Entry<String, TimerFamilySpec> entry :
           parDoPayload.getTimerFamilySpecsMap().entrySet()) {
-        String timerFamilyId = entry.getKey();
-        TimeDomain timeDomain =
-            DoFnSignatures.getTimerSpecOrThrow(
-                    doFnSignature.timerDeclarations().get(timerFamilyId), doFn)
-                .getTimeDomain();
+        String timerIdOrTimerFamilyId = entry.getKey();
+        TimeDomain timeDomain;
+        if (timerIdOrTimerFamilyId.startsWith(TimerFamilyDeclaration.PREFIX)) {
+          timeDomain =

Review comment:
       The `TimerFamilySpec` should have a `time_domain` field. Maybe we could 
do something similar to 
https://github.com/apache/beam/blob/1de50c348706ed25af2bab9c9477d7d4f36ef8bf/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java#L657-L668

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -460,14 +461,22 @@ public void accept(WindowedValue input) throws Exception {
       // Extract out relevant TimerFamilySpec information in preparation for 
execution.
       for (Map.Entry<String, TimerFamilySpec> entry :
           parDoPayload.getTimerFamilySpecsMap().entrySet()) {
-        String timerFamilyId = entry.getKey();
-        TimeDomain timeDomain =
-            DoFnSignatures.getTimerSpecOrThrow(
-                    doFnSignature.timerDeclarations().get(timerFamilyId), doFn)
-                .getTimeDomain();
+        String timerIdOrTimerFamilyId = entry.getKey();
+        TimeDomain timeDomain;
+        if (timerIdOrTimerFamilyId.startsWith(TimerFamilyDeclaration.PREFIX)) {
+          timeDomain =
+              DoFnSignatures.getTimerFamilySpecOrThrow(
+                      
doFnSignature.timerFamilyDeclarations().get(timerIdOrTimerFamilyId), doFn)
+                  .getTimeDomain();
+        } else {
+          timeDomain =
+              DoFnSignatures.getTimerSpecOrThrow(
+                      
doFnSignature.timerDeclarations().get(timerIdOrTimerFamilyId), doFn)
+                  .getTimeDomain();
+        }
         Coder<Timer<Object>> timerCoder =
             (Coder) 
rehydratedComponents.getCoder(entry.getValue().getTimerFamilyCoderId());
-        timerFamilyInfosBuilder.put(timerFamilyId, KV.of(timeDomain, 
timerCoder));
+        timerFamilyInfosBuilder.put(timerIdOrTimerFamilyId, KV.of(timeDomain, 
timerCoder));

Review comment:
       Could you please add more javadoc/comments about why it's `timerId` or 
`timerFamilyId`?

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1207,15 +1224,51 @@ private void output(Instant scheduledTime) {
     }
   }
 
-  private static class FnApiTimerMap implements TimerMap {
-    FnApiTimerMap() {}
+  private class FnApiTimerMap<K> implements TimerMap {
+    private final String timerFamilyId;
+    private final K userKey;
+    private final TimeDomain timeDomain;
+    private final Instant elementTimestampOrTimerHoldTimestamp;
+    private final Instant elementTimestampOrTimerFireTimestamp;
+    private final BoundedWindow boundedWindow;
+    private final PaneInfo paneInfo;
+
+    FnApiTimerMap(
+        String timerFamilyId,
+        K userKey,
+        BoundedWindow boundedWindow,
+        Instant elementTimestampOrTimerHoldTimestamp,
+        Instant elementTimestampOrTimerFireTimestamp,
+        PaneInfo paneInfo) {
+      this.timerFamilyId = timerFamilyId;
+      this.userKey = userKey;
+      this.elementTimestampOrTimerHoldTimestamp = 
elementTimestampOrTimerHoldTimestamp;
+      this.elementTimestampOrTimerFireTimestamp = 
elementTimestampOrTimerFireTimestamp;
+      this.boundedWindow = boundedWindow;
+      this.paneInfo = paneInfo;
+
+      TimerFamilyDeclaration timerFamilyDeclaration =
+          doFnSignature.timerFamilyDeclarations().get(timerFamilyId);
+      this.timeDomain =
+          DoFnSignatures.getTimerFamilySpecOrThrow(timerFamilyDeclaration, 
doFn).getTimeDomain();
+    }
 
     @Override
-    public void set(String timerId, Instant absoluteTime) {}
+    public void set(String dynamicTimerTag, Instant absoluteTime) {

Review comment:
       We should consider exposing more APIs here like `FnApiTimer`. Maybe a 
TODO here?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to