gemini-code-assist[bot] commented on code in PR #38920:
URL: https://github.com/apache/beam/pull/38920#discussion_r3396344073


##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java:
##########
@@ -553,4 +553,71 @@ public void testPollWorkWithLinkedBlockingQueue() throws Exception {
     blockerStop.countDown();
     testExecutor.shutdown();
   }
+
+  @Test
+  public void testPollWorkDropsFailedWork() throws Exception {
+    BoundedQueueExecutor testExecutor =
+        new BoundedQueueExecutor(
+            /* initialMaximumPoolSize= */ 1,
+            /* keepAliveTime= */ 60,
+            /* unit= */ TimeUnit.SECONDS,
+            /* maximumElementsOutstanding= */ 100,
+            /* maximumBytesOutstanding= */ 10000000,
+            new ThreadFactoryBuilder().setNameFormat("testStealing-%d").setDaemon(true).build(),
+            useFairMonitor,
+            /*useKeyGroupWorkQueue=*/ true);
+
+    // Create blocker task to occupy the worker thread
+    CountDownLatch blockerStart = new CountDownLatch(1);
+    CountDownLatch blockerStop = new CountDownLatch(1);
+    ExecutableWork blockerWork =
+        createWorkWithCompIdAndKeyGroup(
+            "blockerComp",
+            DEFAULT_KEY_GROUP,
+            ignored -> {
+              blockerStart.countDown();
+              try {
+                blockerStop.await();
+              } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+              }
+            });
+
+    testExecutor.execute(blockerWork, 0);
+    blockerStart.await();
+
+    Work.KeyGroup keyGroup1 = Work.KeyGroup.create(1, 1);
+
+    // Create executable tasks
+    ExecutableWork work1 = createWorkWithCompIdAndKeyGroup("compA", keyGroup1, ignored -> {});
+    ExecutableWork work2 = createWorkWithCompIdAndKeyGroup("compA", keyGroup1, ignored -> {});
+
+    // Mark work1 as failed
+    work1.work().setFailed();
+
+    // Enqueue tasks
+    testExecutor.execute(work1, 100);
+    testExecutor.execute(work2, 150);
+
+    // Total outstanding elements must be 3 (blocker + work1 + work2)
+    assertEquals(3, testExecutor.elementsOutstanding());
+
+    // Steal work from keyGroup1.
+    // The first work in queue is work1, which is failed.
+    // It should be dropped, its handle closed, and work2 should be returned.
+    try (BoundedQueueExecutorWorkHandleImpl stealHandle = testExecutor.createBudgetHandle(0, 0L)) {
+      ExecutableWork stolen = testExecutor.pollWork("compA", keyGroup1, stealHandle);
+      assertNotNull(stolen);
+      assertEquals(work2, stolen);
+      // blocker (1) + work2 (1) = 2. work1 (1) should have been released.
+      assertEquals(2, testExecutor.elementsOutstanding());
+    }
+    // work2 should also be released now because stealHandle is closed.
+    // blocker (1) = 1.
+    assertEquals(1, testExecutor.elementsOutstanding());
+
+    // Unblock the blocker and shut down
+    blockerStop.countDown();
+    testExecutor.shutdown();

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   If any assertion fails before the end of the test (for example, `assertEquals(3, testExecutor.elementsOutstanding())`), the `blockerStop.countDown()` and `testExecutor.shutdown()` calls are skipped. The blocker task then stays parked on `blockerStop.await()` indefinitely, which leaks the worker thread and can hang the test suite.
   
   Wrap the execution and assertions in a `try`/`finally` block so that the blocker is always released and the executor is always shut down, whether or not an assertion fails.
   
   ```java
       try {
         // Enqueue tasks
         testExecutor.execute(work1, 100);
         testExecutor.execute(work2, 150);
   
         // Total outstanding elements must be 3 (blocker + work1 + work2)
         assertEquals(3, testExecutor.elementsOutstanding());
   
         // Steal work from keyGroup1.
         // The first work in queue is work1, which is failed.
         // It should be dropped, its handle closed, and work2 should be returned.
         try (BoundedQueueExecutorWorkHandleImpl stealHandle = testExecutor.createBudgetHandle(0, 0L)) {
           ExecutableWork stolen = testExecutor.pollWork("compA", keyGroup1, stealHandle);
           assertNotNull(stolen);
           assertEquals(work2, stolen);
           // blocker (1) + work2 (1) = 2. work1 (1) should have been released.
           assertEquals(2, testExecutor.elementsOutstanding());
         }
         // work2 should also be released now because stealHandle is closed.
         // blocker (1) = 1.
         assertEquals(1, testExecutor.elementsOutstanding());
       } finally {
         // Unblock the blocker and shut down
         blockerStop.countDown();
         testExecutor.shutdown();
       }
   ```
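
The same cleanup pattern can be checked in isolation. The following is a minimal sketch, not code from this PR: it swaps `BoundedQueueExecutor` for a plain single-threaded `ExecutorService` from the JDK, and the names `BlockerCleanupSketch` and `workerExitsAfter` are hypothetical. It shows that with the `finally` block in place, the blocker is released and the executor terminates even when the body throws an `AssertionError`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the test under review; uses only JDK classes.
class BlockerCleanupSketch {

  /**
   * Occupies the single worker thread with a blocker task, runs {@code body},
   * and reports whether the executor terminated afterwards.
   */
  static boolean workerExitsAfter(Runnable body) throws InterruptedException {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    CountDownLatch blockerStart = new CountDownLatch(1);
    CountDownLatch blockerStop = new CountDownLatch(1);

    // Blocker task: signals that it started, then parks until released.
    executor.execute(
        () -> {
          blockerStart.countDown();
          try {
            blockerStop.await();
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        });
    blockerStart.await();

    try {
      try {
        // Stands in for the enqueue/poll/assert section of the real test.
        body.run();
      } finally {
        // Runs on both success and failure, so the blocker is never left parked.
        blockerStop.countDown();
        executor.shutdown();
      }
    } catch (AssertionError simulatedFailure) {
      // Swallowed only so this sketch can report on cleanup;
      // a real test would let the failure propagate.
    }
    return executor.awaitTermination(5, TimeUnit.SECONDS);
  }

  public static void main(String[] args) throws InterruptedException {
    boolean afterSuccess = workerExitsAfter(() -> {});
    boolean afterFailure =
        workerExitsAfter(
            () -> {
              throw new AssertionError("simulated assertion failure");
            });
    System.out.println("terminated after passing body: " + afterSuccess);
    System.out.println("terminated after failing body: " + afterFailure);
  }
}
```

Without the inner `finally`, the failing case would leave the blocker parked on `blockerStop.await()` and `awaitTermination` would time out.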



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
