Repository: camel Updated Branches: refs/heads/camel-2.17.x 04179ec97 -> 03a05be91 refs/heads/master 123f08fa4 -> 32b9bf2b8
CAMEL-9957 camel-kafka producer sends the message in an async way Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/733d45f7 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/733d45f7 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/733d45f7 Branch: refs/heads/camel-2.17.x Commit: 733d45f790eae31ec7b65aa6768ae09b9f54e49c Parents: 04179ec Author: Willem Jiang <willem.ji...@gmail.com> Authored: Wed May 11 11:55:13 2016 +0800 Committer: Willem Jiang <willem.ji...@gmail.com> Committed: Wed May 11 11:56:04 2016 +0800 ---------------------------------------------------------------------- .../camel/component/kafka/KafkaProducer.java | 54 +++++++++++++++++--- .../component/kafka/KafkaProducerTest.java | 52 +++++++++++++++++++ 2 files changed, 100 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/733d45f7/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java index 4f6468b..0c4013f 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java @@ -18,14 +18,18 @@ package org.apache.camel.component.kafka; import java.util.Properties; +import org.apache.camel.AsyncCallback; +import org.apache.camel.AsyncProcessor; import org.apache.camel.CamelException; import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultProducer; +import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; -public class KafkaProducer extends DefaultProducer { +public class KafkaProducer extends DefaultProducer implements AsyncProcessor { private org.apache.kafka.clients.producer.KafkaProducer kafkaProducer; private final KafkaEndpoint endpoint; @@ -69,9 +73,7 @@ public class KafkaProducer extends DefaultProducer { } } - @Override - @SuppressWarnings("unchecked") - public void process(Exchange exchange) throws CamelException { + protected ProducerRecord createRecorder(Exchange exchange) throws CamelException { String topic = endpoint.getTopic(); if (!endpoint.isBridgeEndpoint()) { topic = exchange.getIn().getHeader(KafkaConstants.TOPIC, topic, String.class); @@ -96,9 +98,15 @@ public class KafkaProducer extends DefaultProducer { log.warn("No message key or partition key set"); record = new ProducerRecord(topic, msg); } + return record; + } + + @Override + @SuppressWarnings("unchecked") + public void process(Exchange exchange) throws CamelException { - // TODO: add support for async callback - // requires a thread pool for processing outgoing routing + ProducerRecord record = createRecorder(exchange); + // Just send out the record in the sync way try { kafkaProducer.send(record).get(); } catch (Exception e) { @@ -106,4 +114,38 @@ public class KafkaProducer extends DefaultProducer { } } + @Override + public boolean process(Exchange exchange, AsyncCallback callback) { + try { + ProducerRecord record = createRecorder(exchange); + kafkaProducer.send(record, new KafkaProducerCallBack(exchange, callback)); + // Finishing the processing in an async way + return false; + } catch (Exception ex) { + // Just set the exception back to the client + exchange.setException(ex); + callback.done(true); + return true; + } + } + + class KafkaProducerCallBack implements Callback { + + private Exchange exchange; + private AsyncCallback callback; + + KafkaProducerCallBack(Exchange exchange, AsyncCallback callback) { + this.exchange = exchange; + this.callback = callback; + } + + @Override + public void onCompletion(RecordMetadata recordMetadata, Exception e) { + if (e != null) { + // Just set the exception back + exchange.setException(e); + } + callback.done(false); + } + } } http://git-wip-us.apache.org/repos/asf/camel/blob/733d45f7/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java index 06deedc..ef57594 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java @@ -19,13 +19,16 @@ package org.apache.camel.component.kafka; import java.util.Properties; import java.util.concurrent.Future; +import org.apache.camel.AsyncCallback; import org.apache.camel.CamelException; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.impl.DefaultMessage; +import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.errors.ApiException; import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.Matchers; @@ -40,6 +43,7 @@ public class KafkaProducerTest { private Exchange exchange = Mockito.mock(Exchange.class); private Message in = new DefaultMessage(); + private AsyncCallback callback = Mockito.mock(AsyncCallback.class); @SuppressWarnings({"unchecked"}) public KafkaProducerTest() throws Exception { @@ -75,6 +79,54 @@ public class KafkaProducerTest { Mockito.verify(producer.getKafkaProducer()).send(Matchers.any(ProducerRecord.class)); } + @Test(expected=CamelException.class) + @SuppressWarnings({"unchecked"}) + public void processSendsMessageWithException() throws Exception { + endpoint.setTopic("sometopic"); + // setup the exception here + org.apache.kafka.clients.producer.KafkaProducer kp = producer.getKafkaProducer(); + Mockito.when(kp.send(Mockito.any())).thenThrow(new ApiException()); + Mockito.when(exchange.getIn()).thenReturn(in); + in.setHeader(KafkaConstants.PARTITION_KEY, "4"); + + producer.process(exchange); + + } + + @Test + public void processAsyncSendsMessage() throws Exception { + endpoint.setTopic("sometopic"); + Mockito.when(exchange.getIn()).thenReturn(in); + + in.setHeader(KafkaConstants.PARTITION_KEY, "4"); + + producer.process(exchange, callback); + + Mockito.verify(producer.getKafkaProducer()).send(Matchers.any(ProducerRecord.class), Matchers.any(Callback.class)); + + } + + + @Test + public void processAsyncSendsMessageWithException() throws Exception { + + endpoint.setTopic("sometopic"); + Mockito.when(exchange.getIn()).thenReturn(in); + + // setup the exception here + org.apache.kafka.clients.producer.KafkaProducer kp = producer.getKafkaProducer(); + Mockito.when(kp.send(Mockito.any(), Mockito.any())).thenThrow(new ApiException()); + + in.setHeader(KafkaConstants.PARTITION_KEY, "4"); + + producer.process(exchange, callback); + + Mockito.verify(producer.getKafkaProducer()).send(Matchers.any(ProducerRecord.class), Matchers.any(Callback.class)); + Mockito.verify(exchange).setException(Matchers.isA(ApiException.class)); + Mockito.verify(callback).done(Matchers.eq(true)); + } + + @Test public void processSendsMessageWithTopicHeaderAndNoTopicInEndPoint() throws Exception { endpoint.setTopic(null);