Author: davsclaus
Date: Tue Apr 16 11:33:34 2013
New Revision: 1468376

URL: http://svn.apache.org/r1468376
Log:
CAMEL-6286: Leverage async routing engine for S3 and SQS consumers. Thanks to 
Alex Hutter for the patch.

Modified:
    
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
    
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java

Modified: 
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java?rev=1468376&r1=1468375&r2=1468376&view=diff
==============================================================================
--- 
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
 (original)
+++ 
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
 Tue Apr 16 11:33:34 2013
@@ -27,6 +27,7 @@ import com.amazonaws.services.s3.model.O
 import com.amazonaws.services.s3.model.S3Object;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.NoFactoryAvailableException;
 import org.apache.camel.Processor;
@@ -58,7 +59,7 @@ public class S3Consumer extends Schedule
         pendingExchanges = 0;
         
         String bucketName = getConfiguration().getBucketName();
-        LOG.trace("Quering objects in bucket [{}]...", bucketName);
+        LOG.trace("Queueing objects in bucket [{}]...", bucketName);
         
         ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
         listObjectsRequest.setBucketName(bucketName);
@@ -66,15 +67,19 @@ public class S3Consumer extends Schedule
         listObjectsRequest.setMaxKeys(maxMessagesPerPoll);
         
         ObjectListing listObjects = 
getAmazonS3Client().listObjects(listObjectsRequest);
-        
-        LOG.trace("Found {} objects in bucket [{}]...", 
listObjects.getObjectSummaries().size(), bucketName);
+
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Found {} objects in bucket [{}]...", 
listObjects.getObjectSummaries().size(), bucketName);
+        }
         
         Queue<Exchange> exchanges = 
createExchanges(listObjects.getObjectSummaries());
         return processBatch(CastUtils.cast(exchanges));
     }
     
     protected Queue<Exchange> createExchanges(List<S3ObjectSummary> 
s3ObjectSummaries) {
-        LOG.trace("Received {} messages in this poll", 
s3ObjectSummaries.size());
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Received {} messages in this poll", 
s3ObjectSummaries.size());
+        }
         
         Queue<Exchange> answer = new LinkedList<Exchange>();
         for (S3ObjectSummary s3ObjectSummary : s3ObjectSummaries) {
@@ -91,7 +96,7 @@ public class S3Consumer extends Schedule
 
         for (int index = 0; index < total && isBatchAllowed(); index++) {
             // only loop if we are started (allowed to run)
-            Exchange exchange = ObjectHelper.cast(Exchange.class, 
exchanges.poll());
+            final Exchange exchange = ObjectHelper.cast(Exchange.class, 
exchanges.poll());
             // add current index and total as properties
             exchange.setProperty(Exchange.BATCH_INDEX, index);
             exchange.setProperty(Exchange.BATCH_SIZE, total);
@@ -117,8 +122,12 @@ public class S3Consumer extends Schedule
             });
 
             LOG.trace("Processing exchange [{}]...", exchange);
-
-            getProcessor().process(exchange);
+            getAsyncProcessor().process(exchange, new AsyncCallback() {
+                @Override
+                public void done(boolean doneSync) {
+                    LOG.trace("Processing exchange [{}] done.", exchange);
+                }
+            });
         }
 
         return total;
@@ -138,12 +147,11 @@ public class S3Consumer extends Schedule
                 LOG.trace("Deleting object from bucket {} with key {}...", 
bucketName, key);
                 
                 getAmazonS3Client().deleteObject(bucketName, key);
-                
-                LOG.trace("Object deleted");
+
+                LOG.trace("Deleted object from bucket {} with key {}...", 
bucketName, key);
             }
         } catch (AmazonClientException e) {
-            LOG.warn("Error occurred during deleting object", e);
-            exchange.setException(e);
+            getExceptionHandler().handleException("Error occurred during 
deleting object. This exception is ignored.", exchange, e);
         }
     }
 

Modified: 
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java?rev=1468376&r1=1468375&r2=1468376&view=diff
==============================================================================
--- 
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
 (original)
+++ 
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
 Tue Apr 16 11:33:34 2013
