[ 
https://issues.apache.org/jira/browse/CAMEL-22127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kumar Gaurav updated CAMEL-22127:
---------------------------------
    Description: 
h3. What happened

A {{ConcurrentModificationException}} is thrown *inside Camel’s Vert.x WebSocket producer* while it traverses the collection that holds every live peer:

 

{{org.apache.camel.component.vertx.websocket.VertxWebsocketEndpoint.findPeersForHostPort}}

{{findPeersForHostPort()}} does

 

{{return connectedPeers.values() // Map<String,List<...>>
.stream() // ← another thread can mutate
.flatMap(List::stream) // its internal ArrayList
.collect(Collectors.toList());}}

While the stream is walking one of the internal {{ArrayList}}s, *another thread removes or adds a WebSocket* (OPEN / CLOSE), so the list's spliterator detects the structural change and throws {{ConcurrentModificationException}}.
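
The failure mode described above can be reproduced deterministically in a single thread. The following is a minimal sketch, not the Camel code: the class name {{FlatMapCmeDemo}}, the peer strings and the {{peek}} step are hypothetical, and the in-stream {{add}} only stands in for a second thread registering a peer during traversal.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

class FlatMapCmeDemo {

    // Same shape as in the report: a concurrent map whose values are plain ArrayLists.
    static boolean flatMapThrowsWhenListMutated() {
        Map<String, List<String>> connectedPeers = new ConcurrentHashMap<>();
        List<String> peers = new ArrayList<>(List.of("peer-1", "peer-2", "peer-3"));
        connectedPeers.put("localhost:8080", peers);

        try {
            connectedPeers.values().stream()
                    .flatMap(List::stream)
                    // Stand-in for another thread adding a peer mid-traversal.
                    .peek(p -> {
                        if (p.equals("peer-1")) {
                            peers.add("peer-4");
                        }
                    })
                    .collect(Collectors.toList());
            return false; // traversal finished without an exception
        } catch (ConcurrentModificationException e) {
            return true;  // the ArrayList spliterator noticed the modification
        }
    }

    public static void main(String[] args) {
        System.out.println("CME thrown: " + flatMapThrowsWhenListMutated());
    }
}
```

The {{ConcurrentHashMap}} itself tolerates concurrent traversal; it is the inner {{ArrayList}} that fails fast, which matches the top frames of the stack trace.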
----
h3. How it happens

 

getVertxHostRegistry().values()       // Map.values() → mutable collection
      .stream()
      .flatMap(host -> host.getConnectedPeers().stream())   // ← each host keeps
                                                            //   a mutable List

 

 

Inspecting {{host.getConnectedPeers()}} shows that it is backed by an {{ArrayList}}:

!image-2025-05-29-17-27-30-528.png!

{{Collections.synchronizedList(new ArrayList<>())}} only *serialises each method call* ({{add}}, {{remove}}, {{get}}); {*}its iterator is _not_ thread-safe{*}. Iterators, spliterators and streams over such a list must be guarded manually with {{synchronized (list)}}.
If one thread is iterating while another thread mutates the list, you still get {{ConcurrentModificationException}}.
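
To illustrate the point about {{Collections.synchronizedList}}, here is a small sketch under the same assumptions as before: the class and method names are hypothetical, and the in-stream {{add}} simulates a concurrent writer. It also shows one possible way to traverse safely, by copying the list while holding its lock.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.List;

class SynchronizedListDemo {

    // Streaming a synchronized list is not protected by the wrapper's lock.
    static boolean streamStillThrows() {
        List<String> peers =
                Collections.synchronizedList(new ArrayList<>(List.of("a", "b", "c")));
        try {
            peers.stream().forEach(p -> {
                if (p.equals("a")) {
                    peers.add("d"); // simulated concurrent registration
                }
            });
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // Copy under the list's own monitor, then traverse the private copy.
    static List<String> snapshot(List<String> synchronizedList) {
        synchronized (synchronizedList) {
            return new ArrayList<>(synchronizedList);
        }
    }

    public static void main(String[] args) {
        System.out.println("CME with synchronizedList: " + streamStillThrows());
        List<String> peers = Collections.synchronizedList(new ArrayList<>(List.of("a", "b")));
        System.out.println("Snapshot: " + snapshot(peers));
    }
}
```

The snapshot approach keeps the existing list type but pays for a copy on every read, which is the opposite trade-off to {{CopyOnWriteArrayList}}.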

 

*How to fix:*

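{{private final List<VertxWebsocketPeer> connectedPeers = new CopyOnWriteArrayList<>();}}

 

{{CopyOnWriteArrayList}} iterators work on a snapshot of the backing array, so reads are cheap and never throw {{ConcurrentModificationException}}. Writes copy the array and are therefore more expensive, but this is still a better fit than a synchronized {{ArrayList}} when traversals far outnumber connects and disconnects.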
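
The snapshot behaviour of the proposed fix can be checked with a short sketch. The class name and peer strings are hypothetical, and plain strings are used in place of {{VertxWebsocketPeer}}; as before, the in-stream {{add}} stands in for a concurrent writer.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;

class CopyOnWritePeersDemo {

    // Traverses the list while adding to it and returns the elements that were seen.
    static List<String> traverseWhileMutating(List<String> peers) {
        return peers.stream()
                // Simulated concurrent registration during the traversal.
                .peek(p -> {
                    if (p.equals("peer-1")) {
                        peers.add("peer-4");
                    }
                })
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> peers =
                new CopyOnWriteArrayList<>(List.of("peer-1", "peer-2", "peer-3"));
        List<String> seen = traverseWhileMutating(peers);
        System.out.println("Seen during traversal: " + seen);
        System.out.println("List after traversal:  " + peers);
    }
}
```

The traversal sees only the elements that existed when the stream was created, while the added peer shows up on the next traversal. For a producer that sends to "all currently connected peers" this slightly stale view is usually acceptable.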

 

 
h3. Why it shows up only under load
 * With *1 000 connections* and *32 writer threads* you call the producer
thousands of times per second (worker threads iterate peers), while Vert.x
I/O threads register/un-register peers concurrently.

 * Under light traffic the race window is tiny, so you never notice it.

 
 * {*}Root cause{*}: the Vert.x WebSocket producer iterates an {{ArrayList}} while another thread mutates it.
 * {*}Quick work-around{*}: funnel writes through a single consumer queue.

 

Raw stacktrace:

 

10:46:02.423 [Camel (camel-1) thread #31 - seda://outbound-delivery] WARN  com.bp.ihub.template - Failed delivery for (MessageId: 0B49011231DA68D-0000000000560726 on ExchangeId: 0B49011231DA68D-000000000056154B). On delivery attempt: 0 caught: java.util.ConcurrentModificationException
java.util.ConcurrentModificationException: null
    at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1631)
    at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762)
    at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
    at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
    at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
    at java.base/java.util.concurrent.ConcurrentHashMap$ValueSpliterator.forEachRemaining(ConcurrentHashMap.java:3612)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
    at org.apache.camel.component.vertx.websocket.VertxWebsocketEndpoint.findPeersForHostPort(VertxWebsocketEndpoint.java:234)
    at org.apache.camel.component.vertx.websocket.VertxWebsocketProducer.getConnectedPeers(VertxWebsocketProducer.java:109)
    at org.apache.camel.component.vertx.websocket.VertxWebsocketProducer.process(VertxWebsocketProducer.java:62)
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:210)
    at org.apache.camel.management.DefaultInstrumentationProcessor.process(DefaultInstrumentationProcessor.java:90)
    at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.doRun(RedeliveryErrorHandler.java:840)
    at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.run(RedeliveryErrorHandler.java:746)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.doRun(DefaultReactiveExecutor.java:199)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.executeReactiveWork(DefaultReactiveExecutor.java:189)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.tryExecuteReactiveWork(DefaultReactiveExecutor.java:166)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:163)
    at org.apache.camel.impl.engine.CamelInternalProcessor.processNonTransacted(CamelInternalProcessor.java:354)
    at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:330)
    at org.apache.camel.component.seda.SedaConsumer.sendToConsumers(SedaConsumer.java:270)
    at org.apache.camel.component.seda.SedaConsumer.doRun(SedaConsumer.java:188)
    at org.apache.camel.component.seda.SedaConsumer.run(SedaConsumer.java:129)
    at io.opentelemetry.context.Context.lambda$wrap$1(Context.java:241)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at io.opentelemetry.context.Context.lambda$wrap$1(Context.java:241)
    at java.base/java.lang.Thread.run(Thread.java:840)


> ConcurrentModificationException is coming inside Camel’s Vert.x WebSocket
> -------------------------------------------------------------------------
>
>                 Key: CAMEL-22127
>                 URL: https://issues.apache.org/jira/browse/CAMEL-22127
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-platform-http-vertx
>    Affects Versions: 4.4.5
>            Reporter: Kumar Gaurav
>            Priority: Major
>         Attachments: image-2025-05-29-17-27-30-528.png
>
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
