Hi,

I'm trying to resend failed messages for delayed reprocessing via a
helper seda queue. If the bodies of the failed exchanges weren't so
big, I'd use the redelivery methods available in Camel, but having
too many failing exchanges leads to OOM errors. So, instead of the
default redelivery, I resend the message after removing the current
body. It works, but it gets slower and slower when there are many
failing exchanges. It looks like "seda:reprocess" is blocking the main
processing route.

The question is, do I block the processing thread in "direct:process_body_part"?



@Override
public void configure() throws Exception {

   //get the data, split, send for processing
    from("direct:start").split().body().parallelProcessing()
        .log("processing ID ${body}").to("direct:process_body_part");

    //process piece, on error send to queue or finally to error endpoint
    from("direct:process_body_part")
            .onException(Exception.class)
                .handled(true)
                .setBody(exchangeProperty("ID")) // remove expensive body from memory
                .choice().when(exchange -> exchange.getProperty("retry_count", Integer.class) > 10)
                    .to("direct:error")
                .endChoice()
                .otherwise()
                    .to("seda:reprocess")
                .endChoice()
                .end()
            .end()
            .log("processing ID ${body}")
            .to("direct:success");

    //wait a bit and send to processing route
    from("seda:reprocess")
            .delay(1000).asyncDelayed()
            .to("direct:process_body_part");
}
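
The route above does not show where "retry_count" is set or incremented. Here is a minimal plain-Java sketch of the retry decision I have in mind, assuming the counter is absent on the first failure and is bumped on every resend. RetryRouting and nextEndpoint are hypothetical names used only for illustration; they are not part of the Camel API, and the map stands in for the exchange properties.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: models the choice() block of the onException clause
// with a plain map in place of the Camel exchange properties.
class RetryRouting {
    static final int MAX_RETRIES = 10;

    // Returns the endpoint a failed message should be sent to.
    // A missing counter is treated as 0, so the comparison never unboxes null.
    static String nextEndpoint(Map<String, Object> properties) {
        Integer stored = (Integer) properties.get("retry_count");
        int retries = (stored == null) ? 0 : stored;

        if (retries > MAX_RETRIES) {
            // Too many attempts: give up and hand over to the error endpoint.
            return "direct:error";
        }

        // Record this attempt before the message is queued again.
        properties.put("retry_count", retries + 1);
        return "seda:reprocess";
    }
}
```

With the same "> 10" comparison as in the route, a message goes to "seda:reprocess" eleven times, and the next failure after that goes to "direct:error".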

Cheerz!
