Author: dkulp
Date: Mon Sep 19 18:33:58 2011
New Revision: 1172738

URL: http://svn.apache.org/viewvc?rev=1172738&view=rev
Log:
Merged revisions 1158341 via svnmerge from 
https://svn.apache.org/repos/asf/camel/trunk

........
  r1158341 | davsclaus | 2011-08-16 12:00:46 -0400 (Tue, 16 Aug 2011) | 1 line
  
  CAMEL-4281: File/ftp endpoints are now BrowsableEndpoints, which allows 
browsing the file system. This leverages the consumer logic, but without starting 
the scheduler.
........

Added:
    
camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/component/file/FileBrowsableEndpointTest.java
      - copied unchanged from r1158341, 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileBrowsableEndpointTest.java
    
camel/branches/camel-2.8.x/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBrowsableEndpointTest.java
      - copied unchanged from r1158341, 
camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBrowsableEndpointTest.java
Modified:
    camel/branches/camel-2.8.x/   (props changed)
    
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
    
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
    
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
    
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java

Propchange: camel/branches/camel-2.8.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Sep 19 18:33:58 2011
@@ -1 +1 @@
-/camel/trunk:1148706,1148710,1149570,1150300-1150311,1150651,1151000,1151054,1151087,1151126,1151362,1152170,1152569,1152733,1152755,1152868,1153620,1153812,1153829,1154684,1155230,1155292,1156108,1156260,1156277,1156479,1156524,1157348,1157749,1157798,1157831,1157878,1158153,1158230,1158295,1159171,1159174,1159326,1159457,1159460,1159596,1159606,1159682-1159683,1159867,1160547,1160637,1161010,1161082,1161524,1162309,1162395,1163231,1163420,1163656-1163669,1163725,1164342-1164343,1164544,1164557,1164633,1164972-1165000,1165152,1165157,1165658,1165971,1165987,1167098,1167131,1167448,1167487,1167555,1169610,1169620,1170122,1170226,1170397,1170956,1171396,1171755,1171941,1171947
+/camel/trunk:1148706,1148710,1149570,1150300-1150311,1150651,1151000,1151054,1151087,1151126,1151362,1152170,1152569,1152733,1152755,1152868,1153620,1153812,1153829,1154684,1155230,1155292,1156108,1156260,1156277,1156479,1156524,1157348,1157749,1157798,1157831,1157878,1158153,1158230,1158295,1158341,1159171,1159174,1159326,1159457,1159460,1159596,1159606,1159682-1159683,1159867,1160547,1160637,1161010,1161082,1161524,1162309,1162395,1163231,1163420,1163656-1163669,1163725,1164342-1164343,1164544,1164557,1164633,1164972-1165000,1165152,1165157,1165658,1165971,1165987,1167098,1167131,1167448,1167487,1167555,1169610,1169620,1170122,1170226,1170397,1170956,1171396,1171755,1171941,1171947

Propchange: camel/branches/camel-2.8.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: 
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=1172738&r1=1172737&r2=1172738&view=diff
==============================================================================
--- 
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
 (original)
+++ 
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
 Mon Sep 19 18:33:58 2011
@@ -38,7 +38,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Base class for remote file consumers.
+ * Base class for file consumers.
  */
 public abstract class GenericFileConsumer<T> extends ScheduledPollConsumer 
