Hi

i have analyzed the problem a little bit and i think the problem is not the InOut pattern, but the multipleConsumers=true flag;

if i run the same sample without the
<setExchangePattern pattern="InOut"/>
everything seems to be fine, but when i look in the in (data/spike/in) folder there are still *.camelLock files for each consumed file....



On 23.05.2012 20:46, Claus Ibsen wrote:
Hi

Thanks. I have reproduced the issue and logged a JIRA
https://issues.apache.org/jira/browse/CAMEL-5303

On Mon, May 21, 2012 at 7:38 AM, Michael Süess
<michael.sue...@basis06.ch>  wrote:
Hi


Camel 2.8.4

Stacktrace:

Starting Camel. Use ctrl + c to terminate the JVM.

[                          main] MainSupport                    INFO Apache
Camel 2.8.4 starting
[                          main] CamelNamespaceHandler          INFO OSGi
environment not detected.
[                          main] SpringCamelContext             INFO Apache
Camel 2.8.4 (CamelContext: camel-1) is starting
[                          main] SpringCamelContext             INFO JMX
enabled. Using ManagedManagementStrategy.
[                          main] AnnotationTypeConverterLoader  INFO Found 3
packages with 15 @Converter classes to load
[                          main] DefaultTypeConverter           INFO Loaded
169 core type converters (total 169 type converters)
[                          main] AnnotationTypeConverterLoader  INFO Loaded
1 @Converter classes
[                          main] DefaultTypeConverter           INFO Loaded
additional 1 type converters (total 170 type converters) in 0.004 seconds
[                          main] FileEndpoint                   INFO
Endpoint is configured with noop=true so forcing endpoint to be idempotent
as well
[                          main] FileEndpoint                   INFO Using
default memory based idempotent repository with cache max size: 1000
[                          main] rFileExclusiveReadLockStrategy WARN
Deleting orphaned lock file: data\spike\in\foo.txt.camelLock
[                          main] SpringCamelContext             INFO Route:
subject started and consuming from: Endpoint[file://data/spike/in?noop=true]
[                          main] SpringCamelContext             INFO Route:
subjectListener1 started and consuming from:
Endpoint[vm://subjectStarted?multipleConsumers=true]
[                          main] SpringCamelContext             INFO Route:
subjectListener2 started and consuming from:
Endpoint[vm://subjectStarted?multipleConsumers=true]
[                          main] SpringCamelContext             INFO Total 3
routes, of which 3 is started.
[                          main] SpringCamelContext             INFO Apache
Camel 2.8.4 (CamelContext: camel-1) started in 0.572 seconds
[read #0 - file://data/spike/in] subject                        INFO
starting file 'foo.txt' ...
[hread #4 - vm://subjectStarted] subjectListener2               INFO
  starting file 'foo.txt' ...
[hread #3 - vm://subjectStarted] subjectListener1               INFO
  starting file 'foo.txt' ...
[hread #4 - vm://subjectStarted] subjectListener2               INFO  file
'foo.txt' done
[hread #3 - vm://subjectStarted] subjectListener1               INFO  file
'foo.txt' done
[read #0 - file://data/spike/in] DefaultErrorHandler            ERROR Failed
delivery for exchangeId: ID-nb19-50158-1337578515058-0-1. Exhausted after
delivery attempt: 1 caught: org.apache.camel.ExchangeTimedOutException: The
OUT message was not received within: 30000 millis. Exchange[foo.txt]
org.apache.camel.ExchangeTimedOutException: The OUT message was not received
within: 30000 millis. Exchange[foo.txt]
        at
org.apache.camel.component.seda.SedaProducer.process(SedaProducer.java:129)[camel-core-2.8.4.jar:2.8.4]
        at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:78)[camel-core-2.8.4.jar:2.8.4]
        at
org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:114)[camel-core-2.8.4.jar:2.8.4]
        at
org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:284)[camel-core-2.8.4.jar:2.8.4]
        at
org.apache.camel.processor.SendProcessor.process(SendProcessor.java:109)[camel-core-2.8.4.jar:2.8.4]
        at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:78)[camel-core-2.8.4.jar:2.8.4]
        at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:98)[camel-core-2.8.4.jar:2.8.4]
        at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:89)[camel-core-2.8.4.jar:2.8.4]
        at
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:69)[camel-core-2.8.4.jar:2.8.4]
        at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:78)[camel-core-2.8.4.jar:2.8.4]
        at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:98)[camel-core-2.8.4.jar:2.8.4]
        at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:89)[camel-core-2.8.4.jar:2.8.4]
        at
