gianm commented on code in PR #18254:
URL: https://github.com/apache/druid/pull/18254#discussion_r2216970487
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -800,7 +813,7 @@ private void addToRetryQueue(ControllerQueryKernel kernel,
int worker, MSQFault
List<WorkOrder> retriableWorkOrders =
kernel.getWorkInCaseWorkerEligibleForRetryElseThrow(worker, fault);
if (!retriableWorkOrders.isEmpty()) {
- log.info("Submitting worker[%s] for relaunch because of fault[%s]",
worker, fault);
+ log.debug("Submitting worker[%s] for relaunch because of fault[%s]",
worker, fault);
Review Comment:
Is this intentional in order to reduce log spam?
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java:
##########
@@ -301,14 +318,31 @@ public void reportFailedInactiveWorker(int workerNumber)
}
@Override
- public void waitForWorkers(Set<Integer> workerNumbers) throws
InterruptedException
+ public void waitForWorkers(Set<Integer> workerNumbers)
+ throws InterruptedException
{
synchronized (taskIds) {
while (!fullyStartedTasks.containsAll(workerNumbers)) {
if (stopFuture.isDone() || stopFuture.isCancelled()) {
FutureUtils.getUnchecked(stopFuture, false);
throw new ISE("Stopped");
}
+
+ WorkerFailureListener workerFailureListener =
workerFailureListenerRef.get();
+ if (workerFailureListener != null) {
+ for (TaskTracker taskTracker : taskTrackers.values()) {
+ if (taskTracker.isRetrying() &&
workerNumbers.contains(taskTracker.workerNumber)) {
+ workerFailureListener.onFailure(
Review Comment:
Can this use `invokeFailureListener`?
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -2547,7 +2564,9 @@ private void startStages() throws IOException,
InterruptedException
stageDef.doesShuffle() ? stageDef.getShuffleSpec().kind() :
"none"
);
- workerManager.launchWorkersIfNeeded(workerCount);
+ workerManager.launchWorkersIfNeeded(
Review Comment:
No reason to reformat this line.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1379,13 +1392,15 @@ private void contactWorkersForStage(
final boolean retryOnFailure
)
{
- // Sorted copy of target worker numbers to ensure consistent iteration
order.
+ // Sorted copy of target worker numbers to ensure a consistent iteration
order.
final List<Integer> workersCopy = Ordering.natural().sortedCopy(workers);
final List<String> workerIds = getWorkerIds();
final List<ListenableFuture<Void>> workerFutures = new
ArrayList<>(workersCopy.size());
try {
- workerManager.waitForWorkers(workers);
+ workerManager.waitForWorkers(
Review Comment:
No reason to reformat this line.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java:
##########
@@ -244,22 +246,37 @@ public List<String> getWorkerIds()
}
@Override
- public void launchWorkersIfNeeded(final int taskCount) throws
InterruptedException
+ public void launchWorkersIfNeeded(final int workerCount)
+ throws InterruptedException
{
synchronized (taskIds) {
- retryInactiveTasksIfNeeded(taskCount);
+ retryInactiveTasksIfNeeded(workerCount);
- if (taskCount > desiredTaskCount) {
- desiredTaskCount = taskCount;
+ if (workerCount > desiredTaskCount) {
+ desiredTaskCount = workerCount;
taskIds.notifyAll();
}
- while (taskIds.size() < taskCount || !allTasksStarted(taskCount)) {
+ while (taskIds.size() < workerCount || !allTasksStarted(workerCount)) {
if (stopFuture.isDone() || stopFuture.isCancelled()) {
FutureUtils.getUnchecked(stopFuture, false);
throw new ISE("Stopped");
}
-
+ // add failed tasks to retry the queue
+ WorkerFailureListener workerFailureListener =
workerFailureListenerRef.get();
+ if (workerFailureListener != null) {
+ for (TaskTracker taskTracker : taskTrackers.values()) {
+ if (taskTracker.isRetrying()) {
+ workerFailureListener.onFailure(
Review Comment:
Can this use `invokeFailureListener`? It would be nice to run them all
through there to limit verbosity.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java:
##########
@@ -167,14 +167,17 @@ public MSQWorkerTaskLauncher(
this.exec = Execs.singleThreaded(
"multi-stage-query-task-launcher[" +
StringUtils.encodeForFormat(controllerTaskId) + "]-%s"
);
- this.workerFailureListener = workerFailureListener;
this.maxTaskStartDelayMillis = maxTaskStartDelayMillis;
this.config = config;
}
@Override
- public ListenableFuture<?> start()
+ public ListenableFuture<?> start(WorkerFailureListener workerFailureListener)
{
+ if (!this.workerFailureListenerRef.compareAndSet(null,
workerFailureListener)) {
+ log.warn("WorkerFailureListener already set. Ignoring re-registration
call");
Review Comment:
Is this expected to ever happen? It seems sketchy to me, and I'm wondering
whether this should be a defensive exception rather than a warning.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -2368,7 +2383,9 @@ private void retryFailedTasks() throws
InterruptedException
}
// wait till the workers identified above are fully ready
- workerManager.waitForWorkers(workersNeedToBeFullyStarted);
+ workerManager.waitForWorkers(
Review Comment:
No reason to reformat this line.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]