Author: davsclaus
Date: Sat Jan 14 17:25:26 2012
New Revision: 1231533
URL: http://svn.apache.org/viewvc?rev=1231533&view=rev
Log:
CAMEL-4577: Added a ScheduledBatchPollingConsumer to reuse logic. Thanks to
Bilgin for the patch.
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledBatchPollingConsumer.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisConsumer.java
camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java
camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java
camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=1231533&r1=1231532&r2=1231533&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java Sat Jan 14 17:25:26 2012
@@ -23,12 +23,10 @@ import java.util.List;
import java.util.Queue;
import org.apache.camel.AsyncCallback;
-import org.apache.camel.BatchConsumer;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ShutdownRunningTask;
-import org.apache.camel.impl.ScheduledPollConsumer;
-import org.apache.camel.spi.ShutdownAware;
+import org.apache.camel.impl.ScheduledBatchPollingConsumer;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.StopWatch;
@@ -39,13 +37,12 @@ import org.slf4j.LoggerFactory;
/**
* Base class for file consumers.
*/
-public abstract class GenericFileConsumer<T> extends ScheduledPollConsumer implements BatchConsumer, ShutdownAware {
+public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsumer {
protected final transient Logger log = LoggerFactory.getLogger(getClass());
protected GenericFileEndpoint<T> endpoint;
protected GenericFileOperations<T> operations;
protected boolean loggedIn;
protected String fileExpressionResult;
- protected int maxMessagesPerPoll;
protected volatile ShutdownRunningTask shutdownRunningTask;
protected volatile int pendingExchanges;
protected Processor customProcessor;
@@ -140,9 +137,6 @@ public abstract class GenericFileConsume
return polledMessages;
}
- public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
- this.maxMessagesPerPoll = maxMessagesPerPoll;
- }
@SuppressWarnings("unchecked")
public int processBatch(Queue<Object> exchanges) {
@@ -187,53 +181,6 @@ public abstract class GenericFileConsume
return total;
}
- public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
- // store a reference what to do in case when shutting down and we have
pending messages
- this.shutdownRunningTask = shutdownRunningTask;
- // do not defer shutdown
- return false;
- }
-
- public int getPendingExchangesSize() {
- int answer;
- // only return the real pending size in case we are configured to
complete all tasks
- if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
- answer = pendingExchanges;
- } else {
- answer = 0;
- }
-
- if (answer == 0 && isPolling()) {
- // force at least one pending exchange if we are polling as there
is a little gap
- // in the processBatch method and until an exchange gets enlisted
as in-flight
- // which happens later, so we need to signal back to the shutdown
strategy that
- // there is a pending exchange. When we are no longer polling,
then we will return 0
- log.trace("Currently polling so returning 1 as pending exchanges");
- answer = 1;
- }
-
- return answer;
- }
-
- public void prepareShutdown() {
- // noop
- }
-
- public boolean isBatchAllowed() {
- // stop if we are not running
- boolean answer = isRunAllowed();
- if (!answer) {
- return false;
- }
-
- if (shutdownRunningTask == null) {
- // we are not shutting down so continue to run
- return true;
- }
-
- // we are shutting down so only continue if we are configured to
complete all tasks
- return ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask;
- }
/**
* Whether or not we can continue polling for more files
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledBatchPollingConsumer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledBatchPollingConsumer.java?rev=1231533&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledBatchPollingConsumer.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledBatchPollingConsumer.java Sat Jan 14 17:25:26 2012
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.camel.BatchConsumer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Processor;
+import org.apache.camel.ShutdownRunningTask;
+import org.apache.camel.spi.ShutdownAware;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A useful base class for any consumer which is polling batch based
+ */
+public abstract class ScheduledBatchPollingConsumer extends ScheduledPollConsumer implements BatchConsumer, ShutdownAware {
+ private static final transient Logger log = LoggerFactory.getLogger(ScheduledBatchPollingConsumer.class);
+ protected volatile ShutdownRunningTask shutdownRunningTask;
+ protected volatile int pendingExchanges;
+ protected int maxMessagesPerPoll;
+
+ public ScheduledBatchPollingConsumer(Endpoint endpoint, Processor processor) {
+ super(endpoint, processor);
+ }
+
+ public ScheduledBatchPollingConsumer(Endpoint endpoint, Processor processor, ScheduledExecutorService executor) {
+ super(endpoint, processor, executor);
+ }
+
+ @Override
+ public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
+ // store a reference what to do in case when shutting down and we have pending messages
+ this.shutdownRunningTask = shutdownRunningTask;
+ // do not defer shutdown
+ return false;
+ }
+
+ @Override
+ public int getPendingExchangesSize() {
+ int answer;
+ // only return the real pending size in case we are configured to complete all tasks
+ if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
+ answer = pendingExchanges;
+ } else {
+ answer = 0;
+ }
+
+ if (answer == 0 && isPolling()) {
+ // force at least one pending exchange if we are polling as there is a little gap
+ // in the processBatch method and until an exchange gets enlisted as in-flight
+ // which happens later, so we need to signal back to the shutdown strategy that
+ // there is a pending exchange. When we are no longer polling, then we will return 0
+ log.trace("Currently polling so returning 1 as pending exchanges");
+ answer = 1;
+ }
+
+ return answer;
+ }
+
+ @Override
+ public void prepareShutdown() {
+ // reset task as the state of the task is not to be preserved
+ // which otherwise may cause isBatchAllowed() to return a wrong answer
+ this.shutdownRunningTask = null;
+ }
+
+ @Override
+ public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
+ this.maxMessagesPerPoll = maxMessagesPerPoll;
+ }
+
+ /**
+ * Gets the maximum number of messages as a limit to poll at each polling.
+ * <p/>
+ * Is default unlimited, but use 0 or negative number to disable it as unlimited.
+ *
+ * @return max messages to poll
+ */
+ public int getMaxMessagesPerPoll() {
+ return maxMessagesPerPoll;
+ }
+
+ @Override
+ public boolean isBatchAllowed() {
+ // stop if we are not running
+ boolean answer = isRunAllowed();
+ if (!answer) {
+ return false;
+ }
+
+ if (shutdownRunningTask == null) {
+ // we are not shutting down so continue to run
+ return true;
+ }
+
+ // we are shutting down so only continue if we are configured to complete all tasks
+ return ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask;
+ }
+}
Modified:
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java?rev=1231533&r1=1231532&r2=1231533&view=diff
==============================================================================
---
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
(original)
+++
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
Sat Jan 14 17:25:26 2012
@@ -27,13 +27,10 @@ import com.amazonaws.services.s3.model.O
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
-import org.apache.camel.BatchConsumer;
import org.apache.camel.Exchange;
import org.apache.camel.NoFactoryAvailableException;
import org.apache.camel.Processor;
-import org.apache.camel.ShutdownRunningTask;
-import org.apache.camel.impl.ScheduledPollConsumer;
-import org.apache.camel.spi.ShutdownAware;
+import org.apache.camel.impl.ScheduledBatchPollingConsumer;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.ObjectHelper;
@@ -46,12 +43,9 @@ import org.slf4j.LoggerFactory;
* <a href="http://aws.amazon.com/s3/">AWS S3</a>
*
*/
-public class S3Consumer extends ScheduledPollConsumer implements
BatchConsumer, ShutdownAware {
+public class S3Consumer extends ScheduledBatchPollingConsumer {
private static final transient Logger LOG =
LoggerFactory.getLogger(S3Consumer.class);
-
- private volatile ShutdownRunningTask shutdownRunningTask;
- private volatile int pendingExchanges;
public S3Consumer(S3Endpoint endpoint, Processor processor) throws
NoFactoryAvailableException {
super(endpoint, processor);
@@ -68,7 +62,7 @@ public class S3Consumer extends Schedule
ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
listObjectsRequest.setBucketName(bucketName);
- listObjectsRequest.setMaxKeys(getMaxMessagesPerPoll());
+ listObjectsRequest.setMaxKeys(maxMessagesPerPoll);
ObjectListing listObjects =
getAmazonS3Client().listObjects(listObjectsRequest);
@@ -165,55 +159,7 @@ public class S3Consumer extends Schedule
LOG.warn("Exchange failed, so rolling back message status: {}",
exchange);
}
}
-
- public boolean isBatchAllowed() {
- // stop if we are not running
- boolean answer = isRunAllowed();
- if (!answer) {
- return false;
- }
-
- if (shutdownRunningTask == null) {
- // we are not shutting down so continue to run
- return true;
- }
-
- // we are shutting down so only continue if we are configured to
complete all tasks
- return ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask;
- }
-
- public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
- // store a reference what to do in case when shutting down and we have
pending messages
- this.shutdownRunningTask = shutdownRunningTask;
- // do not defer shutdown
- return false;
- }
-
- public int getPendingExchangesSize() {
- int answer;
- // only return the real pending size in case we are configured to
complete all tasks
- if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
- answer = pendingExchanges;
- } else {
- answer = 0;
- }
-
- if (answer == 0 && isPolling()) {
- // force at least one pending exchange if we are polling as there
is a little gap
- // in the processBatch method and until an exchange gets enlisted
as in-flight
- // which happens later, so we need to signal back to the shutdown
strategy that
- // there is a pending exchange. When we are no longer polling,
then we will return 0
- log.trace("Currently polling so returning 1 as pending exchanges");
- answer = 1;
- }
- return answer;
- }
-
- public void prepareShutdown() {
- // noop
- }
-
protected S3Configuration getConfiguration() {
return getEndpoint().getConfiguration();
}
@@ -226,15 +172,7 @@ public class S3Consumer extends Schedule
public S3Endpoint getEndpoint() {
return (S3Endpoint) super.getEndpoint();
}
-
- public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
- getEndpoint().setMaxMessagesPerPoll(maxMessagesPerPoll);
- }
-
- public int getMaxMessagesPerPoll() {
- return getEndpoint().getMaxMessagesPerPoll();
- }
-
+
@Override
public String toString() {
return "S3Consumer[" +
URISupport.sanitizeUri(getEndpoint().getEndpointUri()) + "]";
Modified:
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java?rev=1231533&r1=1231532&r2=1231533&view=diff
==============================================================================
---
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java
(original)
+++
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java
Sat Jan 14 17:25:26 2012
@@ -49,7 +49,7 @@ public class S3Endpoint extends Schedule
private AmazonS3Client s3Client;
private S3Configuration configuration;
private int maxMessagesPerPoll = 10;
-
+
@Deprecated
public S3Endpoint(String uri, CamelContext context, S3Configuration
configuration) {
super(uri, context);
@@ -63,6 +63,7 @@ public class S3Endpoint extends Schedule
public Consumer createConsumer(Processor processor) throws Exception {
S3Consumer s3Consumer = new S3Consumer(this, processor);
configureConsumer(s3Consumer);
+ s3Consumer.setMaxMessagesPerPoll(maxMessagesPerPoll);
return s3Consumer;
}
@@ -178,7 +179,7 @@ public class S3Endpoint extends Schedule
}
return client;
}
-
+
public int getMaxMessagesPerPoll() {
return maxMessagesPerPoll;
}
Modified:
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java?rev=1231533&r1=1231532&r2=1231533&view=diff
==============================================================================
---
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
(original)
+++
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
Sat Jan 14 17:25:26 2012
@@ -27,13 +27,10 @@ import com.amazonaws.services.sqs.model.
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
-import org.apache.camel.BatchConsumer;
import org.apache.camel.Exchange;
import org.apache.camel.NoFactoryAvailableException;
import org.apache.camel.Processor;
-import org.apache.camel.ShutdownRunningTask;
-import org.apache.camel.impl.ScheduledPollConsumer;
-import org.apache.camel.spi.ShutdownAware;
+import org.apache.camel.impl.ScheduledBatchPollingConsumer;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.ObjectHelper;
@@ -47,12 +44,9 @@ import org.slf4j.LoggerFactory;
* <a href="http://aws.amazon.com/sqs/">AWS SQS</a>
*
*/
-public class SqsConsumer extends ScheduledPollConsumer implements
BatchConsumer, ShutdownAware {
+public class SqsConsumer extends ScheduledBatchPollingConsumer {
private static final transient Logger LOG =
LoggerFactory.getLogger(SqsConsumer.class);
-
- private volatile ShutdownRunningTask shutdownRunningTask;
- private volatile int pendingExchanges;
public SqsConsumer(SqsEndpoint endpoint, Processor processor) throws
NoFactoryAvailableException {
super(endpoint, processor);
@@ -143,7 +137,7 @@ public class SqsConsumer extends Schedul
LOG.trace("Deleting message with receipt handle {}...",
receiptHandle);
getClient().deleteMessage(deleteRequest);
-
+
LOG.trace("Message deleted");
}
} catch (AmazonClientException e) {
@@ -165,55 +159,7 @@ public class SqsConsumer extends Schedul
LOG.warn("Exchange failed, so rolling back message status: {}",
exchange);
}
}
-
- public boolean isBatchAllowed() {
- // stop if we are not running
- boolean answer = isRunAllowed();
- if (!answer) {
- return false;
- }
-
- if (shutdownRunningTask == null) {
- // we are not shutting down so continue to run
- return true;
- }
- // we are shutting down so only continue if we are configured to
complete all tasks
- return ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask;
- }
-
- public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
- // store a reference what to do in case when shutting down and we have
pending messages
- this.shutdownRunningTask = shutdownRunningTask;
- // do not defer shutdown
- return false;
- }
-
- public int getPendingExchangesSize() {
- int answer;
- // only return the real pending size in case we are configured to
complete all tasks
- if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
- answer = pendingExchanges;
- } else {
- answer = 0;
- }
-
- if (answer == 0 && isPolling()) {
- // force at least one pending exchange if we are polling as there
is a little gap
- // in the processBatch method and until an exchange gets enlisted
as in-flight
- // which happens later, so we need to signal back to the shutdown
strategy that
- // there is a pending exchange. When we are no longer polling,
then we will return 0
- log.trace("Currently polling so returning 1 as pending exchanges");
- answer = 1;
- }
-
- return answer;
- }
-
- public void prepareShutdown() {
- // noop
- }
-
protected SqsConfiguration getConfiguration() {
return getEndpoint().getConfiguration();
}
@@ -230,15 +176,7 @@ public class SqsConsumer extends Schedul
public SqsEndpoint getEndpoint() {
return (SqsEndpoint) super.getEndpoint();
}
-
- public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
- getEndpoint().setMaxMessagesPerPoll(maxMessagesPerPoll);
- }
-
- public int getMaxMessagesPerPoll() {
- return getEndpoint().getMaxMessagesPerPoll();
- }
-
+
@Override
public String toString() {
return "SqsConsumer[" +
URISupport.sanitizeUri(getEndpoint().getEndpointUri()) + "]";
Modified:
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java?rev=1231533&r1=1231532&r2=1231533&view=diff
==============================================================================
---
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
(original)
+++
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
Sat Jan 14 17:25:26 2012
@@ -63,6 +63,7 @@ public class SqsEndpoint extends Schedul
public Consumer createConsumer(Processor processor) throws Exception {
SqsConsumer sqsConsumer = new SqsConsumer(this, processor);
configureConsumer(sqsConsumer);
+ sqsConsumer.setMaxMessagesPerPoll(maxMessagesPerPoll);
return sqsConsumer;
}
Modified:
camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisConsumer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisConsumer.java?rev=1231533&r1=1231532&r2=1231533&view=diff
==============================================================================
---
camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisConsumer.java
(original)
+++
camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisConsumer.java
Sat Jan 14 17:25:26 2012
@@ -20,14 +20,11 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
-import org.apache.camel.BatchConsumer;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.Processor;
-import org.apache.camel.ShutdownRunningTask;
-import org.apache.camel.impl.ScheduledPollConsumer;
-import org.apache.camel.spi.ShutdownAware;
+import org.apache.camel.impl.ScheduledBatchPollingConsumer;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
@@ -38,7 +35,7 @@ import org.slf4j.LoggerFactory;
*
* @see org.apache.camel.component.ibatis.strategy.IBatisProcessingStrategy
*/
-public class IBatisConsumer extends ScheduledPollConsumer implements
BatchConsumer, ShutdownAware {
+public class IBatisConsumer extends ScheduledBatchPollingConsumer {
private static final Logger LOG =
LoggerFactory.getLogger(IBatisConsumer.class);
@@ -49,9 +46,6 @@ public class IBatisConsumer extends Sche
}
}
- protected volatile ShutdownRunningTask shutdownRunningTask;
- protected volatile int pendingExchanges;
-
/**
* Statement to run after data has been processed in the route
*/
@@ -116,10 +110,6 @@ public class IBatisConsumer extends Sche
return processBatch(CastUtils.cast(answer));
}
- public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
- this.maxMessagesPerPoll = maxMessagesPerPoll;
- }
-
public int processBatch(Queue<Object> exchanges) throws Exception {
final IBatisEndpoint endpoint = getEndpoint();
@@ -161,54 +151,6 @@ public class IBatisConsumer extends Sche
return total;
}
- public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
- // store a reference what to do in case when shutting down and we have
pending messages
- this.shutdownRunningTask = shutdownRunningTask;
- // do not defer shutdown
- return false;
- }
-
- public int getPendingExchangesSize() {
- int answer;
- // only return the real pending size in case we are configured to
complete all tasks
- if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
- answer = pendingExchanges;
- } else {
- answer = 0;
- }
-
- if (answer == 0 && isPolling()) {
- // force at least one pending exchange if we are polling as there
is a little gap
- // in the processBatch method and until an exchange gets enlisted
as in-flight
- // which happens later, so we need to signal back to the shutdown
strategy that
- // there is a pending exchange. When we are no longer polling,
then we will return 0
- log.trace("Currently polling so returning 1 as pending exchanges");
- answer = 1;
- }
-
- return answer;
- }
-
- public void prepareShutdown() {
- // noop
- }
-
- public boolean isBatchAllowed() {
- // stop if we are not running
- boolean answer = isRunAllowed();
- if (!answer) {
- return false;
- }
-
- if (shutdownRunningTask == null) {
- // we are not shutting down so continue to run
- return true;
- }
-
- // we are shutting down so only continue if we are configured to
complete all tasks
- return ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask;
- }
-
private Exchange createExchange(Object data) {
final IBatisEndpoint endpoint = getEndpoint();
final Exchange exchange =
endpoint.createExchange(ExchangePattern.InOnly);
Modified:
camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java?rev=1231533&r1=1231532&r2=1231533&view=diff
==============================================================================
---
camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java
(original)
+++
camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java
Sat Jan 14 17:25:26 2012
@@ -23,6 +23,8 @@ import org.apache.camel.BatchConsumer;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ShutdownRunningTask;
+import org.apache.camel.impl.ScheduledBatchPollingConsumer;
+import org.apache.camel.impl.ScheduledPollConsumer;
import org.apache.camel.spi.ShutdownAware;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.util.CastUtils;
@@ -33,21 +35,14 @@ import org.jclouds.blobstore.options.Lis
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class JcloudsBlobStoreConsumer extends JcloudsConsumer implements
BatchConsumer, ShutdownAware {
+public class JcloudsBlobStoreConsumer extends ScheduledBatchPollingConsumer {
private static final Logger LOG =
LoggerFactory.getLogger(JcloudsBlobStoreConsumer.class);
-
private final JcloudsBlobStoreEndpoint endpoint;
-
private final String container;
private final BlobStore blobStore;
-
private int maxMessagesPerPoll = 10;
- private volatile ShutdownRunningTask shutdownRunningTask;
- private volatile int pendingExchanges;
-
-
public JcloudsBlobStoreConsumer(JcloudsBlobStoreEndpoint endpoint,
Processor processor, BlobStore blobStore) {
super(endpoint, processor);
this.blobStore = blobStore;
@@ -75,11 +70,6 @@ public class JcloudsBlobStoreConsumer ex
return queue.isEmpty() ? 0 : processBatch(CastUtils.cast(queue));
}
- @Override
- public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
- this.maxMessagesPerPoll = maxMessagesPerPoll;
- }
-
public int processBatch(Queue<Object> exchanges) throws Exception {
int total = exchanges.size();
@@ -112,53 +102,4 @@ public class JcloudsBlobStoreConsumer ex
return total;
}
-
- public boolean isBatchAllowed() {
- // stop if we are not running
- boolean answer = isRunAllowed();
- if (!answer) {
- return false;
- }
-
- if (shutdownRunningTask == null) {
- // we are not shutting down so continue to run
- return true;
- }
-
- // we are shutting down so only continue if we are configured to
complete all tasks
- return ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask;
- }
-
- public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
- // store a reference what to do in case when shutting down and we have
pending messages
- this.shutdownRunningTask = shutdownRunningTask;
- // do not defer shutdown
- return false;
- }
-
- public int getPendingExchangesSize() {
- int answer;
- // only return the real pending size in case we are configured to
complete all tasks
- if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
- answer = pendingExchanges;
- } else {
- answer = 0;
- }
-
- if (answer == 0 && isPolling()) {
- // force at least one pending exchange if we are polling as there
is a little gap
- // in the processBatch method and until an exchange gets enlisted
as in-flight
- // which happens later, so we need to signal back to the shutdown
strategy that
- // there is a pending exchange. When we are no longer polling,
then we will return 0
- log.trace("Currently polling so returning 1 as pending exchanges");
- answer = 1;
- }
-
- return answer;
- }
-
- @Override
- public void prepareShutdown() {
- //Empty method
- }
}
Modified:
camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java?rev=1231533&r1=1231532&r2=1231533&view=diff
==============================================================================
---
camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
(original)
+++
camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
Sat Jan 14 17:25:26 2012
@@ -27,12 +27,10 @@ import javax.persistence.LockModeType;
import javax.persistence.PersistenceException;
import javax.persistence.Query;
-import org.apache.camel.BatchConsumer;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ShutdownRunningTask;
-import org.apache.camel.impl.ScheduledPollConsumer;
-import org.apache.camel.spi.ShutdownAware;
+import org.apache.camel.impl.ScheduledBatchPollingConsumer;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
@@ -42,7 +40,7 @@ import org.springframework.orm.jpa.JpaCa
/**
* @version
*/
-public class JpaConsumer extends ScheduledPollConsumer implements
BatchConsumer, ShutdownAware {
+public class JpaConsumer extends ScheduledBatchPollingConsumer {
private static final transient Logger LOG =
LoggerFactory.getLogger(JpaConsumer.class);
private final JpaEndpoint endpoint;
@@ -53,10 +51,7 @@ public class JpaConsumer extends Schedul
private String namedQuery;
private String nativeQuery;
private Class<?> resultClass;
- private int maxMessagesPerPoll;
private boolean transacted;
- private volatile ShutdownRunningTask shutdownRunningTask;
- private volatile int pendingExchanges;
private static final class DataHolder {
private Exchange exchange;
@@ -128,9 +123,6 @@ public class JpaConsumer extends Schedul
return
endpoint.getCamelContext().getTypeConverter().convertTo(int.class,
messagePolled);
}
- public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
- this.maxMessagesPerPoll = maxMessagesPerPoll;
- }
public int processBatch(Queue<Object> exchanges) throws Exception {
int total = exchanges.size();
@@ -172,54 +164,6 @@ public class JpaConsumer extends Schedul
return total;
}
- public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
- // store a reference what to do in case when shutting down and we have
pending messages
- this.shutdownRunningTask = shutdownRunningTask;
- // do not defer shutdown
- return false;
- }
-
- public int getPendingExchangesSize() {
- int answer;
- // only return the real pending size in case we are configured to
complete all tasks
- if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
- answer = pendingExchanges;
- } else {
- answer = 0;
- }
-
- if (answer == 0 && isPolling()) {
- // force at least one pending exchange if we are polling as there
is a little gap
- // in the processBatch method and until an exchange gets enlisted
as in-flight
- // which happens later, so we need to signal back to the shutdown
strategy that
- // there is a pending exchange. When we are no longer polling,
then we will return 0
- log.trace("Currently polling so returning 1 as pending exchanges");
- answer = 1;
- }
-
- return answer;
- }
-
- public void prepareShutdown() {
- // noop
- }
-
- public boolean isBatchAllowed() {
- // stop if we are not running
- boolean answer = isRunAllowed();
- if (!answer) {
- return false;
- }
-
- if (shutdownRunningTask == null) {
- // we are not shutting down so continue to run
- return true;
- }
-
- // we are shutting down so only continue if we are configured to
complete all tasks
- return ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask;
- }
-
// Properties
// -------------------------------------------------------------------------
@Override
Modified:
camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java?rev=1231533&r1=1231532&r2=1231533&view=diff
==============================================================================
--- camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java (original)
+++ camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java Sat Jan 14 17:25:26 2012
@@ -19,13 +19,11 @@ package org.apache.camel.component.krati
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
+
import krati.store.DataStore;
-import org.apache.camel.BatchConsumer;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
-import org.apache.camel.ShutdownRunningTask;
-import org.apache.camel.impl.ScheduledPollConsumer;
-import org.apache.camel.spi.ShutdownAware;
+import org.apache.camel.impl.ScheduledBatchPollingConsumer;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.ObjectHelper;
@@ -36,7 +34,7 @@ import org.slf4j.LoggerFactory;
/**
* The Krati consumer.
*/
-public class KratiConsumer extends ScheduledPollConsumer implements BatchConsumer, ShutdownAware {
+public class KratiConsumer extends ScheduledBatchPollingConsumer {
private static final transient Logger LOG = LoggerFactory.getLogger(KratiConsumer.class);
@@ -44,9 +42,6 @@ public class KratiConsumer extends Sched
protected DataStore<Object, Object> dataStore;
protected int maxMessagesPerPoll = 10;
- protected volatile ShutdownRunningTask shutdownRunningTask;
- protected volatile int pendingExchanges;
-
public KratiConsumer(KratiEndpoint endpoint, Processor processor, DataStore<Object, Object> dataStore) {
super(endpoint, processor);
this.endpoint = endpoint;
@@ -72,21 +67,6 @@ public class KratiConsumer extends Sched
return queue.isEmpty() ? 0 : processBatch(CastUtils.cast(queue));
}
- /**
- * Sets a maximum number of messages as a limit to poll at each polling.
- * <p/>
- * Can be used to limit eg to 100 to avoid when starting and there are millions
- * of messages for you in the first poll.
- * <p/>
- * Default value is 10.
- *
- * @param maxMessagesPerPoll maximum messages to poll.
- */
- @Override
- public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
- this.maxMessagesPerPoll = maxMessagesPerPoll;
- }
-
@Override
public int processBatch(Queue<Object> exchanges) throws Exception {
int total = exchanges.size();
@@ -123,55 +103,4 @@ public class KratiConsumer extends Sched
return total;
}
-
- @Override
- public boolean isBatchAllowed() {
- // stop if we are not running
- boolean answer = isRunAllowed();
- if (!answer) {
- return false;
- }
-
- if (shutdownRunningTask == null) {
- // we are not shutting down so continue to run
- return true;
- }
-
- // we are shutting down so only continue if we are configured to complete all tasks
- return ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask;
- }
-
- @Override
- public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
- // store a reference what to do in case when shutting down and we have pending messages
- this.shutdownRunningTask = shutdownRunningTask;
- // do not defer shutdown
- return false;
- }
-
- @Override
- public int getPendingExchangesSize() {
- int answer;
- // only return the real pending size in case we are configured to complete all tasks
- if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
- answer = pendingExchanges;
- } else {
- answer = 0;
- }
-
- if (answer == 0 && isPolling()) {
- // force at least one pending exchange if we are polling as there is a little gap
- // in the processBatch method and until an exchange gets enlisted as in-flight
- // which happens later, so we need to signal back to the shutdown strategy that
- // there is a pending exchange. When we are no longer polling, then we will return 0
- log.trace("Currently polling so returning 1 as pending exchanges");
- answer = 1;
- }
-
- return answer;
- }
-
- @Override
- public void prepareShutdown() {
- }
}
Modified:
camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java?rev=1231533&r1=1231532&r2=1231533&view=diff
==============================================================================
--- camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java (original)
+++ camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java Sat Jan 14 17:25:26 2012
@@ -29,12 +29,9 @@ import javax.mail.MessagingException;
import javax.mail.Store;
import javax.mail.search.FlagTerm;
-import org.apache.camel.BatchConsumer;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
-import org.apache.camel.ShutdownRunningTask;
-import org.apache.camel.impl.ScheduledPollConsumer;
-import org.apache.camel.spi.ShutdownAware;
+import org.apache.camel.impl.ScheduledBatchPollingConsumer;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.ObjectHelper;
@@ -45,7 +42,7 @@ import org.slf4j.LoggerFactory;
* A {@link org.apache.camel.Consumer Consumer} which consumes messages from JavaMail using a
* {@link javax.mail.Transport Transport} and dispatches them to the {@link Processor}
*/
-public class MailConsumer extends ScheduledPollConsumer implements BatchConsumer, ShutdownAware {
+public class MailConsumer extends ScheduledBatchPollingConsumer {
public static final String POP3_UID = "CamelPop3Uid";
public static final long DEFAULT_CONSUMER_DELAY = 60 * 1000L;
private static final transient Logger LOG = LoggerFactory.getLogger(MailConsumer.class);
@@ -53,9 +50,6 @@ public class MailConsumer extends Schedu
private final JavaMailSender sender;
private Folder folder;
private Store store;
- private int maxMessagesPerPoll;
- private volatile ShutdownRunningTask shutdownRunningTask;
- private volatile int pendingExchanges;
public MailConsumer(MailEndpoint endpoint, Processor processor, JavaMailSender sender) {
super(endpoint, processor);
@@ -152,10 +146,6 @@ public class MailConsumer extends Schedu
return polledMessages;
}
- public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
- this.maxMessagesPerPoll = maxMessagesPerPoll;
- }
-
public int processBatch(Queue<Object> exchanges) throws Exception {
int total = exchanges.size();
@@ -202,54 +192,6 @@ public class MailConsumer extends Schedu
return total;
}
- public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
- // store a reference what to do in case when shutting down and we have pending messages
- this.shutdownRunningTask = shutdownRunningTask;
- // do not defer shutdown
- return false;
- }
-
- public int getPendingExchangesSize() {
- int answer;
- // only return the real pending size in case we are configured to complete all tasks
- if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
- answer = pendingExchanges;
- } else {
- answer = 0;
- }
-
- if (answer == 0 && isPolling()) {
- // force at least one pending exchange if we are polling as there is a little gap
- // in the processBatch method and until an exchange gets enlisted as in-flight
- // which happens later, so we need to signal back to the shutdown strategy that
- // there is a pending exchange. When we are no longer polling, then we will return 0
- log.trace("Currently polling so returning 1 as pending exchanges");
- answer = 1;
- }
-
- return answer;
- }
-
- public void prepareShutdown() {
- // noop
- }
-
- public boolean isBatchAllowed() {
- // stop if we are not running
- boolean answer = isRunAllowed();
- if (!answer) {
- return false;
- }
-
- if (shutdownRunningTask == null) {
- // we are not shutting down so continue to run
- return true;
- }
-
- // we are shutting down so only continue if we are configured to complete all tasks
- return ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask;
- }
-
protected Queue<Exchange> createExchanges(Message[] messages) throws MessagingException {
Queue<Exchange> answer = new LinkedList<Exchange>();
Modified:
camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java?rev=1231533&r1=1231532&r2=1231533&view=diff
==============================================================================
---
camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java
(original)
+++
camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java
Sat Jan 14 17:25:26 2012
@@ -20,14 +20,12 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
-import org.apache.camel.BatchConsumer;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.ShutdownRunningTask;
-import org.apache.camel.impl.ScheduledPollConsumer;
-import org.apache.camel.spi.ShutdownAware;
+import org.apache.camel.impl.ScheduledBatchPollingConsumer;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
@@ -38,7 +36,7 @@ import org.slf4j.LoggerFactory;
*
* @version
*/
-public class MyBatisConsumer extends ScheduledPollConsumer implements BatchConsumer, ShutdownAware {
+public class MyBatisConsumer extends ScheduledBatchPollingConsumer {
private static final Logger LOG = LoggerFactory.getLogger(MyBatisConsumer.class);
@@ -116,10 +114,6 @@ public class MyBatisConsumer extends Sch
return processBatch(CastUtils.cast(answer));
}
- public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
- this.maxMessagesPerPoll = maxMessagesPerPoll;
- }
-
public int processBatch(Queue<Object> exchanges) throws Exception {
final MyBatisEndpoint endpoint = getEndpoint();
@@ -161,54 +155,6 @@ public class MyBatisConsumer extends Sch
return total;
}
- public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
- // store a reference what to do in case when shutting down and we have pending messages
- this.shutdownRunningTask = shutdownRunningTask;
- // do not defer shutdown
- return false;
- }
-
- public int getPendingExchangesSize() {
- int answer;
- // only return the real pending size in case we are configured to complete all tasks
- if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
- answer = pendingExchanges;
- } else {
- answer = 0;
- }
-
- if (answer == 0 && isPolling()) {
- // force at least one pending exchange if we are polling as there is a little gap
- // in the processBatch method and until an exchange gets enlisted as in-flight
- // which happens later, so we need to signal back to the shutdown strategy that
- // there is a pending exchange. When we are no longer polling, then we will return 0
- log.trace("Currently polling so returning 1 as pending exchanges");
- answer = 1;
- }
-
- return answer;
- }
-
- public void prepareShutdown() {
- // noop
- }
-
- public boolean isBatchAllowed() {
- // stop if we are not running
- boolean answer = isRunAllowed();
- if (!answer) {
- return false;
- }
-
- if (shutdownRunningTask == null) {
- // we are not shutting down so continue to run
- return true;
- }
-
- // we are shutting down so only continue if we are configured to complete all tasks
- return ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask;
- }
-
private Exchange createExchange(Object data) {
final MyBatisEndpoint endpoint = getEndpoint();
final Exchange exchange = endpoint.createExchange(ExchangePattern.InOnly);