Author: davsclaus Date: Tue Apr 16 11:33:34 2013 New Revision: 1468376 URL: http://svn.apache.org/r1468376 Log: CAMEL-6286: Leverage async routing engine for S3 and SQS consumers. Thanks to Alex Hutter for the patch.
Modified: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java Modified: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java?rev=1468376&r1=1468375&r2=1468376&view=diff ============================================================================== --- camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java (original) +++ camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java Tue Apr 16 11:33:34 2013 @@ -27,6 +27,7 @@ import com.amazonaws.services.s3.model.O import com.amazonaws.services.s3.model.S3Object; import com.amazonaws.services.s3.model.S3ObjectSummary; +import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.NoFactoryAvailableException; import org.apache.camel.Processor; @@ -58,7 +59,7 @@ public class S3Consumer extends Schedule pendingExchanges = 0; String bucketName = getConfiguration().getBucketName(); - LOG.trace("Quering objects in bucket [{}]...", bucketName); + LOG.trace("Queueing objects in bucket [{}]...", bucketName); ListObjectsRequest listObjectsRequest = new ListObjectsRequest(); listObjectsRequest.setBucketName(bucketName); @@ -66,15 +67,19 @@ public class S3Consumer extends Schedule listObjectsRequest.setMaxKeys(maxMessagesPerPoll); ObjectListing listObjects = getAmazonS3Client().listObjects(listObjectsRequest); - - LOG.trace("Found {} objects in bucket [{}]...", listObjects.getObjectSummaries().size(), bucketName); + + if (LOG.isTraceEnabled()) { + LOG.trace("Found {} objects in bucket [{}]...", listObjects.getObjectSummaries().size(), bucketName); + } Queue<Exchange> exchanges = createExchanges(listObjects.getObjectSummaries()); return processBatch(CastUtils.cast(exchanges)); } protected Queue<Exchange> createExchanges(List<S3ObjectSummary> s3ObjectSummaries) { - LOG.trace("Received {} messages in this poll", s3ObjectSummaries.size()); + if (LOG.isTraceEnabled()) { + LOG.trace("Received {} messages in this poll", s3ObjectSummaries.size()); + } Queue<Exchange> answer = new LinkedList<Exchange>(); for (S3ObjectSummary s3ObjectSummary : s3ObjectSummaries) { @@ -91,7 +96,7 @@ public class S3Consumer extends Schedule for (int index = 0; index < total && isBatchAllowed(); index++) { // only loop if we are started (allowed to run) - Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll()); + final Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll()); // add current index and total as properties exchange.setProperty(Exchange.BATCH_INDEX, index); exchange.setProperty(Exchange.BATCH_SIZE, total); @@ -117,8 +122,12 @@ public class S3Consumer extends Schedule }); LOG.trace("Processing exchange [{}]...", exchange); - - getProcessor().process(exchange); + getAsyncProcessor().process(exchange, new AsyncCallback() { + @Override + public void done(boolean doneSync) { + LOG.trace("Processing exchange [{}] done.", exchange); + } + }); } return total; @@ -138,12 +147,11 @@ public class S3Consumer extends Schedule LOG.trace("Deleting object from bucket {} with key {}...", bucketName, key); getAmazonS3Client().deleteObject(bucketName, key); - - LOG.trace("Object deleted"); + + LOG.trace("Deleted object from bucket {} with key {}...", bucketName, key); } } catch (AmazonClientException e) { - LOG.warn("Error occurred during deleting object", e); - exchange.setException(e); + getExceptionHandler().handleException("Error occurred during deleting object. This exception is ignored.", exchange, e); } } Modified: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java?rev=1468376&r1=1468375&r2=1468376&view=diff ============================================================================== --- camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java (original) +++ camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java Tue Apr 16 11:33:34 2013 @@ -33,6 +33,7 @@ import com.amazonaws.services.sqs.model. import com.amazonaws.services.sqs.model.ReceiveMessageRequest; import com.amazonaws.services.sqs.model.ReceiveMessageResult; +import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.NoFactoryAvailableException; import org.apache.camel.Processor; @@ -74,15 +75,19 @@ public class SqsConsumer extends Schedul LOG.trace("Receiving messages with request [{}]...", request); ReceiveMessageResult messageResult = getClient().receiveMessage(request); - - LOG.trace("Received {} messages", messageResult.getMessages().size()); + + if (LOG.isTraceEnabled()) { + LOG.trace("Received {} messages", messageResult.getMessages().size()); + } Queue<Exchange> exchanges = createExchanges(messageResult.getMessages()); return processBatch(CastUtils.cast(exchanges)); } protected Queue<Exchange> createExchanges(List<Message> messages) { - LOG.trace("Received {} messages in this poll", messages.size()); + if (LOG.isTraceEnabled()) { + LOG.trace("Received {} messages in this poll", messages.size()); + } Queue<Exchange> answer = new LinkedList<Exchange>(); for (Message message : messages) { @@ -98,7 +103,7 @@ public class SqsConsumer extends Schedul for (int index = 0; index < total && isBatchAllowed(); index++) { // only loop if we are started (allowed to run) - Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll()); + final Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll()); // add current index and total as properties exchange.setProperty(Exchange.BATCH_INDEX, index); exchange.setProperty(Exchange.BATCH_SIZE, total); @@ -112,11 +117,11 @@ public class SqsConsumer extends Schedul if (this.scheduledExecutor != null && visibilityTimeout != null && (visibilityTimeout.intValue() / 2) > 0) { int delay = visibilityTimeout.intValue() / 2; int period = visibilityTimeout.intValue(); - LOG.debug("Scheduled TimeoutExtender task to start after {} delay, and run with {} period (seconds) to extend exchangeId: {}", - new Object[]{delay, period, exchange.getExchangeId()}); - int repeatSeconds = new Double(visibilityTimeout.doubleValue() * 1.5).intValue(); // - LOG.debug("period :" + period); - LOG.debug("repeatSeconds :" + repeatSeconds); + int repeatSeconds = new Double(visibilityTimeout.doubleValue() * 1.5).intValue(); + if (LOG.isDebugEnabled()) { + LOG.debug("Scheduled TimeoutExtender task to start after {} delay, and run with {}/{} period/repeat (seconds), to extend exchangeId: {}", + new Object[]{delay, period, repeatSeconds, exchange.getExchangeId()}); + } final ScheduledFuture<?> scheduledFuture = this.scheduledExecutor.scheduleAtFixedRate( new TimeoutExtender(exchange, repeatSeconds), delay, period, TimeUnit.SECONDS); exchange.addOnCompletion(new Synchronization() { @@ -157,12 +162,12 @@ public class SqsConsumer extends Schedul LOG.trace("Processing exchange [{}]...", exchange); - try { - // This blocks while message is consumed. - getProcessor().process(exchange); - } finally { - LOG.trace("Processing exchange [{}] done.", exchange); - } + getAsyncProcessor().process(exchange, new AsyncCallback() { + @Override + public void done(boolean doneSync) { + LOG.trace("Processing exchange [{}] done.", exchange); + } + }); } return total; @@ -183,10 +188,10 @@ public class SqsConsumer extends Schedul getClient().deleteMessage(deleteRequest); - LOG.trace("Message deleted"); + LOG.trace("Deleted message with receipt handle {}...", receiptHandle); } } catch (AmazonClientException e) { - getExceptionHandler().handleException("Error occurred during deleting message.", e); + getExceptionHandler().handleException("Error occurred during deleting message. This exception is ignored.", exchange, e); } }