LakshSingla commented on code in PR #16790: URL: https://github.com/apache/druid/pull/16790#discussion_r1695064626
########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStagePhase.java: ########## @@ -54,11 +54,12 @@ public boolean canTransitionFrom(final WorkerStagePhase priorPhase) @Override public boolean canTransitionFrom(final WorkerStagePhase priorPhase) { - return priorPhase == PRESHUFFLE_WAITING_FOR_RESULT_PARTITION_BOUNDARIES; + return priorPhase == PRESHUFFLE_WAITING_FOR_RESULT_PARTITION_BOUNDARIES /* if globally sorting */ + || priorPhase == READING_INPUT /* if locally sorting */; Review Comment: curious why this never got caught before this patch? ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStagePhase.java: ########## @@ -70,7 +71,7 @@ public boolean canTransitionFrom(final WorkerStagePhase priorPhase) @Override public boolean canTransitionFrom(final WorkerStagePhase priorPhase) { - return priorPhase == RESULTS_READY; + return priorPhase.compareTo(FINISHED) < 0; Review Comment: should any stage be allowed to transition to finished, even if it hasn't produced any results? ########## extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerClient.java: ########## @@ -138,12 +133,11 @@ public ListenableFuture<Boolean> fetchChannelData( catch (Exception e) { throw new ISE(e, "Error reading frame file channel"); } - } @Override public void close() { - inMemoryWorkers.forEach((k, v) -> v.stopGracefully()); + inMemoryWorkers.forEach((k, v) -> v.stop()); Review Comment: Anything which prompted this change? 
########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java: ########## @@ -273,203 +189,112 @@ public TaskStatus run() throws Exception if (maybeErrorReport.isPresent()) { final MSQErrorReport errorReport = maybeErrorReport.get(); - final String errorLogMessage = MSQTasks.errorReportToLogMessage(errorReport); - log.warn(errorLogMessage); + final String logMessage = MSQTasks.errorReportToLogMessage(errorReport); + log.warn("%s", logMessage); - closer.register(() -> { - if (controllerAlive && controllerClient != null && selfDruidNode != null) { - controllerClient.postWorkerError(id(), errorReport); - } - }); + if (controllerAlive) { + controllerClient.postWorkerError(context.workerId(), errorReport); + } - return TaskStatus.failure(id(), MSQFaultUtils.generateMessageWithErrorCode(errorReport.getFault())); - } else { - return TaskStatus.success(id()); + if (t != null) { + Throwables.throwIfInstanceOf(t, MSQException.class); + throw new MSQException(t, maybeErrorReport.get().getFault()); + } else { + throw new MSQException(maybeErrorReport.get().getFault()); + } } } + catch (IOException e) { + throw new RuntimeException(e); + } + finally { + runFuture.set(null); + } } /** * Runs worker logic. Returns an empty Optional on success. On failure, returns an error report for errors that * happened in other threads; throws exceptions for errors that happened in the main worker loop. 
*/ - public Optional<MSQErrorReport> runTask(final Closer closer) throws Exception + private Optional<MSQErrorReport> runInternal(final KernelHolders kernelHolders, final Closer workerCloser) + throws Exception { - this.controllerClient = context.makeControllerClient(task.getControllerTaskId()); - closer.register(controllerClient::close); - closer.register(context.dataServerQueryHandlerFactory()); - context.registerWorker(this, closer); // Uses controllerClient, so must be called after that is initialized - - this.workerClient = new ExceptionWrappingWorkerClient(context.makeWorkerClient()); - closer.register(workerClient::close); - - final KernelHolder kernelHolder = new KernelHolder(); - final String cancellationId = id(); - + context.registerWorker(this, workerCloser); + workerCloser.register(context.dataServerQueryHandlerFactory()); + this.workerClient = workerCloser.register(new ExceptionWrappingWorkerClient(context.makeWorkerClient())); final FrameProcessorExecutor workerExec = new FrameProcessorExecutor(makeProcessingPool()); - // Delete all the stage outputs - closer.register(() -> { - for (final StageId stageId : stageOutputs.keySet()) { - cleanStageOutput(stageId, false); - } - }); - - // Close stage output processors and running futures (if present) - closer.register(() -> { - try { - workerExec.cancel(cancellationId); - } - catch (InterruptedException e) { - // Strange that cancellation would itself be interrupted. Throw an exception, since this is unexpected. 
- throw new RuntimeException(e); - } - }); + final long maxAllowedParseExceptions; - long maxAllowedParseExceptions = Long.parseLong(task.getContext().getOrDefault( - MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED, - Long.MAX_VALUE - ).toString()); + if (task != null) { + maxAllowedParseExceptions = + Long.parseLong(task.getContext() + .getOrDefault(MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED, Long.MAX_VALUE) + .toString()); + } else { + maxAllowedParseExceptions = 0; + } - long maxVerboseParseExceptions; + final long maxVerboseParseExceptions; if (maxAllowedParseExceptions == -1L) { maxVerboseParseExceptions = Limits.MAX_VERBOSE_PARSE_EXCEPTIONS; } else { maxVerboseParseExceptions = Math.min(maxAllowedParseExceptions, Limits.MAX_VERBOSE_PARSE_EXCEPTIONS); } - Set<String> criticalWarningCodes; + final Set<String> criticalWarningCodes; if (maxAllowedParseExceptions == 0) { criticalWarningCodes = ImmutableSet.of(CannotParseExternalDataFault.CODE); } else { criticalWarningCodes = ImmutableSet.of(); } - final MSQWarningReportPublisher msqWarningReportPublisher = new MSQWarningReportLimiterPublisher( - new MSQWarningReportSimplePublisher( - id(), - controllerClient, - id(), - MSQTasks.getHostFromSelfNode(selfDruidNode) - ), - Limits.MAX_VERBOSE_WARNINGS, - ImmutableMap.of(CannotParseExternalDataFault.CODE, maxVerboseParseExceptions), - criticalWarningCodes, - controllerClient, - id(), - MSQTasks.getHostFromSelfNode(selfDruidNode) - ); - - closer.register(msqWarningReportPublisher); + // Delay removal of kernels so we don't interfere with iteration of kernelHolders.getAllKernelHolders(). 
+ final Set<StageId> kernelsToRemove = new HashSet<>(); - final Map<StageId, SettableFuture<ClusterByPartitions>> partitionBoundariesFutureMap = new HashMap<>(); - - final Map<StageId, FrameContext> stageFrameContexts = new HashMap<>(); - - while (!kernelHolder.isDone()) { + while (!kernelHolders.isDone()) { boolean didSomething = false; - for (final WorkerStageKernel kernel : kernelHolder.getStageKernelMap().values()) { + for (final KernelHolder kernelHolder : kernelHolders.getAllKernelHolders()) { + final WorkerStageKernel kernel = kernelHolder.kernel; final StageDefinition stageDefinition = kernel.getStageDefinition(); - if (kernel.getPhase() == WorkerStagePhase.NEW) { - - log.info("Processing work order for stage [%d]" + - (log.isDebugEnabled() - ? StringUtils.format( - " with payload [%s]", - context.jsonMapper().writeValueAsString(kernel.getWorkOrder()) - ) : ""), stageDefinition.getId().getStageNumber()); - - // Create separate inputChannelFactory per stage, because the list of tasks can grow between stages, and - // so we need to avoid the memoization in baseInputChannelFactory. - final InputChannelFactory inputChannelFactory = makeBaseInputChannelFactory(closer); - - // Compute memory parameters for all stages, even ones that haven't been assigned yet, so we can fail-fast - // if some won't work. (We expect that all stages will get assigned to the same pool of workers.) - for (final StageDefinition stageDef : kernel.getWorkOrder().getQueryDefinition().getStageDefinitions()) { - stageFrameContexts.computeIfAbsent( - stageDef.getId(), - stageId -> context.frameContext( - kernel.getWorkOrder().getQueryDefinition(), - stageId.getStageNumber() - ) - ); - } - - // Start working on this stage immediately. - kernel.startReading(); - - final RunWorkOrder runWorkOrder = new RunWorkOrder( - kernel, - inputChannelFactory, - stageCounters.computeIfAbsent(stageDefinition.getId(), ignored -> new CounterTracker()), + // Workers run all work orders they get. 
There is not (currently) any limit on the number of concurrent work + // orders; we rely on the controller to avoid overloading workers. + if (kernel.getPhase() == WorkerStagePhase.NEW + && kernelHolders.runningKernelCount() < context.maxConcurrentStages()) { Review Comment: Workers in `RESULTS_COMPLETE` running in the `MEMORY` mode would be consuming a decent portion of the memory storing the result output. Does this condition not overload the memory on the worker, because at most 1 stage will be not-running but consuming memory (in leap-frogging)? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org