[ 
https://issues.apache.org/jira/browse/CAMEL-14144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shahbaz Akhter updated CAMEL-14144:
-----------------------------------
    Description: 
Are Camel docs available that cover points 1 and 2 below?

1) When java calls a seda component directly, if the seda queue is full, Java 
must handle it.

2) If Java calls a direct component first, which in turn calls seda, and the 
seda queue is full, the Camel DLQ (dead letter channel) error handler handles 
it.

The underlying problem we experienced was java calling seda, getting a 'queue 
full' exception, but since the java code didn't look at the exception on the 
response object, it was being missed.

Because this queue was full, we tend to have in-flight messages that are 
effectively lost, since the batch shuts down without waiting for in-flight 
messages to be processed.

Below is sample code
{code:java}
import org.apache.camel.CamelContext;
import org.apache.camel.EndpointInject;
import org.apache.camel.FluentProducerTemplate;
import org.apache.camel.LoggingLevel;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.test.spring.ShutdownTimeout;
import org.junit.jupiter.api.Test;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.TestPropertySource;

import javax.inject.Inject;

import java.util.stream.IntStream;

//@TestPropertySource(properties="camel.springboot.shutdownTimeout=5")
/**
 * Sample test for the SEDA "queue full" / shutdown scenario described in this
 * issue.
 *
 * <p>Messages are sent to {@code direct:test}, which forwards them to
 * {@code seda:test?size=2}. The route consuming from that SEDA endpoint is
 * configured with {@code delayer(2000L)}, and a dead letter channel pointing
 * at {@code direct:dead} is installed as the error handler, so each message
 * ends up logged either as "done" or as "dead".
 */
@ShutdownTimeout(value = 1)
@ContextConfiguration(classes = CamelShutdownTest.TestConfiguration.class)
public class CamelShutdownTest extends AbstractCamelTest {

    /** Spring configuration that contributes the routes used by the test. */
    @Configuration
    public static class TestConfiguration {

        /**
         * Builds the three routes of the sample: the direct entry point, the
         * delayed SEDA consumer and the dead letter route.
         *
         * @return the route builder registered as a Spring bean
         */
        @Bean
        public RouteBuilder sedaRoute() {
            return new RouteBuilder() {
                @Override
                public void configure() {
                    // Failed exchanges are redirected to the dead letter route.
                    errorHandler(deadLetterChannel("direct:dead"));

                    // Entry point: hand every message over to the SEDA endpoint.
                    from("direct:test")
                            .to("seda:test?size=2");

                    // Consumer side of the SEDA endpoint, with a 2000 ms delayer
                    // configured on the route.
                    from("seda:test?size=2")
                            .delayer(2000L)
                            .log(LoggingLevel.INFO,
                                    LoggerFactory.getLogger(CamelShutdownTest.class),
                                    "done ${body}");

                    // Dead letter route: only logs the body it received.
                    from("direct:dead")
                            .log(LoggingLevel.INFO,
                                    LoggerFactory.getLogger(CamelShutdownTest.class),
                                    "dead ${body}");
                }
            };
        }
    }

    @EndpointInject(uri = "direct:test")
    private FluentProducerTemplate fluentProducerTemplate;
    @Inject
    private CamelContext context;

    /**
     * Sends one warm-up message, waits a second, then sends 100 messages to
     * {@code direct:test} with a 5 ms pause before each send.
     *
     * <p>The value returned by each {@code send()} call is not inspected.
     *
     * @throws Exception if starting the context fails or the thread is
     *         interrupted while sleeping
     */
    @Test
    public void test() throws Exception {
        context.start();
        fluentProducerTemplate.withBody("warm up").send();
        Thread.sleep(1000L);
        // A plain loop lets InterruptedException propagate through the
        // method's throws clause instead of being printed and ignored.
        for (int i = 0; i < 100; i++) {
            logger.info("Queued {}", i);
            Thread.sleep(5L);
            fluentProducerTemplate.withBody("test message " + i).send();
        }
    }
}
{code}
 

  was:
For the points 1 & 2, do we have camel docs available ?

1) When java calls a seda component directly, if the seda queue is full, Java 
must handle it.

2) If java calls direct component first, which in turn calls seda, if the seda 
queue is full, the DQL camel error handler handles it.

The underlying problem we experienced was java calling seda, getting a 'queue 
full' exception, but since the java code didn't look at the exception on the 
response object, it was being missed.

Since this queue was full we tend to have inflight messages which are kinda 
lost since batch shutdowns without waiting for inflight message to process.

Below is sample code
{code:java}
import com.edb.payment.pays.core.test.AbstractCamelTest;
import org.apache.camel.CamelContext;
import org.apache.camel.EndpointInject;
import org.apache.camel.FluentProducerTemplate;
import org.apache.camel.LoggingLevel;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.test.spring.ShutdownTimeout;
import org.junit.jupiter.api.Test;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.TestPropertySource;
import javax.inject.Inject;
import java.util.stream.IntStream;
//@TestPropertySource(properties="camel.springboot.shutdownTimeout=5")
/**
 * Previous revision of the sample test (indentation was lost in this copy).
 *
 * Sends messages to direct:test, which forwards them to seda:test?size=2;
 * a dead letter channel on direct:dead is installed as the error handler.
 */