implements BatchConsumer, ShutdownAware {
     protected final transient Logger log = LoggerFactory.getLogger(getClass());
@@ -49,6 +49,7 @@ public abstract class GenericFileConsume
     protected int maxMessagesPerPoll;
     protected volatile ShutdownRunningTask shutdownRunningTask;
     protected volatile int pendingExchanges;
+    protected Processor customProcessor;
 
     public GenericFileConsumer(GenericFileEndpoint<T> endpoint, Processor 
processor, GenericFileOperations<T> operations) {
         super(endpoint, processor);
@@ -56,6 +57,25 @@ public abstract class GenericFileConsume
         this.operations = operations;
     }
 
+    public Processor getCustomProcessor() {
+        return customProcessor;
+    }
+
+    /**
+     * Use a custom processor to process the exchange.
+     * <p/>
+     * Only set this if you need to do custom processing, instead of the 
regular processing.
+     * <p/>
+     * This is for example used to browse file endpoints by leveraging the 
file consumer to poll
+     * the directory to gather the list of exchanges. But to avoid processing 
the files regularly
+     * we can use a custom processor.
+     *
+     * @param processor a custom processor
+     */
+    public void setCustomProcessor(Processor processor) {
+        this.customProcessor = processor;
+    }
+
     /**
      * Poll for files
      */
@@ -148,7 +168,13 @@ public abstract class GenericFileConsume
             pendingExchanges = total - index - 1;
 
             // process the current exchange
-            processExchange(exchange);
+            if (customProcessor != null) {
+                // use a custom processor
+                customProcessExchange(exchange, customProcessor);
+            } else {
+                // process the exchange the regular way
+                processExchange(exchange);
+            }
         }
 
         // remove the file from the in progress list in case the batch was 
