[ 
https://issues.apache.org/jira/browse/CAMEL-22127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17954963#comment-17954963
 ] 

Kumar Gaurav commented on CAMEL-22127:
--------------------------------------

I will create a PR with my fixes, which resolve this issue as well as the 
{{ValueSpliterator}} issue in {{ConcurrentHashMap}} that surfaced once I applied 
the {{CopyOnWriteArrayList}} fix.

 

> ConcurrentModificationException is coming inside Camel’s Vert.x WebSocket
> -------------------------------------------------------------------------
>
>                 Key: CAMEL-22127
>                 URL: https://issues.apache.org/jira/browse/CAMEL-22127
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-platform-http-vertx
>    Affects Versions: 4.4.5
>            Reporter: Kumar Gaurav
>            Priority: Major
>             Fix For: 4.8.8, 4.10.5, 4.13.0
>
>         Attachments: image-2025-05-29-17-27-30-528.png
>
>
> h3. What happened
> A {{ConcurrentModificationException}} is thrown *inside Camel’s Vert.x WebSocket producer* while it traverses the collection that holds every live peer:
>  
> {{org.apache.camel.component.vertx.websocket.VertxWebsocketEndpoint.findPeersForHostPort}}
>  
> {{findPeersForHostPort()}} does:
> {code:java}
> return connectedPeers.values()            // Map<String, List<...>>
>         .stream()                         // ← another thread can mutate
>         .flatMap(List::stream)            // its internal ArrayList
>         .collect(Collectors.toList());
> {code}
> While the stream is walking one of the internal {{ArrayList}}s, *another thread removes or adds a WebSocket* (OPEN / CLOSE), so the underlying iterator throws {{ConcurrentModificationException}}.
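>  
> A minimal, self-contained repro of that read path is sketched below (the class and field names are simplified stand-ins, not Camel’s actual types); with enough churn the stream traversal fails almost immediately:
> {code:java}
> import java.util.ArrayList;
> import java.util.Collections;
> import java.util.List;
> import java.util.Map;
> import java.util.concurrent.ConcurrentHashMap;
> import java.util.stream.Collectors;
>
> public class PeerRaceRepro {
>     // Same shape as the endpoint registry: map values are mutable lists
>     static final Map<String, List<String>> connectedPeers = new ConcurrentHashMap<>();
>
>     public static void main(String[] args) {
>         connectedPeers.put("host:8080", Collections.synchronizedList(new ArrayList<>()));
>
>         // Writer: plays the role of Vert.x I/O threads registering/unregistering peers
>         Thread writer = new Thread(() -> {
>             List<String> peers = connectedPeers.get("host:8080");
>             while (!Thread.currentThread().isInterrupted()) {
>                 peers.add("peer");
>                 peers.remove("peer");
>             }
>         });
>         writer.start();
>
>         // Reader: the same traversal as findPeersForHostPort()
>         try {
>             for (int i = 0; i < 1_000_000; i++) {
>                 connectedPeers.values()
>                         .stream()
>                         .flatMap(List::stream)   // walks the mutable ArrayList unsynchronized
>                         .collect(Collectors.toList());
>             }
>         } catch (java.util.ConcurrentModificationException e) {
>             System.out.println("Reproduced: " + e);   // typically within milliseconds
>         } finally {
>             writer.interrupt();
>         }
>     }
> }
> {code}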
> ----
> h3. How it happens
>  
> {code:java}
> getVertxHostRegistry().values()       // Map.values() → mutable collection
>         .stream()
>         .flatMap(host -> host.getConnectedPeers().stream())   // ← each host keeps a mutable List
> {code}
>  
> If we check {{host.getConnectedPeers()}}, it is an {{ArrayList}}:
> !image-2025-05-29-17-27-30-528.png!
> {{Collections.synchronizedList(new ArrayList<>())}} only *serialises each individual method call* ({{add}}, {{remove}}, {{get}}); *its iterator is _not_ thread-safe*. If one thread is iterating while another thread mutates the list, you still get {{ConcurrentModificationException}}.
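>  
> The Javadoc of {{Collections.synchronizedList}} itself requires the caller to synchronize on the list while iterating. A small stand-alone sketch of the broken pattern next to the Javadoc-mandated one:
> {code:java}
> import java.util.ArrayList;
> import java.util.Collections;
> import java.util.List;
>
> public class SynchronizedListIterationDemo {
>     public static void main(String[] args) {
>         List<Integer> list = Collections.synchronizedList(new ArrayList<>());
>         Collections.addAll(list, 1, 2, 3);
>
>         // Unsafe: the enhanced-for loop uses the iterator, which takes no lock.
>         // If another thread mutates concurrently, this throws ConcurrentModificationException.
>         for (int n : list) {
>             System.out.println(n);
>         }
>
>         // Safe per the Javadoc: hold the list's own monitor for the whole traversal,
>         // which also blocks every writer until the walk finishes.
>         synchronized (list) {
>             for (int n : list) {
>                 System.out.println(n);
>             }
>         }
>     }
> }
> {code}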
>  
> *How to fix:*
> {code:java}
> private final List<VertxWebsocketPeer> connectedPeers = new CopyOnWriteArrayList<>();
> {code}
>  
> Reads are cheap and writes are expensive, but for this workload that is a better trade-off than a synchronized {{ArrayList}}.
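>  
> A quick sketch of why this works: a {{CopyOnWriteArrayList}} iterator walks an immutable snapshot of the backing array, so concurrent mutation can never invalidate it (peer type reduced to {{String}} for brevity):
> {code:java}
> import java.util.List;
> import java.util.concurrent.CopyOnWriteArrayList;
>
> public class CopyOnWriteDemo {
>     public static void main(String[] args) throws InterruptedException {
>         List<String> peers = new CopyOnWriteArrayList<>();
>         peers.add("peer-1");
>
>         // Writer: every add/remove copies the backing array (the "expensive write")
>         Thread writer = new Thread(() -> {
>             for (int i = 0; i < 100_000; i++) {
>                 peers.add("peer-2");
>                 peers.remove("peer-2");
>             }
>         });
>         writer.start();
>
>         // Reader: each traversal sees a stable snapshot; no lock, no CME
>         for (int i = 0; i < 100_000; i++) {
>             for (String p : peers) {
>                 if (p.isEmpty()) throw new IllegalStateException();
>             }
>         }
>         writer.join();
>         System.out.println("Completed without ConcurrentModificationException");
>     }
> }
> {code}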
> h3. Rough numbers (JVM 17, 1 000 peers)
> ||Operation||{{synchronizedList}}||{{CopyOnWriteArrayList}}||
> |Iterate (1 000 elements)|~3 µs (lock + volatile read)|*~1 µs*|
> |Add / remove|~0.4 µs|~0.7–1 µs _(array copy)_|
>  
> *Net impact*: latency *improves for every broadcast* (no lock), while the extra cost on connection OPEN/CLOSE is trivial because it happens only once per client.
> For a read-heavy fan-out list, {{CopyOnWriteArrayList}} is the ideal trade-off. Hence: *switch to {{CopyOnWriteArrayList}}*. The read-path win outweighs the negligible write cost (writes happen only when a connection opens or closes, which is infrequent) and it eliminates the {{ConcurrentModificationException}} entirely.
> h3. Another option: replace the list with {{ConcurrentHashMap.newKeySet()}} to store connected peers.
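>  
> A minimal sketch of that alternative ({{Peer}} is a hypothetical stand-in for {{VertxWebsocketPeer}}). {{ConcurrentHashMap.newKeySet()}} returns a concurrent {{Set}} whose iterator is weakly consistent and never throws {{ConcurrentModificationException}}; writes stay O(1), at the cost of losing insertion order:
> {code:java}
> import java.util.Set;
> import java.util.concurrent.ConcurrentHashMap;
>
> public class ConcurrentSetAlternative {
>     // Hypothetical peer type; needs sensible equals/hashCode for set membership
>     record Peer(String connectionKey) {}
>
>     // Concurrent Set view backed by a ConcurrentHashMap
>     private final Set<Peer> connectedPeers = ConcurrentHashMap.newKeySet();
>
>     void onOpen(Peer peer)  { connectedPeers.add(peer); }     // O(1) write
>     void onClose(Peer peer) { connectedPeers.remove(peer); }  // O(1) write
>
>     void broadcast(String message) {
>         // Weakly consistent traversal: safe under concurrent add/remove,
>         // though peers added mid-traversal may or may not be seen
>         for (Peer peer : connectedPeers) {
>             System.out.println("send " + message + " to " + peer.connectionKey());
>         }
>     }
> }
> {code}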
> h3. Why it shows up only under load
>  * With *1 000 connections* and *32 writer threads* you call the producer thousands of times per second (worker threads iterate peers), while Vert.x I/O threads register and unregister peers concurrently.
>  * Under light traffic the race window is tiny, so you never notice it.
>  * *Root cause*: the Vert.x WebSocket producer iterates an {{ArrayList}} while another thread mutates it.
>  * *Quick work-around*: funnel writes through a single consumer queue (see the sketch below).
>  
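> A rough illustration of that work-around (all names hypothetical): route every mutation and broadcast through one dispatcher thread, so only a single thread ever touches the peer list:
> {code:java}
> import java.util.ArrayList;
> import java.util.List;
> import java.util.concurrent.BlockingQueue;
> import java.util.concurrent.LinkedBlockingQueue;
>
> public class SingleWriterDispatcher {
>     private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
>     private final List<String> peers = new ArrayList<>();  // touched only by the dispatcher thread
>
>     public SingleWriterDispatcher() {
>         Thread dispatcher = new Thread(() -> {
>             try {
>                 while (true) {
>                     tasks.take().run();  // mutations and broadcasts are serialized here
>                 }
>             } catch (InterruptedException e) {
>                 Thread.currentThread().interrupt();
>             }
>         });
>         dispatcher.setDaemon(true);
>         dispatcher.start();
>     }
>
>     // Any thread may call these; the actual work runs on the dispatcher thread
>     public void register(String peer)   { tasks.add(() -> peers.add(peer)); }
>     public void unregister(String peer) { tasks.add(() -> peers.remove(peer)); }
>     public void broadcast(String msg)   { tasks.add(() -> peers.forEach(p -> System.out.println(msg + " -> " + p))); }
> }
> {code}
>  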
> Raw stacktrace:
>  
> {code}
> 10:46:02.423 [Camel (camel-1) thread #31 - seda://outbound-delivery] WARN  com.bp.ihub.template - Failed delivery for (MessageId: 0B49011231DA68D-0000000000560726 on ExchangeId: 0B49011231DA68D-000000000056154B). On delivery attempt: 0 caught: java.util.ConcurrentModificationException
> java.util.ConcurrentModificationException: null
>     at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1631)
>     at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762)
>     at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
>     at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
>     at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
>     at java.base/java.util.concurrent.ConcurrentHashMap$ValueSpliterator.forEachRemaining(ConcurrentHashMap.java:3612)
>     at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>     at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>     at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
>     at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>     at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
>     at org.apache.camel.component.vertx.websocket.VertxWebsocketEndpoint.findPeersForHostPort(VertxWebsocketEndpoint.java:234)
>     at org.apache.camel.component.vertx.websocket.VertxWebsocketProducer.getConnectedPeers(VertxWebsocketProducer.java:109)
>     at org.apache.camel.component.vertx.websocket.VertxWebsocketProducer.process(VertxWebsocketProducer.java:62)
>     at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:210)
>     at org.apache.camel.management.DefaultInstrumentationProcessor.process(DefaultInstrumentationProcessor.java:90)
>     at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.doRun(RedeliveryErrorHandler.java:840)
>     at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.run(RedeliveryErrorHandler.java:746)
>     at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.doRun(DefaultReactiveExecutor.java:199)
>     at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.executeReactiveWork(DefaultReactiveExecutor.java:189)
>     at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.tryExecuteReactiveWork(DefaultReactiveExecutor.java:166)
>     at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148)
>     at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59)
>     at org.apache.camel.processor.Pipeline.process(Pipeline.java:163)
>     at org.apache.camel.impl.engine.CamelInternalProcessor.processNonTransacted(CamelInternalProcessor.java:354)
>     at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:330)
>     at org.apache.camel.component.seda.SedaConsumer.sendToConsumers(SedaConsumer.java:270)
>     at org.apache.camel.component.seda.SedaConsumer.doRun(SedaConsumer.java:188)
>     at org.apache.camel.component.seda.SedaConsumer.run(SedaConsumer.java:129)
>     at io.opentelemetry.context.Context.lambda$wrap$1(Context.java:241)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>     at io.opentelemetry.context.Context.lambda$wrap$1(Context.java:241)
>     at java.base/java.lang.Thread.run(Thread.java:840)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