@@ -33,6 +33,7 @@ import com.amazonaws.services.sqs.model.
 import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
 import com.amazonaws.services.sqs.model.ReceiveMessageResult;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.NoFactoryAvailableException;
 import org.apache.camel.Processor;
@@ -74,15 +75,19 @@ public class SqsConsumer extends Schedul
         LOG.trace("Receiving messages with request [{}]...", request);
         
         ReceiveMessageResult messageResult = 
getClient().receiveMessage(request);
-        
-        LOG.trace("Received {} messages", messageResult.getMessages().size());
+
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Received {} messages", 
messageResult.getMessages().size());
+        }
         
         Queue<Exchange> exchanges = 
createExchanges(messageResult.getMessages());
         return processBatch(CastUtils.cast(exchanges));
     }
     
     protected Queue<Exchange> createExchanges(List<Message> messages) {
-        LOG.trace("Received {} messages in this poll", messages.size());
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Received {} messages in this poll", messages.size());
+        }
         
         Queue<Exchange> answer = new LinkedList<Exchange>();
         for (Message message : messages) {
@@ -98,7 +103,7 @@ public class SqsConsumer extends Schedul
 
         for (int index = 0; index < total && isBatchAllowed(); index++) {
             // only loop if we are started (allowed to run)
-            Exchange exchange = ObjectHelper.cast(Exchange.class, 
exchanges.poll());
+            final Exchange exchange = ObjectHelper.cast(Exchange.class, 
exchanges.poll());
             // add current index and total as properties
             exchange.setProperty(Exchange.BATCH_INDEX, index);
             exchange.setProperty(Exchange.BATCH_SIZE, total);
@@ -112,11 +117,11 @@ public class SqsConsumer extends Schedul
             if (this.scheduledExecutor != null && visibilityTimeout != null && 
(visibilityTimeout.intValue() / 2) > 0) {
                 int delay = visibilityTimeout.intValue() / 2;
                 int period = visibilityTimeout.intValue();
-                LOG.debug("Scheduled TimeoutExtender task to start after {} 
delay, and run with {} period (seconds) to extend exchangeId: {}",
-                        new Object[]{delay, period, exchange.getExchangeId()});
-                int repeatSeconds = new Double(visibilityTimeout.doubleValue() 
* 1.5).intValue();   //
-                LOG.debug("period :" + period);
-                LOG.debug("repeatSeconds :" + repeatSeconds);
+                int repeatSeconds = new Double(visibilityTimeout.doubleValue() 
* 1.5).intValue();
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Scheduled TimeoutExtender task to start after 
{} delay, and run with {}/{} period/repeat (seconds), to extend exchangeId: {}",
+                            new Object[]{delay, period, repeatSeconds, 
exchange.getExchangeId()});
+                }
                 final ScheduledFuture<?> scheduledFuture = 
this.scheduledExecutor.scheduleAtFixedRate(
                         new TimeoutExtender(exchange, repeatSeconds), delay, 
period, TimeUnit.SECONDS);
                 exchange.addOnCompletion(new Synchronization() {
@@ -157,12 +162,12 @@ public class SqsConsumer extends Schedul
 
 
             LOG.trace("Processing exchange [{}]...", exchange);
-            try {
-                // This blocks while message is consumed.
-                getProcessor().process(exchange);
-            } finally {
-                LOG.trace("Processing exchange [{}] done.", exchange);
-            }
+            getAsyncProcessor().process(exchange, new AsyncCallback() {
+                @Override
+                public void done(boolean doneSync) {
+                    LOG.trace("Processing exchange [{}] done.", exchange);
+                }
+            });
         }
 
         return total;
@@ -183,10 +188,10 @@ public class SqsConsumer extends Schedul
                 
                 getClient().deleteMessage(deleteRequest);
 
-                LOG.trace("Message deleted");
+                LOG.trace("Deleted message with receipt handle {}...", 
receiptHandle);
             }
         } catch (AmazonClientException e) {
-            getExceptionHandler().handleException("Error occurred during 
deleting message.", e);
+            getExceptionHandler().handleException("Error occurred during 
deleting message. This exception is ignored.", exchange, e);
         }
     }
 


Reply via email to