As mentioned, I did try using blockWhenFull=true. It appears that offerTimeout must be set to a value > 0 for blockWhenFull to take effect.
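For anyone hitting the same thing, the underlying queue behavior can be seen with plain java.util.concurrent, without Camel. This is only a minimal sketch under assumptions: a bounded LinkedBlockingQueue with capacity 1, and a class and method names (QueueFullDemo, addFailsWhenFull, offerWithTimeout) that are made up for illustration and are not Camel's. A plain add() fails immediately with the same "Queue full" IllegalStateException that appears in the stack trace quoted below, while a timed offer() waits for a consumer to free a slot.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; it only illustrates java.util.concurrent queue semantics.
class QueueFullDemo {

    // Non-blocking insert: add() throws IllegalStateException("Queue full")
    // when the bounded queue is at capacity.
    static boolean addFailsWhenFull(BlockingQueue<String> queue) {
        try {
            queue.add("overflow");
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    // Blocking insert with a timeout: waits up to timeoutMillis for free space
    // and returns false if no slot became available in time.
    static boolean offerWithTimeout(BlockingQueue<String> queue, long timeoutMillis) {
        try {
            return queue.offer("late arrival", timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        queue.add("first"); // the queue is now full

        System.out.println("add() rejected: " + addFailsWhenFull(queue));

        // Simulated slow consumer that frees a slot after 100 ms.
        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(100);
                queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        System.out.println("offer() accepted: " + offerWithTimeout(queue, 2000));
        consumer.join();
    }
}
```

On the Camel side, the corresponding SEDA endpoint options are blockWhenFull and offerTimeout, set on the producer URI, for example seda://incoming?blockWhenFull=true&offerTimeout=1000. The 1000 ms value is only a placeholder, not a recommendation.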
The documentation doesn't mention the offerTimeout requirement, and it's unclear how the various options (defaultBlockWhenFull, blockWhenFull, etc.) are intended to be used. I'll look into submitting a doc PR this weekend; looking back through the mailing lists, I've found some other references to this scenario.

I've updated the GitHub example above with the fix and a package-info summarizing the problem and solution.

On Thu, Feb 3, 2022 at 2:05 PM Jeremy Ross <jeremy.g.r...@gmail.com> wrote:

> I think you need to use blockWhenFull on the producer, so in your to()
> call.
>
> On Thu, Feb 3, 2022 at 10:33 AM Craig Taylor <ctalk...@ctalkobt.net>
> wrote:
>
> > I've got a scenario where I need to iterate through all users within a
> > system and send them to a remote as part of a routine "sync" to ensure
> > that both systems are in agreement.
> >
> > User imports are also supported so there is some initial aggregation done
> > on id in an attempt to minimize updates to the 3rd party system. It's ok
> > to send duplicates over, just not ideal so the initial aggregation has a
> > size / timeout based aggregation.
> >
> > The 3rd party system (2nd step) is intended to aggregate and send those
> > updates in fixed size batches. Changing it to a simple "mock:blah"
> > doesn't change behavior at this point.
> >
> > I'm running into issues when sending to the initial seda queue (see
> > attached github example) and getting a queue full. This occurs even when
> > I'm attempting to use blockWhenFull and other parameters.
> >
> > Github at https://github.com/CTalkobt/issues . (ref structure - Only
> > specific to this issue at this point; Murphy will expand it in the
> > future.)
> >
> > Route is :
> >
> > > from("seda://incoming")
> > >     .routeId("incoming")
> > >     .aggregate(simple("${body.id}"),
> > >         AggregationStrategies.flexible().accumulateInCollection(ArrayList.class))
> > >     .completionSize(100)
> > >     .completionTimeout(500)
> > >     .process(exch -> logger.info("incoming 1:" + exch.getIn().getBody()))
> > >     .to("seda://part2");
> > >
> > > from("seda://part2")
> > >     .aggregate(constant(1),
> > >         AggregationStrategies.flexible().accumulateInCollection(ArrayList.class))
> > >     .completionTimeout(500)
> > >     .completionSize(500)
> > >     .process(exch -> logger.info("Got : " + exch.getIn().getBody(List.class).size()));
> > > }
> >
> > Stack trace:
> >
> > > org.apache.camel.CamelExecutionException: Exception occurred during execution on the exchange: Exchange[99618F8E0720C3E-00000000000003E8]
> > >     at org.apache.camel.CamelExecutionException.wrapCamelExecutionException(CamelExecutionException.java:45) ~[camel-api-3.14.1.jar:3.14.1]
> > >     at org.apache.camel.support.ExchangeHelper.extractResultBody(ExchangeHelper.java:687) ~[camel-support-3.14.1.jar:3.14.1]
> > >     at org.apache.camel.impl.engine.DefaultProducerTemplate.extractResultBody(DefaultProducerTemplate.java:591) ~[camel-base-engine-3.14.1.jar:3.14.1]
> > >     ....
> > > Caused by: java.lang.IllegalStateException: Queue full
> > >     at java.util.AbstractQueue.add(AbstractQueue.java:98) ~[na:1.8.0_312]
> > >     at org.apache.camel.component.seda.SedaProducer.addToQueue(SedaProducer.java:251) ~[camel-seda-3.14.1.jar:3.14.1]
> > >     at org.apache.camel.component.seda.SedaProducer.process(SedaProducer.java:149) ~[camel-seda-3.14.1.jar:3.14.1]
> >
> > Thanks,
> > --
> > -------------------------------------------
> > Craig Taylor
> > ctalk...@ctalkobt.net
> >

--
-------------------------------------------
Craig Taylor
ctalk...@ctalkobt.net