Repository: camel
Updated Branches:
  refs/heads/camel-2.15.x bd29f1ccc -> 5ac926c3c
  refs/heads/master dc9e8ebcd -> a9bad7b1a


CAMEL-8923: Fixed the infinite loop by adding a bridgeEndpoint option to the
Kafka endpoint.

Cherry-picked from 19e70a6a36b75a1e2c3291d9d21df24e263023fa.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5ac926c3
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5ac926c3
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5ac926c3

Branch: refs/heads/camel-2.15.x
Commit: 5ac926c3ca70dd35e8109405c51b69d87e12ef22
Parents: bd29f1c
Author: Willem Jiang <willem.ji...@gmail.com>
Authored: Wed Jul 1 23:03:12 2015 +0800
Committer: Raul Kripalani <ra...@apache.org>
Committed: Wed Sep 2 23:30:45 2015 +0100

----------------------------------------------------------------------
 .../apache/camel/component/kafka/KafkaEndpoint.java    |  8 ++++++++
 .../apache/camel/component/kafka/KafkaProducer.java    |  5 ++++-
 .../camel/component/kafka/KafkaProducerTest.java       | 13 +++++++++++++
 3 files changed, 25 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/5ac926c3/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index 1652d78..9960a89 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -489,4 +489,12 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
     public boolean isMultipleConsumersSupported() {
         return true;
     }
+
+    public boolean isBridgeEndpoint() {
+        return bridgeEndpoint;
+    }
+
+    public void setBridgeEndpoint(boolean bridgeEndpoint) {
+        this.bridgeEndpoint = bridgeEndpoint;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/5ac926c3/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 0fde1ae..06a0317 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -64,7 +64,10 @@ public class KafkaProducer<K, V> extends DefaultProducer {
     @Override
     @SuppressWarnings("unchecked")
     public void process(Exchange exchange) throws CamelException {
-        String topic = exchange.getIn().getHeader(KafkaConstants.TOPIC, endpoint.getTopic(), String.class);
+        String topic = endpoint.getTopic();
+        if (!endpoint.isBridgeEndpoint()) {
+            topic = exchange.getIn().getHeader(KafkaConstants.TOPIC, topic, String.class);
+        }
         if (topic == null) {
             throw new CamelExchangeException("No topic key set", exchange);
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/5ac926c3/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index d989c96..d2e868d 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -128,6 +128,19 @@ public class KafkaProducerTest {
 
         verifySendMessage("someKey", "someTopic", "someKey");
     }
+    
+    @Test
+    public void processSendMessageWithBridgeEndpoint() throws Exception {
+        endpoint.setTopic("someTopic");
+        endpoint.setBridgeEndpoint(true);
+        Mockito.when(exchange.getIn()).thenReturn(in);
+        in.setHeader(KafkaConstants.TOPIC, "anotherTopic");
+        in.setHeader(KafkaConstants.KEY, "someKey");
+        
+        producer.process(exchange);
+        
+        verifySendMessage("someKey", "someTopic", "someKey");
+    }
 
     @SuppressWarnings({"unchecked", "rawtypes"})
     protected void verifySendMessage(String partitionKey, String topic, String messageKey) {

Reply via email to