gemini-code-assist[bot] commented on code in PR #38753:
URL: https://github.com/apache/beam/pull/38753#discussion_r3395347334


##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java:
##########
@@ -373,29 +391,91 @@ private static class CancelOnError implements Callable<Void> {
 
     private final DataflowPipelineJob job;
     private final ErrorMonitorMessagesHandler messageHandler;
-
-    public CancelOnError(DataflowPipelineJob job, ErrorMonitorMessagesHandler messageHandler) {
+    private final TestDataflowRunner runner;
+    private final java.util.concurrent.atomic.AtomicReference<Optional<Boolean>>
+        assertionsPassedRef;
+
+    public CancelOnError(
+        DataflowPipelineJob job,
+        ErrorMonitorMessagesHandler messageHandler,
+        TestDataflowRunner runner,
+        java.util.concurrent.atomic.AtomicReference<Optional<Boolean>> assertionsPassedRef) {
       this.job = job;
       this.messageHandler = messageHandler;
+      this.runner = runner;
+      this.assertionsPassedRef = assertionsPassedRef;
     }
 
     @Override
     public Void call() throws Exception {
+      int checkMetricsIntervalSteps = 5; // Check metrics every 15 seconds (5 * 3s)
+      int steps = 0;
+      boolean cancellationPending = false;
       while (true) {

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Measuring the actual elapsed time of the job monitor loop, rather than inferring it from the iteration count, is more robust against slow API calls or network latency. To support that, we should record the start time before entering the loop using `System.currentTimeMillis()`.
   
   ```java
       @Override
       public Void call() throws Exception {
         int checkMetricsIntervalSteps = 5; // Check metrics every 15 seconds (5 * 3s)
         int steps = 0;
         boolean cancellationPending = false;
         long startTimeMillis = System.currentTimeMillis();
         while (true) {
   ```



##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java:
##########
@@ -373,29 +391,91 @@ private static class CancelOnError implements Callable<Void> {
 
     private final DataflowPipelineJob job;
     private final ErrorMonitorMessagesHandler messageHandler;
-
-    public CancelOnError(DataflowPipelineJob job, ErrorMonitorMessagesHandler messageHandler) {
+    private final TestDataflowRunner runner;
+    private final java.util.concurrent.atomic.AtomicReference<Optional<Boolean>>
+        assertionsPassedRef;
+
+    public CancelOnError(
+        DataflowPipelineJob job,
+        ErrorMonitorMessagesHandler messageHandler,
+        TestDataflowRunner runner,
+        java.util.concurrent.atomic.AtomicReference<Optional<Boolean>> assertionsPassedRef) {
       this.job = job;
       this.messageHandler = messageHandler;
+      this.runner = runner;
+      this.assertionsPassedRef = assertionsPassedRef;
     }
 
     @Override
     public Void call() throws Exception {
+      int checkMetricsIntervalSteps = 5; // Check metrics every 15 seconds (5 * 3s)
+      int steps = 0;
+      boolean cancellationPending = false;
       while (true) {
-        State jobState = job.getState();
-
-        // If we see an error, cancel and note failure
-        if (messageHandler.hasSeenError() && !job.getState().isTerminal()) {
-          job.cancel();
-          LOG.info("Cancelling Dataflow job {}", job.getJobId());
-          return null;
-        }
-
-        if (jobState.isTerminal()) {
-          return null;
+        try {
+          State jobState = job.getState();
+
+          if (jobState.isTerminal()) {
+            return null;
+          }
+
+          // Check if we should initiate cancellation based on metrics (only if assertion state is
+          // not yet known)
+          if (!assertionsPassedRef.get().isPresent() && !cancellationPending) {
+            if (runner.expectedNumberOfAssertions > 0 && steps % checkMetricsIntervalSteps == 0) {
+              try {
+                Optional<Boolean> assertionsPassed = runner.checkForPAssertSuccess(job);
+                if (assertionsPassed.isPresent()) {
+                  assertionsPassedRef.set(assertionsPassed);
+                  cancellationPending = true;
+                  if (assertionsPassed.get()) {
+                    LOG.info(
+                        "All assertions passed for streaming job {}, cancelling job.",
+                        job.getJobId());
+                  } else {
+                    LOG.info(
+                        "Found failed assertion for streaming job {}, cancelling job.",
+                        job.getJobId());
+                  }
+                }
+              } catch (Exception e) {
+                LOG.warn("Transient error polling metrics for job {}", job.getJobId(), e);
+              }
+            }
+          }
+
+          // Check if we should initiate cancellation based on error logs (only if not already
+          // cancellationPending)
+          if (!cancellationPending) {
+            long runningTimeMillis = steps * 3000L;
+            if (messageHandler.hasSeenError()
+                && (runningTimeMillis > 300000L || runner.expectedNumberOfAssertions == 0)) {

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Calculating `runningTimeMillis` as `steps * 3000L` assumes that each iteration of the loop takes exactly 3 seconds. However, calls to the Dataflow API (such as `job.getState()` or `runner.checkForPAssertSuccess(job)`) can be slow or time out, so the actual elapsed time can be much longer than `steps * 3000L`. In practice, this can stretch the 5-minute (`300000L`) grace period well beyond 5 minutes of wall-clock time. We should use the actual elapsed time since the monitor started.
   
   ```suggestion
             if (!cancellationPending) {
               long runningTimeMillis = System.currentTimeMillis() - startTimeMillis;
               if (messageHandler.hasSeenError()
                   && (runningTimeMillis > 300000L || runner.expectedNumberOfAssertions == 0)) {
   ```
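
   The sketch below shows the same check, assuming the monitor uses the monotonic `System.nanoTime()` clock instead of `System.currentTimeMillis()`. That clock is the one recommended in the later `SplittableDoFnTest.java` comment. `ErrorGracePeriod` and `shouldCancelOnError` are hypothetical names, and `expectedNumberOfAssertions` is assumed to be an `int`. Only the 5-minute threshold and the `expectedNumberOfAssertions == 0` shortcut come from the diff.

   ```java
   import java.util.concurrent.TimeUnit;

   /** Hypothetical helper: decides whether seen error messages should trigger cancellation. */
   final class ErrorGracePeriod {
     // Same 5-minute threshold as the 300000L literal in the diff.
     static final long GRACE_PERIOD_NANOS = TimeUnit.MINUTES.toNanos(5);

     private final long startNanos;

     ErrorGracePeriod(long startNanos) {
       this.startNanos = startNanos;
     }

     /** Starts the grace period at the current monotonic time. */
     static ErrorGracePeriod startingNow() {
       return new ErrorGracePeriod(System.nanoTime());
     }

     /**
      * Returns true if an error has been seen and either no assertions are expected or
      * more than five minutes have elapsed since the grace period started.
      */
     boolean shouldCancelOnError(
         boolean hasSeenError, int expectedNumberOfAssertions, long nowNanos) {
       if (!hasSeenError) {
         return false;
       }
       // nanoTime values are only meaningful as differences, so subtract before comparing.
       long elapsedNanos = nowNanos - startNanos;
       return expectedNumberOfAssertions == 0 || elapsedNanos > GRACE_PERIOD_NANOS;
     }
   }
   ```

   The method takes `nowNanos` as a parameter rather than reading the clock itself. This lets a unit test exercise the 5-minute boundary without sleeping. In `call()`, the instance would be created just before `while (true)`.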



##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java:
##########
@@ -373,29 +391,91 @@ private static class CancelOnError implements Callable<Void> {
 
     private final DataflowPipelineJob job;
     private final ErrorMonitorMessagesHandler messageHandler;
-
-    public CancelOnError(DataflowPipelineJob job, ErrorMonitorMessagesHandler messageHandler) {
+    private final TestDataflowRunner runner;
+    private final java.util.concurrent.atomic.AtomicReference<Optional<Boolean>>
+        assertionsPassedRef;
+
+    public CancelOnError(
+        DataflowPipelineJob job,
+        ErrorMonitorMessagesHandler messageHandler,
+        TestDataflowRunner runner,
+        java.util.concurrent.atomic.AtomicReference<Optional<Boolean>> assertionsPassedRef) {
       this.job = job;
       this.messageHandler = messageHandler;
+      this.runner = runner;
+      this.assertionsPassedRef = assertionsPassedRef;
     }
 
     @Override
     public Void call() throws Exception {
+      int checkMetricsIntervalSteps = 5; // Check metrics every 15 seconds (5 * 3s)
+      int steps = 0;
+      boolean cancellationPending = false;
       while (true) {
-        State jobState = job.getState();
-
-        // If we see an error, cancel and note failure
-        if (messageHandler.hasSeenError() && !job.getState().isTerminal()) {
-          job.cancel();
-          LOG.info("Cancelling Dataflow job {}", job.getJobId());
-          return null;
-        }
-
-        if (jobState.isTerminal()) {
-          return null;
+        try {
+          State jobState = job.getState();
+
+          if (jobState.isTerminal()) {
+            return null;
+          }
+
+          // Check if we should initiate cancellation based on metrics (only if assertion state is
+          // not yet known)
+          if (!assertionsPassedRef.get().isPresent() && !cancellationPending) {
+            if (runner.expectedNumberOfAssertions > 0 && steps % checkMetricsIntervalSteps == 0) {

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   At `steps = 0`, `steps % checkMetricsIntervalSteps == 0` is true, so the runner makes a network call to check metrics immediately. Because the job has only just been submitted, it is very unlikely to have reported any metrics yet. We should skip the metrics check on the very first step (`steps > 0`) to avoid a redundant API call.
   
   ```suggestion
             if (!assertionsPassedRef.get().isPresent() && !cancellationPending) {
               if (runner.expectedNumberOfAssertions > 0 && steps > 0 && steps % checkMetricsIntervalSteps == 0) {
   ```
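
   Below is a minimal sketch of the suggested condition as a standalone predicate, so the schedule can be unit-tested. `MetricsPollSchedule.shouldPollMetrics` is a hypothetical name. With an interval of 5 steps, metrics are polled at steps 5, 10, 15, and so on, but never at step 0.

   ```java
   /** Hypothetical helper mirroring the suggested condition in CancelOnError.call(). */
   final class MetricsPollSchedule {
     private MetricsPollSchedule() {}

     /**
      * Returns true on every {@code intervalSteps}-th iteration when assertions are expected,
      * skipping step 0 because a just-submitted job is unlikely to have reported metrics.
      */
     static boolean shouldPollMetrics(int steps, int intervalSteps, int expectedNumberOfAssertions) {
       return expectedNumberOfAssertions > 0 && steps > 0 && steps % intervalSteps == 0;
     }
   }
   ```

   The trade-off is small. Skipping step 0 means the first metrics check happens at step 5 (about 15 seconds in, per the `(5 * 3s)` comment) instead of immediately.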



##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java:
##########
@@ -373,29 +391,91 @@ private static class CancelOnError implements Callable<Void> {
 
     private final DataflowPipelineJob job;
     private final ErrorMonitorMessagesHandler messageHandler;
-
-    public CancelOnError(DataflowPipelineJob job, ErrorMonitorMessagesHandler messageHandler) {
+    private final TestDataflowRunner runner;
+    private final java.util.concurrent.atomic.AtomicReference<Optional<Boolean>>
+        assertionsPassedRef;
+
+    public CancelOnError(
+        DataflowPipelineJob job,
+        ErrorMonitorMessagesHandler messageHandler,
+        TestDataflowRunner runner,
+        java.util.concurrent.atomic.AtomicReference<Optional<Boolean>> assertionsPassedRef) {
       this.job = job;
       this.messageHandler = messageHandler;
+      this.runner = runner;
+      this.assertionsPassedRef = assertionsPassedRef;
     }
 
     @Override
     public Void call() throws Exception {
+      int checkMetricsIntervalSteps = 5; // Check metrics every 15 seconds (5 * 3s)
+      int steps = 0;
+      boolean cancellationPending = false;
       while (true) {
-        State jobState = job.getState();
-
-        // If we see an error, cancel and note failure
-        if (messageHandler.hasSeenError() && !job.getState().isTerminal()) {
-          job.cancel();
-          LOG.info("Cancelling Dataflow job {}", job.getJobId());
-          return null;
-        }
-
-        if (jobState.isTerminal()) {
-          return null;
+        try {
+          State jobState = job.getState();
+
+          if (jobState.isTerminal()) {
+            return null;
+          }
+
+          // Check if we should initiate cancellation based on metrics (only if assertion state is
+          // not yet known)
+          if (!assertionsPassedRef.get().isPresent() && !cancellationPending) {
+            if (runner.expectedNumberOfAssertions > 0 && steps % checkMetricsIntervalSteps == 0) {
+              try {
+                Optional<Boolean> assertionsPassed = runner.checkForPAssertSuccess(job);
+                if (assertionsPassed.isPresent()) {
+                  assertionsPassedRef.set(assertionsPassed);
+                  cancellationPending = true;
+                  if (assertionsPassed.get()) {
+                    LOG.info(
+                        "All assertions passed for streaming job {}, cancelling job.",
+                        job.getJobId());
+                  } else {
+                    LOG.info(
+                        "Found failed assertion for streaming job {}, cancelling job.",
+                        job.getJobId());
+                  }
+                }
+              } catch (Exception e) {
+                LOG.warn("Transient error polling metrics for job {}", job.getJobId(), e);
+              }
+            }
+          }
+
+          // Check if we should initiate cancellation based on error logs (only if not already
+          // cancellationPending)
+          if (!cancellationPending) {
+            long runningTimeMillis = steps * 3000L;
+            if (messageHandler.hasSeenError()
+                && (runningTimeMillis > 300000L || runner.expectedNumberOfAssertions == 0)) {
+              LOG.info(
+                  "Cancelling Dataflow job due to error messages seen: {}",
+                  messageHandler.getErrorMessage());
+              cancellationPending = true;
+            }
+          }
+
+          // Perform or retry cancellation if cancellation is pending
+          if (cancellationPending) {
+            try {
+              job.cancel();
+              return null; // Successful cancellation
+            } catch (Exception e) {
+              LOG.warn(
+                  "Failed to cancel Dataflow job {}. Will retry on next iteration. Error",
+                  job.getJobId(),
+                  e);
+            }
+          }
+
+        } catch (Exception e) {
+          LOG.warn("Exception in streaming job monitor loop for job {}", job.getJobId(), e);
         }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Catching `Exception` inside the loop can swallow `InterruptedException` if 
it is thrown inside the `try` block. If `InterruptedException` is caught and 
swallowed without restoring the interrupted status or exiting, the thread might 
not terminate properly when cancelled. We should check if the caught exception 
is an instance of `InterruptedException`, restore the interrupted status, and 
exit the loop.
   
   ```java
           } catch (Exception e) {
             if (e instanceof InterruptedException) {
               Thread.currentThread().interrupt();
               return null;
             }
             LOG.warn("Exception in streaming job monitor loop for job {}", job.getJobId(), e);
           }
   ```
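
   Here is a self-contained sketch of the pattern, outside the runner. `InterruptAwareMonitor` and its `Step` interface are hypothetical stand-ins for the body of `CancelOnError.call()`. An `InterruptedException` that reaches the broad `catch (Exception e)` restores the interrupt flag and ends the loop. Any other exception is counted and retried.

   ```java
   import java.util.concurrent.atomic.AtomicInteger;

   /** Hypothetical stand-in for a polling loop such as CancelOnError.call(). */
   final class InterruptAwareMonitor {
     /** One iteration of work; returns true when the loop should stop normally. */
     interface Step {
       boolean run() throws Exception;
     }

     /** Counts non-interrupt exceptions that were swallowed and retried (illustration only). */
     final AtomicInteger transientErrors = new AtomicInteger();

     /** Runs {@code step} until it returns true or the calling thread is interrupted. */
     void loop(Step step, long sleepMillis) {
       while (true) {
         try {
           if (step.run()) {
             return;
           }
         } catch (Exception e) {
           if (e instanceof InterruptedException) {
             // Restore the flag so whoever interrupted this thread can observe it.
             Thread.currentThread().interrupt();
             return;
           }
           // Anything else is treated as transient; the diff logs it with LOG.warn.
           transientErrors.incrementAndGet();
         }
         try {
           // Sleeping outside the broad try keeps a failing step from busy-spinning.
           Thread.sleep(sleepMillis);
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
           return;
         }
       }
     }
   }
   ```

   The sketch keeps `Thread.sleep` outside the broad `try`, so a step that fails every time does not turn into a busy loop. The diff excerpt does not show where the real loop sleeps, so that placement is an assumption.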



##########
runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java:
##########
@@ -609,6 +598,89 @@ public void testStreamingOnSuccessMatcherWhenPipelineFails() throws Exception {
     // If the onSuccessMatcher were invoked, it would have crashed here with AssertionError
   }
 
+  @Test
+  public void testRunStreamingJobEarlySuccess() throws Exception {
+    options.setStreaming(true);
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getState()).thenReturn(State.RUNNING);
+    java.util.concurrent.CountDownLatch cancelLatch = new java.util.concurrent.CountDownLatch(1);
+    try {
+      Mockito.doAnswer(
+              invocation -> {
+                cancelLatch.countDown();
+                return null;
+              })
+          .when(mockJob)
+          .cancel();
+    } catch (Exception e) {
+      // Ignore
+    }
+    when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
+        .thenAnswer(
+            invocation -> {
+              cancelLatch.await();
+              return State.CANCELLED;
+            });

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Using `cancelLatch.await()` without a timeout can cause the test to hang 
indefinitely if the job is never cancelled (e.g., due to a bug in the code 
under test). It is a best practice to use a timeout on `await()` to ensure the 
test fails fast instead of hanging the CI build.
   
   ```suggestion
       when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
           .thenAnswer(
               invocation -> {
                 if (!cancelLatch.await(10, java.util.concurrent.TimeUnit.SECONDS)) {
                   throw new RuntimeException("Timeout waiting for job cancellation");
                 }
                 return State.CANCELLED;
               });
   ```
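
   The same stub appears in both `testRunStreamingJobEarlySuccess` and `testRunStreamingJobEarlyFailure`, so one option is a small shared helper. `LatchAnswers.awaitThen` is a hypothetical name. It relies only on `CountDownLatch.await(long, TimeUnit)`, which returns `false` if the timeout elapses before the count reaches zero.

   ```java
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.TimeUnit;

   /** Hypothetical test helper: block until a latch opens, then return a value. */
   final class LatchAnswers {
     private LatchAnswers() {}

     /**
      * Waits up to {@code timeout} for {@code latch} to reach zero and returns {@code result};
      * throws instead of hanging forever if the latch never opens.
      */
     static <T> T awaitThen(CountDownLatch latch, long timeout, TimeUnit unit, T result)
         throws InterruptedException {
       if (!latch.await(timeout, unit)) {
         throw new RuntimeException("Timeout waiting for job cancellation");
       }
       return result;
     }
   }
   ```

   Each stub could then read `.thenAnswer(invocation -> LatchAnswers.awaitThen(cancelLatch, 10, TimeUnit.SECONDS, State.CANCELLED))`. Mockito's `Answer.answer` is declared `throws Throwable`, so the checked `InterruptedException` needs no extra handling.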



##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java:
##########
@@ -995,21 +998,51 @@ public ProcessContinuation process(
         throws InterruptedException {
       AtomicBoolean wasFinalized =
           WAS_FINALIZED.computeIfAbsent(element, (unused) -> new AtomicBoolean());
+
+      long currentAttempt = tracker.currentRestriction().getFrom();
+
+      // On subsequent attempts, the previous bundle has committed, so the finalization
+      // callback should run. Poll wasFinalized with a timed wait to avoid deadlocks
+      // on single-threaded executors.
+      if (currentAttempt > 0 && !wasFinalized.get()) {
+        long limitMs = 1000;
+        long start = System.currentTimeMillis();
+        while (!wasFinalized.get() && (System.currentTimeMillis() - start) < limitMs) {
+          sleep(10L);
+        }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   `System.currentTimeMillis()` is not monotonic. It can jump when the system clock is adjusted (e.g., by NTP sync), which makes it unreliable for measuring elapsed time. `System.nanoTime()` is safer and more robust for elapsed-time checks in loops like this one.
   
   ```suggestion
         if (currentAttempt > 0 && !wasFinalized.get()) {
           long limitNs = 1_000_000_000L;
           long start = System.nanoTime();
           while (!wasFinalized.get() && (System.nanoTime() - start) < limitNs) {
             sleep(10L);
           }
   ```
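
   The same bounded wait can be written as a reusable helper. `PollUntil.await` is a hypothetical name, and a `BooleanSupplier` stands in for `wasFinalized::get`. The loop only compares `System.nanoTime()` differences, which is the use the JDK documents for that method (measuring elapsed time).

   ```java
   import java.util.concurrent.TimeUnit;
   import java.util.function.BooleanSupplier;

   /** Hypothetical helper: poll a condition until it holds or a deadline passes. */
   final class PollUntil {
     private PollUntil() {}

     /**
      * Returns true as soon as {@code condition} holds, or false once {@code timeout} has
      * elapsed on the monotonic clock. Sleeps {@code pollMillis} between checks.
      */
     static boolean await(BooleanSupplier condition, long timeout, TimeUnit unit, long pollMillis)
         throws InterruptedException {
       long limitNs = unit.toNanos(timeout);
       long start = System.nanoTime();
       while (!condition.getAsBoolean()) {
         // Compare differences, never absolute values: nanoTime has an arbitrary origin.
         if (System.nanoTime() - start >= limitNs) {
           return false;
         }
         Thread.sleep(pollMillis);
       }
       return true;
     }
   }
   ```

   In the test, `PollUntil.await(wasFinalized::get, 1, TimeUnit.SECONDS, 10L)` would replace the inline loop. `Thread.sleep` stands in for the `sleep(10L)` call used in the diff.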



##########
runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java:
##########
@@ -609,6 +598,89 @@ public void testStreamingOnSuccessMatcherWhenPipelineFails() throws Exception {
     // If the onSuccessMatcher were invoked, it would have crashed here with AssertionError
   }
 
+  @Test
+  public void testRunStreamingJobEarlySuccess() throws Exception {
+    options.setStreaming(true);
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getState()).thenReturn(State.RUNNING);
+    java.util.concurrent.CountDownLatch cancelLatch = new java.util.concurrent.CountDownLatch(1);
+    try {
+      Mockito.doAnswer(
+              invocation -> {
+                cancelLatch.countDown();
+                return null;
+              })
+          .when(mockJob)
+          .cancel();
+    } catch (Exception e) {
+      // Ignore
+    }
+    when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
+        .thenAnswer(
+            invocation -> {
+              cancelLatch.await();
+              return State.CANCELLED;
+            });
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
+    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+    runner.run(p, mockRunner);
+
+    Mockito.verify(mockJob, Mockito.timeout(5000)).cancel();
+  }
+
+  @Test
+  public void testRunStreamingJobEarlyFailure() throws Exception {
+    options.setStreaming(true);
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getState()).thenReturn(State.RUNNING);
+    java.util.concurrent.CountDownLatch cancelLatch = new java.util.concurrent.CountDownLatch(1);
+    try {
+      Mockito.doAnswer(
+              invocation -> {
+                cancelLatch.countDown();
+                return null;
+              })
+          .when(mockJob)
+          .cancel();
+    } catch (Exception e) {
+      // Ignore
+    }
+    when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
+        .thenAnswer(
+            invocation -> {
+              cancelLatch.await();
+              return State.CANCELLED;
+            });

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Using `cancelLatch.await()` without a timeout can cause the test to hang 
indefinitely if the job is never cancelled. It is a best practice to use a 
timeout on `await()` to ensure the test fails fast instead of hanging the CI 
build.
   
   ```suggestion
       when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
           .thenAnswer(
               invocation -> {
                 if (!cancelLatch.await(10, java.util.concurrent.TimeUnit.SECONDS)) {
                   throw new RuntimeException("Timeout waiting for job cancellation");
                 }
                 return State.CANCELLED;
               });
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
