Hello,
I have inherited some code that runs Apache Camel, and I was asked whether
it is possible to send a notification whenever processing of a file fails.
I found two links that look promising:
1. https://issues.apache.org/jira/browse/CAMEL-3372
2. https://github.com/apache/camel/blob/master/camel-core/src/test/java/org/apache/camel/processor/FileRollbackOnCompletionTest.java
The idea is to use ".onCompletion().onFailureOnly()" to react to a file
that fails processing and send a notification. What actually happens is
that the file is moved to the failed folder, but my
FileRollback:onFailure method is never invoked.
I've built a test that is as close as possible to our current code.
The only difference is that the real code uses sftp:// instead of
file://, which should not matter for this test case.
Can you help me?
Thanks
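// Imports assume the Camel 2.x package layout.
import java.io.InputStream;

import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.dataformat.csv.CsvDataFormat;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.util.ServiceHelper;

public class FileProcessingFailedNotificationTest extends ContextTestSupport {

    private static final String basePath = "resources/";

    // Smoke test: only keeps the context running long enough for the
    // file consumer to pick up and process the file. It asserts nothing.
    public void testSmoke() throws InterruptedException {
        Thread.sleep(10000);
    }

    @Override
    protected RouteBuilder createRouteBuilder() {
        return new RouteBuilder() {
            @Override
            public void configure() {
                // The file is moved to inprogress/ while it is processed,
                // then to done/ on success or failed/ on failure.
                from("file://" + basePath
                        + "?fileName=sample-in.csv&preMove=inprogress/&moveFailed=../failed/&move=../done/&readLock=changed")
                    .to("direct:csvProcessor");

                from("direct:csvProcessor")
                    // Expected to call FileRollback.onFailure when the exchange fails.
                    .onCompletion().onFailureOnly()
                        .bean(FileRollback.class, "onFailure")
                    .end()
                    .process(new CSVProcessor())
                    // Process each CSV record separately and in parallel.
                    .split(body()).streaming().parallelProcessing()
                        .process(new LineProcessor())
                    .end();
            }
        };
    }

    public static class FileRollback implements Synchronization {
        public void onComplete(Exchange exchange) {
            System.out.println("FileRollback:onComplete");
        }

        public void onFailure(Exchange exchange) {
            System.out.println("FileRollback:onFailure");
        }
    }

    // Unmarshals the incoming file into lazily loaded CSV records (as maps).
    private static class CSVProcessor implements Processor {
        @Override
        public void process(Exchange exchange) throws Exception {
            CsvDataFormat csvDataFormat = new CsvDataFormat();
            csvDataFormat.setDelimiter(',');
            csvDataFormat.setLazyLoad(true);
            csvDataFormat.setUseMaps(true);
            ServiceHelper.startService(csvDataFormat);

            InputStream stream =
                exchange.getIn().getMandatoryBody(InputStream.class);
            Message out = exchange.getOut();
            out.copyFrom(exchange.getIn());
            Object result = csvDataFormat.unmarshal(exchange, stream);
            out.setBody(result);
        }
    }

    // Fails on any record containing "kaboom"; prints every other record.
    private static class LineProcessor implements Processor {
        @Override
        public void process(Exchange exchange) {
            String body = exchange.getIn().getBody(String.class);
            if (body.contains("kaboom")) {
                throw new RuntimeException("kaboom");
            } else {
                System.out.println(body);
            }
        }
    }
}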
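
As a side note on the test itself: testSmoke() above only sleeps and asserts nothing, so a missing callback can only be spotted by reading the console. Below is a minimal sketch of one way to make that checkable. It assumes a hypothetical FailureLatch helper (plain Java, not part of Camel; the name and methods are made up for illustration) that FileRollback.onFailure would call.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not a Camel class): records that the failure
// callback ran, so a test can wait for it instead of sleeping blindly.
final class FailureLatch {

    private final CountDownLatch latch = new CountDownLatch(1);

    // To be called from the failure callback, e.g. FileRollback.onFailure.
    void markFailed() {
        latch.countDown();
    }

    // Blocks up to the given timeout and returns true only if
    // markFailed() was called before the timeout elapsed.
    boolean awaitFailure(long timeout, TimeUnit unit) throws InterruptedException {
        return latch.await(timeout, unit);
    }
}
```

With something like this, FileRollback.onFailure would call markFailed() on a shared instance, and testSmoke() could replace Thread.sleep(10000) with assertTrue(latch.awaitFailure(10, TimeUnit.SECONDS)). How the instance is shared with the bean (a static field, a registry lookup) is left open here.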