Hi.

I'm seeing a steady slowdown in Camel. A run starts at about 600 inserts/s
but never finishes, because the rate gradually drops to 5 or 6 inserts a second...


Here is a graph with part of the slowdown (as seen from RabbitMQ):
http://i.imgur.com/QAS6nwg.png
In this graph you can see the end of one run (I killed it) and the start of
another, the difference is staggering: http://i.imgur.com/dhd9GG1.png

I'm consuming from a large database table with a simple query that returns
about 400 000 rows, which I then insert into a RabbitMQ queue.

The insert rate starts really high and then starts to fall gradually.
I've tried running camel without inserting in the queue and there is also a
slowdown in processing.

The CPU is always maxed out with java...

From the same machine I can consume from the queue with Storm at about 3000
lines/s, without slowdowns, while unmarshaling with XStream (8000
lines/s without unmarshaling).

Can anyone point me in the right direction to solve this?

Thanks.

#########################################################
Here is my Spring file.

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="
       http://www.springframework.org/schema/beans 
       http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
       http://camel.apache.org/schema/spring 
       http://camel.apache.org/schema/spring/camel-spring.xsd">

        <!-- Commons DBCP connection pool for the Oracle source database;
             Spring calls close() on it when the context shuts down. -->
        <bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource"
                destroy-method="close">
                <property name="driverClassName" value="oracle.jdbc.OracleDriver" />
                <property name="url" value="jdbc:oracle:thin:@localhost:1521:xe" />
                <!-- NOTE(review): this connects with SYSDBA privileges; a dedicated
                     read-only account would be sufficient for a SELECT. -->
                <property name="username" value="sys as sysdba" />
                <property name="password" value="xxxxx" />
        </bean>

        <!-- Camel SQL component bound to the pool above; used by the sql: endpoint. -->
        <bean id="sql" class="org.apache.camel.component.sql.SqlComponent">
                <property name="dataSource" ref="dataSource" />
        </bean>

        <camelContext streamCache="true" xmlns="http://camel.apache.org/schema/spring">

                <!-- Resolves {{...}} placeholders (e.g. sql.selectOrderIdMax) from sql.properties. -->
                <propertyPlaceholder id="placeholder" location="classpath:sql.properties" />

                <!-- XStream data formats; the processing route below uses xstream-utf8. -->
                <dataFormats>
                        <xstream id="xstream-utf8" encoding="UTF-8" />
                        <xstream id="xstream-default" />
                </dataFormats>

                <!-- Triggered by Quartz (repeatCount=0, so presumably a single firing;
                     confirm against the quartz2 component docs), runs the query and
                     hands every element of the result body to direct:sqlqueue.
                     NOTE(review): the sql endpoint presumably materialises the whole
                     result set as one List before the splitter starts, so
                     streaming="true" only affects how that list is iterated; verify. -->
                <route id="databaseRoute">
                        <from uri="quartz2://myGroup/myTimerName?trigger.repeatInterval=20000&amp;trigger.repeatCount=0" />
                        <to uri="sql:{{sql.selectOrderIdMax}}" />
                        <split streaming="true" parallelProcessing="true">
                                <simple>${body}</simple>
                                <to uri="direct:sqlqueue" />
                        </split>
                </route>

                <!-- Converts one row into a LineBean, marshals it with XStream (UTF-8)
                     and publishes the result to the RabbitMQ exchange. -->
                <route id="processOrder-route">
                        <from uri="direct:sqlqueue" />
                        <process ref="lineProcessor" />
                        <marshal ref="xstream-utf8" />
                        <to uri="rabbitmq://xx.x.x.x:5672/myExchange?username=xxx&amp;password=xxx" />
                </route>

        </camelContext>

        <!-- Processor referenced by processOrder-route. -->
        <bean id="lineProcessor" class="eu.rteixeira.dis.cameldbtoqueue.LineProcessor" />

</beans>

#########################################################
The SQL is:
sql.selectOrderIdMax=select * from sales where integration_id = (select
max(integration_id) as max_int from sales)

#########################################################
The LineProcessor just constructs an object for each row of the results,
which is then passed to the serializer. The slowdown happened even without it.

/**
 * 
 */
package eu.rteixeira.dis.cameldbtoqueue;

