Re: Help Needed: Splitter and Parallel Processing

2019-08-09 Thread Burkard Stephan
Hi

OK, here is my wild guess :-)


Based on your log output, I suspect the following:

1. It starts and finds 33 instances to process.
2. After processing the second instance, there is a problem.
3. The log output "Handled one of: [..]" instead of "Handled one of: 
model.Instance" could be an indicator of that.
4. Processing is aborted; perhaps the instanceHandler is swallowing exceptions.
5. The next trigger starts. It still finds 33 instances because the already 
processed ones are not removed from the list of unprocessed instances.
6. It tries to process the first one, but fails because it is already processed.
7. Processing is aborted.
8. Endless loop between #5 and #7.
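
Steps 5 to 8 can be reproduced outside Camel. The following is a minimal plain-Java sketch, not the poster's code: the class name StuckBatchDemo, the loader that always returns 33 ids, and the failing id 3 are assumptions chosen only to mirror the log output (two instances handled in the first run, none afterwards).

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Simulates the suspected loop: the "unprocessed" list is never updated. */
final class StuckBatchDemo {

    // Hypothetical stand-in for the real store: ids that were already handled.
    private final Set<Integer> handled = new HashSet<>();
    // Hypothetical id whose handling hits the assumed problem in the first run.
    private final int failingId;

    StuckBatchDemo(int failingId) {
        this.failingId = failingId;
    }

    /** Always returns the same 33 ids, mimicking a loader that ignores progress. */
    List<Integer> loadUnprocessed() {
        List<Integer> ids = new ArrayList<>();
        for (int id = 1; id <= 33; id++) {
            ids.add(id);
        }
        return ids;
    }

    /** Fails on the problematic id and on anything that was handled before. */
    void handle(int id) {
        if (id == failingId || !handled.add(id)) {
            throw new IllegalStateException("cannot handle instance " + id);
        }
    }

    /** One scheduler trigger: stops at the first failure and swallows it. */
    int trigger() {
        int done = 0;
        try {
            for (int id : loadUnprocessed()) {
                handle(id);
                done++;
            }
        } catch (IllegalStateException swallowed) {
            // Swallowed on purpose: nothing is logged, as suspected in step 4.
        }
        return done;
    }

    public static void main(String[] args) {
        StuckBatchDemo demo = new StuckBatchDemo(3);
        for (int run = 1; run <= 3; run++) {
            int found = demo.loadUnprocessed().size();
            int done = demo.trigger();
            System.out.println("run " + run + ": found " + found + ", handled " + done);
        }
    }
}
```

In this sketch the first trigger handles two instances and every later trigger handles none while still finding 33. If that matches the real cause, logging the swallowed exception or letting the loader exclude handled instances should make it visible.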

Cheers
Stephan



-Original Message-
From: Luiz Eduardo Valmont
Sent: Tuesday, 6 August 2019 06:25
To: users@camel.apache.org
Subject: Help Needed: Splitter and Parallel Processing

Hi, friends!

According to the instructions found on this link (splitter) and this other 
link (bean), I wrote this (simple?) route:

    public void configure() throws Exception {
        from( "quartz2://instancesProcessor?cron=" + quartz )
            .routeId( "InstancesProcessor" )
            .process( loadUnprocessedInstancesProcessor )
            .split()
                .body()
                .log( LoggingLevel.WARN, "Body type is: ${body.getClass().getName()}" )
                .bean( instanceHandler, "handle" )
                .log( LoggingLevel.WARN, "Handled one of: ${body.getClass().getName()}" )
            .end()
          .end()
        .end()
        ;
    }

And this is the general output:

2019-08-06 04:04:20.281 TRACE fcdc964e5ee8 --- [aultQuartzScheduler-Orthanc Overseer_Worker-1] .r.LoadUnprocessedInstancesProcessorImpl : Found 33 unprocessed instances
2019-08-06 04:04:20.373  WARN fcdc964e5ee8 --- [aultQuartzScheduler-Orthanc Overseer_Worker-1] InstancesProcessor : Body type is: model.Instance
2019-08-06 04:04:20.507  INFO fcdc964e5ee8 --- [aultQuartzScheduler-Orthanc Overseer_Worker-1] b.c.a.r.o.o.h.TagInstanceHandler : Tagged instance [...]
2019-08-06 04:04:20.518 DEBUG fcdc964e5ee8 --- [aultQuartzScheduler-Orthanc Overseer_Worker-1] b.c.a.r.o.o.s.InstanceServiceImpl: Saving Instance [...]
2019-08-06 04:04:20.559 DEBUG fcdc964e5ee8 --- [aultQuartzScheduler-Orthanc Overseer_Worker-1] c.a.r.o.o.h.SaveInstanceHandlerDecorator : Saved instance [...]
2019-08-06 04:04:20.608  WARN fcdc964e5ee8 --- [aultQuartzScheduler-Orthanc Overseer_Worker-1] InstancesProcessor : Handled one of: model.Instance
2019-08-06 04:04:20.610  WARN fcdc964e5ee8 --- [aultQuartzScheduler-Orthanc Overseer_Worker-1] InstancesProcessor : Body type is: model.Instance
2019-08-06 04:04:20.617  INFO fcdc964e5ee8 --- [aultQuartzScheduler-Orthanc Overseer_Worker-1] b.c.a.r.o.o.h.TagInstanceHandler : Tagged instance [...]
2019-08-06 04:04:20.621 DEBUG fcdc964e5ee8 --- [aultQuartzScheduler-Orthanc Overseer_Worker-1] b.c.a.r.o.o.s.InstanceServiceImpl: Saving Instance [...]
2019-08-06 04:04:20.625 DEBUG fcdc964e5ee8 --- [aultQuartzScheduler-Orthanc Overseer_Worker-1] c.a.r.o.o.h.SaveInstanceHandlerDecorator : Saved instance [...]
2019-08-06 04:04:20.750  WARN fcdc964e5ee8 --- [aultQuartzScheduler-Orthanc Overseer_Worker-1] InstancesProcessor : Handled one of: [..]
2019-08-06 04:04:20.751  WARN fcdc964e5ee8 --- [aultQuartzScheduler-Orthanc Overseer_Worker-1] InstancesProcessor : Body type is: model.Instance
2019-08-06 04:04:30.017 TRACE fcdc964e5ee8 --- [aultQuartzScheduler-Orthanc Overseer_Worker-2] .r.LoadUnprocessedInstancesProcessorImpl : Found 33 unprocessed instances
2019-08-06 04:04:30.020  WARN fcdc964e5ee8 --- [aultQuartzScheduler-Orthanc Overseer_Worker-2] InstancesProcessor : Body type is: model.Instance
2019-08-06 04:04:40.020 TRACE fcdc964e5ee8 --- [aultQuartzScheduler-Orthanc Overseer_Worker-3] .r.LoadUnprocessedInstancesProcessorImpl : Found 33 unprocessed instances
2019-08-06 04:04:40.027  WARN fcdc964e5ee8 --- [aultQuartzScheduler-Orthanc Overseer_Worker-3] InstancesProcessor : Body type is: model.Instance

It basically processes 2 of the 33 available instances, and that's it. On the 
following iterations, it merely logs a single "Body type" line. Since it 
executes at least once, I reckon that type-wise I'm OK, else nothing would 
work. I suppose I'm running out of consumers because of some other route. But 
it still beats me because, as the logging shows, the instanceHandler bean does 
not hang.

Question: what can be wrong or faulty in my routing setup? Eventually I want to 
execute the instanceHandler in parallel, X instances at a time, but I'm taking 
it one step at a time.
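
On the "X at a time" part: Camel's Splitter can be switched to concurrent handling with parallelProcessing(), and a custom thread pool can be supplied with executorService(...). The plain-Java sketch below only illustrates the underlying idea of bounded parallelism with a fixed-size pool. The class BoundedParallelDemo and the method handleAll are hypothetical names, and the sketch is not a replacement for the route.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

final class BoundedParallelDemo {

    /** Applies handler to every item, running at most maxParallel calls at once. */
    static <T, R> List<R> handleAll(List<T> items, int maxParallel, Function<T, R> handler) {
        // The fixed pool size is what bounds the parallelism ("X at a time").
        ExecutorService pool = Executors.newFixedThreadPool(maxParallel);
        try {
            List<CompletableFuture<R>> futures = new ArrayList<>();
            for (T item : items) {
                futures.add(CompletableFuture.supplyAsync(() -> handler.apply(item), pool));
            }
            List<R> results = new ArrayList<>();
            for (CompletableFuture<R> future : futures) {
                // join() waits for the result and rethrows a handler failure
                // (wrapped in CompletionException) instead of hiding it.
                results.add(future.join());
            }
            return results; // same order as the input list
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> handled = handleAll(List.of(1, 2, 3, 4, 5), 2, id -> "handled " + id);
        System.out.println(handled);
    }
}
```

A failure in any handler call surfaces at join(), so a problem with a single item is not silently lost.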

Any help, guidance, or guess is most welcome. Even wild guesses.

Thanks in advance!

Luiz Eduardo Guida Valm

Re: InOut exchange pattern on Transactional JMS endpoint not working

2019-08-09 Thread Burkard Stephan
Hi sujin sr

1. Don't worry, Camel automatically copies the headers for you.

Just send the message on to the reply queue:

from("activemq:queue:DataRequestQueue")
    .to("activemq:queue:DataReplyQueue");

2. I'm not sure I understand you correctly: do you mean how each sender (if 
there are multiple) can consume only its own reply messages?

Use message selectors (https://activemq.apache.org/selectors.html). Each sender 
can, for example, set a header 'sender' with a unique ID. It can then use a 
message selector to listen on the DataReplyQueue but consume only the messages 
with its own sender ID.
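
To make the selector idea concrete, here is a small plain-Java sketch. It builds the selector expression in JMS selector syntax and then simulates what the broker does with it, using maps as stand-ins for messages. The class SelectorDemo, the ids client-A and client-B, and the 'body' map key are assumptions for illustration. With a real broker the expression would be passed to the consumer (as far as I know, Camel's JMS-based endpoints accept it via a selector option) rather than evaluated by hand.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class SelectorDemo {

    /** Builds a JMS selector expression matching one sender id. */
    static String selectorFor(String senderId) {
        // JMS selectors use SQL-92 style string literals; a quote is escaped by doubling it.
        return "sender = '" + senderId.replace("'", "''") + "'";
    }

    /** Simulates broker-side filtering: bodies of replies whose 'sender' header matches. */
    static List<String> repliesFor(String senderId, List<Map<String, String>> replyQueue) {
        List<String> mine = new ArrayList<>();
        for (Map<String, String> message : replyQueue) {
            if (senderId.equals(message.get("sender"))) {
                mine.add(message.get("body"));
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        List<Map<String, String>> replyQueue = List.of(
                Map.of("sender", "client-A", "body", "reply 1"),
                Map.of("sender", "client-B", "body", "reply 2"),
                Map.of("sender", "client-A", "body", "reply 3"));
        System.out.println(selectorFor("client-A"));
        System.out.println(repliesFor("client-A", replyQueue));
    }
}
```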

Cheers
Stephan


-Original Message-
From: sujin sr
Sent: Wednesday, 7 August 2019 09:37
To: users@camel.apache.org
Subject: Re: InOut exchange pattern on Transactional JMS endpoint not working

Thanks Claus Ibsen for the response.

So if I process the replyQueue via a separate route, I am wondering about the 
two points below:
  1. How can I transfer the request message header values to the reply message 
headers?
  2. I want to receive only the replies to messages sent by my own producer. 
How do I listen on specific correlation IDs using the Camel approach?
  This use case occurs when the same request/reply queues have other 
JMS producers and consumers.

Kindly suggest some ideas.


On Wed, 7 Aug 2019 at 12:55, Claus Ibsen wrote:

> Hi
>
> You cannot do request/reply via reply queues with InOut and with 
> transactions, as it's a chicken/egg situation.
> The message that is sent to the request queue (for req/reply) is not 
> committed until the transaction is committed, and therefore the 
> receiver cannot "see" the message.
>
> On Wed, Aug 7, 2019 at 9:05 AM sujin sr wrote:
> >
> > Hi,
> >
> > I have a use case with two JMS queues, DataRequestQueue and 
> > DataReplyQueue. When I send a message to DataRequestQueue, an MDB 
> > processes the message and sends the response to DataReplyQueue.
> >
> > I have used Camel to send the message and receive the response, using 
> > the InOut exchange pattern.
> >
> > The JMS endpoint I am using is a transactional JMS endpoint, with the 
> > transaction manager configured in a Spring bean.
> >
> > .to("jms:queue:DataRequestQueue?replyTo=DataReplyQueue&exchangePattern=InOut&requestTimeout=60s")
> >
> > When I try to send the message to the request queue using the InOut 
> > exchange pattern, Camel throws ExchangeTimedOutException, but if I 
> > remove the transaction manager from the Spring bean it works fine and 
> > I am able to get the response from the reply queue.
> > I have also tried to create a separate JMS component bean without a 
> > transaction manager for the InOut exchange alone, but that did not 
> > work either.
> >
> > Kindly suggest some ideas to make the Camel route work.
> >
> > https://access.redhat.com/documentation/en-us/red_hat_jboss_fuse/6.3/html/transaction_guide/fmrtxnjmssynchronous
> >
> > This article suggests using a separate queue for request and 
> > response, but I cannot split the route and make the route async.
> >
> > Thanks.
>
>
>
> --
> Claus Ibsen
> -
> http://davsclaus.com @davsclaus
> Camel in Action 2: https://www.manning.com/ibsen2
>
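
Claus's chicken-and-egg point above can be illustrated without a broker. The sketch below is a deterministic plain-Java simulation, not JMS code: the class TransactedRequestReplyDemo, its methods, and the in-memory queues are hypothetical stand-ins, and a null reply stands in for the ExchangeTimedOutException.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class TransactedRequestReplyDemo {

    private final Queue<String> requestQueue = new ArrayDeque<>();
    private final Queue<String> replyQueue = new ArrayDeque<>();
    // Messages sent inside the open transaction; invisible to consumers until commit.
    private final List<String> uncommitted = new ArrayList<>();

    void sendInTransaction(String request) {
        uncommitted.add(request);
    }

    void commit() {
        requestQueue.addAll(uncommitted);
        uncommitted.clear();
    }

    /** Hypothetical responder (the MDB in the thread): answers only visible requests. */
    void runResponder() {
        String request;
        while ((request = requestQueue.poll()) != null) {
            replyQueue.add("reply to " + request);
        }
    }

    /** InOut inside the transaction: the reply is awaited before the commit. */
    String requestReplyBeforeCommit(String request) {
        sendInTransaction(request);
        runResponder();                   // the responder sees nothing yet
        String reply = replyQueue.poll(); // null stands in for the timeout
        commit();                         // only now does the request become visible
        return reply;
    }

    /** Commit first, then wait for the reply in a separate step. */
    String requestReplyAfterCommit(String request) {
        sendInTransaction(request);
        commit();
        runResponder();
        return replyQueue.poll();
    }

    public static void main(String[] args) {
        System.out.println(new TransactedRequestReplyDemo().requestReplyBeforeCommit("ping"));
        System.out.println(new TransactedRequestReplyDemo().requestReplyAfterCommit("ping"));
    }
}
```

The first call cannot succeed no matter how long the timeout is, because the wait itself delays the commit. That is why the request has to be committed before the reply is awaited.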


Re: Persistent MQTT Client Message Lost

2019-08-09 Thread Claus Ibsen
Hi

Ah yeah well spotted Willem.

The endpoint should only call connection() when the first consumer is
being added, and then disconnect when the active consumer count hits
zero. So the endpoint needs to keep track of the number of consumers
added/removed and react accordingly.
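
A minimal sketch of that bookkeeping, in plain Java. This is not the camel-mqtt code: the class ConsumerCountingEndpoint and all of its members are hypothetical, and connect()/disconnect() only flip a flag where a real endpoint would open or close the broker connection.

```java
final class ConsumerCountingEndpoint {

    private int activeConsumers;
    private boolean connected;
    // Counters exist only so the sketch can be observed from outside.
    private int connectCount;
    private int disconnectCount;

    /** Connects when the first consumer is added; later consumers reuse the connection. */
    synchronized void addConsumer() {
        if (activeConsumers++ == 0) {
            connect();
        }
    }

    /** Disconnects when the last active consumer is removed. */
    synchronized void removeConsumer() {
        if (activeConsumers == 0) {
            throw new IllegalStateException("no consumer to remove");
        }
        if (--activeConsumers == 0) {
            disconnect();
        }
    }

    private void connect() {
        connected = true; // placeholder for opening the broker connection
        connectCount++;
    }

    private void disconnect() {
        connected = false; // placeholder for closing the broker connection
        disconnectCount++;
    }

    synchronized boolean isConnected() { return connected; }
    synchronized int connectCount() { return connectCount; }
    synchronized int disconnectCount() { return disconnectCount; }

    public static void main(String[] args) {
        ConsumerCountingEndpoint endpoint = new ConsumerCountingEndpoint();
        endpoint.addConsumer();
        endpoint.addConsumer();
        endpoint.removeConsumer();
        System.out.println("connected with one consumer left: " + endpoint.isConnected());
        endpoint.removeConsumer();
        System.out.println("connected with no consumers: " + endpoint.isConnected());
    }
}
```

Because the connection is opened only once a consumer is registered, a message delivered right after the connect always has a consumer to go to, which is the situation the lost message in this thread was missing.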

On Fri, Aug 9, 2019 at 8:00 AM Willem Jiang wrote:
>
> I just checked the camel-mqtt code and found a line [1] which
> could lead to the issue that you face.
> If the endpoint is started and the consumer is not started yet, the list
> of consumers could be empty, so Camel cannot send the exchange to the
> right consumer. Maybe you can set a breakpoint or add some logging to
> verify it.
>
> [1]https://github.com/apache/camel/blob/24521870b81576b5caf9ff3951cff8a0c2c77ab2/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java#L256-L263
>
> Willem Jiang
>
> Twitter: willemjiang
> Weibo: 姜宁willem
>
> On Thu, Aug 8, 2019 at 2:45 PM Michael Zaugg wrote:
> >
> > Hi Willem
> >
> > It's camel version 2.24.0
> >
> > We gracefully stopped the whole java process with the same result.
> >
> > "Sending PUBLISH to testclient" occurs before "Received SUBSCRIBE"
> > according to Mosquitto's log:
> >
> > 1565245897: New client connected from 172.17.0.1 as testclient (p1, c0, k30).
> > 1565245897: No will message specified.
> > 1565245897: Sending CONNACK to testclient (0, 0)
> > 1565245897: Sending PUBLISH to testclient (d0, q2, r0, m2, 'testtopic', ... (4 bytes))
> > 1565245907: Received SUBSCRIBE from testclient
> > 1565245907: testtopic (QoS 2)
> > 1565245907: testclient 2 testtopic
> > 1565245907: Sending SUBACK to testclient
> > 1565245907: Received PUBREC from testclient (Mid: 2)
> > 1565245907: Sending PUBREL to testclient (m2)
> > 1565245907: Received PUBCOMP from testclient (Mid: 2, RC:0)
> >
> > KR
> > Michael
> >
> > On 08.08.19 03:13, Willem Jiang wrote:
> > > Hi
> > >
> > > Can you tell me which version of Camel you are using?
> > > I just checked the camel-mqtt code; there are some changes to fix
> > > the connection-related issues.
> > > I suspect that the connection is not fully released in your case.
> > > Can you try stopping the Camel application instead of stopping the
> > > Camel route?
> > >
> > >
> > > Willem Jiang
> > >
> > > Twitter: willemjiang
> > > Weibo: 姜宁willem
> > >
> > > On Wed, Aug 7, 2019 at 10:56 PM Michael Zaugg wrote:
> > >>
> > >> We're having difficulties with persistent clients (using
> > >> cleanSession=false). We would like to get messages that were sent while
> > >> our client was disconnected.
> > >>
> > >> Steps to reproduce:
> > >> 1. start route to create the initial subscription for the testclient
> > >> from("mqtt:bar?subscribeTopicName=testtopic&cleanSession=false&clientId=testclient&host=tcp://localhost:1883&qualityOfService=ExactlyOnce").transform(body().convertToString()).to("mock:test");
> > >>
> > >> 2. stop route (disconnect from broker)
> > >>
> > >> 3. send message to testtopic with qos 2 using another client id (e.g.
> > >> from cli)
> > >>
> > >> 4. repeat step 1. to connect and re-subscribe to testtopic
> > >>
> > >> expected behaviour:
> > >> message delivered to mock endpoint (mock:test)
> > >>
> > >> actual behaviour:
> > >> publish (delivery) to testclient is shown in the broker log (mosquitto)
> > >> and in the trace of the fusesource MQTTEndpoint, but the message is not
> > >> delivered to the mock endpoint. The subscription, and hence the callback,
> > >> is not set up at connect time.
> > >>
> > >> More or less the same behaviour when the paho client is used instead.
> >
> > --
> > Michael Zaugg
> > Software Engineer
> >
> > Puzzle ITC GmbH
> > www.puzzle.ch
> >
> > Phone  +41 31 370 22 00
> > Direct +41 31 370 22 15
> > Mobile +41 79 289 65 88
> > Fax    +41 31 370 22 01
> >
> > Take a look at our blog:
> > 



-- 
Claus Ibsen
-
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2