Hi

Try using the stateful option for your timer, which will disable concurrent 
executions. I think you're running that process every 20 seconds, which could 
keep the garbage collector very busy.

Just a guess after glancing at it, but give it a try.
If it's not that, run it through a profiler and let us know what the cpu's 
doing.

Taariq

> On 14 Oct 2013, at 19:39, rjsteixeira <rica...@ctrler.eu> wrote:
> 
> Hi.
> 
> I'm seeing a constant slowdown in Camel. It starts at 600 inserts/s and
> never ends because the rate drops to 5 or 6 inserts a second...
> 
> 
> Here is a graph with part of the slowdown (as seen from RabbitMQ):
> http://i.imgur.com/QAS6nwg.png
> In this graph you can see the end of one run (I killed it) and the start of
> another, the difference is staggering: http://i.imgur.com/dhd9GG1.png
> 
> I'm consuming from large database table with a simple query that outputs
> about 400 000 lines that I then insert into a RabbitMQ queue.
> 
> The insert rate starts really high and then start to fall gradually.
> I've tried running camel without inserting in the queue and there is also a
> slowdown in processing.
> 
> The CPU is always maxed out with java...
> 
> From the same machine can consume from the queue with storm at about 3000
> lines/sec. Without slowdowns and with unmarshaling with XStream (8000
> lines/s without unmarshaling).
> 
> Can anyone point me in the right direction to solve this?
> 
> Thanks.
> 
> #########################################################
> Here is my Spring file.
> 
> <?xml version="1.0" encoding="UTF-8"?>
> 
> <beans xmlns="http://www.springframework.org/schema/beans";
>    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
>    xsi:schemaLocation="
>       http://www.springframework.org/schema/beans 
>       http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
>       http://camel.apache.org/schema/spring 
>       http://camel.apache.org/schema/spring/camel-spring.xsd";>
> 
>    
>    <bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource"
>        destroy-method="close">
>        <property name="driverClassName" value="oracle.jdbc.OracleDriver" />
>        <property name="url" value="jdbc:oracle:thin:@localhost:1521:xe" />
>        <property name="username" value="sys as sysdba" />
>        <property name="password" value="xxxxx" />
>    </bean>
> 
>    
>    <bean id="sql" class="org.apache.camel.component.sql.SqlComponent">
>        <property name="dataSource" ref="dataSource" />
>    </bean>
> 
>    
>    <camelContext streamCache="true"
> xmlns="http://camel.apache.org/schema/spring";>
>    
>    
>        
>        <propertyPlaceholder id="placeholder" 
> location="classpath:sql.properties"
> />
> 
>        
>        <dataFormats>
>            <xstream id="xstream-utf8" encoding="UTF-8" />
>            <xstream id="xstream-default" />
>        </dataFormats>
>    
>        
>        <route id="databaseRoute">
>            <from
> uri="quartz2://myGroup/myTimerName?trigger.repeatInterval=20000&amp;trigger.repeatCount=0"
> />
>            <to uri="sql:{{sql.selectOrderIdMax}}" />
>            <split streaming="true" parallelProcessing="true">
>                <simple>${body}</simple>
>                <to uri="direct:sqlqueue" />
>            </split>
>        </route> 
> 
>        <route id="processOrder-route" >
>            <from uri="direct:sqlqueue" />
>            <process ref="lineProcessor" />
>            <marshal ref="xstream-utf8" />
>            <to
> uri="rabbitmq://xx.x.x.x:5672/myExchange?username=xxx&amp;password=xxx" />
>        </route>
>        
> 
>    </camelContext>
> 
>    
>    <bean id="lineProcessor"
> class="eu.rteixeira.dis.cameldbtoqueue.LineProcessor" />
> 
> </beans>
> 
> #########################################################
> The SQL is:
> sql.selectOrderIdMax=select * from sales where integration_id = (select
> max(integration_id) as max_int from sales)
> 
> #########################################################
> The LineProcessor is just constructs an object for each line of the results
> to the pass to the serializer. The slowdown happened without it.
> 
> /**
> * 
> */
> package eu.rteixeira.dis.cameldbtoqueue;
> 
> import eu.rteixeira.dis.objmodel.LineBean;
> 
> import java.util.Map;
> 
> import org.apache.camel.Exchange;
> import org.apache.camel.Processor;
> 
> public class LineProcessor implements Processor {
> 
>    public void process(Exchange exchange) throws Exception {
>        
>        Map<String, ?> row = (Map<String,?>) exchange.getIn().getBody();
>        
>        LineBean lb = new LineBean(row);
>        
>        
>        exchange.getIn().setBody(lb);
>    }
> }
> 
> #########################################################
> The LineBean
> 
> package eu.rteixeira.dis.objmodel;
> 
> import java.math.BigDecimal;
> import java.sql.Timestamp;
> import java.util.Map;
> 
> public class LineBean {
>    
>    Integer store;
>    Integer sku;
>    String ean;
>    BigDecimal qty;
>    BigDecimal pvp; 
>    Timestamp sales_date; 
>    Timestamp vdate;
>    Integer physical_store;
>    Integer integration_id;
>    Integer group_no;
>    String tran_type;
>    BigDecimal sales_value;
>    BigDecimal discount_qty;
>    BigDecimal discount_value;
>    BigDecimal discount_voucher_qty;
>    BigDecimal discount_voucher_value;
>    BigDecimal base_value;
>    
>    
>    
>    public LineBean() {
>        super();
>    }
> 
>    public LineBean(Map<String, ?> row){
>        
>        store = ((row.get("store") == null) ? null :
> Integer.parseInt(row.get("STORE").toString()));
>        sku = ((row.get("sku") == null) ? null :
> Integer.parseInt(row.get("sku").toString()));
>        ean = ((row.get("ean") == null) ? null : row.get("ean").toString());
>        qty = ((row.get("qty")==null) ? null : new
> BigDecimal(row.get("qty").toString()));
>        pvp = ((row.get("pvp") == null) ? null : new
> BigDecimal(row.get("pvp").toString()));
>        sales_date = ((row.get("sales_date") == null) ? null :
> Timestamp.valueOf(row.get("sales_date").toString()));
>        vdate = ((row.get("vdate") == null) ? null :
> Timestamp.valueOf(row.get("vdate").toString()));
>        physical_store = ((row.get("physical_store") == null) ? null :
> Integer.parseInt(row.get("physical_store").toString()));
>        integration_id = ((row.get("integration_id") == null) ? null :
> Integer.parseInt(row.get("integration_id").toString()));
>        group_no = ((row.get("group_no") == null) ? null :
> Integer.parseInt(row.get("group_no").toString()));
>        tran_type = ((row.get("tran_type") == null) ? null :
> row.get("tran_type").toString());
>        sales_value = ((row.get("sales_value") == null) ? null : new
> BigDecimal(row.get("sales_value").toString()));
>        discount_qty = ((row.get("discount_qty")==null) ? null : new
> BigDecimal(row.get("discount_qty").toString()));
>        discount_value = ((row.get("discount_value")==null) ? null : new
> BigDecimal(row.get("discount_value").toString()));
>        discount_voucher_qty = ((row.get("discount_voucher_qty")==null) ? null 
> :
> new BigDecimal(row.get("discount_voucher_qty").toString()));
>        discount_voucher_value = ((row.get("discount_voucher_value")==null) ? 
> null
> : new BigDecimal(row.get("discount_voucher_value").toString()));
>        base_value = ((row.get("base_value")==null) ? null : new
> BigDecimal(row.get("base_value").toString()));
>    
>    }
> 
> 
> 
> --
> View this message in context: 
> http://camel.465427.n5.nabble.com/Camel-gradually-slowing-down-with-large-sql-results-tp5741560.html
> Sent from the Camel - Users mailing list archive at Nabble.com.

Reply via email to