Author: davsclaus
Date: Sun Apr 22 08:43:02 2012
New Revision: 1328817

URL: http://svn.apache.org/viewvc?rev=1328817&view=rev
Log:
CAMEL-5202: Added option eagerLimitMaxMessagesPerPoll to file/ftp endpoints.

Added:
    
camel/branches/camel-2.9.x/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeNotEagerMaxMessagesPerPollTest.java
      - copied unchanged from r1328814, 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeNotEagerMaxMessagesPerPollTest.java
    
camel/branches/camel-2.9.x/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerNotEagerMaxMessagesPerPollTest.java
      - copied unchanged from r1328814, 
camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerNotEagerMaxMessagesPerPollTest.java
Modified:
    camel/branches/camel-2.9.x/   (props changed)
    
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
    
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
    
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
    
camel/branches/camel-2.9.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java

Propchange: camel/branches/camel-2.9.x/
------------------------------------------------------------------------------
  Merged /camel/trunk:r1328814

Propchange: camel/branches/camel-2.9.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: 
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java?rev=1328817&r1=1328816&r2=1328817&view=diff
==============================================================================
--- 
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
 (original)
+++ 
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
 Sun Apr 22 08:43:02 2012
@@ -84,6 +84,7 @@ public class FileEndpoint extends Generi
 
         // set max messages per poll
         result.setMaxMessagesPerPoll(getMaxMessagesPerPoll());
+        result.setEagerLimitMaxMessagesPerPoll(isEagerMaxMessagesPerPoll());
 
         configureConsumer(result);
         return result;

Modified: 
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=1328817&r1=1328816&r2=1328817&view=diff
==============================================================================
--- 
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
 (original)
+++ 
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
 Sun Apr 22 08:43:02 2012
@@ -18,6 +18,7 @@ package org.apache.camel.component.file;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Deque;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
@@ -49,6 +50,7 @@ public abstract class GenericFileConsume
     protected volatile ShutdownRunningTask shutdownRunningTask;
     protected volatile int pendingExchanges;
     protected Processor customProcessor;
+    protected boolean eagerLimitMaxMessagesPerPoll = true;
 
     public GenericFileConsumer(GenericFileEndpoint<T> endpoint, Processor 
processor, GenericFileOperations<T> operations) {
         super(endpoint, processor);
@@ -75,9 +77,18 @@ public abstract class GenericFileConsume
         this.customProcessor = processor;
     }
 
+    public boolean isEagerLimitMaxMessagesPerPoll() {
+        return eagerLimitMaxMessagesPerPoll;
+    }
+
+    public void setEagerLimitMaxMessagesPerPoll(boolean 
eagerLimitMaxMessagesPerPoll) {
+        this.eagerLimitMaxMessagesPerPoll = eagerLimitMaxMessagesPerPoll;
+    }
+
     /**
      * Poll for files
      */
+    @SuppressWarnings("unchecked")
     protected int poll() throws Exception {
         // must reset for each poll
         fileExpressionResult = null;
@@ -114,6 +125,7 @@ public abstract class GenericFileConsume
         }
 
         // sort using build in sorters so we can use expressions
+        // use a linked list so we can deque the exchanges
         LinkedList<Exchange> exchanges = new LinkedList<Exchange>();
         for (GenericFile<T> file : files) {
             Exchange exchange = endpoint.createExchange(file);
@@ -126,13 +138,24 @@ public abstract class GenericFileConsume
             Collections.sort(exchanges, endpoint.getSortBy());
         }
 
