[ 
https://issues.apache.org/jira/browse/GROOVY-12033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Paul King updated GROOVY-12033:
-------------------------------
    Description: 
h2. Summary

Add four production-readiness improvements to {{groovy.concurrent.Actor}}, 
which was introduced in alpha-1:

# {{Actor.Stop}} return-value sentinel for in-handler self-termination
# {{onError}} callback so handler exceptions are no longer silently swallowed 
for fire-and-forget {{send}}
# Bounded mailbox with overflow strategy ({{BLOCK}} / {{DROP_NEWEST}} / 
{{FAIL}})
# Per-actor executor selection

All changes are additive and backward compatible. Existing factory methods and 
the existing {{ActorTest}} pass unchanged.

h2. Public API additions

h3. {{groovy.concurrent.Actor}}

* {{Object Stop}} — public sentinel constant. A handler returning 
{{Actor.Stop}} causes the actor to stop gracefully after the current message 
(same semantics as {{stop()}}: queued messages still drain). For {{sendAndGet}} 
callers the reply is bound with the {{Stop}} sentinel itself; detect by 
reference equality.
* {{default Actor<T> onError(BiFunction<Throwable, ? super T, ?> handler)}} — 
registers a handler invoked when message processing throws. Receives 
{{(throwable, message)}}. Return value is a control signal: {{Actor.Stop}} 
stops the actor, anything else continues. Default implementation throws 
{{UnsupportedOperationException}}; {{DefaultActor}} overrides.
* {{static Actor<T> reactor(Function<T,R> handler, ActorOptions options)}} — 
new overload.
* {{static Actor<T> stateful(S initial, BiFunction<S,T,S> handler, ActorOptions 
options)}} — new overload.
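
The two control signals above ({{Actor.Stop}} as a handler result and as an {{onError}} result) can be modelled in a few lines of plain JDK code. This is only a single-threaded sketch under stated assumptions: {{ControlSignalSketch}}, its {{STOP}} constant, and the {{drain}} method are hypothetical names, not the real {{DefaultActor}}, which runs its loop on an executor and also binds {{sendAndGet}} replies.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical single-threaded model of the control signals; not the DefaultActor source.
final class ControlSignalSketch<T> {
    // Plays the role of Actor.Stop: a unique object, detected by reference equality.
    static final Object STOP = new Object();

    private final Queue<T> mailbox = new ArrayDeque<>();
    private final Function<T, Object> handler;
    private BiFunction<Throwable, ? super T, ?> errorHandler; // null until onError is called
    private boolean stopRequested;
    final List<T> handled = new ArrayList<>(); // messages whose handler returned normally

    ControlSignalSketch(Function<T, Object> handler) {
        this.handler = handler;
    }

    ControlSignalSketch<T> onError(BiFunction<Throwable, ? super T, ?> errorHandler) {
        this.errorHandler = errorHandler;
        return this;
    }

    void send(T message) {
        mailbox.add(message);
    }

    boolean isStopRequested() {
        return stopRequested;
    }

    // Processes everything already queued. A stop signal only sets a flag, so
    // messages queued behind it are still handled, mirroring the graceful drain.
    void drain() {
        T message;
        while ((message = mailbox.poll()) != null) {
            try {
                Object result = handler.apply(message);
                handled.add(message);
                if (result == STOP) {
                    stopRequested = true;
                }
            } catch (RuntimeException failure) {
                if (errorHandler == null) {
                    continue; // nothing registered: the failure is dropped
                }
                try {
                    // The error handler receives (throwable, message); STOP stops, anything else continues.
                    if (errorHandler.apply(failure, message) == STOP) {
                        stopRequested = true;
                    }
                } catch (RuntimeException ignored) {
                    // A failing error handler must not break the loop.
                }
            }
        }
    }
}
```

In this model, queuing {{a}}, {{quit}}, {{b}} with a handler that returns {{STOP}} for {{quit}} still handles all three messages and leaves {{isStopRequested()}} true.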

h3. New: {{groovy.concurrent.ActorOptions}} (record)

{code:java}
public record ActorOptions(int mailboxCapacity, Overflow overflow, Executor 
executor) {
    public enum Overflow { BLOCK, DROP_NEWEST, FAIL }
    public static final ActorOptions DEFAULTS;
    public ActorOptions withBoundedMailbox(int capacity, Overflow strategy);
    public ActorOptions withExecutor(Executor executor);
    public boolean isBounded();
}
{code}

* {{mailboxCapacity == 0}} → unbounded (current behaviour).
* {{executor == null}} → uses {{AsyncSupport.getExecutor()}} (current 
behaviour).
* Canonical constructor rejects negative capacity and null overflow.
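
The outline above omits method bodies, so the following is one plausible way to fill them in, not the committed source. The name {{ActorOptionsSketch}}, the exception types, the choice of {{BLOCK}} as the default overflow value, and the rejection of zero in {{withBoundedMailbox}} (suggested only by the test name {{testActorOptionsWithBoundedMailboxRejectsZero}}) are all assumptions.

```java
import java.util.Objects;
import java.util.concurrent.Executor;

// Hypothetical completion of the ActorOptions outline; bodies, exception types,
// and the default overflow value are assumptions, not the committed code.
record ActorOptionsSketch(int mailboxCapacity, Overflow overflow, Executor executor) {
    enum Overflow { BLOCK, DROP_NEWEST, FAIL }

    // Assumed defaults: capacity 0 (unbounded), null executor (shared default),
    // and BLOCK as a placeholder strategy that is irrelevant while unbounded.
    static final ActorOptionsSketch DEFAULTS = new ActorOptionsSketch(0, Overflow.BLOCK, null);

    // Compact canonical constructor: rejects negative capacity and null overflow.
    ActorOptionsSketch {
        if (mailboxCapacity < 0) {
            throw new IllegalArgumentException("mailboxCapacity must be >= 0: " + mailboxCapacity);
        }
        Objects.requireNonNull(overflow, "overflow");
    }

    // Returns a copy with a bounded mailbox; a capacity of zero would mean
    // "unbounded", so it is rejected here (assumption).
    ActorOptionsSketch withBoundedMailbox(int capacity, Overflow strategy) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be > 0: " + capacity);
        }
        return new ActorOptionsSketch(capacity, strategy, executor);
    }

    // Returns a copy that runs the actor on the given executor.
    ActorOptionsSketch withExecutor(Executor executor) {
        return new ActorOptionsSketch(mailboxCapacity, overflow, executor);
    }

    boolean isBounded() {
        return mailboxCapacity > 0;
    }
}
```

Under these assumptions a caller would write something like {{ActorOptionsSketch.DEFAULTS.withBoundedMailbox(8, Overflow.FAIL)}} and pass the result to one of the new factory overloads.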

h2. Behavioural changes in {{org.apache.groovy.runtime.async.DefaultActor}}

* Constructor now takes {{ActorOptions}}; queue is 
{{LinkedBlockingQueue(capacity)}} when bounded.
* {{send(T)}} routes through an {{enqueue(...)}} helper that honours the 
overflow policy:
** {{BLOCK}} → {{queue.put}} (back-pressures the sending thread; on interrupt 
restores the flag and throws a wrapping {{RuntimeException}}).
** {{DROP_NEWEST}} → {{queue.offer}}; on overflow the message is silently 
dropped, and {{sendAndGet}} replies bind an {{IllegalStateException}} so 
awaiters don't hang.
** {{FAIL}} → {{queue.offer}}; on overflow throws {{IllegalStateException}}. 
{{sendAndGet}} additionally binds the reply with the same exception before 
rethrowing.
* {{processLoop}} checks {{result == Actor.Stop}} after the handler runs and 
calls {{stop()}} (graceful — pending messages drain).
* {{processLoop}}'s catch block now invokes the registered {{onError}} handler 
if present, in addition to binding the failure on the reply for {{sendAndGet}}. 
If the {{onError}} handler returns {{Actor.Stop}} the actor stops. Exceptions 
from the {{onError}} handler itself are caught and discarded so the processing 
loop cannot be destabilised.
* {{StatefulProcessor.process}} preserves the prior state when the handler 
returns {{Actor.Stop}}, so any messages queued behind the Stop-trigger still 
observe real state during drain.
* {{stop()}} now offers the poison pill to the possibly bounded queue and, if 
the queue is full, falls back to a blocking {{put}} so termination always succeeds.
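
The overflow handling described for {{enqueue(...)}} maps directly onto {{LinkedBlockingQueue.put}} and {{offer}}. The sketch below illustrates that mapping with a hypothetical {{BoundedMailboxSketch}} class; the class, its boolean return value, and the exception messages are assumptions, and the reply binding done for {{sendAndGet}} is left out.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical mailbox illustrating the three overflow strategies; not the DefaultActor source.
final class BoundedMailboxSketch<T> {
    enum Overflow { BLOCK, DROP_NEWEST, FAIL }

    private final BlockingQueue<T> queue;
    private final Overflow overflow;

    BoundedMailboxSketch(int capacity, Overflow overflow) {
        this.queue = new LinkedBlockingQueue<>(capacity);
        this.overflow = overflow;
    }

    // Returns true if the message was queued, false if it was dropped.
    boolean enqueue(T message) {
        switch (overflow) {
            case BLOCK:
                try {
                    queue.put(message); // parks the sending thread until a slot frees
                    return true;
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the interrupt flag
                    throw new RuntimeException("interrupted while enqueueing", e);
                }
            case DROP_NEWEST:
                return queue.offer(message); // false: the new message is silently dropped
            case FAIL:
                if (!queue.offer(message)) {
                    throw new IllegalStateException("mailbox full");
                }
                return true;
            default:
                throw new AssertionError(overflow);
        }
    }

    int size() {
        return queue.size();
    }
}
```

With a capacity of 1, a second {{enqueue}} returns false under {{DROP_NEWEST}} and throws {{IllegalStateException}} under {{FAIL}}; under {{BLOCK}} it would not return until a consumer removed the first message.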

h2. Files touched

|| File || Change ||
| {{src/main/java/groovy/concurrent/Actor.java}} | Modified — Stop sentinel, 
onError default, 2 factory overloads, doc updates on {{send}} |
| {{src/main/java/groovy/concurrent/ActorOptions.java}} | New |
| {{src/main/java/org/apache/groovy/runtime/async/DefaultActor.java}} | 
Modified — see behavioural changes above |
| {{src/test/groovy/groovy/concurrent/ActorTest.groovy}} | Extended — 13 new 
tests (see below) |

h2. Tests

13 new tests, all passing alongside the existing 13:

* {{testStatefulSelfStopsOnStopSentinel}}, 
{{testReactorSelfStopsOnStopSentinel}}
* {{testStopSentinelDrainsQueuedMessages}} — confirms FIFO drain after Stop
* {{testStatePreservedAcrossStopSentinel}} — confirms StatefulProcessor doesn't 
overwrite state with Stop
* {{testOnErrorFiresForFireAndForgetException}}
* {{testOnErrorAlsoFiresForSendAndGet}} — confirms both the {{Awaitable}} 
failure path and the {{onError}} callback fire
* {{testOnErrorReturningStopTerminatesActor}}
* {{testOnErrorHandlerExceptionIsSwallowed}} — confirms a throwing {{onError}} 
doesn't break the loop
* {{testBoundedMailboxFailOverflowThrows}}
* {{testBoundedMailboxDropNewest}}
* {{testBoundedMailboxDropNewestReplyBindsError}} — overflow on {{sendAndGet}} 
surfaces via the {{Awaitable}}
* {{testBoundedMailboxBlockBackpressures}} — confirms the sender thread 
actually parks until a slot frees
* {{testPerActorExecutorIsUsed}} — confirms the handler runs on the supplied 
executor's thread
* {{testActorOptionsRejectsNegativeCapacity}}, 
{{testActorOptionsWithBoundedMailboxRejectsZero}}

Full suite of 135 tests across the 18 {{groovy.concurrent.*}} test classes 
still passes.

h2. Out of scope / deferred

Deferred to a future ticket (proposed for Groovy 7):

* Idle / receive timeout ({{react(timeout)}} from GPars, {{setReceiveTimeout}} 
from Pekko)
* Restart-with-backoff supervision policy
* Per-actor {{withTimers}} scheduling hub
* Stash / unstash
* {{become}}-style behaviour swap

Not adopted (out of scope for {{groovy.concurrent}} entirely): actor hierarchy 
with supervision trees, remote actors, persistence, sharding. These are Pekko's 
domain; {{groovy.concurrent.Actor}} is for serialising in-process mutable state.

h2. Compatibility

* Source compatible: no existing signature on the {{Actor}} interface or 
{{DefaultActor}} factory methods changed.
* Binary compatible for all existing call sites; existing 2-arg factories route 
through {{ActorOptions.DEFAULTS}}.
* {{onError}} is a {{default}} method, so existing implementors of {{Actor}} 
compile unchanged (calls to {{onError}} on a non-{{DefaultActor}} 
implementation throw {{UnsupportedOperationException}}).

h2. Notes for follow-up

While adding tests, two Groovy 6 {{await}} parser-sugar quirks surfaced and are 
worth filing separately (not part of this change):

# {{await(x).is(y)}} parses as {{await(x.is(y))}} — the postfix chain after 
{{await(...)}} is consumed into its argument list.
# {{await(x)}} as a standalone statement is rejected with "Modifiers or return 
type is required" — {{await}} requires expression context.

Both were worked around in the tests by first binding the result with {{def v = await(...)}}.

> groovy.concurrent.Actor: pre-GA hardening — Stop sentinel, error callback, 
> bounded mailbox, per-actor pool
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: GROOVY-12033
>                 URL: https://issues.apache.org/jira/browse/GROOVY-12033
>             Project: Groovy
>          Issue Type: Improvement
>            Reporter: Paul King
>            Priority: Major


