Re: Problem with camel-aws S3 when multiPartUpload is set to true
Try setting region on your AmazonS3Client bean.

On Fri, Jan 5, 2018 at 3:19 AM, kretin wrote:
> I created a simple camel route to poll for files in a local directory and
> upload them to a Ceph (S3) server at my University. I am using Apache Camel
> 2.20.0 with the camel-aws S3 component. When I set multiPartUpload=false
> (the default) in the uri, everything works fine, but if I change to
> multiPartUpload=true, it fails.
>
> I know there is nothing wrong with my S3 secret or access key, because
> when I set multiPartUpload=false everything works (there are no crazy plus
> (+) characters that need to be escaped in the keys).
>
> Here is the stack trace:
>
> com.amazonaws.services.s3.model.AmazonS3Exception: null (Service: Amazon S3;
>     Status Code: 403; Error Code: SignatureDoesNotMatch; Request ID:
>     tx002e9edee-005a4ed3d2-2213a2-uky-campus-1; S3 Extended Request ID: 2213a2-uky-campus-1-uky)
>   at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1592) ~[aws-java-sdk-core-1.11.186.jar:?]
>   at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1257) ~[aws-java-sdk-core-1.11.186.jar:?]
>   at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1029) ~[aws-java-sdk-core-1.11.186.jar:?]
>   at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:741) ~[aws-java-sdk-core-1.11.186.jar:?]
>   at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:715) ~[aws-java-sdk-core-1.11.186.jar:?]
>   at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:697) ~[aws-java-sdk-core-1.11.186.jar:?]
>   at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:665) ~[aws-java-sdk-core-1.11.186.jar:?]
>   at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:647) ~[aws-java-sdk-core-1.11.186.jar:?]
>   at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:511) ~[aws-java-sdk-core-1.11.186.jar:?]
>   at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4227) ~[aws-java-sdk-s3-1.11.186.jar:?]
>   at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4174) ~[aws-java-sdk-s3-1.11.186.jar:?]
>   at com.amazonaws.services.s3.AmazonS3Client.abortMultipartUpload(AmazonS3Client.java:2928) ~[aws-java-sdk-s3-1.11.186.jar:?]
>   at org.apache.camel.component.aws.s3.S3Producer.processMultiPart(S3Producer.java:181) ~[camel-aws-2.20.0.jar:2.20.0]
>   at org.apache.camel.component.aws.s3.S3Producer.process(S3Producer.java:84) ~[camel-aws-2.20.0.jar:2.20.0]
>   at ...
>
> My camel-context.xml looks like (the archive stripped most of the XML tags;
> the recoverable pieces are a PropertiesComponent bean, a
> BridgePropertyPlaceholderConfigurer set to SYSTEM_PROPERTIES_MODE_OVERRIDE,
> an amazonClient bean, and a route roughly like):
>
>   <camelContext xmlns="http://camel.apache.org/schema/spring">
>     <route>
>       <from uri="file://target/sendToS3"/>
>       <setHeader headerName="CamelAwsS3Key">
>         <simple>testMultiPart/${in.header.CamelFileName}</simple>
>       </setHeader>
>       <setHeader headerName="CamelAwsS3ContentLength">
>         <simple>${in.header.CamelFileLength}</simple>
>       </setHeader>
>       <to uri="aws-s3://{{s3Bucket}}?amazonS3Client=#amazonClient&amp;multiPartUpload=true"/>
>     </route>
>   </camelContext>
>
> Again, everything works fine if I set multiPartUpload=false in the above
> camel-context.xml.
>
> I have tried a lot of things, like:
>
> • setting the CamelAwsS3ContentMD5 header to the MD5 hash of the file
>   (which doesn't make sense for multi-part files)
> • various settings for the partSize parameter
> • different sized files, from very large to very small
> • setting the system property:
>   System.setProperty("com.amazonaws.services.s3.disablePutObjectMD5Validation", "true");
>
> If I turn on trace debugging for camel, it doesn't help much:
>
> [d #2 - file://target/sendToS3/] S3Producer TRACE Initiating multipart upload
>     [com.amazonaws.services.s3.model.InitiateMultipartUploadRequest@3731147a]
>     from exchange [Exchange[ID-Toucan-local-1515115111374-0-1]]...
> [d #2 - file://targe
Re: v2.20.1 bug or a feature?
Hi,

Yes, you're right. The body is under the 'Out' message on the exchange when template.request() returns. I guess what confused me is that template.requestBody() behaves in a different way. Thanks for your explanation.

Best,
Artur

On Thu, Dec 28, 2017 at 8:06 AM, Siano, Stephan wrote:
> Hi,
>
> this is how the template works in combination with different endpoints.
> Camel exchanges can have an in message and an out message. If the Camel
> pipeline proceeds to the next step, the out message is moved to the in
> message before that endpoint is called, in case there is an out message.
> The endpoint itself can either write the out message or overwrite the in
> message. The bean processors obviously write an out message, so if you are
> calling that first pipeline, you end up with your response in the out
> message and the template does not move it to the in message. Replacing the
> getIn() with getOut() would give you the expected response for the first
> route. The empty processor adds another endpoint to the route, so the
> result of your bean is moved to the in message (and your dummy processor
> just leaves it alone).
>
> This behavior is kind of weird, but you will only notice it when you are
> calling direct endpoints from a template, which is probably mostly
> happening in unit tests.
>
> Best regards
> Stephan
>
> -Original Message-
> From: Artur Jablonski [mailto:ajablon...@ravenpack.com]
> Sent: Wednesday, 27 December 2017 16:32
> To: users@camel.apache.org
> Subject: v2.20.1 bug or a feature?
>
> Hey,
>
> I bumped into this one today and not sure if this is a bug or a feature :).
>
> The example is in a Spring Boot application, if that makes any difference.
>
> Given this route:
>
>     from("direct:bugOrFeature")
>         .to("bean:mybean?method=transform");
>
> and the bean method:
>
>     public String transform(@Body String in) {
>         return "I want to change the body!";
>     }
>
> if I call it like this:
>
>     String result = template.request("direct:bugOrFeature",
>             e -> e.getIn().setBody("Some input"))
>         .getIn()
>         .getBody(String.class);
>
>     logger.info("result: {}", result);
>
> then it prints the old value:
>
>     result: Some input
>
> However, if I stick a dummy processor at the end of the route:
>
>     from("direct:bugOrFeature")
>         .to("bean:camelMaintenanceRoutes?method=transform")
>         .process(e -> {});
>
> then suddenly the same call gives me the new value:
>
>     result: I want to change the body!
>
> Is this a bug or am I missing something?
>
> Note that if I request a body like this:
>
>     template.requestBody("direct:bugOrFeature", "some input", String.class)
>
> then it gets the 'new' value without the dummy processor. The thing is
> that I need to get some headers out of the response, so I need the
> Exchange object and not just the body.
>
> Best,
> Artur
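The in/out rule Stephan describes can be modelled in a few lines of plain Java. A minimal sketch with invented classes (NOT Camel's real API): a "bean step" writes its reply to out, and the pipeline only moves out to in when there is a next step.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal model (not Camel's real classes) of the in/out message rule:
// each step reads "in"; if the previous step wrote "out", the pipeline
// moves it to "in" before the next step runs.
class MiniMessage {
    Object body;
    Map<String, Object> headers = new HashMap<>();
}

class MiniExchange {
    MiniMessage in = new MiniMessage();
    MiniMessage out;          // null until some step writes a reply

    boolean hasOut() { return out != null; }

    // What requestBody() effectively reads: prefer out, fall back to in.
    MiniMessage resultMessage() { return hasOut() ? out : in; }
}

public class InOutDemo {
    // A bean-like step that writes its reply to "out" (as bean processors do).
    static void beanStep(MiniExchange e) {
        e.out = new MiniMessage();
        e.out.body = "I want to change the body!";
    }

    // Pipeline glue before the next step: move out -> in.
    static void advance(MiniExchange e) {
        if (e.hasOut()) { e.in = e.out; e.out = null; }
    }

    public static void main(String[] args) {
        // One-step route: the reply lands in "out"; getIn() shows the old body.
        MiniExchange e1 = new MiniExchange();
        e1.in.body = "Some input";
        beanStep(e1);
        System.out.println(e1.in.body);              // Some input
        System.out.println(e1.resultMessage().body); // I want to change the body!

        // Two-step route: the pipeline advances before the dummy processor,
        // so the reply is now visible on "in".
        MiniExchange e2 = new MiniExchange();
        e2.in.body = "Some input";
        beanStep(e2);
        advance(e2); // the ".process(e -> {})" boundary
        System.out.println(e2.in.body);              // I want to change the body!
    }
}
```

This is exactly why the dummy processor "fixes" the route: it adds one more pipeline boundary, which moves the bean's out message onto in before the template hands the exchange back.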
v2.20.1 bug or a feature?
Hey,

I bumped into this one today and I'm not sure if it is a bug or a feature :).

The example is in a Spring Boot application, if that makes any difference.

Given this route:

    from("direct:bugOrFeature")
        .to("bean:mybean?method=transform");

and the bean method:

    public String transform(@Body String in) {
        return "I want to change the body!";
    }

if I call it like this:

    String result = template.request("direct:bugOrFeature",
            e -> e.getIn().setBody("Some input"))
        .getIn()
        .getBody(String.class);

    logger.info("result: {}", result);

then it prints the old value:

    result: Some input

However, if I stick a dummy processor at the end of the route:

    from("direct:bugOrFeature")
        .to("bean:camelMaintenanceRoutes?method=transform")
        .process(e -> {});

then suddenly the same call gives me the new value:

    result: I want to change the body!

Is this a bug or am I missing something?

Note that if I request a body like this:

    template.requestBody("direct:bugOrFeature", "some input", String.class)

then it gets the 'new' value without the dummy processor. The thing is that I need to get some headers out of the response, so I need the Exchange object and not just the body.

Best,
Artur
Re: Using Seda component for
Perhaps someone more knowledgeable can comment, but the documentation for the seda component has a section on request-reply (http://camel.apache.org/seda.html).

The same document warns against combining seda with threads() as in your example, because you end up with two blocking queues (one from seda and one from the thread pool).

Best,
Artur

On Tue, Dec 26, 2017 at 3:21 PM, Mohammed Hany wrote:
> Hi,
>
> Can I use the seda endpoint in a synchronous request-reply pattern, as
> described here: http://camel.apache.org/async.html?
>
> My route is simply:
>
>     from(mina2)
>         .transform(...)
>         .to("seda:ws");
>
>     // Bottleneck route, as cxf:bean:x1 may introduce delay
>     from("seda:ws")
>         .threads(5000) // blocking queue due to the delay imposed by cxf:bean:x1
>         .to("cxf:bean:x1");
>
> These routes will handle a huge amount of requests, which is why I want to
> move from the direct: endpoint to seda:. However, I need to confirm the
> following:
>
> 1. Using seda I can do synchronous request-reply.
> 2. Seda will benefit from the threadPoolProfile or threads() as mentioned
>    above.
>
> Regards,
> Hany
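What seda request-reply amounts to can be sketched in plain Java, without Camel: the producer parks a request (plus a future for the answer) on a bounded queue and blocks on the future; a consumer thread pool takes requests and completes them. All names below are invented for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-Java sketch (not Camel code) of synchronous request-reply over a
// seda-style queue. Note there is only ONE queue here; adding .threads()
// after a seda consumer introduces a second one, as the docs warn.
public class SedaSketch {
    static class Request {
        final String body;
        final CompletableFuture<String> reply = new CompletableFuture<>();
        Request(String body) { this.body = body; }
    }

    final BlockingQueue<Request> queue = new ArrayBlockingQueue<>(1000); // "seda:ws"
    final ExecutorService workers = Executors.newFixedThreadPool(5);    // concurrent consumers

    void startConsumers() {
        for (int i = 0; i < 5; i++) {
            workers.submit(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        Request r = queue.take();
                        // stand-in for the slow cxf:bean:x1 call
                        r.reply.complete("response:" + r.body);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }
    }

    String call(String body) {
        Request r = new Request(body);
        try {
            queue.put(r);          // may block when the queue is full (back-pressure)
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return r.reply.join();     // synchronous request-reply: wait for the answer
    }
}
```

In Camel terms, the blocking `join()` is what the caller of a seda endpoint with `waitForTaskToComplete` does, and the fixed pool corresponds to `concurrentConsumers` on the seda consumer rather than an extra `.threads()` step.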
Reactive streams with Camel, knowing when the flow completes (normally or with error)
Hello,

I was following the Camel in Action 2 (version 14) chapter on reactive streams and Camel (chapter 21). I followed section 21.1 about bridging Camel routes and RxJava reactive streams, did some experimenting, and there's one thing that I scratch my head over. Namely, is there a way to wait for / be notified when Camel routes turned into a reactive stream complete (normally or with an exception)?

So if I have these routes:

    from("direct:in1")
        .routeId("1")
        .to("reactive-streams:out1")
        .log("1 done");

    from("reactive-streams:in2")
        .routeId("2")
        .to("reactive-streams:out2")
        .log("2 done");

    from("reactive-streams:in3")
        .routeId("3")
        .log("3 done");

and then if I glue them using RxJava and CamelReactiveStreamsService:

    Flowable.fromPublisher(rxCamel.fromStream("out1"))
        .subscribe(rxCamel.streamSubscriber("in2"));

    Flowable.fromPublisher(rxCamel.fromStream("out2"))
        .subscribe(rxCamel.streamSubscriber("in3"));

and now if I call route "1" using a ProducerTemplate:

    template.requestBody("direct:in1", "payload");
    logger.info("requestBody() returned");

the output is:

    1 done
    requestBody() returned
    2 done
    3 done

So requestBody() returns after route "1" passes the data to "reactive-streams:out1". What if I want to wait for / be notified that the exchange reached the end of the reactive stream and all went well, or that somewhere along the way there was an error? Is that possible? Where do I look?

Disclaimer: I am new to reactive streams.

Best,
Artur

PS: Thanks Claus and Jonathan for the book. It's awesome!
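One possible shape of the missing piece, sketched with the JDK's own java.util.concurrent.Flow API: a terminal subscriber that completes a future when the stream ends, normally or with an error. With the camel-reactive-streams bridge you could presumably attach the same idea to the last fromStream(...) publisher (e.g. via RxJava's doOnComplete/doOnError, or a custom Subscriber) — that wiring is an assumption, not something shown in the thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// A terminal subscriber that turns "stream finished" into a joinable future.
public class CompletionSignal<T> implements Flow.Subscriber<T> {
    public final CompletableFuture<Void> done = new CompletableFuture<>();

    @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
    @Override public void onNext(T item) { /* terminal sink: consume and drop */ }
    @Override public void onError(Throwable t) { done.completeExceptionally(t); }
    @Override public void onComplete() { done.complete(null); }

    public static void main(String[] args) {
        SubmissionPublisher<String> out = new SubmissionPublisher<>(); // stands in for the last stream
        CompletionSignal<String> end = new CompletionSignal<>();
        out.subscribe(end);
        out.submit("payload");
        out.close();        // end of stream
        end.done.join();    // blocks until onComplete / throws on onError
        System.out.println("flow finished");
    }
}
```

The key point is that completion is a stream-level signal, not an exchange-level one, so the caller has to block (or chain a callback) on something like `done` rather than on requestBody().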
Re: Camel [2.19.3] + SpringBoot [1.5.6] fails executing two JUnit tests that succeed when run individually
Hey, Nope, I tried with 2.19.4 and it was failing in the same way as 2.19.3. We felt brave yesterday and moved to 2.20.1. What's the worst thing that can happen? :) On Thu, Nov 16, 2017 at 3:42 PM, Claus Ibsen wrote: > Hi > > If you want to stay on 2.19.x you may try 2.19.4 which could > potentially also have the fix. > > On Wed, Nov 15, 2017 at 4:39 PM, Artur Jablonski > wrote: > > Sweet lord! This must be the fastest response ever... and yes indeed it > > works with 2.20.1. > > > > Thank you Claus > > > > Best, > > Artur > > > > > > On Wed, Nov 15, 2017 at 4:09 PM, Claus Ibsen > wrote: > > > >> Hi > >> > >> I think we fixed something related to that, try with camel 2.20.1. > >> Or refactor your test classes to only have 1 test method when advicing > >> > >> On Wed, Nov 15, 2017 at 4:06 PM, Artur Jablonski > >> wrote: > >> > Hello, > >> > > >> > I've been looking at this issue with running integration tests in > Camel + > >> > SpringBoot combo context. > >> > > >> > I have a route that fails to start in integration tests when > CamelContext > >> > starts up. That's expected. One of endpoints refers to a AWS resource > >> that > >> > the test running machine has no access to. That's OK too, I don't > want to > >> > poke the real resource in the test, so I use the adviceWith() to > replace > >> > the endpoint and start the camel context myself. This works well for a > >> > single test. As soon as I add another test, the second test fails, > >> because > >> > of the complaints from the original endpoint about the permissions. It > >> > looks as if before the second test runs the spring context starts up > >> > together with the camel context (it seems to ignore the fact that the > >> test > >> > class is annotated with @UseAdviceWith). > >> > > >> > This is a minimal testcase I could produce to reproduce the behaviour > >> > https://gist.github.com/anonymous/626cf26197dd5a05569d4d306870a22e > >> > > >> > What am I missing? 
> >> > > >> > Best, > >> > Artur > >> > >> > >> > >> -- > >> Claus Ibsen > >> - > >> http://davsclaus.com @davsclaus > >> Camel in Action 2: https://www.manning.com/ibsen2 > >> > > > > -- > Claus Ibsen > - > http://davsclaus.com @davsclaus > Camel in Action 2: https://www.manning.com/ibsen2 >
Re: Camel [2.19.3] + SpringBoot [1.5.6] fails executing two JUnit tests that succeed when run individually
Sweet lord! This must be the fastest response ever... and yes indeed it works with 2.20.1. Thank you Claus Best, Artur On Wed, Nov 15, 2017 at 4:09 PM, Claus Ibsen wrote: > Hi > > I think we fixed something related to that, try with camel 2.20.1. > Or refactor your test classes to only have 1 test method when advicing > > On Wed, Nov 15, 2017 at 4:06 PM, Artur Jablonski > wrote: > > Hello, > > > > I've been looking at this issue with running integration tests in Camel + > > SpringBoot combo context. > > > > I have a route that fails to start in integration tests when CamelContext > > starts up. That's expected. One of endpoints refers to a AWS resource > that > > the test running machine has no access to. That's OK too, I don't want to > > poke the real resource in the test, so I use the adviceWith() to replace > > the endpoint and start the camel context myself. This works well for a > > single test. As soon as I add another test, the second test fails, > because > > of the complaints from the original endpoint about the permissions. It > > looks as if before the second test runs the spring context starts up > > together with the camel context (it seems to ignore the fact that the > test > > class is annotated with @UseAdviceWith). > > > > This is a minimal testcase I could produce to reproduce the behaviour > > https://gist.github.com/anonymous/626cf26197dd5a05569d4d306870a22e > > > > What am I missing? > > > > Best, > > Artur > > > > -- > Claus Ibsen > - > http://davsclaus.com @davsclaus > Camel in Action 2: https://www.manning.com/ibsen2 >
Camel [2.19.3] + SpringBoot [1.5.6] fails executing two JUnit tests that succeed when run individually
Hello,

I've been looking at this issue with running integration tests in a Camel + Spring Boot combo context.

I have a route that fails to start in integration tests when the CamelContext starts up. That's expected: one of the endpoints refers to an AWS resource that the machine running the tests has no access to. That's OK too; I don't want to poke the real resource in the test, so I use adviceWith() to replace the endpoint and start the Camel context myself. This works well for a single test. As soon as I add another test, the second test fails because of the complaints from the original endpoint about the permissions. It looks as if, before the second test runs, the Spring context starts up together with the Camel context (it seems to ignore the fact that the test class is annotated with @UseAdviceWith).

This is a minimal test case I could produce to reproduce the behaviour: https://gist.github.com/anonymous/626cf26197dd5a05569d4d306870a22e

What am I missing?

Best,
Artur
Re: aws-s3, get a single object
Just for completeness, here's the latest version of my custom PollingConsumerPollStrategy: https://gist.github.com/anonymous/17465c5ff78d9b75726563dc7727d5ae

It allows me to detect the specific error from Amazon and pass null down the route, or propagate the error in any other case. Is that a sane approach, or is there a better, cleaner, way more awesome way?

Thanks
Artur

On Tue, Jul 4, 2017 at 5:14 PM, Artur Jablonski wrote:
> Right,
>
> So I managed to get that working, though I am not sure if it's the proper
> way of doing this or more of a hack.
>
> So again, I want to use pollEnrich() to fetch a specific file from S3. If
> the file doesn't exist, I want to pass a null message down the route. To
> simplify things, let's say that for ANY error that happens during the poll
> I want to retry 3 times (with some backoff between attempts) and then give
> up and pass null. I DON'T want to use the timeout, as it seems brittle, as
> I explained in my previous email.
>
> The problem is that, by default, if there's an error while polling,
> pollEnrich() will try again and again indefinitely. Bummer.
>
> Taariq suggested using a custom pollStrategy to achieve correct handling
> of that scenario. PollingConsumerPollStrategy has 3 methods, of which
> "boolean rollback(Consumer, Endpoint, int, Exception)" is of interest
> here, as it's the one called when an error happens during the poll. The
> method is supposed to return 'true' if you want to retry, or 'false'
> otherwise. But here's the confusing thing: the
> DefaultPollingConsumerPollStrategy that's used if you don't define one
> always returns false, and yet the polling doesn't stop when an error
> happens.
>
> Fine, so after digging into the code, here's what I discovered:
>
> pollEnrich() in the Java DSL creates a PollEnricher in the route, which,
> among other types, implements Processor. PollEnricher in its process()
> method resolves a Consumer to which it delegates using one of
> receive()/receiveNoWait()/receive(timeout), depending on whether a timeout
> has been set. In my case I am not setting a timeout, so the method it
> delegates to is Consumer.receive().
>
> At runtime that Consumer is an instance of EventDrivenPollingConsumer,
> which in turn delegates to the S3Consumer (an implementation of
> ScheduledBatchPollingConsumer). The receive() method of the
> EventDrivenPollingConsumer implementation will BLOCK until there's an
> Exchange on its internal queue. The place where an Exchange is put on that
> queue is the consumer's process() method, because
> EventDrivenPollingConsumer is itself a Processor.
>
> So here's what happens if polling yields some result (omitting method
> arguments for brevity):
>
>     PollEnricher.process()
>       EventDrivenPollingConsumer.start()
>       EventDrivenPollingConsumer.receive()
>         ScheduledPollConsumer.doStart() // schedules the polling on the S3Consumer in a separate thread
>         BlockingQueue.take()            // blocks this thread until there's an Exchange on the queue
>
> So now the question is how the polled exchange ends up on
> EventDrivenPollingConsumer's queue so that it can unblock and carry on:
>
>     ScheduledPollConsumer.run() // main entry to the polling task
>       S3Consumer.poll()
>         EventDrivenPollingConsumer.process()
>           BlockingQueue.put()   // unblocks PollEnricher.process(), processing carries on
>
> So there you have it: it's the EventDrivenPollingConsumer that creates the
> S3Consumer in this case, and it passes itself as a Processor to the
> constructor, so the S3Consumer can invoke its process() method.
>
> OK, so what happens in my use case? It's S3Consumer.poll() that throws the
> exception from the AWS service, so EventDrivenPollingConsumer.process() is
> never called. The scheduled polling task runs over and over again, and
> PollEnricher.process() stays blocked forever.
>
> So back to PollingConsumerPollStrategy and its rollback() method. Like I
> said, even if I return false from it, the polling continues, because the
> polling task is never stopped. So what I ended up with was this:
>
> https://gist.github.com/anonymous/195f2b6c835db82decf1649969a717fe
>
> So basically, after 3 attempts I create an Exchange with a null body and
> run the process() method on it from within rollback() so that the route
> can continue; I return false, and all looks good.
>
> Please note the dodgy Thread.sleep() which I put there for the backoff
> between retries, because all the backoff* configurations from
Re: aws-s3, get a single object
Right,

So I managed to get that working, though I am not sure if it's the proper way of doing this or more of a hack.

So again, I want to use pollEnrich() to fetch a specific file from S3. If the file doesn't exist, I want to pass a null message down the route. To simplify things, let's say that for ANY error that happens during the poll I want to retry 3 times (with some backoff between attempts) and then give up and pass null. I DON'T want to use the timeout, as it seems brittle, as I explained in my previous email.

The problem is that, by default, if there's an error while polling, pollEnrich() will try again and again indefinitely. Bummer.

Taariq suggested using a custom pollStrategy to achieve correct handling of that scenario. PollingConsumerPollStrategy has 3 methods, of which "boolean rollback(Consumer, Endpoint, int, Exception)" is of interest here, as it's the one called when an error happens during the poll. The method is supposed to return 'true' if you want to retry, or 'false' otherwise. But here's the confusing thing: the DefaultPollingConsumerPollStrategy that's used if you don't define one always returns false, and yet the polling doesn't stop when an error happens.

Fine, so after digging into the code, here's what I discovered:

pollEnrich() in the Java DSL creates a PollEnricher in the route, which, among other types, implements Processor. PollEnricher in its process() method resolves a Consumer to which it delegates using one of receive()/receiveNoWait()/receive(timeout), depending on whether a timeout has been set. In my case I am not setting a timeout, so the method it delegates to is Consumer.receive().

At runtime that Consumer is an instance of EventDrivenPollingConsumer, which in turn delegates to the S3Consumer (an implementation of ScheduledBatchPollingConsumer). The receive() method of the EventDrivenPollingConsumer implementation will BLOCK until there's an Exchange on its internal queue. The place where an Exchange is put on that queue is the consumer's process() method, because EventDrivenPollingConsumer is itself a Processor.

So here's what happens if polling yields some result (omitting method arguments for brevity):

    PollEnricher.process()
      EventDrivenPollingConsumer.start()
      EventDrivenPollingConsumer.receive()
        ScheduledPollConsumer.doStart() // schedules the polling on the S3Consumer in a separate thread
        BlockingQueue.take()            // blocks this thread until there's an Exchange on the queue

So now the question is how the polled exchange ends up on EventDrivenPollingConsumer's queue so that it can unblock and carry on:

    ScheduledPollConsumer.run() // main entry to the polling task
      S3Consumer.poll()
        EventDrivenPollingConsumer.process()
          BlockingQueue.put()   // unblocks PollEnricher.process(), processing carries on

So there you have it: it's the EventDrivenPollingConsumer that creates the S3Consumer in this case, and it passes itself as a Processor to the constructor, so the S3Consumer can invoke its process() method.

OK, so what happens in my use case? It's S3Consumer.poll() that throws the exception from the AWS service, so EventDrivenPollingConsumer.process() is never called. The scheduled polling task runs over and over again, and PollEnricher.process() stays blocked forever.

So back to PollingConsumerPollStrategy and its rollback() method. Like I said, even if I return false from it, the polling continues, because the polling task is never stopped. So what I ended up with was this:

https://gist.github.com/anonymous/195f2b6c835db82decf1649969a717fe

So basically, after 3 attempts I create an Exchange with a null body and run the process() method on it from within rollback() so that the route can continue; I return false, and all looks good.

Please note the dodgy Thread.sleep() which I put there for the backoff between retries, because all the backoff* configurations on ScheduledPollConsumer work BETWEEN polls, not within the same poll, which is what happens if I return 'true' from rollback().

Is there a better way of doing this? This seems like a very common use case to me; perhaps this should be considered as a configuration option on pollEnrich() or one of the other components involved here.

What do I win for the longest email on the group? :p

Best,
Artur

On Thu, Jun 22, 2017 at 5:36 PM, Artur Jablonski wrote:
> I will try that, thank you for the hint!
>
> On Wed, Jun 21, 2017 at 5:40 PM, Taariq Levack wrote:
>> Then you can try the custom pollStrategy.
>>
>> On 21 Jun 2017 10:57, "Artur Jablonski" wrote:
>>> Hello!
>>>
>>> Thanks for the hint
>>>
>>> So that works...
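The retry-then-null policy described above (retry N times with backoff inside the same poll, then give up and hand back nothing) can be sketched in plain Java. This is an illustration of the logic, not the gist's actual code; the method name and backoff values are invented.

```java
import java.util.concurrent.Callable;

// Sketch of the policy from the email: attempt the poll up to maxAttempts
// times with a doubling backoff, then return null instead of rethrowing,
// so the caller can pass "no result" down the route. Inside Camel this
// logic would live in a custom PollingConsumerPollStrategy.rollback(...).
public class RetryThenNull {
    public static <T> T pollWithRetry(Callable<T> poll, int maxAttempts, long initialBackoffMs) {
        long backoff = initialBackoffMs;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return poll.call();                 // the actual poll, e.g. the S3 fetch
            } catch (Exception e) {
                if (attempt == maxAttempts) {
                    return null;                    // give up: "no result" for the route
                }
                try {
                    Thread.sleep(backoff);          // the "dodgy" in-poll backoff
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return null;
                }
                backoff *= 2;
            }
        }
        return null; // not reached
    }
}
```

As the email notes, the built-in backoff* options can't express this because they operate between scheduled polls, while this loop retries within a single logical poll.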
Re: aws-s3, get a single object
I will try that, thank you for the hint! On Wed, Jun 21, 2017 at 5:40 PM, Taariq Levack wrote: > Then you can try the custom pollStrategy. > > > On 21 Jun 2017 10:57, "Artur Jablonski" wrote: > > > Hello! > > > > Thanks for the hint > > > > So that works... kind of. > > > > First, with timeout(0) it doesn't work, it won't even try to poll for > > anything. I suppose that's how it's supposed to work. Fine. > > > > With timeout(>0) it works, as in it gives up after the given number of > > milliseconds and null is propagated down the route where I can > > catch it in choice() and only process further those not null responses. > > > > This seems to me a bit brittle. I mean setting this timeout too low, > could > > potentially make me miss an existing file if some network glitch happened > > that would slow down the request-response cycle, and setting it high > seems > > like a waste when the AWS-S3 Consumer throws an exception that clearly > > tells me that file that I want doesn't exist. > > > > So what I am looking for, I guess, is exception handling mechanism for > > Consumer, so that I can break polling on given exception. > > > > I looked here http://camel.apache.org/polling-consumer.html > > and was looking at backoffErrorThreshold, backoffIdleThreshold and > > backoffMultiplier as the only properties that mention error handling, > but I > > guess they're not for what I want. > > > > Am i missing something? > > > > Cheers > > Artur > > > > > > On Wed, Jun 21, 2017 at 6:26 AM, Taariq Levack > wrote: > > > > > You can use a timeout of 0 for receiving with no wait, or greater than > 0 > > > for waiting a bit. > > > Default is -1 > > > > > > http://camel.apache.org/content-enricher.html > > > > > > Cheers, > > > Taariq > > > > > > On 20 Jun 2017 22:12, "Artur Jablonski" > > wrote: > > > > > > > Right, that almost works. > > > > > > > > Now the problem I have is that when a file I ask for doesn't exist > the > > > > consumer seems to be polling for it indefinitely. 
Is there a way of > > > saying: > > > > hey polling consumer, try n times and then give up. Throw an error or > > > > return null. Don't insist, please. > > > > > > > > Cheers > > > > Artur > > > > > > > > On 20 Jun 2017 9:55 a.m., "Artur Jablonski" < > ajablon...@ravenpack.com> > > > > wrote: > > > > > > > > > Thank you so much Gregor! > > > > > > > > > > You pushed me in the right direction. I initially wrote a patch for > > S3 > > > > > producer to get me what I wanted, but after seeing your reply I > read > > > > about > > > > > the content enricher pattern and pollEnrich() in Camel. > > > > > > > > > > This SO was helpful as well > > > > > https://stackoverflow.com/questions/36948005/how-do- > > > > > dynamic-from-endpoints-and-exchanges-work-in-camel > > > > > > > > > > In the end, with this route: > > > > > > > > > > from("direct:pollEnrich").routeId("pollEnrich") > > > > > .pollEnrich() > > > > > .simple("aws-s3://{{buckets. > > > > mybucket}}?amazonS3Client=#amazonS3Client&deleteAfterRead=false& > > > > fileName=${in.header.CamelAwsS3Key}") > > > > > .convertBodyTo(byte[].class) > > > > > .to("bean:unmarshall") > > > > > > > > > > > > > > > I can now poke the route and set the CamelAWSS3Key header to the > > file I > > > > > want. > > > > > > > > > > Cheerio! > > > > > Artur > > > > > > > > > > > > > > > On Mon, Jun 19, 2017 at 10:43 PM, Gregor Zurowski < > > > > > gre...@list.zurowski.org> wrote: > > > > > > > > > >> Hi Artur, > > > > >> > > > > >> You should be able to get a single S3 object with the camel-aws > > > > >> component using the "fileName" query parameter on the consumer. > See > > > > >> the documentation at > > > > >> https://github.com/apache/camel/blob/master/components/camel > > > > >> -aws/src/main/docs/aws-s3-component.adoc. 
> > > > >> > > > > >> Gregor > > > > >> > > > > >> On Mon, Jun 19, 2017 at 5:28 PM, Artur Jablonski > > > > >> wrote: > > > > >> > Hello, > > > > >> > > > > > >> > I am trying to get a single object from S3 via Camel and I am > not > > > sure > > > > >> how > > > > >> > to do this. > > > > >> > > > > > >> > It seems that the producer endpoint can only upload to S3 and > the > > > > >> consumer > > > > >> > endpoint can poll a bucket, but how can express a use case of > > > > >> retrieving a > > > > >> > single S3 object when I know it's key? > > > > >> > > > > > >> > Best > > > > >> > Artur > > > > >> > > > > > > > > > > > > > > > > > > > >
Re: aws-s3, get a single object
Hello! Thanks for the hint So that works... kind of. First, with timeout(0) it doesn't work, it won't even try to poll for anything. I suppose that's how it's supposed to work. Fine. With timeout(>0) it works, as in it gives up after the given number of milliseconds and null is propagated down the route where I can catch it in choice() and only process further those not null responses. This seems to me a bit brittle. I mean setting this timeout too low, could potentially make me miss an existing file if some network glitch happened that would slow down the request-response cycle, and setting it high seems like a waste when the AWS-S3 Consumer throws an exception that clearly tells me that file that I want doesn't exist. So what I am looking for, I guess, is exception handling mechanism for Consumer, so that I can break polling on given exception. I looked here http://camel.apache.org/polling-consumer.html and was looking at backoffErrorThreshold, backoffIdleThreshold and backoffMultiplier as the only properties that mention error handling, but I guess they're not for what I want. Am i missing something? Cheers Artur On Wed, Jun 21, 2017 at 6:26 AM, Taariq Levack wrote: > You can use a timeout of 0 for receiving with no wait, or greater than 0 > for waiting a bit. > Default is -1 > > http://camel.apache.org/content-enricher.html > > Cheers, > Taariq > > On 20 Jun 2017 22:12, "Artur Jablonski" wrote: > > > Right, that almost works. > > > > Now the problem I have is that when a file I ask for doesn't exist the > > consumer seems to be polling for it indefinitely. Is there a way of > saying: > > hey polling consumer, try n times and then give up. Throw an error or > > return null. Don't insist, please. > > > > Cheers > > Artur > > > > On 20 Jun 2017 9:55 a.m., "Artur Jablonski" > > wrote: > > > > > Thank you so much Gregor! > > > > > > You pushed me in the right direction. 
I initially wrote a patch for S3 > > > producer to get me what I wanted, but after seeing your reply I read > > about > > > the content enricher pattern and pollEnrich() in Camel. > > > > > > This SO was helpful as well > > > https://stackoverflow.com/questions/36948005/how-do- > > > dynamic-from-endpoints-and-exchanges-work-in-camel > > > > > > In the end, with this route: > > > > > > from("direct:pollEnrich").routeId("pollEnrich") > > > .pollEnrich() > > > .simple("aws-s3://{{buckets. > > mybucket}}?amazonS3Client=#amazonS3Client&deleteAfterRead=false& > > fileName=${in.header.CamelAwsS3Key}") > > > .convertBodyTo(byte[].class) > > > .to("bean:unmarshall") > > > > > > > > > I can now poke the route and set the CamelAWSS3Key header to the file I > > > want. > > > > > > Cheerio! > > > Artur > > > > > > > > > On Mon, Jun 19, 2017 at 10:43 PM, Gregor Zurowski < > > > gre...@list.zurowski.org> wrote: > > > > > >> Hi Artur, > > >> > > >> You should be able to get a single S3 object with the camel-aws > > >> component using the "fileName" query parameter on the consumer. See > > >> the documentation at > > >> https://github.com/apache/camel/blob/master/components/camel > > >> -aws/src/main/docs/aws-s3-component.adoc. > > >> > > >> Gregor > > >> > > >> On Mon, Jun 19, 2017 at 5:28 PM, Artur Jablonski > > >> wrote: > > >> > Hello, > > >> > > > >> > I am trying to get a single object from S3 via Camel and I am not > sure > > >> how > > >> > to do this. > > >> > > > >> > It seems that the producer endpoint can only upload to S3 and the > > >> consumer > > >> > endpoint can poll a bucket, but how can express a use case of > > >> retrieving a > > >> > single S3 object when I know it's key? > > >> > > > >> > Best > > >> > Artur > > >> > > > > > > > > >
Re: aws-s3, get a single object
Right, that almost works. Now the problem I have is that when a file I ask for doesn't exist the consumer seems to be polling for it indefinitely. Is there a way of saying: hey polling consumer, try n times and then give up. Throw an error or return null. Don't insist, please. Cheers Artur On 20 Jun 2017 9:55 a.m., "Artur Jablonski" wrote: > Thank you so much Gregor! > > You pushed me in the right direction. I initially wrote a patch for S3 > producer to get me what I wanted, but after seeing your reply I read about > the content enricher pattern and pollEnrich() in Camel. > > This SO was helpful as well > https://stackoverflow.com/questions/36948005/how-do- > dynamic-from-endpoints-and-exchanges-work-in-camel > > In the end, with this route: > > from("direct:pollEnrich").routeId("pollEnrich") > .pollEnrich() > > .simple("aws-s3://{{buckets.mybucket}}?amazonS3Client=#amazonS3Client&deleteAfterRead=false&fileName=${in.header.CamelAwsS3Key}") > .convertBodyTo(byte[].class) > .to("bean:unmarshall") > > > I can now poke the route and set the CamelAWSS3Key header to the file I > want. > > Cheerio! > Artur > > > On Mon, Jun 19, 2017 at 10:43 PM, Gregor Zurowski < > gre...@list.zurowski.org> wrote: > >> Hi Artur, >> >> You should be able to get a single S3 object with the camel-aws >> component using the "fileName" query parameter on the consumer. See >> the documentation at >> https://github.com/apache/camel/blob/master/components/camel >> -aws/src/main/docs/aws-s3-component.adoc. >> >> Gregor >> >> On Mon, Jun 19, 2017 at 5:28 PM, Artur Jablonski >> wrote: >> > Hello, >> > >> > I am trying to get a single object from S3 via Camel and I am not sure >> how >> > to do this. >> > >> > It seems that the producer endpoint can only upload to S3 and the >> consumer >> > endpoint can poll a bucket, but how can express a use case of >> retrieving a >> > single S3 object when I know it's key? >> > >> > Best >> > Artur >> > >
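The "try n times and then give up" behaviour asked for here is not something the S3 consumer offers out of the box; one way to approximate it is a bounded polling loop around a single receive attempt. In this plain-Java sketch, fetch() is a hypothetical stand-in for one blocking receive with a short timeout, not a Camel API:

```java
import java.util.function.Supplier;

// Sketch: poll a bounded number of times, then give up and return null,
// instead of relying on one long receive timeout. fetch() simulates a
// single consumer poll that returns null when the object is not there yet.
public class BoundedPoll {
    public static <T> T pollUpTo(int attempts, Supplier<T> fetch) {
        for (int i = 0; i < attempts; i++) {
            T result = fetch.get();   // one poll; null means "not there yet"
            if (result != null) {
                return result;
            }
        }
        return null;                  // give up explicitly after n attempts
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated consumer: the object only "appears" on the third poll.
        String got = pollUpTo(5, () -> ++calls[0] >= 3 ? "file.txt" : null);
        System.out.println(got + " after " + calls[0] + " polls");
    }
}
```

In a real route this loop would wrap the ConsumerTemplate/pollEnrich receive call, with each attempt using a modest timeout so a transient network glitch costs one retry rather than a miss.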
Re: aws-s3, get a single object
Thank you so much Gregor! You pushed me in the right direction. I initially wrote a patch for S3 producer to get me what I wanted, but after seeing your reply I read about the content enricher pattern and pollEnrich() in Camel. This SO was helpful as well https://stackoverflow.com/questions/36948005/how-do-dynamic-from-endpoints-and-exchanges-work-in-camel In the end, with this route: from("direct:pollEnrich").routeId("pollEnrich") .pollEnrich() .simple("aws-s3://{{buckets.mybucket}}?amazonS3Client=#amazonS3Client&deleteAfterRead=false&fileName=${in.header.CamelAwsS3Key}") .convertBodyTo(byte[].class) .to("bean:unmarshall") I can now poke the route and set the CamelAWSS3Key header to the file I want. Cheerio! Artur On Mon, Jun 19, 2017 at 10:43 PM, Gregor Zurowski wrote: > Hi Artur, > > You should be able to get a single S3 object with the camel-aws > component using the "fileName" query parameter on the consumer. See > the documentation at > https://github.com/apache/camel/blob/master/components/ > camel-aws/src/main/docs/aws-s3-component.adoc. > > Gregor > > On Mon, Jun 19, 2017 at 5:28 PM, Artur Jablonski > wrote: > > Hello, > > > > I am trying to get a single object from S3 via Camel and I am not sure > how > > to do this. > > > > It seems that the producer endpoint can only upload to S3 and the > consumer > > endpoint can poll a bucket, but how can express a use case of retrieving > a > > single S3 object when I know it's key? > > > > Best > > Artur >
aws-s3, get a single object
Hello, I am trying to get a single object from S3 via Camel and I am not sure how to do this. It seems that the producer endpoint can only upload to S3 and the consumer endpoint can poll a bucket, but how can express a use case of retrieving a single S3 object when I know it's key? Best Artur
Re: Camel + Spring + Spring Boot, waiting for the routes to start
Hello Claus Right, so 1. I cannot send a message without blocking because the event callback seems to be triggered BEFORE the CamelContext is fully up and running. That's kind of counterintuitive. I would imagine that when Spring's ApplicationContext has been fully initialized, all the beans are fully initialized and ready to go... including the CamelContext and all the routes. 2. I cannot block in that method so as not to deadlock the whole Spring machinery. Fair enough. So I need to spawn a new thread that, I assume, needs to poke the CamelContext about whether its state is 'started'. OK, I can do that, but perhaps it's not the most elegant solution. Any reason why Camel cannot be fully up and running before Spring? Cheerio Artur On Tue, Apr 11, 2017 at 3:34 PM, Claus Ibsen wrote: > Dont use the event listener to do code that may block > @EventListener(ContextRefreshedEvent.class) > > You are basically hi-jacking the spring thread that signals this > event. You can create a new thread to send the message from the event > listener so you dont block it. > > > > > On Tue, Apr 11, 2017 at 1:30 PM, Artur Jablonski > wrote: > > Hello, > > > > I have the Camel + Spring + SpringBoot combo working > > > > > > I have my RouteBilder Spring Component that's being pickup up and started > > during Spring startup. > > The route starts fine as I can see SpringCamelContext output in the logs. 
> > > > What I am trying to do is to send a message to direct endpoint using > > injected ProducerTemplate but if I do it like this > > > > @Produce(uri = "direct:myQueue") > > private ProducerTemplate producerTemplate; > > > > @EventListener(ContextRefreshedEvent.class) > > public void send() > > throws Exception > > { > > producerTemplate.start(); > > producerTemplate.sendBody("hello"); > > } > > > > > > I am getting > > > > No consumers available on endpoint: direct://myQueue > > > > because the route has not started yet and if I try blocking parameter on > > uri like this: > > > > @Produce(uri = "direct:myQueue?block=true") > > > > then the whole thing freezes for 30 secs (default timeout) and then blows > > up with the same exception > > > > I also tried to call the producerTemplate.sendBody() form @PostConstruct > > annotaded method with the same effect. > > > > > > How do I wait for the CamelContext to be fully started in this > context > > > > > > Cheerio > > Arur > > > > -- > Claus Ibsen > - > http://davsclaus.com @davsclaus > Camel in Action 2: https://www.manning.com/ibsen2 >
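Claus's suggestion, scheduling the send on a separate thread and letting that thread wait until the context is fully started, can be sketched in plain Java. The class and method names here (StartupSend, markStarted) are illustrative, not Spring or Camel API, and the latch simulates the "context fully started" condition:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Sketch: the event callback only *schedules* the send on its own thread;
// that thread waits until the context reports it has fully started, so the
// framework's event thread is never blocked.
public class StartupSend {
    private final CountDownLatch started = new CountDownLatch(1);
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    // Called from the framework's event thread -- returns immediately.
    public Future<String> onContextRefreshed() {
        return worker.submit(() -> {
            started.await();        // block the worker, not the event thread
            return "sent: hello";   // stand-in for producerTemplate.sendBody(...)
        });
    }

    // Stand-in for "CamelContext is now fully started".
    public void markStarted() {
        started.countDown();
    }

    public static void main(String[] args) throws Exception {
        StartupSend s = new StartupSend();
        Future<String> f = s.onContextRefreshed(); // scheduled, not yet sent
        s.markStarted();                           // startup finishes later
        System.out.println(f.get(5, TimeUnit.SECONDS));
        s.worker.shutdown();
    }
}
```

In the real application the latch would be replaced by polling the CamelContext status, or by a Camel startup event listener counting down the latch.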
Re: Curious routing case
I guess one thing that comes to my mind is to hide all this parallel stuff inside a processor that would just spit out on the other end the result of processing all those messages. It would handle the grouping and serializing and such. I guess that would reduce the complexity of the route at the cost of complexity in the processor. I have no better ideas anyway, so I will give it a go. On Tue, Apr 11, 2017 at 10:45 AM, Artur Jablonski wrote: > Hello, > > I don't think this route definition is fitting my use case, though I > learnt a thing or two about the interesting patterns linked. Thanks! > > Ok, so let me try to clarify the use case. > > > 1. The stream is infinite, it's not a batch job. The messages keep on > coming from SQS 'all the time' > > 2. More important thing is about parallel processing. > > Let A1 denote a message 1 from group A, B2 message 2 from group B, etc. > > Let's say this is the order in which the messages happen to appear > in the route from SQS > > A1, A2, B1, C1, B2, A3, C2, B3 > > Now what I am trying to achieve is grouping the messages that have to be > processed sequentially (order doesn't matter as long as no two messages > from the same group are processed at the same time). > So I am trying to somehow get these streams > > A1, A2, A3 > > B1, B2, B3 > > C1, C2 > > > So, A1 B1 and C1 can be processed in parallel because they are from > different groups, but the messages within groups need to be processed one > by one. > > In my example, there are 3 groups, but there can be many and I don't know > what they are in advance. The processing logic between the groups is > similar and is a function of the group so I can get a processor for group A > from a method call getProcessor(A), B getProcessor(B), etc. > > I am stuck at how to do that in Camel, because since I don't know the > groups in advance, I would need to create processing routes dynamically. 
> > Say the system starts, and A1 arrives, there can't be any processor for > group A yet, since it's the first message from the group and I need to > somehow dynamically add processing capability of the group A to the route > and then perhaps if the messages from group A stop arriving for some time, > that processor could be removed. > > How to add the parallel part between the group messages is also blurry to > me. One way of doing this I was thinking was to do a multicast to all the > dynamically created processing routes for groups and stick a filter before > so that only messages from particular group can go through. From multicast > page: > > from("direct:a").multicast().parallelProcessing().to("direct:x", > "direct:y", "direct:z"); > > But here the x,y,z endpoints are hardcoded. I could write up some custom > multicast I suppose to search the routes in CamelContext.. not sure. > > Thanks > Artur > > > > > > On Mon, Apr 3, 2017 at 1:36 PM, Artur Jablonski > wrote: > >> Hi Zoran, >> >> Thank you for such detailed response. This looks very promising. i will >> need to get my head around the aggregator pattern. >> For this week I will be busy with other tasks, but I will get back to it >> as soon as I can to see if I can get Camel work for the use case. >> >> Cheerio >> Artur >> >> On Mon, Apr 3, 2017 at 11:09 AM, Zoran Regvart wrote: >> >>> Hi Artur, >>> I was thinking that the order of the messages would be important as >>> you need to process them sequentially. >>> >>> So I think you could use the dynamic message routing[1] with >>> aggregator[2], something like: >>> >>> from("aws-sqs:...") >>> .process("#preProcess") >>> .toD("direct:${header.nextRoute}"); >>> >>> from("direct:parallel")...; >>> from("direct:sequential").aggregate(simple("${header.group}" >>> )).completion..; >>> >>> So from yout SQS queue you would use a processor to pre-process >>> message whose responsibility would be to set the (custom) `nextRoute` >>> and (custom) `group` headers. 
`nextRoute` would be `parallel` or >>> `sequential`, and if `sequential` the messages would be aggregated >>> using the `group` header. >>> >>> You would want to define your own custom aggregation strategy or use >>> the completion* options that are available to you. There also might be >>> need to use seda[3] to fine tune any parallel processing. You might >>> throw in there a
Re: Camel + Spring + Spring Boot, waiting for the routes to start
Sure enough, when I try to do pretty much the same thing from a JUnit test, it works: @RunWith(SpringJUnit4ClassRunner.class) @SpringBootTest(classes = { Application.class }) public class CamelTest { @Produce(uri = "direct:myQueue") protected ProducerTemplate template; @Test public void sendMessage() { template.sendBody(attachment); } } so... what's the deal? If firing a message is a part of my application logic... what's the recommended way of handling this? Cheerio Artur On Tue, Apr 11, 2017 at 1:30 PM, Artur Jablonski wrote: > Hello, > > I have the Camel + Spring + SpringBoot combo working > > > I have my RouteBilder Spring Component that's being pickup up and started > during Spring startup. > The route starts fine as I can see SpringCamelContext output in the logs. > > What I am trying to do is to send a message to direct endpoint using > injected ProducerTemplate but if I do it like this > > @Produce(uri = "direct:myQueue") > private ProducerTemplate producerTemplate; > > @EventListener(ContextRefreshedEvent.class) > public void send() > throws Exception > { > producerTemplate.start(); > producerTemplate.sendBody("hello"); > } > > > I am getting > > No consumers available on endpoint: direct://myQueue > > because the route has not started yet and if I try blocking parameter on > uri like this: > > @Produce(uri = "direct:myQueue?block=true") > > then the whole thing freezes for 30 secs (default timeout) and then blows > up with the same exception > > I also tried to call the producerTemplate.sendBody() form @PostConstruct > annotaded method with the same effect. > > > How do I wait for the CamelContext to be fully started in this context > > > Cheerio > Arur > > > > > > > > > > > > >
Camel + Spring + Spring Boot, waiting for the routes to start
Hello, I have the Camel + Spring + SpringBoot combo working. I have my RouteBuilder Spring Component that's being picked up and started during Spring startup. The route starts fine as I can see SpringCamelContext output in the logs. What I am trying to do is to send a message to a direct endpoint using an injected ProducerTemplate, but if I do it like this @Produce(uri = "direct:myQueue") private ProducerTemplate producerTemplate; @EventListener(ContextRefreshedEvent.class) public void send() throws Exception { producerTemplate.start(); producerTemplate.sendBody("hello"); } I am getting No consumers available on endpoint: direct://myQueue because the route has not started yet, and if I try the blocking parameter on the uri like this: @Produce(uri = "direct:myQueue?block=true") then the whole thing freezes for 30 secs (default timeout) and then blows up with the same exception. I also tried to call producerTemplate.sendBody() from a @PostConstruct annotated method with the same effect. How do I wait for the CamelContext to be fully started in this context? Cheerio Artur
Re: Curious routing case
Hello, I don't think this route definition is fitting my use case, though I learnt a thing or two about the interesting patterns linked. Thanks! Ok, so let me try to clarify the use case. 1. The stream is infinite, it's not a batch job. The messages keep on coming from SQS 'all the time' 2. More important thing is about parallel processing. Let A1 denote a message 1 from group A, B2 message 2 from group B, etc. Let's say this is the order in which the messages happen to appear in the route from SQS A1, A2, B1, C1, B2, A3, C2, B3 Now what I am trying to achieve is grouping the messages that have to be processed sequentially (order doesn't matter as long as no two messages from the same group are processed at the same time). So I am trying to somehow get these streams A1, A2, A3 B1, B2, B3 C1, C2 So, A1 B1 and C1 can be processed in parallel because they are from different groups, but the messages within groups need to be processed one by one. In my example, there are 3 groups, but there can be many and I don't know what they are in advance. The processing logic between the groups is similar and is a function of the group so I can get a processor for group A from a method call getProcessor(A), B getProcessor(B), etc. I am stuck at how to do that in Camel, because since I don't know the groups in advance, I would need to create processing routes dynamically. Say the system starts, and A1 arrives, there can't be any processor for group A yet, since it's the first message from the group and I need to somehow dynamically add processing capability of the group A to the route and then perhaps if the messages from group A stop arriving for some time, that processor could be removed. How to add the parallel part between the group messages is also blurry to me. One way of doing this I was thinking was to do a multicast to all the dynamically created processing routes for groups and stick a filter before so that only messages from particular group can go through. 
From multicast page: from("direct:a").multicast().parallelProcessing().to("direct:x", "direct:y", "direct:z"); But here the x,y,z endpoints are hardcoded. I could write up some custom multicast I suppose to search the routes in CamelContext.. not sure. Thanks Artur On Mon, Apr 3, 2017 at 1:36 PM, Artur Jablonski wrote: > Hi Zoran, > > Thank you for such detailed response. This looks very promising. i will > need to get my head around the aggregator pattern. > For this week I will be busy with other tasks, but I will get back to it > as soon as I can to see if I can get Camel work for the use case. > > Cheerio > Artur > > On Mon, Apr 3, 2017 at 11:09 AM, Zoran Regvart wrote: > >> Hi Artur, >> I was thinking that the order of the messages would be important as >> you need to process them sequentially. >> >> So I think you could use the dynamic message routing[1] with >> aggregator[2], something like: >> >> from("aws-sqs:...") >> .process("#preProcess") >> .toD("direct:${header.nextRoute}"); >> >> from("direct:parallel")...; >> from("direct:sequential").aggregate(simple("${header.group}" >> )).completion..; >> >> So from yout SQS queue you would use a processor to pre-process >> message whose responsibility would be to set the (custom) `nextRoute` >> and (custom) `group` headers. `nextRoute` would be `parallel` or >> `sequential`, and if `sequential` the messages would be aggregated >> using the `group` header. >> >> You would want to define your own custom aggregation strategy or use >> the completion* options that are available to you. There also might be >> need to use seda[3] to fine tune any parallel processing. You might >> throw in there a data format unmarshaller[4] instead of the >> `preProcess` processor and use something like `${body.xyz} == foo` in >> the `toD` expression. 
>> >> And I would guess that you need to examine transactions or persistence >> at some point also in case your aggregation step runs for a long time >> or if your use case is sensitive to message loss if interrupted -- >> which would undoubtedly lead you back to using queues to separate >> those two ways of processing, >> >> HTH, >> >> zoran >> >> [1] https://camel.apache.org/message-endpoint.html >> [2] https://camel.apache.org/aggregator2.html >> [3] https://camel.apache.org/seda.html >> [4] https://camel.apache.org/data-format.html >> >> On Sat, Apr 1, 2017 at 1:09 PM, Artur Jablonski >> wrote: >> > Hey Zoran. >> > >> > I read again the p
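Outside of Camel's DSL, the "serial within a group, parallel across groups" requirement described in this thread can be sketched with one single-thread executor per group, created lazily the first time a group is seen. The class name GroupSerializer and its methods are hypothetical; in a real route a processor would hand each message to it:

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Sketch: each group gets its own single-thread executor, so messages in one
// group run strictly one after another, while different groups run in parallel.
// Groups do not need to be known in advance; lanes are created on first use.
public class GroupSerializer {
    private final ConcurrentMap<String, ExecutorService> lanes = new ConcurrentHashMap<>();

    public Future<?> submit(String group, Runnable work) {
        ExecutorService lane =
            lanes.computeIfAbsent(group, g -> Executors.newSingleThreadExecutor());
        return lane.submit(work);
    }

    public void shutdownAndAwait() throws InterruptedException {
        for (ExecutorService lane : lanes.values()) {
            lane.shutdown();
            lane.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws Exception {
        GroupSerializer gs = new GroupSerializer();
        List<String> log = new CopyOnWriteArrayList<>();
        // Interleaved arrival order, as in the example: A1, A2, B1, C1, B2, A3
        for (String msg : List.of("A1", "A2", "B1", "C1", "B2", "A3")) {
            gs.submit(msg.substring(0, 1), () -> log.add(msg));
        }
        gs.shutdownAndAwait();
        // Within-group order is preserved even though groups interleave freely.
        System.out.println(log.indexOf("A1") < log.indexOf("A2")
                        && log.indexOf("A2") < log.indexOf("A3")
                        && log.indexOf("B1") < log.indexOf("B2"));
    }
}
```

An idle-lane eviction policy (remove an executor after a quiet period) could be layered on top, which would mirror the "remove the route when a group goes quiet" idea from the original question.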
Re: Curious routing case
Hi Zoran, Thank you for such a detailed response. This looks very promising. I will need to get my head around the aggregator pattern. For this week I will be busy with other tasks, but I will get back to it as soon as I can to see if I can get Camel to work for the use case. Cheerio Artur On Mon, Apr 3, 2017 at 11:09 AM, Zoran Regvart wrote: > Hi Artur, > I was thinking that the order of the messages would be important as > you need to process them sequentially. > > So I think you could use the dynamic message routing[1] with > aggregator[2], something like: > > from("aws-sqs:...") > .process("#preProcess") > .toD("direct:${header.nextRoute}"); > > from("direct:parallel")...; > from("direct:sequential").aggregate(simple("${header. > group}")).completion..; > > So from yout SQS queue you would use a processor to pre-process > message whose responsibility would be to set the (custom) `nextRoute` > and (custom) `group` headers. `nextRoute` would be `parallel` or > `sequential`, and if `sequential` the messages would be aggregated > using the `group` header. > > You would want to define your own custom aggregation strategy or use > the completion* options that are available to you. There also might be > need to use seda[3] to fine tune any parallel processing. You might > throw in there a data format unmarshaller[4] instead of the > `preProcess` processor and use something like `${body.xyz} == foo` in > the `toD` expression. 
> > And I would guess that you need to examine transactions or persistence > at some point also in case your aggregation step runs for a long time > or if your use case is sensitive to message loss if interrupted -- > which would undoubtedly lead you back to using queues to separate > those two ways of processing, > > HTH, > > zoran > > [1] https://camel.apache.org/message-endpoint.html > [2] https://camel.apache.org/aggregator2.html > [3] https://camel.apache.org/seda.html > [4] https://camel.apache.org/data-format.html > > On Sat, Apr 1, 2017 at 1:09 PM, Artur Jablonski > wrote: > > Hey Zoran. > > > > I read again the patterns you mentioned. In my use case the order of > > processing within a group doesn't matter as long as two messages from the > > same group are never processed in parallel. So i guess resenquencer is > out > > of the picture unless I didn't get the intention. > > > > So what we are left with is the content based router. Sure. The message > > comes, i can see what group it belongs two... And what next? Perhaps it's > > the very first message from that group so I would need to trigger > creating > > route/processor for that group somehow, perhaps messages from this group > > were processed before in which case the processor for the group should > > already exist... > > > > > > > > > > On 31 Mar 2017 7:58 p.m., "Zoran Regvart" wrote: > > > >> Hi Artur, > >> have a look at Camel EIP page[1], what you describe sounds to me like > >> Resequencer and Content based router patterns, > >> > >> zoran > >> > >> [1] https://camel.apache.org/eip.html > >> > >> On Fri, Mar 31, 2017 at 5:08 PM, Artur Jablonski > >> wrote: > >> > Hello. > >> > > >> > I wonder if someone could push me in the right direction trying to > >> express > >> > quite curious case in Camel route. > >> > > >> > Imagine there's a stream of messages some of which can be processed in > >> > parallel and some have to be processed serially. 
You can group the > >> messages > >> > that require serial processing together by looking at the message > body. > >> You > >> > don't know upfront how many groups can occur in the stream. > >> > > >> > The way I thought about doing this is having a route for each message > >> > group. Since I don't know upfront how many and what groups there will > be > >> > then I would need to create routes dynamically. If a message comes > >> > belonging to a group that doesn't have it's handling route, then i > could > >> > create it (is that even possible??) Then if there's no messages coming > >> for > >> > a given group in some time I could remove the route for the group to > >> > cleanup (is that possible?) > >> > > >> > New to Camel > >> > > >> > Thx! > >> > Artur > >> > >> > >> > >> -- > >> Zoran Regvart > >> > > > > -- > Zoran Regvart >
Re: Curious routing case
Hey Zoran. I read again the patterns you mentioned. In my use case the order of processing within a group doesn't matter, as long as two messages from the same group are never processed in parallel. So I guess the resequencer is out of the picture, unless I didn't get the intention. So what we are left with is the content-based router. Sure. The message comes, I can see what group it belongs to... And what next? Perhaps it's the very first message from that group, so I would need to trigger creating a route/processor for that group somehow; perhaps messages from this group were processed before, in which case the processor for the group should already exist... On 31 Mar 2017 7:58 p.m., "Zoran Regvart" wrote: > Hi Artur, > have a look at Camel EIP page[1], what you describe sounds to me like > Resequencer and Content based router patterns, > > zoran > > [1] https://camel.apache.org/eip.html > > On Fri, Mar 31, 2017 at 5:08 PM, Artur Jablonski > wrote: > > Hello. > > > > I wonder if someone could push me in the right direction trying to > express > > quite curious case in Camel route. > > > > Imagine there's a stream of messages some of which can be processed in > > parallel and some have to be processed serially. You can group the > messages > > that require serial processing together by looking at the message body. > You > > don't know upfront how many groups can occur in the stream. > > > > The way I thought about doing this is having a route for each message > > group. Since I don't know upfront how many and what groups there will be > > then I would need to create routes dynamically. If a message comes > > belonging to a group that doesn't have it's handling route, then i could > > create it (is that even possible??) Then if there's no messages coming > for > > a given group in some time I could remove the route for the group to > > cleanup (is that possible?) > > > > New to Camel > > > > Thx! > > Artur > > > > -- > Zoran Regvart >
Re: Curious routing case
Hmmm. I am getting messages from Amazon SQS and can't change that. Let's say I want to see if I can do it in Camel without putting another messaging system in between. Interesting feature of ActiveMQ though. Thx! On 31 Mar 2017 5:15 p.m., "Quinn Stevenson" wrote: I’d probably use ActiveMQ Message Groups for this http://activemq.apache.org/message-groups.html > On Mar 31, 2017, at 9:08 AM, Artur Jablonski wrote: > > Hello. > > I wonder if someone could push me in the right direction trying to express > quite curious case in Camel route. > > Imagine there's a stream of messages some of which can be processed in > parallel and some have to be processed serially. You can group the messages > that require serial processing together by looking at the message body. You > don't know upfront how many groups can occur in the stream. > > The way I thought about doing this is having a route for each message > group. Since I don't know upfront how many and what groups there will be > then I would need to create routes dynamically. If a message comes > belonging to a group that doesn't have it's handling route, then i could > create it (is that even possible??) Then if there's no messages coming for > a given group in some time I could remove the route for the group to > cleanup (is that possible?) > > New to Camel > > Thx! > Artur
Curious routing case
Hello. I wonder if someone could push me in the right direction trying to express a quite curious case in a Camel route. Imagine there's a stream of messages, some of which can be processed in parallel and some of which have to be processed serially. You can group the messages that require serial processing together by looking at the message body. You don't know upfront how many groups can occur in the stream. The way I thought about doing this is having a route for each message group. Since I don't know upfront how many and what groups there will be, I would need to create routes dynamically. If a message comes in belonging to a group that doesn't have its handling route yet, then I could create it (is that even possible??) Then if no messages come for a given group for some time, I could remove the route for the group to clean up (is that possible?) New to Camel Thx! Artur