CAMEL-10832: Kafka. Allow configuring brokers at the component level, and make the topic part of the context-path so that using it is similar to JMS etc.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/bcb4ed25 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/bcb4ed25 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/bcb4ed25 Branch: refs/heads/master Commit: bcb4ed25b5dc943dac09cc0b79436cadf4fae65e Parents: d3b38a0 Author: Claus Ibsen <davscl...@apache.org> Authored: Wed Feb 15 11:23:28 2017 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Feb 15 11:23:28 2017 +0100 ---------------------------------------------------------------------- examples/camel-example-kafka/README.adoc | 75 +++++++++++++++++++ examples/camel-example-kafka/README.md | 77 -------------------- .../example/kafka/MessageConsumerClient.java | 15 ++-- .../example/kafka/MessagePublisherClient.java | 27 +++---- .../camel/example/kafka/StringPartitioner.java | 34 +-------- .../src/main/resources/application.properties | 12 +-- 6 files changed, 96 insertions(+), 144 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/bcb4ed25/examples/camel-example-kafka/README.adoc ---------------------------------------------------------------------- diff --git a/examples/camel-example-kafka/README.adoc b/examples/camel-example-kafka/README.adoc new file mode 100644 index 0000000..83e79d8 --- /dev/null +++ b/examples/camel-example-kafka/README.adoc @@ -0,0 +1,75 @@ +# Camel Kafka example + +### Introduction + +An example which shows how to integrate Camel with Kafka. + +This project consists of the following examples: + + 1. Send messages continuously by typing on the command line. + 2. Example of partitioner for a given producer. + 3. Topic is sent in the header as well as in the URL. + + +### Preparing Kafka + +This example requires that Kafka Server is up and running. + +You will need to create the following topics before you run the examples. 
+ +On windows run + + kafka-topics.bat --create --zookeeper <zookeeper host ip>:<port> --replication-factor 1 --partitions 2 --topic TestLog + + kafka-topics.bat --create --zookeeper <zookeeper host ip>:<port> --replication-factor 1 --partitions 1 --topic AccessLog + +On linux run + + kafka-topics.sh --create --zookeeper <zookeeper host ip>:<port> --replication-factor 1 --partitions 2 --topic TestLog + + kafka-topics.sh --create --zookeeper <zookeeper host ip>:<port> --replication-factor 1 --partitions 1 --topic AccessLog + + +### Build + +You will need to compile this example first: + + mvn compile + +### Run + +Run the consumer first in a separate shell + + + mvn compile exec:java -Pkafka-consumer + + +Run the message producer in a separate shell + + + mvn compile exec:java -Pkafka-producer + +Initially, some messages are sent programmatically. +On the command prompt, type the messages. Each line is sent as one message to Kafka +Press `Ctrl-C` to exit. + + +### Configuration + +You can configure the details in the file: + `src/main/resources/application.properties` + +You can enable verbose logging by adjusting the `src/main/resources/log4j2.properties` + file as documented in the file. + + +### Forum, Help, etc + +If you hit any problems please let us know on the Camel Forums + <http://camel.apache.org/discussion-forums.html> + +Please help us make Apache Camel better - we appreciate any feedback you may +have. Enjoy! + + +The Camel riders! http://git-wip-us.apache.org/repos/asf/camel/blob/bcb4ed25/examples/camel-example-kafka/README.md ---------------------------------------------------------------------- diff --git a/examples/camel-example-kafka/README.md b/examples/camel-example-kafka/README.md deleted file mode 100644 index 06c7374..0000000 --- a/examples/camel-example-kafka/README.md +++ /dev/null @@ -1,77 +0,0 @@ -# Camel Kafka example - -### Introduction - -An example which shows how to integrate Camel with Kafka. 
- -This project consists of the following examples: - - 1. Send messages continuously by typing on the command line. - 2. Example of partitioner for a given producer. - 3. Topic is sent in the header as well as in the URL. - - -### Preparing Kafka - -This example requires that Kafka Server is up and running. - -You will need to create the following topics before you run the examples. - -On windows run - - kafka-topics.bat --create --zookeeper <zookeeper host ip>:<port> --replication-factor 1 --partitions 2 --topic TestLog - - kafka-topics.bat --create --zookeeper <zookeeper host ip>:<port> --replication-factor 1 --partitions 1 --topic AccessLog - -On linux run - - kafka-topics.sh --create --zookeeper <zookeeper host ip>:<port> --replication-factor 1 --partitions 2 --topic TestLog - - kafka-topics.sh --create --zookeeper <zookeeper host ip>:<port> --replication-factor 1 --partitions 1 --topic AccessLog - - -### Build - -You will need to compile this example first: - - mvn compile - -### Run - -1. Run the consumer first in a separate shell - - mvn compile exec:java -Pkafka-consumer - - -2. Run the message producer in a separate shell - - mvn compile exec:java -Pkafka-producer - - Initially, some messages are sent programmatically. - - On the command prompt, type the messages. Each line is sent as one message to Kafka - - Type Ctrl-C to exit. - - - -### Configuration - -You can configure the details in the file: - `src/main/resources/application.properties` - -You can enable verbose logging by adjusting the `src/main/resources/log4j.properties` - file as documented in the file. - - -### Forum, Help, etc - -If you hit any problems please let us know on the Camel Forums - <http://camel.apache.org/discussion-forums.html> - -Please help us make Apache Camel better - we appreciate any feedback you may -have. Enjoy! - - - -The Camel riders! 
http://git-wip-us.apache.org/repos/asf/camel/blob/bcb4ed25/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessageConsumerClient.java ---------------------------------------------------------------------- diff --git a/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessageConsumerClient.java b/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessageConsumerClient.java index 47c2abb..7513ba9 100644 --- a/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessageConsumerClient.java +++ b/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessageConsumerClient.java @@ -29,7 +29,6 @@ public final class MessageConsumerClient { private MessageConsumerClient() { } - public static void main(String[] args) throws Exception { @@ -46,13 +45,13 @@ public final class MessageConsumerClient { log.info("About to start route: Kafka Server -> Log "); - from("kafka:{{kafka.host}}:{{kafka.port}}?" 
- + "topic={{consumer.topic}}&" - + "maxPollRecords={{consumer.maxPollRecords}}&" - + "consumersCount={{consumer.consumersCount}}&" - + "seekToBeginning={{consumer.seekToBeginning}}&" - + "groupId={{consumer.group}}") - .routeId("FromKafka").log("${body}"); + from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}" + + "&maxPollRecords={{consumer.maxPollRecords}}" + + "&consumersCount={{consumer.consumersCount}}" + + "&seekToBeginning={{consumer.seekToBeginning}}" + + "&groupId={{consumer.group}}") + .routeId("FromKafka") + .log("${body}"); } }); camelContext.start(); http://git-wip-us.apache.org/repos/asf/camel/blob/bcb4ed25/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessagePublisherClient.java ---------------------------------------------------------------------- diff --git a/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessagePublisherClient.java b/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessagePublisherClient.java index ee5bd9d..dac7b36 100644 --- a/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessagePublisherClient.java +++ b/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessagePublisherClient.java @@ -51,27 +51,20 @@ public final class MessagePublisherClient { pc.setLocation("classpath:application.properties"); from("direct:kafkaStart").routeId("DirectToKafka") - .to("kafka:{{kafka.host}}:{{kafka.port}}?topic={{producer.topic}}").log("${headers}"); // Topic - // and - // offset - // of - // the - // record - // is - // returned. + .to("kafka:{{producer.topic}}?brokers={{kafka.host}}:{{kafka.port}}").log("${headers}"); // Topic can be set in header as well. - from("direct:kafkaStartNoTopic").routeId("kafkaStartNoTopic").to("kafka:{{kafka.host}}:{{kafka.port}}") - .log("${headers}"); // Topic and offset of the record is - // returned. 
+ from("direct:kafkaStartNoTopic").routeId("kafkaStartNoTopic") + .to("kafka:dummy?brokers={{kafka.host}}:{{kafka.port}}") + .log("${headers}"); // Use custom partitioner based on the key. from("direct:kafkaStartWithPartitioner").routeId("kafkaStartWithPartitioner") - .to("kafka:{{kafka.host}}:{{kafka.port}}?topic={{producer.topic}}&partitioner={{producer.partitioner}}") - .log("${headers}"); // Use custom partitioner based on - // the key. + .to("kafka:{{producer.topic}}?brokers={{kafka.host}}:{{kafka.port}}&partitioner={{producer.partitioner}}") + .log("${headers}"); + // Takes input from the command line. @@ -101,14 +94,12 @@ public final class MessagePublisherClient { testKafkaMessage = "PART 0 : " + testKafkaMessage; Map<String, Object> newHeader = new HashMap<String, Object>(); - newHeader.put(KafkaConstants.KEY, "AB"); // This should go to partition - // 0 + newHeader.put(KafkaConstants.KEY, "AB"); // This should go to partition 0 producerTemplate.sendBodyAndHeaders("direct:kafkaStartWithPartitioner", testKafkaMessage, newHeader); testKafkaMessage = "PART 1 : " + testKafkaMessage; - newHeader.put(KafkaConstants.KEY, "ABC"); // This should go to partition - // 1 + newHeader.put(KafkaConstants.KEY, "ABC"); // This should go to partition 1 producerTemplate.sendBodyAndHeaders("direct:kafkaStartWithPartitioner", testKafkaMessage, newHeader); http://git-wip-us.apache.org/repos/asf/camel/blob/bcb4ed25/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/StringPartitioner.java ---------------------------------------------------------------------- diff --git a/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/StringPartitioner.java b/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/StringPartitioner.java index 13d57aa..0566bd1 100644 --- a/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/StringPartitioner.java +++ 
b/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/StringPartitioner.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -23,55 +23,29 @@ import org.apache.kafka.common.Cluster; public class StringPartitioner implements Partitioner { - /** - * - */ public StringPartitioner() { - // TODO Auto-generated constructor stub + // noop } - /* - * (non-Javadoc) - * - * @see org.apache.kafka.common.Configurable#configure(java.util.Map) - */ @Override public void configure(Map<String, ?> configs) { } - /* - * (non-Javadoc) - * - * @see - * org.apache.kafka.clients.producer.Partitioner#partition(java.lang.String, - * java.lang.Object, byte[], java.lang.Object, byte[], - * org.apache.kafka.common.Cluster) - */ @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { - int partId = 0; if (key != null && key instanceof String) { - String sKey = (String) key; - int len = sKey.length(); // This will return either 1 or zero - partId = len % 2; - } return partId; } - /* - * (non-Javadoc) - * - * @see org.apache.kafka.clients.producer.Partitioner#close() - */ @Override public void close() { } http://git-wip-us.apache.org/repos/asf/camel/blob/bcb4ed25/examples/camel-example-kafka/src/main/resources/application.properties ---------------------------------------------------------------------- diff --git a/examples/camel-example-kafka/src/main/resources/application.properties 
b/examples/camel-example-kafka/src/main/resources/application.properties index 1728337..08738bd 100644 --- a/examples/camel-example-kafka/src/main/resources/application.properties +++ b/examples/camel-example-kafka/src/main/resources/application.properties @@ -20,31 +20,21 @@ kafka.host=localhost kafka.port=9092 - +kafka.serializerClass=kafka.serializer.StringEncoder # Producer properties - producer.topic=TestLog - producer.partitioner=org.apache.camel.example.kafka.StringPartitioner - -kafka.serializerClass=kafka.serializer.StringEncoder - # Consumer properties # One consumer can listen to more than one topic.[ TestLog,AccessLog ] - consumer.topic=TestLog - consumer.group=kafkaGroup - consumer.maxPollRecords=5000 # Number of consumers that connect to Kafka server - consumer.consumersCount=1 # Get records from the beginning - consumer.seekToBeginning=true