markap14 commented on code in PR #11164: URL: https://github.com/apache/nifi/pull/11164#discussion_r3131296534
########## nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/VirtualThreadSchedulingAgent.java: ########## @@ -0,0 +1,454 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.scheduling; + +import org.apache.nifi.connectable.Connectable; +import org.apache.nifi.controller.FlowController; +import org.apache.nifi.controller.ReportingTaskNode; +import org.apache.nifi.controller.tasks.ConnectableTask; +import org.apache.nifi.controller.tasks.InvocationResult; +import org.apache.nifi.controller.tasks.ReportingTaskWrapper; +import org.apache.nifi.scheduling.SchedulingStrategy; +import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.util.NiFiProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.scheduling.support.CronExpression; + +import java.time.OffsetDateTime; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +/** + * Scheduling agent that runs processors, funnels, ports, and reporting tasks on virtual + * threads rather than on a shared thread pool. 
A single {@link DynamicSemaphore} bounds + * the number of invocations that may run concurrently across the entire flow. Its permit + * count replaces the old Timer-Driven thread pool size and is adjustable at runtime via + * {@link #setMaxThreadCount(int)}. + * <p> + * The agent is used for both {@link SchedulingStrategy#TIMER_DRIVEN} and + * {@link SchedulingStrategy#CRON_DRIVEN} connectables. Each scheduled connectable is given + * one virtual thread per concurrent task; every iteration of the scheduling loop acquires + * a permit, calls {@link ConnectableTask#invoke()}, releases the permit, and then sleeps + * until the next invocation is due. + */ +public class VirtualThreadSchedulingAgent implements SchedulingAgent { + + private static final Logger logger = LoggerFactory.getLogger(VirtualThreadSchedulingAgent.class); + + /** + * Sleeps in the scheduling loop are broken up into chunks of at most this long so that + * a processor that is unscheduled while a scheduling thread is sleeping will exit the + * sleep and observe the state change quickly rather than having to wait for the full + * scheduling period to elapse. 
+ */ + private static final long POLL_INTERVAL_NANOS = TimeUnit.MILLISECONDS.toNanos(25L); + + private final FlowController flowController; + private final RepositoryContextFactory contextFactory; + private final DynamicSemaphore globalSemaphore; + private final long noWorkYieldNanos; + private volatile String adminYieldDuration = "1 sec"; + private volatile long adminYieldNanos = TimeUnit.SECONDS.toNanos(1L); + + public VirtualThreadSchedulingAgent(final FlowController flowController, final RepositoryContextFactory contextFactory, + final NiFiProperties nifiProperties, final int maxThreadCount) { + this.flowController = flowController; + this.contextFactory = contextFactory; + this.globalSemaphore = new DynamicSemaphore(maxThreadCount); + + final String boredYieldDuration = nifiProperties.getBoredYieldDuration(); + try { + noWorkYieldNanos = FormatUtils.getTimeDuration(boredYieldDuration, TimeUnit.NANOSECONDS); + } catch (final IllegalArgumentException e) { + throw new RuntimeException("Failed to create VirtualThreadSchedulingAgent because the " + + NiFiProperties.BORED_YIELD_DURATION + " property is set to an invalid time duration: " + boredYieldDuration); + } + } + + @Override + public void shutdown() { + } + + /** + * Schedules the given connectable to run on virtual threads. This method transitions the + * {@link LifecycleState} to scheduled and then spawns one virtual thread per concurrent task. Each scheduling loop + * captures {@link LifecycleState#getLastStopTime()} at startup and will exit as soon as either + * {@link LifecycleState#isScheduled()} becomes {@code false} or the last-stop-time advances, which is how the + * agent guards against a rapid stop/start cycle leaving prior loops running alongside newly-spawned ones. The + * corresponding {@link #unschedule(Connectable, LifecycleState)} call performs the inverse transition. 
Calling + * {@code setScheduled(true)} here is intentionally idempotent: the framework scheduler also marks the state as + * scheduled in some code paths (for example, reporting-task startup), and double-calling is harmless because the + * last-stop-time only advances on transitions to the stopped state. + */ + @Override + public void schedule(final Connectable connectable, final LifecycleState lifecycleState) { + final CronExpression cronExpression; + if (connectable.getSchedulingStrategy() == SchedulingStrategy.CRON_DRIVEN) { + final String cronSchedule = connectable.evaluateParameters(connectable.getSchedulingPeriod()); + cronExpression = parseCronExpression(cronSchedule, connectable); + } else { + cronExpression = null; + } + + lifecycleState.setScheduled(true); + final long startStopTime = lifecycleState.getLastStopTime(); + final ConnectableTask connectableTask = new ConnectableTask(this, connectable, flowController, contextFactory, lifecycleState); + final int taskCount = connectable.getMaxConcurrentTasks(); + + for (int i = 0; i < taskCount; i++) { + final String threadName = buildThreadName(connectable, i); + Thread.ofVirtual().name(threadName).start(() -> runSchedulingLoop(connectable, connectableTask, lifecycleState, startStopTime, cronExpression)); + } + + logger.info("Scheduled {} to run with {} virtual threads", connectable, taskCount); + } + + @Override + public void scheduleOnce(final Connectable connectable, final LifecycleState lifecycleState, final Callable<Future<Void>> stopCallback) { + lifecycleState.setScheduled(true); + final ConnectableTask connectableTask = new ConnectableTask(this, connectable, flowController, contextFactory, lifecycleState); + final String threadName = buildThreadName(connectable, 0); + + Thread.ofVirtual().name(threadName).start(() -> runOnce(connectable, connectableTask, stopCallback)); + } + + @Override + public void unschedule(final Connectable connectable, final LifecycleState lifecycleState) { + 
lifecycleState.setScheduled(false); + logger.info("Stopped scheduling {} to run", connectable); + } + + @Override + public void schedule(final ReportingTaskNode taskNode, final LifecycleState lifecycleState) { + final boolean cronDriven = taskNode.getSchedulingStrategy() == SchedulingStrategy.CRON_DRIVEN; + final CronExpression cronExpression; + final long schedulingNanos; + if (cronDriven) { + cronExpression = parseCronExpression(taskNode.getSchedulingPeriod(), taskNode); + schedulingNanos = 0L; + } else { + cronExpression = null; + schedulingNanos = taskNode.getSchedulingPeriod(TimeUnit.NANOSECONDS); + } + + lifecycleState.setScheduled(true); + final long startStopTime = lifecycleState.getLastStopTime(); + final Runnable reportingTaskWrapper = new ReportingTaskWrapper(taskNode, lifecycleState, flowController.getExtensionManager()); + final String threadName = "Reporting Task: " + taskNode.getName(); + + Thread.ofVirtual().name(threadName).start(() -> runReportingTaskLoop(taskNode, reportingTaskWrapper, schedulingNanos, cronExpression, lifecycleState, startStopTime)); + + logger.info("{} started on virtual thread", taskNode.getReportingTask()); + } + + @Override + public void unschedule(final ReportingTaskNode taskNode, final LifecycleState lifecycleState) { + lifecycleState.setScheduled(false); + logger.info("Stopped scheduling {} to run", taskNode.getReportingTask()); + } + + /** + * @return {@code true} if {@code lifecycleState} is still scheduled and its last-stop-time has not changed since + * the scheduling loop was spawned (meaning the loop is still running against its original scheduling generation). + * Used by the scheduling loops and their polling helpers to detect both the normal stop path (isScheduled flips to + * false) and the rapid stop/start race (a stop increments lastStopTime even if a quick restart flips isScheduled + * back to true before the old loop has observed the stop). 
+ */ + private boolean isActive(final LifecycleState lifecycleState, final long startStopTime) { + return lifecycleState.isScheduled() && lifecycleState.getLastStopTime() == startStopTime; + } + + private static CronExpression parseCronExpression(final String cronSchedule, final Object component) { + try { + return CronExpression.parse(cronSchedule); + } catch (final RuntimeException e) { + throw new IllegalStateException("Cannot schedule " + component + " to run because its scheduling period is not a valid CRON expression: " + cronSchedule, e); + } + } + + @Override + public void onEvent(final Connectable connectable) { + } + + @Override + public void setMaxThreadCount(final int maxThreads) { + globalSemaphore.setMaxPermits(maxThreads); + logger.info("Global semaphore permits updated to {}", maxThreads); + } + + @Override + public synchronized void incrementMaxThreadCount(final int toAdd) { + if (toAdd == 0) { + return; + } + + final int currentMax = globalSemaphore.getMaxPermits(); + final int newMax = currentMax + toAdd; + if (newMax < 1) { + throw new IllegalStateException("Cannot remove " + (-toAdd) + " permits from global semaphore because there are only " + currentMax + " permits available"); + } + + globalSemaphore.setMaxPermits(newMax); + } + + @Override + public void setAdministrativeYieldDuration(final String duration) { + this.adminYieldNanos = FormatUtils.getTimeDuration(duration, TimeUnit.NANOSECONDS); + this.adminYieldDuration = duration; + } + + @Override + public String getAdministrativeYieldDuration() { + return adminYieldDuration; + } + + @Override + public long getAdministrativeYieldDuration(final TimeUnit timeUnit) { + return timeUnit.convert(adminYieldNanos, TimeUnit.NANOSECONDS); + } + + DynamicSemaphore getGlobalSemaphore() { + return globalSemaphore; + } + + /** + * @return the number of virtual threads that are currently executing a processor or + * reporting-task invocation. 
A thread counts as active when it is holding a permit on + * the global semaphore, which mirrors the old Timer-Driven engine's active-count + * semantics used by the cluster heartbeat and UI active-thread counter. + */ + public int getActiveThreadCount() { + return globalSemaphore.getMaxPermits() - globalSemaphore.availablePermits(); + } + + /** + * Runs the scheduling loop for a {@link Connectable}. Each iteration acquires a permit from the global semaphore, + * invokes the connectable, releases the permit, and sleeps until the next invocation is due. The entire body of + * the loop is wrapped in a {@code try/catch(Throwable)} so that no exception or error -- including + * {@link Error} subclasses or bugs in the scheduling logic itself -- can cause the virtual thread to terminate + * silently. A processor is expected to continue being triggered as long as it is scheduled, so on any unexpected + * {@link Throwable} the error is logged, an administrative yield is applied to prevent tight-looping on a broken + * task, and the loop continues on its next iteration. 
+ */ + private void runSchedulingLoop(final Connectable connectable, final ConnectableTask connectableTask, + final LifecycleState lifecycleState, final long startStopTime, final CronExpression cronExpression) { + + final boolean cronDriven = cronExpression != null; + + OffsetDateTime nextCronSchedule = null; + if (cronDriven) { + nextCronSchedule = getNextCronSchedule(OffsetDateTime.now(), cronExpression); + final long initialDelayMillis = Math.max(nextCronSchedule.toInstant().toEpochMilli() - System.currentTimeMillis(), 0L); + if (initialDelayMillis > 0L) { + sleepWithPolling(TimeUnit.MILLISECONDS.toNanos(initialDelayMillis), lifecycleState, startStopTime); + } + } + + while (isActive(lifecycleState, startStopTime)) { + try { + if (!acquirePermitWithPolling(lifecycleState, startStopTime)) { + return; + } + + final InvocationResult invocationResult; + try { + invocationResult = connectableTask.invoke(); + } finally { + globalSemaphore.release(); + } + + if (cronDriven) { + nextCronSchedule = getNextCronSchedule(nextCronSchedule, cronExpression); + final long sleepMillis = Math.max(nextCronSchedule.toInstant().toEpochMilli() - System.currentTimeMillis(), 0L); + sleepWithPolling(TimeUnit.MILLISECONDS.toNanos(sleepMillis), lifecycleState, startStopTime); + } else { + sleepForSchedulingPeriod(connectable, lifecycleState, startStopTime, invocationResult); + } + } catch (final Throwable t) { + // Nothing in the loop body is expected to throw (ConnectableTask.invoke() catches Throwable itself, and + // acquirePermitWithPolling handles InterruptedException). If anything does escape to here, it must not + // be allowed to kill the scheduling virtual thread: as long as the component remains scheduled we will + // keep triggering it. Log the error, apply an administrative yield to avoid tight-looping on a broken + // invocation, and continue with the next iteration. 
+ try { + connectable.yield(adminYieldNanos, TimeUnit.NANOSECONDS); + } catch (final Throwable yieldError) { + t.addSuppressed(yieldError); + } + + logger.error("Unexpected error in scheduling loop for {}. Will yield for {} and continue.", connectable, adminYieldDuration, t); + } + } + } + + private void runOnce(final Connectable connectable, final ConnectableTask connectableTask, final Callable<Future<Void>> stopCallback) { + try { + try { + globalSemaphore.acquire(); Review Comment: Using `.acquire()` is fine here because a Processor triggered with RunOnce cannot be explicitly stopped anyway, so acquiring the permit with polling wouldn't buy us anything. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
