This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 0a79ae35b59ec7474b5a15f16325f0d03551bbc5 Author: Claus Ibsen <[email protected]> AuthorDate: Mon Sep 2 09:38:43 2019 +0200 CAMEL-13917: Configuring endpoints with consumer. prefix for consumer options is no longer supported as they should be regular options instead. This also avoids reflection setting properties. --- .../camel/component/file/FileConfigureTest.java | 6 +- ...ileConsumerBridgeRouteExceptionHandlerTest.java | 2 +- .../FileConsumerCustomExceptionHandlerTest.java | 2 +- .../file/FileConsumerPollStrategyNotBeginTest.java | 2 +- ...FileConsumerPollStrategyPolledMessagesTest.java | 2 +- .../file/FileConsumerPollStrategyTest.java | 2 +- .../scheduler/SchedulerNoPolledMessagesTest.java | 2 +- .../impl/DefaultComponentValidateURITest.java | 20 +-- ...edPollEndpointConfigureConsumerRestartTest.java | 146 --------------------- .../CustomConsumerExceptionHandlerTest.java | 2 +- .../DeadLetterChannelAlwaysHandledTest.java | 2 +- ...tConsumerBridgeErrorHandlerOnExceptionTest.java | 2 +- ...ltConsumerBridgeErrorHandlerRedeliveryTest.java | 2 +- .../DefaultConsumerBridgeErrorHandlerTest.java | 2 +- ...cheduledPollConsumerBridgeErrorHandlerTest.java | 2 +- .../enricher/PollEnrichBridgeErrorHandlerTest.java | 2 +- .../impl/RuntimeCamelCatalogTest.java | 8 -- .../BridgeExceptionHandlerToErrorHandler.java | 2 +- .../org/apache/camel/support/DefaultEndpoint.java | 75 ++--------- .../camel/support/ScheduledPollConsumer.java | 10 +- .../camel/support/ScheduledPollEndpoint.java | 130 ++++++------------ 21 files changed, 82 insertions(+), 341 deletions(-) diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConfigureTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConfigureTest.java index a9679b3..bd48ffc 100644 --- a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConfigureTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConfigureTest.java @@ -53,19 +53,19 @@ 
public class FileConfigureTest extends ContextTestSupport { @Test public void testUriWithParameters() throws Exception { - FileEndpoint endpoint = resolveMandatoryEndpoint("file:///C:/camel/temp?delay=10&useFixedDelay=true&initialDelay=10&consumer.bridgeErrorHandler=true" + FileEndpoint endpoint = resolveMandatoryEndpoint("file:///C:/camel/temp?delay=10&useFixedDelay=true&initialDelay=10&bridgeErrorHandler=true" + "&autoCreate=false&startingDirectoryMustExist=true&directoryMustExist=true&readLock=changed", FileEndpoint.class); assertNotNull("Could not find file endpoint", endpoint); assertEquals("Get a wrong option of StartingDirectoryMustExist", true, endpoint.isStartingDirectoryMustExist()); endpoint = resolveMandatoryEndpoint("file:///C:/camel/temp?delay=10&useFixedDelay=true&initialDelay=10&startingDirectoryMustExist=true" - + "&consumer.bridgeErrorHandler=true&autoCreate=false&directoryMustExist=true&readLock=changed", FileEndpoint.class); + + "&bridgeErrorHandler=true&autoCreate=false&directoryMustExist=true&readLock=changed", FileEndpoint.class); assertNotNull("Could not find file endpoint", endpoint); assertEquals("Get a wrong option of StartingDirectoryMustExist", true, endpoint.isStartingDirectoryMustExist()); endpoint = resolveMandatoryEndpoint("file:///C:/camel/temp?delay=10&startingDirectoryMustExist=true&useFixedDelay=true&initialDelay=10" - + "&consumer.bridgeErrorHandler=true&autoCreate=false&directoryMustExist=true&readLock=changed", FileEndpoint.class); + + "&bridgeErrorHandler=true&autoCreate=false&directoryMustExist=true&readLock=changed", FileEndpoint.class); assertNotNull("Could not find file endpoint", endpoint); assertEquals("Get a wrong option of StartingDirectoryMustExist", true, endpoint.isStartingDirectoryMustExist()); diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerBridgeRouteExceptionHandlerTest.java 
b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerBridgeRouteExceptionHandlerTest.java index a06ff24..eb84f82 100644 --- a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerBridgeRouteExceptionHandlerTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerBridgeRouteExceptionHandlerTest.java @@ -67,7 +67,7 @@ public class FileConsumerBridgeRouteExceptionHandlerTest extends ContextTestSupp // bridge the consumer to use the Camel routing error handler // the exclusiveReadLockStrategy is only configured because this // is from an unit test, so we use that to simulate exceptions - from("file:target/data/nospace?exclusiveReadLockStrategy=#myReadLockStrategy&consumer.bridgeErrorHandler=true&initialDelay=0&delay=10").convertBodyTo(String.class) + from("file:target/data/nospace?exclusiveReadLockStrategy=#myReadLockStrategy&bridgeErrorHandler=true&initialDelay=0&delay=10").convertBodyTo(String.class) .to("mock:result"); } }; diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomExceptionHandlerTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomExceptionHandlerTest.java index 2eb2658..327d87f 100644 --- a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomExceptionHandlerTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomExceptionHandlerTest.java @@ -83,7 +83,7 @@ public class FileConsumerCustomExceptionHandlerTest extends ContextTestSupport { // our custom exception handler on the consumer // the exclusiveReadLockStrategy is only configured because this // is from an unit test, so we use that to simulate exceptions - from("file:target/data/nospace?exclusiveReadLockStrategy=#myReadLockStrategy&consumer.exceptionHandler=#myExceptionHandler&initialDelay=0&delay=10") + 
from("file:target/data/nospace?exclusiveReadLockStrategy=#myReadLockStrategy&exceptionHandler=#myExceptionHandler&initialDelay=0&delay=10") .convertBodyTo(String.class).to("mock:result"); } }; diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyNotBeginTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyNotBeginTest.java index bf8284c..5a8149d 100644 --- a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyNotBeginTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyNotBeginTest.java @@ -35,7 +35,7 @@ public class FileConsumerPollStrategyNotBeginTest extends ContextTestSupport { private static int counter; private static volatile String event = ""; - private String fileUrl = "file://target/data/pollstrategy/?consumer.pollStrategy=#myPoll&noop=true&initialDelay=0&delay=10"; + private String fileUrl = "file://target/data/pollstrategy/?pollStrategy=#myPoll&noop=true&initialDelay=0&delay=10"; @Override protected JndiRegistry createRegistry() throws Exception { diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyPolledMessagesTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyPolledMessagesTest.java index 6c80674..fc3c634 100644 --- a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyPolledMessagesTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyPolledMessagesTest.java @@ -38,7 +38,7 @@ public class FileConsumerPollStrategyPolledMessagesTest extends ContextTestSuppo private static int maxPolls; private final CountDownLatch latch = new CountDownLatch(1); - private String fileUrl = "file://target/data/pollstrategy/?consumer.pollStrategy=#myPoll&initialDelay=0&delay=10"; + private String fileUrl = 
"file://target/data/pollstrategy/?pollStrategy=#myPoll&initialDelay=0&delay=10"; @Override protected JndiRegistry createRegistry() throws Exception { diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyTest.java index ad0c81b..7695d4b 100644 --- a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyTest.java @@ -35,7 +35,7 @@ public class FileConsumerPollStrategyTest extends ContextTestSupport { private static int counter; private static String event = ""; - private String fileUrl = "file://target/data/pollstrategy/?consumer.pollStrategy=#myPoll&noop=true&initialDelay=0&delay=10"; + private String fileUrl = "file://target/data/pollstrategy/?pollStrategy=#myPoll&noop=true&initialDelay=0&delay=10"; @Override protected JndiRegistry createRegistry() throws Exception { diff --git a/core/camel-core/src/test/java/org/apache/camel/component/scheduler/SchedulerNoPolledMessagesTest.java b/core/camel-core/src/test/java/org/apache/camel/component/scheduler/SchedulerNoPolledMessagesTest.java index f5b9564..a9c903a 100644 --- a/core/camel-core/src/test/java/org/apache/camel/component/scheduler/SchedulerNoPolledMessagesTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/component/scheduler/SchedulerNoPolledMessagesTest.java @@ -42,7 +42,7 @@ public class SchedulerNoPolledMessagesTest extends ContextTestSupport { protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { public void configure() { - from("scheduler://foo?delay=100&backoffMultiplier=10&backoffIdleThreshold=2").log("Fired scheduler").process(new Processor() { + from("scheduler://foo?delay=100&backoffMultiplier=10&backoffIdleThreshold=2&scheduler.concurrentTasks=2").log("Fired scheduler").process(new 
Processor() { @Override public void process(Exchange exchange) throws Exception { // force no messages to be polled which should affect diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/DefaultComponentValidateURITest.java b/core/camel-core/src/test/java/org/apache/camel/impl/DefaultComponentValidateURITest.java index 01a9942..69400e2 100644 --- a/core/camel-core/src/test/java/org/apache/camel/impl/DefaultComponentValidateURITest.java +++ b/core/camel-core/src/test/java/org/apache/camel/impl/DefaultComponentValidateURITest.java @@ -64,21 +64,9 @@ public class DefaultComponentValidateURITest extends ContextTestSupport { @Test public void testScheduledPollConsumerOptions() throws Exception { - // test that we support both notations of scheduled polling consumer - // options + // test that we support both notations of scheduled polling consumer options - // with consumer. prefix - Endpoint endpint = context.getEndpoint("file://target/data/foo?consumer.delay=1000"); - assertNotNull(endpint); - - endpint = context.getEndpoint("file://target/data/foo?consumer.delay=1000&consumer.initialDelay=5000"); - assertNotNull(endpint); - - endpint = context.getEndpoint("file://target/data/foo?consumer.delay=1000&consumer.initialDelay=5000&consumer.useFixedDelay=false"); - assertNotNull(endpint); - - // without consumer. prefix - endpint = context.getEndpoint("file://foo2?delay=1000"); + Endpoint endpint = context.getEndpoint("file://foo2?delay=1000"); assertNotNull(endpint); endpint = context.getEndpoint("file://foo2?delay=1000&initialDelay=5000"); @@ -86,10 +74,6 @@ public class DefaultComponentValidateURITest extends ContextTestSupport { endpint = context.getEndpoint("file://foo2?delay=1000&initialDelay=5000&useFixedDelay=false"); assertNotNull(endpint); - - // combined with and without consumer. 
prefix - endpint = context.getEndpoint("file://foo3?delay=1000&consumer.initialDelay=5000&useFixedDelay=false"); - assertNotNull(endpint); } } diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollEndpointConfigureConsumerRestartTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollEndpointConfigureConsumerRestartTest.java deleted file mode 100644 index 619437c..0000000 --- a/core/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollEndpointConfigureConsumerRestartTest.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.camel.impl; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.camel.Consumer; -import org.apache.camel.ContextTestSupport; -import org.apache.camel.Endpoint; -import org.apache.camel.Exchange; -import org.apache.camel.Processor; -import org.apache.camel.Producer; -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.support.DefaultExchange; -import org.apache.camel.support.ScheduledPollConsumer; -import org.apache.camel.support.ScheduledPollEndpoint; -import org.junit.Test; - -/** - * - */ -public class ScheduledPollEndpointConfigureConsumerRestartTest extends ContextTestSupport { - - private MyEndpoint my; - private Map<String, Object> props = new HashMap<>(); - - @Test - public void testRestart() throws Exception { - getMockEndpoint("mock:result").expectedMinimumMessageCount(1); - - assertMockEndpointsSatisfied(); - - assertEquals("Hello", getMockEndpoint("mock:result").getExchanges().get(0).getIn().getBody()); - assertEquals(123, getMockEndpoint("mock:result").getExchanges().get(0).getIn().getHeader("foo")); - - // restart route - resetMocks(); - context.getRouteController().stopRoute("foo"); - - getMockEndpoint("mock:result").expectedMinimumMessageCount(1); - - // start route - context.getRouteController().startRoute("foo"); - - assertMockEndpointsSatisfied(); - - assertEquals("Hello", getMockEndpoint("mock:result").getExchanges().get(0).getIn().getBody()); - assertEquals(123, getMockEndpoint("mock:result").getExchanges().get(0).getIn().getHeader("foo")); - - } - - @Override - protected RouteBuilder createRouteBuilder() throws Exception { - return new RouteBuilder() { - @Override - public void configure() throws Exception { - props.put("foo", 123); - props.put("bar", "Hello"); - props.put("delay", 1000); - - my = new MyEndpoint(); - my.setCamelContext(context); - my.setConsumerProperties(props); - - from(my).routeId("foo").to("mock:result"); - } - }; - } - - private static class MyEndpoint 
extends ScheduledPollEndpoint { - - @Override - public Producer createProducer() throws Exception { - return null; - } - - @Override - public Consumer createConsumer(Processor processor) throws Exception { - MyConsumer answer = new MyConsumer(this, processor); - configureConsumer(answer); - return answer; - } - - @Override - public boolean isSingleton() { - return true; - } - - @Override - protected String createEndpointUri() { - return "myendpoint:foo"; - } - } - - public static final class MyConsumer extends ScheduledPollConsumer { - - private int foo; - private String bar; - - public MyConsumer(Endpoint endpoint, Processor processor) { - super(endpoint, processor); - } - - public int getFoo() { - return foo; - } - - public void setFoo(int foo) { - this.foo = foo; - } - - public String getBar() { - return bar; - } - - public void setBar(String bar) { - this.bar = bar; - } - - @Override - protected int poll() throws Exception { - Exchange exchange = new DefaultExchange(getEndpoint()); - exchange.getIn().setBody(bar); - exchange.getIn().setHeader("foo", foo); - - getProcessor().process(exchange); - - return 1; - } - } -} diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/CustomConsumerExceptionHandlerTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/CustomConsumerExceptionHandlerTest.java index 33895cf..70c11b7 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/CustomConsumerExceptionHandlerTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/CustomConsumerExceptionHandlerTest.java @@ -55,7 +55,7 @@ public class CustomConsumerExceptionHandlerTest extends ContextTestSupport { return new RouteBuilder() { @Override public void configure() throws Exception { - from("seda:foo?synchronous=true&consumer.exceptionHandler=#myHandler").routeId("foo").to("mock:foo").to("direct:bar").to("mock:result"); + 
from("seda:foo?synchronous=true&exceptionHandler=#myHandler").routeId("foo").to("mock:foo").to("direct:bar").to("mock:result"); from("direct:bar").routeId("bar").onException(IllegalArgumentException.class).maximumRedeliveries(3).redeliveryDelay(0).end().to("mock:bar") .throwException(new IllegalArgumentException("Forced")); diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelAlwaysHandledTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelAlwaysHandledTest.java index d6dc446..0c40754 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelAlwaysHandledTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelAlwaysHandledTest.java @@ -57,7 +57,7 @@ public class DeadLetterChannelAlwaysHandledTest extends ContextTestSupport { public void configure() throws Exception { errorHandler(deadLetterChannel("mock:dead")); - from("seda:foo?synchronous=true&consumer.exceptionHandler=#myHandler").routeId("foo").to("mock:foo").to("direct:bar").to("mock:result"); + from("seda:foo?synchronous=true&exceptionHandler=#myHandler").routeId("foo").to("mock:foo").to("direct:bar").to("mock:result"); from("direct:bar").routeId("bar").onException(IllegalArgumentException.class).maximumRedeliveries(3).redeliveryDelay(0).end().to("mock:bar") .throwException(new IllegalArgumentException("Forced")); diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/DefaultConsumerBridgeErrorHandlerOnExceptionTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/DefaultConsumerBridgeErrorHandlerOnExceptionTest.java index c3712da..557b37a 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/DefaultConsumerBridgeErrorHandlerOnExceptionTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/DefaultConsumerBridgeErrorHandlerOnExceptionTest.java @@ -72,7 +72,7 @@ public class 
DefaultConsumerBridgeErrorHandlerOnExceptionTest extends ContextTes // handler, // so the above error handler will trigger if exceptions also // occurs inside the consumer - from("my:foo?consumer.bridgeErrorHandler=true").to("log:foo").to("mock:result"); + from("my:foo?bridgeErrorHandler=true").to("log:foo").to("mock:result"); from("direct:error").to("mock:b").log("Error happened due ${exception.message}"); } diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/DefaultConsumerBridgeErrorHandlerRedeliveryTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/DefaultConsumerBridgeErrorHandlerRedeliveryTest.java index e1aab13..6d1029c 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/DefaultConsumerBridgeErrorHandlerRedeliveryTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/DefaultConsumerBridgeErrorHandlerRedeliveryTest.java @@ -71,7 +71,7 @@ public class DefaultConsumerBridgeErrorHandlerRedeliveryTest extends DefaultCons // handler, // so the above error handler will trigger if exceptions also // occurs inside the consumer - from("my:foo?consumer.bridgeErrorHandler=true").to("log:foo").to("mock:result"); + from("my:foo?bridgeErrorHandler=true").to("log:foo").to("mock:result"); } }; } diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/DefaultConsumerBridgeErrorHandlerTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/DefaultConsumerBridgeErrorHandlerTest.java index 070e4b6..1e09f4a 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/DefaultConsumerBridgeErrorHandlerTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/DefaultConsumerBridgeErrorHandlerTest.java @@ -70,7 +70,7 @@ public class DefaultConsumerBridgeErrorHandlerTest extends ContextTestSupport { // handler, // so the above error handler will trigger if exceptions also // occurs inside the consumer - 
from("my:foo?consumer.bridgeErrorHandler=true").to("log:foo").to("mock:result"); + from("my:foo?bridgeErrorHandler=true").to("log:foo").to("mock:result"); } }; // END SNIPPET: e1 diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/DefaultScheduledPollConsumerBridgeErrorHandlerTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/DefaultScheduledPollConsumerBridgeErrorHandlerTest.java index 88e6d7b..ea60f8a 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/DefaultScheduledPollConsumerBridgeErrorHandlerTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/DefaultScheduledPollConsumerBridgeErrorHandlerTest.java @@ -62,7 +62,7 @@ public class DefaultScheduledPollConsumerBridgeErrorHandlerTest extends ContextT // handler, // so the above error handler will trigger if exceptions also // occurs inside the consumer - from("my:foo?consumer.bridgeErrorHandler=true").to("log:foo").to("mock:result"); + from("my:foo?bridgeErrorHandler=true").to("log:foo").to("mock:result"); } }; // END SNIPPET: e1 diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichBridgeErrorHandlerTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichBridgeErrorHandlerTest.java index 4f69cec..17fba5b 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichBridgeErrorHandlerTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichBridgeErrorHandlerTest.java @@ -65,7 +65,7 @@ public class PollEnrichBridgeErrorHandlerTest extends ContextTestSupport { from("seda:start") // bridge the error handler when doing a polling so we can // let Camel's error handler decide what to do - .pollEnrich("file:target/data/foo?initialDelay=0&delay=10&pollStrategy=#myPoll&consumer.bridgeErrorHandler=true", 10000, new UseLatestAggregationStrategy()) + 
.pollEnrich("file:target/data/foo?initialDelay=0&delay=10&pollStrategy=#myPoll&bridgeErrorHandler=true", 10000, new UseLatestAggregationStrategy()) .to("mock:result"); } }; diff --git a/core/camel-core/src/test/java/org/apache/camel/runtimecatalog/impl/RuntimeCamelCatalogTest.java b/core/camel-core/src/test/java/org/apache/camel/runtimecatalog/impl/RuntimeCamelCatalogTest.java index 1c3c5ae..d950e66 100644 --- a/core/camel-core/src/test/java/org/apache/camel/runtimecatalog/impl/RuntimeCamelCatalogTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/runtimecatalog/impl/RuntimeCamelCatalogTest.java @@ -215,18 +215,10 @@ public class RuntimeCamelCatalogTest { result = catalog.validateEndpointProperties("timer://foo?fixedRate=#fixed&delay=#myDelay"); assertTrue(result.isSuccess()); - // optional consumer. prefix - result = catalog.validateEndpointProperties("file:inbox?consumer.delay=5000&consumer.greedy=true"); - assertTrue(result.isSuccess()); - // optional without consumer. prefix result = catalog.validateEndpointProperties("file:inbox?delay=5000&greedy=true"); assertTrue(result.isSuccess()); - // mixed optional without consumer. 
prefix - result = catalog.validateEndpointProperties("file:inbox?delay=5000&consumer.greedy=true"); - assertTrue(result.isSuccess()); - // prefix result = catalog.validateEndpointProperties("file:inbox?delay=5000&scheduler.foo=123&scheduler.bar=456"); assertTrue(result.isSuccess()); diff --git a/core/camel-support/src/main/java/org/apache/camel/support/BridgeExceptionHandlerToErrorHandler.java b/core/camel-support/src/main/java/org/apache/camel/support/BridgeExceptionHandlerToErrorHandler.java index 234a36b..5ca068a 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/BridgeExceptionHandlerToErrorHandler.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/BridgeExceptionHandlerToErrorHandler.java @@ -26,7 +26,7 @@ import org.apache.camel.spi.UnitOfWork; * process the caused exception to send the message into the Camel routing engine * which allows to let the routing engine handle the exception. * <p/> - * An endpoint can be configured with <tt>consumer.bridgeErrorHandler=true</tt> in the URI + * An endpoint can be configured with <tt>bridgeErrorHandler=true</tt> in the URI * to enable this {@link BridgeExceptionHandlerToErrorHandler} on the consumer. * The consumer must extend the {@link DefaultConsumer}, to support this, if not an * {@link IllegalArgumentException} is thrown upon startup. 
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultEndpoint.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultEndpoint.java index ff7b490..c796dd8 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultEndpoint.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultEndpoint.java @@ -16,7 +16,6 @@ */ package org.apache.camel.support; -import java.util.HashMap; import java.util.Map; import org.apache.camel.AsyncProducer; @@ -28,7 +27,6 @@ import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.PollingConsumer; -import org.apache.camel.ResolveEndpointFailedException; import org.apache.camel.spi.ExceptionHandler; import org.apache.camel.spi.HasId; import org.apache.camel.spi.PropertyConfigurer; @@ -36,7 +34,6 @@ import org.apache.camel.spi.PropertyConfigurerAware; import org.apache.camel.spi.UriParam; import org.apache.camel.support.service.ServiceSupport; import org.apache.camel.util.ObjectHelper; -import org.apache.camel.util.PropertiesHelper; import org.apache.camel.util.StringHelper; import org.apache.camel.util.URISupport; @@ -84,9 +81,6 @@ public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint @UriParam(label = "advanced", description = "Whether the endpoint should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities") private boolean basicPropertyBinding; - // these options are not really in use any option related to the consumer has a specific option on the endpoint - // and consumerProperties was added from the very start of Camel. 
- private Map<String, Object> consumerProperties; // pooling consumer options only related to EventDrivenPollingConsumer which are very seldom in use // so lets not expose them in the component docs as it will be included in every component private int pollingConsumerQueueSize = 1000; @@ -396,10 +390,7 @@ public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint @Override public void configureProperties(Map<String, Object> options) { - Map<String, Object> consumerProperties = PropertiesHelper.extractProperties(options, "consumer."); - if (consumerProperties != null && !consumerProperties.isEmpty()) { - setConsumerProperties(consumerProperties); - } + // noop } /** @@ -477,56 +468,25 @@ public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint return false; } - public Map<String, Object> getConsumerProperties() { - if (consumerProperties == null) { - // must create empty if none exists - consumerProperties = new HashMap<>(); - } - return consumerProperties; - } - - public void setConsumerProperties(Map<String, Object> consumerProperties) { - // append consumer properties - if (consumerProperties != null && !consumerProperties.isEmpty()) { - if (this.consumerProperties == null) { - this.consumerProperties = new HashMap<>(consumerProperties); - } else { - this.consumerProperties.putAll(consumerProperties); - } - } - } - protected void configureConsumer(Consumer consumer) throws Exception { // inject CamelContext if (consumer instanceof CamelContextAware) { ((CamelContextAware) consumer).setCamelContext(getCamelContext()); } - if (consumerProperties != null) { - // use a defensive copy of the consumer properties as the methods below will remove the used properties - // and in case we restart routes, we need access to the original consumer properties again - Map<String, Object> copy = new HashMap<>(consumerProperties); - - // configure consumer - setProperties(consumer, copy); - - // special consumer.bridgeErrorHandler 
option - Object bridge = copy.remove("bridgeErrorHandler"); - if ("true".equals(bridge)) { - if (consumer instanceof DefaultConsumer) { - DefaultConsumer defaultConsumer = (DefaultConsumer) consumer; - defaultConsumer.setExceptionHandler(new BridgeExceptionHandlerToErrorHandler(defaultConsumer)); - } else { - throw new IllegalArgumentException("Option consumer.bridgeErrorHandler is only supported by endpoints," - + " having their consumer extend DefaultConsumer. The consumer is a " + consumer.getClass().getName() + " class."); - } + if (bridgeErrorHandler) { + if (consumer instanceof DefaultConsumer) { + DefaultConsumer defaultConsumer = (DefaultConsumer) consumer; + defaultConsumer.setExceptionHandler(new BridgeExceptionHandlerToErrorHandler(defaultConsumer)); + } else { + throw new IllegalArgumentException("Option bridgeErrorHandler is only supported by endpoints," + + " having their consumer extend DefaultConsumer. The consumer is a " + consumer.getClass().getName() + " class."); } - - if (!this.isLenientProperties() && copy.size() > 0) { - throw new ResolveEndpointFailedException(this.getEndpointUri(), "There are " + copy.size() - + " parameters that couldn't be set on the endpoint consumer." - + " Check the uri if the parameters are spelt correctly and that they are properties of the endpoint." - + " Unknown consumer parameters=[" + copy + "]"); + } + if (exceptionHandler != null) { + if (consumer instanceof DefaultConsumer) { + DefaultConsumer defaultConsumer = (DefaultConsumer) consumer; + defaultConsumer.setExceptionHandler(exceptionHandler); } } } @@ -537,14 +497,7 @@ public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint @Override protected void doInit() throws Exception { - // the bridgeErrorHandler/exceptionHandler was originally configured with consumer. 
prefix, such as consumer.bridgeErrorHandler=true - // so if they have been configured on the endpoint then map to the old naming style - if (bridgeErrorHandler) { - getConsumerProperties().put("bridgeErrorHandler", "true"); - } - if (exceptionHandler != null) { - getConsumerProperties().put("exceptionHandler", exceptionHandler); - } + // noop } @Override diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java b/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java index 4b1b630..1e052a5 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java @@ -413,15 +413,17 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer implements R } if (scheduler == null) { - scheduler = new DefaultScheduledPollConsumerScheduler(scheduledExecutorService); + DefaultScheduledPollConsumerScheduler scheduler = new DefaultScheduledPollConsumerScheduler(scheduledExecutorService); + scheduler.setDelay(delay); + scheduler.setInitialDelay(initialDelay); + scheduler.setTimeUnit(timeUnit); + scheduler.setUseFixedDelay(useFixedDelay); + this.scheduler = scheduler; } scheduler.setCamelContext(getEndpoint().getCamelContext()); scheduler.onInit(this); // configure scheduler with options from this consumer - Map<String, Object> properties = new LinkedHashMap<>(); - getEndpoint().getCamelContext().adapt(ExtendedCamelContext.class).getBeanIntrospection().getProperties(this, properties, null); - PropertyBindingSupport.build().bind(getEndpoint().getCamelContext(), scheduler, properties); if (schedulerProperties != null && !schedulerProperties.isEmpty()) { // need to use a copy in case the consumer is restarted so we keep the properties Map<String, Object> copy = new LinkedHashMap<>(schedulerProperties); diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollEndpoint.java b/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollEndpoint.java index 52c0e05..c3ec3d9 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollEndpoint.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollEndpoint.java @@ -16,12 +16,12 @@ */ package org.apache.camel.support; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.camel.Component; +import org.apache.camel.Consumer; import org.apache.camel.ExtendedCamelContext; import org.apache.camel.LoggingLevel; import org.apache.camel.PollingConsumer; @@ -42,54 +42,54 @@ public abstract class ScheduledPollEndpoint extends DefaultEndpoint { private transient ScheduledPollConsumerScheduler consumerScheduler; // if adding more options then align with org.apache.camel.support.ScheduledPollConsumer - @UriParam(optionalPrefix = "consumer.", defaultValue = "true", label = "consumer,scheduler", + @UriParam(defaultValue = "true", label = "consumer,scheduler", description = "Whether the scheduler should be auto started.") private boolean startScheduler = true; - @UriParam(optionalPrefix = "consumer.", defaultValue = "1000", label = "consumer,scheduler", + @UriParam(defaultValue = "1000", label = "consumer,scheduler", description = "Milliseconds before the first poll starts." + " You can also specify time values using units, such as 60s (60 seconds), 5m30s (5 minutes and 30 seconds), and 1h (1 hour).") private long initialDelay = 1000; - @UriParam(optionalPrefix = "consumer.", defaultValue = "500", label = "consumer,scheduler", + @UriParam(defaultValue = "500", label = "consumer,scheduler", description = "Milliseconds before the next poll." 
+ " You can also specify time values using units, such as 60s (60 seconds), 5m30s (5 minutes and 30 seconds), and 1h (1 hour).") private long delay = 500; - @UriParam(optionalPrefix = "consumer.", defaultValue = "MILLISECONDS", label = "consumer,scheduler", + @UriParam(defaultValue = "MILLISECONDS", label = "consumer,scheduler", description = "Time unit for initialDelay and delay options.") private TimeUnit timeUnit = TimeUnit.MILLISECONDS; - @UriParam(optionalPrefix = "consumer.", defaultValue = "true", label = "consumer,scheduler", + @UriParam(defaultValue = "true", label = "consumer,scheduler", description = "Controls if fixed delay or fixed rate is used. See ScheduledExecutorService in JDK for details.") private boolean useFixedDelay = true; - @UriParam(optionalPrefix = "consumer.", label = "consumer,advanced", + @UriParam(label = "consumer,advanced", description = "A pluggable org.apache.camel.PollingConsumerPollingStrategy allowing you to provide your custom implementation" + " to control error handling usually occurred during the poll operation before an Exchange have been created and being routed in Camel.") private PollingConsumerPollStrategy pollStrategy = new DefaultPollingConsumerPollStrategy(); - @UriParam(optionalPrefix = "consumer.", defaultValue = "TRACE", label = "consumer,scheduler", + @UriParam(defaultValue = "TRACE", label = "consumer,scheduler", description = "The consumer logs a start/complete log line when it polls. 
This option allows you to configure the logging level for that.") private LoggingLevel runLoggingLevel = LoggingLevel.TRACE; - @UriParam(optionalPrefix = "consumer.", label = "consumer", + @UriParam(label = "consumer", description = "If the polling consumer did not poll any files, you can enable this option to send an empty message (no body) instead.") private boolean sendEmptyMessageWhenIdle; - @UriParam(optionalPrefix = "consumer.", label = "consumer,scheduler", + @UriParam(label = "consumer,scheduler", description = "If greedy is enabled, then the ScheduledPollConsumer will run immediately again, if the previous run polled 1 or more messages.") private boolean greedy; - @UriParam(optionalPrefix = "consumer.", enums = "none,spring,quartz", + @UriParam(enums = "none,spring,quartz", defaultValue = "none", label = "consumer,scheduler", description = "To use a cron scheduler from either camel-spring or camel-quartz component") private String scheduler = "none"; @UriParam(prefix = "scheduler.", multiValue = true, label = "consumer,scheduler", description = "To configure additional properties when using a custom scheduler or any of the Quartz, Spring based scheduler.") private Map<String, Object> schedulerProperties; - @UriParam(optionalPrefix = "consumer.", label = "consumer,scheduler", + @UriParam(label = "consumer,scheduler", description = "Allows for configuring a custom/shared thread pool to use for the consumer. By default each consumer has its own single threaded thread pool.") private ScheduledExecutorService scheduledExecutorService; - @UriParam(optionalPrefix = "consumer.", label = "consumer,scheduler", + @UriParam(label = "consumer,scheduler", description = "To let the scheduled polling consumer backoff if there has been a number of subsequent idles/errors in a row." + " The multiplier is then the number of polls that will be skipped before the next actual attempt is happening again." 
+ " When this option is in use then backoffIdleThreshold and/or backoffErrorThreshold must also be configured.") private int backoffMultiplier; - @UriParam(optionalPrefix = "consumer.", label = "consumer,scheduler", + @UriParam(label = "consumer,scheduler", description = "The number of subsequent idle polls that should happen before the backoffMultipler should kick-in.") private int backoffIdleThreshold; - @UriParam(optionalPrefix = "consumer.", label = "consumer,scheduler", + @UriParam(label = "consumer,scheduler", description = "The number of subsequent error polls (failed due some error) that should happen before the backoffMultipler should kick-in.") private int backoffErrorThreshold; @@ -101,16 +101,22 @@ public abstract class ScheduledPollEndpoint extends DefaultEndpoint { } @Override + protected void configureConsumer(Consumer consumer) throws Exception { + super.configureConsumer(consumer); + doConfigureConsumer(consumer); + } + + @Override public void configureProperties(Map<String, Object> options) { super.configureProperties(options); - configureScheduledPollConsumerProperties(options, getConsumerProperties()); + configureScheduledPollConsumerProperties(options); } - protected void configureScheduledPollConsumerProperties(Map<String, Object> options, Map<String, Object> consumerProperties) { + protected void configureScheduledPollConsumerProperties(Map<String, Object> options) { // special for scheduled poll consumers as we want to allow end users to configure its options // from the URI parameters without the consumer. 
prefix Map<String, Object> schedulerProperties = PropertiesHelper.extractProperties(options, "scheduler."); - if (schedulerProperties != null && !schedulerProperties.isEmpty()) { + if (!schedulerProperties.isEmpty()) { setSchedulerProperties(schedulerProperties); } @@ -143,81 +149,31 @@ public abstract class ScheduledPollEndpoint extends DefaultEndpoint { } } - @Override - protected void configurePollingConsumer(PollingConsumer consumer) throws Exception { - Map<String, Object> copy = new HashMap<>(getConsumerProperties()); - Map<String, Object> throwaway = new HashMap<>(); - - // filter out unwanted options which is intended for the scheduled poll consumer - // as these options are not supported on the polling consumer - configureScheduledPollConsumerProperties(copy, throwaway); - - // configure consumer - setProperties(consumer, copy); - - if (!isLenientProperties() && copy.size() > 0) { - throw new ResolveEndpointFailedException(this.getEndpointUri(), "There are " + copy.size() - + " parameters that couldn't be set on the endpoint polling consumer." - + " Check the uri if the parameters are spelt correctly and that they are properties of the endpoint." 
- + " Unknown consumer parameters=[" + copy + "]"); - } - } - - protected void initConsumerProperties() { - // must setup consumer properties before we are ready to start - Map<String, Object> options = getConsumerProperties(); - if (!options.containsKey("startScheduler")) { - options.put("startScheduler", isStartScheduler()); - } - if (!options.containsKey("initialDelay")) { - options.put("initialDelay", getInitialDelay()); - } - if (!options.containsKey("delay")) { - options.put("delay", getDelay()); - } - if (!options.containsKey("timeUnit")) { - options.put("timeUnit", getTimeUnit()); - } - if (!options.containsKey("useFixedDelay")) { - options.put("useFixedDelay", isUseFixedDelay()); - } - if (!options.containsKey("pollStrategy")) { - options.put("pollStrategy", getPollStrategy()); - } - if (!options.containsKey("runLoggingLevel")) { - options.put("runLoggingLevel", getRunLoggingLevel()); - } - if (!options.containsKey("sendEmptyMessageWhenIdle")) { - options.put("sendEmptyMessageWhenIdle", isSendEmptyMessageWhenIdle()); - } - if (!options.containsKey("greedy")) { - options.put("greedy", isGreedy()); - } - if (!options.containsKey("scheduler")) { - // the scheduler implementation - options.put("scheduler", consumerScheduler); - } - if (!options.containsKey("schedulerProperties")) { - options.put("schedulerProperties", getSchedulerProperties()); - } - if (!options.containsKey("scheduledExecutorService")) { - options.put("scheduledExecutorService", getScheduledExecutorService()); - } - if (!options.containsKey("backoffMultiplier")) { - options.put("backoffMultiplier", getBackoffMultiplier()); - } - if (!options.containsKey("backoffIdleThreshold")) { - options.put("backoffIdleThreshold", getBackoffIdleThreshold()); - } - if (!options.containsKey("backoffErrorThreshold")) { - options.put("backoffErrorThreshold", getBackoffErrorThreshold()); + protected void doConfigureConsumer(Consumer consumer) { + if (consumer instanceof ScheduledPollConsumer) { + 
ScheduledPollConsumer spc = (ScheduledPollConsumer) consumer; + spc.setBackoffErrorThreshold(backoffErrorThreshold); + spc.setBackoffIdleThreshold(backoffIdleThreshold); + spc.setBackoffMultiplier(backoffMultiplier); + spc.setDelay(delay); + spc.setGreedy(greedy); + spc.setInitialDelay(initialDelay); + spc.setPollStrategy(pollStrategy); + spc.setRunLoggingLevel(runLoggingLevel); + spc.setScheduledExecutorService(scheduledExecutorService); + spc.setSendEmptyMessageWhenIdle(sendEmptyMessageWhenIdle); + spc.setTimeUnit(timeUnit); + spc.setUseFixedDelay(useFixedDelay); + spc.setStartScheduler(startScheduler); + spc.setScheduler(consumerScheduler); + spc.setSchedulerProperties(schedulerProperties); } } @Override protected void doStart() throws Exception { - initConsumerProperties(); super.doStart(); + // noop } @Override
