gemini-code-assist[bot] commented on code in PR #38490:
URL: https://github.com/apache/beam/pull/38490#discussion_r3236669154
##########
runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java:
##########
@@ -1063,6 +1105,19 @@ public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp
           tag, output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING);
     }
+    @Override
+    public <T> void outputWithKind(TupleTag<T> tag, T output, ValueKind kind) {
+      checkTimestamp(timestamp(), timestamp);
+      builderSupplier
+          .builder(output)
+          .setTimestamp(timestamp)
+          .setWindows(Collections.singleton(window()))
+          .setPaneInfo(PaneInfo.NO_FIRING)
+          .setValueKind(kind)
+          .setReceiver(wv -> SimpleDoFnRunner.this.outputWindowedValue(tag, wv))
+          .output();
+    }
+
Review Comment:

The variable `timestamp` is undefined in this method. It should be replaced
with a call to the `timestamp()` method, which provides the timestamp
associated with the window expiration. This appears to be a copy-paste error
from the `outputWithTimestamp` method where `timestamp` is a parameter.
```suggestion
    @Override
    public <T> void outputWithKind(TupleTag<T> tag, T output, ValueKind kind) {
      checkTimestamp(timestamp(), timestamp());
      builderSupplier
          .builder(output)
          .setTimestamp(timestamp())
          .setWindows(Collections.singleton(window()))
          .setPaneInfo(PaneInfo.NO_FIRING)
          .setValueKind(kind)
          .setReceiver(wv -> SimpleDoFnRunner.this.outputWindowedValue(tag, wv))
          .output();
    }
```
##########
runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java:
##########
@@ -1352,6 +1417,18 @@ public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp
           tag, output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING);
     }
+    @Override
+    public <T> void outputWithKind(TupleTag<T> tag, T output, ValueKind kind) {
+      checkTimestamp(this.timestamp, timestamp);
+      builderSupplier
+          .builder(output)
+          .setTimestamp(timestamp)
+          .setWindows(Collections.singleton(window()))
+          .setPaneInfo(PaneInfo.NO_FIRING)
+          .setReceiver(wv -> SimpleDoFnRunner.this.outputWindowedValue(tag, wv))
+          .output();
+    }
+
Review Comment:

There are two critical issues in this method: 1) There is no `timestamp`
parameter here, so the bare `timestamp` can only resolve to the field, if it
resolves at all; use `this.timestamp` explicitly. 2) The `ValueKind` parameter
`kind` is never set on the output builder, so the requested kind is silently
dropped, which defeats the purpose of the method.
```suggestion
    @Override
    public <T> void outputWithKind(TupleTag<T> tag, T output, ValueKind kind) {
      checkTimestamp(this.timestamp, this.timestamp);
      builderSupplier
          .builder(output)
          .setTimestamp(this.timestamp)
          .setWindows(Collections.singleton(window()))
          .setPaneInfo(PaneInfo.NO_FIRING)
          .setValueKind(kind)
          .setReceiver(wv -> SimpleDoFnRunner.this.outputWindowedValue(tag, wv))
          .output();
    }
```
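To illustrate the second issue, here is a minimal, self-contained sketch of how
a skipped builder setter silently falls back to a default. All types and
method names below (`ValueKindSketch`, `Kind`, `Builder`, `Emitted`,
`outputDroppingKind`) are hypothetical stand-ins, not Beam classes, and the
`INSERT` default is an assumption made for the example.
```java
/** Hypothetical stand-ins for illustration; none of these are Beam classes. */
public class ValueKindSketch {
  enum Kind { INSERT, DELETE }

  /** An emitted record: the value plus the kind it was tagged with. */
  static final class Emitted {
    final String value;
    final Kind kind;

    Emitted(String value, Kind kind) {
      this.value = value;
      this.kind = kind;
    }
  }

  /** Minimal builder; the kind defaults to INSERT when never set (assumed). */
  static final class Builder {
    private final String value;
    private Kind kind = Kind.INSERT;

    Builder(String value) {
      this.value = value;
    }

    Builder setKind(Kind kind) {
      this.kind = kind;
      return this;
    }

    Emitted build() {
      return new Emitted(value, kind);
    }
  }

  /** Mirrors the reviewed code: accepts a kind but never forwards it. */
  static Emitted outputDroppingKind(String value, Kind kind) {
    return new Builder(value).build();
  }

  /** Mirrors the suggestion: forwards the kind to the builder. */
  static Emitted outputWithKind(String value, Kind kind) {
    return new Builder(value).setKind(kind).build();
  }

  public static void main(String[] args) {
    // The caller asked for DELETE in both cases.
    System.out.println(outputDroppingKind("row", Kind.DELETE).kind); // INSERT
    System.out.println(outputWithKind("row", Kind.DELETE).kind); // DELETE
  }
}
```
A unit test that emits a non-default kind and asserts on the received value
would likely catch this class of omission.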
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java:
##########
@@ -648,7 +660,24 @@ public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp
                 null,
                 null,
                 CausedByDrain.NORMAL,
-                element.getOpenTelemetryContext()));
+                element.getOpenTelemetryContext(),
+                ValueKind.INSERT));
+    }
+
+    @Override
+    public <T> void outputWithKind(TupleTag<T> tag, T output, ValueKind kind) {
+      getMutableOutput(tag)
+          .add(
+              ValueInSingleWindow.of(
+                  output,
+                  element.getTimestamp(),
+                  element.getWindow(),
+                  element.getPaneInfo(),
+                  null,
+                  null,
+                  CausedByDrain.NORMAL,
+                  null,
+                  kind));
     }
Review Comment:

The `outputWithKind` implementation in `DoFnTester` passes `null` for the
OpenTelemetry context. It should instead propagate the context from the
current element via `element.getOpenTelemetryContext()`, consistent with how
`outputWithTimestamp` is implemented in this class.
```suggestion
    @Override
    public <T> void outputWithKind(TupleTag<T> tag, T output, ValueKind kind) {
      getMutableOutput(tag)
          .add(
              ValueInSingleWindow.of(
                  output,
                  element.getTimestamp(),
                  element.getWindow(),
                  element.getPaneInfo(),
                  null,
                  null,
                  CausedByDrain.NORMAL,
                  element.getOpenTelemetryContext(),
                  kind));
    }
```
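As a rough illustration of why the `null` matters, the sketch below contrasts
an output path that drops the input's context with one that copies it. The
names (`ContextPropagationSketch`, `Rec`, `traceContext`) are hypothetical,
and a plain `String` stands in for the real context type.
```java
/** Hypothetical stand-ins for illustration; none of these are Beam classes. */
public class ContextPropagationSketch {
  /** A value paired with an optional tracing context (null means none). */
  static final class Rec {
    final String value;
    final String traceContext;

    Rec(String value, String traceContext) {
      this.value = value;
      this.traceContext = traceContext;
    }
  }

  /** Mirrors the reviewed code: the output loses the input's context. */
  static Rec outputDroppingContext(Rec input, String value) {
    return new Rec(value, null);
  }

  /** Mirrors the suggestion: the output inherits the input's context. */
  static Rec outputKeepingContext(Rec input, String value) {
    return new Rec(value, input.traceContext);
  }

  public static void main(String[] args) {
    Rec input = new Rec("in", "trace-123");
    System.out.println(outputDroppingContext(input, "out").traceContext); // null
    System.out.println(outputKeepingContext(input, "out").traceContext); // trace-123
  }
}
```
With the context dropped, downstream outputs can no longer be linked back to
the trace of the element that produced them.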