Hello,

I have inherited some code that is running Apache Camel and was asked if it was possible to send a notification whenever a processed file failed. I've found two links that looked promising to me.

1. https://issues.apache.org/jira/browse/CAMEL-3372
2. https://github.com/apache/camel/blob/master/camel-core/src/test/java/org/apache/camel/processor/FileRollbackOnCompletionTest.java

The idea would be to use the ".onCompletion().onFailureOnly()" to react to failed file process and send a notification. But in fact what's happening is that the file is being moved to the failed folder but is not invoking my FileRollback:onFailure method.

I've built a test that is the closest as possible to our current code. The difference is that instead of being file:// is sftp:// but for the test case is not important.

Can you help me?

Thanks

public class FileProcessingFailedNotificationTest extends ContextTestSupport {

    private static final String basePath = "resources/";

    public void testSmoke() throws InterruptedException {
        Thread.sleep(10000);
    }

    @Override
    protected RouteBuilder createRouteBuilder() {

        return new RouteBuilder() {
            @Override
            public void configure() {
from("file://" + basePath + "?fileName=sample-in.csv&preMove=inprogress/&moveFailed=../failed/&move=../done/&readLock=changed")
                        .to("direct:csvProcessor");

                from("direct:csvProcessor")
                        .onCompletion().onFailureOnly()
                            .bean(FileRollback.class, "onFailure")
                        .end()
                        .process(new CSVProcessor())
                        .split(body()).streaming().parallelProcessing()
                        .process(new LineProcessor())
                        .end();
            }
        };
    }

    public static class FileRollback implements Synchronization {

        public void onComplete(Exchange exchange) {
            System.out.println("FileRollback:onComplete");
        }

        public void onFailure(Exchange exchange) {
            System.out.println("FileRollback:onFailure");
        }
    }

    private static class CSVProcessor implements Processor {

        @Override
        public void process(Exchange exchange) throws Exception {
            CsvDataFormat csvDataFormat = new CsvDataFormat();
            csvDataFormat.setDelimiter(',');
            csvDataFormat.setLazyLoad(true);
            csvDataFormat.setUseMaps(true);

            ServiceHelper.startService(csvDataFormat);

InputStream stream = exchange.getIn().getMandatoryBody(InputStream.class);
            Message out = exchange.getOut();
            out.copyFrom(exchange.getIn());
            Object result = csvDataFormat.unmarshal(exchange, stream);
            out.setBody(result);
        }
    }

    private static class LineProcessor implements Processor {
        @Override
        public void process(Exchange exchange) {
            String body = exchange.getIn().getBody(String.class);

            if (body.contains("kaboom")) {
                throw new RuntimeException("kaboom");

            } else {
                System.out.println(body);
            }
        }
    }
}

Reply via email to