Repository: camel Updated Branches: refs/heads/camel-2.15.x bd29f1ccc -> 5ac926c3c refs/heads/master dc9e8ebcd -> a9bad7b1a
CAMEL-8923 Fixed the infinite loop by adding bridgeEndpoint option to kafka endpoint. Cherry-picked from 19e70a6a36b75a1e2c3291d9d21df24e263023fa. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5ac926c3 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5ac926c3 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5ac926c3 Branch: refs/heads/camel-2.15.x Commit: 5ac926c3ca70dd35e8109405c51b69d87e12ef22 Parents: bd29f1c Author: Willem Jiang <willem.ji...@gmail.com> Authored: Wed Jul 1 23:03:12 2015 +0800 Committer: Raul Kripalani <ra...@apache.org> Committed: Wed Sep 2 23:30:45 2015 +0100 ---------------------------------------------------------------------- .../apache/camel/component/kafka/KafkaEndpoint.java | 8 ++++++++ .../apache/camel/component/kafka/KafkaProducer.java | 5 ++++- .../camel/component/kafka/KafkaProducerTest.java | 13 +++++++++++++ 3 files changed, 25 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/5ac926c3/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java index 1652d78..9960a89 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java @@ -489,4 +489,12 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS public boolean isMultipleConsumersSupported() { return true; } + + public boolean isBridgeEndpoint() { + return bridgeEndpoint; + } + + public void setBridgeEndpoint(boolean bridgeEndpoint) { + this.bridgeEndpoint = bridgeEndpoint; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/5ac926c3/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java index 0fde1ae..06a0317 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java @@ -64,7 +64,10 @@ public class KafkaProducer<K, V> extends DefaultProducer { @Override @SuppressWarnings("unchecked") public void process(Exchange exchange) throws CamelException { - String topic = exchange.getIn().getHeader(KafkaConstants.TOPIC, endpoint.getTopic(), String.class); + String topic = endpoint.getTopic(); + if (!endpoint.isBridgeEndpoint()) { + topic = exchange.getIn().getHeader(KafkaConstants.TOPIC, topic, String.class); + } if (topic == null) { throw new CamelExchangeException("No topic key set", exchange); } http://git-wip-us.apache.org/repos/asf/camel/blob/5ac926c3/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java index d989c96..d2e868d 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java @@ -128,6 +128,19 @@ public class KafkaProducerTest { verifySendMessage("someKey", "someTopic", "someKey"); } + + @Test + public void processSendMessageWithBridgeEndpoint() throws Exception { + endpoint.setTopic("someTopic"); + endpoint.setBridgeEndpoint(true); + Mockito.when(exchange.getIn()).thenReturn(in); + in.setHeader(KafkaConstants.TOPIC, "anotherTopic"); + in.setHeader(KafkaConstants.KEY, "someKey"); + + producer.process(exchange); + + verifySendMessage("someKey", "someTopic", "someKey"); + } @SuppressWarnings({"unchecked", "rawtypes"}) protected void verifySendMessage(String partitionKey, String topic, String messageKey) {