org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:90)[camel-core-2.8.4.jar:2.8.4]
        at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:78)[camel-core-2.8.4.jar:2.8.4]
        at
org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:318)[camel-core-2.8.4.jar:2.8.4]
        at
org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:209)[camel-core-2.8.4.jar:2.8.4]
        at
org.apache.camel.processor.DefaultChannel.process(DefaultChannel.java:306)[camel-core-2.8.4.jar:2.8.4]
        at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:78)[camel-core-2.8.4.jar:2.8.4]
        at
org.apache.camel.processor.Pipeline.process(Pipeline.java:116)[camel-core-2.8.4.jar:2.8.4]
        at
org.apache.camel.processor.Pipeline.process(Pipeline.java:79)[camel-core-2.8.4.jar:2.8.4]
        at
org.apache.camel.processor.UnitOfWorkProcessor.processAsync(UnitOfWorkProcessor.java:139)[camel-core-2.8.4.jar:2.8.4]
        at
org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:106)[camel-core-2.8.4.jar:2.8.4]
        at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:78)[camel-core-2.8.4.jar:2.8.4]
        at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:98)[camel-core-2.8.4.jar:2.8.4]
        at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:89)[camel-core-2.8.4.jar:2.8.4]
        at
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:69)[camel-core-2.8.4.jar:2.8.4]
        at
org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:353)[camel-core-2.8.4.jar:2.8.4]
        at
org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:176)[camel-core-2.8.4.jar:2.8.4]
        at
org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:137)[camel-core-2.8.4.jar:2.8.4]
        at
org.apache.camel.impl.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:138)[camel-core-2.8.4.jar:2.8.4]
        at
org.apache.camel.impl.ScheduledPollConsumer.run(ScheduledPollConsumer.java:90)[camel-core-2.8.4.jar:2.8.4]
        at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)[:1.6.0_31]
        at
java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)[:1.6.0_31]
        at
java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)[:1.6.0_31]
        at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)[:1.6.0_31]
        at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)[:1.6.0_31]
        at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)[:1.6.0_31]
        at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)[:1.6.0_31]
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)[:1.6.0_31]
        at java.lang.Thread.run(Thread.java:662)[:1.6.0_31]



On 19.05.2012 11:06, Claus Ibsen wrote:

Hi

Post the stacktrace, and what Camel version you are using.



On Wed, May 16, 2012 at 5:35 PM, Michael Süess
<michael.sue...@basis06.ch>    wrote:

I use vm (or seda) with ?multipleConsumers=true to implement a
request-reply
observer pattern:


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans";
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
       xmlns:camel="http://camel.apache.org/schema/spring";
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
       http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd";>
    <camelContext xmlns="http://camel.apache.org/schema/spring";>
        <route id="subject">
            <from uri="file:data/spike/in?noop=true"/>
            <log message="starting file '${header.CamelFileName}' ..."/>
            <to uri="vm:subjectStarted" pattern="InOut"/>
            <log message="file '${header.CamelFileName}' done"/>
        </route>

        <route id="subjectListener1">
            <from uri="vm:subjectStarted?multipleConsumers=true"/>
            <log message="   starting file '${header.CamelFileName}'
..."/>
            <log message="   file '${header.CamelFileName}' done"/>
        </route>

        <route id="subjectListener2">
            <from uri="vm:subjectStarted?multipleConsumers=true"/>
            <log message="   starting file '${header.CamelFileName}'
..."/>
            <log message="   file '${header.CamelFileName}' done"/>
        </route>
    </camelContext>
</beans>

when running this sample i always got an ExchangeTimedOutException
although
all listeners have finished before the timeout.

Any ideas?






--
Michael Sueess                           Senior Software Engineer

basis06 AG, Birkenweg 61, CH-3013 Bern - Fon +41 31 311 32 22
http://www.basis06.ch - source of smart business

Keine News verpassen? http://www.basis06.ch/newsletter-registration.html
Schon gebloggt? http://blog.basis06.ch





--
Michael Sueess                           Senior Software Engineer

basis06 AG, Birkenweg 61, CH-3013 Bern - Fon +41 31 311 32 22
http://www.basis06.ch - source of smart business

Keine News verpassen? http://www.basis06.ch/newsletter-registration.html
Schon gebloggt? http://blog.basis06.ch

Reply via email to