Copilot commented on code in PR #2555:
URL: https://github.com/apache/groovy/pull/2555#discussion_r3292122141


##########
src/main/java/groovy/concurrent/Actor.java:
##########
@@ -150,6 +237,22 @@ static <T, R> Actor<T> reactor(Function<T, R> handler) {
      * @return a started actor
      */
     static <T, S> Actor<T> stateful(S initialState, BiFunction<S, T, S> 
handler) {
-        return DefaultActor.stateful(initialState, handler);
+        return DefaultActor.stateful(initialState, handler, 
ActorOptions.DEFAULTS);

Review Comment:
   `stateful` handlers are declared as `BiFunction<S, T, S>`, but the new 
`Actor.Stop` sentinel is documented as a valid return to stop an actor. This is 
not representable for Java callers (they can't return `Actor.Stop` where `S` is 
e.g. `Integer`), making the sentinel effectively Groovy-only for stateful 
actors. Consider widening the handler return type (e.g., `BiFunction<S, T, ?>` 
/ `Object`) or providing an additional overload explicitly supporting 
`Actor.Stop`.



##########
src/main/java/org/apache/groovy/runtime/async/DefaultActor.java:
##########
@@ -89,32 +110,100 @@ public boolean isActive() {
     }
 
     @Override
-    @SuppressWarnings("unchecked")
     public void stop() {
         if (!active) return;
         active = false;
-        // Poison pill signals the processing loop to exit after draining
-        queue.add(new Envelope<>(POISON, null));
+        // Poison pill signals the processing loop to exit after draining.
+        // Use offer on a possibly-bounded queue — falling back to put if
+        // the queue is full, since the poison must always be delivered.
+        if (!queue.offer(new Envelope<>(POISON, null))) {
+            try {
+                queue.put(new Envelope<>(POISON, null));
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+            }
+        }

Review Comment:
   `stop()` can deadlock when called from the actor's own processing thread 
with a bounded mailbox: between a `take()` and the subsequent `stop()` call, 
another sender can fill the queue so `offer()` fails and `queue.put(...)` 
blocks, but the actor thread is the only consumer so space never frees. Also, 
if the stopping thread is interrupted, the poison pill may never be enqueued, 
leaving the processing loop blocked forever once it drains. Consider switching 
to a non-blocking stop signal (e.g., exit when `!active && queue.isEmpty()`), 
or ensure poison insertion cannot block and is retried/guaranteed even under 
interrupt.
   



##########
src/test/groovy/groovy/concurrent/ActorTest.groovy:
##########
@@ -226,4 +226,329 @@ final class ActorTest {
             calc.stop()
         '''
     }
+
+    // === Actor.Stop sentinel (GROOVY-12033) ===
+
+    @Test
+    void testStatefulSelfStopsOnStopSentinel() {
+        assertScript '''
+            import groovy.concurrent.Actor
+
+            def max = 3
+            def bot = Actor.stateful(0) { int count, msg ->
+                def next = count + 1
+                next < max ? next : Actor.Stop
+            }
+
+            bot.send('one')
+            bot.send('two')
+            bot.send('three')
+            // Allow processing
+            for (int i = 0; i < 20 && bot.isActive(); i++) Thread.sleep(25)
+            assert !bot.isActive()
+        '''
+    }
+
+    @Test
+    void testReactorSelfStopsOnStopSentinel() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import java.util.concurrent.atomic.AtomicInteger
+
+            def n = new AtomicInteger()
+            def actor = Actor.reactor { msg ->
+                n.incrementAndGet() == 2 ? Actor.Stop : msg
+            }
+
+            assert await(actor.sendAndGet('first')) == 'first'
+            def stopReply = await(actor.sendAndGet('second'))
+            assert stopReply.is(Actor.Stop)
+            for (int i = 0; i < 20 && actor.isActive(); i++) Thread.sleep(25)
+            assert !actor.isActive()
+        '''
+    }
+
+    @Test
+    void testStopSentinelDrainsQueuedMessages() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import java.util.concurrent.CopyOnWriteArrayList
+
+            def log = new CopyOnWriteArrayList<Integer>()
+            // Use ordered sendAndGet calls to keep the queue contents
+            // deterministic relative to the Stop trigger.
+            def actor = Actor.stateful(0) { int seen, Integer msg ->
+                log << msg
+                msg == 2 ? Actor.Stop : seen + 1
+            }
+            def r1 = actor.sendAndGet(1)
+            def r2 = actor.sendAndGet(2)
+            def r3 = actor.sendAndGet(3)
+            def v1 = await(r1)
+            def v2 = await(r2)
+            def v3 = await(r3)
+            for (int i = 0; i < 20 && actor.isActive(); i++) Thread.sleep(25)
+            assert log == [1, 2, 3]
+            assert !actor.isActive()
+        '''
+    }
+
+    @Test
+    void testStatePreservedAcrossStopSentinel() {
+        assertScript '''
+            import groovy.concurrent.Actor
+
+            def actor = Actor.stateful(100) { int state, msg ->
+                msg == 'stop' ? Actor.Stop : state + 1
+            }
+            def r1 = actor.sendAndGet('inc')
+            def r2 = actor.sendAndGet('stop')
+            def r3 = actor.sendAndGet('inc')      // queued before Stop drains
+            def v1 = await(r1)
+            def v2 = await(r2)
+            def v3 = await(r3)
+            assert v1 == 101
+            assert v2.is(Actor.Stop)
+            // Third message saw preserved state (101), not Stop.
+            assert v3 == 102
+        '''
+    }
+
+    // === onError callback (GROOVY-12033) ===
+
+    @Test
+    void testOnErrorFiresForFireAndForgetException() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import java.util.concurrent.CountDownLatch
+            import java.util.concurrent.atomic.AtomicReference
+
+            def latch = new CountDownLatch(1)
+            def captured = new AtomicReference<List>()
+            def actor = Actor.reactor { msg -> throw new 
RuntimeException("bang: $msg") }
+            actor.onError { Throwable t, msg ->
+                captured.set([t.message, msg])
+                latch.countDown()
+                null
+            }
+            actor.send('payload')
+            assert latch.await(2, java.util.concurrent.TimeUnit.SECONDS)
+            assert captured.get() == ['bang: payload', 'payload']
+            actor.stop()
+        '''
+    }
+
+    @Test
+    void testOnErrorAlsoFiresForSendAndGet() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import java.util.concurrent.atomic.AtomicInteger
+
+            def fired = new AtomicInteger()
+            def actor = Actor.reactor { throw new RuntimeException('boom') }
+            actor.onError { Throwable t, msg -> fired.incrementAndGet(); null }
+
+            try { await(actor.sendAndGet('x')); assert false } catch 
(RuntimeException expected) { }
+            for (int i = 0; i < 20 && fired.get() == 0; i++) Thread.sleep(25)
+            assert fired.get() == 1
+            actor.stop()
+        '''
+    }
+
+    @Test
+    void testOnErrorReturningStopTerminatesActor() {
+        assertScript '''
+            import groovy.concurrent.Actor
+
+            def actor = Actor.reactor { throw new RuntimeException('die') }
+            actor.onError { Throwable t, msg -> Actor.Stop }
+            actor.send('trigger')
+            for (int i = 0; i < 20 && actor.isActive(); i++) Thread.sleep(25)
+            assert !actor.isActive()
+        '''
+    }
+
+    @Test
+    void testOnErrorHandlerExceptionIsSwallowed() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import java.util.concurrent.atomic.AtomicInteger
+
+            def processed = new AtomicInteger()
+            def actor = Actor.reactor { msg ->
+                processed.incrementAndGet()
+                if (msg == 'fail') throw new RuntimeException('first')
+                msg
+            }
+            actor.onError { Throwable t, msg -> throw new 
RuntimeException('handler also failed') }
+
+            actor.send('fail')
+            // Subsequent messages should still be processed even though the
+            // error handler itself threw.
+            assert await(actor.sendAndGet('ok')) == 'ok'
+            assert processed.get() == 2
+            actor.stop()
+        '''
+    }
+
+    // === Bounded mailbox (GROOVY-12033) ===
+
+    @Test
+    void testBoundedMailboxFailOverflowThrows() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import groovy.concurrent.ActorOptions
+            import java.util.concurrent.CountDownLatch
+
+            def hold = new CountDownLatch(1)
+            def options = ActorOptions.DEFAULTS.withBoundedMailbox(2, 
ActorOptions.Overflow.FAIL)
+            def actor = Actor.reactor({ msg -> hold.await(); msg }, options)
+
+            actor.send('first')     // taken by the handler, which blocks on 
hold
+            // give the loop a tick to pull the first message off the queue
+            Thread.sleep(50)

Review Comment:
   These bounded-mailbox tests depend on `Thread.sleep(50)` to assume the actor 
has already started processing the first message. On slower/loaded CI, this 
race can make later `send()` calls overflow earlier than expected (or block 
unexpectedly), producing flaky tests. Prefer a handshake latch/barrier inside 
the handler (e.g., count down when processing starts) so the test only proceeds 
once the first message is definitely being processed.
   



##########
src/test/groovy/groovy/concurrent/ActorTest.groovy:
##########
@@ -226,4 +226,329 @@ final class ActorTest {
             calc.stop()
         '''
     }
+
+    // === Actor.Stop sentinel (GROOVY-12033) ===
+
+    @Test
+    void testStatefulSelfStopsOnStopSentinel() {
+        assertScript '''
+            import groovy.concurrent.Actor
+
+            def max = 3
+            def bot = Actor.stateful(0) { int count, msg ->
+                def next = count + 1
+                next < max ? next : Actor.Stop
+            }
+
+            bot.send('one')
+            bot.send('two')
+            bot.send('three')
+            // Allow processing
+            for (int i = 0; i < 20 && bot.isActive(); i++) Thread.sleep(25)
+            assert !bot.isActive()
+        '''
+    }
+
+    @Test
+    void testReactorSelfStopsOnStopSentinel() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import java.util.concurrent.atomic.AtomicInteger
+
+            def n = new AtomicInteger()
+            def actor = Actor.reactor { msg ->
+                n.incrementAndGet() == 2 ? Actor.Stop : msg
+            }
+
+            assert await(actor.sendAndGet('first')) == 'first'
+            def stopReply = await(actor.sendAndGet('second'))
+            assert stopReply.is(Actor.Stop)
+            for (int i = 0; i < 20 && actor.isActive(); i++) Thread.sleep(25)
+            assert !actor.isActive()
+        '''
+    }
+
+    @Test
+    void testStopSentinelDrainsQueuedMessages() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import java.util.concurrent.CopyOnWriteArrayList
+
+            def log = new CopyOnWriteArrayList<Integer>()
+            // Use ordered sendAndGet calls to keep the queue contents
+            // deterministic relative to the Stop trigger.
+            def actor = Actor.stateful(0) { int seen, Integer msg ->
+                log << msg
+                msg == 2 ? Actor.Stop : seen + 1
+            }
+            def r1 = actor.sendAndGet(1)
+            def r2 = actor.sendAndGet(2)
+            def r3 = actor.sendAndGet(3)
+            def v1 = await(r1)
+            def v2 = await(r2)
+            def v3 = await(r3)
+            for (int i = 0; i < 20 && actor.isActive(); i++) Thread.sleep(25)
+            assert log == [1, 2, 3]
+            assert !actor.isActive()
+        '''
+    }
+
+    @Test
+    void testStatePreservedAcrossStopSentinel() {
+        assertScript '''
+            import groovy.concurrent.Actor
+
+            def actor = Actor.stateful(100) { int state, msg ->
+                msg == 'stop' ? Actor.Stop : state + 1
+            }
+            def r1 = actor.sendAndGet('inc')
+            def r2 = actor.sendAndGet('stop')
+            def r3 = actor.sendAndGet('inc')      // queued before Stop drains
+            def v1 = await(r1)
+            def v2 = await(r2)
+            def v3 = await(r3)
+            assert v1 == 101
+            assert v2.is(Actor.Stop)
+            // Third message saw preserved state (101), not Stop.
+            assert v3 == 102
+        '''
+    }
+
+    // === onError callback (GROOVY-12033) ===
+
+    @Test
+    void testOnErrorFiresForFireAndForgetException() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import java.util.concurrent.CountDownLatch
+            import java.util.concurrent.atomic.AtomicReference
+
+            def latch = new CountDownLatch(1)
+            def captured = new AtomicReference<List>()
+            def actor = Actor.reactor { msg -> throw new 
RuntimeException("bang: $msg") }
+            actor.onError { Throwable t, msg ->
+                captured.set([t.message, msg])
+                latch.countDown()
+                null
+            }
+            actor.send('payload')
+            assert latch.await(2, java.util.concurrent.TimeUnit.SECONDS)
+            assert captured.get() == ['bang: payload', 'payload']
+            actor.stop()
+        '''
+    }
+
+    @Test
+    void testOnErrorAlsoFiresForSendAndGet() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import java.util.concurrent.atomic.AtomicInteger
+
+            def fired = new AtomicInteger()
+            def actor = Actor.reactor { throw new RuntimeException('boom') }
+            actor.onError { Throwable t, msg -> fired.incrementAndGet(); null }
+
+            try { await(actor.sendAndGet('x')); assert false } catch 
(RuntimeException expected) { }
+            for (int i = 0; i < 20 && fired.get() == 0; i++) Thread.sleep(25)
+            assert fired.get() == 1
+            actor.stop()
+        '''
+    }
+
+    @Test
+    void testOnErrorReturningStopTerminatesActor() {
+        assertScript '''
+            import groovy.concurrent.Actor
+
+            def actor = Actor.reactor { throw new RuntimeException('die') }
+            actor.onError { Throwable t, msg -> Actor.Stop }
+            actor.send('trigger')
+            for (int i = 0; i < 20 && actor.isActive(); i++) Thread.sleep(25)
+            assert !actor.isActive()
+        '''
+    }
+
+    @Test
+    void testOnErrorHandlerExceptionIsSwallowed() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import java.util.concurrent.atomic.AtomicInteger
+
+            def processed = new AtomicInteger()
+            def actor = Actor.reactor { msg ->
+                processed.incrementAndGet()
+                if (msg == 'fail') throw new RuntimeException('first')
+                msg
+            }
+            actor.onError { Throwable t, msg -> throw new 
RuntimeException('handler also failed') }
+
+            actor.send('fail')
+            // Subsequent messages should still be processed even though the
+            // error handler itself threw.
+            assert await(actor.sendAndGet('ok')) == 'ok'
+            assert processed.get() == 2
+            actor.stop()
+        '''
+    }
+
+    // === Bounded mailbox (GROOVY-12033) ===
+
+    @Test
+    void testBoundedMailboxFailOverflowThrows() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import groovy.concurrent.ActorOptions
+            import java.util.concurrent.CountDownLatch
+
+            def hold = new CountDownLatch(1)
+            def options = ActorOptions.DEFAULTS.withBoundedMailbox(2, 
ActorOptions.Overflow.FAIL)
+            def actor = Actor.reactor({ msg -> hold.await(); msg }, options)
+
+            actor.send('first')     // taken by the handler, which blocks on 
hold
+            // give the loop a tick to pull the first message off the queue
+            Thread.sleep(50)
+            actor.send('a')         // queued (1/2)
+            actor.send('b')         // queued (2/2)
+            try {
+                actor.send('c')     // overflow
+                assert false : 'expected IllegalStateException'
+            } catch (IllegalStateException e) {
+                assert e.message.contains('mailbox full')
+            }
+            hold.countDown()
+            actor.stop()
+        '''
+    }
+
+    @Test
+    void testBoundedMailboxDropNewest() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import groovy.concurrent.ActorOptions
+            import java.util.concurrent.CountDownLatch
+            import java.util.concurrent.CopyOnWriteArrayList
+
+            def hold = new CountDownLatch(1)
+            def seen = new CopyOnWriteArrayList()
+            def options = ActorOptions.DEFAULTS.withBoundedMailbox(2, 
ActorOptions.Overflow.DROP_NEWEST)
+            def actor = Actor.reactor({ msg -> hold.await(); seen << msg; msg 
}, options)
+
+            actor.send('first')     // currently being processed
+            Thread.sleep(50)
+            actor.send('a')         // queued (1/2)
+            actor.send('b')         // queued (2/2)
+            actor.send('c')         // dropped silently
+            actor.send('d')         // dropped silently
+            hold.countDown()
+            // Wait for the three accepted messages
+            for (int i = 0; i < 40 && seen.size() < 3; i++) Thread.sleep(25)
+            assert seen.toList() == ['first', 'a', 'b']
+            actor.stop()
+        '''
+    }
+
+    @Test
+    void testBoundedMailboxDropNewestReplyBindsError() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import groovy.concurrent.ActorOptions
+            import java.util.concurrent.CountDownLatch
+
+            def hold = new CountDownLatch(1)
+            def options = ActorOptions.DEFAULTS.withBoundedMailbox(1, 
ActorOptions.Overflow.DROP_NEWEST)
+            def actor = Actor.reactor({ msg -> hold.await(); msg }, options)
+
+            actor.send('busy')         // occupies the handler
+            Thread.sleep(50)
+            actor.send('queued')        // fills the 1-slot queue
+            def dropped = actor.sendAndGet('dropped')  // overflows
+            try {
+                await(dropped)
+                assert false : 'awaiting a dropped sendAndGet should fail'
+            } catch (IllegalStateException e) {
+                assert e.message.contains('dropped')
+            }
+            hold.countDown()
+            actor.stop()
+        '''
+    }
+
+    @Test
+    void testBoundedMailboxBlockBackpressures() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import groovy.concurrent.ActorOptions
+            import java.util.concurrent.CountDownLatch
+            import java.util.concurrent.atomic.AtomicLong
+
+            def hold = new CountDownLatch(1)
+            def options = ActorOptions.DEFAULTS.withBoundedMailbox(1, 
ActorOptions.Overflow.BLOCK)
+            def actor = Actor.reactor({ msg -> hold.await(); msg }, options)
+
+            actor.send('first')      // being processed
+            Thread.sleep(50)
+            actor.send('queued')     // fills the slot
+            // The next send must block until the handler frees a slot.
+            def sendStarted = new AtomicLong()
+            def sendReturned = new AtomicLong()
+            def t = Thread.start {
+                sendStarted.set(System.nanoTime())
+                actor.send('blocked')
+                sendReturned.set(System.nanoTime())
+            }
+            Thread.sleep(150)

Review Comment:
   This backpressure test relies on `Thread.sleep(50)` to ensure the first 
message has been taken off the queue before filling the single-slot mailbox. If 
the actor hasn't started yet, the main thread can block at 
`actor.send('queued')` longer than intended, making the test timing-dependent. 
Use a latch signaled by the handler on entry (or similar synchronization) 
instead of fixed sleeps to make the test deterministic.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to