Author: davsclaus Date: Tue Aug 16 16:00:46 2011 New Revision: 1158341 URL: http://svn.apache.org/viewvc?rev=1158341&view=rev Log: CAMEL-4281: File/ftp endpoints now implement BrowsableEndpoint, which allows browsing the file system. Browsing leverages the consumer logic, but without starting the scheduler.
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileBrowsableEndpointTest.java camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBrowsableEndpointTest.java - copied, changed from r1158296, camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpToMockTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=1158341&r1=1158340&r2=1158341&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java Tue Aug 16 16:00:46 2011 @@ -38,7 +38,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Base class for remote file consumers. + * Base class for file consumers. 
*/ public abstract class GenericFileConsumer<T> extends ScheduledPollConsumer implements BatchConsumer, ShutdownAware { protected final transient Logger log = LoggerFactory.getLogger(getClass()); @@ -49,6 +49,7 @@ public abstract class GenericFileConsume protected int maxMessagesPerPoll; protected volatile ShutdownRunningTask shutdownRunningTask; protected volatile int pendingExchanges; + protected Processor customProcessor; public GenericFileConsumer(GenericFileEndpoint<T> endpoint, Processor processor, GenericFileOperations<T> operations) { super(endpoint, processor); @@ -56,6 +57,25 @@ public abstract class GenericFileConsume this.operations = operations; } + public Processor getCustomProcessor() { + return customProcessor; + } + + /** + * Use a custom processor to process the exchange. + * <p/> + * Only set this if you need to do custom processing, instead of the regular processing. + * <p/> + * This is for example used to browse file endpoints by leveraging the file consumer to poll + * the directory to gather the list of exchanges. But to avoid processing the files regularly + * we can use a custom processor. + * + * @param processor a custom processor + */ + public void setCustomProcessor(Processor processor) { + this.customProcessor = processor; + } + /** * Poll for files */ @@ -148,7 +168,13 @@ public abstract class GenericFileConsume pendingExchanges = total - index - 1; // process the current exchange - processExchange(exchange); + if (customProcessor != null) { + // use a custom processor + customProcessExchange(exchange, customProcessor); + } else { + // process the exchange regular + processExchange(exchange); + } } // remove the file from the in progress list in case the batch was limited by max messages per poll @@ -332,6 +358,35 @@ public abstract class GenericFileConsume } /** + * Processes the exchange using a custom processor. 
+ * + * @param exchange the exchange + * @param processor the custom processor + */ + protected void customProcessExchange(final Exchange exchange, final Processor processor) { + GenericFile<T> file = getExchangeFileProperty(exchange); + log.trace("Custom processing file: {}", file); + + // must extract the absolute name before the begin strategy as the file could potentially be pre moved + // and then the file name would be changed + String absoluteFileName = file.getAbsoluteFilePath(); + + try { + // process using the custom processor + processor.process(exchange); + } catch (Exception e) { + if (log.isDebugEnabled()) { + log.debug(endpoint + " error custom processing: " + file + " due to: " + e.getMessage() + ". This exception will be ignored.", e); + } + } finally { + // always remove file from the in progress list as its no longer in progress + // use the original file name that was used to add it to the repository + // as the name can be different when using preMove option + endpoint.getInProgressRepository().remove(absoluteFileName); + } + } + + /** * Strategy for validating if the given remote file should be included or not * * @param file the file Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java?rev=1158341&r1=1158340&r2=1158341&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java Tue Aug 16 16:00:46 2011 @@ -19,8 +19,10 @@ package org.apache.camel.component.file; import java.io.File; import java.io.IOException; import java.lang.reflect.Method; +import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; +import 
java.util.List; import java.util.Map; import org.apache.camel.CamelContext; @@ -33,6 +35,7 @@ import org.apache.camel.Processor; import org.apache.camel.converter.IOConverter; import org.apache.camel.impl.ScheduledPollEndpoint; import org.apache.camel.processor.idempotent.MemoryIdempotentRepository; +import org.apache.camel.spi.BrowsableEndpoint; import org.apache.camel.spi.FactoryFinder; import org.apache.camel.spi.IdempotentRepository; import org.apache.camel.spi.Language; @@ -44,9 +47,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Generic FileEndpoint + * Base class for file endpoints */ -public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint { +public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint implements BrowsableEndpoint { protected static final transient String DEFAULT_STRATEGYFACTORY_CLASS = "org.apache.camel.component.file.strategy.GenericFileProcessStrategyFactory"; protected static final transient int DEFAULT_IDEMPOTENT_CACHE_SIZE = 1000; @@ -132,6 +135,44 @@ public abstract class GenericFileEndpoin } /** + * This implementation will <b>not</b> load the file content. + * Any file locking is neither in use by this implementation.. 
+ */ + @Override + public List<Exchange> getExchanges() { + final List<Exchange> answer = new ArrayList<Exchange>(); + + GenericFileConsumer consumer = null; + try { + // create a new consumer which can poll the exchanges we want to browse + // do not provide a processor as we do some custom processing + consumer = createConsumer(null); + consumer.setCustomProcessor(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + answer.add(exchange); + } + }); + // do not start scheduler, as we invoke the poll manually + consumer.setStartScheduler(false); + // start consumer + ServiceHelper.startService(consumer); + // invoke poll which performs the custom processing, so we can browse the exchanges + consumer.poll(); + } catch (Exception e) { + throw ObjectHelper.wrapRuntimeCamelException(e); + } finally { + try { + ServiceHelper.stopService(consumer); + } catch (Exception e) { + log.debug("Error stopping consumer used for browsing exchanges. This exception will be ignored", e); + } + } + + return answer; + } + + /** * A strategy method to lazily create the file strategy */ @SuppressWarnings("unchecked") Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=1158341&r1=1158340&r2=1158341&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java Tue Aug 16 16:00:46 2011 @@ -43,6 +43,7 @@ public abstract class ScheduledPollConsu private ScheduledFuture<?> future; // if adding more options then align with ScheduledPollEndpoint#configureScheduledPollConsumerProperties + private boolean startScheduler = true; private long initialDelay = 1000; private long delay = 
500; private TimeUnit timeUnit = TimeUnit.MILLISECONDS; @@ -227,6 +228,21 @@ public abstract class ScheduledPollConsu this.pollStrategy = pollStrategy; } + public boolean isStartScheduler() { + return startScheduler; + } + + /** + * Sets whether the scheduler should be started when this consumer starts. + * <p/> + * This option is default true. + * + * @param startScheduler whether to start scheduler + */ + public void setStartScheduler(boolean startScheduler) { + this.startScheduler = startScheduler; + } + // Implementation methods // ------------------------------------------------------------------------- @@ -244,6 +260,12 @@ public abstract class ScheduledPollConsu ObjectHelper.notNull(executor, "executor", this); ObjectHelper.notNull(pollStrategy, "pollStrategy", this); + if (isStartScheduler()) { + startScheduler(); + } + } + + protected void startScheduler() { if (isUseFixedDelay()) { if (LOG.isDebugEnabled()) { LOG.debug("Scheduling poll (fixed delay) with initialDelay: {}, delay: {} ({}) for: {}", Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java?rev=1158341&r1=1158340&r2=1158341&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java Tue Aug 16 16:00:46 2011 @@ -86,19 +86,24 @@ public abstract class ScheduledPollEndpo private void configureScheduledPollConsumerProperties(Map<String, Object> options, Map<String, Object> consumerProperties) { // special for scheduled poll consumers as we want to allow end users to configure its options // from the URI parameters without the consumer. 
prefix + Object startScheduler = options.remove("startScheduler"); Object initialDelay = options.remove("initialDelay"); Object delay = options.remove("delay"); Object timeUnit = options.remove("timeUnit"); Object useFixedDelay = options.remove("useFixedDelay"); Object pollStrategy = options.remove("pollStrategy"); Object runLoggingLevel = options.remove("runLoggingLevel"); - if (initialDelay != null || delay != null || timeUnit != null || useFixedDelay != null || pollStrategy != null || runLoggingLevel != null) { + if (initialDelay != null || delay != null || timeUnit != null || useFixedDelay != null || pollStrategy != null + || runLoggingLevel != null || startScheduler != null) { if (consumerProperties == null) { consumerProperties = new HashMap<String, Object>(); } if (initialDelay != null) { consumerProperties.put("initialDelay", initialDelay); } + if (startScheduler != null) { + consumerProperties.put("startScheduler", startScheduler); + } if (delay != null) { consumerProperties.put("delay", delay); } Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileBrowsableEndpointTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileBrowsableEndpointTest.java?rev=1158341&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileBrowsableEndpointTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileBrowsableEndpointTest.java Tue Aug 16 16:00:46 2011 @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file; + +import java.io.File; +import java.util.List; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.processor.idempotent.MemoryIdempotentRepository; +import org.apache.camel.spi.BrowsableEndpoint; + +/** + * + */ +public class FileBrowsableEndpointTest extends ContextTestSupport { + + @Override + protected void setUp() throws Exception { + deleteDirectory("target/browse"); + super.setUp(); + } + + @Override + public boolean isUseRouteBuilder() { + return false; + } + + public void testBrowsableNoFiles() throws Exception { + BrowsableEndpoint browse = context.getEndpoint("file:target/browse", BrowsableEndpoint.class); + assertNotNull(browse); + + List<Exchange> list = browse.getExchanges(); + assertNotNull(list); + assertEquals(0, list.size()); + } + + public void testBrowsableOneFile() throws Exception { + template.sendBodyAndHeader("file:target/browse", "A", Exchange.FILE_NAME, "a.txt"); + + FileEndpoint endpoint = context.getEndpoint("file:target/browse", FileEndpoint.class); + assertNotNull(endpoint); + + MemoryIdempotentRepository repo = (MemoryIdempotentRepository) endpoint.getInProgressRepository(); + assertEquals(0, repo.getCacheSize()); + + List<Exchange> list = endpoint.getExchanges(); + assertNotNull(list); + assertEquals(1, list.size()); + + assertEquals("a.txt", 
list.get(0).getIn().getHeader(Exchange.FILE_NAME)); + + // the in progress repo should not leak + assertEquals(0, repo.getCacheSize()); + + // and the file is still there + File file = new File("target/browse/a.txt"); + assertTrue("File should exist " + file, file.exists()); + } + + public void testBrowsableTwoFiles() throws Exception { + template.sendBodyAndHeader("file:target/browse", "A", Exchange.FILE_NAME, "a.txt"); + template.sendBodyAndHeader("file:target/browse", "B", Exchange.FILE_NAME, "b.txt"); + + FileEndpoint endpoint = context.getEndpoint("file:target/browse?sortBy=file:name", FileEndpoint.class); + assertNotNull(endpoint); + + MemoryIdempotentRepository repo = (MemoryIdempotentRepository) endpoint.getInProgressRepository(); + assertEquals(0, repo.getCacheSize()); + + List<Exchange> list = endpoint.getExchanges(); + assertNotNull(list); + assertEquals(2, list.size()); + + assertEquals("a.txt", list.get(0).getIn().getHeader(Exchange.FILE_NAME)); + assertEquals("b.txt", list.get(1).getIn().getHeader(Exchange.FILE_NAME)); + + // the in progress repo should not leak + assertEquals(0, repo.getCacheSize()); + + // and the files is still there + File fileA = new File("target/browse/a.txt"); + assertTrue("File should exist " + fileA, fileA.exists()); + File fileB = new File("target/browse/b.txt"); + assertTrue("File should exist " + fileB, fileB.exists()); + } + + public void testBrowsableThreeFilesRecursive() throws Exception { + template.sendBodyAndHeader("file:target/browse", "A", Exchange.FILE_NAME, "a.txt"); + template.sendBodyAndHeader("file:target/browse", "B", Exchange.FILE_NAME, "foo/b.txt"); + template.sendBodyAndHeader("file:target/browse", "C", Exchange.FILE_NAME, "bar/c.txt"); + + FileEndpoint endpoint = context.getEndpoint("file:target/browse?recursive=true&sortBy=file:name", FileEndpoint.class); + assertNotNull(endpoint); + + MemoryIdempotentRepository repo = (MemoryIdempotentRepository) endpoint.getInProgressRepository(); + assertEquals(0, 
repo.getCacheSize()); + + List<Exchange> list = endpoint.getExchanges(); + assertNotNull(list); + assertEquals(3, list.size()); + + assertEquals("a.txt", list.get(0).getIn().getHeader(Exchange.FILE_NAME)); + assertEquals("c.txt", list.get(1).getIn().getHeader(Exchange.FILE_NAME_ONLY)); + assertEquals("b.txt", list.get(2).getIn().getHeader(Exchange.FILE_NAME_ONLY)); + + // the in progress repo should not leak + assertEquals(0, repo.getCacheSize()); + + // and the files is still there + File fileA = new File("target/browse/a.txt"); + assertTrue("File should exist " + fileA, fileA.exists()); + File fileB = new File("target/browse/foo/b.txt"); + assertTrue("File should exist " + fileB, fileB.exists()); + File fileC = new File("target/browse/bar/c.txt"); + assertTrue("File should exist " + fileC, fileC.exists()); + } +} Copied: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBrowsableEndpointTest.java (from r1158296, camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpToMockTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBrowsableEndpointTest.java?p2=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBrowsableEndpointTest.java&p1=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpToMockTest.java&r1=1158296&r2=1158341&rev=1158341&view=diff ============================================================================== --- camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpToMockTest.java (original) +++ camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBrowsableEndpointTest.java Tue Aug 16 16:00:46 2011 @@ -16,35 +16,123 @@ */ package org.apache.camel.component.file.remote; -import org.apache.camel.builder.RouteBuilder; -import 
org.apache.camel.component.mock.MockEndpoint; +import java.io.File; +import java.util.List; + +import org.apache.camel.Exchange; +import org.apache.camel.processor.idempotent.MemoryIdempotentRepository; +import org.apache.camel.spi.BrowsableEndpoint; import org.junit.Test; /** * @version */ -public class FromFtpToMockTest extends FtpServerTestSupport { - protected String expectedBody = "Hello there!"; +public class FtpBrowsableEndpointTest extends FtpServerTestSupport { private String getFtpUrl() { - return "ftp://admin@localhost:" + getPort() + "/tmp/camel?password=admin&recursive=true"; + return "ftp://admin@localhost:" + getPort() + "/browse?password=admin"; + } + + @Override + public boolean isUseRouteBuilder() { + return false; + } + + @Test + public void testBrowsableNoFiles() throws Exception { + // make sure starting directory exists + createDirectory(FTP_ROOT_DIR + "browse"); + + BrowsableEndpoint browse = context.getEndpoint(getFtpUrl(), BrowsableEndpoint.class); + assertNotNull(browse); + + List<Exchange> list = browse.getExchanges(); + assertNotNull(list); + assertEquals(0, list.size()); } @Test - public void testFtpRoute() throws Exception { - MockEndpoint resultEndpoint = getMockEndpoint("mock:result"); - resultEndpoint.expectedBodiesReceived(expectedBody); + public void testBrowsableOneFile() throws Exception { + template.sendBodyAndHeader(getFtpUrl(), "A", Exchange.FILE_NAME, "a.txt"); + + FtpEndpoint endpoint = context.getEndpoint(getFtpUrl(), FtpEndpoint.class); + assertNotNull(endpoint); + + MemoryIdempotentRepository repo = (MemoryIdempotentRepository) endpoint.getInProgressRepository(); + assertEquals(0, repo.getCacheSize()); - template.sendBodyAndHeader(getFtpUrl(), expectedBody, "cheese", 123); + List<Exchange> list = endpoint.getExchanges(); + assertNotNull(list); + assertEquals(1, list.size()); - resultEndpoint.assertIsSatisfied(); + assertEquals("a.txt", list.get(0).getIn().getHeader(Exchange.FILE_NAME)); + + // the in progress repo should 
not leak + assertEquals(0, repo.getCacheSize()); + + // and the file is still there + File file = new File(FTP_ROOT_DIR + "browse/a.txt"); + assertTrue("File should exist " + file, file.exists()); } - protected RouteBuilder createRouteBuilder() throws Exception { - return new RouteBuilder() { - public void configure() throws Exception { - from(getFtpUrl()).to("mock:result"); - } - }; + @Test + public void testBrowsableTwoFiles() throws Exception { + template.sendBodyAndHeader(getFtpUrl(), "A", Exchange.FILE_NAME, "a.txt"); + template.sendBodyAndHeader(getFtpUrl(), "B", Exchange.FILE_NAME, "b.txt"); + + FtpEndpoint endpoint = context.getEndpoint(getFtpUrl() + "&sortBy=file:name", FtpEndpoint.class); + assertNotNull(endpoint); + + MemoryIdempotentRepository repo = (MemoryIdempotentRepository) endpoint.getInProgressRepository(); + assertEquals(0, repo.getCacheSize()); + + List<Exchange> list = endpoint.getExchanges(); + assertNotNull(list); + assertEquals(2, list.size()); + + assertEquals("a.txt", list.get(0).getIn().getHeader(Exchange.FILE_NAME)); + assertEquals("b.txt", list.get(1).getIn().getHeader(Exchange.FILE_NAME)); + + // the in progress repo should not leak + assertEquals(0, repo.getCacheSize()); + + // and the files is still there + File fileA = new File(FTP_ROOT_DIR + "browse/a.txt"); + assertTrue("File should exist " + fileA, fileA.exists()); + File fileB = new File(FTP_ROOT_DIR + "browse/b.txt"); + assertTrue("File should exist " + fileB, fileB.exists()); } + + @Test + public void testBrowsableThreeFilesRecursive() throws Exception { + template.sendBodyAndHeader(getFtpUrl(), "A", Exchange.FILE_NAME, "a.txt"); + template.sendBodyAndHeader(getFtpUrl(), "B", Exchange.FILE_NAME, "foo/b.txt"); + template.sendBodyAndHeader(getFtpUrl(), "C", Exchange.FILE_NAME, "bar/c.txt"); + + FtpEndpoint endpoint = context.getEndpoint(getFtpUrl() + "&recursive=true&sortBy=file:name", FtpEndpoint.class); + assertNotNull(endpoint); + + MemoryIdempotentRepository repo = 
(MemoryIdempotentRepository) endpoint.getInProgressRepository(); + assertEquals(0, repo.getCacheSize()); + + List<Exchange> list = endpoint.getExchanges(); + assertNotNull(list); + assertEquals(3, list.size()); + + assertEquals("a.txt", list.get(0).getIn().getHeader(Exchange.FILE_NAME)); + assertEquals("c.txt", list.get(1).getIn().getHeader(Exchange.FILE_NAME_ONLY)); + assertEquals("b.txt", list.get(2).getIn().getHeader(Exchange.FILE_NAME_ONLY)); + + // the in progress repo should not leak + assertEquals(0, repo.getCacheSize()); + + // and the files is still there + File fileA = new File(FTP_ROOT_DIR + "browse/a.txt"); + assertTrue("File should exist " + fileA, fileA.exists()); + File fileB = new File(FTP_ROOT_DIR + "browse/foo/b.txt"); + assertTrue("File should exist " + fileB, fileB.exists()); + File fileC = new File(FTP_ROOT_DIR + "browse/bar/c.txt"); + assertTrue("File should exist " + fileC, fileC.exists()); + } + } \ No newline at end of file