gemini-code-assist[bot] commented on code in PR #38450:
URL: https://github.com/apache/beam/pull/38450#discussion_r3221353487
##########
website/www/site/content/en/documentation/programming-guide.md:
##########
@@ -7501,6 +7501,91 @@ class BufferDoFn(DoFn):
{{< code_sample "sdks/go/examples/snippets/04transforms.go" batching_dofn_example >}}
{{< /highlight >}}
+#### 11.5.3. Looping timers {#looping-timers}
+
+Looping timers are a pattern where a timer sets another timer for a future time, creating a loop. This is useful for producing periodic outputs or heartbeats in the absence of data for a specific key.
+
+When draining a pipeline, it is important to terminate these loops to allow the pipeline to finish. In the Java SDK, you can use the `CausedByDrain` parameter in the `@OnTimer` method to check if the timer firing was induced by a drain operation. **Note:** `CausedByDrain` will be set only in certain runners. Check the [capability matrix](/documentation/runners/capability-matrix/) for more details.
+
+{{< highlight java >}}
+public static class LoopingStatefulTimer extends DoFn<KV<String, Integer>, KV<String, Integer>> {
+ @StateId("key") private final StateSpec<ValueState<String>> key = StateSpecs.value();
+ @TimerId("loopingTimer") private final TimerSpec loopingTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ @ProcessElement
+ public void process(
+ @Element KV<String, Integer> element,
+ @StateId("key") ValueState<String> keyState,
+ @TimerId("loopingTimer") Timer timer) {
Review Comment:

The `process` method calls `output.output(element)` on line 7524 but does not
declare an `OutputReceiver` parameter, so the documentation example does not
compile. Adding the parameter to the method signature resolves the error.
```suggestion
public void process(
@Element KV<String, Integer> element,
@StateId("key") ValueState<String> keyState,
@TimerId("loopingTimer") Timer timer,
OutputReceiver<KV<String, Integer>> output) {
```
##########
website/www/site/content/en/documentation/programming-guide.md:
##########
@@ -7501,6 +7501,91 @@ class BufferDoFn(DoFn):
{{< code_sample "sdks/go/examples/snippets/04transforms.go" batching_dofn_example >}}
{{< /highlight >}}
+#### 11.5.3. Looping timers {#looping-timers}
+
+Looping timers are a pattern where a timer sets another timer for a future time, creating a loop. This is useful for producing periodic outputs or heartbeats in the absence of data for a specific key.
+
+When draining a pipeline, it is important to terminate these loops to allow the pipeline to finish. In the Java SDK, you can use the `CausedByDrain` parameter in the `@OnTimer` method to check if the timer firing was induced by a drain operation. **Note:** `CausedByDrain` will be set only in certain runners. Check the [capability matrix](/documentation/runners/capability-matrix/) for more details.
+
+{{< highlight java >}}
+public static class LoopingStatefulTimer extends DoFn<KV<String, Integer>, KV<String, Integer>> {
+ @StateId("key") private final StateSpec<ValueState<String>> key = StateSpecs.value();
+ @TimerId("loopingTimer") private final TimerSpec loopingTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ @ProcessElement
+ public void process(
+ @Element KV<String, Integer> element,
+ @StateId("key") ValueState<String> keyState,
+ @TimerId("loopingTimer") Timer timer) {
+
+ keyState.write(element.getKey());
+ // Set initial timer
+ timer.offset(Duration.standardMinutes(1)).setRelative();
+ output.output(element);
+ }
+
+ @OnTimer("loopingTimer")
+ public void onTimer(
+ @StateId("key") ValueState<String> keyState,
+ @TimerId("loopingTimer") Timer timer,
+ OutputReceiver<KV<String, Integer>> output,
+ CausedByDrain drain) {
+
+ output.output(KV.of(keyState.read(), 0));
+
+ // Cancel looping timer if drain is in progress
+ if (drain == CausedByDrain.CAUSED_BY_DRAIN) {
+ return;
+ }
+
+ // Set next timer
+ timer.offset(Duration.standardMinutes(1)).setRelative();
+ }
+}
+{{< /highlight >}}
+
+{{< highlight py >}}
+# Python does not currently support detecting drain in OnTimer.
+# The following example demonstrates a looping timer without drain support.
+
+class LoopingTimerDoFn(DoFn):
+ KEY_STATE = ValueStateSpec('key', coders.StrUtf8Coder())
+ TIMER = TimerSpec('timer', TimeDomain.WATERMARK)
+
+ def process(self, element, key_state=DoFn.StateParam(KEY_STATE), timer=DoFn.TimerParam(TIMER)):
+ key_state.write(element[0])
+ timer.set(Timestamp.now() + Duration(seconds=60))
+ yield element
+
+ @on_timer(TIMER)
+ def on_timer(self, key_state=DoFn.StateParam(KEY_STATE),
timer=DoFn.TimerParam(TIMER)):
+ yield (key_state.read(), 0)
+ # Loops forever, cannot handle drain safely if it never stops.
+ timer.set(Timestamp.now() + Duration(seconds=60))
+{{< /highlight >}}
+
+{{< highlight go >}}
+// Go does not currently support detecting drain in OnTimer.
+// The following example demonstrates a looping timer without drain support.
+
+type LoopingTimerFn struct {
+ KeyState state.Value[string]
+ Timer timers.EventTime
Review Comment:

In the Go SDK, a `timers.EventTime` timer fires as the watermark advances,
whereas `time.Now()` returns wall-clock time. Since this example drives the
loop from wall-clock time, it should use `timers.ProcessingTime`, which is the
logically correct choice for a heartbeat-style timer.
```suggestion
Timer timers.ProcessingTime
```
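
For reference, the drain behavior these examples are meant to document can be modeled without Beam. The following is a minimal Python sketch, not SDK code: `LoopingTimerSim`, `advance_to`, and `drain` are hypothetical names, and it assumes a drain fires each pending timer once with a drain flag set. It shows that a handler which skips re-arming when the flag is set lets the pending queue empty.

```python
import heapq
from dataclasses import dataclass, field


@dataclass
class LoopingTimerSim:
    """Toy, Beam-free model of a looping timer for a single key (hypothetical)."""
    period: int = 60
    pending: list = field(default_factory=list)  # min-heap of firing times
    outputs: list = field(default_factory=list)  # heartbeat times emitted so far

    def process(self, now):
        # Mirrors the element handler: arm the first timer relative to 'now'.
        heapq.heappush(self.pending, now + self.period)

    def on_timer(self, fire_time, caused_by_drain):
        # Mirrors the timer handler: emit a heartbeat, then re-arm unless draining.
        self.outputs.append(fire_time)
        if caused_by_drain:
            return
        heapq.heappush(self.pending, fire_time + self.period)

    def advance_to(self, now):
        # Normal operation: fire every timer due at or before 'now'.
        while self.pending and self.pending[0] <= now:
            self.on_timer(heapq.heappop(self.pending), caused_by_drain=False)

    def drain(self):
        # Assumed drain model: fire each remaining timer once, flagged as drain.
        while self.pending:
            self.on_timer(heapq.heappop(self.pending), caused_by_drain=True)
```

With a 60-second period, arming at t=0 and advancing to t=180 emits heartbeats at 60, 120, and 180 and leaves one timer pending at 240; `drain()` then fires it once and leaves nothing pending. Without the `caused_by_drain` check, the `drain()` loop in this model would never terminate.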
--
This is an automated message from the Apache Git Service.