gemini-code-assist[bot] commented on code in PR #38753:
URL: https://github.com/apache/beam/pull/38753#discussion_r3329102223


##########
runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java:
##########
@@ -609,6 +609,54 @@ public void 
testStreamingOnSuccessMatcherWhenPipelineFails() throws Exception {
     // If the onSuccessMatcher were invoked, it would have crashed here with 
AssertionError
   }
 
+  @Test
+  public void testRunStreamingJobEarlySuccess() throws Exception {
+    options.setStreaming(true);
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getState()).thenReturn(State.CANCELLED);
+    when(mockJob.waitUntilFinish(any(Duration.class), 
any(JobMessagesHandler.class)))
+        .thenReturn(State.CANCELLED);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(generateMockMetricResponse(true /* success */, true /* 
tentative */));
+    TestDataflowRunner runner = 
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+    runner.run(p, mockRunner);
+  }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   In `testRunStreamingJobEarlySuccess`, `mockJob.waitUntilFinish` is mocked to
return `State.CANCELLED` immediately. Because of this, the main thread does not
block and may finish executing before the background `CancelOnError` thread
even runs or calls `cancel()`. The test currently passes because the main
thread falls back to calling `checkForPAssertSuccess(job)` synchronously.

   To ensure that the asynchronous early-termination logic is actually being
executed and that `job.cancel()` is called by the background thread, we should
use Mockito's `timeout` verification.
   
   ```java
     @Test
     public void testRunStreamingJobEarlySuccess() throws Exception {
       options.setStreaming(true);
       Pipeline p = TestPipeline.create(options);
       PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
       PAssert.that(pc).containsInAnyOrder(1, 2, 3);

       DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
       when(mockJob.getState()).thenReturn(State.CANCELLED);
       when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
           .thenReturn(State.CANCELLED);
       when(mockJob.getProjectId()).thenReturn("test-project");
       when(mockJob.getJobId()).thenReturn("test-job");

       DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
       when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);

       when(mockClient.getJobMetrics(anyString()))
           .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
       TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
       runner.run(p, mockRunner);

       Mockito.verify(mockJob, Mockito.timeout(5000)).cancel();
     }
   ```



##########
runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java:
##########
@@ -609,6 +609,54 @@ public void 
testStreamingOnSuccessMatcherWhenPipelineFails() throws Exception {
     // If the onSuccessMatcher were invoked, it would have crashed here with 
AssertionError
   }
 
+  @Test
+  public void testRunStreamingJobEarlySuccess() throws Exception {
+    options.setStreaming(true);
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getState()).thenReturn(State.CANCELLED);
+    when(mockJob.waitUntilFinish(any(Duration.class), 
any(JobMessagesHandler.class)))
+        .thenReturn(State.CANCELLED);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(generateMockMetricResponse(true /* success */, true /* 
tentative */));
+    TestDataflowRunner runner = 
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+    runner.run(p, mockRunner);
+  }
+
+  @Test
+  public void testRunStreamingJobEarlyFailure() throws Exception {
+    options.setStreaming(true);
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getState()).thenReturn(State.CANCELLED);
+    when(mockJob.waitUntilFinish(any(Duration.class), 
any(JobMessagesHandler.class)))
+        .thenReturn(State.CANCELLED);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(generateMockMetricResponse(false /* success */, true /* 
tentative */));
+    TestDataflowRunner runner = 
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+
+    expectedException.expect(AssertionError.class);
+    runner.run(p, mockRunner);
+  }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Similarly to `testRunStreamingJobEarlySuccess`, 
`testRunStreamingJobEarlyFailure` should verify that `job.cancel()` is 
asynchronously called when an assertion fails. Since we expect an 
`AssertionError` to be thrown, we can use a try-catch block to capture the 
exception and then perform the verification.
   
   ```java
     @Test
     public void testRunStreamingJobEarlyFailure() throws Exception {
       options.setStreaming(true);
       Pipeline p = TestPipeline.create(options);
       PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
       PAssert.that(pc).containsInAnyOrder(1, 2, 3);

       DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
       when(mockJob.getState()).thenReturn(State.CANCELLED);
       when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
           .thenReturn(State.CANCELLED);
       when(mockJob.getProjectId()).thenReturn("test-project");
       when(mockJob.getJobId()).thenReturn("test-job");

       DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
       when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);

       when(mockClient.getJobMetrics(anyString()))
           .thenReturn(generateMockMetricResponse(false /* success */, true /* tentative */));
       TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);

       try {
         runner.run(p, mockRunner);
         org.junit.Assert.fail("Expected AssertionError to be thrown");
       } catch (AssertionError expected) {
         // Expected
       }

       Mockito.verify(mockJob, Mockito.timeout(5000)).cancel();
     }
   ```



