Copilot commented on code in PR #18467:
URL: https://github.com/apache/druid/pull/18467#discussion_r2315693056


##########
multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java:
##########
@@ -264,23 +296,37 @@ public void launchWorkersIfNeeded(final int workerCount)
           FutureUtils.getUnchecked(stopFuture, false);
           throw new ISE("Stopped");
         }
-        // add failed tasks to retry the queue
-        if (workerFailureListenerRef.get() != null) {
-          for (TaskTracker taskTracker : taskTrackers.values()) {
-            if (taskTracker.isRetrying()) {
-              invokeFailureListener(
-                  taskTracker,
-                  new WorkerFailedFault(
-                      taskTracker.msqWorkerTask.getId(),
-                      taskTracker.statusRef.get().getErrorMsg()
-                  )
-              );
-            }
+        
+        // Check for failed workers and collect them
+        for (TaskTracker taskTracker : taskTrackers.values()) {
+          if (taskTracker.isRetrying()) {
+            failedWorkers.add(
+                new IntObjectPair<>()
+                {
+                  @Override
+                  public int leftInt()
+                  {
+                    return taskTracker.getWorkerNumber();
+                  }
+
+                  @Override
+                  public MSQFault right()
+                  {
+                    return generateFailureFault(taskTracker.msqWorkerTask.getId(), taskTracker.statusRef.get());
+                  }
+                }
+            );
           }
         }
-        taskIds.wait();
+        if (!failedWorkers.isEmpty()) {
+          return failedWorkers;
+        }
+        taskIds.wait(taskIdsLockTimeout);

Review Comment:
   The timeout passed to `taskIds.wait(...)` is hardcoded to 60 seconds in the
constructor. Consider making it configurable, or documenting why this specific
value was chosen and how the bounded wait helps prevent potential deadlocks.
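A minimal sketch of the suggestion, assuming the timeout is injected through the constructor (for example from a config object) instead of being hardcoded. The class `BoundedWaitLauncher`, its methods, and the `String` stand-in for `MSQFault` are hypothetical and are not Druid's API. Because `Object.wait(long)` can return after the timeout or on a spurious wakeup without any notification, the wait sits inside a loop that re-checks state on every pass; a bounded timeout means a missed `notifyAll()` only delays the next check rather than blocking the thread indefinitely.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the launcher: maps worker number -> failure message.
// The lock timeout is injected rather than hardcoded, so it can come from config.
class BoundedWaitLauncher
{
  private final Object lock = new Object();
  private final long lockTimeoutMillis;
  private final Map<Integer, String> failures = new HashMap<>();
  private boolean stopped = false;

  BoundedWaitLauncher(long lockTimeoutMillis)
  {
    if (lockTimeoutMillis <= 0) {
      throw new IllegalArgumentException("lockTimeoutMillis must be positive");
    }
    this.lockTimeoutMillis = lockTimeoutMillis;
  }

  // Called by a status-polling thread when a worker fails.
  void reportFailure(int workerNumber, String message)
  {
    synchronized (lock) {
      failures.put(workerNumber, message);
      lock.notifyAll();
    }
  }

  void stop()
  {
    synchronized (lock) {
      stopped = true;
      lock.notifyAll();
    }
  }

  // Blocks until at least one failure is recorded, then hands a snapshot of
  // the failures back to the caller instead of invoking a listener.
  Map<Integer, String> awaitFailures() throws InterruptedException
  {
    synchronized (lock) {
      while (true) {
        if (stopped) {
          throw new IllegalStateException("Stopped");
        }
        if (!failures.isEmpty()) {
          Map<Integer, String> snapshot = new HashMap<>(failures);
          failures.clear();
          return snapshot;
        }
        // Bounded wait: a timeout, spurious wakeup, or missed notify
        // just leads to one more pass through the loop.
        lock.wait(lockTimeoutMillis);
      }
    }
  }
}
```

Injecting the value keeps the production default in one place while letting tests use a much shorter wait.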



##########
multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java:
##########
@@ -264,23 +296,37 @@ public void launchWorkersIfNeeded(final int workerCount)
           FutureUtils.getUnchecked(stopFuture, false);
           throw new ISE("Stopped");
         }
-        // add failed tasks to retry the queue
-        if (workerFailureListenerRef.get() != null) {
-          for (TaskTracker taskTracker : taskTrackers.values()) {
-            if (taskTracker.isRetrying()) {
-              invokeFailureListener(
-                  taskTracker,
-                  new WorkerFailedFault(
-                      taskTracker.msqWorkerTask.getId(),
-                      taskTracker.statusRef.get().getErrorMsg()
-                  )
-              );
-            }
+        
+        // Check for failed workers and collect them
+        for (TaskTracker taskTracker : taskTrackers.values()) {
+          if (taskTracker.isRetrying()) {
+            failedWorkers.add(
+                new IntObjectPair<>()
+                {
+                  @Override
+                  public int leftInt()
+                  {
+                    return taskTracker.getWorkerNumber();
+                  }
+
+                  @Override
+                  public MSQFault right()
+                  {
+                    return generateFailureFault(taskTracker.msqWorkerTask.getId(), taskTracker.statusRef.get());
+                  }
+                }

Review Comment:
   Creating anonymous classes inline makes the code harder to read. Consider 
using a factory method or constructor to create IntObjectPair instances, such 
as `IntObjectPair.of(taskTracker.getWorkerNumber(), generateFailureFault(...))`.
   ```suggestion
                   IntObjectPair.of(
                       taskTracker.getWorkerNumber(),
                       generateFailureFault(taskTracker.msqWorkerTask.getId(), taskTracker.statusRef.get())
                   )
   ```
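A small self-contained illustration of a second reason to prefer a value-type factory here, assuming `IntObjectPair` is fastutil's `it.unimi.dsi.fastutil.ints.IntObjectPair`, whose `of(int, V)` factory returns an immutable pair with value-based `equals` and `hashCode`. The `IntPair` interface, `IntPairValue` record, and `PairDemo` helpers below are hypothetical JDK-only stand-ins. An anonymous implementation inherits identity `equals` from `Object`, so two pairs for the same worker are never equal inside a `Set`, and its `right()` re-runs its body (here, re-reading `taskTracker.statusRef`) on every call. A record captures both values once and derives equality from them.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical minimal interface mirroring the leftInt()/right() shape used in the PR.
interface IntPair<V>
{
  int leftInt();

  V right();
}

// Value-based implementation: the record stores both values once and
// generates equals/hashCode from them.
record IntPairValue<V>(int leftInt, V right) implements IntPair<V>
{
  static <V> IntPair<V> of(int left, V right)
  {
    return new IntPairValue<>(left, right);
  }
}

class PairDemo
{
  // Counts how many times a "fault" has been built.
  static final AtomicInteger FAULTS_BUILT = new AtomicInteger();

  static String buildFault(int workerNumber)
  {
    FAULTS_BUILT.incrementAndGet();
    return "worker " + workerNumber + " failed";
  }

  // Anonymous-class style, as in the diff: right() recomputes on each call,
  // and equality is object identity.
  static IntPair<String> anonymous(int workerNumber)
  {
    return new IntPair<>()
    {
      @Override
      public int leftInt()
      {
        return workerNumber;
      }

      @Override
      public String right()
      {
        return buildFault(workerNumber);
      }
    };
  }

  // Number of distinct elements a HashSet keeps for the two pairs.
  static int distinctCount(IntPair<String> a, IntPair<String> b)
  {
    Set<IntPair<String>> set = new HashSet<>();
    set.add(a);
    set.add(b);
    return set.size();
  }
}
```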



##########
multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherRetryTest.java:
##########
@@ -57,215 +59,179 @@
 import javax.annotation.Nullable;
 import java.net.URI;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
-public class MSQWorkerTaskLauncherRetryTests
+public class MSQWorkerTaskLauncherRetryTest
 {
-
   private static final TaskLocation RUNNING_TASK_LOCATION = new TaskLocation("host", 1, 2, null);
 
   @Test
-  public void mainThreadBlockingSimulationTest() throws Exception
+  public void testLaunchWorkersIfNeeded_returnsFailedWorkers() throws InterruptedException
   {
-    final ExecutorService executors = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(false)
-                                                                                                  .setNameFormat(
-                                                                                                      "Controller-simulator-%d")
-                                                                                                  .build());
-
-    final TestOverlordClient overlordClient = new TestOverlordClient();
-    final int failedWorkerNumber = 2;
-    final CountDownLatch workerFailedLatch = new CountDownLatch(1);
-    final CountDownLatch workerStartedLatch = new CountDownLatch(1);
-    overlordClient.addFailedWorker(2);
-    overlordClient.addUnknownLocationWorker(1);
-
-    final MSQWorkerTaskLauncher msqWorkerTaskLauncher = new MSQWorkerTaskLauncher(
+    TestOverlordClient overlordClient = new TestOverlordClient();
+    overlordClient.addFailedWorker(1);
+
+    AtomicInteger failureCallbackCount = new AtomicInteger(0);
+    WorkerFailureListener workerFailureListener = (task, fault) -> {
+      failureCallbackCount.incrementAndGet();
+      Assertions.assertEquals(1, task.getWorkerNumber());
+      Assertions.assertTrue(fault instanceof WorkerFailedFault);
+    };
+
+    MSQWorkerTaskLauncher launcher = new MSQWorkerTaskLauncher(
         "controller-id",
         "foo",
         overlordClient,
+        workerFailureListener,
         ImmutableMap.of(),
         TimeUnit.SECONDS.toMillis(5),
-        new MSQWorkerTaskLauncher.MSQWorkerTaskLauncherConfig()
+        new MSQWorkerTaskLauncher.MSQWorkerTaskLauncherConfig(),
+        2
     );
 
-    try {
-      final long workerThreadId = Thread.currentThread().getId();
+    launcher.start();
 
-      startTaskLauncher(
-          msqWorkerTaskLauncher,
-          failedWorkerNumber,
-          workerFailedLatch,
-          overlordClient,
-          workerThreadId,
-          workerStartedLatch
-      );
+    // Should return failed workers in the set
+    Set<IntObjectPair<MSQFault>> failedWorkers = launcher.launchWorkersIfNeeded(2);
 
-      MockConsumer mockConsumer = new MockConsumer(
-          msqWorkerTaskLauncher,
-          3,
-          workerStartedLatch
-      );
-      Future<?> futures = executors.submit(mockConsumer);
-      // hook called but worker not queued for relaunch.
-      workerFailedLatch.await();
-      Assertions.assertEquals(1, workerStartedLatch.getCount());
-      // we would need to call hooks to allow the main thread to proceed since we are using an exec service to so the thread id's would not match.
-      enableWorkerRelaunch(overlordClient, failedWorkerNumber, msqWorkerTaskLauncher, workerStartedLatch);
-      // future should be completed in 5 seconds else throw an exception.
-      Assertions.assertNull(futures.get(5, TimeUnit.SECONDS));
-    }
-    finally {
-      msqWorkerTaskLauncher.stop(true);
-      executors.shutdownNow();
+    // The method should not invoke the failure listener directly anymore, 
+    // but should return the failed workers
+    Assertions.assertFalse(failedWorkers.isEmpty());
+
+    // Check that the failed worker is in the returned set
+    boolean foundFailedWorker = false;
+    for (IntObjectPair<MSQFault> failedWorker : failedWorkers) {
+      if (failedWorker.leftInt() == 1) {
+        foundFailedWorker = true;
+        Assertions.assertTrue(failedWorker.right() instanceof WorkerFailedFault);
+      }
     }
-  }
+    Assertions.assertTrue(foundFailedWorker, "Failed worker should be in the returned set");
 
-  private static void enableWorkerRelaunch(
-      TestOverlordClient overlordClient,
-      int failedWorkerNumber,
-      MSQWorkerTaskLauncher msqWorkerTaskLauncher,
-      CountDownLatch workerStartedLatch
-  )
-  {
-    overlordClient.removeUnknownLocationWorker(1);
-    overlordClient.removefailedWorker(failedWorkerNumber);
-    msqWorkerTaskLauncher.submitForRelaunch(failedWorkerNumber);
-    workerStartedLatch.countDown();
+    launcher.stop(true);
   }
 
-  private static void startTaskLauncher(
-      MSQWorkerTaskLauncher msqWorkerTaskLauncher,
-      int failedWorkerNumber,
-      CountDownLatch workerFailedLatch,
-      TestOverlordClient overlordClient,
-      long workerThreadId,
-      CountDownLatch workerStartedLatch
-  )
+  @Test
+  public void testWaitForWorkers_returnsFailedWorkers() throws InterruptedException
   {
-    msqWorkerTaskLauncher.start((task, fault) -> {
-      Assertions.assertEquals(failedWorkerNumber, task.getWorkerNumber());
-      workerFailedLatch.countDown();
-      if (workerThreadId == Thread.currentThread().getId()) {
-        // If the worker thread is the same as the main thread, we can directly relaunch the worker.
-        enableWorkerRelaunch(overlordClient, failedWorkerNumber, msqWorkerTaskLauncher, workerStartedLatch);
-      }
-    });
-  }
+    TestOverlordClient overlordClient = new TestOverlordClient();
+    overlordClient.addFailedWorker(0);
 
+    AtomicInteger failureCallbackCount = new AtomicInteger(0);
+    WorkerFailureListener workerFailureListener = (task, fault) -> {
+      failureCallbackCount.incrementAndGet();
+    };
 
-  @Test
-  public void mainThreadNonBlockingSimulationTest() throws Exception
-  {
-    final TestOverlordClient overlordClient = new TestOverlordClient();
-    final int failedWorkerNumber = 2;
-    final CountDownLatch workerFailedLatch = new CountDownLatch(1);
-    final CountDownLatch workerStartedLatch = new CountDownLatch(1);
-    overlordClient.addFailedWorker(2);
-    overlordClient.addUnknownLocationWorker(1);
-
-    final MSQWorkerTaskLauncher msqWorkerTaskLauncher = new MSQWorkerTaskLauncher(
+    MSQWorkerTaskLauncher launcher = new MSQWorkerTaskLauncher(
         "controller-id",
         "foo",
         overlordClient,
+        workerFailureListener,
         ImmutableMap.of(),
         TimeUnit.SECONDS.toMillis(5),
-        new MSQWorkerTaskLauncher.MSQWorkerTaskLauncherConfig()
+
+        new MSQWorkerTaskLauncher.MSQWorkerTaskLauncherConfig(),

Review Comment:
   There's an extra blank line before the `MSQWorkerTaskLauncherConfig`
constructor argument, which breaks the argument alignment pattern used
elsewhere in the file.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]