kunwp1 opened a new pull request, #5562:
URL: https://github.com/apache/texera/pull/5562

   ### What changes were proposed in this PR?
   
   `ControllerRpcProbe` (in `RegionCoordinatorTestSupport`) captures 
controller-to-worker RPCs in a mutable `ArrayBuffer`. That buffer is appended 
to on the Pekko actor/scheduler thread via the output-gateway callback, while 
the test thread iterates it through the read helpers (`methodTrace`, 
`initializedWorkers`, `startedWorkers`, `endWorkerCalls`). Because the buffer 
was unsynchronized, an append that raced a `filter`/`map` could be detected by 
Scala 2.13's `MutationTracker`, which reports it as a hard 
`java.util.ConcurrentModificationException`.
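The failure mode can be reproduced on a single thread, with no scheduler involved. The sketch below (a hypothetical `MutationTrackerDemo`, not part of the PR) assumes the mutation tracking that recent Scala 2.13 releases attach to `ArrayBuffer` iterators. The iterator records the buffer's mutation count when it is created and re-checks it in `hasNext`, so an append between two iteration steps throws. In the probe, the offending append simply arrives from another thread.

```scala
import java.util.ConcurrentModificationException
import scala.collection.mutable.ArrayBuffer

object MutationTrackerDemo {
  /** Returns the exception message if an append during iteration is detected, else None. */
  def appendDuringIteration(): Option[String] = {
    val calls = ArrayBuffer("InitializeWorker", "StartWorker")
    val it = calls.iterator   // iterator remembers the buffer's current mutation count
    it.next()
    calls += "EndWorker"      // append bumps the mutation count mid-iteration
    try { it.hasNext; None }  // hasNext compares the counts and throws on mismatch
    catch { case e: ConcurrentModificationException => Some(e.getMessage) }
  }
}
```

Across threads the check is best-effort. It only fires when the append lands between two iteration steps, which is why the original failure was intermittent.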
   
   This surfaced as a non-deterministic failure of 
`RegionExecutionCoordinatorSpec` — most visibly the *"retry EndWorker failures 
and delay gracefulStop until a retry succeeds"* test, whose 
`waitUntil(endWorkerCalls.size >= 2)` loop polls the buffer while the 200 ms 
`EndWorker` retry appends to it from a scheduler thread.
   
   The fix guards the buffer with a dedicated lock:
   
   - the append (`calls += call`) synchronizes on the lock;
   - the read helpers take an immutable snapshot (`calls.toSeq`) under the same 
lock and filter/map the snapshot, so iteration can never race a concurrent 
append;
   - the lock is released before `fulfill()` runs, so no RPC callback executes 
while the lock is held (no re-entrancy/deadlock risk).
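The following is a minimal sketch of that locking pattern, assuming a simplified probe. `RpcCall`, `LockedProbe`, `record`, and the `fulfill` parameter are hypothetical stand-ins, since the PR text does not show the real `ControllerRpcProbe` signatures.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical call record; the real probe's element type is not shown in the PR.
final case class RpcCall(method: String, worker: String)

final class LockedProbe {
  private val lock = new Object
  private val calls = ArrayBuffer.empty[RpcCall] // only ever touched under `lock`

  // Writer side (scheduler thread): append under the lock, then run the
  // callback after the lock has been released.
  def record(call: RpcCall)(fulfill: () => Unit): Unit = {
    lock.synchronized { calls += call }
    fulfill()
  }

  // Reader side: copy under the lock; in Scala 2.13 `toSeq` on a mutable
  // buffer builds a new immutable Seq.
  private def snapshot: Seq[RpcCall] = lock.synchronized { calls.toSeq }

  // Helpers filter/map the snapshot, never the live buffer.
  def methodTrace: Seq[String] = snapshot.map(_.method)
  def endWorkerCalls: Seq[RpcCall] = snapshot.filter(_.method == "EndWorker")
}
```

Readers hold the lock only for the copy, and no callback code ever runs inside `synchronized`. The critical section therefore stays short and does not depend on what the callback does.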
   
   No production code is changed — only the test-support probe and a new test. 
`calls` is now `private`; it was only ever read through these helpers.
   
   ### Any related issues, documentation, discussions?
   
   Resolves #5546
   
   ### How was this PR tested?
   
   Added `ControllerRpcProbeSpec`, a focused concurrency stress test: a writer 
thread drives 20,000 appends through `outputGateway.sendTo` (the same boundary 
the coordinator uses) while a reader thread iterates every helper concurrently, 
asserting no exception is thrown.
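The shape of such a stress test can be sketched in plain Scala without ScalaTest. This is a hypothetical stand-in (`StressSketch`, `LockedBuffer`), not the actual `ControllerRpcProbeSpec`. The writer calls `append` directly rather than going through `outputGateway.sendTo`. Any exception from either thread is captured in an `AtomicReference` so the main thread can check it after joining both.

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
import scala.collection.mutable.ArrayBuffer

object StressSketch {
  // Minimal stand-in for the fixed probe: locked append, snapshot-under-lock reads.
  final class LockedBuffer[A] {
    private val lock = new Object
    private val buf = ArrayBuffer.empty[A]
    def append(a: A): Unit = lock.synchronized { buf += a }
    def snapshot: Seq[A] = lock.synchronized { buf.toSeq }
  }

  /** Runs a writer and a reader concurrently; returns the first failure (if any) and the final size. */
  def run(appends: Int = 20000): (Option[Throwable], Int) = {
    val probe = new LockedBuffer[Int]
    val failure = new AtomicReference[Throwable](null)
    val done = new AtomicBoolean(false)

    val writer = new Thread(() => {
      try (0 until appends).foreach(probe.append)
      catch { case t: Throwable => failure.compareAndSet(null, t) }
      finally done.set(true)
    })
    val reader = new Thread(() => {
      try {
        while (!done.get()) {
          // Mirrors the read helpers: filter/map over a snapshot, never the live buffer.
          probe.snapshot.filter(_ % 2 == 0).map(_ + 1)
        }
      } catch { case t: Throwable => failure.compareAndSet(null, t) }
    })

    reader.start(); writer.start()
    writer.join(); reader.join()
    (Option(failure.get()), probe.snapshot.size)
  }
}
```

With the locked buffer, `run()` reports no failure and a final size of 20,000. Dropping the `lock.synchronized` wrappers reintroduces the race, although whether a given run throws depends on thread timing.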
   
   - **Before the fix**, the stress test fails deterministically (in about 38 ms) 
with the same error reported in the issue: `java.util.ConcurrentModificationException: mutation occurred during iteration`.
   - **After the fix**:
     - `ControllerRpcProbeSpec` + `RegionExecutionCoordinatorSpec` + 
`WorkflowExecutionCoordinatorSpec` → **11/11 pass**;
     - the stress test passed **5/5** consecutive runs.
   
   To reproduce (the `amber` module is the `WorkflowExecutionService` sbt project):
   
   ```
   sbt 'WorkflowExecutionService/testOnly org.apache.texera.amber.engine.architecture.scheduling.ControllerRpcProbeSpec'
   sbt 'WorkflowExecutionService/testOnly org.apache.texera.amber.engine.architecture.scheduling.RegionExecutionCoordinatorSpec'
   ```
   
   ### Was this PR authored or co-authored using generative AI tooling?
   
   Generated-by: Claude Code (Anthropic Claude Opus 4.8)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.