gianm commented on code in PR #18254:
URL: https://github.com/apache/druid/pull/18254#discussion_r2216970487


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -800,7 +813,7 @@ private void addToRetryQueue(ControllerQueryKernel kernel, 
int worker, MSQFault
 
     List<WorkOrder> retriableWorkOrders = 
kernel.getWorkInCaseWorkerEligibleForRetryElseThrow(worker, fault);
     if (!retriableWorkOrders.isEmpty()) {
-      log.info("Submitting worker[%s] for relaunch because of fault[%s]", 
worker, fault);
+      log.debug("Submitting worker[%s] for relaunch because of fault[%s]", 
worker, fault);

Review Comment:
   Is this intentional in order to reduce log spam?



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java:
##########
@@ -301,14 +318,31 @@ public void reportFailedInactiveWorker(int workerNumber)
   }
 
   @Override
-  public void waitForWorkers(Set<Integer> workerNumbers) throws 
InterruptedException
+  public void waitForWorkers(Set<Integer> workerNumbers)
+      throws InterruptedException
   {
     synchronized (taskIds) {
       while (!fullyStartedTasks.containsAll(workerNumbers)) {
         if (stopFuture.isDone() || stopFuture.isCancelled()) {
           FutureUtils.getUnchecked(stopFuture, false);
           throw new ISE("Stopped");
         }
+
+        WorkerFailureListener workerFailureListener = 
workerFailureListenerRef.get();
+        if (workerFailureListener != null) {
+          for (TaskTracker taskTracker : taskTrackers.values()) {
+            if (taskTracker.isRetrying() && 
workerNumbers.contains(taskTracker.workerNumber)) {
+              workerFailureListener.onFailure(

Review Comment:
   Can this use `invokeFailureListener`?



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -2547,7 +2564,9 @@ private void startStages() throws IOException, 
InterruptedException
               stageDef.doesShuffle() ? stageDef.getShuffleSpec().kind() : 
"none"
           );
 
-          workerManager.launchWorkersIfNeeded(workerCount);
+          workerManager.launchWorkersIfNeeded(

Review Comment:
   No reason to reformat this call.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1379,13 +1392,15 @@ private void contactWorkersForStage(
       final boolean retryOnFailure
   )
   {
-    // Sorted copy of target worker numbers to ensure consistent iteration 
order.
+    // Sorted copy of target worker numbers to ensure a consistent iteration 
order.
     final List<Integer> workersCopy = Ordering.natural().sortedCopy(workers);
     final List<String> workerIds = getWorkerIds();
     final List<ListenableFuture<Void>> workerFutures = new 
ArrayList<>(workersCopy.size());
 
     try {
-      workerManager.waitForWorkers(workers);
+      workerManager.waitForWorkers(

Review Comment:
   No reason to reformat this call.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java:
##########
@@ -244,22 +246,37 @@ public List<String> getWorkerIds()
   }
 
   @Override
-  public void launchWorkersIfNeeded(final int taskCount) throws 
InterruptedException
+  public void launchWorkersIfNeeded(final int workerCount)
+      throws InterruptedException
   {
     synchronized (taskIds) {
-      retryInactiveTasksIfNeeded(taskCount);
+      retryInactiveTasksIfNeeded(workerCount);
 
-      if (taskCount > desiredTaskCount) {
-        desiredTaskCount = taskCount;
+      if (workerCount > desiredTaskCount) {
+        desiredTaskCount = workerCount;
         taskIds.notifyAll();
       }
 
-      while (taskIds.size() < taskCount || !allTasksStarted(taskCount)) {
+      while (taskIds.size() < workerCount || !allTasksStarted(workerCount)) {
         if (stopFuture.isDone() || stopFuture.isCancelled()) {
           FutureUtils.getUnchecked(stopFuture, false);
           throw new ISE("Stopped");
         }
-
+        // add failed tasks to retry the queue
+        WorkerFailureListener workerFailureListener = 
workerFailureListenerRef.get();
+        if (workerFailureListener != null) {
+          for (TaskTracker taskTracker : taskTrackers.values()) {
+            if (taskTracker.isRetrying()) {
+              workerFailureListener.onFailure(

Review Comment:
   Can this use `invokeFailureListener`? It would be nice to run them all 
through there to limit verbosity.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java:
##########
@@ -167,14 +167,17 @@ public MSQWorkerTaskLauncher(
     this.exec = Execs.singleThreaded(
         "multi-stage-query-task-launcher[" + 
StringUtils.encodeForFormat(controllerTaskId) + "]-%s"
     );
-    this.workerFailureListener = workerFailureListener;
     this.maxTaskStartDelayMillis = maxTaskStartDelayMillis;
     this.config = config;
   }
 
   @Override
-  public ListenableFuture<?> start()
+  public ListenableFuture<?> start(WorkerFailureListener workerFailureListener)
   {
+    if (!this.workerFailureListenerRef.compareAndSet(null, 
workerFailureListener)) {
+      log.warn("WorkerFailureListener already set. Ignoring re-registration 
call");

Review Comment:
   Is this expected to ever happen? It seems sketchy to me, and I'm wondering
whether this should be a defensive exception rather than a warning.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -2368,7 +2383,9 @@ private void retryFailedTasks() throws 
InterruptedException
       }
 
       // wait till the workers identified above are fully ready
-      workerManager.waitForWorkers(workersNeedToBeFullyStarted);
+      workerManager.waitForWorkers(

Review Comment:
   No reason to reformat this call.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to