Author: davsclaus
Date: Sun Apr 22 08:43:02 2012
New Revision: 1328817

URL: http://svn.apache.org/viewvc?rev=1328817&view=rev
Log:
CAMEL-5202: Added option eagerLimitMaxMessagesPerPoll to file/ftp endpoints.

Added:
    camel/branches/camel-2.9.x/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeNotEagerMaxMessagesPerPollTest.java
      - copied unchanged from r1328814, camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeNotEagerMaxMessagesPerPollTest.java
    camel/branches/camel-2.9.x/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerNotEagerMaxMessagesPerPollTest.java
      - copied unchanged from r1328814, camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerNotEagerMaxMessagesPerPollTest.java
Modified:
    camel/branches/camel-2.9.x/ (props changed)
    camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
    camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
    camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
    camel/branches/camel-2.9.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java

Propchange: camel/branches/camel-2.9.x/
------------------------------------------------------------------------------
  Merged /camel/trunk:r1328814

Propchange: camel/branches/camel-2.9.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java?rev=1328817&r1=1328816&r2=1328817&view=diff ============================================================================== --- camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java (original) +++ camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java Sun Apr 22 08:43:02 2012 @@ -84,6 +84,7 @@ public class FileEndpoint extends Generi // set max messages per poll result.setMaxMessagesPerPoll(getMaxMessagesPerPoll()); + result.setEagerLimitMaxMessagesPerPoll(isEagerMaxMessagesPerPoll()); configureConsumer(result); return result; Modified: camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=1328817&r1=1328816&r2=1328817&view=diff ============================================================================== --- camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java (original) +++ camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java Sun Apr 22 08:43:02 2012 @@ -18,6 +18,7 @@ package org.apache.camel.component.file; import java.util.ArrayList; import java.util.Collections; +import java.util.Deque; import java.util.LinkedList; import java.util.List; import java.util.Queue; @@ -49,6 +50,7 @@ public abstract class GenericFileConsume protected volatile ShutdownRunningTask shutdownRunningTask; protected volatile int pendingExchanges; protected Processor customProcessor; + protected boolean eagerLimitMaxMessagesPerPoll = true; 
public GenericFileConsumer(GenericFileEndpoint<T> endpoint, Processor processor, GenericFileOperations<T> operations) { super(endpoint, processor); @@ -75,9 +77,18 @@ public abstract class GenericFileConsume this.customProcessor = processor; } + public boolean isEagerLimitMaxMessagesPerPoll() { + return eagerLimitMaxMessagesPerPoll; + } + + public void setEagerLimitMaxMessagesPerPoll(boolean eagerLimitMaxMessagesPerPoll) { + this.eagerLimitMaxMessagesPerPoll = eagerLimitMaxMessagesPerPoll; + } + /** * Poll for files */ + @SuppressWarnings("unchecked") protected int poll() throws Exception { // must reset for each poll fileExpressionResult = null; @@ -114,6 +125,7 @@ public abstract class GenericFileConsume } // sort using build in sorters so we can use expressions + // use a linked list so we can deque the exchanges LinkedList<Exchange> exchanges = new LinkedList<Exchange>(); for (GenericFile<T> file : files) { Exchange exchange = endpoint.createExchange(file); @@ -126,13 +138,24 @@ public abstract class GenericFileConsume Collections.sort(exchanges, endpoint.getSortBy()); } + // use a queue for the exchanges + Deque<Exchange> q = exchanges; + + // we are not eager limiting, but we have configured a limit, so cut the list of files + if (!eagerLimitMaxMessagesPerPoll && maxMessagesPerPoll > 0) { + if (files.size() > maxMessagesPerPoll) { + log.debug("Limiting maximum messages to poll at {} files as there was more messages in this poll.", maxMessagesPerPoll); + // must first remove excessive files from the in progress repository + removeExcessiveInProgressFiles(q, maxMessagesPerPoll); + } + } + // consume files one by one int total = exchanges.size(); if (total > 0) { log.debug("Total {} files to consume", total); } - Queue<Exchange> q = exchanges; int polledMessages = processBatch(CastUtils.cast(q)); postPollCheck(); @@ -176,15 +199,22 @@ public abstract class GenericFileConsume } } + // drain any in progress files as we are done with this batch + 
removeExcessiveInProgressFiles((Deque) exchanges, 0); + + return total; + } + + @SuppressWarnings("unchecked") + protected void removeExcessiveInProgressFiles(Deque exchanges, int limit) { // remove the file from the in progress list in case the batch was limited by max messages per poll - while (exchanges.size() > 0) { - Exchange exchange = (Exchange) exchanges.poll(); + while (exchanges.size() > limit) { + // must remove last + Exchange exchange = (Exchange) exchanges.removeLast(); GenericFile<T> file = (GenericFile<T>) exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE); String key = file.getAbsoluteFilePath(); endpoint.getInProgressRepository().remove(key); } - - return total; } public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) { @@ -242,6 +272,11 @@ public abstract class GenericFileConsume * @return <tt>true</tt> to continue, <tt>false</tt> to stop due hitting maxMessagesPerPoll limit */ public boolean canPollMoreFiles(List<?> fileList) { + // at this point we should not limit if we are not eager + if (!eagerLimitMaxMessagesPerPoll) { + return true; + } + if (maxMessagesPerPoll <= 0) { // no limitation return true; Modified: camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java?rev=1328817&r1=1328816&r2=1328817&view=diff ============================================================================== --- camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java (original) +++ camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java Sun Apr 22 08:43:02 2012 @@ -71,6 +71,7 @@ public abstract class GenericFileEndpoin protected boolean delete; protected boolean flatten; protected int maxMessagesPerPoll; + protected boolean eagerMaxMessagesPerPoll = 
true; protected int maxDepth = Integer.MAX_VALUE; protected int minDepth; protected String tempPrefix; @@ -556,6 +557,14 @@ public abstract class GenericFileEndpoin this.maxMessagesPerPoll = maxMessagesPerPoll; } + public boolean isEagerMaxMessagesPerPoll() { + return eagerMaxMessagesPerPoll; + } + + public void setEagerMaxMessagesPerPoll(boolean eagerMaxMessagesPerPoll) { + this.eagerMaxMessagesPerPoll = eagerMaxMessagesPerPoll; + } + public int getMaxDepth() { return maxDepth; } @@ -565,7 +574,6 @@ public abstract class GenericFileEndpoin } public int getMinDepth() { - return minDepth; } Modified: camel/branches/camel-2.9.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java?rev=1328817&r1=1328816&r2=1328817&view=diff ============================================================================== --- camel/branches/camel-2.9.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java (original) +++ camel/branches/camel-2.9.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java Sun Apr 22 08:43:02 2012 @@ -92,6 +92,7 @@ public abstract class RemoteFileEndpoint // set max messages per poll consumer.setMaxMessagesPerPoll(getMaxMessagesPerPoll()); + consumer.setEagerLimitMaxMessagesPerPoll(isEagerMaxMessagesPerPoll()); configureConsumer(consumer); return consumer;