gemini-code-assist[bot] commented on code in PR #38450:
URL: https://github.com/apache/beam/pull/38450#discussion_r3221353487


##########
website/www/site/content/en/documentation/programming-guide.md:
##########
@@ -7501,6 +7501,91 @@ class BufferDoFn(DoFn):
 {{< code_sample "sdks/go/examples/snippets/04transforms.go" batching_dofn_example >}}
 {{< /highlight >}}
 
+#### 11.5.3. Looping timers {#looping-timers}
+
+Looping timers are a pattern where a timer sets another timer for a future time, creating a loop. This is useful for producing periodic outputs or heartbeats in the absence of data for a specific key.
+
+When draining a pipeline, it is important to terminate these loops to allow the pipeline to finish. In the Java SDK, you can use the `CausedByDrain` parameter in the `@OnTimer` method to check if the timer firing was induced by a drain operation. **Note:** `CausedByDrain` will be set only in certain runners. Check the [capability matrix](/documentation/runners/capability-matrix/) for more details.
+
+{{< highlight java >}}
+public static class LoopingStatefulTimer extends DoFn<KV<String, Integer>, KV<String, Integer>> {
+    @StateId("key") private final StateSpec<ValueState<String>> key = StateSpecs.value();
+    @TimerId("loopingTimer") private final TimerSpec loopingTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+    @ProcessElement
+    public void process(
+        @Element KV<String, Integer> element,
+        @StateId("key") ValueState<String> keyState,
+        @TimerId("loopingTimer") Timer timer) {

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The `process` method calls `output.output(element)` on line 7524 but does not declare an `OutputReceiver` parameter, so the documentation example will not compile. Adding the parameter to the method signature fixes this.
   
   ```suggestion
       public void process(
           @Element KV<String, Integer> element,
           @StateId("key") ValueState<String> keyState,
           @TimerId("loopingTimer") Timer timer,
           OutputReceiver<KV<String, Integer>> output) {
   ```



##########
website/www/site/content/en/documentation/programming-guide.md:
##########
@@ -7501,6 +7501,91 @@ class BufferDoFn(DoFn):
 {{< code_sample "sdks/go/examples/snippets/04transforms.go" batching_dofn_example >}}
 {{< /highlight >}}
 
+#### 11.5.3. Looping timers {#looping-timers}
+
+Looping timers are a pattern where a timer sets another timer for a future time, creating a loop. This is useful for producing periodic outputs or heartbeats in the absence of data for a specific key.
+
+When draining a pipeline, it is important to terminate these loops to allow the pipeline to finish. In the Java SDK, you can use the `CausedByDrain` parameter in the `@OnTimer` method to check if the timer firing was induced by a drain operation. **Note:** `CausedByDrain` will be set only in certain runners. Check the [capability matrix](/documentation/runners/capability-matrix/) for more details.
+
+{{< highlight java >}}
+public static class LoopingStatefulTimer extends DoFn<KV<String, Integer>, KV<String, Integer>> {
+    @StateId("key") private final StateSpec<ValueState<String>> key = StateSpecs.value();
+    @TimerId("loopingTimer") private final TimerSpec loopingTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+    @ProcessElement
+    public void process(
+        @Element KV<String, Integer> element,
+        @StateId("key") ValueState<String> keyState,
+        @TimerId("loopingTimer") Timer timer) {
+      
+      keyState.write(element.getKey());
+      // Set initial timer
+      timer.offset(Duration.standardMinutes(1)).setRelative();
+      output.output(element);
+    }
+
+    @OnTimer("loopingTimer")
+    public void onTimer(
+        @StateId("key") ValueState<String> keyState,
+        @TimerId("loopingTimer") Timer timer,
+        OutputReceiver<KV<String, Integer>> output,
+        CausedByDrain drain) {
+
+      output.output(KV.of(keyState.read(), 0));
+
+      // Cancel looping timer if drain is in progress
+      if (drain == CausedByDrain.CAUSED_BY_DRAIN) {
+        return;
+      }
+
+      // Set next timer
+      timer.offset(Duration.standardMinutes(1)).setRelative();
+    }
+}
+{{< /highlight >}}
+
+{{< highlight py >}}
+# Python does not currently support detecting drain in OnTimer.
+# The following example demonstrates a looping timer without drain support.
+
+class LoopingTimerDoFn(DoFn):
+  KEY_STATE = ValueStateSpec('key', coders.StrUtf8Coder())
+  TIMER = TimerSpec('timer', TimeDomain.WATERMARK)
+
+  def process(self, element, key_state=DoFn.StateParam(KEY_STATE), timer=DoFn.TimerParam(TIMER)):
+    key_state.write(element[0])
+    timer.set(Timestamp.now() + Duration(seconds=60))
+    yield element
+
+  @on_timer(TIMER)
+  def on_timer(self, key_state=DoFn.StateParam(KEY_STATE), timer=DoFn.TimerParam(TIMER)):
+    yield (key_state.read(), 0)
+    # Loops forever, cannot handle drain safely if it never stops.
+    timer.set(Timestamp.now() + Duration(seconds=60))
+{{< /highlight >}}
+
+{{< highlight go >}}
+// Go does not currently support detecting drain in OnTimer.
+// The following example demonstrates a looping timer without drain support.
+
+type LoopingTimerFn struct {
+       KeyState state.Value[string]
+       Timer    timers.EventTime

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   In the Go SDK, the `Set` method for `timers.EventTime` expects a 
`beam.Timestamp` (or `mtime.Time`), but `time.Now()` returns a `time.Time`. 
Since this example uses wall-clock time for the loop, it should use 
`timers.ProcessingTime` to ensure type compatibility and logical correctness 
for a heartbeat-style timer.
   
   ```suggestion
        Timer    timers.ProcessingTime
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.