Repository: camel Updated Branches: refs/heads/master cc8f51f83 -> 7d165439c
CAMEL-9957 Let the KafkaEndpoint decide which kind of producer need to be return Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7d165439 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7d165439 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7d165439 Branch: refs/heads/master Commit: 7d165439c6e85dcdd309462931252e94786971e0 Parents: cc8f51f Author: Willem Jiang <willem.ji...@gmail.com> Authored: Tue May 17 12:00:33 2016 +0800 Committer: Willem Jiang <willem.ji...@gmail.com> Committed: Tue May 17 12:00:33 2016 +0800 ---------------------------------------------------------------------- .../apache/camel/component/kafka/KafkaEndpoint.java | 8 +++++++- .../apache/camel/component/kafka/KafkaProducer.java | 15 +++++---------- 2 files changed, 12 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/7d165439/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java index aabd020..eb03493 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java @@ -26,6 +26,7 @@ import org.apache.camel.MultipleConsumersSupport; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.impl.DefaultEndpoint; +import org.apache.camel.impl.SynchronousDelegateProducer; import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -72,7 +73,12 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS @Override public Producer createProducer() throws Exception { - return createProducer(this); + KafkaProducer producer = createProducer(this); + if (isSynchronous()) { + return new SynchronousDelegateProducer(producer); + } else { + return producer; + } } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/7d165439/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java index 2138df1..1254e97 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java @@ -126,6 +126,7 @@ public class KafkaProducer extends DefaultAsyncProducer { @Override @SuppressWarnings("unchecked") + // Camel calls this method if the endpoint isSynchronous(), as the KafkaEndpoint creates a SynchronousDelegateProducer for it public void process(Exchange exchange) throws Exception { ProducerRecord record = createRecorder(exchange); kafkaProducer.send(record).get(); @@ -135,19 +136,13 @@ public class KafkaProducer extends DefaultAsyncProducer { @SuppressWarnings("unchecked") public boolean process(Exchange exchange, AsyncCallback callback) { try { - if (endpoint.isSynchronous()) { - // force process using synchronous call on kafka - process(exchange); - } else { - ProducerRecord record = createRecorder(exchange); - kafkaProducer.send(record, new KafkaProducerCallBack(exchange, callback)); - // return false to process asynchronous - return false; - } + ProducerRecord record = createRecorder(exchange); + kafkaProducer.send(record, new KafkaProducerCallBack(exchange, callback)); + // return false to process asynchronous + return false; } catch (Exception ex) { exchange.setException(ex); } - callback.done(true); return true; }