##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java:
##########
@@ -373,29 +384,66 @@ private static class CancelOnError implements 
Callable<Void> {
 
     private final DataflowPipelineJob job;
     private final ErrorMonitorMessagesHandler messageHandler;
-
-    public CancelOnError(DataflowPipelineJob job, ErrorMonitorMessagesHandler 
messageHandler) {
+    private final TestDataflowRunner runner;
+    private final 
java.util.concurrent.atomic.AtomicReference<Optional<Boolean>> 
assertionsPassedRef;
+
+    public CancelOnError(
+        DataflowPipelineJob job,
+        ErrorMonitorMessagesHandler messageHandler,
+        TestDataflowRunner runner,
+        java.util.concurrent.atomic.AtomicReference<Optional<Boolean>> 
assertionsPassedRef) {
       this.job = job;
       this.messageHandler = messageHandler;
+      this.runner = runner;
+      this.assertionsPassedRef = assertionsPassedRef;
     }
 
     @Override
     public Void call() throws Exception {
+      int checkMetricsIntervalSteps = 5; // Check metrics every 15 seconds (5 
* 3s)
+      int steps = 0;
       while (true) {
         State jobState = job.getState();
 
-        // If we see an error, cancel and note failure
-        if (messageHandler.hasSeenError() && !job.getState().isTerminal()) {
-          job.cancel();
-          LOG.info("Cancelling Dataflow job {}", job.getJobId());
+        if (jobState.isTerminal()) {
           return null;
         }
 
-        if (jobState.isTerminal()) {
+        // Check metrics for early success/failure cancellation
+        if (steps % checkMetricsIntervalSteps == 0) {
+          Optional<Boolean> assertionsPassed = 
runner.checkForPAssertSuccess(job);
+          if (assertionsPassed.isPresent()) {
+            assertionsPassedRef.set(assertionsPassed);
+            if (assertionsPassed.get()) {
+              LOG.info(
+                  "All assertions passed for streaming job {}, cancelling 
job.",
+                  job.getJobId());
+              job.cancel();
+              return null;
+            } else {
+              LOG.info(
+                  "Found failed assertion for streaming job {}, cancelling 
job.",
+                  job.getJobId());
+              job.cancel();
+              return null;
+            }
+          }
+        }

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   The metrics polling and job cancellation logic in `CancelOnError` is prone
to transient network or API failures (e.g., rate limits or temporary service
outages when calling `checkForPAssertSuccess` or `cancel`). If any exception is
thrown during these calls, the background thread will terminate silently,
preventing any further monitoring or early termination for the streaming job.

   To make this robust, we should:
   1. Wrap the API calls in a try-catch block to prevent transient errors from
killing the monitoring thread.
   2. Avoid redundant metrics queries once we have already determined that
assertions have passed/failed (i.e., if `assertionsPassedRef` is already set,
we should just retry the cancellation directly).
   
   ```suggestion
           // Check metrics for early success/failure cancellation
           if (assertionsPassedRef.get().isPresent()) {
             try {
               LOG.info("Retrying cancellation for job {}...", job.getJobId());
               job.cancel();
               return null;
             } catch (Exception e) {
               LOG.warn("Failed to cancel job {}, will retry.", job.getJobId(), e);
             }
           } else if (steps % checkMetricsIntervalSteps == 0) {
             try {
               Optional<Boolean> assertionsPassed = runner.checkForPAssertSuccess(job);
               if (assertionsPassed.isPresent()) {
                 assertionsPassedRef.set(assertionsPassed);
                 LOG.info(
                     "{} assertion(s) for streaming job {}, cancelling job.",
                     assertionsPassed.get() ? "All passed" : "Found failed",
                     job.getJobId());
                 try {
                   job.cancel();
                   return null;
                 } catch (Exception e) {
                   LOG.warn("Failed to cancel job {}, will retry.", job.getJobId(), e);
                 }
               }
             } catch (Exception e) {
               LOG.warn("Failed to check assertions for job {}.", job.getJobId(), e);
             }
           }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to