Repository: camel Updated Branches: refs/heads/camel-2.15.x f233f53b8 -> 7624bc4cb refs/heads/camel-2.16.x 62b4310a8 -> e56c64e1f
CAMEL-8431 Fixed the issue that camel keeps consuming the same file when deleteAfterRead is false Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e56c64e1 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e56c64e1 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e56c64e1 Branch: refs/heads/camel-2.16.x Commit: e56c64e1ff4693bfffc8867c687ada83e3444a2f Parents: 62b4310 Author: Willem Jiang <willem.ji...@gmail.com> Authored: Sun Oct 11 23:19:00 2015 +0800 Committer: Willem Jiang <willem.ji...@gmail.com> Committed: Sun Oct 11 23:22:11 2015 +0800 ---------------------------------------------------------------------- .../camel/component/aws/s3/S3Consumer.java | 71 ++++++++++++-------- 1 file changed, 42 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/e56c64e1/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java index 2f355f9..7de2b9d 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java @@ -49,6 +49,7 @@ public class S3Consumer extends ScheduledBatchPollingConsumer { private static final Logger LOG = LoggerFactory.getLogger(S3Consumer.class); private String marker; + private boolean filesConsumed; public S3Consumer(S3Endpoint endpoint, Processor processor) throws NoFactoryAvailableException { super(endpoint, processor); @@ -63,37 +64,49 @@ public class S3Consumer extends ScheduledBatchPollingConsumer { String fileName = getConfiguration().getFileName(); String bucketName = getConfiguration().getBucketName(); Queue<Exchange> exchanges; - - if (fileName != null) { - LOG.trace("Getting object in bucket [{}] with file name [{}]...", bucketName, fileName); - - S3Object s3Object = getAmazonS3Client().getObject(new GetObjectRequest(bucketName, fileName)); - exchanges = createExchanges(s3Object); - } else { - LOG.trace("Queueing objects in bucket [{}]...", bucketName); - - ListObjectsRequest listObjectsRequest = new ListObjectsRequest(); - listObjectsRequest.setBucketName(bucketName); - listObjectsRequest.setPrefix(getConfiguration().getPrefix()); - if (maxMessagesPerPoll > 0) { - listObjectsRequest.setMaxKeys(maxMessagesPerPoll); - } - if (marker != null && !getConfiguration().isDeleteAfterRead()) { - listObjectsRequest.setMarker(marker); - } - ObjectListing listObjects = getAmazonS3Client().listObjects(listObjectsRequest); - // we only setup the marker if the file is not deleted - if (!getConfiguration().isDeleteAfterRead()) { - // where marker is track - marker = listObjects.getMarker(); - } - if (LOG.isTraceEnabled()) { - LOG.trace("Found {} objects in bucket [{}]...", listObjects.getObjectSummaries().size(), bucketName); + if (filesConsumed) { + exchanges = new LinkedList<Exchange>(); + } else { + if (fileName != null) { + LOG.trace("Getting object in bucket [{}] with file name [{}]...", bucketName, fileName); + + S3Object s3Object = getAmazonS3Client().getObject(new GetObjectRequest(bucketName, fileName)); + exchanges = createExchanges(s3Object); + if (!getConfiguration().isDeleteAfterRead()) { + filesConsumed = true; + } + } else { + LOG.trace("Queueing objects in bucket [{}]...", bucketName); + + ListObjectsRequest listObjectsRequest = new ListObjectsRequest(); + listObjectsRequest.setBucketName(bucketName); + listObjectsRequest.setPrefix(getConfiguration().getPrefix()); + if (maxMessagesPerPoll > 0) { + listObjectsRequest.setMaxKeys(maxMessagesPerPoll); + } + if (marker != null && !getConfiguration().isDeleteAfterRead()) { + listObjectsRequest.setMarker(marker); + } + + ObjectListing listObjects = getAmazonS3Client().listObjects(listObjectsRequest); + // we only setup the marker if the file is not deleted + if (!getConfiguration().isDeleteAfterRead()) { + // if the marker is truncated, the nextMarker should not be null + if (listObjects.getNextMarker() != null) { + marker = listObjects.getNextMarker(); + } else { + // if there is no marker, the files are consumed, we should not pull it again + filesConsumed = true; + } + } + if (LOG.isTraceEnabled()) { + LOG.trace("Found {} objects in bucket [{}]...", listObjects.getObjectSummaries().size(), bucketName); + } + + exchanges = createExchanges(listObjects.getObjectSummaries()); } - - exchanges = createExchanges(listObjects.getObjectSummaries()); - } + } return processBatch(CastUtils.cast(exchanges)); }