Repository: camel
Updated Branches:
  refs/heads/master cc8f51f83 -> 7d165439c


CAMEL-9957 Let the KafkaEndpoint decide which kind of producer needs to be returned


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7d165439
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7d165439
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7d165439

Branch: refs/heads/master
Commit: 7d165439c6e85dcdd309462931252e94786971e0
Parents: cc8f51f
Author: Willem Jiang <willem.ji...@gmail.com>
Authored: Tue May 17 12:00:33 2016 +0800
Committer: Willem Jiang <willem.ji...@gmail.com>
Committed: Tue May 17 12:00:33 2016 +0800

----------------------------------------------------------------------
 .../apache/camel/component/kafka/KafkaEndpoint.java  |  8 +++++++-
 .../apache/camel/component/kafka/KafkaProducer.java  | 15 +++++----------
 2 files changed, 12 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/7d165439/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index aabd020..eb03493 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -26,6 +26,7 @@ import org.apache.camel.MultipleConsumersSupport;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.SynchronousDelegateProducer;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -72,7 +73,12 @@ public class KafkaEndpoint extends DefaultEndpoint 
implements MultipleConsumersS
 
     @Override
     public Producer createProducer() throws Exception {
-        return createProducer(this);
+        KafkaProducer producer = createProducer(this);
+        if (isSynchronous()) {
+            return new SynchronousDelegateProducer(producer);
+        } else {
+            return producer;
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/7d165439/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 2138df1..1254e97 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -126,6 +126,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
 
     @Override
     @SuppressWarnings("unchecked")
+    // Camel calls this method if the endpoint isSynchronous(), as the KafkaEndpoint creates a SynchronousDelegateProducer for it
     public void process(Exchange exchange) throws Exception {
         ProducerRecord record = createRecorder(exchange);
         kafkaProducer.send(record).get();
@@ -135,19 +136,13 @@ public class KafkaProducer extends DefaultAsyncProducer {
     @SuppressWarnings("unchecked")
     public boolean process(Exchange exchange, AsyncCallback callback) {
         try {
-            if (endpoint.isSynchronous()) {
-                // force process using synchronous call on kafka
-                process(exchange);
-            } else {
-                ProducerRecord record = createRecorder(exchange);
-                kafkaProducer.send(record, new KafkaProducerCallBack(exchange, 
callback));
-                // return false to process asynchronous
-                return false;
-            }
+            ProducerRecord record = createRecorder(exchange);
+            kafkaProducer.send(record, new KafkaProducerCallBack(exchange, 
callback));
+            // return false to process asynchronous
+            return false;
         } catch (Exception ex) {
             exchange.setException(ex);
         }
-
         callback.done(true);
         return true;
     }

Reply via email to