Updated Branches: refs/heads/camel-2.10.x 7d8a31c63 -> 5d2ef93da refs/heads/camel-2.11.x a49076e4d -> 1b33dee5c refs/heads/master f9d9a3fce -> d16f6d646
CAMEL-6465: Added greedy option to scheduled poll consumer. Thanks to John Liptak for the patch. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d16f6d64 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d16f6d64 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d16f6d64 Branch: refs/heads/master Commit: d16f6d64692b97db439e457a6b20bea7285f390a Parents: f9d9a3f Author: Claus Ibsen <[email protected]> Authored: Tue Jun 25 09:12:25 2013 +0200 Committer: Claus Ibsen <[email protected]> Committed: Tue Jun 25 09:12:25 2013 +0200 ---------------------------------------------------------------------- .../camel/impl/ScheduledPollConsumer.java | 19 ++++ .../camel/impl/ScheduledPollEndpoint.java | 6 +- .../impl/Mock321ScheduledPollConsumer.java | 41 +++++++++ .../impl/ScheduledPollConsumerGreedyTest.java | 92 ++++++++++++++++++++ 4 files changed, 157 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/d16f6d64/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java index 707b06d..739c692 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java @@ -63,6 +63,8 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer implements R private LoggingLevel runLoggingLevel = LoggingLevel.TRACE; @UriParam private boolean sendEmptyMessageWhenIdle; + @UriParam + private boolean greedy; private volatile boolean polling; public ScheduledPollConsumer(Endpoint endpoint, Processor processor) { @@ -155,6 +157,12 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer implements R } pollStrategy.commit(this, getEndpoint(), polledMessages); + + if (polledMessages > 0 && isGreedy()) { + done = false; + retryCounter = -1; + LOG.trace("Greedy polling after processing {} messages", polledMessages); + } } else { LOG.debug("Cannot begin polling as pollStrategy returned false: {}", pollStrategy); } @@ -304,6 +312,17 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer implements R return sendEmptyMessageWhenIdle; } + public boolean isGreedy() { + return greedy; + } + + /** + * If greedy then a poll is executed immediate after a previous poll that polled 1 or more messages. + */ + public void setGreedy(boolean greedy) { + this.greedy = greedy; + } + public ScheduledExecutorService getScheduledExecutorService() { return scheduledExecutorService; } http://git-wip-us.apache.org/repos/asf/camel/blob/d16f6d64/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java index 1d4aa85..b3a0ed6 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java +++ b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java @@ -62,6 +62,7 @@ public abstract class ScheduledPollEndpoint extends DefaultEndpoint { Object pollStrategy = options.remove("pollStrategy"); Object runLoggingLevel = options.remove("runLoggingLevel"); Object sendEmptyMessageWhenIdle = options.remove("sendEmptyMessageWhenIdle"); + Object greedy = options.remove("greedy"); Object scheduledExecutorService = options.remove("scheduledExecutorService"); boolean setConsumerProperties = false; @@ -69,7 +70,7 @@ public abstract class ScheduledPollEndpoint extends DefaultEndpoint { if (initialDelay != null || delay != null || timeUnit != null || useFixedDelay != null || pollStrategy != null) { setConsumerProperties = true; } - if (runLoggingLevel != null || startScheduler != null || sendEmptyMessageWhenIdle != null || scheduledExecutorService != null) { + if (runLoggingLevel != null || startScheduler != null || sendEmptyMessageWhenIdle != null || greedy != null || scheduledExecutorService != null) { setConsumerProperties = true; } @@ -102,6 +103,9 @@ public abstract class ScheduledPollEndpoint extends DefaultEndpoint { if (sendEmptyMessageWhenIdle != null) { consumerProperties.put("sendEmptyMessageWhenIdle", sendEmptyMessageWhenIdle); } + if (greedy != null) { + consumerProperties.put("greedy", greedy); + } if (scheduledExecutorService != null) { consumerProperties.put("scheduledExecutorService", scheduledExecutorService); } http://git-wip-us.apache.org/repos/asf/camel/blob/d16f6d64/camel-core/src/test/java/org/apache/camel/impl/Mock321ScheduledPollConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/impl/Mock321ScheduledPollConsumer.java b/camel-core/src/test/java/org/apache/camel/impl/Mock321ScheduledPollConsumer.java new file mode 100644 index 0000000..acb0f29 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/impl/Mock321ScheduledPollConsumer.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl; + +import org.apache.camel.Processor; + +public class Mock321ScheduledPollConsumer extends MockScheduledPollConsumer { + + private volatile int counter = 4; + + public Mock321ScheduledPollConsumer(DefaultEndpoint endpoint, Processor processor) { + super(endpoint, processor); + } + + @Override + protected int poll() throws Exception { + if (counter > 0) { + counter = counter - 1; + } + return counter; + } + + @Override + public String toString() { + return "Mock321Scheduled"; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/d16f6d64/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerGreedyTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerGreedyTest.java b/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerGreedyTest.java new file mode 100644 index 0000000..6a81082 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerGreedyTest.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.camel.Consumer; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Endpoint; +import org.apache.camel.spi.PollingConsumerPollStrategy; + +public class ScheduledPollConsumerGreedyTest extends ContextTestSupport { + + private final AtomicInteger polled = new AtomicInteger(); + + public void test321Greedy() throws Exception { + polled.set(0); + + MockScheduledPollConsumer consumer = new Mock321ScheduledPollConsumer(getMockEndpoint("mock:foo"), null); + consumer.setGreedy(true); + + consumer.setPollStrategy(new PollingConsumerPollStrategy() { + public boolean begin(Consumer consumer, Endpoint endpoint) { + return true; + } + + public void commit(Consumer consumer, Endpoint endpoint, int polledMessages) { + polled.addAndGet(polledMessages); + } + + public boolean rollback(Consumer consumer, Endpoint endpoint, int retryCounter, Exception e) throws Exception { + return false; + } + }); + + consumer.start(); + consumer.run(); + + assertEquals(6, polled.get()); + + consumer.stop(); + } + + public void test321NotGreedy() throws Exception { + polled.set(0); + + MockScheduledPollConsumer consumer = new Mock321ScheduledPollConsumer(getMockEndpoint("mock:foo"), null); + consumer.setGreedy(false); + + consumer.setPollStrategy(new PollingConsumerPollStrategy() { + public boolean begin(Consumer consumer, Endpoint endpoint) { + return true; + } + + public void commit(Consumer consumer, Endpoint endpoint, int polledMessages) { + polled.addAndGet(polledMessages); + } + + public boolean rollback(Consumer consumer, Endpoint endpoint, int retryCounter, Exception e) throws Exception { + return false; + } + }); + + consumer.start(); + + consumer.run(); + assertEquals(3, polled.get()); + consumer.run(); + assertEquals(5, polled.get()); + consumer.run(); + assertEquals(6, polled.get()); + consumer.run(); + assertEquals(6, polled.get()); + + consumer.stop(); + } + +}
