Repository: camel
Updated Branches:
  refs/heads/camel-2.15.x f233f53b8 -> 7624bc4cb
  refs/heads/camel-2.16.x 62b4310a8 -> e56c64e1f


CAMEL-8431 Fixed the issue that camel keeps consuming the same file when 
deleteAfterRead is false


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e56c64e1
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e56c64e1
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e56c64e1

Branch: refs/heads/camel-2.16.x
Commit: e56c64e1ff4693bfffc8867c687ada83e3444a2f
Parents: 62b4310
Author: Willem Jiang <willem.ji...@gmail.com>
Authored: Sun Oct 11 23:19:00 2015 +0800
Committer: Willem Jiang <willem.ji...@gmail.com>
Committed: Sun Oct 11 23:22:11 2015 +0800

----------------------------------------------------------------------
 .../camel/component/aws/s3/S3Consumer.java      | 71 ++++++++++++--------
 1 file changed, 42 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e56c64e1/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
index 2f355f9..7de2b9d 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
@@ -49,6 +49,7 @@ public class S3Consumer extends ScheduledBatchPollingConsumer 
{
     
     private static final Logger LOG = 
LoggerFactory.getLogger(S3Consumer.class);
     private String marker;
+    private boolean filesConsumed;
 
     public S3Consumer(S3Endpoint endpoint, Processor processor) throws 
NoFactoryAvailableException {
         super(endpoint, processor);
@@ -63,37 +64,49 @@ public class S3Consumer extends 
ScheduledBatchPollingConsumer {
         String fileName = getConfiguration().getFileName();
         String bucketName = getConfiguration().getBucketName();
         Queue<Exchange> exchanges;
-
-        if (fileName != null) {
-            LOG.trace("Getting object in bucket [{}] with file name [{}]...", 
bucketName, fileName);
-
-            S3Object s3Object = getAmazonS3Client().getObject(new 
GetObjectRequest(bucketName, fileName));
-            exchanges = createExchanges(s3Object);
-        } else {
-            LOG.trace("Queueing objects in bucket [{}]...", bucketName);
-        
-            ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
-            listObjectsRequest.setBucketName(bucketName);
-            listObjectsRequest.setPrefix(getConfiguration().getPrefix());
-            if (maxMessagesPerPoll > 0) {
-                listObjectsRequest.setMaxKeys(maxMessagesPerPoll);
-            }
-            if (marker != null && !getConfiguration().isDeleteAfterRead()) {
-                listObjectsRequest.setMarker(marker);
-            }
         
-            ObjectListing listObjects = 
getAmazonS3Client().listObjects(listObjectsRequest);
-            // we only setup the marker if the file is not deleted
-            if (!getConfiguration().isDeleteAfterRead()) {
-                // where marker is track
-                marker = listObjects.getMarker();
-            }
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("Found {} objects in bucket [{}]...", 
listObjects.getObjectSummaries().size(), bucketName);
+        if (filesConsumed) {
+            exchanges = new LinkedList<Exchange>();
+        } else {
+            if (fileName != null) {
+                LOG.trace("Getting object in bucket [{}] with file name 
[{}]...", bucketName, fileName);
+    
+                S3Object s3Object = getAmazonS3Client().getObject(new 
GetObjectRequest(bucketName, fileName));
+                exchanges = createExchanges(s3Object);
+                if (!getConfiguration().isDeleteAfterRead()) {
+                    filesConsumed = true;
+                }
+            } else {
+                LOG.trace("Queueing objects in bucket [{}]...", bucketName);
+            
+                ListObjectsRequest listObjectsRequest = new 
ListObjectsRequest();
+                listObjectsRequest.setBucketName(bucketName);
+                listObjectsRequest.setPrefix(getConfiguration().getPrefix());
+                if (maxMessagesPerPoll > 0) {
+                    listObjectsRequest.setMaxKeys(maxMessagesPerPoll);
+                }
+                if (marker != null && !getConfiguration().isDeleteAfterRead()) 
{
+                    listObjectsRequest.setMarker(marker);
+                }
+            
+                ObjectListing listObjects = 
getAmazonS3Client().listObjects(listObjectsRequest);
+                // we only setup the marker if the file is not deleted
+                if (!getConfiguration().isDeleteAfterRead()) {
+                    // if the marker is truncated, the nextMarker should not 
be null
+                    if (listObjects.getNextMarker() != null) {
+                        marker = listObjects.getNextMarker();
+                    } else {
+                        // if there is no marker, the files are consumed, we 
should not pull it again
+                        filesConsumed = true;
+                    }
+                }
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Found {} objects in bucket [{}]...", 
listObjects.getObjectSummaries().size(), bucketName);
+                }
+            
+                exchanges = createExchanges(listObjects.getObjectSummaries());
             }
-        
-            exchanges = createExchanges(listObjects.getObjectSummaries());
-        }
+        }    
         return processBatch(CastUtils.cast(exchanges));
     }
     

Reply via email to