As mentioned I did try using blockWhenFull=true.  It appears that the
population of offerTimeout > 0 is required for blockWhenFull to be
enabled.

The documentation doesn't mention and it's unclear how all the various
defaultBlockWhenFull, blockWhenFull etc are intended to be used.  I'll look
into submitting a doc PR change this weekend as looking back through the
mailing lists I've found some other references to this scenario.

I've updated the github example above with the fix and a package-info
summarizing the problem / solution.


On Thu, Feb 3, 2022 at 2:05 PM Jeremy Ross <jeremy.g.r...@gmail.com> wrote:

> I think you need to use blockWhenFull on the producer, so in your to()
> call.
>
> On Thu, Feb 3, 2022 at 10:33 AM Craig Taylor <ctalk...@ctalkobt.net>
> wrote:
>
> > I've got a scenario where I need to iterate through all users within a
> > system and send them to a remote as part of a routine "sync" to ensure
> that
> > both systems are in agreement.
> >
> > User imports are also supported so there is some initial aggregation done
> > on id in an attempt to minimize updates to the 3rd party system.   It's
> ok
> > to send duplicates over, just not ideal so the initial aggregation has a
> > size / timeout based aggregation.
> >
> > The 3rd party system (2nd step) is intended to aggregate and send those
> > updates in fixed size batches.   Changing it to a simple "mock:blah"
> > doesn't change behavior at this point.
> >
> > I'm running into issues when sending to the initial seda queue (see
> > attached github example) and getting a queue full.  This occurs even when
> > I'm attempting to use blockWhenFull and other parameters.
> >
> > Github at https://github.com/CTalkobt/issues .  (ref structure - Only
> > specific to this issue at this point; M
> > urphy will expand it in the future.)
> >
> > Route is :
> >
> > >         from("seda://incoming")
> > >                 .routeId("incoming")
> > >                 .aggregate(simple("${body.id}"),
> > >
> AggregationStrategies.flexible().accumulateInCollection(ArrayList.class)
> > )
> > >                 .completionSize(100)
> > >                 .completionTimeout(500)
> > >                 .process(exch -> logger.info("incoming 1:" +
> > > exch.getIn().getBody()))
> > >                 .to("seda://part2");
> >
> >
> >         from("seda://part2")
> >                 .aggregate(constant(1),
> > AggregationStrategies.flexible().accumulateInCollection(ArrayList.class))
> >
> >                     .completionTimeout(500)
> >                     .completionSize(500)
> >                 .process(exch -> logger.info("Got : " +
> > exch.getIn().getBody(List.class).size()) );
> >     }
> >
> > Stack trace:
> >
> > > org.apache.camel.CamelExecutionException: Exception occurred during
> > > execution on the exchange: Exchange[99618F8E0720C3E-00000000000003E8]
> > > at
> > >
> >
> org.apache.camel.CamelExecutionException.wrapCamelExecutionException(CamelExecutionException.java:45)
> > > ~[camel-api-3.14.1.jar:3.14.1]
> > > at
> > >
> >
> org.apache.camel.support.ExchangeHelper.extractResultBody(ExchangeHelper.java:687)
> > > ~[camel-support-3.14.1.jar:3.14.1]
> > > at
> > >
> >
> org.apache.camel.impl.engine.DefaultProducerTemplate.extractResultBody(DefaultProducerTemplate.java:591)
> > > ~[camel-base-engine-3.14.1.jar:3.14.1]
> > > ....
> > > Caused by: java.lang.IllegalStateException: Queue full
> > > at java.util.AbstractQueue.add(AbstractQueue.java:98) ~[na:1.8.0_312]
> > > at
> > >
> >
> org.apache.camel.component.seda.SedaProducer.addToQueue(SedaProducer.java:251)
> > > ~[camel-seda-3.14.1.jar:3.14.1]
> > > at
> > >
> >
> org.apache.camel.component.seda.SedaProducer.process(SedaProducer.java:149)
> > > ~[camel-seda-3.14.1.jar:3.14.1]
> >
> >
> >
> > Thanks,
> > --
> > -------------------------------------------
> > Craig Taylor
> > ctalk...@ctalkobt.net
> >
>


-- 
-------------------------------------------
Craig Taylor
ctalk...@ctalkobt.net

Reply via email to