[ https://issues.apache.org/jira/browse/CAMEL-22127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17954963#comment-17954963 ]
Kumar Gaurav commented on CAMEL-22127:
--------------------------------------

I will create a PR with my fixes, which resolve this issue as well as the ConcurrentHashMap {{ValueSpliterator}} issue that kept appearing once I had applied the {{CopyOnWriteArrayList}} fix.

> ConcurrentModificationException is thrown inside Camel’s Vert.x WebSocket producer
> -----------------------------------------------------------------------------------
>
>                 Key: CAMEL-22127
>                 URL: https://issues.apache.org/jira/browse/CAMEL-22127
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-platform-http-vertx
>    Affects Versions: 4.4.5
>            Reporter: Kumar Gaurav
>            Priority: Major
>             Fix For: 4.8.8, 4.10.5, 4.13.0
>
>         Attachments: image-2025-05-29-17-27-30-528.png
>
>
> h3. What happened
>
> A {{ConcurrentModificationException}} is thrown *inside Camel’s Vert.x WebSocket producer* while it traverses the collection that holds every live peer:
>
> {{org.apache.camel.component.vertx.websocket.VertxWebsocketEndpoint.findPeersForHostPort}}
>
> {{findPeersForHostPort()}} does:
>
> {code:java}
> return connectedPeers.values()          // Map<String, List<...>>
>         .stream()                       // ← another thread can mutate
>         .flatMap(List::stream)          //   its internal ArrayList
>         .collect(Collectors.toList());
> {code}
>
> While the stream is walking one of the internal {{ArrayList}}s, *another thread removes or adds a WebSocket* (OPEN/CLOSE), so the underlying iterator throws {{ConcurrentModificationException}}.
>
> h3. How it happens
>
> {code:java}
> getVertxHostRegistry().values()                             // Map.values() → mutable collection
>         .stream()
>         .flatMap(host -> host.getConnectedPeers().stream()) // ← each host keeps a mutable List
> {code}
>
> If we check {{host.getConnectedPeers()}}, it is an {{ArrayList}}:
>
> !image-2025-05-29-17-27-30-528.png!
>
> {{Collections.synchronizedList(new ArrayList<>())}} only *serialises each method call* ({{add}}, {{remove}}, {{get}}); *its iterator is _not_ thread-safe*. If one thread is iterating while another thread mutates the list, you still get {{ConcurrentModificationException}}.
>
> *How to fix:*
>
> {code:java}
> private final List<VertxWebsocketPeer> connectedPeers = new CopyOnWriteArrayList<>();
> {code}
>
> Cheap reads and expensive writes, but better than a synchronized {{ArrayList}} for this access pattern.
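>
> A minimal sketch of why the fix works (the class below and its {{String}} peers are illustrative stand-ins, not Camel's real {{VertxWebsocketHost}} / {{VertxWebsocketPeer}} types). A writer thread churns the list the way Vert.x I/O threads do on OPEN/CLOSE, while the main thread streams it the way the producer's broadcast path does. Backed by a plain or synchronized {{ArrayList}}, the reader would typically fail fast with {{ConcurrentModificationException}}; backed by {{CopyOnWriteArrayList}} it cannot, because every iterator walks an immutable snapshot of the backing array:
>
> {code:java}
> import java.util.List;
> import java.util.concurrent.CopyOnWriteArrayList;
>
> public class PeerRegistrySketch {
>
>     // Proposed fix: iterators see an immutable snapshot, so a concurrent
>     // add/remove can never invalidate an in-flight iteration.
>     private final List<String> connectedPeers = new CopyOnWriteArrayList<>();
>
>     public static void main(String[] args) throws InterruptedException {
>         PeerRegistrySketch registry = new PeerRegistrySketch();
>         for (int i = 0; i < 1_000; i++) {
>             registry.connectedPeers.add("peer-" + i);
>         }
>
>         // Writer: churn the list like connection OPEN/CLOSE events do
>         Thread churn = new Thread(() -> {
>             for (int i = 0; i < 10_000; i++) {
>                 registry.connectedPeers.remove("peer-" + (i % 1_000));
>                 registry.connectedPeers.add("peer-" + (i % 1_000));
>             }
>         });
>         churn.start();
>
>         // Reader: stream the peers like findPeersForHostPort() does
>         long seen = 0;
>         for (int i = 0; i < 1_000; i++) {
>             seen += registry.connectedPeers.stream().count();
>         }
>         churn.join();
>         System.out.println("iterated " + seen + " peers without a CME");
>     }
> }
> {code}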
> h3. Rough numbers (JVM 17, 1 000 peers)
>
> ||Operation||{{synchronizedList}}||{{CopyOnWriteArrayList}}||
> |Iterate (1 000 elements)|~3 µs (lock + volatile read)|*~1 µs*|
> |Add / remove|~0.4 µs|~0.7–1 µs _(array copy)_|
>
> *Net impact*: latency *improves for every broadcast* (no lock), while the extra cost on connection OPEN/CLOSE is trivial because it happens only once per client. For a read-heavy fan-out list, {{CopyOnWriteArrayList}} is the ideal trade-off. Hence: *switch to {{CopyOnWriteArrayList}}*. The read-path win outweighs the negligible write cost (writes happen only when connections open and close, which is infrequent) and eliminates the {{ConcurrentModificationException}} entirely.
>
> h3. Another option: replace the list with {{ConcurrentHashMap.newKeySet()}} to store the connected peers
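>
> A sketch of that alternative, assuming peer ordering and duplicate entries are not needed (everything here except {{ConcurrentHashMap.newKeySet()}} itself is an illustrative name, not Camel code). The key property is that the set's iterators are weakly consistent: they never throw {{ConcurrentModificationException}}; they merely may or may not observe updates made while the iteration is in progress:
>
> {code:java}
> import java.util.Set;
> import java.util.concurrent.ConcurrentHashMap;
>
> public class PeerSetSketch {
>
>     // Concurrent set: O(1) add/remove, weakly consistent iteration
>     private final Set<String> connectedPeers = ConcurrentHashMap.newKeySet();
>
>     public void onOpen(String peer)  { connectedPeers.add(peer); }
>     public void onClose(String peer) { connectedPeers.remove(peer); }
>
>     // Broadcast path: safe to run while OPEN/CLOSE events mutate the set
>     public void broadcast(String message) {
>         connectedPeers.forEach(peer -> System.out.println(peer + " <- " + message));
>     }
> }
> {code}
>
> The trade-off versus {{CopyOnWriteArrayList}}: writes stay O(1) as the peer count grows, but a set cannot hold the same peer twice and gives no iteration order.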
> h3. Why it shows up only under load
>
> * With *1 000 connections* and *32 writer threads* you call the producer thousands of times per second (worker threads iterate the peers), while Vert.x I/O threads register and unregister peers concurrently.
> * Under light traffic the race window is tiny, so you never notice it.
>
> * *Root cause*: the Vert.x WebSocket producer iterates an {{ArrayList}} while another thread mutates it.
> * *Quick work-around*: funnel writes through a single consumer queue.
>
> Raw stacktrace:
>
> {noformat}
> 10:46:02.423 [Camel (camel-1) thread #31 - seda://outbound-delivery] WARN com.bp.ihub.template - Failed delivery for (MessageId: 0B49011231DA68D-0000000000560726 on ExchangeId: 0B49011231DA68D-000000000056154B). On delivery attempt: 0 caught: java.util.ConcurrentModificationException
> java.util.ConcurrentModificationException: null
>     at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1631)
>     at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762)
>     at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
>     at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
>     at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
>     at java.base/java.util.concurrent.ConcurrentHashMap$ValueSpliterator.forEachRemaining(ConcurrentHashMap.java:3612)
>     at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>     at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>     at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
>     at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>     at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
>     at org.apache.camel.component.vertx.websocket.VertxWebsocketEndpoint.findPeersForHostPort(VertxWebsocketEndpoint.java:234)
>     at org.apache.camel.component.vertx.websocket.VertxWebsocketProducer.getConnectedPeers(VertxWebsocketProducer.java:109)
>     at org.apache.camel.component.vertx.websocket.VertxWebsocketProducer.process(VertxWebsocketProducer.java:62)
>     at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:210)
>     at org.apache.camel.management.DefaultInstrumentationProcessor.process(DefaultInstrumentationProcessor.java:90)
>     at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.doRun(RedeliveryErrorHandler.java:840)
>     at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.run(RedeliveryErrorHandler.java:746)
>     at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.doRun(DefaultReactiveExecutor.java:199)
>     at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.executeReactiveWork(DefaultReactiveExecutor.java:189)
>     at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.tryExecuteReactiveWork(DefaultReactiveExecutor.java:166)
>     at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148)
>     at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59)
>     at org.apache.camel.processor.Pipeline.process(Pipeline.java:163)
>     at org.apache.camel.impl.engine.CamelInternalProcessor.processNonTransacted(CamelInternalProcessor.java:354)
>     at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:330)
>     at org.apache.camel.component.seda.SedaConsumer.sendToConsumers(SedaConsumer.java:270)
>     at org.apache.camel.component.seda.SedaConsumer.doRun(SedaConsumer.java:188)
>     at org.apache.camel.component.seda.SedaConsumer.run(SedaConsumer.java:129)
>     at io.opentelemetry.context.Context.lambda$wrap$1(Context.java:241)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>     at io.opentelemetry.context.Context.lambda$wrap$1(Context.java:241)
>     at java.base/java.lang.Thread.run(Thread.java:840)
> {noformat}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)