gemini-code-assist[bot] commented on code in PR #39084:
URL: https://github.com/apache/beam/pull/39084#discussion_r3465990135
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java:
##########
@@ -395,6 +414,7 @@ public int hashCode() {
private static class UnboundedCountingReader extends UnboundedReader<Long> {
private UnboundedCountingSource source;
private long current;
+ private boolean done = false;
// Initialized on first advance()
Review Comment:

A mutable `done` flag is not part of the checkpoint, so a restored reader
would start with `done = false` again and its watermark would regress. Whether
the reader is done can instead be derived deterministically from `current` and
`source.end`, which lets us remove the `done` field entirely.
```suggestion
private long current;
// Initialized on first advance()
```
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java:
##########
@@ -453,7 +477,7 @@ private long expectedValue() {
@Override
public Instant getWatermark() {
- return source.timestampFn.apply(current);
+ return done ? BoundedWindow.TIMESTAMP_MAX_VALUE :
source.timestampFn.apply(current);
Review Comment:

We can deterministically check if the reader is done by comparing `current`
with `source.end - source.stride`. If `current >= source.end - source.stride`,
then any subsequent value would be `>= source.end`, meaning no more elements
can be produced. This avoids watermark regression when restoring from a
checkpoint.
```java
return (current >= source.end - source.stride)
    ? BoundedWindow.TIMESTAMP_MAX_VALUE
    : source.timestampFn.apply(current);
```
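
The comparison above can be checked in isolation. The following is a minimal sketch, not Beam code: `DoneCheckSketch` and `isDone` are hypothetical names, and it assumes that `end` is an exclusive, non-negative upper bound and that `stride` is positive, none of which is confirmed by the diff shown here.

```java
// Hypothetical stand-in for the reader's "done" check; not part of Beam.
final class DoneCheckSketch {
  /**
   * Returns true when no value below {@code end} can follow {@code current}.
   * Assumes stride > 0 and end >= 0, with end treated as exclusive.
   */
  static boolean isDone(long current, long end, long stride) {
    // Same condition as (current + stride >= end), written as a subtraction
    // so the sum cannot overflow when current is close to Long.MAX_VALUE.
    return current >= end - stride;
  }

  public static void main(String[] args) {
    // With start = 0, stride = 3, end = 10 the values are 0, 3, 6, 9.
    System.out.println(isDone(6, 10, 3));  // 9 is still below end
    System.out.println(isDone(9, 10, 3));  // the next value would be 12
    System.out.println(isDone(-3, 10, 3)); // nothing emitted yet
  }
}
```

With this form the condition becomes true as soon as the last value is the current one, which is one step earlier than a flag that is set only when `advance()` first returns `false`.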
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java:
##########
@@ -431,6 +451,10 @@ public boolean advance() throws IOException {
return false;
}
long nextValue = current + source.stride;
+ if (nextValue >= source.end) {
+ done = true;
+ return false;
+ }
Review Comment:

Since we are removing the `done` field, we don't need to set `done = true`
here. We can just return `false` directly when the next value reaches or
exceeds the end limit.
```java
if (nextValue >= source.end) {
  return false;
}
```
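
To illustrate why a derived check survives a restore once the flag is gone, here is a toy model of the reader. It is a sketch under stated assumptions, not Beam's `UnboundedReader` API: `ToyCountingReader`, `checkpoint()`, and `isDone()` are hypothetical, the only checkpointed state is the last emitted value, `end` is assumed exclusive, and the existing overflow guard in `advance()` is omitted for brevity.

```java
// Toy model only; all names are hypothetical and not part of Beam.
final class ToyCountingReader {
  private final long end;    // assumed exclusive upper bound
  private final long stride; // assumed > 0
  private long current;      // last emitted value, the only checkpointed state

  ToyCountingReader(long lastEmitted, long end, long stride) {
    this.current = lastEmitted;
    this.end = end;
    this.stride = stride;
  }

  boolean advance() {
    long nextValue = current + stride; // overflow guard omitted in this sketch
    if (nextValue >= end) {
      return false; // no flag to set; "done" is derived from current
    }
    current = nextValue;
    return true;
  }

  // Derived purely from checkpointed state, so it is stable across restores.
  boolean isDone() {
    return current >= end - stride;
  }

  long checkpoint() {
    return current;
  }

  public static void main(String[] args) {
    // Start one stride before 0 so the first advance() yields 0.
    ToyCountingReader reader = new ToyCountingReader(-3, 10, 3);
    while (reader.advance()) {
      // consume 0, 3, 6, 9
    }
    // Simulate a restore: rebuild the reader from the checkpoint alone.
    ToyCountingReader restored = new ToyCountingReader(reader.checkpoint(), 10, 3);
    System.out.println(restored.isDone()); // still done without calling advance()
  }
}
```

A boolean field initialized to `false` would be reset by the same restore, which is the regression the derived check avoids.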