import eu.rteixeira.dis.objmodel.LineBean;

import java.util.Map;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;

public class LineProcessor implements Processor {
    
        public void process(Exchange exchange) throws Exception {
        
        Map<String, ?> row = (Map<String,?>) exchange.getIn().getBody();
        
        LineBean lb = new LineBean(row);
        
        
        exchange.getIn().setBody(lb);
    }
}

#########################################################
The LineBean

package eu.rteixeira.dis.objmodel;

import java.math.BigDecimal;
import java.sql.Timestamp;
import java.util.Map;

public class LineBean {
        
        // One field per column of a result row. The Map-based constructor below
        // populates them from the row entries and leaves a field null when the
        // corresponding entry is null. All fields are package-private; no
        // accessors are visible in this excerpt.

        // Integer-valued columns are parsed from the text form of the row value.
        Integer store;
        Integer sku;
        // Kept as text (toString() of the row value).
        String ean;
        // Decimal columns are built with the BigDecimal(String) constructor.
        BigDecimal qty;
        BigDecimal pvp; 
        // Timestamp columns are parsed with Timestamp.valueOf(String).
        Timestamp sales_date; 
        Timestamp vdate;
        Integer physical_store;
        Integer integration_id;
        Integer group_no;
        String tran_type;
        BigDecimal sales_value;
        BigDecimal discount_qty;
        BigDecimal discount_value;
        BigDecimal discount_voucher_qty;
        BigDecimal discount_voucher_value;
        BigDecimal base_value;
        
        
        
        public LineBean() {
                super();
        }

        /**
         * Builds a bean from one result row. Each field is read from the row
         * entry keyed by the lower-case field name; an absent or {@code null}
         * entry leaves the field {@code null}.
         *
         * <p>Every column is looked up exactly once and under a single key. The
         * previous code null-checked {@code "store"} but then read
         * {@code "STORE"}, which fails on a case-sensitive map.
         *
         * @param row column values keyed by column name
         * @throws NumberFormatException if the text form of an integer or
         *         decimal column cannot be parsed
         * @throws IllegalArgumentException if the text form of a timestamp
         *         column is not in JDBC timestamp escape format
         */
        public LineBean(Map<String, ?> row){

                store = toInteger(row, "store");
                sku = toInteger(row, "sku");
                ean = toText(row, "ean");
                qty = toDecimal(row, "qty");
                pvp = toDecimal(row, "pvp");
                sales_date = toTimestamp(row, "sales_date");
                vdate = toTimestamp(row, "vdate");
                physical_store = toInteger(row, "physical_store");
                integration_id = toInteger(row, "integration_id");
                group_no = toInteger(row, "group_no");
                tran_type = toText(row, "tran_type");
                sales_value = toDecimal(row, "sales_value");
                discount_qty = toDecimal(row, "discount_qty");
                discount_value = toDecimal(row, "discount_value");
                discount_voucher_qty = toDecimal(row, "discount_voucher_qty");
                discount_voucher_value = toDecimal(row, "discount_voucher_value");
                base_value = toDecimal(row, "base_value");

        }

        /** Returns the text form of the entry for {@code key}, or {@code null} if it is null. */
        private static String toText(Map<String, ?> row, String key) {
                Object value = row.get(key);
                return (value == null) ? null : value.toString();
        }

        /** Parses the text form of the entry for {@code key} as an integer, or returns {@code null}. */
        private static Integer toInteger(Map<String, ?> row, String key) {
                String text = toText(row, key);
                return (text == null) ? null : Integer.valueOf(text);
        }

        /** Parses the text form of the entry for {@code key} as a decimal, or returns {@code null}. */
        private static BigDecimal toDecimal(Map<String, ?> row, String key) {
                String text = toText(row, key);
                return (text == null) ? null : new BigDecimal(text);
        }

        /** Parses the text form of the entry for {@code key} as a timestamp, or returns {@code null}. */
        private static Timestamp toTimestamp(Map<String, ?> row, String key) {
                String text = toText(row, key);
                return (text == null) ? null : Timestamp.valueOf(text);
        }



--
View this message in context: 
http://camel.465427.n5.nabble.com/Camel-gradually-slowing-down-with-large-sql-results-tp5741560.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Reply via email to