CAMEL-10832: Kafka. Allow configuring brokers at the component level, and make 
the topic part of the context-path so that using it is similar to JMS etc.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/bcb4ed25
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/bcb4ed25
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/bcb4ed25

Branch: refs/heads/master
Commit: bcb4ed25b5dc943dac09cc0b79436cadf4fae65e
Parents: d3b38a0
Author: Claus Ibsen <davscl...@apache.org>
Authored: Wed Feb 15 11:23:28 2017 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Wed Feb 15 11:23:28 2017 +0100

----------------------------------------------------------------------
 examples/camel-example-kafka/README.adoc        | 75 +++++++++++++++++++
 examples/camel-example-kafka/README.md          | 77 --------------------
 .../example/kafka/MessageConsumerClient.java    | 15 ++--
 .../example/kafka/MessagePublisherClient.java   | 27 +++----
 .../camel/example/kafka/StringPartitioner.java  | 34 +--------
 .../src/main/resources/application.properties   | 12 +--
 6 files changed, 96 insertions(+), 144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/bcb4ed25/examples/camel-example-kafka/README.adoc
----------------------------------------------------------------------
diff --git a/examples/camel-example-kafka/README.adoc b/examples/camel-example-kafka/README.adoc
new file mode 100644
index 0000000..83e79d8
--- /dev/null
+++ b/examples/camel-example-kafka/README.adoc
@@ -0,0 +1,75 @@
+# Camel Kafka example
+
+### Introduction
+
+An example which shows how to integrate Camel with Kafka.
+
+This project consists of the following examples:
+
+  1. Send messages continuously by typing on the command line.
+  2. Example of partitioner for a given producer.
+  3. Topic is sent in the header as well as in the URL.
+
+
+### Preparing Kafka
+
+This example requires that Kafka Server is up and running.
+
+You will need to create the following topics before you run the examples.
+
+On Windows run
+
+    kafka-topics.bat --create --zookeeper <zookeeper host ip>:<port> --replication-factor 1 --partitions 2 --topic TestLog
+    
+    kafka-topics.bat --create --zookeeper <zookeeper host ip>:<port> --replication-factor 1 --partitions 1 --topic AccessLog
+
+On Linux run
+    
+    kafka-topics.sh --create --zookeeper <zookeeper host ip>:<port> --replication-factor 1 --partitions 2 --topic TestLog
+    
+    kafka-topics.sh --create --zookeeper <zookeeper host ip>:<port> --replication-factor 1 --partitions 1 --topic AccessLog
+
+
+### Build
+
+You will need to compile this example first:
+
+    mvn compile
+
+### Run
+
+Run the consumer first in a separate shell
+
+    
+    mvn compile exec:java -Pkafka-consumer
+
+
+Run the message producer in another separate shell
+
+    
+    mvn compile exec:java -Pkafka-producer
+
+Initially, some messages are sent programmatically. 
+On the command prompt, type the messages. Each line is sent as one message to Kafka.
+Press `Ctrl-C` to exit.
+
+
+### Configuration
+
+You can configure the details in the file:
+  `src/main/resources/application.properties`
+
+You can enable verbose logging by adjusting the `src/main/resources/log4j2.properties`
+  file as documented in the file.
+
+
+### Forum, Help, etc
+
+If you hit any problems, please let us know on the Camel Forums
+       <http://camel.apache.org/discussion-forums.html>
+
+Please help us make Apache Camel better - we appreciate any feedback you may
+have.  Enjoy!
+
+
+The Camel riders!

http://git-wip-us.apache.org/repos/asf/camel/blob/bcb4ed25/examples/camel-example-kafka/README.md
----------------------------------------------------------------------
diff --git a/examples/camel-example-kafka/README.md b/examples/camel-example-kafka/README.md
deleted file mode 100644
index 06c7374..0000000
--- a/examples/camel-example-kafka/README.md
+++ /dev/null
@@ -1,77 +0,0 @@
-# Camel Kafka example
-
-### Introduction
-
-An example which shows how to integrate Camel with Kakfa.
-
-This project consists of the following examples:
-
-  1. Send messages continuously by typing on the command line.
-  2. Example of partitioner for a given producer.
-  3. Topic is sent in the header as well as in the URL.
-
-
-### Preparing Kafka
-
-This example requires that Kafka Server is up and running.
-
-You will need to create following topics before you run the examples.
-
-On windows run
-
-    kafka-topics.bat --create --zookeeper <zookeeper host ip>:<port> --replication-factor 1 --partitions 2 --topic TestLog
-    
-    kafka-topics.bat --create --zookeeper <zookeeper host ip>:<port> --replication-factor 1 --partitions 1 --topic AccessLog
-
-On linux run
-    
-    kafka-topics.sh --create --zookeeper <zookeeper host ip>:<port> --replication-factor 1 --partitions 2 --topic TestLog
-    
-    kafka-topics.sh --create --zookeeper <zookeeper host ip>:<port> --replication-factor 1 --partitions 1 --topic AccessLog
-
-
-### Build
-
-You will need to compile this example first:
-
-    mvn compile
-
-### Run
-
-1. Run the consumer first in separate shell 
-
-    mvn compile exec:java -Pkafka-consumer
-
-
-2. Run the message producer in the seperate shell
-
-    mvn compile exec:java -Pkafka-producer
-
-   Initially, some messages are sent programmatically. 
-   
-   On the command prompt, type the messages. Each line is sent as one message to kafka
-   
-   Type Ctrl-C to exit.
-
-
-
-### Configuration
-
-You can configure the details in the file:
-  `src/main/resources/application.properties`
-
-You can enable verbose logging by adjusting the `src/main/resources/log4j.properties`
-  file as documented in the file.
-
-
-### Forum, Help, etc
-
-If you hit an problems please let us know on the Camel Forums
-       <http://camel.apache.org/discussion-forums.html>
-
-Please help us make Apache Camel better - we appreciate any feedback you may
-have.  Enjoy!
-
-
-
-The Camel riders!

http://git-wip-us.apache.org/repos/asf/camel/blob/bcb4ed25/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessageConsumerClient.java
----------------------------------------------------------------------
diff --git a/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessageConsumerClient.java b/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessageConsumerClient.java
index 47c2abb..7513ba9 100644
--- a/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessageConsumerClient.java
+++ b/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessageConsumerClient.java
@@ -29,7 +29,6 @@ public final class MessageConsumerClient {
 
     private MessageConsumerClient() {
     }
-        
 
     public static void main(String[] args) throws Exception {
 
@@ -46,13 +45,13 @@ public final class MessageConsumerClient {
 
                 log.info("About to start route: Kafka Server -> Log ");
 
-                from("kafka:{{kafka.host}}:{{kafka.port}}?" 
-                        + "topic={{consumer.topic}}&" 
-                        + "maxPollRecords={{consumer.maxPollRecords}}&" 
-                        + "consumersCount={{consumer.consumersCount}}&" 
-                        + "seekToBeginning={{consumer.seekToBeginning}}&" 
-                        + "groupId={{consumer.group}}")
-                        .routeId("FromKafka").log("${body}");
+                from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}"
+                        + "&maxPollRecords={{consumer.maxPollRecords}}"
+                        + "&consumersCount={{consumer.consumersCount}}"
+                        + "&seekToBeginning={{consumer.seekToBeginning}}"
+                        + "&groupId={{consumer.group}}")
+                        .routeId("FromKafka")
+                    .log("${body}");
             }
         });
         camelContext.start();
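
The consumer route above moves the topic into the URI path and the broker address into a `brokers` option. As a rough illustration of that rewrite, here is a minimal sketch in plain Java. The class `KafkaUriMigrationSketch` and its method `toTopicFirst` are hypothetical names invented for this example; they are not part of Camel or of this commit, and the sketch only handles the simple URI shapes that appear in this diff.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Camel: rewrites the old endpoint URI
// layout used before this commit into the new topic-first layout.
final class KafkaUriMigrationSketch {

    private KafkaUriMigrationSketch() {
    }

    // kafka:host:port?topic=T&k=v  ->  kafka:T?brokers=host:port&k=v
    static String toTopicFirst(String oldUri) {
        String prefix = "kafka:";
        if (!oldUri.startsWith(prefix)) {
            throw new IllegalArgumentException("Not a kafka endpoint URI: " + oldUri);
        }
        String rest = oldUri.substring(prefix.length());
        int q = rest.indexOf('?');
        String brokers = q < 0 ? rest : rest.substring(0, q);
        String query = q < 0 ? "" : rest.substring(q + 1);

        String topic = null;
        List<String> others = new ArrayList<>();
        for (String pair : query.split("&")) {
            if (pair.isEmpty()) {
                continue;
            }
            if (pair.startsWith("topic=")) {
                topic = pair.substring("topic=".length());
            } else {
                others.add(pair); // keep all other options in their original order
            }
        }
        if (topic == null) {
            // Assumption of this sketch: a topic is required in the new layout.
            throw new IllegalArgumentException("Old-style URI has no topic option: " + oldUri);
        }

        StringBuilder sb = new StringBuilder(prefix)
                .append(topic)
                .append("?brokers=")
                .append(brokers);
        for (String option : others) {
            sb.append('&').append(option);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(toTopicFirst("kafka:localhost:9092?topic=TestLog&groupId=kafkaGroup"));
        // prints: kafka:TestLog?brokers=localhost:9092&groupId=kafkaGroup
    }
}
```

The sketch rejects an old URI without a `topic` option; the producer example in this commit handles that case by putting a placeholder topic (`dummy`) in the path and supplying the real topic in a header.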

http://git-wip-us.apache.org/repos/asf/camel/blob/bcb4ed25/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessagePublisherClient.java
----------------------------------------------------------------------
diff --git a/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessagePublisherClient.java b/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessagePublisherClient.java
index ee5bd9d..dac7b36 100644
--- a/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessagePublisherClient.java
+++ b/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessagePublisherClient.java
@@ -51,27 +51,20 @@ public final class MessagePublisherClient {
                 pc.setLocation("classpath:application.properties");
 
                 from("direct:kafkaStart").routeId("DirectToKafka")
-                        .to("kafka:{{kafka.host}}:{{kafka.port}}?topic={{producer.topic}}").log("${headers}"); // Topic
-                                                                                                               // and
-                                                                                                               // offset
-                                                                                                               // of
-                                                                                                               // the
-                                                                                                               // record
-                                                                                                               // is
-                                                                                                               // returned.
+                    .to("kafka:{{producer.topic}}?brokers={{kafka.host}}:{{kafka.port}}").log("${headers}");
 
                 // Topic can be set in header as well.
 
-                from("direct:kafkaStartNoTopic").routeId("kafkaStartNoTopic").to("kafka:{{kafka.host}}:{{kafka.port}}")
-                        .log("${headers}"); // Topic and offset of the record is
-                                            // returned.
+                from("direct:kafkaStartNoTopic").routeId("kafkaStartNoTopic")
+                    .to("kafka:dummy?brokers={{kafka.host}}:{{kafka.port}}")
+                    .log("${headers}");
 
                 // Use custom partitioner based on the key.
 
                 from("direct:kafkaStartWithPartitioner").routeId("kafkaStartWithPartitioner")
-                        .to("kafka:{{kafka.host}}:{{kafka.port}}?topic={{producer.topic}}&partitioner={{producer.partitioner}}")
-                        .log("${headers}"); // Use custom partitioner based on
-                                            // the key.
+                        .to("kafka:{{producer.topic}}?brokers={{kafka.host}}:{{kafka.port}}&partitioner={{producer.partitioner}}")
+                        .log("${headers}");
+
 
                 // Takes input from the command line.
 
@@ -101,14 +94,12 @@ public final class MessagePublisherClient {
 
         testKafkaMessage = "PART 0 :  " + testKafkaMessage;
         Map<String, Object> newHeader = new HashMap<String, Object>();
-        newHeader.put(KafkaConstants.KEY, "AB"); // This should go to partition
-                                                    // 0
+        newHeader.put(KafkaConstants.KEY, "AB"); // This should go to partition 0
 
         producerTemplate.sendBodyAndHeaders("direct:kafkaStartWithPartitioner", testKafkaMessage, newHeader);
 
         testKafkaMessage = "PART 1 :  " + testKafkaMessage;
-        newHeader.put(KafkaConstants.KEY, "ABC"); // This should go to partition
-                                                    // 1
+        newHeader.put(KafkaConstants.KEY, "ABC"); // This should go to partition 1
 
         producerTemplate.sendBodyAndHeaders("direct:kafkaStartWithPartitioner", testKafkaMessage, newHeader);
 

http://git-wip-us.apache.org/repos/asf/camel/blob/bcb4ed25/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/StringPartitioner.java
----------------------------------------------------------------------
diff --git a/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/StringPartitioner.java b/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/StringPartitioner.java
index 13d57aa..0566bd1 100644
--- a/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/StringPartitioner.java
+++ b/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/StringPartitioner.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -23,55 +23,29 @@ import org.apache.kafka.common.Cluster;
 
 public class StringPartitioner implements Partitioner {
 
-    /**
-     * 
-     */
     public StringPartitioner() {
-        // TODO Auto-generated constructor stub
+        // noop
     }
 
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.kafka.common.Configurable#configure(java.util.Map)
-     */
     @Override
     public void configure(Map<String, ?> configs) {
     }
 
-    /*
-     * (non-Javadoc)
-     * 
-     * @see
-     * org.apache.kafka.clients.producer.Partitioner#partition(java.lang.String,
-     * java.lang.Object, byte[], java.lang.Object, byte[],
-     * org.apache.kafka.common.Cluster)
-     */
     @Override
     public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
-
         int partId = 0;
 
         if (key != null && key instanceof String) {
-
             String sKey = (String) key;
-
             int len = sKey.length();
 
             // This will return either 1 or zero
-
             partId = len % 2;
-
         }
 
         return partId;
     }
 
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.kafka.clients.producer.Partitioner#close()
-     */
     @Override
     public void close() {
     }
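
The partitioner above picks a partition from the length of a String key, which is why the publisher expects key "AB" on partition 0 and key "ABC" on partition 1. The following minimal sketch isolates that rule without the Kafka dependency. The class `KeyLengthPartitionSketch` and its method `partitionFor` are hypothetical names used only for illustration.

```java
// Hypothetical stand-alone illustration of the key-length rule used by
// StringPartitioner in this commit; it does not implement Kafka's
// Partitioner interface.
final class KeyLengthPartitionSketch {

    private KeyLengthPartitionSketch() {
    }

    // String keys map to (length % 2); null or non-String keys map to 0.
    static int partitionFor(Object key) {
        if (key instanceof String) {
            return ((String) key).length() % 2;
        }
        return 0;
    }

    public static void main(String[] args) {
        System.out.println("AB   -> " + partitionFor("AB"));   // even length
        System.out.println("ABC  -> " + partitionFor("ABC"));  // odd length
        System.out.println("null -> " + partitionFor(null));   // fallback
    }
}
```

The rule only ever yields 0 or 1, so it relies on the target topic having at least two partitions, as `TestLog` does in the README's topic-creation commands.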

http://git-wip-us.apache.org/repos/asf/camel/blob/bcb4ed25/examples/camel-example-kafka/src/main/resources/application.properties
----------------------------------------------------------------------
diff --git a/examples/camel-example-kafka/src/main/resources/application.properties b/examples/camel-example-kafka/src/main/resources/application.properties
index 1728337..08738bd 100644
--- a/examples/camel-example-kafka/src/main/resources/application.properties
+++ b/examples/camel-example-kafka/src/main/resources/application.properties
@@ -20,31 +20,21 @@
 kafka.host=localhost
 kafka.port=9092
 
-
+kafka.serializerClass=kafka.serializer.StringEncoder
 
 # Producer properties
-
 producer.topic=TestLog
-
 producer.partitioner=org.apache.camel.example.kafka.StringPartitioner
 
-
-kafka.serializerClass=kafka.serializer.StringEncoder
-
 # Consumer properties 
 
 # One consumer can listen to more than one topic, e.g. [ TestLog,AccessLog ] 
-
 consumer.topic=TestLog
-
 consumer.group=kafkaGroup
-
 consumer.maxPollRecords=5000
 
 # No of consumers that connect to Kafka server
-
 consumer.consumersCount=1
 
 # Get records from the beginning
-
 consumer.seekToBeginning=true
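
The routes in this commit reference these properties through `{{...}}` placeholders. To show how the values combine into the new topic-first endpoint URI, here is a minimal sketch that substitutes placeholders using only the JDK. It is an illustrative stand-in, not Camel's properties component; the class `PlaceholderSketch` and its method `resolve` are hypothetical names.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration only: a simplified {{key}} substitution, not
// the resolution logic that Camel itself uses.
final class PlaceholderSketch {

    private static final Pattern PLACEHOLDER = Pattern.compile("\\{\\{([^{}]+)\\}\\}");

    private PlaceholderSketch() {
    }

    // Replaces every {{key}} in the template with the matching property value.
    static String resolve(String template, Properties props) {
        Matcher m = PLACEHOLDER.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String key = m.group(1);
            String value = props.getProperty(key);
            if (value == null) {
                throw new IllegalArgumentException("Missing property: " + key);
            }
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) throws IOException {
        // Values copied from the application.properties shown in this diff.
        Properties props = new Properties();
        props.load(new StringReader(
                "kafka.host=localhost\n"
                + "kafka.port=9092\n"
                + "consumer.topic=TestLog\n"
                + "consumer.group=kafkaGroup\n"));

        System.out.println(resolve(
                "kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}"
                + "&groupId={{consumer.group}}", props));
        // prints: kafka:TestLog?brokers=localhost:9092&groupId=kafkaGroup
    }
}
```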