limited by max messages per poll
@@ -332,6 +358,35 @@ public abstract class GenericFileConsume
     }
 
     /**
+     * Processes the exchange using a custom processor.
+     *
+     * @param exchange the exchange
+     * @param processor the custom processor
+     */
+    protected void customProcessExchange(final Exchange exchange, final 
Processor processor) {
+        GenericFile<T> file = getExchangeFileProperty(exchange);
+        log.trace("Custom processing file: {}", file);
+
+        // must extract the absolute name before the begin strategy as the 
file could potentially be pre moved
+        // and then the file name would be changed
+        String absoluteFileName = file.getAbsoluteFilePath();
+
+        try {
+            // process using the custom processor
+            processor.process(exchange);
+        } catch (Exception e) {
+            if (log.isDebugEnabled()) {
+                log.debug(endpoint + " error custom processing: " + file + " 
due to: " + e.getMessage() + ". This exception will be ignored.", e);
+            }
+        } finally {
+            // always remove file from the in progress list as it's no longer 
in progress
+            // use the original file name that was used to add it to the 
repository
+            // as the name can be different when using preMove option
+            endpoint.getInProgressRepository().remove(absoluteFileName);
+        }
+    }
+
+    /**
      * Strategy for validating if the given remote file should be included or 
not
      *
      * @param file        the file

Modified: 
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java?rev=1172738&r1=1172737&r2=1172738&view=diff
==============================================================================
--- 
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
 (original)
+++ 
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
 Mon Sep 19 18:33:58 2011
@@ -19,8 +19,10 @@ package org.apache.camel.component.file;
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Method;
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.camel.CamelContext;
@@ -34,6 +36,7 @@ import org.apache.camel.converter.IOConv
 import org.apache.camel.impl.ScheduledPollEndpoint;
 import org.apache.camel.language.simple.SimpleLanguage;
 import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
+import org.apache.camel.spi.BrowsableEndpoint;
 import org.apache.camel.spi.FactoryFinder;
 import org.apache.camel.spi.IdempotentRepository;
 import org.apache.camel.spi.Language;
@@ -45,9 +48,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Generic FileEndpoint
+ * Base class for file endpoints
  */
-public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint {
+public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint 
implements BrowsableEndpoint {
 
     protected static final transient String DEFAULT_STRATEGYFACTORY_CLASS = 
"org.apache.camel.component.file.strategy.GenericFileProcessStrategyFactory";
     protected static final transient int DEFAULT_IDEMPOTENT_CACHE_SIZE = 1000;
@@ -133,6 +136,44 @@ public abstract class GenericFileEndpoin
     }
 
     /**
+     * This implementation will <b>not</b> load the file content.
+     * File locking is not used by this implementation either.
+     */
+    @Override
+    public List<Exchange> getExchanges() {
+        final List<Exchange> answer = new ArrayList<Exchange>();
+
+        GenericFileConsumer consumer = null;
+        try {
+            // create a new consumer which can poll the exchanges we want to 
browse
+            // do not provide a processor as we do some custom processing
+            consumer = createConsumer(null);
+            consumer.setCustomProcessor(new Processor() {
+                @Override
+                public void process(Exchange exchange) throws Exception {
+                    answer.add(exchange);
+                }
+            });
+            // do not start scheduler, as we invoke the poll manually
+            consumer.setStartScheduler(false);
+            // start consumer
+            ServiceHelper.startService(consumer);
+            // invoke poll which performs the custom processing, so we can 
browse the exchanges
+            consumer.poll();
+        } catch (Exception e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        } finally {
+            try {
+                ServiceHelper.stopService(consumer);
+            } catch (Exception e) {
+                log.debug("Error stopping consumer used for browsing 
exchanges. This exception will be ignored", e);
+            }
+        }
+
+        return answer;
+    }
+
+    /**
      * A strategy method to lazily create the file strategy
      */
     @SuppressWarnings("unchecked")

Modified: 
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=1172738&r1=1172737&r2=1172738&view=diff
==============================================================================
--- 
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
 (original)
+++ 
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
 Mon Sep 19 18:33:58 2011
@@ -43,6 +43,7 @@ public abstract class ScheduledPollConsu
     private ScheduledFuture<?> future;
 
     // if adding more options then align with 
ScheduledPollEndpoint#configureScheduledPollConsumerProperties
+    private boolean startScheduler = true;
     private long initialDelay = 1000;
     private long delay = 500;
     private TimeUnit timeUnit = TimeUnit.MILLISECONDS;
@@ -227,6 +228,21 @@ public abstract class ScheduledPollConsu
         this.pollStrategy = pollStrategy;
     }
 
+    public boolean isStartScheduler() {
+        return startScheduler;
+    }
+
+    /**
+     * Sets whether the scheduler should be started when this consumer starts.
+     * <p/>
+     * This option is default true.
+     *
+     * @param startScheduler whether to start scheduler
+     */
+    public void setStartScheduler(boolean startScheduler) {
+        this.startScheduler = startScheduler;
+    }
+
     // Implementation methods
     // 
-------------------------------------------------------------------------
 
@@ -244,6 +260,12 @@ public abstract class ScheduledPollConsu
         ObjectHelper.notNull(executor, "executor", this);
         ObjectHelper.notNull(pollStrategy, "pollStrategy", this);
 
+        if (isStartScheduler()) {
+            startScheduler();
+        }
+    }
+
+    protected void startScheduler() {
         if (isUseFixedDelay()) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Scheduling poll (fixed delay) with initialDelay: 
{}, delay: {} ({}) for: {}",

Modified: 
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java?rev=1172738&r1=1172737&r2=1172738&view=diff
==============================================================================
--- 
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java
 (original)
+++ 
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java
 Mon Sep 19 18:33:58 2011
@@ -86,19 +86,24 @@ public abstract class ScheduledPollEndpo
     private void configureScheduledPollConsumerProperties(Map<String, Object> 
options, Map<String, Object> consumerProperties) {
         // special for scheduled poll consumers as we want to allow end users 
to configure its options
         // from the URI parameters without the consumer. prefix
+        Object startScheduler = options.remove("startScheduler");
         Object initialDelay = options.remove("initialDelay");
         Object delay = options.remove("delay");
         Object timeUnit = options.remove("timeUnit");
         Object useFixedDelay = options.remove("useFixedDelay");
         Object pollStrategy = options.remove("pollStrategy");
         Object runLoggingLevel = options.remove("runLoggingLevel");
-        if (initialDelay != null || delay != null || timeUnit != null || 
useFixedDelay != null || pollStrategy != null || runLoggingLevel != null) {
+        if (initialDelay != null || delay != null || timeUnit != null || 
useFixedDelay != null || pollStrategy != null
+                || runLoggingLevel != null || startScheduler != null) {
             if (consumerProperties == null) {
                 consumerProperties = new HashMap<String, Object>();
             }
             if (initialDelay != null) {
                 consumerProperties.put("initialDelay", initialDelay);
             }
+            if (startScheduler != null) {
+                consumerProperties.put("startScheduler", startScheduler);
+            }
             if (delay != null) {
                 consumerProperties.put("delay", delay);
             }


Reply via email to