+        // use a queue for the exchanges
+        Deque<Exchange> q = exchanges;
+
+        // we are not eager limiting, but we have configured a limit, so cut 
the list of files
+        if (!eagerLimitMaxMessagesPerPoll && maxMessagesPerPoll > 0) {
+            if (files.size() > maxMessagesPerPoll) {
+                log.debug("Limiting maximum messages to poll at {} files as 
there was more messages in this poll.", maxMessagesPerPoll);
+                // must first remove excessive files from the in progress 
repository
+                removeExcessiveInProgressFiles(q, maxMessagesPerPoll);
+            }
+        }
+
         // consume files one by one
         int total = exchanges.size();
         if (total > 0) {
             log.debug("Total {} files to consume", total);
         }
 
-        Queue<Exchange> q = exchanges;
         int polledMessages = processBatch(CastUtils.cast(q));
 
         postPollCheck();
@@ -176,15 +199,22 @@ public abstract class GenericFileConsume
             }
         }
 
+        // drain any in progress files as we are done with this batch
+        removeExcessiveInProgressFiles((Deque) exchanges, 0);
+
+        return total;
+    }
+
+    @SuppressWarnings("unchecked")
+    protected void removeExcessiveInProgressFiles(Deque exchanges, int limit) {
         // remove the file from the in progress list in case the batch was 
limited by max messages per poll
-        while (exchanges.size() > 0) {
-            Exchange exchange = (Exchange) exchanges.poll();
+        while (exchanges.size() > limit) {
+            // must remove last
+            Exchange exchange = (Exchange) exchanges.removeLast();
             GenericFile<T> file = (GenericFile<T>) 
exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE);
             String key = file.getAbsoluteFilePath();
             endpoint.getInProgressRepository().remove(key);
         }
-
-        return total;
     }
 
     public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
@@ -242,6 +272,11 @@ public abstract class GenericFileConsume
      * @return <tt>true</tt> to continue, <tt>false</tt> to stop due hitting 
maxMessagesPerPoll limit
      */
     public boolean canPollMoreFiles(List<?> fileList) {
+        // at this point we should not limit if we are not eager
+        if (!eagerLimitMaxMessagesPerPoll) {
+            return true;
+        }
+
         if (maxMessagesPerPoll <= 0) {
             // no limitation
             return true;

Modified: 
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java?rev=1328817&r1=1328816&r2=1328817&view=diff
==============================================================================
--- 
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
 (original)
+++ 
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
 Sun Apr 22 08:43:02 2012
@@ -71,6 +71,7 @@ public abstract class GenericFileEndpoin
     protected boolean delete;
     protected boolean flatten;
     protected int maxMessagesPerPoll;
+    protected boolean eagerMaxMessagesPerPoll = true;
     protected int maxDepth = Integer.MAX_VALUE;
     protected int minDepth;
     protected String tempPrefix;
@@ -556,6 +557,14 @@ public abstract class GenericFileEndpoin
         this.maxMessagesPerPoll = maxMessagesPerPoll;
     }
 
+    public boolean isEagerMaxMessagesPerPoll() {
+        return eagerMaxMessagesPerPoll;
+    }
+
+    public void setEagerMaxMessagesPerPoll(boolean eagerMaxMessagesPerPoll) {
+        this.eagerMaxMessagesPerPoll = eagerMaxMessagesPerPoll;
+    }
+
     public int getMaxDepth() {
         return maxDepth;
     }
@@ -565,7 +574,6 @@ public abstract class GenericFileEndpoin
     }
 
     public int getMinDepth() {
-
         return minDepth;
     }
 

Modified: 
camel/branches/camel-2.9.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java?rev=1328817&r1=1328816&r2=1328817&view=diff
==============================================================================
--- 
camel/branches/camel-2.9.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java
 (original)
+++ 
camel/branches/camel-2.9.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java
 Sun Apr 22 08:43:02 2012
@@ -92,6 +92,7 @@ public abstract class RemoteFileEndpoint
 
         // set max messages per poll
         consumer.setMaxMessagesPerPoll(getMaxMessagesPerPoll());
+        consumer.setEagerLimitMaxMessagesPerPoll(isEagerMaxMessagesPerPoll());
 
         configureConsumer(consumer);
         return consumer;


Reply via email to