Large file processing with Apache Camel

2013-02-21 Thread cristisor
Hello everybody,

I'm using Apache Fuse ESB with Apache Camel 2.4.0 (I think) to process some
large files. Until now, a service unit deployed in ServiceMix would read the
file line by line and send an exchange containing each line to a second
service unit, which would analyze the line and transform it into an XML
document according to some parameters. The new exchange then went to a third
service unit that mapped that XML to another XML format, and finally to a
service unit that unmarshals the XML and inserts the object into a database.
I arrived on the project with the architecture and design already in place,
and I have to fix some serious performance problems. I suspect that reading
the files line by line slows the processing down very much, so I inserted an
AggregationStrategy to aggregate 100-200 lines at once. Here I get into
trouble:
- if I send an exchange with more than one line, I have to make a lot of
changes to the XML-to-XML mappers, choice processors, etc.
- even if I solve the first problem, reading 500 lines at once and building
one big XML from the data gets me an OutOfMemoryError, so I would have to
read at most 50 lines to make sure no exception arises

What I'm looking for is a way to read 500-1000 lines at once but send each
line in a separate exchange to the service unit that creates the initial
XML. My route looks similar to this one now:

from("file://myfile.txt")
    .marshal().string("UTF-8")
    .split(body().tokenize("\n")).streaming()
    .setHeader("foo", constant("foo"))
    .aggregate(header("foo"), new StringBodyAggregator())
        .completionSize(50)
    .process(processor)
    .to("activemq queue");

I read something about a producer template but I'm not sure whether it can
help me. Basically, I want a mechanism that sends more than one exchange,
one per line read, to the processor and then to the endpoint. That way I
read from the file in batches of hundreds or thousands but keep the old
one-line-at-a-time mapping mechanism.
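From the docs it looks like the streaming splitter may already give me this: it reads the file through a buffered stream and emits one exchange per line, so batched reading and per-line exchanges don't have to be separate mechanisms. Something like this sketch, with the endpoint names as placeholders of mine, is what I have in mind:

```java
import org.apache.camel.builder.RouteBuilder;

public class PerLineSplitRoute extends RouteBuilder {
    @Override
    public void configure() {
        // streaming() keeps only the current token in memory, so the whole
        // file is never loaded at once; each line leaves as its own exchange.
        from("file://inbox?fileName=myfile.txt")
            .convertBodyTo(String.class, "UTF-8")
            .split(body().tokenize("\n")).streaming()
            .to("activemq:queue:stage1Mapping");
    }
}
```

Whether this helps depends on where the time actually goes; the splitter itself already does buffered reads, so the per-line cost may mostly be the downstream queue hops.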

Thank you.



--
View this message in context: 
http://camel.465427.n5.nabble.com/Large-file-processing-with-Apache-Camel-tp5727977.html
Sent from the Camel - Users mailing list archive at Nabble.com.


Re: Large file processing with Apache Camel

2013-02-21 Thread cristisor
Many thanks for your reply.

When I read from the file I read plain lines, not XML. It takes more than
an hour to read, process and insert 20,000 lines into the db, so I took out
the service unit that does the db operations and was left with 26 minutes
just for reading the file line by line, converting to a stage-1 XML and then
to a stage-2 XML. The machine is an i7 with 8 GB of RAM, so I don't think
it's normal for this to take so long. This is why I suspected that the I/O
of reading the file is slowing me down very much.

Do you think it would take 100 times as long to read 100 lines instead of
just one? Could the performance hit be somewhere else?

Is sending the exchange to the queue and reading it inside another service
unit slowing me down?

I'm not using StAX streaming, or at least I don't think I am, to deal with
the XML, but I will take a look at that as well.





Re: Large file processing with Apache Camel

2013-02-22 Thread cristisor
I will try to describe the steps in the current version:
1. read one line from the file, set it as the body of the exchange's
outbound message and, according to the file type, send the exchange to an
ActiveMQ queue
2. the exchange arrives at another service unit, where a processor creates
an input stream from that line and feeds it to an XML mapper generated with
Altova MapForce 2011 (as I mentioned before, I didn't choose the mapper,
but they say it's extremely fast). This mapper returns a
ByteArrayOutputStream containing an XML string that maps some values from
the read line to actual XML fields; as a basic example, the number 200 from
the line will be mapped to an XML element containing 200. The XML is set as
the body of the exchange's outbound message and the exchange is sent to
another queue
3. when the exchange with the XML is received it goes to another processor,
with another generated mapper, that maps this XML to a second XML, for
example an element containing 200 to a different element containing 200.
This is just a simple example; the mapping can be more complex. The final
XML string is set as the body of the exchange's outbound message and the
exchange is sent to the final service unit
4. the final service unit picks up the exchange, unmarshals its body into
an actual db value object and inserts that object into the db

When I get the OOME I am actually appending each ByteArrayOutputStream's
toString() to a StringBuilder. I have to do this because I take 500 lines
from the file and map each of them into an XML in a while loop, and I have
no idea how to send each XML in its own exchange, so I append everything and
set the final result as the body of the exchange's outbound message. If I
could send each XML right after mapping it, instead of appending it, and
then map the next one inside the same process method, that would be the
answer to my problem.
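What I picture is something like the sketch below: a ProducerTemplate used inside the process method, so each mapped XML leaves as its own message instead of growing a StringBuilder. The queue name and the mapper call are placeholders of mine:

```java
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;

public class PerLineMappingProcessor implements Processor {

    private final ProducerTemplate producer; // obtained from the CamelContext

    public PerLineMappingProcessor(ProducerTemplate producer) {
        this.producer = producer;
    }

    @Override
    public void process(Exchange exchange) throws Exception {
        String batch = exchange.getIn().getBody(String.class);
        for (String line : batch.split("\n")) {
            String xml = mapLineToXml(line);
            // one message per mapped line, instead of one StringBuilder blob
            producer.sendBody("activemq:queue:stage2", xml);
        }
    }

    private String mapLineToXml(String line) {
        return "<row>" + line + "</row>"; // placeholder for the generated mapper
    }
}
```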

I want to implement batch inserts/updates on the db to increase
performance, and I also want to read hundreds of lines from the text file
but, at a certain point, send the mapped XMLs in exchanges one by one, not
all of them at the same time. I think that I/O operations take a lot of
time, just like in "Parsing large Files with Apache Camel" on catify.com,
where the author raised the number of lines read per second from 200 to
4000 by reading in batches instead of line by line.





Re: Large file processing with Apache Camel

2013-02-22 Thread cristisor
Thank you everybody for your help. After some reading and experimenting, I
found out how to implement splitters and aggregators and I achieved my goal.

Here is the new status:
1. read 500 lines from the file and send them in one exchange to the next
service unit, to an endpoint chosen according to the file type
2. this service unit first splits the exchange into 500 exchanges and then
sends each exchange to the same processor that does the step-1 XML mapping
3. after the processor I aggregate the exchanges back by concatenating the
resulting XMLs, but only 50 at a time so that I don't get an OOME or
something else
4. after the aggregator concatenates 50 exchanges, or a certain amount of
time passes, I send the new exchange to the next service unit for the
step-2 XML mapping
5. now I split the exchange again into 50 different exchanges, and each of
them goes to the processor where the final XML is created
6. the final XML is sent in an exchange to the database service unit, where
I first aggregate 300 exchanges by grouping them and then send the grouped
exchange to the final processor, so that I can take the data from all 300
exchanges and do a batch insert/update with a prepared statement and 300
rows
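Steps 2-4 look roughly like this sketch; the endpoint names, batch sizes and the aggregation strategy are placeholders of mine:

```java
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.processor.aggregate.AggregationStrategy;

public class BatchPipelineRoute extends RouteBuilder {

    private final Processor stage1Mapper;        // wrapper around the generated mapper
    private final AggregationStrategy concatXml; // e.g. a string-concatenating strategy

    public BatchPipelineRoute(Processor stage1Mapper, AggregationStrategy concatXml) {
        this.stage1Mapper = stage1Mapper;
        this.concatXml = concatXml;
    }

    @Override
    public void configure() {
        // split the 500-line batch into per-line exchanges, map each line,
        // then re-aggregate 50 results (or whatever arrived within 2s)
        from("activemq:queue:stage1")
            .split(body().tokenize("\n")).streaming()
            .process(stage1Mapper)
            .aggregate(constant(true), concatXml)
                .completionSize(50).completionTimeout(2000)
            .to("activemq:queue:stage2");
    }
}
```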

Now I have three more questions; maybe you can give me some tips:
1. the server has 4 cores; can I do parallel processing so that 4 lines are
processed in parallel, 4 XMLs generated in parallel, and so on?
2. I think I can compress the XML strings, so that I might be able to send
hundreds of XMLs instead of 50 or 100; am I right, and should I investigate
this path?
3. in the final service unit, the one that inserts into the database, we
use a ChoiceProcessor to select the specific processor according to the DAO
type, so that I can run the insert/update procedure. This choice processor
has a list of FilterProcessors and each FilterProcessor holds the actual
DAO processor. How can I insert an aggregator between the FilterProcessor
and the final DAO processor without using:
route definition -> choice -> when() -> aggregate -> process -> otherwise ->
aggregate -> process

Sorry for so many questions, but I woke up one day with a very slow
application and I have to make it faster as soon as possible.

Thanks again.





Re: Large file processing with Apache Camel

2013-02-28 Thread cristisor
After digging deeper into my problem I found that slow db access was the
main issue; maybe you have heard of setting
sendStringParametersAsUnicode=false on the jdbc driver to dramatically
increase performance.

Since the last time I posted here I have learned a lot about Apache Camel
and implemented some nice changes, but I'm confronting a synchronization
problem right now. I split each line that comes from the file and enable
parallel processing:
from()
    .split(body(String.class).tokenize("\n")).parallelProcessing()
    .process(processor)
    .to()

Inside this processor I take the line, send it to the XML mapper as an
input stream, and in the end set the XML as the body of the out message.
The processor's process method is not synchronized, and I run into
situations where the input line has code "1234" but the output contains
code "1235" instead, a code that definitely belongs to another line. So I
should handle the synchronization myself, right?
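One lock-free idea I'm considering, in case the generated mapper keeps internal state: one mapper instance per thread via ThreadLocal. The mapper class below is just a hypothetical stand-in for the MapForce-generated one:

```java
// Hypothetical stand-in for a generated mapper that keeps internal
// state and is therefore not safe to share between threads.
class LineToXmlMapper {
    private final StringBuilder buffer = new StringBuilder(); // per-instance state

    String map(String line) {
        buffer.setLength(0);
        buffer.append("<row>").append(line.trim()).append("</row>");
        return buffer.toString();
    }
}

public class PerThreadMapper {
    // One mapper instance per consumer thread: no locking, and no
    // cross-talk between lines processed in parallel.
    private static final ThreadLocal<LineToXmlMapper> MAPPER =
            ThreadLocal.withInitial(LineToXmlMapper::new);

    public static String mapLine(String line) {
        return MAPPER.get().map(line);
    }

    public static void main(String[] args) {
        System.out.println(mapLine("1234")); // <row>1234</row>
    }
}
```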





Re: handling large files

2013-03-02 Thread cristisor
I know that this is an old post, but I'm in the exact same situation and I
can't find a way out. Some of you already know what I process with Apache
Camel, but I will add the link again:
http://camel.465427.n5.nabble.com/Large-file-processing-with-Apache-Camel-td5727977.html

And here is a detailed description of the environment, configuration and
error:
http://activemq.2283324.n4.nabble.com/javax-jms-JMSException-java-io-EOFException-in-ActiveMQ-td4664311.html

I suspect that this is an ActiveMQ problem: the broker simply closes a
connection of the service unit that reads lines from the file, but I don't
understand why or where.





Aggregate exchanges until before EoF

2013-03-06 Thread cristisor
Hello,

I'm processing a file line by line and I want to aggregate all the
exchanges (lines) up to, but not including, the one that carries
Exchange.SPLIT_COMPLETE; basically I want to send the aggregated exchanges
just before aggregating the final one. When the last exchange arrives it
should hit the case where oldExchange is null. Is this possible?
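The closest thing I have found so far is a completion predicate on the property the splitter sets on the last exchange, roughly like the sketch below (endpoint names and the strategy are placeholders of mine). Note this is not quite what I described, because the group that completes would then include that final exchange:

```java
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.processor.aggregate.AggregationStrategy;

public class AggregateUntilSplitComplete extends RouteBuilder {

    private final AggregationStrategy strategy; // an existing strategy

    public AggregateUntilSplitComplete(AggregationStrategy strategy) {
        this.strategy = strategy;
    }

    @Override
    public void configure() {
        from("direct:lines")
            // the splitter marks its last exchange with the
            // CamelSplitComplete property; completing on it closes the
            // group as soon as the last line arrives
            .aggregate(constant(true), strategy)
                .completionPredicate(property(Exchange.SPLIT_COMPLETE).isEqualTo(true))
            .to("direct:next");
    }
}
```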





Re: Aggregate exchanges until before EoF

2013-03-06 Thread cristisor
No, I want to send that last line without aggregating it, or aggregating
it as the first and only exchange of that aggregation strategy. What I
actually need is a final exchange, containing a line from the file and the
FILE_COMPLETE header, that gets sent through a few service units, each of
which processes its data; the last service unit detects that it's the end
of the file and resets a flag in the database so that the next file in the
folder gets read. If I aggregate it with other exchanges, then split
everything again in the next service unit and so on, I can't keep track of
the end of the file, and I risk having more than one exchange with this
header.

If you need a better explanation please let me know.
Thanks.





Re: Aggregate exchanges until before EoF

2013-03-06 Thread cristisor
Yes, I already used aggregators to build batches of data that I will later
use for db operations, and they work great. But I don't know how to
implement a custom splitter that, for example, sets the file-complete flag
only on the last split line. Is this possible?





CXFRS producer deployed in Fuse ESB

2013-05-09 Thread cristisor
Hello everybody,

I'm using Apache Fuse ESB 5.4.0-fuse-00-00 with Apache Camel
2.4.0-fuse-00-00.

I need to expose a REST service that can be called by a consumer located on
another machine. I read about Camel CXFRS but I'm not sure whether it is
what I need. After reading a couple of articles, I could use an opinion on
the following: can I create a REST web service, deploy it as a JBI service
assembly and call it? Please give me some links where I can get more
detailed explanations.

Thanks.





Web server in ServiceMix with camel-jetty or camel-servlet

2013-06-20 Thread cristisor
Hello,

We are trying to send blob messages (large files) through an ActiveMQ
broker integrated in ServiceMix. It seems that this operation requires an
ftp or an http server for upload and download, and we are wondering whether
this can be achieved with a ServiceMix Camel component implementation. I am
looking at the camel-jetty and camel-http components, but my main problem
is that I need the HTTP PUT method to keep its default behaviour, so that
it writes the uploaded file to disk where it can be read afterwards.

I see something like this in camel-jetty:
from("jetty:http://localhost:{{port}}/myapp/myservice").process(new MyBookService());

This would be great for a GET or POST request, but for a PUT request I
would have to implement the standard mechanism of the HTTP PUT method
myself.

So, is there a way to route from a jetty endpoint, like the one above, to
some "magic" endpoint that performs the standard HTTP PUT behaviour and
saves the file on the server?

Or could camel-http do this? Please also keep in mind the versions I'm
using.
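A sketch of what I am hoping exists, writing whatever Jetty hands over straight to disk with the file component; the paths, port and header usage here are assumptions on my part:

```java
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;

public class UploadRoute extends RouteBuilder {
    @Override
    public void configure() {
        from("jetty:http://0.0.0.0:8080/upload?matchOnUriPrefix=true")
            .choice()
                .when(header(Exchange.HTTP_METHOD).isEqualTo("PUT"))
                    // use the tail of the request path as the target file name
                    .setHeader(Exchange.FILE_NAME, header(Exchange.HTTP_PATH))
                    // the file component writes the streamed body to disk
                    .to("file://uploads");
    }
}
```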

ActiveMQ version: 5.4.0-fuse-00-00
ServiceMix version: 3.5.0-fuse-00-00

Thanks. 





Mixing ChoiceDefinition with DelayDefinition

2013-06-21 Thread cristisor
Hello,

I have the following route:
from(startEndpoint)
    .choice()
        .when(fileIsComplete())
            .delay(COMPLETION_TIMEOUT).process(processor).to(outputEndpoint)
        .otherwise()
            .aggregate(new ObjectsAggregator()).constant(true)
                .completionSize(BATCH_SIZE).completionTimeout(COMPLETION_TIMEOUT)
            .process(processor).stop();

So: if fileIsComplete is true, process only one exchange and send the
response to another endpoint; otherwise aggregate up to BATCH_SIZE
exchanges, process at the end, and stop the routing.

Because my Camel version is pretty old (2.4.0-fuse-00-00) I don't have
"endChoice", so once I insert the delay or the aggregator I can't get back
to the "otherwise" branch. I really need both of them, so I thought of
inserting another processor, before the actual processing, that puts the
current thread to sleep for COMPLETION_TIMEOUT, and leaving the aggregation
to the other branch, since I won't have to continue using the
ChoiceDefinition.

Is there a better way to solve this problem without upgrading the Camel
version?

Thanks.





Re: Mixing ChoiceDefinition with DelayDefinition

2013-06-21 Thread cristisor
Thanks for your quick answer. Now I only have to adjust the delay.





Re: Mixing ChoiceDefinition with DelayDefinition

2013-06-25 Thread cristisor
Hello Claus,

I have one more question. This is the working route, as per your advice:

from(startEndpoint)
    .choice()
        .when(fileIsNotComplete())
            .to(tempEndpoint)
        .otherwise()
            .delay(5 * COMPLETION_TIMEOUT)
            .process(processor).to(outputEndpoint);

from(tempEndpoint)
    .aggregate(new ObjectsAggregator()).constant(true)
        .completionSize(BATCH_SIZE).completionTimeout(COMPLETION_TIMEOUT)
    .process(processor).stop();

My purpose is to process all the exchanges that come from a file through a
couple of routes that do extra processing, but to allow only the last
exchange, which is detected inside "fileIsNotComplete", to go to a final
endpoint and set a "file done" flag in the database, so that the first
route of the business logic can start processing another file.

Even though the process method is synchronized and I inserted the big
delay, there are situations where the final exchange, the one going down
the "when()" branch, gets processed before other batches of exchanges from
the "otherwise" branch that are still waiting in line.

Is there a way to pause/forbid the "when" branch from doing its job until
the "otherwise" branch has finished everything?

Thanks.





Re: Web server in ServiceMix with camel-jetty or camel-servlet

2013-06-25 Thread cristisor
We have decided to go with an embedded Apache MINA FtpServer for better
performance and scalability. I think the FtpServer is lighter than an
embedded Jetty server and it serves our needs perfectly.

Thank you for your support. 





Synchronizing camel processors

2013-07-04 Thread cristisor
Hello,

I'm using Camel routes for processing and I thought the processing was
synchronous unless parallel processing is specified. I have a processor
that unmarshals XML strings, and I noticed the following in the logs:
DefaultMessageListenerContainer-2  | UnmarshallerProcessor 33 | Accessed by thread: DefaultMessageListenerContainer-2
DefaultMessageListenerContainer-10 | UnmarshallerProcessor 33 | Accessed by thread: DefaultMessageListenerContainer-10
DefaultMessageListenerContainer-4  | UnmarshallerProcessor 33 | Accessed by thread: DefaultMessageListenerContainer-4
DefaultMessageListenerContainer-3  | UnmarshallerProcessor 33 | Accessed by thread: DefaultMessageListenerContainer-3
DefaultMessageListenerContainer-9  | UnmarshallerProcessor 33 | Accessed by thread: DefaultMessageListenerContainer-9
DefaultMessageListenerContainer-7  | UnmarshallerProcessor 33 | Accessed by thread: DefaultMessageListenerContainer-7
DefaultMessageListenerContainer-5  | UnmarshallerProcessor 33 | Accessed by thread: DefaultMessageListenerContainer-5

So, if I'm not mistaken, the process method of the UnmarshallerProcessor is
called from different threads, and this could lead to serious
synchronization issues if I don't use a lock. Is this correct?

Route without parallel processing explicitly enabled:
from(DIRECT_INPUT)
    .process(unmarshallerProcessor)
    .to(getOutputEndpoint());
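If the answer is yes, I suppose the safest style is to keep the processor stateless: the DefaultMessageListenerContainer-N thread names suggest the concurrency comes from the JMS listener feeding the route, so the process method can be entered by several threads at once even without parallelProcessing(). A plain-Java illustration of what I mean, with all names mine:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class StatelessVsStateful {

    // Stateless worker: all state is local to the call, so concurrent
    // invocations from different threads cannot interfere with each other.
    static String unmarshal(String xml) {
        StringBuilder local = new StringBuilder(); // local, one per invocation
        local.append("value:").append(xml.trim());
        return local.toString();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(8);
        Future<String> a = pool.submit(() -> unmarshal(" 1234 "));
        Future<String> b = pool.submit(() -> unmarshal(" 1235 "));
        // Each call sees only its own input: no cross-talk between threads.
        System.out.println(a.get()); // value:1234
        System.out.println(b.get()); // value:1235
        pool.shutdown();
    }
}
```

A shared StringBuilder field instead of the local one would reproduce exactly the "1234 becomes 1235" mix-up from the earlier thread.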

Thanks.





Exceptions when aggregating messages

2013-07-11 Thread cristisor
Hello,

I'm trying to aggregate a bunch of messages, each containing a String body
(the In message of each exchange that reaches the aggregator), so that in
the end I can send a single exchange containing the list of strings to the
next route.

If I set the body to a List of Message objects, the send operation fails
with the following:
NotSerializableException: org.apache.camel.impl.DefaultMessage

This is understandable, since Message is not serializable, but then I tried
aggregating a List of MessageBean objects, where MessageBean implements the
Serializable interface and contains the body (String), the headers
(CaseInsensitiveMap) and the properties (CaseInsensitiveMap). This time the
exception is different:
NotSerializableException: org.apache.camel.component.file.GenericFile

Can someone spot the problem? I could aggregate the complete exchanges and
send them, but isn't the overhead too big?
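To convince myself where serialization breaks, I reduced it to plain Java: a list of String bodies serializes fine, while any object that still holds a non-serializable member (like the GenericFile in the exception) fails. A small check, with all names mine:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SerializationCheck {

    // Serialize an object to bytes; returns null if it is not serializable.
    static byte[] trySerialize(Object o) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
            oos.flush();
            return bos.toByteArray();
        } catch (NotSerializableException e) {
            return null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // A list of plain String bodies serializes fine...
        List<String> bodies = new ArrayList<>(Arrays.asList("<row>200</row>", "<row>201</row>"));
        System.out.println(trySerialize(bodies) != null); // true

        // ...but a Serializable bean holding a non-serializable member
        // (a stand-in for GenericFile) still blows up on that field.
        class Holder implements Serializable {
            final Object payload = new Object() { }; // anonymous class, not Serializable
        }
        System.out.println(trySerialize(new Holder()) == null); // true
    }
}
```

So the MessageBean approach fails not because of the bean itself, but because one of the copied headers or properties still references a non-serializable object.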





Re: Exceptions when aggregating messages

2013-07-11 Thread cristisor
Hello and thank you for your reply.


This is the code for aggregating the whole camel message:
class ObjectsAggregationStrategy implements AggregationStrategy {

    final Logger logger = Logger.getLogger(ObjectsAggregationStrategy.class);

    @SuppressWarnings("unchecked")
    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        List<Message> items;
        if (oldExchange == null) {
            items = new ArrayList<Message>();
            items.add(newExchange.getIn());
            Exchange copyExchange = newExchange.copy();
            copyExchange.getIn().setBody(items);
            return copyExchange;
        }

        items = oldExchange.getIn().getBody(List.class);
        Message newItem = newExchange.getIn();
        items.add(newItem);
        oldExchange.getIn().setBody(items);

        Exchange finalExchange = updateInMessageHeaders(oldExchange, newExchange);
        finalExchange = updateExchangeProperties(oldExchange, newExchange);

        return finalExchange;
    }

    private Exchange updateExchangeProperties(Exchange oldExchange, Exchange newExchange) {
        CaseInsensitiveMap properties = new CaseInsensitiveMap(newExchange.getProperties());

        for (Entry<String, Object> entry : properties.entrySet()) {
            oldExchange.setProperty(entry.getKey(), entry.getValue());
        }

        return oldExchange;
    }

    private Exchange updateInMessageHeaders(Exchange oldExchange, Exchange newExchange) {
        CaseInsensitiveMap headers = new CaseInsensitiveMap(newExchange.getIn().getHeaders());

        for (Entry<String, Object> entry : headers.entrySet()) {
            if (entry.getKey().equals(MessageConstant.ERES_FILE_COMPLETE)
                    && entry.getValue().equals(Boolean.TRUE)) {
                logger.debug("Received and set the ERES_FILE_COMPLETE header from routeId["
                        + oldExchange.getFromRouteId() + "].");
            }
            oldExchange.getIn().setHeader(entry.getKey(), entry.getValue());
        }

        return oldExchange;
    }
}


And this is the code for aggregating a serializable custom bean:
public class MessageAggregationStrategy<T> implements AggregationStrategy {

    final Logger logger = Logger.getLogger(MessageAggregationStrategy.class);

    @SuppressWarnings("unchecked")
    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        MessageBean<T> bean = setExchangeData(newExchange);
        List<MessageBean<T>> items;
        if (oldExchange == null) {
            items = new ArrayList<MessageBean<T>>();
            items.add(bean);
            Exchange copyExchange = newExchange.copy();
            copyExchange.getIn().setBody(items);
            return copyExchange;
        }

        items = oldExchange.getIn().getBody(List.class);
        items.add(bean);
        oldExchange.getIn().setBody(items);

        return oldExchange;
    }

    @SuppressWarnings("unchecked")
    private MessageBean<T> setExchangeData(Exchange exchange) {
        MessageBean<T> bean = new MessageBean<T>();
        bean.setBody((T) exchange.getIn().getBody());
        bean.setHeaders(new CaseInsensitiveMap(exchange.getIn().getHeaders()));
        bean.setProperties(new CaseInsensitiveMap(exchange.getProperties()));
        return bean;
    }
}


where MessageBean is:
public class MessageBean<T> implements Serializable {

    private static final long serialVersionUID = -987592973477513095L;
    private T body;
    private CaseInsensitiveMap properties;
    private CaseInsensitiveMap headers;

    public T getBody() {
        return body;
    }

    public CaseInsensitiveMap getProperties() {
        return properties;
    }

    public CaseInsensitiveMap getHeaders() {
        return headers;
    }

    public void setBody(T body) {
        this.body = body;
    }

    public void setProperties(CaseInsensitiveMap properties) {
        this.properties = properties;
    }

    public void setHeaders(CaseInsensitiveMap headers) {
        this.headers = headers;
    }
}






Re: Exceptions when aggregating messages

2013-07-16 Thread cristisor
It seems that I was trying to serialize the exchange properties, which is
nonsense because the exchange that arrives on the new route will contain
its own properties, and everything was probably failing when trying to
serialize the following property:
CamelFileExchangeFile=GenericFile[path\filename.txt]

I don't understand why this was happening, but I will only send the
information that I need instead of sending everything.




Re: Exceptions when aggregating messages

2013-07-16 Thread cristisor
I'm afraid I don't understand which option you are talking about. And how
can I "care about the property value which could not be serialized"?
Please explain in more detail if possible.

Thanks.





Re: Exceptions when aggregating messages

2013-07-17 Thread cristisor
Thanks for clearing things up. I read the documentation and I understand
your point.





onCompletion not triggered in testing

2013-11-13 Thread cristisor
Hello,

I am using the following route:

protected void createRoute(Processor processor) {
    from(getInputEndpoint())
        .onCompletion()
            .onCompleteOnly()
            .onWhen(customPredicate)
            .process(syncProcessor)
        .end()
        // the onCompletion callback ended and normal processing is back
        .choice()
            .when(customPredicate)
                .process(syncProcessor)
            .otherwise()
                .process(processor)
                .to(getOutputEndpoint())
        .end();
}


and I need to do some processing inside the syncProcessor when certain
other messages have completed successfully. In the deployment environment
the route works fine, but the unit tests always fail because
".process(syncProcessor)" is never triggered.

This is the output endpoint:
@EndpointInject(uri = "mock:output")
protected MockEndpoint resultEndpoint;

Could it be that "mock:output" is not triggering the onCompletion event?

Thanks,
Cristian.





Re: count of processed messages when using aggregation

2013-11-13 Thread cristisor
You could set a count header on each exchange that leaves
".bean("myProcessor", "doWork")" and at the end log the number from the
last exchange, or have ".bean("myProcessor", "doWork")" increase an
internal counter; when you receive CamelSplitComplete you go into the
myProcessor bean again, log the counter value, and then reset it so that a
new csv can start from 0.
Something like this:
from("file:input.csv")
    .unmarshal().csv().split(body()).streaming().parallelProcessing()
    .bean("myProcessor", "doWork") // inside doWork you increase the counter
    .aggregate(constant("id"), new Aggregator())
        .completionSize(100).completionTimeout(1000)
        .parallelProcessing() // why would you need this one?
    .to("remote")
    .choice()
        .when(property("CamelSplitComplete").isEqualTo("true"))
            .bean("myProcessor", "logCounterAndResetCounter")
        .otherwise()
            .log("file not completed yet");

If sending to the remote server is time-consuming and you need
performance, you could do something like this to increase it:
.aggregate(constant("id"), new Aggregator())
    .completionSize(100).completionTimeout(1000)
    .to("seda:queueName");

from("seda:queueName")
    .to("remote");

This puts the aggregated exchanges on another thread, which takes care of
the sending and logging while the initial thread continues to process csv
lines without having to wait for the remote machine to acknowledge the
aggregated exchanges.





Re: onCompletion not triggered in testing

2013-11-13 Thread cristisor
I will post something tomorrow from work, but the basic idea is that I
send two kinds of messages, and Message Type 2 should be integrated only
when all the Message Type 1 messages are in the database. If this hasn't
happened when MT2 arrives on the "when" route and is processed by the
syncProcessor, the syncProcessor puts MT2 in a persistent queue on the
broker; every time an MT1 is integrated by the "otherwise" path of the
route, on successful completion the syncProcessor comes into action again
and checks whether it should retrieve MT2 from the persistent queue and
finish processing it.

The test and production environments work fine, I always get the
onCompletion callback, but the unit tests never do. I checked with the
debugger and I can see everything happening, with one difference: the
syncProcessor is never called at onCompletion. This is kind of weird.

I will post something tomorrow morning, maybe someone can spot the problem.

Thanks,
Cristian.





Re: count of processed messages when using aggregation

2013-11-13 Thread cristisor
This problem can be solved by a stream resequencer
(http://camel.apache.org/resequencer.html, stream resequencing).
It will reorder the exchanges after they are processed and send them to
the aggregator. You have to reorder them so that the exchange with the
split-complete flag is always the last one; this way you can remove the
timeout on the aggregator and add a predicate:
completionPredicate(header("CamelSplitComplete").isEqualTo(true))
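A sketch of that shape; the endpoints are placeholders, and I'm assuming the splitter's CamelSplitIndex property is available to drive the ordering:

```java
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;

public class ResequenceBeforeAggregate extends RouteBuilder {
    @Override
    public void configure() {
        from("direct:processed")
            // stream resequencing: release exchanges in index order as soon
            // as the next expected one shows up, so the split-complete
            // exchange always reaches the aggregator last
            .resequence(property(Exchange.SPLIT_INDEX)).stream()
            .to("direct:aggregate");
    }
}
```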





Re: count of processed messages when using aggregation

2013-11-13 Thread cristisor
That's the beauty of the stream resequencer: it allows the messages to go
one by one as they get ordered, and the difference in speed is very small
for the overall process. The batch resequencer waits for all the exchanges
to complete and only then sends them to the aggregator, which brings
additional delays.
Another way would be to know how many messages arrived in the bean and hold
the last one back, just before it gets aggregated, until all the rest are
aggregated, but you might end up with pretty complex routes.





Re: onCompletion not triggered in testing

2013-11-13 Thread cristisor
Hello,

I'm back with some code.
The mock route builder:
@Component
public class MockedTransactionToDBPublisherRouteBuilder extends
TransactionToDBPublisherRouteBuilder {

/**
 * Creates the mocked route builder, delegating to the real one.
 */
public MockedTransactionToDBPublisherRouteBuilder(DataSource datasource,
        TransactionTemplate transactionTemplate, String redeliveryPolicy,
        ProducerTemplate producer, ConsumerTemplate consumer) {
    super(datasource, transactionTemplate, redeliveryPolicy, producer, consumer);
}

/**
 * {@inheritDoc}
 */
@Override
protected String getExceptionEndpoint() {
return "mock:exception";
}

/**
 * {@inheritDoc}
 */
@Override
protected String getDeadLetterEndpoint() {
return "mock:deadletter";
}

/**
 * {@inheritDoc}
 */
@Override
protected String getInputEndpoint() {
return "direct:input";
}

/**
 * {@inheritDoc}
 */
@Override
protected String getOutputEndpoint() {
return "mock:output";
}
}


The route initialized in TransactionToDBPublisherRouteBuilder:
protected void createRoute(Processor processor) {
    from(getInputEndpoint())
        // when the exchange is a TRANSACTION and is completed,
        // update the number of passed transactions
        .onCompletion()
            .onCompleteOnly()
            .onWhen(TransactionPredicates.isMessageType(MessageType.TRANSACTION))
            .process(endOfDaySyncProcessor)
        .end()
        // the onCompletion callback ended and normal processing is back
        .choice()
            // the End Of Day must be handled separately since it is
            // processed after all the TRANSACTIONs
            .when(TransactionPredicates.isMessageType(MessageType.END_OF_DAY))
                .process(endOfDaySyncProcessor)
            // let all the other exchanges follow the standard route;
            // this path can't go through the endOfDaySync route because
            // we need to intercept the callback
            .otherwise()
                .process(processor)
                .to(getOutputEndpoint())
        .end();

    // this is the route where the End Of Day goes after all the
    // transactions have been integrated
    from(getSyncEndOfDayEndpoint())
        .process(processor)
        .to(getOutputEndpoint());
}


The base test class:
@ContextConfiguration(locations = { "transactionToDb-context.xml" })
abstract public class AbstractTransactionToDBPublisherRouteBuilderTest
extends AbstractJUnit4SpringContextTests {
protected static final String CHARSET = "UTF-8";

@Autowired
protected CamelContext camelContext;

@Autowired
protected DataSource DATA_SOURCE;

@Autowired
protected TransactionTemplate TRANSACTION_TEMPLATE;

@EndpointInject(uri = "mock:output")
protected MockEndpoint resultEndpoint;

@EndpointInject(uri = "mock:exception")
protected MockEndpoint exceptionEndpoint;

@Produce(uri = "direct:input")
protected ProducerTemplate template;

@BeforeClass
public static void beforeClass() {
// force jibx to use the staxReaderFactory
System.setProperty("org.jibx.runtime.impl.parser",
"org.jibx.runtime.impl.StAXReaderFactory");
}

@Before
public void setUp() throws Exception {
// delete all records from the database
TestUtil.clearDatabase(DATA_SOURCE.getConnection());
}

protected Exchange createExchangeWithBodyAndHeader(Object body,
MessageType type) {
Exchange exchange = new DefaultExchange(camelContext);
Message message = exchange.getIn();
message.setHeader("testClass", getClass().getName());
message.setHeader(MessageConstant.ERES_CONTENT_TYPE,
type.toString());
message.setBody(body);
return exchange;
}
}


The test:
@Test
public void testRouteWithPendingTransactions() throws Exception {
    resultEndpoint.setExpectedMessageCount(4);

    // sending Transaction 1
    InputStream is = getClass().getClassLoader().getResourceAsStream("Transaction1.xml");
    String msg = TestUtil.getInputStreamAsString(is, CHARSET);
    is.close();

    Exchange exchange = createExchangeWithBodyAndHeader(msg, MessageType.TRANSACTION);
    template.send(exchange);

    // sending Transaction 2
    is = getClass().getClassLoader().getResourceAsStream("Transaction2.xml");
    msg = TestUtil.getInputStreamAsString(is, CHARSET);
    is.close();

    exchange = createExchangeWithBodyAndHeader(msg, MessageType.TRANSACTION);
    template.send(exchange);

    // sending End Of Process which must wait for 3 transactions to be processed
    is = getClass().getClassLoader().getResourceAsStream("endBusinessDate.xml");
    msg = TestUtil.getInputStreamAsString(is, CHARSET);
    is.close();

    // tu

Re: Loop in Camel - Calling a service till I receive required status back

2013-11-14 Thread cristisor
Maybe I don't see the whole picture, but why wouldn't you do this in the
processor?
For example:
public void process(Exchange exchange) throws Exception {
    processMessage();
    callWebService(JSON);

    String response;
    do {
        // add some kind of sleep, timer task, or backoff here
        response = callStatusWebService();
    } while (INTERMEDIATE.equals(response)); // compare with equals, not ==
}

Another way to solve this would be to have the same process method followed
by another processor:
from("direct:input")
    .process(messageProcessor)
    .to("direct:checkResultRoute");

from("direct:checkResultRoute")
    .process(checkResultProcessor)
    .choice()
        .when(/* test if the result is intermediate */)
            .to("direct:checkResultRoute")
        .otherwise()
            .to(/* destination after you have received the SUCCESS/FAILED response */);

This should build a loop.
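The polling idea from the first processor can also be sketched as a small self-contained helper (plain Java; the Supplier stands in for the real status web-service call, the names are illustrative, and an attempt cap keeps the loop from spinning forever):

```java
import java.util.function.Supplier;

/**
 * Poll a status source until it stops returning an intermediate result,
 * sleeping between attempts and bailing out after a bounded number of tries.
 */
public class PollUntilDone {
    public static String poll(Supplier<String> checkStatus, long sleepMillis, int maxAttempts)
            throws InterruptedException {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            String response = checkStatus.get();
            if (!"INTERMEDIATE".equals(response)) {
                return response; // SUCCESS or FAILED
            }
            Thread.sleep(sleepMillis); // back off before asking again
        }
        throw new IllegalStateException("status still INTERMEDIATE after " + maxAttempts + " attempts");
    }
}
```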





Re: onCompletion not triggered in testing

2013-11-14 Thread cristisor
Hi Claus,

I'm using Apache Camel 5.5.1.fuse-70-097 and I have to stick with this
version.

But as I mentioned above, I have the same route in production and it works
just fine: the onCompletion is always triggered and does its job. The only
problem is that I would like to have a unit test for the whole route; I
already have several unit tests for each processor.





Re: onCompletion not triggered in testing

2013-11-14 Thread cristisor
My bad, I was looking over the project's libs and was misled by an
activemq-camel library. The Camel version is 2.7.3-fuse-00-89.





Re: onCompletion not triggered in testing

2013-11-18 Thread cristisor
I found the problem that was keeping my test from reaching the onCompletion
syncProcessor, but now I'm facing another issue. Inside the processor I call
producer.sendBody(buildDurableSubscription(register), holder);
(the producer is created in the context.xml) to put an exchange on a
persistent queue, so that after a power failure or any other restart I still
have the exchange, and then
return consumer.receiveBodyNoWait(buildDurableSubscription(register), SettlementHolder.class);
(the consumer is likewise created in the context.xml) to receive it back and
process it after onCompletion. The problem is that sometimes the first
invocation of consumer.receiveBodyNoWait returns null, but if I invoke it
twice the second invocation returns the pending exchange. This is the logging:
first invocation returns null:
first invocation returns null:
12:56:54.628 [Camel (camelContextTransactionToDBPublisher) thread #2 -
OnCompletion] DEBUG org.apache.camel.impl.ConsumerCache - 
Endpoint[activemq://queue:masterDataExport.transactionToDBPublisher.synchronizedSettlement.register1]
12:56:54.631 [Camel (camelContextTransactionToDBPublisher) thread #2 -
OnCompletion] DEBUG o.a.activemq.ActiveMQMessageConsumer - remove:
ID:pc-clucutar-63598-1384772199184-2:1:1:3, *lastDeliveredSequenceId:0*

second invocation returns the object:
12:57:05.134 [Camel (camelContextTransactionToDBPublisher) thread #2 -
OnCompletion] DEBUG org.apache.camel.impl.ConsumerCache - 
Endpoint[activemq://queue:masterDataExport.transactionToDBPublisher.synchronizedSettlement.register1]
12:57:05.138 [Camel (camelContextTransactionToDBPublisher) thread #2 -
OnCompletion] DEBUG o.a.activemq.ActiveMQMessageConsumer - remove:
ID:pc-clucutar-63598-1384772199184-2:1:1:4, *lastDeliveredSequenceId:55*

Even though I can see that the queueSize = 1 in JConsole, the retrieve
operation doesn't retrieve anything and the queueSize doesn't change. The
second call retrieves the exchange and queueSize becomes 0.
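Rather than hard-coding a second call, a small retry wrapper works around this kind of race (plain Java sketch; the Supplier stands in for consumer.receiveBodyNoWait, and the names are illustrative):

```java
import java.util.function.Supplier;

/**
 * Generic retry helper for a no-wait receive: such a call can legitimately
 * return null before the broker has dispatched the message, so retry a few
 * times with a short pause instead of calling it twice by hand.
 */
public class RetryReceive {
    public static <T> T receiveWithRetry(Supplier<T> receive, int attempts, long pauseMillis)
            throws InterruptedException {
        for (int i = 0; i < attempts; i++) {
            T body = receive.get(); // e.g. consumer.receiveBodyNoWait(endpoint, type)
            if (body != null) {
                return body;
            }
            Thread.sleep(pauseMillis); // give the broker time to dispatch
        }
        return null; // still nothing after all attempts
    }
}
```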

Any idea?





Re: onCompletion not triggered in testing

2013-11-18 Thread cristisor
Thank you for pointing that out for me, now the test is working. Still, I
find it weird that the consumer is not retrieving anything even though
JConsole shows that the QueueSize > 0.





Re: onCompletion not triggered in testing

2013-11-19 Thread cristisor
Everything makes more sense now, thank you for the detailed explanation.





Camel + ActiveMQ shutdown timeout problem

2014-07-16 Thread cristisor
Hello,

We are using ServiceMix as a container, with an embedded ActiveMQ broker and
some Camel routes deployed as JBI service assemblies. ServiceMix is
registered as a Windows service, so when the server machine shuts down, the
service is also shut down. The current shutdown timeout on ServiceMix is 10
seconds; if the components can't all be shut down within that time, they are
simply dropped/discarded.

We wouldn't care about this if we didn't have situations when, after a
restart with TimeoutException, some messages are not delivered by the broker
to the Camel endpoints anymore. They are stuck in kahaDB and until the index
is rebuilt or kahaDB is deleted, messages keep accumulating. Some are
delivered after the broker is restarted, but not all of them.

Our research showed that shutting down the Camel routes takes very long,
between 20 and 30 seconds, so the logging often shows some Camel routes
reaching "preparing to shutdown complete" with no "shutdown complete"
afterwards. Some routes are probably not even that far along when Windows
shuts down.

So, if the routes are still in the process of shutting down, could this
corrupt kahaDB or the kahaDB index? I suspect that Windows suddenly kills
all the running threads, possibly including threads that are updating kahaDB
or its index.

The default timeout for shutting down a Camel route is 300 seconds; if I
lower it to 1 second, what could go wrong?
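For reference, assuming the Camel 2.x ShutdownStrategy API (which the version above should expose), the graceful-shutdown timeout can be tuned to fit inside the service's 10-second window; a sketch, with the 8-second value picked arbitrarily for illustration:

```java
import java.util.concurrent.TimeUnit;
import org.apache.camel.CamelContext;

// Sketch: shorten Camel's graceful shutdown so it completes inside the
// ServiceMix/Windows window. The trade-off is that in-flight exchanges
// are forced to stop at the timeout; persistent JMS messages should then
// be redelivered on restart.
public static void configureFastShutdown(CamelContext context) {
    context.getShutdownStrategy().setTimeout(8); // interpreted per setTimeUnit below
    context.getShutdownStrategy().setTimeUnit(TimeUnit.SECONDS);
    context.getShutdownStrategy().setShutdownNowOnTimeout(true); // force-stop in-flight tasks at timeout
}
```

Going all the way down to 1 second would mostly mean interrupting in-flight exchanges far more often; whether that can damage kahaDB is exactly the open question here, so treat this as a knob to experiment with, not a fix.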

My question is linked with:
http://activemq.2283324.n4.nabble.com/Another-NegativeArraySizeException-problem-ActiveMQ-5-5-1-fuse-70-097-td4683082.html

Camel version is 2.7.3-fuse-00-89.

Many thanks,
Cristian.





Help with Camel Spark component

2016-04-15 Thread cristisor
Hi. Our company has been using Apache Camel for various purposes for some
time. Now we want to integrate with an Apache Spark cluster for big data
processing, and we would like to use Camel as much as possible.

The latest Camel release came with a new component for running Spark jobs.
The documentation available at http://camel.apache.org/apache-spark.html is
pretty light, so I was wondering if anybody with Spark and Camel knowledge
can point to a working example or answer some questions.
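For anyone landing here later, a minimal sketch of what a camel-spark route could look like, pieced together from the component page linked above (untested here; the bean names linesRdd and countLinesCallback are hypothetical and would need to be bound in the registry, and the URI options should be verified against your Camel version):

```java
// Hypothetical sketch based on the camel-spark docs: an RDD and an
// RddCallback are registered in the Camel registry and referenced from
// the endpoint URI; the message body is passed to the callback as a
// payload, and the callback's return value becomes the out body.
from("direct:countLines")
    .to("spark:rdd?rdd=#linesRdd&rddCallback=#countLinesCallback");
```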

Best regards.
Cristian.


