Author: dkulp Date: Mon Sep 19 18:33:58 2011 New Revision: 1172738 URL: http://svn.apache.org/viewvc?rev=1172738&view=rev Log: Merged revisions 1158341 via svnmerge from https://svn.apache.org/repos/asf/camel/trunk
........ r1158341 | davsclaus | 2011-08-16 12:00:46 -0400 (Tue, 16 Aug 2011) | 1 line CAMEL-4281: File/ftp endpoints is now BrowsableEndpoint which allows to browse the file system. It leverages the consumer logic, but without starting the scheduler. ........ Added: camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/component/file/FileBrowsableEndpointTest.java - copied unchanged from r1158341, camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileBrowsableEndpointTest.java camel/branches/camel-2.8.x/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBrowsableEndpointTest.java - copied unchanged from r1158341, camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBrowsableEndpointTest.java Modified: camel/branches/camel-2.8.x/ (props changed) camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java Propchange: camel/branches/camel-2.8.x/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Mon Sep 19 18:33:58 2011 @@ -1 +1 @@ -/camel/trunk:1148706,1148710,1149570,1150300-1150311,1150651,1151000,1151054,1151087,1151126,1151362,1152170,1152569,1152733,1152755,1152868,1153620,1153812,1153829,1154684,1155230,1155292,1156108,1156260,1156277,1156479,1156524,1157348,1157749,1157798,1157831,1157878,1158153,1158230,1158295,1159171,1159174,1159326,1159457,1159460,1159596,1159606,1159682-1159683,1159867,1160547,1160637,1161010,1161082,1161524,1162309,1162395,1163231,1163420,1163656-1163669,1163725,1164342-1164343,1164544,1164557,1164633,1164972-1165000,1165152,1165157,1165658,1165971,1165987,1167098,1167131,1167448,1167487,1167555,1169610,1169620,1170122,1170226,1170397,1170956,1171396,1171755,1171941,1171947 +/camel/trunk:1148706,1148710,1149570,1150300-1150311,1150651,1151000,1151054,1151087,1151126,1151362,1152170,1152569,1152733,1152755,1152868,1153620,1153812,1153829,1154684,1155230,1155292,1156108,1156260,1156277,1156479,1156524,1157348,1157749,1157798,1157831,1157878,1158153,1158230,1158295,1158341,1159171,1159174,1159326,1159457,1159460,1159596,1159606,1159682-1159683,1159867,1160547,1160637,1161010,1161082,1161524,1162309,1162395,1163231,1163420,1163656-1163669,1163725,1164342-1164343,1164544,1164557,1164633,1164972-1165000,1165152,1165157,1165658,1165971,1165987,1167098,1167131,1167448,1167487,1167555,1169610,1169620,1170122,1170226,1170397,1170956,1171396,1171755,1171941,1171947 Propchange: camel/branches/camel-2.8.x/ ------------------------------------------------------------------------------ Binary property 'svnmerge-integrated' - no diff available. Modified: camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=1172738&r1=1172737&r2=1172738&view=diff ============================================================================== --- camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java (original) +++ camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java Mon Sep 19 18:33:58 2011 @@ -38,7 +38,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Base class for remote file consumers. + * Base class for file consumers. */ public abstract class GenericFileConsumer<T> extends ScheduledPollConsumer implements BatchConsumer, ShutdownAware { protected final transient Logger log = LoggerFactory.getLogger(getClass()); @@ -49,6 +49,7 @@ public abstract class GenericFileConsume protected int maxMessagesPerPoll; protected volatile ShutdownRunningTask shutdownRunningTask; protected volatile int pendingExchanges; + protected Processor customProcessor; public GenericFileConsumer(GenericFileEndpoint<T> endpoint, Processor processor, GenericFileOperations<T> operations) { super(endpoint, processor); @@ -56,6 +57,25 @@ public abstract class GenericFileConsume this.operations = operations; } + public Processor getCustomProcessor() { + return customProcessor; + } + + /** + * Use a custom processor to process the exchange. + * <p/> + * Only set this if you need to do custom processing, instead of the regular processing. + * <p/> + * This is for example used to browse file endpoints by leveraging the file consumer to poll + * the directory to gather the list of exchanges. But to avoid processing the files regularly + * we can use a custom processor. + * + * @param processor a custom processor + */ + public void setCustomProcessor(Processor processor) { + this.customProcessor = processor; + } + /** * Poll for files */ @@ -148,7 +168,13 @@ public abstract class GenericFileConsume pendingExchanges = total - index - 1; // process the current exchange - processExchange(exchange); + if (customProcessor != null) { + // use a custom processor + customProcessExchange(exchange, customProcessor); + } else { + // process the exchange regular + processExchange(exchange); + } } // remove the file from the in progress list in case the batch was limited by max messages per poll @@ -332,6 +358,35 @@ public abstract class GenericFileConsume } /** + * Processes the exchange using a custom processor. + * + * @param exchange the exchange + * @param processor the custom processor + */ + protected void customProcessExchange(final Exchange exchange, final Processor processor) { + GenericFile<T> file = getExchangeFileProperty(exchange); + log.trace("Custom processing file: {}", file); + + // must extract the absolute name before the begin strategy as the file could potentially be pre moved + // and then the file name would be changed + String absoluteFileName = file.getAbsoluteFilePath(); + + try { + // process using the custom processor + processor.process(exchange); + } catch (Exception e) { + if (log.isDebugEnabled()) { + log.debug(endpoint + " error custom processing: " + file + " due to: " + e.getMessage() + ". This exception will be ignored.", e); + } + } finally { + // always remove file from the in progress list as its no longer in progress + // use the original file name that was used to add it to the repository + // as the name can be different when using preMove option + endpoint.getInProgressRepository().remove(absoluteFileName); + } + } + + /** * Strategy for validating if the given remote file should be included or not * * @param file the file Modified: camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java?rev=1172738&r1=1172737&r2=1172738&view=diff ============================================================================== --- camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java (original) +++ camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java Mon Sep 19 18:33:58 2011 @@ -19,8 +19,10 @@ package org.apache.camel.component.file; import java.io.File; import java.io.IOException; import java.lang.reflect.Method; +import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.camel.CamelContext; @@ -34,6 +36,7 @@ import org.apache.camel.converter.IOConv import org.apache.camel.impl.ScheduledPollEndpoint; import org.apache.camel.language.simple.SimpleLanguage; import org.apache.camel.processor.idempotent.MemoryIdempotentRepository; +import org.apache.camel.spi.BrowsableEndpoint; import org.apache.camel.spi.FactoryFinder; import org.apache.camel.spi.IdempotentRepository; import org.apache.camel.spi.Language; @@ -45,9 +48,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Generic FileEndpoint + * Base class for file endpoints */ -public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint { +public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint implements BrowsableEndpoint { protected static final transient String DEFAULT_STRATEGYFACTORY_CLASS = "org.apache.camel.component.file.strategy.GenericFileProcessStrategyFactory"; protected static final transient int DEFAULT_IDEMPOTENT_CACHE_SIZE = 1000; @@ -133,6 +136,44 @@ public abstract class GenericFileEndpoin } /** + * This implementation will <b>not</b> load the file content. + * Any file locking is neither in use by this implementation.. + */ + @Override + public List<Exchange> getExchanges() { + final List<Exchange> answer = new ArrayList<Exchange>(); + + GenericFileConsumer consumer = null; + try { + // create a new consumer which can poll the exchanges we want to browse + // do not provide a processor as we do some custom processing + consumer = createConsumer(null); + consumer.setCustomProcessor(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + answer.add(exchange); + } + }); + // do not start scheduler, as we invoke the poll manually + consumer.setStartScheduler(false); + // start consumer + ServiceHelper.startService(consumer); + // invoke poll which performs the custom processing, so we can browse the exchanges + consumer.poll(); + } catch (Exception e) { + throw ObjectHelper.wrapRuntimeCamelException(e); + } finally { + try { + ServiceHelper.stopService(consumer); + } catch (Exception e) { + log.debug("Error stopping consumer used for browsing exchanges. This exception will be ignored", e); + } + } + + return answer; + } + + /** * A strategy method to lazily create the file strategy */ @SuppressWarnings("unchecked") Modified: camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=1172738&r1=1172737&r2=1172738&view=diff ============================================================================== --- camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java (original) +++ camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java Mon Sep 19 18:33:58 2011 @@ -43,6 +43,7 @@ public abstract class ScheduledPollConsu private ScheduledFuture<?> future; // if adding more options then align with ScheduledPollEndpoint#configureScheduledPollConsumerProperties + private boolean startScheduler = true; private long initialDelay = 1000; private long delay = 500; private TimeUnit timeUnit = TimeUnit.MILLISECONDS; @@ -227,6 +228,21 @@ public abstract class ScheduledPollConsu this.pollStrategy = pollStrategy; } + public boolean isStartScheduler() { + return startScheduler; + } + + /** + * Sets whether the scheduler should be started when this consumer starts. + * <p/> + * This option is default true. + * + * @param startScheduler whether to start scheduler + */ + public void setStartScheduler(boolean startScheduler) { + this.startScheduler = startScheduler; + } + // Implementation methods // ------------------------------------------------------------------------- @@ -244,6 +260,12 @@ public abstract class ScheduledPollConsu ObjectHelper.notNull(executor, "executor", this); ObjectHelper.notNull(pollStrategy, "pollStrategy", this); + if (isStartScheduler()) { + startScheduler(); + } + } + + protected void startScheduler() { if (isUseFixedDelay()) { if (LOG.isDebugEnabled()) { LOG.debug("Scheduling poll (fixed delay) with initialDelay: {}, delay: {} ({}) for: {}", Modified: camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java?rev=1172738&r1=1172737&r2=1172738&view=diff ============================================================================== --- camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java (original) +++ camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java Mon Sep 19 18:33:58 2011 @@ -86,19 +86,24 @@ public abstract class ScheduledPollEndpo private void configureScheduledPollConsumerProperties(Map<String, Object> options, Map<String, Object> consumerProperties) { // special for scheduled poll consumers as we want to allow end users to configure its options // from the URI parameters without the consumer. prefix + Object startScheduler = options.remove("startScheduler"); Object initialDelay = options.remove("initialDelay"); Object delay = options.remove("delay"); Object timeUnit = options.remove("timeUnit"); Object useFixedDelay = options.remove("useFixedDelay"); Object pollStrategy = options.remove("pollStrategy"); Object runLoggingLevel = options.remove("runLoggingLevel"); - if (initialDelay != null || delay != null || timeUnit != null || useFixedDelay != null || pollStrategy != null || runLoggingLevel != null) { + if (initialDelay != null || delay != null || timeUnit != null || useFixedDelay != null || pollStrategy != null + || runLoggingLevel != null || startScheduler != null) { if (consumerProperties == null) { consumerProperties = new HashMap<String, Object>(); } if (initialDelay != null) { consumerProperties.put("initialDelay", initialDelay); } + if (startScheduler != null) { + consumerProperties.put("startScheduler", startScheduler); + } if (delay != null) { consumerProperties.put("delay", delay); }