Copilot commented on code in PR #2555: URL: https://github.com/apache/groovy/pull/2555#discussion_r3311939401
########## src/main/java/groovy/concurrent/ActorContext.java: ########## @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package groovy.concurrent; + +import java.time.Duration; + +/** + * Context handle passed to context-aware actor handlers. Provides access + * to the executing actor itself and to the per-dispatch capabilities + * needed to express FSM-style actors: behavior swaps via + * {@link #become(ReactorHandler) become(...)}, message deferral via + * {@link #stash()} / {@link #unstashAll()}, and timed self-sends via + * {@link #scheduleOnce(Object, Duration) scheduleOnce(...)} / + * {@link #scheduleAtFixedRate(Object, Duration, Duration) scheduleAtFixedRate(...)}. + * <p> + * A context is scoped to a single actor and is only valid for use during + * a handler invocation (including a context-aware {@code onError} + * callback). Calls to its mutating methods from outside that window — + * for example from a captured reference invoked on another thread — + * throw {@link IllegalStateException}. + * + * @param <T> the actor's message type + * @see Actor + * @since 6.0.0 + */ +public interface ActorContext<T> { + + /** + * Returns the actor whose handler is currently executing. 
+ */ + Actor<T> self(); + + /** + * Replaces the handler used to process subsequent messages on a + * reactor actor. The swap takes effect on the next message — the + * current handler invocation completes normally, including binding + * any {@code sendAndGet} reply and firing {@code onError} on failure. + * <p> + * The new handler may declare a different reply type than the original; + * {@code sendAndGet} callers receive the new handler's return value. + * <p> + * Messages already queued at the moment of the swap, and messages sent + * by other threads that have not yet observed the swap, are dispatched + * to the new handler. The new handler is responsible for tolerating any + * message its predecessor could have received — typically by including + * a default branch that ignores or rejects unexpected messages, or by + * deferring them with {@link #stash()} for later replay. + * + * @param newHandler the replacement reactor handler + * @param <R> the new reactor's reply type + * @throws UnsupportedOperationException if this actor is stateful, or + * if this {@code ActorContext} implementation does not support + * behavior swaps + * @throws IllegalStateException if called outside a handler dispatch + * or from a thread other than the actor's worker thread + * @throws NullPointerException if {@code newHandler} is null + * @since 6.0.0 + */ + default <R> void become(ReactorHandler<T, R> newHandler) { + throw new UnsupportedOperationException( + "become(ReactorHandler) requires a reactor actor"); + } + + /** + * Replaces the handler used to process subsequent messages on a + * stateful actor. The current state value is preserved verbatim and + * passed unchanged to the new handler. The swap takes effect on the + * next message. 
+ * <p> + * The state type {@code S} is unchecked at the swap site: if the new + * handler's expected state type is incompatible with the actor's + * current state, a {@link ClassCastException} is thrown when the next + * message is dispatched. + * + * @param newHandler the replacement stateful handler + * @param <S> the state type expected by the new handler + * @throws UnsupportedOperationException if this actor is a reactor, or + * if this {@code ActorContext} implementation does not support + * behavior swaps + * @throws IllegalStateException if called outside a handler dispatch + * or from a thread other than the actor's worker thread + * @throws NullPointerException if {@code newHandler} is null + * @since 6.0.0 + */ + default <S> void become(StatefulHandler<S, T> newHandler) { + throw new UnsupportedOperationException( + "become(StatefulHandler) requires a stateful actor"); + } + + /** + * Defers the message currently being processed. The message is moved + * out of the dispatch path: any {@code sendAndGet} reply remains + * unbound, any state change computed by the current handler is + * discarded, and the message is re-delivered when {@link #unstashAll()} + * is later called. + * <p> + * Calling {@code stash()} more than once during a single handler + * invocation is idempotent — the message is stashed once. If the + * handler subsequently throws, the stash is rolled back and the failure + * is reported normally (reply bound to error, {@code onError} fires). + * A context-aware {@code onError} callback may itself call + * {@code stash()} to defer the failed message for later retry. + * <p> + * Stashed messages do <em>not</em> count against the configured + * mailbox bound — the bound applies to the queue of pending sends, + * not to messages held in the stash. 
+ * <p> + * <b>Warning — the stash buffer is unbounded by default.</b> An + * actor that stashes messages from a source whose volume you do not + * control (network input, external clients, untrusted callers) can + * grow the stash without limit and exhaust the JVM heap if the + * phase transition that would call {@link #unstashAll()} never + * arrives. For any such actor, configure a bound and overflow + * policy at construction time via + * {@link ActorOptions#withStashBound(int, ActorOptions.StashOverflow)}. + * The three policies are {@code FAIL} (this method throws), + * {@code DROP_OLDEST} (evicts the oldest stashed message, binding + * its reply to {@link IllegalStateException}), and {@code REJECT} + * (binds the current message's reply to {@link IllegalStateException} + * and does not stash it). + * <p> + * If {@link Actor#stop()} is invoked while messages are stashed, the + * stashed messages are rejected: any {@code sendAndGet} reply is bound + * to an {@link IllegalStateException} and fire-and-forget stashed + * messages are discarded. + * + * @throws IllegalStateException if called outside a handler dispatch + * or from a thread other than the actor's worker thread + * @throws UnsupportedOperationException if this {@code ActorContext} + * implementation does not support stash + * @since 6.0.0 + */ + default void stash() { + throw new UnsupportedOperationException( + "This ActorContext implementation does not support stash"); + } + + /** + * Re-injects all stashed messages at the head of the mailbox in the + * order they were originally stashed (FIFO). Subsequent dispatches + * will see the unstashed messages before any messages that other + * senders have queued in the meantime. + * <p> + * No-op if no messages are stashed. 
+ * + * @throws IllegalStateException if called outside a handler dispatch + * or from a thread other than the actor's worker thread + * @throws UnsupportedOperationException if this {@code ActorContext} + * implementation does not support stash + * @since 6.0.0 + */ + default void unstashAll() { + throw new UnsupportedOperationException( + "This ActorContext implementation does not support unstashAll"); + } + + /** + * Schedules a one-shot self-send: the given message is delivered to + * this actor after the given delay, via the same dispatch path as + * {@link Actor#send(Object)} (mailbox bound respected, + * {@code onError} fires on handler failure, etc.). + * <p> + * The returned {@link Cancellable} can be used to call off the send + * before it fires. On {@link Actor#stop()}, all outstanding scheduled + * timers (one-shot and recurring) created via this context are + * cancelled automatically. + * <p> + * Timer firings race with the actor's lifecycle: a send that + * arrives after the actor has stopped is silently dropped (the + * implementation catches the {@link IllegalStateException} that + * {@code send} would throw). Combining a bounded mailbox using + * {@link ActorOptions.Overflow#BLOCK} with timers is discouraged — + * a full mailbox will block the shared scheduler thread; prefer + * {@link ActorOptions.Overflow#FAIL FAIL} or + * {@link ActorOptions.Overflow#DROP_NEWEST DROP_NEWEST} for Review Comment: The Javadoc warns that combining timers with a bounded BLOCK mailbox will "block the shared scheduler thread", but `DefaultActor` now explicitly offloads the timer’s `send` onto `AsyncSupport.getExecutor()` to avoid blocking the scheduler thread. The warning should be updated to match the actual behavior (the scheduler thread won’t block, though executor threads may). 
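A minimal, JDK-only sketch of the offloading behavior this comment describes, for reference when rewording the Javadoc. It does not use `DefaultActor` or `AsyncSupport`. The class name `TimerOffloadSketch`, the `Semaphore` standing in for a full `BLOCK` mailbox, and the cached thread pool standing in for the async executor are all assumptions made for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in, not the PR's code: shows that handing a blocking
// send to a separate executor keeps a single scheduler thread responsive.
public class TimerOffloadSketch {

    /**
     * Returns true if a second timer still fires while the first timer's
     * send is parked on a "full mailbox" (a semaphore with no permits).
     */
    public static boolean schedulerStaysResponsive() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        ExecutorService sendExecutor = Executors.newCachedThreadPool();
        Semaphore mailboxPermits = new Semaphore(0); // models a full BLOCK mailbox
        CountDownLatch sendStarted = new CountDownLatch(1);
        CountDownLatch secondTimerFired = new CountDownLatch(1);
        try {
            // First timer: the scheduler thread only submits the send and returns.
            scheduler.schedule(() -> {
                sendExecutor.execute(() -> {
                    sendStarted.countDown();
                    try {
                        mailboxPermits.acquire(); // parks an executor thread, not the scheduler
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }, 10, TimeUnit.MILLISECONDS);

            // Second timer: can only run if the scheduler thread is free.
            scheduler.schedule(() -> {
                secondTimerFired.countDown();
            }, 50, TimeUnit.MILLISECONDS);

            return sendStarted.await(2, TimeUnit.SECONDS)
                    && secondTimerFired.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            mailboxPermits.release();   // let the parked send finish
            scheduler.shutdownNow();
            sendExecutor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("scheduler responsive: " + schedulerStaysResponsive());
    }
}
```

Under these assumptions the cost moves rather than disappears: each blocked send occupies one executor thread until the mailbox frees a slot, which is the caveat the updated warning would need to carry.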
########## src/spec/doc/core-concurrent-actors.adoc: ########## @@ -219,19 +219,98 @@ assert balance == 70.0 account.stop() ---- +[[actors-options]] +== Construction options + +Actors take an optional `ActorOptions` configuring the mailbox, the +executor, and a handful of opt-in behaviours. The builder is +value-based; each `with*` method returns a new configuration. + +[source,groovy] +---- +import groovy.concurrent.Actor +import groovy.concurrent.ActorOptions + +def options = ActorOptions.DEFAULTS + .withBoundedMailbox(1000, ActorOptions.Overflow.BLOCK) + +def actor = Actor.reactor(handler, options) +---- + +=== Bounded mailbox + +By default the mailbox is unbounded. For backpressure — or to cap +memory when producers can outrun the actor — configure a capacity and +an overflow policy: + +[source,groovy] +---- +import groovy.concurrent.ActorOptions.Overflow + +// BLOCK — sender blocks until capacity is free +ActorOptions.DEFAULTS.withBoundedMailbox(100, Overflow.BLOCK) + +// FAIL — send throws IllegalStateException when the mailbox is full +ActorOptions.DEFAULTS.withBoundedMailbox(100, Overflow.FAIL) + +// DROP_NEWEST — the incoming message is silently dropped; for +// sendAndGet, the returned Awaitable binds to IllegalStateException +ActorOptions.DEFAULTS.withBoundedMailbox(100, Overflow.DROP_NEWEST) +---- + +A handler that calls `ctx.self().send(...)` on a full `BLOCK` mailbox +would deadlock (the handler is the actor's only consumer). The actor +detects this and fails fast with `IllegalStateException` rather than +parking the worker thread. + +=== Per-actor executor + +Actors default to a shared async executor (virtual threads on +JDK 21+). 
For workload isolation, hand the actor its own executor: + +[source,groovy] +---- +import java.util.concurrent.Executors + +def pool = Executors.newSingleThreadExecutor() +def actor = Actor.reactor(handler, + ActorOptions.DEFAULTS.withExecutor(pool)) +---- + +Other options — `withStashBound` and `withCurrentSelf` — are covered +in the <<actors-fsm,FSM section>> where they're directly used. + [[actors-lifecycle]] == Lifecycle -Both actors and agents support lifecycle management: +Both actors and agents support lifecycle management. An actor has a +three-state lifecycle: + +[cols="1,1,3"] +|=== +|`isActive()` |`isTerminated()` |Meaning + +|`true` |`false` |Accepting new sends and processing them. +|`false` |`false` |Draining — `stop()` was called, no new sends are +accepted, but already-queued messages are still being processed. +|`false` |`true` |Terminated — the worker thread has exited; all +queued messages are processed and any stashed `sendAndGet` replies +have been rejected. +|=== [source,groovy] ---- def actor = Actor.reactor { it } assert actor.isActive() +assert !actor.isTerminated() + +actor.stop() // flips isActive() to false immediately +assert !actor.isActive() // refuses new sends from this point on -actor.stop() // graceful: processes remaining messages then exits -Thread.sleep(50) -assert !actor.isActive() +// The worker may still be draining queued messages. Poll for terminated +// when you actually need to be sure the actor has finished shutting down. +while (!actor.isTerminated()) Thread.sleep(10) Review Comment: The lifecycle example states that after `actor.stop()` the actor "refuses new sends from this point on". In `DefaultActor`, sends that race with `stop()` can still be enqueued (it explicitly avoids treating POISON as an unconditional terminator to honor race-late sends). Consider softening this wording to acknowledge that sends already in flight may still land, while sends that observe `isActive()==false` will fail. 
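To make the race concrete, here is a toy model written in Java rather than the Groovy of the spec example. `ToyActor` and `StopRaceSketch` are hypothetical and deliberately much simpler than `DefaultActor`: `send` is split into a liveness check and an enqueue so the interleaving can be replayed deterministically on one thread.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical toy model, not the PR's DefaultActor.
public class StopRaceSketch {

    /** Toy mailbox: an active flag that is checked before the enqueue. */
    static final class ToyActor {
        private volatile boolean active = true;
        private final Deque<String> queue = new ArrayDeque<>();

        /** Step 1 of send: the liveness check. */
        void checkActive() {
            if (!active) throw new IllegalStateException("actor stopped");
        }

        /** Step 2 of send: the enqueue, which does not re-check the flag. */
        void enqueue(String message) {
            queue.addLast(message);
        }

        void send(String message) {
            checkActive();
            enqueue(message);
        }

        void stop() {
            active = false;
        }

        int queued() {
            return queue.size();
        }
    }

    /** Replays the interleaving: check passes, stop() runs, the enqueue still lands. */
    public static int inFlightSendLandsAfterStop() {
        ToyActor actor = new ToyActor();
        actor.checkActive();    // sender has already passed the liveness check
        actor.stop();           // stop() flips the flag in between
        actor.enqueue("late");  // the in-flight send completes anyway
        return actor.queued();
    }

    /** A send that starts after stop() observes the flag and is refused. */
    public static boolean freshSendIsRejected() {
        ToyActor actor = new ToyActor();
        actor.stop();
        try {
            actor.send("too late");
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("queued after racing send: " + inFlightSendLandsAfterStop());
        System.out.println("fresh send rejected: " + freshSendIsRejected());
    }
}
```

In this model the doc wording "refuses new sends from this point on" holds only for sends that begin after `stop()` returns, which matches the softer phrasing suggested above.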
########## src/main/java/org/apache/groovy/runtime/async/DefaultActor.java: ########## @@ -89,79 +190,610 @@ public boolean isActive() { } @Override - @SuppressWarnings("unchecked") + public boolean isTerminated() { + return terminated; + } + + @Override public void stop() { if (!active) return; active = false; - // Poison pill signals the processing loop to exit after draining - queue.add(new Envelope<>(POISON, null)); + // Cancel any outstanding scheduled timers so they do not fire + // into a stopped actor (one-shots would get IllegalStateException + // from send and be silently caught, but periodic timers would + // keep firing forever holding refs to this actor). + for (ScheduledFuture<?> f : pendingTimers) { + f.cancel(false); + } + pendingTimers.clear(); + // Best-effort wake-up. The deque is internally unbounded + // (capacity is gated by the semaphore), so offerLast always + // succeeds — but the POISON itself is just a marker; the + // post-message drain check below is what actually terminates + // the loop. + queue.offerLast(new Envelope<>(POISON, null)); } - // ---- Internal ------------------------------------------------------- + @Override + public Actor<T> onError(BiConsumer<Throwable, ? super T> handler) { + Objects.requireNonNull(handler, "handler must not be null"); + this.errorHandler = (ctx, t, msg) -> handler.accept(t, msg); + return this; + } - @SuppressWarnings("unchecked") - private void processLoop() { - while (true) { - try { - Envelope<T> envelope = queue.take(); - if (envelope.message == POISON) return; + @Override + public Actor<T> onError(TriConsumer<ActorContext<T>, Throwable, ? 
super T> handler) { + Objects.requireNonNull(handler, "handler must not be null"); + this.errorHandler = handler; + return this; + } - try { - Object result = processor.process((T) envelope.message); - if (envelope.reply != null) { - envelope.reply.bind(result); + // ---- Send-side ----------------------------------------------------- + + /** + * Routes an envelope through the configured mailbox policy. A permit + * is acquired here at {@code send} time and released by the worker + * the moment it dequeues the envelope, so the bound applies to the + * size of the queue rather than to in-flight or stashed messages. + */ + private void enqueue(Envelope<T> envelope) { + if (capacity == null) { + queue.addLast(envelope); + return; + } + switch (options.overflow()) { + case BLOCK -> { + // A handler that sends to its own actor on a full BLOCK + // mailbox would deadlock: it is the only consumer, and + // acquire() would park it indefinitely waiting for + // capacity it can never free. The worker-thread branch + // uses tryAcquire so an external sender can't steal the + // last permit between an availablePermits() check and + // the acquire() (a multi-sender TOCTOU that would + // otherwise re-introduce the same deadlock). + if (Thread.currentThread() == workerThread) { + if (!capacity.tryAcquire()) { + throw new IllegalStateException( + "send from the actor's own handler would deadlock " + + "the bounded BLOCK mailbox (capacity " + + options.mailboxCapacity() + ")"); + } + } else { + try { + capacity.acquire(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while sending to actor", ie); } - } catch (Throwable t) { + } Review Comment: In the BLOCK overflow path, a sender can block in `capacity.acquire()` and later enqueue after `stop()` has already flipped `active=false` (and even after the worker exits), because there is no post-acquire `active` check before `queue.addLast(envelope)`. 
That can leave messages/replies unprocessed (e.g., `sendAndGet` waits forever) if the actor terminates just before the blocked sender enqueues. Consider re-checking `active` after acquiring the permit (and releasing the permit + failing/throwing if inactive) so blocked senders don’t enqueue into a stopped/terminated actor. ########## src/main/java/org/apache/groovy/runtime/async/DefaultActor.java: ########## @@ -89,79 +190,610 @@ public boolean isActive() { } @Override - @SuppressWarnings("unchecked") + public boolean isTerminated() { + return terminated; + } + + @Override public void stop() { if (!active) return; active = false; - // Poison pill signals the processing loop to exit after draining - queue.add(new Envelope<>(POISON, null)); + // Cancel any outstanding scheduled timers so they do not fire + // into a stopped actor (one-shots would get IllegalStateException + // from send and be silently caught, but periodic timers would + // keep firing forever holding refs to this actor). + for (ScheduledFuture<?> f : pendingTimers) { + f.cancel(false); + } + pendingTimers.clear(); + // Best-effort wake-up. The deque is internally unbounded + // (capacity is gated by the semaphore), so offerLast always + // succeeds — but the POISON itself is just a marker; the + // post-message drain check below is what actually terminates + // the loop. + queue.offerLast(new Envelope<>(POISON, null)); } - // ---- Internal ------------------------------------------------------- + @Override + public Actor<T> onError(BiConsumer<Throwable, ? super T> handler) { + Objects.requireNonNull(handler, "handler must not be null"); + this.errorHandler = (ctx, t, msg) -> handler.accept(t, msg); + return this; + } - @SuppressWarnings("unchecked") - private void processLoop() { - while (true) { - try { - Envelope<T> envelope = queue.take(); - if (envelope.message == POISON) return; + @Override + public Actor<T> onError(TriConsumer<ActorContext<T>, Throwable, ? 
super T> handler) { + Objects.requireNonNull(handler, "handler must not be null"); + this.errorHandler = handler; + return this; + } - try { - Object result = processor.process((T) envelope.message); - if (envelope.reply != null) { - envelope.reply.bind(result); + // ---- Send-side ----------------------------------------------------- + + /** + * Routes an envelope through the configured mailbox policy. A permit + * is acquired here at {@code send} time and released by the worker + * the moment it dequeues the envelope, so the bound applies to the + * size of the queue rather than to in-flight or stashed messages. + */ + private void enqueue(Envelope<T> envelope) { + if (capacity == null) { + queue.addLast(envelope); + return; + } + switch (options.overflow()) { + case BLOCK -> { + // A handler that sends to its own actor on a full BLOCK + // mailbox would deadlock: it is the only consumer, and + // acquire() would park it indefinitely waiting for + // capacity it can never free. The worker-thread branch + // uses tryAcquire so an external sender can't steal the + // last permit between an availablePermits() check and + // the acquire() (a multi-sender TOCTOU that would + // otherwise re-introduce the same deadlock). 
+ if (Thread.currentThread() == workerThread) { + if (!capacity.tryAcquire()) { + throw new IllegalStateException( + "send from the actor's own handler would deadlock " + + "the bounded BLOCK mailbox (capacity " + + options.mailboxCapacity() + ")"); + } + } else { + try { + capacity.acquire(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while sending to actor", ie); } - } catch (Throwable t) { + } + queue.addLast(envelope); + } + case DROP_NEWEST -> { + if (!capacity.tryAcquire()) { if (envelope.reply != null) { - envelope.reply.bindError(t); + envelope.reply.bindError(new IllegalStateException( + "mailbox full (capacity " + options.mailboxCapacity() + + "); message dropped")); + } + return; + } + queue.addLast(envelope); + } + case FAIL -> { + if (!capacity.tryAcquire()) { + throw new IllegalStateException( + "mailbox full (capacity " + options.mailboxCapacity() + ")"); + } + queue.addLast(envelope); + } + } + } + + // ---- Receive-side -------------------------------------------------- + + private void processLoop() { + workerThread = Thread.currentThread(); + while (true) { + try { + Envelope<T> envelope = queue.takeFirst(); + // POISON is a wake-up marker, not a terminator. A message + // can race past the !active check in send() and land in + // the queue *after* POISON; treating POISON as an + // unconditional return would orphan that message (and its + // sendAndGet reply). The drain-on-stop guarantee is owned + // entirely by the post-processing !active check below. + if (envelope.message != POISON) { + // Free the send-side slot as soon as the message + // leaves the queue, so the bound applies to "queued" + // not "in flight". permitReleased guards against a + // double release when a stashed envelope is later + // unstashed and re-taken (it never re-acquired a + // permit on the way back into the queue). 
+ if (capacity != null && !envelope.permitReleased) { + capacity.release(); + envelope.permitReleased = true; } + dispatch(envelope); + } + // Terminate once stop() has been requested and the queue + // has fully drained — works even when stop() could not + // enqueue the poison pill (bounded mailbox full at the + // time of a self-stop) and ensures race-late sends are + // honoured before exit. Any messages still in the stash + // are rejected here so sendAndGet callers do not hang. + if (!active && queue.isEmpty()) { + drainStashOnExit(); + terminated = true; + return; } } catch (InterruptedException e) { Thread.currentThread().interrupt(); + drainStashOnExit(); + terminated = true; return; } } } + @SuppressWarnings("unchecked") + private void dispatch(Envelope<T> envelope) { + T msg = (T) envelope.message; + currentEnvelope = envelope; + currentStashed = false; + boolean publishSelf = options.currentSelfEnabled(); + if (publishSelf) CURRENT.set(this); + + Object result = null; + Throwable thrown = null; + try { + try { + result = processor.process(context, msg); + } catch (Throwable t) { + thrown = t; + // If the main handler stashed and then threw, undo the + // stash so the message is reported as failed rather than + // silently deferred. An onError handler can still re-stash + // if it explicitly chooses to retry. + if (currentStashed) { + if (!stashBuffer.isEmpty() && stashBuffer.peekLast() == envelope) { + stashBuffer.removeLast(); + } + currentStashed = false; + } + } + + // Fire onError if registered; treat it as best-effort — + // exceptions from the error handler are swallowed so they + // cannot crash the processing loop. The handler may call + // ctx.become(...) / ctx.stash() / ctx.unstashAll() and those + // calls take effect normally. + if (thrown != null) { + TriConsumer<ActorContext<T>, Throwable, ? 
super T> handler = errorHandler; + if (handler != null) { + try { + handler.accept(context, thrown, msg); + } catch (Throwable ignored) { + } + } + } + + // Finalise based on the post-onError stash state. Three + // outcomes: + // stashed → discard pending state; reply + // stays unbound until the message + // is later unstashed and processed + // threw & not stashed → discard pending state; bind + // error on reply + // success → commit pending state; bind reply + // The send-side permit was already released by the worker + // loop when it dequeued this envelope, so capacity does not + // need to be released here in any outcome. + if (currentStashed) { + processor.rollback(); + } else if (thrown != null) { + processor.rollback(); + if (envelope.reply != null) envelope.reply.bindError(thrown); + } else { + processor.commit(); + if (envelope.reply != null) envelope.reply.bind(result); + } + } finally { + currentEnvelope = null; + currentStashed = false; + if (publishSelf) CURRENT.remove(); + } + } + + /** + * Rejects any messages still in the stash on shutdown so that + * {@code sendAndGet} callers do not wait on replies that will never + * arrive. Stashed envelopes hold no send-side permit (it was + * released when the worker first dequeued them), so capacity does + * not need to be released here. 
+ */ + private void drainStashOnExit() { + if (stashBuffer.isEmpty()) return; + for (Envelope<T> stashed : stashBuffer) { + if (stashed.reply != null) { + stashed.reply.bindError(new IllegalStateException( + "actor stopped with message in stash")); + } + } + stashBuffer.clear(); + } + @Override public String toString() { - return "Actor[active=" + active + ", queued=" + queue.size() + "]"; + return "Actor[active=" + active + + ", queued=" + queue.size() + + ", stashed=" + stashBuffer.size() + "]"; } - // ---- Internal types ------------------------------------------------- + // ---- Helpers shared with the per-shape context implementations ----- - private record Envelope<T>(Object message, DataflowVariable<Object> reply) {} + private void checkOnWorkerThread(String op) { + if (Thread.currentThread() != workerThread) { + throw new IllegalStateException( + op + " must be called from a handler invocation on the actor's worker thread"); + } + } + + private void stashCurrent() { + checkOnWorkerThread("stash()"); + if (currentEnvelope == null) { + throw new IllegalStateException("stash() called outside handler dispatch"); + } + if (currentStashed) return; // idempotent within a single dispatch + + if (options.isStashBounded() && stashBuffer.size() >= options.stashCapacity()) { + switch (options.stashOverflow()) { + case FAIL -> + // Surface the overflow to the handler. If uncaught it + // propagates out and dispatch reports the message as + // failed; if caught the handler can choose to drop / + // process / reroute the current message. + throw new IllegalStateException( + "stash full (capacity " + options.stashCapacity() + ")"); + case DROP_OLDEST -> { + Envelope<T> evicted = stashBuffer.removeFirst(); + if (evicted.reply != null) { + evicted.reply.bindError(new IllegalStateException( + "evicted from stash (capacity " + + options.stashCapacity() + " exceeded)")); + } + // Fall through to add current envelope below. 
+ } + case REJECT -> { + // Bind the current message's reply immediately so the + // caller learns of the rejection now. Mark stashed so + // dispatch finalise discards any pending state change + // and does not try to bind the reply again, but DON'T + // add to the buffer (the message is gone, not deferred). + if (currentEnvelope.reply != null) { + currentEnvelope.reply.bindError(new IllegalStateException( + "stash full (capacity " + options.stashCapacity() + + "); message rejected")); + } + currentStashed = true; + return; + } + } + } + currentStashed = true; + stashBuffer.addLast(currentEnvelope); + } + + private void unstashAllInternal() { + checkOnWorkerThread("unstashAll()"); + if (stashBuffer.isEmpty()) return; + // Re-inject at head in original (FIFO) order: iterate the buffer + // from tail to head and addFirst each, so the oldest stashed + // message ends up at the front of the queue. + Iterator<Envelope<T>> it = stashBuffer.descendingIterator(); + while (it.hasNext()) { + queue.addFirst(it.next()); + } + stashBuffer.clear(); + } + private Cancellable scheduleOnceInternal(T message, Duration delay) { + checkOnWorkerThread("scheduleOnce()"); + Objects.requireNonNull(message, "message must not be null"); + Objects.requireNonNull(delay, "delay must not be null"); + // Chicken-and-egg: the task wants to deregister itself from + // pendingTimers after firing, but the ScheduledFuture doesn't + // exist until schedule(...) returns. Stash it via AtomicReference + // and read it inside the task. + AtomicReference<ScheduledFuture<?>> ref = new AtomicReference<>(); + Runnable task = () -> { + // Hand the send off to the async executor so the scheduler + // thread never blocks on a full BLOCK mailbox. The scheduler + // is a shared resource (one per JVM); blocking it on one + // actor's bound would starve every other actor's timers. 
+ AsyncSupport.getExecutor().execute(() -> { + try { + send(message); + } catch (IllegalStateException ignored) { + // Actor was stopped between schedule and fire — drop. + } + }); + ScheduledFuture<?> f = ref.get(); + if (f != null) pendingTimers.remove(f); + }; + ScheduledFuture<?> future = AsyncSupport.getScheduler() + .schedule(task, delay.toNanos(), TimeUnit.NANOSECONDS); + // Race window: with a near-zero delay, the task can fire and + // reach `ref.get()` before this set() runs, observing null and + // skipping its self-deregistration. The future is then added to + // pendingTimers below and stays there until stop() cancels it + // (a no-op on an already-fired future). Functionally benign — + // just a dangling entry until shutdown — so we don't pay for a + // compareAndSet here. Documented for future readers. + ref.set(future); + pendingTimers.add(future); + return new TimerCancellable(future, pendingTimers); + } + + private Cancellable scheduleAtFixedRateInternal(T message, Duration initialDelay, Duration interval) { + checkOnWorkerThread("scheduleAtFixedRate()"); + Objects.requireNonNull(message, "message must not be null"); + Objects.requireNonNull(initialDelay, "initialDelay must not be null"); + Objects.requireNonNull(interval, "interval must not be null"); + // Periodic timers never self-deregister — they fire repeatedly + // until cancelled explicitly or via stop(). Each fire offloads + // the send to the async executor so a BLOCK-bounded actor can + // never starve the shared scheduler thread. + Runnable task = () -> AsyncSupport.getExecutor().execute(() -> { + try { + send(message); + } catch (IllegalStateException ignored) { + // Actor stopped — drop. 
+ } + }); + ScheduledFuture<?> future = AsyncSupport.getScheduler() + .scheduleAtFixedRate(task, + initialDelay.toNanos(), + interval.toNanos(), + TimeUnit.NANOSECONDS); + pendingTimers.add(future); + return new TimerCancellable(future, pendingTimers); Review Comment: `scheduleAtFixedRateInternal` assumes `stop()` will cancel all outstanding timers, but it is still possible to enter the draining phase (`active=false`) and then dispatch a queued message whose handler schedules a new fixed-rate timer. That timer won’t be cancelled (since `stop()` already ran) and will continue firing forever, repeatedly submitting failed `send()` tasks. Consider rejecting scheduling when `!active` (or auto-cancelling any newly created timer immediately when stopped) to avoid periodic timer leaks during shutdown. ########## src/main/java/org/apache/groovy/runtime/async/DefaultActor.java: ########## @@ -89,79 +190,610 @@ public boolean isActive() { } @Override - @SuppressWarnings("unchecked") + public boolean isTerminated() { + return terminated; + } + + @Override public void stop() { if (!active) return; active = false; - // Poison pill signals the processing loop to exit after draining - queue.add(new Envelope<>(POISON, null)); + // Cancel any outstanding scheduled timers so they do not fire + // into a stopped actor (one-shots would get IllegalStateException + // from send and be silently caught, but periodic timers would + // keep firing forever holding refs to this actor). + for (ScheduledFuture<?> f : pendingTimers) { + f.cancel(false); + } + pendingTimers.clear(); + // Best-effort wake-up. The deque is internally unbounded + // (capacity is gated by the semaphore), so offerLast always + // succeeds — but the POISON itself is just a marker; the + // post-message drain check below is what actually terminates + // the loop. 
+ queue.offerLast(new Envelope<>(POISON, null)); } - // ---- Internal ------------------------------------------------------- + @Override + public Actor<T> onError(BiConsumer<Throwable, ? super T> handler) { + Objects.requireNonNull(handler, "handler must not be null"); + this.errorHandler = (ctx, t, msg) -> handler.accept(t, msg); + return this; + } - @SuppressWarnings("unchecked") - private void processLoop() { - while (true) { - try { - Envelope<T> envelope = queue.take(); - if (envelope.message == POISON) return; + @Override + public Actor<T> onError(TriConsumer<ActorContext<T>, Throwable, ? super T> handler) { + Objects.requireNonNull(handler, "handler must not be null"); + this.errorHandler = handler; + return this; + } - try { - Object result = processor.process((T) envelope.message); - if (envelope.reply != null) { - envelope.reply.bind(result); + // ---- Send-side ----------------------------------------------------- + + /** + * Routes an envelope through the configured mailbox policy. A permit + * is acquired here at {@code send} time and released by the worker + * the moment it dequeues the envelope, so the bound applies to the + * size of the queue rather than to in-flight or stashed messages. + */ + private void enqueue(Envelope<T> envelope) { + if (capacity == null) { + queue.addLast(envelope); + return; + } + switch (options.overflow()) { + case BLOCK -> { + // A handler that sends to its own actor on a full BLOCK + // mailbox would deadlock: it is the only consumer, and + // acquire() would park it indefinitely waiting for + // capacity it can never free. The worker-thread branch + // uses tryAcquire so an external sender can't steal the + // last permit between an availablePermits() check and + // the acquire() (a multi-sender TOCTOU that would + // otherwise re-introduce the same deadlock). 
+ if (Thread.currentThread() == workerThread) { + if (!capacity.tryAcquire()) { + throw new IllegalStateException( + "send from the actor's own handler would deadlock " + + "the bounded BLOCK mailbox (capacity " + + options.mailboxCapacity() + ")"); + } + } else { + try { + capacity.acquire(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while sending to actor", ie); } - } catch (Throwable t) { + } + queue.addLast(envelope); + } + case DROP_NEWEST -> { + if (!capacity.tryAcquire()) { if (envelope.reply != null) { - envelope.reply.bindError(t); + envelope.reply.bindError(new IllegalStateException( + "mailbox full (capacity " + options.mailboxCapacity() + + "); message dropped")); + } + return; + } + queue.addLast(envelope); + } + case FAIL -> { + if (!capacity.tryAcquire()) { + throw new IllegalStateException( + "mailbox full (capacity " + options.mailboxCapacity() + ")"); + } + queue.addLast(envelope); + } + } + } + + // ---- Receive-side -------------------------------------------------- + + private void processLoop() { + workerThread = Thread.currentThread(); + while (true) { + try { + Envelope<T> envelope = queue.takeFirst(); + // POISON is a wake-up marker, not a terminator. A message + // can race past the !active check in send() and land in + // the queue *after* POISON; treating POISON as an + // unconditional return would orphan that message (and its + // sendAndGet reply). The drain-on-stop guarantee is owned + // entirely by the post-processing !active check below. + if (envelope.message != POISON) { + // Free the send-side slot as soon as the message + // leaves the queue, so the bound applies to "queued" + // not "in flight". permitReleased guards against a + // double release when a stashed envelope is later + // unstashed and re-taken (it never re-acquired a + // permit on the way back into the queue). 
+ if (capacity != null && !envelope.permitReleased) { + capacity.release(); + envelope.permitReleased = true; } + dispatch(envelope); + } + // Terminate once stop() has been requested and the queue + // has fully drained — works even when stop() could not + // enqueue the poison pill (bounded mailbox full at the + // time of a self-stop) and ensures race-late sends are + // honoured before exit. Any messages still in the stash + // are rejected here so sendAndGet callers do not hang. + if (!active && queue.isEmpty()) { + drainStashOnExit(); + terminated = true; + return; } } catch (InterruptedException e) { Thread.currentThread().interrupt(); + drainStashOnExit(); + terminated = true; return; } } } + @SuppressWarnings("unchecked") + private void dispatch(Envelope<T> envelope) { + T msg = (T) envelope.message; + currentEnvelope = envelope; + currentStashed = false; + boolean publishSelf = options.currentSelfEnabled(); + if (publishSelf) CURRENT.set(this); + + Object result = null; + Throwable thrown = null; + try { + try { + result = processor.process(context, msg); + } catch (Throwable t) { + thrown = t; + // If the main handler stashed and then threw, undo the + // stash so the message is reported as failed rather than + // silently deferred. An onError handler can still re-stash + // if it explicitly chooses to retry. + if (currentStashed) { + if (!stashBuffer.isEmpty() && stashBuffer.peekLast() == envelope) { + stashBuffer.removeLast(); + } + currentStashed = false; + } + } + + // Fire onError if registered; treat it as best-effort — + // exceptions from the error handler are swallowed so they + // cannot crash the processing loop. The handler may call + // ctx.become(...) / ctx.stash() / ctx.unstashAll() and those + // calls take effect normally. + if (thrown != null) { + TriConsumer<ActorContext<T>, Throwable, ? 
super T> handler = errorHandler; + if (handler != null) { + try { + handler.accept(context, thrown, msg); + } catch (Throwable ignored) { + } + } + } + + // Finalise based on the post-onError stash state. Three + // outcomes: + // stashed → discard pending state; reply + // stays unbound until the message + // is later unstashed and processed + // threw & not stashed → discard pending state; bind + // error on reply + // success → commit pending state; bind reply + // The send-side permit was already released by the worker + // loop when it dequeued this envelope, so capacity does not + // need to be released here in any outcome. + if (currentStashed) { + processor.rollback(); + } else if (thrown != null) { + processor.rollback(); + if (envelope.reply != null) envelope.reply.bindError(thrown); + } else { + processor.commit(); + if (envelope.reply != null) envelope.reply.bind(result); + } + } finally { + currentEnvelope = null; + currentStashed = false; + if (publishSelf) CURRENT.remove(); + } + } + + /** + * Rejects any messages still in the stash on shutdown so that + * {@code sendAndGet} callers do not wait on replies that will never + * arrive. Stashed envelopes hold no send-side permit (it was + * released when the worker first dequeued them), so capacity does + * not need to be released here. 
+ */ + private void drainStashOnExit() { + if (stashBuffer.isEmpty()) return; + for (Envelope<T> stashed : stashBuffer) { + if (stashed.reply != null) { + stashed.reply.bindError(new IllegalStateException( + "actor stopped with message in stash")); + } + } + stashBuffer.clear(); + } + @Override public String toString() { - return "Actor[active=" + active + ", queued=" + queue.size() + "]"; + return "Actor[active=" + active + + ", queued=" + queue.size() + + ", stashed=" + stashBuffer.size() + "]"; } - // ---- Internal types ------------------------------------------------- + // ---- Helpers shared with the per-shape context implementations ----- - private record Envelope<T>(Object message, DataflowVariable<Object> reply) {} + private void checkOnWorkerThread(String op) { + if (Thread.currentThread() != workerThread) { + throw new IllegalStateException( + op + " must be called from a handler invocation on the actor's worker thread"); + } + } + + private void stashCurrent() { + checkOnWorkerThread("stash()"); + if (currentEnvelope == null) { + throw new IllegalStateException("stash() called outside handler dispatch"); + } + if (currentStashed) return; // idempotent within a single dispatch + + if (options.isStashBounded() && stashBuffer.size() >= options.stashCapacity()) { + switch (options.stashOverflow()) { + case FAIL -> + // Surface the overflow to the handler. If uncaught it + // propagates out and dispatch reports the message as + // failed; if caught the handler can choose to drop / + // process / reroute the current message. + throw new IllegalStateException( + "stash full (capacity " + options.stashCapacity() + ")"); + case DROP_OLDEST -> { + Envelope<T> evicted = stashBuffer.removeFirst(); + if (evicted.reply != null) { + evicted.reply.bindError(new IllegalStateException( + "evicted from stash (capacity " + + options.stashCapacity() + " exceeded)")); + } + // Fall through to add current envelope below. 
+ } + case REJECT -> { + // Bind the current message's reply immediately so the + // caller learns of the rejection now. Mark stashed so + // dispatch finalise discards any pending state change + // and does not try to bind the reply again, but DON'T + // add to the buffer (the message is gone, not deferred). + if (currentEnvelope.reply != null) { + currentEnvelope.reply.bindError(new IllegalStateException( + "stash full (capacity " + options.stashCapacity() + + "); message rejected")); + } + currentStashed = true; + return; + } + } + } + currentStashed = true; + stashBuffer.addLast(currentEnvelope); + } + + private void unstashAllInternal() { + checkOnWorkerThread("unstashAll()"); + if (stashBuffer.isEmpty()) return; + // Re-inject at head in original (FIFO) order: iterate the buffer + // from tail to head and addFirst each, so the oldest stashed + // message ends up at the front of the queue. + Iterator<Envelope<T>> it = stashBuffer.descendingIterator(); + while (it.hasNext()) { + queue.addFirst(it.next()); + } + stashBuffer.clear(); + } + private Cancellable scheduleOnceInternal(T message, Duration delay) { + checkOnWorkerThread("scheduleOnce()"); + Objects.requireNonNull(message, "message must not be null"); + Objects.requireNonNull(delay, "delay must not be null"); + // Chicken-and-egg: the task wants to deregister itself from + // pendingTimers after firing, but the ScheduledFuture doesn't + // exist until schedule(...) returns. Stash it via AtomicReference + // and read it inside the task. + AtomicReference<ScheduledFuture<?>> ref = new AtomicReference<>(); + Runnable task = () -> { + // Hand the send off to the async executor so the scheduler + // thread never blocks on a full BLOCK mailbox. The scheduler + // is a shared resource (one per JVM); blocking it on one + // actor's bound would starve every other actor's timers. 
+ AsyncSupport.getExecutor().execute(() -> { + try { + send(message); + } catch (IllegalStateException ignored) { + // Actor was stopped between schedule and fire — drop. + } + }); + ScheduledFuture<?> f = ref.get(); + if (f != null) pendingTimers.remove(f); + }; + ScheduledFuture<?> future = AsyncSupport.getScheduler() + .schedule(task, delay.toNanos(), TimeUnit.NANOSECONDS); + // Race window: with a near-zero delay, the task can fire and + // reach `ref.get()` before this set() runs, observing null and + // skipping its self-deregistration. The future is then added to + // pendingTimers below and stays there until stop() cancels it + // (a no-op on an already-fired future). Functionally benign — + // just a dangling entry until shutdown — so we don't pay for a + // compareAndSet here. Documented for future readers. + ref.set(future); + pendingTimers.add(future); + return new TimerCancellable(future, pendingTimers); Review Comment: `scheduleOnceInternal` documents a race where the task can fire before `ref.set(future)`, causing the one-shot future to remain in `pendingTimers` until `stop()`. If an actor schedules many near-zero-delay timers during its lifetime, this can accumulate stale entries and prevent timely cleanup. Consider adding a small fired-flag handshake (e.g., task marks itself fired and the caller removes the future if it already fired) so one-shots always deregister even when they run immediately.
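One possible shape for the fired-flag handshake suggested in the `scheduleOnceInternal` comment is sketched below. It is a standalone illustration and not the PR's code: `OneShotTimers`, `pendingCount()` and `demo()` are hypothetical names, and a plain `ScheduledExecutorService` stands in for `AsyncSupport.getScheduler()`. The idea is that the task sets the flag before it reads `ref`, while the caller checks the flag only after it has added the future, so at least one side should always perform the removal.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the actor's timer bookkeeping (assumption, not DefaultActor).
final class OneShotTimers {
    private final ScheduledExecutorService scheduler;
    private final Set<ScheduledFuture<?>> pendingTimers = ConcurrentHashMap.newKeySet();

    OneShotTimers(ScheduledExecutorService scheduler) {
        this.scheduler = scheduler;
    }

    ScheduledFuture<?> scheduleOnce(Runnable action, long delayNanos) {
        AtomicBoolean fired = new AtomicBoolean();
        AtomicReference<ScheduledFuture<?>> ref = new AtomicReference<>();
        Runnable task = () -> {
            try {
                action.run();
            } finally {
                // Publish "fired" BEFORE reading ref, so the caller can detect an early fire.
                fired.set(true);
                ScheduledFuture<?> f = ref.get();
                if (f != null) pendingTimers.remove(f);
            }
        };
        ScheduledFuture<?> future = scheduler.schedule(task, delayNanos, TimeUnit.NANOSECONDS);
        ref.set(future);
        pendingTimers.add(future);
        // Check "fired" AFTER the add: if the task already ran and saw a null ref,
        // the caller removes the entry itself. A double remove is harmless.
        if (fired.get()) pendingTimers.remove(future);
        return future;
    }

    int pendingCount() {
        return pendingTimers.size();
    }

    // Schedules many zero-delay timers and reports how many entries are left behind.
    static int demo() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        OneShotTimers timers = new OneShotTimers(scheduler);
        for (int i = 0; i < 1_000; i++) {
            timers.scheduleOnce(() -> { }, 0L);
        }
        scheduler.shutdown(); // already-scheduled delayed tasks still run by default
        try {
            if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return timers.pendingCount();
    }
}
```

With this ordering, `OneShotTimers.demo()` should return 0 even though every timer is eligible to fire before `ref.set(future)` runs. The cost is one extra `AtomicBoolean` per one-shot, which may or may not be worth it compared with the current "dangling entry until shutdown" trade-off.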

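For the earlier comment on `scheduleAtFixedRateInternal`, a minimal sketch of the "reject when `!active`, otherwise auto-cancel" option could look like the following. Again this is an illustration under assumptions: `PeriodicTimers`, `pendingCount()` and `demo()` are hypothetical, a plain `ScheduledExecutorService` replaces `AsyncSupport.getScheduler()`, and the offload of `send()` to the async executor is omitted. The second `active` check after the add is meant to cover a `stop()` that runs concurrently with scheduling.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the actor's periodic-timer bookkeeping (assumption, not DefaultActor).
final class PeriodicTimers {
    private final ScheduledExecutorService scheduler;
    private final Set<ScheduledFuture<?>> pendingTimers = ConcurrentHashMap.newKeySet();
    private volatile boolean active = true;

    PeriodicTimers(ScheduledExecutorService scheduler) {
        this.scheduler = scheduler;
    }

    ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long initialDelayNanos, long intervalNanos) {
        // Reject up front: a handler running during the drain phase cannot create a timer.
        if (!active) {
            throw new IllegalStateException("cannot schedule a periodic timer on a stopped actor");
        }
        ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(
                task, initialDelayNanos, intervalNanos, TimeUnit.NANOSECONDS);
        pendingTimers.add(future);
        // Re-check after the add: if stop() ran in between and its sweep missed this
        // entry, cancel it here so the timer cannot outlive the actor.
        if (!active) {
            future.cancel(false);
            pendingTimers.remove(future);
        }
        return future;
    }

    void stop() {
        active = false; // written before the sweep so late schedulers observe it
        for (ScheduledFuture<?> f : pendingTimers) {
            f.cancel(false);
        }
        pendingTimers.clear();
    }

    int pendingCount() {
        return pendingTimers.size();
    }

    // Returns true when a pre-stop timer is cancelled and a post-stop schedule is rejected.
    static boolean demo() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        PeriodicTimers timers = new PeriodicTimers(scheduler);
        try {
            ScheduledFuture<?> early = timers.scheduleAtFixedRate(() -> { }, 1_000_000L, 1_000_000L);
            timers.stop();
            boolean rejected = false;
            try {
                timers.scheduleAtFixedRate(() -> { }, 1_000_000L, 1_000_000L);
            } catch (IllegalStateException expected) {
                rejected = true;
            }
            return early.isCancelled() && rejected && timers.pendingCount() == 0;
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```

Throwing from the handler is the stricter choice; returning an already-cancelled `Cancellable` instead would keep draining handlers from failing, at the price of silently ignoring the request. Which behaviour fits the `ActorContext` contract is a decision for the PR author.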