Hi Christian, first of all, thanks for reply.
I've 160 routes like rt-* (see below). These routes, polling files in
directories and send each file to seda endpoint. The seda endpoint
process messages and send a processed message to ActiveMQ. I'm using
ActiveMQ because these process are running at another computer.
I can't use recursive property for file endpoint because I've a lot of
business rules to verify depends on complete file path, etc.
I'm using Apache Camel 2.10.2 ans ActiveMQ 5.7.
Following the routes:
<!-- Retrive files from directory 1, server A -->
<route id="rt-147" errorHandlerRef="myErrorHandler"
shutdownRoute="Defer" autoStartup="true">
<from
uri="file:/mnt/serverA/1?charset=iso-8859-1&readLock=changed&readLockCheckInterval=200&readLockMinLength=0&delay=500&maxMessagesPerPoll=100&sortBy=${file:modified}"
/>
<onException useOriginalMessage="true">
<exception>java.text.ParseException</exception>
<exception>java.lang.ArrayIndexOutOfBoundsException</exception>
<redeliveryPolicy maximumRedeliveries="0"/>
<handled><constant>true</constant></handled>
<to uri="activemq:queue:alarm" />
</onException>
<choice>
<when>
<simple>${file:name} contains '_FILE1.txt'</simple>
<to uri="seda:fe_FILE1" />
</when>
<when>
<simple>${file:name} contains '_FILE2.txt'</simple>
<to uri="seda:fe_FILE2" />
</when>
</choice>
</route>
<!-- Retrive files from directory 2, server A -->
<route id="rt-148" errorHandlerRef="myErrorHandler"
shutdownRoute="Defer" autoStartup="true">
<from
uri="file:/serverA/2?charset=iso-8859-1&readLock=changed&readLockCheckInterval=200&readLockMinLength=0&delay=500&maxMessagesPerPoll=100&sortBy=${file:modified}"
/>
<onException useOriginalMessage="true">
<exception>java.text.ParseException</exception>
<exception>java.lang.ArrayIndexOutOfBoundsException</exception>
<redeliveryPolicy maximumRedeliveries="0"/>
<handled><constant>true</constant></handled>
<to uri="activemq:queue:alarm" />
</onException>
<choice>
<when>
<simple>${file:name} contains '_FILE1.txt'</simple>
<to uri="seda:fe_FILE1" />
</when>
<when>
<simple>${file:name} contains '_FILE2.txt'</simple>
<to uri="seda:fe_FILE2" />
</when>
</choice>
</route>
<!-- Retrive files from directory 1, server B -->
<route id="rt-149" errorHandlerRef="myErrorHandler"
shutdownRoute="Defer" autoStartup="true">
<from
uri="file:/serverB/1?charset=iso-8859-1&readLock=changed&readLockCheckInterval=200&readLockMinLength=0&delay=500&maxMessagesPerPoll=100&sortBy=${file:modified}"
/>
<onException useOriginalMessage="true">
<exception>java.text.ParseException</exception>
<exception>java.lang.ArrayIndexOutOfBoundsException</exception>
<redeliveryPolicy maximumRedeliveries="0"/>
<handled><constant>true</constant></handled>
<to uri="activemq:queue:alarm" />
</onException>
<choice>
<when>
<simple>${file:name} contains '_FILE1.txt'</simple>
<to uri="seda:fe_FILE1" />
</when>
<when>
<simple>${file:name} contains '_FILE2.txt'</simple>
<to uri="seda:fe_FILE2" />
</when>
</choice>
</route>
<!-- Processor for files from type 1 -->
<route id="sedaFile1" errorHandlerRef="mirErrorHandler"
shutdownRoute="Defer" autoStartup="true">
<from
uri="seda:fe_FILE1?concurrentConsumers=10&waitForTaskToComplete=Never"
/>
<process ref="switchFileFrontendCounterProcessor" />
<setHeader
headerName="ALARM_TARGET"><constant>F1</constant></setHeader>
<to uri="activemq:queue:alarm" />
</route>
<!-- Processor for files from type 2 -->
<route id="sedaFile2" errorHandlerRef="mirErrorHandler"
shutdownRoute="Defer" autoStartup="true">
<from
uri="seda:fe_FILE2?concurrentConsumers=10&waitForTaskToComplete=Never"
/>
<process ref="switchFileFrontendCounterProcessor" />
<setHeader
headerName="ALARM_TARGET"><constant>F2</constant></setHeader>
<to uri="activemq:queue:alarm" />
</route>
<!-- Queue for process Alarms -->
<route id="file-alarm" errorHandlerRef="mirErrorHandler"
shutdownRoute="Defer" autoStartup="true">
<from uri="activemq:alarm" />
<choice>
<when>
<simple>${headers[ALARM_TARGET]} == "F1"</simple>
<to uri="sql:exec Prc_File1 #, #, #
?dataSourceRef=myDatasource" />
</when>
</choice>
<to uri="bean://alarmEngineBean"/>
</route>
The ActiveMQ configuration is:
<bean id="jmsConnectionFactory"
class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://10.59.133.123:4848" />
</bean>
<bean id="pooledConnectionFactory"
class="org.apache.activemq.pool.PooledConnectionFactory">
<property name="maxConnections" value="10" />
<property name="maximumActive" value="10" />
<property name="connectionFactory" ref="jmsConnectionFactory" />
</bean>
Thanks for all
Leandro Franchi
On 06-11-2012 21:45, Christian Müller wrote:
Which version do you use?
Hoe does your route looks like?
Sent from a mobile device
Am 06.11.2012 19:29 schrieb "Leandro Franchi" <leandro.fran...@gmail.com>:
Hi,
I've a route to read files, convert the body to string and send to
ActiveMQ. It's ok, but somentimes, I've got these error:
06/11/2012 16:19:19.174 [Camel (mirCamel) thread #21 - file:/tmp] ERROR
org.apache.camel.processor.**FatalFallbackErrorHandler:55 - \--> New
exception on exchangeId: ID-alxredmircol01-51775-**1352225432241-0-14234
org.apache.camel.**TypeConversionException: Error during type conversion
from type: org.apache.camel.component.**file.GenericFile to the required
type: byte[] with value GenericFile[/mnt/sox/HST/**20121030_174713_N.txt]
due java.lang.**IllegalArgumentException: reader must be specified
at org.apache.camel.impl.**converter.**BaseTypeConverterRegistry.**
convertTo(**BaseTypeConverterRegistry.**java:126)
at org.apache.camel.component.**jms.JmsBinding.**
createJmsMessageForType(**JmsBinding.java:534)
at org.apache.camel.component.**jms.JmsBinding.**
createJmsMessage(JmsBinding.**java:464)
at org.apache.camel.component.**jms.JmsBinding.makeJmsMessage(**
JmsBinding.java:285)
at org.apache.camel.component.**jms.JmsProducer$2.**
createMessage(JmsProducer.**java:266)
at org.apache.camel.component.**jms.JmsConfiguration$**
CamelJmsTemplate.**doSendToDestination(**JmsConfiguration.java:214)
at org.apache.camel.component.**jms.JmsConfiguration$**
CamelJmsTemplate.access$100(**JmsConfiguration.java:157)
at org.apache.camel.component.**jms.JmsConfiguration$**
CamelJmsTemplate$3.doInJms(**JmsConfiguration.java:191)
at org.springframework.jms.core.**JmsTemplate.execute(**
JmsTemplate.java:466)
at org.apache.camel.component.**jms.JmsConfiguration$**
CamelJmsTemplate.send(**JmsConfiguration.java:188)
at org.apache.camel.component.**jms.JmsProducer.doSend(**
JmsProducer.java:398)
at org.apache.camel.component.**jms.JmsProducer.processInOnly(**
JmsProducer.java:352)
at org.apache.camel.component.**jms.JmsProducer.process(**
JmsProducer.java:132)
at org.apache.camel.util.**AsyncProcessorHelper.process(**
AsyncProcessorHelper.java:73)
at org.apache.camel.processor.**SendProcessor$2.**doInAsyncProducer(**
SendProcessor.java:120)
at org.apache.camel.impl.**ProducerCache.**doInAsyncProducer(**
ProducerCache.java:292)
at org.apache.camel.processor.**SendProcessor.process(**
SendProcessor.java:115)
at org.apache.camel.util.**AsyncProcessorHelper.process(**
AsyncProcessorHelper.java:73)
at org.apache.camel.processor.**DelegateAsyncProcessor.**processNext(*
*DelegateAsyncProcessor.java:**99)
at org.apache.camel.processor.**DelegateAsyncProcessor.**process(**
DelegateAsyncProcessor.java:**90)
at org.apache.camel.management.**InstrumentationProcessor.**process(**
InstrumentationProcessor.java:**73)
at org.apache.camel.util.**AsyncProcessorHelper.process(**
AsyncProcessorHelper.java:73)
at org.apache.camel.processor.**DelegateAsyncProcessor.**processNext(*
*DelegateAsyncProcessor.java:**99)
at org.apache.camel.processor.**DelegateAsyncProcessor.**process(**
DelegateAsyncProcessor.java:**90)
at org.apache.camel.processor.**interceptor.TraceInterceptor.**
process(TraceInterceptor.java:**91)
at org.apache.camel.processor.**interceptor.**
StreamCachingInterceptor.**process(**StreamCachingInterceptor.java:**52)
at org.apache.camel.processor.**RouteContextProcessor.**processNext(**
RouteContextProcessor.java:45)
at org.apache.camel.processor.**DelegateAsyncProcessor.**process(**
DelegateAsyncProcessor.java:**90)
at org.apache.camel.processor.**interceptor.DefaultChannel.**
process(DefaultChannel.java:**303)
at org.apache.camel.util.**AsyncProcessorHelper.process(**
AsyncProcessorHelper.java:73)
at org.apache.camel.processor.**Pipeline.process(Pipeline.**java:117)
at org.apache.camel.processor.**Pipeline.process(Pipeline.**java:80)
at org.apache.camel.util.**AsyncProcessorHelper.process(**
AsyncProcessorHelper.java:73)
at org.apache.camel.processor.**DelegateAsyncProcessor.**processNext(*
*DelegateAsyncProcessor.java:**99)
at org.apache.camel.processor.**FatalFallbackErrorHandler.**
processNext(**FatalFallbackErrorHandler.**java:42)
at org.apache.camel.processor.**DelegateAsyncProcessor.**process(**
DelegateAsyncProcessor.java:**90)
at org.apache.camel.util.**AsyncProcessorHelper.process(**
AsyncProcessorHelper.java:73)
at org.apache.camel.processor.**RedeliveryErrorHandler.**
deliverToFailureProcessor(**RedeliveryErrorHandler.java:**759)
at org.apache.camel.processor.**RedeliveryErrorHandler.**
processErrorHandler(**RedeliveryErrorHandler.java:**273)
at org.apache.camel.processor.**RedeliveryErrorHandler.**process(**
RedeliveryErrorHandler.java:**220)
at org.apache.camel.processor.**interceptor.**
StreamCachingInterceptor.**process(**StreamCachingInterceptor.java:**52)
at org.apache.camel.processor.**RouteContextProcessor.**processNext(**
RouteContextProcessor.java:45)
at org.apache.camel.processor.**DelegateAsyncProcessor.**process(**
DelegateAsyncProcessor.java:**90)
at org.apache.camel.processor.**interceptor.DefaultChannel.**
process(DefaultChannel.java:**303)
at org.apache.camel.util.**AsyncProcessorHelper.process(**
AsyncProcessorHelper.java:73)
at org.apache.camel.processor.**Pipeline.process(Pipeline.**java:117)
at org.apache.camel.processor.**Pipeline.process(Pipeline.**java:80)
at org.apache.camel.processor.**RouteContextProcessor.**processNext(**
RouteContextProcessor.java:45)
at org.apache.camel.processor.**DelegateAsyncProcessor.**process(**
DelegateAsyncProcessor.java:**90)
at org.apache.camel.processor.**UnitOfWorkProcessor.**processAsync(**
UnitOfWorkProcessor.java:150)
at org.apache.camel.processor.**UnitOfWorkProcessor.process(**
UnitOfWorkProcessor.java:117)
at org.apache.camel.processor.**RouteInflightRepositoryProcess**
or.processNext(**RouteInflightRepositoryProcess**or.java:48)
at org.apache.camel.processor.**DelegateAsyncProcessor.**process(**
DelegateAsyncProcessor.java:**90)
at org.apache.camel.util.**AsyncProcessorHelper.process(**
AsyncProcessorHelper.java:73)
at org.apache.camel.processor.**DelegateAsyncProcessor.**processNext(*
*DelegateAsyncProcessor.java:**99)
at org.apache.camel.processor.**DelegateAsyncProcessor.**process(**
DelegateAsyncProcessor.java:**90)
at org.apache.camel.management.**InstrumentationProcessor.**process(**
InstrumentationProcessor.java:**73)
at org.apache.camel.component.**file.GenericFileConsumer.**
processExchange(**GenericFileConsumer.java:336)
at br.com.flexvision.mir.**collector.component.sw.file.**
FileSwitchConsumer.**processExchange(**FileSwitchConsumer.java:26)
at org.apache.camel.component.**file.GenericFileConsumer.**
processBatch(**GenericFileConsumer.java:189)
at org.apache.camel.component.**file.GenericFileConsumer.poll(**
GenericFileConsumer.java:155)
at org.apache.camel.impl.**ScheduledPollConsumer.doRun(**
ScheduledPollConsumer.java:**139)
at org.apache.camel.impl.**ScheduledPollConsumer.run(**
ScheduledPollConsumer.java:91)
at java.util.concurrent.**Executors$RunnableAdapter.**
call(Executors.java:471)
at java.util.concurrent.**FutureTask$Sync.**
innerRunAndReset(FutureTask.**java:351)
at java.util.concurrent.**FutureTask.runAndReset(**
FutureTask.java:178)
at java.util.concurrent.**ScheduledThreadPoolExecutor$**
ScheduledFutureTask.access$**301(**ScheduledThreadPoolExecutor.**java:178)
at java.util.concurrent.**ScheduledThreadPoolExecutor$**
ScheduledFutureTask.run(**ScheduledThreadPoolExecutor.**java:293)
at java.util.concurrent.**ThreadPoolExecutor.runWorker(**
ThreadPoolExecutor.java:1110)
at java.util.concurrent.**ThreadPoolExecutor$Worker.run(**
ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.**java:722)
Caused by: org.apache.camel.**RuntimeCamelException:
java.lang.**IllegalArgumentException:
reader must be specified
at org.apache.camel.util.**ObjectHelper.**wrapRuntimeCamelException(**
ObjectHelper.java:1280)
at org.apache.camel.util.**ObjectHelper.invokeMethod(**
ObjectHelper.java:936)
at org.apache.camel.impl.**converter.**StaticMethodTypeConverter.**
convertTo(**StaticMethodTypeConverter.**java:47)
at org.apache.camel.component.**file.GenericFileConverter.**convertTo(
**GenericFileConverter.java:93)
at sun.reflect.**GeneratedMethodAccessor102.**invoke(Unknown Source)
at sun.reflect.**DelegatingMethodAccessorImpl.**invoke(**
DelegatingMethodAccessorImpl.**java:43)
at java.lang.reflect.Method.**invoke(Method.java:601)
at org.apache.camel.util.**ObjectHelper.invokeMethod(**
ObjectHelper.java:932)
at org.apache.camel.impl.**converter.**StaticMethodFallbackTypeConver*
*ter.convertTo(**StaticMethodFallbackTypeConver**ter.java:50)
at org.apache.camel.impl.**converter.**BaseTypeConverterRegistry.**
doConvertTo(**BaseTypeConverterRegistry.**java:289)
at org.apache.camel.impl.**converter.**BaseTypeConverterRegistry.**
convertTo(**BaseTypeConverterRegistry.**java:111)
... 70 more
Caused by: java.lang.**IllegalArgumentException: reader must be specified
at org.apache.camel.util.**ObjectHelper.notNull(**
ObjectHelper.java:290)
at org.apache.camel.util.**IOHelper.buffered(IOHelper.**java:117)
at org.apache.camel.converter.**IOConverter.toByteArray(**
IOConverter.java:261)
at sun.reflect.**GeneratedMethodAccessor230.**invoke(Unknown Source)
at sun.reflect.**DelegatingMethodAccessorImpl.**invoke(**
DelegatingMethodAccessorImpl.**java:43)
at java.lang.reflect.Method.**invoke(Method.java:601)
at org.apache.camel.util.**ObjectHelper.invokeMethod(**
ObjectHelper.java:932)
... 79 more
Anyone have idea?
Thanks
Leandro Franchi