[ https://issues.apache.org/jira/browse/CAMEL-22127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kumar Gaurav updated CAMEL-22127:
---------------------------------
    Description:
h3. What happened

A {{ConcurrentModificationException}} is thrown inside Camel’s Vert.x WebSocket *producer* while it traverses the collection that holds every live peer:

{{org.apache.camel.component.vertx.websocket.VertxWebsocketEndpoint.findPeersForHostPort}}

{{findPeersForHostPort()}} does:

{{return connectedPeers.values()    // Map<String,List<...>>
        .stream()                  // ← another thread can mutate
        .flatMap(List::stream)     // its internal ArrayList
        .collect(Collectors.toList());}}

While the stream is walking one of the internal {{ArrayList}}s, *another thread removes or adds a WebSocket* (OPEN / CLOSE), so the underlying iterator throws {{ConcurrentModificationException}}.
----
h3. How it happens

getVertxHostRegistry().values()    // Map.values() → mutable collection
    .stream()
     ~~~~
    .flatMap(host -> host.getConnectedPeers().stream())    // ← each host keeps a mutable List

If we check {{host.getConnectedPeers()}}, it is an {{ArrayList}}:

!image-2025-05-29-17-27-30-528.png!

{{Collections.synchronizedList(new ArrayList<>())}} only *serialises each method call* ({{add}}, {{remove}}, {{get}}); *its iterator is _not_ thread-safe*. If one thread is iterating while another thread mutates the list, you still get {{ConcurrentModificationException}}.

*How to fix:*

*private final List<VertxWebsocketPeer> connectedPeers = new CopyOnWriteArrayList<>();*

This gives cheap reads and expensive writes, but is better than a synchronized {{ArrayList}}.

h3. Why it shows up only under load

* With *1 000 connections* and *32 writer threads* you call the producer thousands of times per second (worker threads iterate peers), while Vert.x I/O threads register/un-register peers concurrently.
* Under light traffic the race window is tiny, so you never notice it.

* *Root cause*: the Vert.x WebSocket producer iterates an {{ArrayList}} while another thread mutates it.
* *Quick work-around*: funnel writes through a single consumer queue.

Raw stacktrace:

10:46:02.423 [Camel (camel-1) thread #31 - seda://outbound-delivery] WARN com.bp.ihub.template - Failed delivery for (MessageId: 0B49011231DA68D-0000000000560726 on ExchangeId: 0B49011231DA68D-000000000056154B). On delivery attempt: 0 caught: java.util.ConcurrentModificationException
java.util.ConcurrentModificationException: null
    at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1631)
    at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762)
    at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
    at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
    at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
    at java.base/java.util.concurrent.ConcurrentHashMap$ValueSpliterator.forEachRemaining(ConcurrentHashMap.java:3612)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
    at org.apache.camel.component.vertx.websocket.VertxWebsocketEndpoint.findPeersForHostPort(VertxWebsocketEndpoint.java:234)
    at org.apache.camel.component.vertx.websocket.VertxWebsocketProducer.getConnectedPeers(VertxWebsocketProducer.java:109)
    at org.apache.camel.component.vertx.websocket.VertxWebsocketProducer.process(VertxWebsocketProducer.java:62)
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:210)
    at org.apache.camel.management.DefaultInstrumentationProcessor.process(DefaultInstrumentationProcessor.java:90)
    at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.doRun(RedeliveryErrorHandler.java:840)
    at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.run(RedeliveryErrorHandler.java:746)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.doRun(DefaultReactiveExecutor.java:199)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.executeReactiveWork(DefaultReactiveExecutor.java:189)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.tryExecuteReactiveWork(DefaultReactiveExecutor.java:166)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:163)
    at org.apache.camel.impl.engine.CamelInternalProcessor.processNonTransacted(CamelInternalProcessor.java:354)
    at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:330)
    at org.apache.camel.component.seda.SedaConsumer.sendToConsumers(SedaConsumer.java:270)
    at org.apache.camel.component.seda.SedaConsumer.doRun(SedaConsumer.java:188)
    at org.apache.camel.component.seda.SedaConsumer.run(SedaConsumer.java:129)
    at io.opentelemetry.context.Context.lambda$wrap$1(Context.java:241)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at io.opentelemetry.context.Context.lambda$wrap$1(Context.java:241)
    at java.base/java.lang.Thread.run(Thread.java:840)

> ConcurrentModificationException is coming inside Camel’s Vert.x WebSocket
> -------------------------------------------------------------------------
>
>                 Key: CAMEL-22127
>                 URL: https://issues.apache.org/jira/browse/CAMEL-22127
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-platform-http-vertx
>    Affects Versions: 4.4.5
>            Reporter: Kumar Gaurav
>            Priority: Major
>         Attachments: image-2025-05-29-17-27-30-528.png

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
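
The difference between {{Collections.synchronizedList}} and {{CopyOnWriteArrayList}} described in the report can be seen without any threads. The sketch below is a minimal, single-threaded illustration and not Camel code: the class name {{PeerListIterationDemo}}, the helper {{failsWhenMutatedDuringTraversal}} and the peer names are hypothetical, and the mutation is triggered from inside the stream pipeline as a stand-in for a Vert.x I/O thread registering a peer while the producer is collecting. It assumes only the JDK (9 or later, for {{List.of}}).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;

final class PeerListIterationDemo {

    /**
     * Streams over the list and adds one element while the traversal is in
     * progress (a stand-in for a peer connecting on another thread).
     * Returns true if the traversal ends with ConcurrentModificationException.
     */
    static boolean failsWhenMutatedDuringTraversal(List<String> peers) {
        try {
            peers.stream()
                    .peek(peer -> {
                        if (peer.equals("peer-1")) {
                            peers.add("late-peer"); // structural change mid-traversal
                        }
                    })
                    .collect(Collectors.toList());
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> initial = List.of("peer-1", "peer-2", "peer-3");

        // Each call on the wrapper is synchronized, but its stream/iterator is not.
        List<String> synchronizedPeers =
                Collections.synchronizedList(new ArrayList<>(initial));
        System.out.println("synchronizedList fails: "
                + failsWhenMutatedDuringTraversal(synchronizedPeers));

        // Traversal works on a snapshot of the backing array taken at stream creation.
        List<String> copyOnWritePeers = new CopyOnWriteArrayList<>(initial);
        System.out.println("CopyOnWriteArrayList fails: "
                + failsWhenMutatedDuringTraversal(copyOnWritePeers));
    }
}
```

Run as written, this reports {{true}} for the synchronized wrapper and {{false}} for {{CopyOnWriteArrayList}}. The trade-off mentioned in the report still applies: every add or remove copies the backing array, which is usually acceptable when connects and disconnects are much rarer than sends.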