cryptoe commented on code in PR #18254:
URL: https://github.com/apache/druid/pull/18254#discussion_r2217145581


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java:
##########
@@ -167,14 +167,17 @@ public MSQWorkerTaskLauncher(
     this.exec = Execs.singleThreaded(
         "multi-stage-query-task-launcher[" + StringUtils.encodeForFormat(controllerTaskId) + "]-%s"
     );
-    this.workerFailureListener = workerFailureListener;
     this.maxTaskStartDelayMillis = maxTaskStartDelayMillis;
     this.config = config;
   }
 
   @Override
-  public ListenableFuture<?> start()
+  public ListenableFuture<?> start(WorkerFailureListener workerFailureListener)
   {
+    if (!this.workerFailureListenerRef.compareAndSet(null, workerFailureListener)) {
+      log.warn("WorkerFailureListener already set. Ignoring re-registration call");

Review Comment:
   This is never expected to happen. Will adjust accordingly. 



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -800,7 +813,7 @@ private void addToRetryQueue(ControllerQueryKernel kernel, int worker, MSQFault
 
     List<WorkOrder> retriableWorkOrders = kernel.getWorkInCaseWorkerEligibleForRetryElseThrow(worker, fault);
     if (!retriableWorkOrders.isEmpty()) {
-      log.info("Submitting worker[%s] for relaunch because of fault[%s]", worker, fault);
+      log.debug("Submitting worker[%s] for relaunch because of fault[%s]", worker, fault);

Review Comment:
   Yes. We now retry more often, so this message was spamming the logs.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java:
##########
@@ -301,14 +318,31 @@ public void reportFailedInactiveWorker(int workerNumber)
   }
 
   @Override
-  public void waitForWorkers(Set<Integer> workerNumbers) throws InterruptedException
+  public void waitForWorkers(Set<Integer> workerNumbers)
+      throws InterruptedException
   {
     synchronized (taskIds) {
       while (!fullyStartedTasks.containsAll(workerNumbers)) {
         if (stopFuture.isDone() || stopFuture.isCancelled()) {
           FutureUtils.getUnchecked(stopFuture, false);
           throw new ISE("Stopped");
         }
+
+        WorkerFailureListener workerFailureListener = workerFailureListenerRef.get();
+        if (workerFailureListener != null) {
+          for (TaskTracker taskTracker : taskTrackers.values()) {
+            if (taskTracker.isRetrying() && workerNumbers.contains(taskTracker.workerNumber)) {
+              workerFailureListener.onFailure(

Review Comment:
   I wanted to use the same workerFailureListener for all iterations. But since
the start method now throws an exception, which enforces having only one
failure listener, yes, we can use invokeFailureListener. I have adjusted the
code.
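
For readers following the thread, here is a minimal sketch of the single-registration pattern discussed above: an `AtomicReference` is set at most once via `compareAndSet`, and a second registration throws instead of logging a warning. This is an illustration only, not the code in the PR. The class `OneTimeListenerHolder`, the `FailureListener` interface, and the signature and return value of `invokeFailureListener` are all assumptions made for the example.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the launcher; names and signatures are illustrative,
// not taken from MSQWorkerTaskLauncher.
class OneTimeListenerHolder
{
  // Hypothetical listener shape; the real WorkerFailureListener may differ.
  interface FailureListener
  {
    void onFailure(int workerNumber, String reason);
  }

  private final AtomicReference<FailureListener> listenerRef = new AtomicReference<>();

  // Registers the listener exactly once; a second call fails loudly.
  void start(FailureListener listener)
  {
    if (listener == null) {
      throw new NullPointerException("listener");
    }
    // compareAndSet(null, listener) succeeds only if no listener was set before.
    if (!listenerRef.compareAndSet(null, listener)) {
      throw new IllegalStateException("Failure listener already set");
    }
  }

  // Invokes the registered listener, if any. Returns false when start() has not run yet.
  boolean invokeFailureListener(int workerNumber, String reason)
  {
    FailureListener listener = listenerRef.get();
    if (listener == null) {
      return false;
    }
    listener.onFailure(workerNumber, reason);
    return true;
  }
}
```

Because registration can succeed only once, every later read of the reference observes the same listener, so a single helper can be used wherever a failure needs to be reported.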



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

