gemini-code-assist[bot] commented on code in PR #37937:
URL: https://github.com/apache/beam/pull/37937#discussion_r3081040861
##########
.agent/skills/beam-dofn-modernizer/SKILL.md:
##########
@@ -0,0 +1,190 @@
+---
+name: beam-dofn-modernizer
+description: Rewrite Apache Beam DoFn methods (@ProcessElement, @OnTimer,
@OnWindowExpiration) to remove legacy ProcessContext or OnTimerContext usage.
Use this skill when you encounter DoFn methods that use context.element(),
context.output(), etc., and need to modernization them using parameter
injection (@Element, @Timestamp, @Pane, OutputReceiver, MultiOutputReceiver).
+---
+
+# Modernizing Apache Beam DoFns
+
+Apache Beam has moved towards parameter injection in `DoFn` methods to improve
readability and allow for more efficient execution. This skill helps you
migrate legacy `ProcessContext` and `OnTimerContext` usage to modern annotated
parameters.
+
+## Core Mappings
+
+When rewriting a `@ProcessElement` or `@OnTimer` method, replace the context
argument with the corresponding parameters based on the usage:
+
+| Legacy Context Usage (e.g. `ProcessContext c`) | Modern Parameter Replacement |
+| :--- | :--- |
+| `c.element()` | `@Element T element` |
+| `c.timestamp()` | `@Timestamp Instant timestamp` |
+| `c.pane()` | `PaneInfo pane` |
+| `c.window()` | `BoundedWindow window` |
+| `c.sideInput(PCollectionView<T> view)` | `@SideInput("viewName") T value` |
+| `c.getPipelineOptions()` | `PipelineOptions options` |
+| `c.output(value)` | `OutputReceiver<T> receiver` then `receiver.output(value)` |
+| `c.output(tag, value)` | `MultiOutputReceiver receiver` then `receiver.get(tag).output(value)` |
+| `c.outputWithTimestamp(value, ts)` | `OutputReceiver<T> receiver` then `receiver.outputWithTimestamp(value, ts)` |
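In this table, `PaneInfo` and `BoundedWindow` carry no annotation: Beam injects them by parameter type alone. The following is a minimal sketch of those two rows together with `@Element`, assuming the Beam Java SDK; `DescribeFn` and its output format are hypothetical. With the default global window, every element should report `window=global`.

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;

// Hypothetical DoFn: c.element(), c.window() and c.pane() become injected parameters.
class DescribeFn extends DoFn<String, String> {
  @ProcessElement
  public void processElement(
      @Element String element, // was c.element()
      BoundedWindow window, // was c.window(); injected by type, no annotation
      PaneInfo pane, // was c.pane(); injected by type, no annotation
      OutputReceiver<String> receiver) {
    String windowKind = window instanceof GlobalWindow ? "global" : "other";
    receiver.output(element + " window=" + windowKind + " firstPane=" + pane.isFirst());
  }
}
```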
+
+## Method Signature Changes
+
+### @ProcessElement
+
+**Legacy:**
+```java
+@ProcessElement
+public void processElement(ProcessContext c) {
+ T element = c.element();
+ c.output(transform(element));
+}
+```
+
+**Modern:**
+```java
+@ProcessElement
+public void processElement(
+ @Element T element,
+ @Timestamp Instant timestamp,
+ OutputReceiver<V> receiver) {
+ receiver.output(transform(element));
+}
+```
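A concrete version of the modern signature, as a sketch: `StampFn` is a hypothetical `DoFn<String, String>` that actually uses the injected `@Timestamp` parameter (which the generic example declares but never reads) to tag each element with its event time in epoch milliseconds.

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.joda.time.Instant;

// Hypothetical DoFn: c.element() and c.timestamp() become injected parameters,
// and c.output(...) becomes receiver.output(...).
class StampFn extends DoFn<String, String> {
  @ProcessElement
  public void processElement(
      @Element String element,
      @Timestamp Instant timestamp,
      OutputReceiver<String> receiver) {
    // Emit the element tagged with its event-time timestamp in epoch millis.
    receiver.output(element + "@" + timestamp.getMillis());
  }
}
```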
+
+### @OnTimer
+
+**Legacy:**
+```java
+@OnTimer("timerId")
+public void onTimer(OnTimerContext c) {
+ c.output(someValue);
+}
+```
+
+**Modern:**
+```java
+@OnTimer("timerId")
+public void onTimer(
+ @Timestamp Instant timestamp,
+ BoundedWindow window,
+ OutputReceiver<V> receiver) {
+ receiver.output(someValue);
+}
+```
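`@OnTimer` methods normally live in a stateful `DoFn`, which the snippet above leaves out. A sketch under stated assumptions: `SumOnTimerFn` is hypothetical, it assumes a Beam version that supports `@Key` in `@OnTimer` methods, and it sets an event-time timer at the end of each element's window so each per-key sum is emitted once. No `OnTimerContext` appears; the key, state, and receiver are all injected parameters.

```java
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;

// Hypothetical stateful DoFn: sums values per key and window, and emits each sum
// once when an event-time timer fires at the end of the window.
class SumOnTimerFn extends DoFn<KV<String, Integer>, KV<String, Integer>> {

  @StateId("sum")
  private final StateSpec<ValueState<Integer>> sumSpec = StateSpecs.value(VarIntCoder.of());

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void processElement(
      @Element KV<String, Integer> element,
      BoundedWindow window,
      @StateId("sum") ValueState<Integer> sum,
      @TimerId("flush") Timer flush) {
    Integer current = sum.read(); // null until the first write for this key and window
    sum.write((current == null ? 0 : current) + element.getValue());
    flush.set(window.maxTimestamp());
  }

  // Modern @OnTimer: no OnTimerContext; key, state and receiver are injected.
  @OnTimer("flush")
  public void onFlush(
      @Key String key,
      @StateId("sum") ValueState<Integer> sum,
      OutputReceiver<KV<String, Integer>> receiver) {
    Integer total = sum.read();
    if (total != null) {
      receiver.output(KV.of(key, total));
    }
    sum.clear();
  }
}
```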
+
+## Best Practices
+
+1. **Specific OutputReceiver**: If the method only outputs to the main
output, use `OutputReceiver<T>`. If it outputs to multiple tags, use
`MultiOutputReceiver`.
+2. **Element Type**: Ensure the `@Element` parameter type matches the input
type of the `DoFn`.
+3. **Imports**: Don't forget to add imports for:
+ * `org.apache.beam.sdk.transforms.DoFn.Element`
+ * `org.apache.beam.sdk.transforms.DoFn.Timestamp`
+ * `org.apache.beam.sdk.transforms.DoFn.OutputReceiver`
+ * `org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver` (if needed)
+ * `org.apache.beam.sdk.values.PCollectionView` (if using `@SideInput`)
+ * `org.apache.beam.sdk.transforms.DoFn.SideInput`
+ *. `org.apache.beam.sdk.transforms.windowing.PaneInfo`
+4. **Side Inputs**: When using `@SideInput`, make sure to use the correct
name that matches the one passed to `ParDo.withSideInput("name", view)`.
+
+## Example Conversion
+
+### Before:
+```java
+@ProcessElement
+public void processElement(ProcessContext c) {
+ KV<String, Integer> element = c.element();
+ Instant ts = c.timestamp();
+ if (element.getValue() > threshold) {
+ c.output(element.getKey());
+ c.output(specialTag, element.getValue());
+ }
+}
+```
+
+### After:
+```java
+@ProcessElement
+public void processElement(
+ @Element KV<String, Integer> element,
+ @Timestamp Instant timestamp,
+ MultiOutputReceiver receiver) {
+ if (element.getValue() > threshold) {
+ receiver.get(mainTag).output(element.getKey());
+ receiver.get(specialTag).output(element.getValue());
+ }
+}
+```
+> [!NOTE]
+> If you only have one output, use `OutputReceiver<String> receiver` and
`receiver.output(element.getKey())`.
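The "After" example references `mainTag`, `specialTag`, and `threshold` without declaring them. A self-contained sketch with hypothetical names: because the former `c.output(value)` now goes through `receiver.get(MAIN_TAG)`, the main tag has to be declared, and it is passed as the first argument of `withOutputTags`.

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

class SplitByThreshold {
  // Anonymous subclasses keep the element type available for coder inference.
  static final TupleTag<String> MAIN_TAG = new TupleTag<String>() {};
  static final TupleTag<Integer> SPECIAL_TAG = new TupleTag<Integer>() {};

  static class SplitFn extends DoFn<KV<String, Integer>, String> {
    private final int threshold;

    SplitFn(int threshold) {
      this.threshold = threshold;
    }

    @ProcessElement
    public void processElement(
        @Element KV<String, Integer> element, MultiOutputReceiver receiver) {
      if (element.getValue() > threshold) {
        receiver.get(MAIN_TAG).output(element.getKey()); // was c.output(...)
        receiver.get(SPECIAL_TAG).output(element.getValue()); // was c.output(specialTag, ...)
      }
    }
  }

  // MAIN_TAG becomes the main output; SPECIAL_TAG is declared as an additional output.
  static PCollectionTuple apply(PCollection<KV<String, Integer>> input, int threshold) {
    return input.apply(
        ParDo.of(new SplitFn(threshold)).withOutputTags(MAIN_TAG, TupleTagList.of(SPECIAL_TAG)));
  }
}
```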
+
+## Side Input Conversion
+
+Modernizing side inputs involves removing the `PCollectionView` from the
`DoFn` constructor and using `@SideInput` parameter injection instead.
+
+### Before (Legacy):
+
+**PTransform/Pipeline side:**
+```java
+PCollectionView<String> myView = ...;
+input.apply(ParDo.of(new MyFn(myView)).withSideInputs(myView));
+```
+
+**DoFn side:**
+```java
+class MyFn extends DoFn<T, V> {
+ private final PCollectionView<String> view;
+ MyFn(PCollectionView<String> view) { this.view = view; }
+
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ String value = c.sideInput(view);
+ // ...
+ }
+}
+```
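For comparison, a sketch of the modernized form, assuming a Beam version whose `ParDo.SingleOutput` provides `withSideInput(String, PCollectionView)`: the view is no longer a constructor argument or field, and the string passed to `withSideInput` must match the `@SideInput` value. `SideInputExample`, `MyFn`, and the `"myView"` tag are illustrative names.

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

class SideInputExample {
  // No PCollectionView field or constructor argument: the side input is injected by name.
  static class MyFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(
        @Element String element,
        @SideInput("myView") String suffix, // was c.sideInput(view)
        OutputReceiver<String> receiver) {
      receiver.output(element + suffix);
    }
  }

  static PCollection<String> apply(PCollection<String> input, PCollectionView<String> myView) {
    // The tag passed to withSideInput must match the @SideInput annotation value.
    return input.apply(ParDo.of(new MyFn()).withSideInput("myView", myView));
  }
}
```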
+
+### Nullable Side Inputs
+
+If the `PCollectionView` argument is nullable (it may be marked with a nullable annotation, but not always) and the `@ProcessElement` method has logic that depends on it, raise this as a blocking issue for modernizing the pattern, and create two DoFns: one with the side input and one without it.
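One way to apply this advice, sketched with hypothetical names: choose the `DoFn` at pipeline-construction time based on whether the view is null, so neither `DoFn` needs a null check. It assumes the same `withSideInput(String, PCollectionView)` API that `@SideInput` injection relies on.

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

class OptionalSuffix {
  // Used when a view is available: the side input is injected by name.
  static class WithSuffixFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(
        @Element String element,
        @SideInput("suffix") String suffix,
        OutputReceiver<String> receiver) {
      receiver.output(element + suffix);
    }
  }

  // Used when there is no view: the same logic minus the side-input-dependent step.
  static class WithoutSuffixFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(@Element String element, OutputReceiver<String> receiver) {
      receiver.output(element);
    }
  }

  // `view` may be null; pick the DoFn here instead of null-checking inside a DoFn.
  static PCollection<String> apply(PCollection<String> input, PCollectionView<String> view) {
    if (view == null) {
      return input.apply("WithoutSuffix", ParDo.of(new WithoutSuffixFn()));
    }
    return input.apply("WithSuffix", ParDo.of(new WithSuffixFn()).withSideInput("suffix", view));
  }
}
```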
+
+**PTransform/Pipeline side:**
+```java
+PCollectionView<String> myView = ...;
+input.apply(ParDo.of(new MyFn(myView)).withSideInputs(myView));
+//or
+input.apply(ParDo.of(new MyFn(myView)); // to indroduce null
Review Comment:

There's a typo here: `indroduce` should be `introduce`. Also, the code
example is syntactically incorrect (the parentheses are unbalanced, so the
`apply(` call is never closed) and misleading, because it still passes
`myView`. To demonstrate passing a null side input, pass `null` to the
`DoFn`'s constructor, for example: `input.apply(ParDo.of(new MyFn(null)));`.
```suggestion
input.apply(ParDo.of(new MyFn(null))); // to introduce null
```
##########
.agent/skills/beam-dofn-modernizer/SKILL.md:
##########
@@ -0,0 +1,190 @@
+---
+name: beam-dofn-modernizer
+description: Rewrite Apache Beam DoFn methods (@ProcessElement, @OnTimer,
@OnWindowExpiration) to remove legacy ProcessContext or OnTimerContext usage.
Use this skill when you encounter DoFn methods that use context.element(),
context.output(), etc., and need to modernization them using parameter
injection (@Element, @Timestamp, @Pane, OutputReceiver, MultiOutputReceiver).
Review Comment:

There's a grammatical error here. It should be `modernize` instead of
`modernization`. Also, Beam has no `@Pane` annotation: as the Core Mappings
table shows, a `PaneInfo` parameter is injected by type alone.
```suggestion
description: Rewrite Apache Beam DoFn methods (@ProcessElement, @OnTimer,
@OnWindowExpiration) to remove legacy ProcessContext or OnTimerContext usage.
Use this skill when you encounter DoFn methods that use context.element(),
context.output(), etc., and need to modernize them using parameter injection
(@Element, @Timestamp, PaneInfo, OutputReceiver, MultiOutputReceiver).
```
##########
.agent/skills/beam-dofn-modernizer/SKILL.md:
##########
@@ -0,0 +1,190 @@
+3. **Imports**: Don't forget to add imports for:
+ * `org.apache.beam.sdk.transforms.DoFn.Element`
+ * `org.apache.beam.sdk.transforms.DoFn.Timestamp`
+ * `org.apache.beam.sdk.transforms.DoFn.OutputReceiver`
+ * `org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver` (if needed)
+ * `org.apache.beam.sdk.values.PCollectionView` (if using `@SideInput`)
+ * `org.apache.beam.sdk.transforms.DoFn.SideInput`
+ *. `org.apache.beam.sdk.transforms.windowing.PaneInfo`
Review Comment:

There's a typo in the list formatting. `*.` should be `*`.
```suggestion
* `org.apache.beam.sdk.transforms.windowing.PaneInfo`
```
##########
examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergBatchWriteExample.java:
##########
@@ -74,9 +76,9 @@ private static Row flattenAnalyticsRow(Row row) {
static class ExtractBrowserTransactionsFn extends DoFn<Row, KV<String, Long>> {
@ProcessElement
- public void processElement(ProcessContext c) {
- Row row = c.element();
- c.output(
+ public void processElement(@Element Row element, OutputReceiver<KV<String, Long>> receiver) {
+ Row row = element;
Review Comment:

The local variable `row` is redundant. You can use the `element` parameter
directly to make the code more concise.
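A sketch of the suggested shape, using a hypothetical `ExtractBrowserFn` and a hypothetical single-field schema (the real `ExtractBrowserTransactionsFn` body is elided in this diff): the injected `@Element` parameter is read directly, with no intermediate copy.

```java
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.Row;

// Hypothetical DoFn; the "browser" field name is an assumption for illustration.
class ExtractBrowserFn extends DoFn<Row, String> {
  // Hypothetical input schema with a single string field.
  static final Schema SCHEMA = Schema.builder().addStringField("browser").build();

  @ProcessElement
  public void processElement(@Element Row element, OutputReceiver<String> receiver) {
    // Read from the injected parameter directly instead of copying it into `row` first.
    receiver.output(element.getString("browser"));
  }
}
```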
##########
examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java:
##########
@@ -540,14 +544,15 @@ public static PCollection<String> coGroupByKeyTuple(
ParDo.of(
new DoFn<KV<String, CoGbkResult>, String>() {
@ProcessElement
- public void processElement(ProcessContext c) {
- KV<String, CoGbkResult> e = c.element();
+ public void processElement(
+ @Element KV<String, CoGbkResult> element, OutputReceiver<String> receiver) {
+ KV<String, CoGbkResult> e = element;
Review Comment:

The local variable `e` is redundant. You can use the `element` parameter
directly.
##########
.agent/skills/beam-dofn-modernizer/SKILL.md:
##########
@@ -0,0 +1,190 @@
+**PTransform/Pipeline side:**
+```java
+PCollectionView<String> myView = ...;
+input.apply(ParDo.of(new MyFn(myView)).withSideInputs(myView));
+//or
+input.apply(ParDo.of(new MyFn(myView)); // to indroduce null
+```
+**DoFn side:**
+```
+java
Review Comment:

The code block formatting is incorrect. It should be ` ```java` on a single
line.
```suggestion
```java
```
##########
examples/java/src/main/java/org/apache/beam/examples/KafkaStreaming.java:
##########
@@ -340,9 +348,9 @@ public PCollection<BadRecord> expand(PCollection<BadRecord> input) {
static class LogErrorFn extends DoFn<BadRecord, BadRecord> {
@ProcessElement
- public void processElement(@Element BadRecord record, OutputReceiver<BadRecord> receiver) {
- System.out.println(record);
- receiver.output(record);
+ public void processElement(@Element BadRecord element, OutputReceiver<BadRecord> receiver) {
Review Comment:

The parameter name was changed from `record` to `element`. While `element`
is a common convention, `record` was more descriptive in this context. Consider
keeping the more descriptive name for better readability.
```suggestion
public void processElement(@Element BadRecord record, OutputReceiver<BadRecord> receiver) {
```
##########
examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java:
##########
@@ -93,8 +95,8 @@ public class TopWikipediaSessions {
/** Extracts user and timestamp from a TableRow representing a Wikipedia edit. */
static class ExtractUserAndTimestamp extends DoFn<TableRow, String> {
@ProcessElement
- public void processElement(ProcessContext c) {
- TableRow row = c.element();
+ public void processElement(@Element TableRow element, OutputReceiver<String> receiver) {
+ TableRow row = element;
Review Comment:

The local variable `row` is redundant. You can use the `element` parameter
directly to make the code more concise.
##########
examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java:
##########
@@ -527,11 +536,13 @@ public static void runAutocompletePipeline(Options options) throws IOException {
ParDo.of(
new DoFn<KV<String, List<CompletionCandidate>>, Long>() {
@ProcessElement
- public void process(ProcessContext c) {
- KV<String, List<CompletionCandidate>> elm = c.element();
+ public void process(
+ @Element KV<String, List<CompletionCandidate>> element,
+ OutputReceiver<Long> receiver) {
+ KV<String, List<CompletionCandidate>> elm = element;
Review Comment:

The local variable `elm` is redundant. You can use the `element` parameter
directly.
##########
examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/TokenizationBigQueryIO.java:
##########
@@ -90,9 +92,9 @@ public static FailsafeElement<String, String> wrapBigQueryInsertError(
public static class RowToTableRowFn extends DoFn<Row, TableRow> {
@ProcessElement
- public void processElement(ProcessContext context) {
- Row row = context.element();
- context.output(BigQueryUtils.toTableRow(row));
+ public void processElement(@Element Row element, OutputReceiver<TableRow> receiver) {
+ Row row = element;
Review Comment:

The local variable `row` is redundant. You can use the `element` parameter
directly.
--
This is an automated message from the Apache Git Service.