Updated Branches: refs/heads/master e5f44a11a -> b66b4a2b5
CAMEL-7092. Checkstyle fixes Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b66b4a2b Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b66b4a2b Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b66b4a2b Branch: refs/heads/master Commit: b66b4a2b5e1c1c6e4c66c91df9f5999b43b9bf81 Parents: e5f44a1 Author: Hadrian Zbarcea <hzbar...@gmail.com> Authored: Fri Feb 7 21:17:46 2014 -0500 Committer: Hadrian Zbarcea <hzbar...@gmail.com> Committed: Fri Feb 7 21:17:46 2014 -0500 ---------------------------------------------------------------------- components/camel-kafka/pom.xml | 1 + .../camel/component/kafka/KafkaConstants.java | 8 +++-- .../camel/component/kafka/KafkaConsumer.java | 32 +++++++++++-------- .../camel/component/kafka/KafkaEndpoint.java | 23 +++++++------- .../camel/component/kafka/KafkaProducer.java | 19 ++++++----- .../component/kafka/KafkaComponentTest.java | 16 ++++++++++ .../camel/component/kafka/KafkaConsumerIT.java | 26 ++++++++++++--- .../component/kafka/KafkaConsumerTest.java | 16 ++++++++++ .../component/kafka/KafkaEndpointTest.java | 23 ++++++++++++-- .../camel/component/kafka/KafkaProducerIT.java | 33 +++++++++++++++----- .../component/kafka/KafkaProducerTest.java | 21 +++++++++++-- .../component/kafka/SimplePartitioner.java | 18 +++++++++++ 12 files changed, 185 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/b66b4a2b/components/camel-kafka/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-kafka/pom.xml b/components/camel-kafka/pom.xml index f469b30..39f524f 100644 --- a/components/camel-kafka/pom.xml +++ b/components/camel-kafka/pom.xml @@ -64,6 +64,7 @@ <dependency> <groupId>org.mockito</groupId> <artifactId>mockito-core</artifactId> + <scope>test</scope> </dependency> <dependency> <groupId>org.slf4j</groupId> http://git-wip-us.apache.org/repos/asf/camel/blob/b66b4a2b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java index b7f6bdf..81a33f7 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java @@ -19,7 +19,7 @@ package org.apache.camel.component.kafka; /** * @author Stephen Samuel */ -public class KafkaConstants { +public final class KafkaConstants { public static final String DEFAULT_GROUP = "group1"; @@ -27,4 +27,8 @@ public class KafkaConstants { public static final String PARTITION = "kafka.EXCHANGE_NAME"; public static final String KEY = "kafka.CONTENT_TYPE"; public static final String TOPIC = "kafka.TOPIC"; -} \ No newline at end of file + + private KafkaConstants() { + // Utility class + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/b66b4a2b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java index b4c605a..b2c132b 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java @@ -22,41 +22,45 @@ import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; +import kafka.consumer.ConsumerConfig; +import kafka.consumer.ConsumerIterator; +import kafka.consumer.KafkaStream; +import kafka.javaapi.consumer.ConsumerConnector; +import kafka.message.MessageAndMetadata; + import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.impl.DefaultConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import kafka.consumer.ConsumerConfig; -import kafka.consumer.ConsumerIterator; -import kafka.consumer.KafkaStream; -import kafka.javaapi.consumer.ConsumerConnector; -import kafka.message.MessageAndMetadata; /** * @author Stephen Samuel */ public class KafkaConsumer extends DefaultConsumer { - private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class); + private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumer.class); + protected ExecutorService executor; private final KafkaEndpoint endpoint; private final Processor processor; - ConsumerConnector consumer; - ExecutorService executor; + private ConsumerConnector consumer; public KafkaConsumer(KafkaEndpoint endpoint, Processor processor) { super(endpoint, processor); this.endpoint = endpoint; this.processor = processor; - if (endpoint.getZookeeperHost() == null) + if (endpoint.getZookeeperHost() == null) { throw new IllegalArgumentException("zookeeper host must be specified"); - if (endpoint.getZookeeperPort() == 0) + } + if (endpoint.getZookeeperPort() == 0) { throw new IllegalArgumentException("zookeeper port must be specified"); - if (endpoint.getGroupId() == null) + } + if (endpoint.getGroupId() == null) { throw new IllegalArgumentException("groupId must not be null"); + } } Properties getProps() { @@ -92,10 +96,12 @@ public class KafkaConsumer extends DefaultConsumer { super.doStop(); log.info("Stopping Kafka consumer"); - if (consumer != null) + if (consumer != null) { consumer.shutdown(); - if (executor != null) + } + if (executor != null) { executor.shutdown(); + } executor = null; } http://git-wip-us.apache.org/repos/asf/camel/blob/b66b4a2b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java index 6ad9887..5552893 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java @@ -20,6 +20,8 @@ import java.net.URISyntaxException; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; +import kafka.message.MessageAndMetadata; + import org.apache.camel.Consumer; import org.apache.camel.Exchange; import org.apache.camel.Message; @@ -29,7 +31,6 @@ import org.apache.camel.impl.DefaultEndpoint; import org.apache.camel.impl.DefaultExchange; import org.apache.camel.impl.DefaultMessage; -import kafka.message.MessageAndMetadata; /** * @author Stephen Samuel @@ -44,6 +45,16 @@ public class KafkaEndpoint extends DefaultEndpoint { private String partitioner; private String topic; + public KafkaEndpoint() { + } + + public KafkaEndpoint(String endpointUri, + String remaining, + KafkaComponent component) throws URISyntaxException { + super(endpointUri, component); + this.brokers = remaining.split("\\?")[0]; + } + public String getZookeeperHost() { return zookeeperHost; } @@ -96,16 +107,6 @@ public class KafkaEndpoint extends DefaultEndpoint { this.consumerStreams = consumerStreams; } - public KafkaEndpoint() { - } - - public KafkaEndpoint(String endpointUri, - String remaining, - KafkaComponent component) throws URISyntaxException { - super(endpointUri, component); - this.brokers = remaining.split("\\?")[0]; - } - public Exchange createKafkaExchange(MessageAndMetadata<byte[], byte[]> mm) { Exchange exchange = new DefaultExchange(getCamelContext(), getExchangePattern()); http://git-wip-us.apache.org/repos/asf/camel/blob/b66b4a2b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java index d930313..cfcc0a9 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java @@ -18,21 +18,22 @@ package org.apache.camel.component.kafka; import java.util.Properties; +import kafka.javaapi.producer.Producer; +import kafka.producer.KeyedMessage; +import kafka.producer.ProducerConfig; + import org.apache.camel.CamelException; import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultProducer; -import kafka.javaapi.producer.Producer; -import kafka.producer.KeyedMessage; -import kafka.producer.ProducerConfig; /** * @author Stephen Samuel */ public class KafkaProducer extends DefaultProducer { + protected Producer<String, String> producer; private final KafkaEndpoint endpoint; - Producer<String, String> producer; public KafkaProducer(KafkaEndpoint endpoint) throws ClassNotFoundException, IllegalAccessException, InstantiationException { @@ -42,8 +43,9 @@ public class KafkaProducer extends DefaultProducer { @Override protected void doStop() throws Exception { - if (producer != null) + if (producer != null) { producer.close(); + } } Properties getProps() { @@ -66,12 +68,13 @@ public class KafkaProducer extends DefaultProducer { public void process(Exchange exchange) throws CamelException { Object partitionKey = exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY); - if (partitionKey == null) + if (partitionKey == null) { throw new CamelException("No partition key set"); + } String msg = exchange.getIn().getBody(String.class); - KeyedMessage<String, String> data = - new KeyedMessage<String, String>(endpoint.getTopic(), partitionKey.toString(), msg); + KeyedMessage<String, String> data = new KeyedMessage<String, String>(endpoint.getTopic(), partitionKey.toString(), msg); producer.send(data); } + } http://git-wip-us.apache.org/repos/asf/camel/blob/b66b4a2b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java index b7fe4eb..466ddf5 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java @@ -1,3 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.camel.component.kafka; import java.util.HashMap; http://git-wip-us.apache.org/repos/asf/camel/blob/b66b4a2b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java index 99b5c91..cf862af 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java @@ -1,8 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.camel.component.kafka; import java.io.IOException; import java.util.Properties; +import kafka.javaapi.producer.Producer; +import kafka.producer.KeyedMessage; +import kafka.producer.ProducerConfig; + import org.apache.camel.Endpoint; import org.apache.camel.EndpointInject; import org.apache.camel.builder.RouteBuilder; @@ -12,9 +32,6 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -import kafka.javaapi.producer.Producer; -import kafka.producer.KeyedMessage; -import kafka.producer.ProducerConfig; /** * @author Stephen Samuel @@ -27,8 +44,7 @@ public class KafkaConsumerIT extends CamelTestSupport { public static final String TOPIC = "test"; - @EndpointInject(uri = "kafka:localhost:9092?topic=" + TOPIC + - "&zookeeperHost=localhost&zookeeperPort=2181&groupId=group1") + @EndpointInject(uri = "kafka:localhost:9092?topic=" + TOPIC + "&zookeeperHost=localhost&zookeeperPort=2181&groupId=group1") private Endpoint from; @EndpointInject(uri = "mock:result") http://git-wip-us.apache.org/repos/asf/camel/blob/b66b4a2b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java index 8aa756f..8fe11bc 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java @@ -1,3 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.camel.component.kafka; import java.util.concurrent.ThreadPoolExecutor; http://git-wip-us.apache.org/repos/asf/camel/blob/b66b4a2b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java index 767d4d5..bda95a1 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java @@ -1,16 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.camel.component.kafka; import java.net.URISyntaxException; import java.util.concurrent.ThreadPoolExecutor; +import kafka.message.MessageAndMetadata; + import org.apache.camel.Exchange; import org.junit.Test; -import kafka.message.MessageAndMetadata; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; + /** * @author Stephen Samuel */ @@ -42,4 +59,6 @@ public class KafkaEndpointTest { KafkaEndpoint endpoint = new KafkaEndpoint("kafka:localhost", "localhost", new KafkaComponent()); assertTrue(endpoint.isSingleton()); } + } + http://git-wip-us.apache.org/repos/asf/camel/blob/b66b4a2b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java index 2e98d9c..c30dca2 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java @@ -1,3 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.camel.component.kafka; import java.io.IOException; @@ -9,6 +25,11 @@ import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import kafka.consumer.ConsumerConfig; +import kafka.consumer.ConsumerIterator; +import kafka.consumer.KafkaStream; +import kafka.javaapi.consumer.ConsumerConnector; + import org.apache.camel.Endpoint; import org.apache.camel.EndpointInject; import org.apache.camel.Produce; @@ -19,10 +40,6 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -import kafka.consumer.ConsumerConfig; -import kafka.consumer.ConsumerIterator; -import kafka.consumer.KafkaStream; -import kafka.javaapi.consumer.ConsumerConnector; /** * @author Stephen Samuel @@ -35,12 +52,11 @@ public class KafkaProducerIT extends CamelTestSupport { public static final String TOPIC = "test"; - @EndpointInject(uri = - "kafka:localhost:9092?topic=" + TOPIC + "&partitioner=org.apache.camel.component.kafka.SimplePartitioner") + @EndpointInject(uri = "kafka:localhost:9092?topic=" + TOPIC + "&partitioner=org.apache.camel.component.kafka.SimplePartitioner") private Endpoint to; @Produce(uri = "direct:start") - protected ProducerTemplate template; + private ProducerTemplate template; private ConsumerConnector kafkaConsumer; @@ -101,8 +117,9 @@ public class KafkaProducerIT extends CamelTestSupport { } for (int k = 0; k < 20; k++) { - if (messages.size() == 10) + if (messages.size() == 10) { return; + } Thread.sleep(200); } http://git-wip-us.apache.org/repos/asf/camel/blob/b66b4a2b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java index 6fe7010..ba29c63 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java @@ -1,8 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.camel.component.kafka; import java.net.URISyntaxException; import java.util.Properties; +import kafka.javaapi.producer.Producer; +import kafka.producer.KeyedMessage; + import org.apache.camel.CamelException; import org.apache.camel.Exchange; import org.apache.camel.Message; @@ -12,8 +31,6 @@ import org.mockito.ArgumentCaptor; import org.mockito.Matchers; import org.mockito.Mockito; -import kafka.javaapi.producer.Producer; -import kafka.producer.KeyedMessage; import static org.junit.Assert.assertEquals; http://git-wip-us.apache.org/repos/asf/camel/blob/b66b4a2b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/SimplePartitioner.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/SimplePartitioner.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/SimplePartitioner.java index d0eb738..a06b889 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/SimplePartitioner.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/SimplePartitioner.java @@ -1,3 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.camel.component.kafka; import kafka.producer.Partitioner; @@ -21,4 +37,6 @@ public class SimplePartitioner implements Partitioner<String> { public int partition(String key, int numPartitions) { return key.hashCode() % numPartitions; } + } +