@ShutdownTimeout(value = 1)
@ContextConfiguration(classes = CamelShutdownTest.TestConfiguration.class)
public class CamelShutdownTest extends AbstractCamelTest {
 // Spring configuration that contributes the routes used by the test.
 @Configuration
 public static class TestConfiguration {
 @Bean
 public RouteBuilder sedaRoute() {
 return new RouteBuilder() {
 @Override
 public void configure() {
 // Failed exchanges are redirected to the direct:dead route.
 errorHandler(deadLetterChannel("direct:dead"));
 // Entry point: hand every message over to the SEDA endpoint.
 from("direct:test")
 .to("seda:test?size=2");
 // Consumer side of the SEDA endpoint, with a 2000 ms delayer configured.
 from("seda:test?size=2")
 .delayer(2000L)
 .log(LoggingLevel.INFO, LoggerFactory.getLogger(CamelShutdownTest.class), 
"done ${body}");
 // Dead letter route: only logs the body it received.
 from("direct:dead")
 .log(LoggingLevel.INFO, LoggerFactory.getLogger(CamelShutdownTest.class), 
"dead ${body}");
 }
 };
 }
 }
@EndpointInject(uri = "direct:test")
 private FluentProducerTemplate fluentProducerTemplate;
@Inject
 private CamelContext context;
 // Sends one warm-up message, waits 1000 ms, then sends 100 messages with a
 // 5 ms pause before each; the value returned by send() is not inspected.
@Test
 public void test() throws Exception {
 context.start();
 fluentProducerTemplate.withBody("warm up").send();
 Thread.sleep(1000L);
 IntStream.range(0, 100).boxed().forEach((i) -> {
 // NOTE(review): logger is not declared in this class; presumably inherited
 // from AbstractCamelTest - confirm.
 logger.info("Queued {}", i);
 try {
 Thread.sleep(5L);
 } catch (InterruptedException e) {
 // The interruption is only printed; the loop carries on sending.
 e.printStackTrace();
 }
 fluentProducerTemplate.withBody("test message " + i).send();
 });
 }
}
{code}
 


> More Information regarding java to seda in camel docs
> -----------------------------------------------------
>
>                 Key: CAMEL-14144
>                 URL: https://issues.apache.org/jira/browse/CAMEL-14144
>             Project: Camel
>          Issue Type: Improvement
>            Reporter: Shahbaz Akhter
>            Priority: Minor
>
> Are Camel docs available that cover points 1 and 2 below?
> 1) When java calls a seda component directly, if the seda queue is full, Java 
> must handle it.
> 2) If Java calls a direct component first, which in turn calls seda, and 
> the seda queue is full, the Camel DLQ (dead letter channel) error handler 
> handles it.
> The underlying problem we experienced was java calling seda, getting a 'queue 
> full' exception, but since the java code didn't look at the exception on the 
> response object, it was being missed.
> Because this queue was full, we tend to have in-flight messages that are 
> effectively lost, since the batch shuts down without waiting for in-flight 
> messages to be processed.
> Below is sample code
> {code:java}
> import org.apache.camel.CamelContext;
> import org.apache.camel.EndpointInject;
> import org.apache.camel.FluentProducerTemplate;
> import org.apache.camel.LoggingLevel;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.test.spring.ShutdownTimeout;
> import org.junit.jupiter.api.Test;
> import org.slf4j.LoggerFactory;
> import org.springframework.context.annotation.Bean;
> import org.springframework.context.annotation.Configuration;
> import org.springframework.test.context.ContextConfiguration;
> import org.springframework.test.context.TestPropertySource;
> import javax.inject.Inject;
> import java.util.stream.IntStream;
> //@TestPropertySource(properties="camel.springboot.shutdownTimeout=5")
> @ShutdownTimeout(value = 1)
> @ContextConfiguration(classes = CamelShutdownTest.TestConfiguration.class)
> public class CamelShutdownTest extends AbstractCamelTest {
>     @Configuration
>     public static class TestConfiguration { // supplies the routes under test
>         @Bean
>         public RouteBuilder sedaRoute() {
>             return new RouteBuilder() {
>                 @Override
>                 public void configure() {
>                     errorHandler(deadLetterChannel("direct:dead")); // failures go to direct:dead
>                     from("direct:test") // entry point used by the test
>                             .to("seda:test?size=2");
>                     from("seda:test?size=2") // consumer side of the SEDA endpoint
>                             .delayer(2000L)
>                             .log(LoggingLevel.INFO, 
> LoggerFactory.getLogger(CamelShutdownTest.class), "done ${body}");
>                     from("direct:dead") // dead letter route: only logs the body
>                             .log(LoggingLevel.INFO, 
> LoggerFactory.getLogger(CamelShutdownTest.class), "dead ${body}");
>                 }
>             };
>         }
>     }
>     @EndpointInject(uri = "direct:test")
>     private FluentProducerTemplate fluentProducerTemplate;
>     @Inject
>     private CamelContext context;
>     @Test
>     public void test() throws Exception {
>         context.start();
>         fluentProducerTemplate.withBody("warm up").send(); // returned value is not inspected
>         Thread.sleep(1000L);
>         IntStream.range(0, 100).boxed().forEach((i) -> { // 100 sends, 5 ms pause before each
>             logger.info("Queued {}", i); // NOTE(review): logger presumably inherited from AbstractCamelTest - confirm
>             try {
>                 Thread.sleep(5L);
>             } catch (InterruptedException e) {
>                 e.printStackTrace(); // interruption is only printed; the loop carries on
>             }
>             fluentProducerTemplate.withBody("test message " + i).send();
>         });
>     }
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to