Re: How would you implement this simple, but tricky little route?
Hi,

> For each message that arrives on a queue, I want to kick-off a polling timer to periodically check a specific database row *for that message*

This is a false assumption: you don't need to (and shouldn't) create a new thread for each message. Keep in mind that every one of those threads would poll the same DB resource, so instead create a single timer route dedicated to checking the DB state.

--
Henryk Konsek
http://henryk-konsek.blogspot.com
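The single-poller idea above can be sketched in plain Java. This is only a rough model, not Camel code: `SharedFlagPoller`, `findReadyKeys` and `nextPhase` are hypothetical names, and the database lookup is replaced by a function so that one batched query per tick can cover every waiting message.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical sketch: one shared poller for all waiting messages,
// instead of one timer (and one thread) per message.
class SharedFlagPoller {
    // Waiting message bodies, keyed by the primary key of their database row.
    private final Map<Long, String> waiting = new ConcurrentHashMap<>();
    // Stand-in for a single batched DB query: given the waiting keys,
    // return those whose safeToContinue flag is now true.
    private final Function<Set<Long>, Set<Long>> findReadyKeys;
    // Stand-in for handing a message to the next phase of the pipeline.
    private final Consumer<String> nextPhase;

    SharedFlagPoller(Function<Set<Long>, Set<Long>> findReadyKeys,
                     Consumer<String> nextPhase) {
        this.findReadyKeys = findReadyKeys;
        this.nextPhase = nextPhase;
    }

    // Called once per incoming message; no timer is created here.
    void register(long primaryKey, String body) {
        waiting.put(primaryKey, body);
    }

    // One tick: a single lookup covers every waiting message.
    // Returns the number of messages released to the next phase.
    int pollOnce() {
        if (waiting.isEmpty()) {
            return 0;
        }
        Set<Long> ready = findReadyKeys.apply(new HashSet<>(waiting.keySet()));
        int released = 0;
        for (Long key : ready) {
            String body = waiting.remove(key);
            if (body != null) {
                nextPhase.accept(body);
                released++;
            }
        }
        return released;
    }

    int waitingCount() {
        return waiting.size();
    }

    // Runs pollOnce() repeatedly on the given scheduler.
    void start(ScheduledExecutorService scheduler, long periodSeconds) {
        scheduler.scheduleWithFixedDelay(this::pollOnce, 0, periodSeconds, TimeUnit.SECONDS);
    }
}
```

In a Camel application the tick would presumably come from a timer-driven route rather than a `ScheduledExecutorService`; the point of the sketch is only that the number of pollers stays at one no matter how many messages are waiting.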
How would you implement this simple, but tricky little route?
For each message that arrives on a queue, I want to kick off a polling timer to periodically check a specific database row *for that message*, checking for a 'safeToContinue' flag changing value from 'false' to 'true'.

Once the corresponding flag in the database for that message (i.e. the message body contains the Primary Key of its database row) changes to 'true', I want to stop that message's polling job and progress its processing to the next stage of my pipeline.

Obviously the following Camel code won't work (you can't use a 'timer' component in a 'to()' call), but hopefully it illustrates what I'm trying to achieve:

    from("direct://messagesWithDatabasePrimaryKeyValueInBody")
        .to("timer://foo?period=6")
        .setBody(constant("select safeToContinueFlag from tableX where primaryKey = ") + constant(getIn().getBody()))
        .to("jdbc://testDB")
        .choice()
            .when(body().isEqualTo(true))
                .to(KILL THE POLLING TIMER JOB FOR THIS MESSAGE)
                .to("direct://nextPhaseOfMyPipeline")
            .otherwise()
                // Continue polling until database flag changes to 'true'...
        .end();

How would you implement this simple use-case...!?!?

--
View this message in context: http://camel.465427.n5.nabble.com/How-would-you-implement-this-simple-but-tricky-little-route-tp5727651.html
Sent from the Camel - Users mailing list archive at Nabble.com.
Re: How would you implement this simple, but tricky little route?
I would not recommend holding up a Camel route for a long time while it waits for the DB flag.

So when the message comes in, I would check the DB to see whether the flag is set. If it is, I would proceed. If not, I would store the message in a DB or on the filesystem.

In another route I would poll the DB for the flag (e.g. with camel-jpa). When the DB change comes in, I would check whether the message is in the store above. If it is, I would do the processing.

That way the message is processed only once the flag is set, and no threads are blocked in the meantime.

Christian

On 15.02.2013 15:34, pmcb55 wrote:
> For each message that arrives on a queue, I want to kick-off a polling timer to periodically check a specific database row *for that message* [...]

--
Christian Schneider
http://www.liquid-reality.de

Open Source Architect
http://www.talend.com
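The two-route approach described in this reply can be modelled in plain Java as a rough sketch. Nothing here is Camel or camel-jpa API: `ParkAndRelease`, `flagIsSet` and `process` are hypothetical names, the in-memory map stands in for the DB or filesystem store, and the re-check after parking is an addition that the reply does not mention.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.LongPredicate;

// Hypothetical sketch: check on arrival, park if not ready,
// release from a separate polling path when the flag changes.
class ParkAndRelease {
    // Stand-in for the durable message store (DB or filesystem).
    private final Map<Long, String> store = new ConcurrentHashMap<>();
    // Stand-in for "is the safeToContinue flag set for this row?".
    private final LongPredicate flagIsSet;
    // Stand-in for the next phase of the pipeline.
    private final Consumer<String> process;

    ParkAndRelease(LongPredicate flagIsSet, Consumer<String> process) {
        this.flagIsSet = flagIsSet;
        this.process = process;
    }

    // First route: a message arrives. Returns true if this call processed it.
    boolean onMessage(long primaryKey, String body) {
        if (flagIsSet.test(primaryKey)) {
            process.accept(body);
            return true;
        }
        store.put(primaryKey, body);
        // Assumption, not in the original suggestion: re-check once, because
        // the flag may have flipped between the first check and the put.
        return flagIsSet.test(primaryKey) && onFlagSet(primaryKey);
    }

    // Second route: the DB poller reports that a row's flag is now set.
    // Returns true if a parked message was found and processed.
    boolean onFlagSet(long primaryKey) {
        String body = store.remove(primaryKey);
        if (body == null) {
            return false; // no message is waiting on this row
        }
        process.accept(body);
        return true;
    }

    int parkedCount() {
        return store.size();
    }
}
```

Because `remove` on the map is atomic, a message that both paths try to release is processed only once. No thread waits on any individual message.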
Re: How would you implement this simple, but tricky little route?
I feel creative today, so I'll suggest the following.

AMQ supports Delayed Delivery of messages [1]. JMS 2.0 - which is in Public Review Draft - also introduces this function as a first-class citizen in the spec [2].

With this approach, rather than blocking a thread while waiting for the flag to become true, you could have the barrier part of your route listen on a JMS queue. Upon receiving a message, you query the DB. If the barrier is open, you continue with the route. Otherwise, you send a message to yourself with a delay of 60 seconds.

Beware that this is just an idea. I haven't tested it myself, but I would think that it's a good solution given that:

a) Delayed delivery is becoming a standard broker functionality in JMS 2.0, so it's a popular enough use case for a broker.
b) It's not using the broker as DB storage - messages are truly delivered to the client and then re-dispatched if need be - so messages are really rotating and are not stowed for a long time.
c) It satisfies typical QoS requirements: durability if using DeliveryMode=PERSISTENT (tasks survive a crash), load balancing, failover, etc.

[1] http://activemq.apache.org/delay-and-schedule-message-delivery.html
[2] http://jcp.org/en/jsr/detail?id=343

Regards,

*Raúl Kripalani*
Apache Camel Committer
Enterprise Architect, Program Manager, Open Source Integration specialist
http://about.me/raulkripalani | http://www.linkedin.com/in/raulkripalani
http://blog.raulkr.net | twitter: @raulvk http://twitter.com/raulvk

On Fri, Feb 15, 2013 at 2:34 PM, pmcb55 mcbenne...@dnb.com wrote:
> For each message that arrives on a queue, I want to kick-off a polling timer to periodically check a specific database row *for that message* [...]
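The barrier logic behind the delayed-delivery suggestion in this reply can be sketched in plain Java. Like the idea itself, it has not been run against a broker: `BarrierConsumer`, `DelayedSender`, `barrierOpen` and `nextPhase` are hypothetical names, and the `DelayedSender` interface stands in for whatever actually posts a delayed message back to the same queue. With ActiveMQ that would presumably mean setting the `AMQ_SCHEDULED_DELAY` message property described in [1], with scheduler support enabled on the broker.

```java
import java.util.function.LongConsumer;
import java.util.function.LongPredicate;

// Hypothetical stand-in for "send this key back to my own queue,
// to be delivered again after delayMillis".
interface DelayedSender {
    void send(long primaryKey, long delayMillis);
}

// Hypothetical sketch of the barrier: pass the message on if the flag is
// set, otherwise re-send it to ourselves with a delay. No thread waits.
class BarrierConsumer {
    // Stand-in for the DB query on the safeToContinue flag.
    private final LongPredicate barrierOpen;
    // Stand-in for the next phase of the pipeline.
    private final LongConsumer nextPhase;
    private final DelayedSender toSelf;
    private final long retryDelayMillis;

    BarrierConsumer(LongPredicate barrierOpen, LongConsumer nextPhase,
                    DelayedSender toSelf, long retryDelayMillis) {
        this.barrierOpen = barrierOpen;
        this.nextPhase = nextPhase;
        this.toSelf = toSelf;
        this.retryDelayMillis = retryDelayMillis;
    }

    // Invoked for every delivery, whether first-time or redelivered.
    // Returns true if the message passed the barrier.
    boolean onMessage(long primaryKey) {
        if (barrierOpen.test(primaryKey)) {
            nextPhase.accept(primaryKey);
            return true;
        }
        toSelf.send(primaryKey, retryDelayMillis);
        return false;
    }
}
```

Each delivery is handled and acknowledged immediately, so the only state that survives between attempts is the message sitting in the broker, which is what gives the durability and failover properties listed in c).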