CAMEL-9957: Fixed the synchronous processing in KafkaProducer, as it was not correct.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1428ccfc Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1428ccfc Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1428ccfc Branch: refs/heads/kube-lb Commit: 1428ccfce8d68511b8233834b9fa42b50fe9d146 Parents: abdea8e Author: Claus Ibsen <davscl...@apache.org> Authored: Fri May 13 10:51:57 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon May 16 09:59:33 2016 +0200 ---------------------------------------------------------------------- .../camel/component/kafka/KafkaProducer.java | 27 ++++++++++++-------- .../component/kafka/KafkaProducerTest.java | 6 +---- 2 files changed, 18 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/1428ccfc/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java index 0c4013f..6c432d6 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java @@ -73,6 +73,7 @@ public class KafkaProducer extends DefaultProducer implements AsyncProcessor { } } + @SuppressWarnings("unchecked") protected ProducerRecord createRecorder(Exchange exchange) throws CamelException { String topic = endpoint.getTopic(); if (!endpoint.isBridgeEndpoint()) { @@ -103,19 +104,25 @@ public class KafkaProducer extends DefaultProducer implements AsyncProcessor { @Override @SuppressWarnings("unchecked") - public void process(Exchange exchange) throws CamelException { - + public void 
process(Exchange exchange) throws Exception { ProducerRecord record = createRecorder(exchange); // Just send out the record in the sync way - try { - kafkaProducer.send(record).get(); - } catch (Exception e) { - throw new CamelException(e); - } + kafkaProducer.send(record).get(); } @Override public boolean process(Exchange exchange, AsyncCallback callback) { + // force processing synchronously using different api + if (endpoint.isSynchronous()) { + try { + process(exchange); + } catch (Throwable e) { + exchange.setException(e); + } + callback.done(true); + return true; + } + try { ProducerRecord record = createRecorder(exchange); kafkaProducer.send(record, new KafkaProducerCallBack(exchange, callback)); @@ -129,10 +136,10 @@ public class KafkaProducer extends DefaultProducer implements AsyncProcessor { } } - class KafkaProducerCallBack implements Callback { + private final class KafkaProducerCallBack implements Callback { - private Exchange exchange; - private AsyncCallback callback; + private final Exchange exchange; + private final AsyncCallback callback; KafkaProducerCallBack(Exchange exchange, AsyncCallback callback) { this.exchange = exchange; http://git-wip-us.apache.org/repos/asf/camel/blob/1428ccfc/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java index 68798f3..40f2113 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java @@ -79,7 +79,7 @@ public class KafkaProducerTest { Mockito.verify(producer.getKafkaProducer()).send(Matchers.any(ProducerRecord.class)); } - @Test(expected=CamelException.class) + 
@Test(expected = Exception.class) @SuppressWarnings({"unchecked"}) public void processSendsMessageWithException() throws Exception { endpoint.setTopic("sometopic"); @@ -90,7 +90,6 @@ public class KafkaProducerTest { in.setHeader(KafkaConstants.PARTITION_KEY, "4"); producer.process(exchange); - } @Test @@ -103,10 +102,8 @@ public class KafkaProducerTest { producer.process(exchange, callback); Mockito.verify(producer.getKafkaProducer()).send(Matchers.any(ProducerRecord.class), Matchers.any(Callback.class)); - } - @Test public void processAsyncSendsMessageWithException() throws Exception { @@ -126,7 +123,6 @@ public class KafkaProducerTest { Mockito.verify(callback).done(Matchers.eq(true)); } - @Test public void processSendsMessageWithTopicHeaderAndNoTopicInEndPoint() throws Exception { endpoint.setTopic(null);