Re: Idempotent Producers and Exactly Once Consumers
Does a Kafka Streams consumer also have that same limitation of possible duplicates? Thanks, Chris On Fri, Sep 27, 2019 at 11:56 AM Matthias J. Sax wrote: > Enabling "read_committed" only ensures that a consumer does not return > uncommitted data. > > However, on failure, a consumer might still read committed messages > multiple times (if you commit offsets after processing). If you commit > offsets before you process messages, and a failure happens before > processing finishes, you may "loose" those messages, as they won't be > consumed again on restart. > > Hence, if you have a "consumer only" application, not much changed and > you still need to take care in your application code about potential > duplicate processing of records. > > -Matthias > > > On 9/27/19 7:34 AM, Alessandro Tagliapietra wrote: > > You can achieve exactly once on a consumer by enabling read committed and > > manually committing the offset as soon as you receive a message. That way > > you know that at next poll you won't get old message again. > > > > On Fri, Sep 27, 2019, 6:24 AM christopher palm wrote: > > > >> I had a similar question, and just watched the video on the > confluent.io > >> site about this. > >> From what I understand idempotence and transactions are there to solve > the > >> duplicate writes and exactly once processing, respectively. > >> > >> Is what you are stating below is that this only works if we produce > into a > >> kafka topic and consume from it via a kafka stream, but a regular > >> kafka consumer won't get the guarantee of exactly once processing? > >> > >> Thanks, > >> Chris > >> > >> > >> On Sat, Aug 31, 2019 at 12:29 AM Matthias J. Sax > > >> wrote: > >> > >>> Exactly-once on the producer will only ensure that no duplicate writes > >>> happen. If a downstream consumer fails, you might still read message > >>> multiple times for all cases (ie, without idempotence, with idempotence > >>> enabled, or if you use transactions). > >>> > >>> Note, that exactly-once is designed for a read-process-write pattern, > >>> but not for a write-read pattern. > >>> > >>> -Matthias > >>> > >>> > >>> > >>> On 8/30/19 1:00 PM, Peter Groesbeck wrote: > >>>> For a producer that emits messages to a single topic (i.e. no single > >>>> message is sent to multiple topics), will enabling idempotency but not > >>>> transactions provide exactly once guarantees for downstream consumers > >> of > >>>> said topic? > >>>> > >>>> Ordering is not important I just want to make sure consumers only > >>> consumer > >>>> messages sent once. > >>>> > >>> > >>> > >> > > > >
Re: Idempotent Producers and Exactly Once Consumers
I had a similar question, and just watched the video on the confluent.io site about this. >From what I understand idempotence and transactions are there to solve the duplicate writes and exactly once processing, respectively. Is what you are stating below is that this only works if we produce into a kafka topic and consume from it via a kafka stream, but a regular kafka consumer won't get the guarantee of exactly once processing? Thanks, Chris On Sat, Aug 31, 2019 at 12:29 AM Matthias J. Sax wrote: > Exactly-once on the producer will only ensure that no duplicate writes > happen. If a downstream consumer fails, you might still read message > multiple times for all cases (ie, without idempotence, with idempotence > enabled, or if you use transactions). > > Note, that exactly-once is designed for a read-process-write pattern, > but not for a write-read pattern. > > -Matthias > > > > On 8/30/19 1:00 PM, Peter Groesbeck wrote: > > For a producer that emits messages to a single topic (i.e. no single > > message is sent to multiple topics), will enabling idempotency but not > > transactions provide exactly once guarantees for downstream consumers of > > said topic? > > > > Ordering is not important I just want to make sure consumers only > consumer > > messages sent once. > > > >
Re: KafkaProducer Retries in .9.0.1
Hi Nicolas, I was able to get this working by shutting down one of the broker nodes, at that point you can see the producer retries and then deliver the message. Thanks, Chris On Wed, Apr 20, 2016 at 3:00 PM, Nicolas Phung wrote: > Hello, > > Have you solved this ? I'm encountering the same issue with the new > Producer on 0.9.0.1 client with a 0.9.0.1 Kafka broker. We tried the same > various breakdown (kafka(s), zookeeper) with 0.8.2.2 client and Kafka > broker 0.8.2.2 and the retries work as expected on the older version. I'm > going to take a look if someone else has filed a related issue about it. > > Regards, > Nicolas PHUNG > > On Thu, Apr 7, 2016 at 5:15 AM, christopher palm wrote: > > > Hi Thanks for the suggestion. > > I lowered the broker message.max.bytes to be smaller than my payload so > > that I now receive an > > org.apache.kafka.common.errors.RecordTooLargeException > > : > > > > I still don't see the retries happening, the default back off is 100ms, > and > > my producer loops for a few seconds, long enough to trigger the retry. > > > > Is there something else I need to set? > > > > I have tried this with a sync and async producer both with same results > > > > Thanks, > > > > Chris > > > > On Wed, Apr 6, 2016 at 12:01 AM, Manikumar Reddy < > > manikumar.re...@gmail.com> > > wrote: > > > > > Hi, > > > > > > Producer message size validation checks ("buffer.memory", > > > "max.request.size" ) happens before > > > batching and sending messages. Retry mechanism is applicable for > broker > > > side errors and network errors. > > > Try changing "message.max.bytes" broker config property for simulating > > > broker side error. > > > > > > > > > > > > > > > > > > > > > On Wed, Apr 6, 2016 at 9:53 AM, christopher palm > > wrote: > > > > > > > Hi All, > > > > > > > > I am working with the KafkaProducer using the properties below, > > > > so that the producer keeps trying to send upon failure on Kafka > .9.0.1. > > > > I am forcing a failure by setting my buffersize smaller than my > > > > payload,which causes the expected exception below. > > > > > > > > I don't see the producer retry to send on receiving this failure. > > > > > > > > Am I missing something in the configuration to allow the producer to > > > retry > > > > on failed sends? > > > > > > > > Thanks, > > > > Chris > > > > > > > > .java.util.concurrent.ExecutionException: > > > > org.apache.kafka.common.errors.RecordTooLargeException: The message > is > > > 8027 > > > > bytes when serialized which is larger than the total memory buffer > you > > > have > > > > configured with the buffer.memory configuration. > > > > > > > > props.put("bootstrap.servers", bootStrapServers); > > > > > > > > props.put("acks", "all"); > > > > > > > > props.put("retries", 3);//Try for 3 strikes > > > > > > > > props.put("batch.size", batchSize);//Need to see if this number > should > > > > increase under load > > > > > > > > props.put("linger.ms", 1);//After 1 ms fire the batch even if the > > batch > > > > isn't full. > > > > > > > > props.put("buffer.memory", buffMemorySize); > > > > > > > > props.put("max.block.ms",500); > > > > > > > > props.put("max.in.flight.requests.per.connection", 1); > > > > > > > > props.put("key.serializer", > > > > "org.apache.kafka.common.serialization.StringSerializer"); > > > > > > > > props.put("value.serializer", > > > > "org.apache.kafka.common.serialization.ByteArraySerializer"); > > > > > > > > > >
Re: KafkaProducer Retries in .9.0.1
Hi Thanks for the suggestion. I lowered the broker message.max.bytes to be smaller than my payload so that I now receive an org.apache.kafka.common.errors.RecordTooLargeException : I still don't see the retries happening, the default back off is 100ms, and my producer loops for a few seconds, long enough to trigger the retry. Is there something else I need to set? I have tried this with a sync and async producer both with same results Thanks, Chris On Wed, Apr 6, 2016 at 12:01 AM, Manikumar Reddy wrote: > Hi, > > Producer message size validation checks ("buffer.memory", > "max.request.size" ) happens before > batching and sending messages. Retry mechanism is applicable for broker > side errors and network errors. > Try changing "message.max.bytes" broker config property for simulating > broker side error. > > > > > > > On Wed, Apr 6, 2016 at 9:53 AM, christopher palm wrote: > > > Hi All, > > > > I am working with the KafkaProducer using the properties below, > > so that the producer keeps trying to send upon failure on Kafka .9.0.1. > > I am forcing a failure by setting my buffersize smaller than my > > payload,which causes the expected exception below. > > > > I don't see the producer retry to send on receiving this failure. > > > > Am I missing something in the configuration to allow the producer to > retry > > on failed sends? > > > > Thanks, > > Chris > > > > .java.util.concurrent.ExecutionException: > > org.apache.kafka.common.errors.RecordTooLargeException: The message is > 8027 > > bytes when serialized which is larger than the total memory buffer you > have > > configured with the buffer.memory configuration. > > > > props.put("bootstrap.servers", bootStrapServers); > > > > props.put("acks", "all"); > > > > props.put("retries", 3);//Try for 3 strikes > > > > props.put("batch.size", batchSize);//Need to see if this number should > > increase under load > > > > props.put("linger.ms", 1);//After 1 ms fire the batch even if the batch > > isn't full. > > > > props.put("buffer.memory", buffMemorySize); > > > > props.put("max.block.ms",500); > > > > props.put("max.in.flight.requests.per.connection", 1); > > > > props.put("key.serializer", > > "org.apache.kafka.common.serialization.StringSerializer"); > > > > props.put("value.serializer", > > "org.apache.kafka.common.serialization.ByteArraySerializer"); > > >
KafkaProducer Retries in .9.0.1
Hi All, I am working with the KafkaProducer using the properties below, so that the producer keeps trying to send upon failure on Kafka .9.0.1. I am forcing a failure by setting my buffersize smaller than my payload,which causes the expected exception below. I don't see the producer retry to send on receiving this failure. Am I missing something in the configuration to allow the producer to retry on failed sends? Thanks, Chris .java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 8027 bytes when serialized which is larger than the total memory buffer you have configured with the buffer.memory configuration. props.put("bootstrap.servers", bootStrapServers); props.put("acks", "all"); props.put("retries", 3);//Try for 3 strikes props.put("batch.size", batchSize);//Need to see if this number should increase under load props.put("linger.ms", 1);//After 1 ms fire the batch even if the batch isn't full. props.put("buffer.memory", buffMemorySize); props.put("max.block.ms",500); props.put("max.in.flight.requests.per.connection", 1); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
Re: Security with SSL and not Kerberos?
Hi Ismael, Ok I got the basic authentication/ACL authorization for SSL working with the principal Kafka.example.com If that principal isn't in the server.properties as a super user, I was seeing errors on broker startup. In order to add new principals, the server.properties has to be updated and that principal user added to the super user group? How to I run the kafka producer/consumer as a different principal other than Kafka.example.com? Thanks, Chris On Mon, Mar 21, 2016 at 6:54 PM, Ismael Juma wrote: > Hi Gopal, > > As you suspected, you have to set the appropriate ACLs for it to work. The > following will make the producer work: > > kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 \ > --add --allow-principal > "User:CN=kafka.example.com,OU=Client,O=Confluent,L=London,ST=London,C=GB" > \ > --producer --topic securing-kafka > > The following will make the consumer work: > > kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 \ > --add --allow-principal > "User:CN=kafka.example.com,OU=Client,O=Confluent,L=London,ST=London,C=GB" > \ > --consumer --topic securing-kafka --group securing-kafka-group > > Enabling the authorizer log is a good way to figure out the principal if > you don't know it. > > Hope this helps, > Ismael > > On Mon, Mar 21, 2016 at 10:27 PM, Raghavan, Gopal > > wrote: > > > >Hi Christopher, > > > > >On Mon, Mar 21, 2016 at 3:53 PM, christopher palm > > wrote: > > > > >> Does Kafka support SSL authentication and ACL authorization without > > >> Kerberos? > > >> > > > > >Yes. The following branch modifies the blog example slightly to only > allow > > >SSL authentication. > > > > >https://github.com/confluentinc/securing-kafka-blog/tree/ssl-only > > > > >If so, can different clients have their own SSL certificate on the same > > >> broker? > > >> > > > > >Yes. > > > > > > > > I tried the “ssl-only” branch but am getting the following error: > > > > [vagrant@kafka ~]$ kafka-console-producer --broker-list > > kafka.example.com:9093 --topic securing-kafka --producer.config > > /etc/kafka/producer_ssl.properties > > > > > > > > > > test > > > > > > > > > > [2016-03-21 22:08:46,744] WARN Error while fetching metadata with > > correlation id 0 : {securing-kafka=TOPIC_AUTHORIZATION_FAILED} > > (org.apache.kafka.clients.NetworkClient) > > > > > > > > > > [2016-03-21 22:08:46,748] ERROR Error when sending message to topic > > securing-kafka with key: null, value: 4 bytes with error: Not authorized > to > > access topics: [securing-kafka] > > (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) > > > > > > > > > > I did not set topic level ACL, since I do not know the Principal name to > > use for --allow-principal parameter of kafka-acls > > > > > > Any suggestions ? > > > > > > >In reading the following security article, it seems that Kerberos is an > > >> option but not required if SSL is used. > > >> > > > > >That's right. > > > > >Ismael > > >
Security with SSL and not Kerberos?
Hi All, Does Kafka support SSL authentication and ACL authorization without Kerberos? If so, can different clients have their own SSL certificate on the same broker? In reading the following security article, it seems that Kerberos is an option but not required if SSL is used. Thanks, Chris http://www.confluent.io/blog/apache-kafka-security-authorization-authentication-encryption "Administrators can require client authentication using either Kerberos or Transport Layer Security (TLS) client certificates, so that Kafka brokers know who is making each request"
Re: MultiThreaded HLConsumer Exits before events are all consumed
What I found was 2 problems. 1. The producer wasn't passing in a partition key, so not all partitions were getting data. 2. After fixing the producer, I could see all threads getting data consistently then the shutdown method was clearly killing the threads. I have removed the shutdown,and with the producer changes sending in a key, this looks like it is running correctly now. Thanks! On Wed, Apr 29, 2015 at 10:59 PM, tao xiao wrote: > The log suggests that the shutdown method were still called > > Thread 0: 2015-04-29 > 12:55:54.292|3|13|Normal|-74.1892627|41.33900999753 > > Last Shutdown via example.shutDown called! > > 15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:, > ZKConsumerConnector shutting down > > Please ensure no consumer.shutdown(); and executor.shutdown(); are called > during the course of your program > > On Thu, Apr 30, 2015 at 2:23 AM, christopher palm > wrote: > > > Commenting out Example shutdown did not seem to make a difference, I > added > > the print statement below to highlight the fact. > > > > The other threads still shut down, and only one thread lives on, > eventually > > that dies after a few minutes as well > > > > Could this be that the producer default partitioner is isn't balancing > data > > across all partitions? > > > > Thanks, > > Chris > > > > Thread 0: 2015-04-29 > > 12:55:54.292|3|13|Normal|-74.1892627|41.33900999753 > > > > Last Shutdown via example.shutDown called! > > > > 15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:, > > ZKConsumerConnector shutting down > > > > 15/04/29 13:09:38 INFO utils.KafkaScheduler: Forcing shutdown of Kafka > > scheduler > > > > 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager: > > [ConsumerFetcherManager-1430330968420] Stopping leader finder thread > > > > 15/04/29 13:09:38 INFO > consumer.ConsumerFetcherManager$LeaderFinderThread: > > -leader-finder-thread], Shutting down > > > > 15/04/29 13:09:38 INFO > consumer.ConsumerFetcherManager$LeaderFinderThread: > > -leader-finder-thread], Stopped > > > > 15/04/29 13:09:38 INFO > consumer.ConsumerFetcherManager$LeaderFinderThread: > > -leader-finder-thread], Shutdown completed > > > > 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager: > > [ConsumerFetcherManager-1430330968420] Stopping all fetchers > > > > 15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread: > > [ConsumerFetcherThread-consumergroup], Shutting down > > > > 15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread: > > [ConsumerFetcherThread-], Stopped > > > > 15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread: > > [ConsumerFetcherThread-], Shutdown completed > > > > 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager: > > [ConsumerFetcherManager-] All connections stopped > > > > 15/04/29 13:09:38 INFO zkclient.ZkEventThread: Terminate ZkClient event > > thread. > > > > Shutting down Thread: 2 > > > > Shutting down Thread: 1 > > > > Shutting down Thread: 3 > > > > 15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector: > > [consumergroup], ZKConsumerConnector shut down completed > > > > Thread 0: 2015-04-29 12:55:55.302|4|14|Unsafe tail > > distance|-73.99021500035|40.6636611 > > > > 15/04/29 13:09:39 INFO consumer.ZookeeperConsumerConnector: > > [consumergroup], stopping watcher executor thread for consumer > > consumergroup > > > > Thread 0: 2015-04-29 > > 12:55:56.313|1|11|Normal|-79.74165300042|42.1304580009 > > > > On Wed, Apr 29, 2015 at 10:11 AM, tao xiao wrote: > > > > > example.shutdown(); in ConsumerGroupExample closes all consumer > > connections > > > to Kafka. remove this line the consumer threads will run forever > > > > > > On Wed, Apr 29, 2015 at 9:42 PM, christopher palm > > > wrote: > > > > > > > Hi All, > > > > > > > > I am trying to get a multi threaded HL consumer working against a 2 > > > broker > > > > Kafka cluster with a 4 partition 2 replica topic. > > > > > > > > The consumer code is set to run with 4 threads, one for each > partition. > > > > > > > > The producer code uses the default partitioner and loops indefinitely > > > > feeding events into the topic.(I excluded the while loop in the paste > > > > below) > > > > > > > > What I see is the threads eventually all exit, even t
Re: MultiThreaded HLConsumer Exits before events are all consumed
Commenting out Example shutdown did not seem to make a difference, I added the print statement below to highlight the fact. The other threads still shut down, and only one thread lives on, eventually that dies after a few minutes as well Could this be that the producer default partitioner is isn't balancing data across all partitions? Thanks, Chris Thread 0: 2015-04-29 12:55:54.292|3|13|Normal|-74.1892627|41.33900999753 Last Shutdown via example.shutDown called! 15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:, ZKConsumerConnector shutting down 15/04/29 13:09:38 INFO utils.KafkaScheduler: Forcing shutdown of Kafka scheduler 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1430330968420] Stopping leader finder thread 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: -leader-finder-thread], Shutting down 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: -leader-finder-thread], Stopped 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: -leader-finder-thread], Shutdown completed 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1430330968420] Stopping all fetchers 15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-consumergroup], Shutting down 15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-], Stopped 15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-], Shutdown completed 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-] All connections stopped 15/04/29 13:09:38 INFO zkclient.ZkEventThread: Terminate ZkClient event thread. Shutting down Thread: 2 Shutting down Thread: 1 Shutting down Thread: 3 15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector: [consumergroup], ZKConsumerConnector shut down completed Thread 0: 2015-04-29 12:55:55.302|4|14|Unsafe tail distance|-73.99021500035|40.6636611 15/04/29 13:09:39 INFO consumer.ZookeeperConsumerConnector: [consumergroup], stopping watcher executor thread for consumer consumergroup Thread 0: 2015-04-29 12:55:56.313|1|11|Normal|-79.74165300042|42.1304580009 On Wed, Apr 29, 2015 at 10:11 AM, tao xiao wrote: > example.shutdown(); in ConsumerGroupExample closes all consumer connections > to Kafka. remove this line the consumer threads will run forever > > On Wed, Apr 29, 2015 at 9:42 PM, christopher palm > wrote: > > > Hi All, > > > > I am trying to get a multi threaded HL consumer working against a 2 > broker > > Kafka cluster with a 4 partition 2 replica topic. > > > > The consumer code is set to run with 4 threads, one for each partition. > > > > The producer code uses the default partitioner and loops indefinitely > > feeding events into the topic.(I excluded the while loop in the paste > > below) > > > > What I see is the threads eventually all exit, even thought the producer > is > > still sending events into the topic. > > > > My understanding is that the consumer thread per partition is the correct > > setup. > > > > Any ideas why this code doesn't continue to consume events at they are > > pushed to the topic? > > > > I suspect I am configuring something wrong here, but am not sure what. > > > > Thanks, > > > > Chris > > > > > > *T**opic Configuration* > > > > Topic:kafkatopicRep4Part4 PartitionCount:4 ReplicationFactor:2 Configs: > > > > Topic: kafkatopicRep4Part4 Partition: 0 Leader: 1 Replicas: 2,1 Isr: 1,2 > > > > Topic: kafkatopicRep4Part4 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2 > > > > Topic: kafkatopicRep4Part4 Partition: 2 Leader: 1 Replicas: 2,1 Isr: 1,2 > > > > Topic: kafkatopicRep4Part4 Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2 > > > > *Producer Code:* > > > > Properties props = new Properties(); > > > > props.put("metadata.broker.list", args[0]); > > > > props.put("zk.connect", args[1]); > > > > props.put("serializer.class", "kafka.serializer.StringEncoder"); > > > > props.put("request.required.acks", "1"); > > > > String TOPIC = args[2]; > > > > ProducerConfig config = new ProducerConfig(props); > > > > Producer producer = new Producer( > > config); > > > > finalEvent = new Timestamp(new Date().getTime()) + "|" > > > > + truckIds[0] + "|" + driverIds[0] + "|" + > > events[random > > .nextInt(evtCnt)] > > > >
MultiThreaded HLConsumer Exits before events are all consumed
Hi All, I am trying to get a multi threaded HL consumer working against a 2 broker Kafka cluster with a 4 partition 2 replica topic. The consumer code is set to run with 4 threads, one for each partition. The producer code uses the default partitioner and loops indefinitely feeding events into the topic.(I excluded the while loop in the paste below) What I see is the threads eventually all exit, even thought the producer is still sending events into the topic. My understanding is that the consumer thread per partition is the correct setup. Any ideas why this code doesn't continue to consume events at they are pushed to the topic? I suspect I am configuring something wrong here, but am not sure what. Thanks, Chris *T**opic Configuration* Topic:kafkatopicRep4Part4 PartitionCount:4 ReplicationFactor:2 Configs: Topic: kafkatopicRep4Part4 Partition: 0 Leader: 1 Replicas: 2,1 Isr: 1,2 Topic: kafkatopicRep4Part4 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2 Topic: kafkatopicRep4Part4 Partition: 2 Leader: 1 Replicas: 2,1 Isr: 1,2 Topic: kafkatopicRep4Part4 Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2 *Producer Code:* Properties props = new Properties(); props.put("metadata.broker.list", args[0]); props.put("zk.connect", args[1]); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("request.required.acks", "1"); String TOPIC = args[2]; ProducerConfig config = new ProducerConfig(props); Producer producer = new Producer( config); finalEvent = new Timestamp(new Date().getTime()) + "|" + truckIds[0] + "|" + driverIds[0] + "|" + events[random .nextInt(evtCnt)] + "|" + getLatLong(arrayroute17[i]); try { KeyedMessage data = new KeyedMessage(TOPIC, finalEvent); LOG.info("Sending Messge #: " + routeName[0] + ": " + i +", msg:" + finalEvent); producer.send(data); Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); } *Consumer Code:* public class ConsumerTest implements Runnable { private KafkaStream m_stream; private int m_threadNumber; public ConsumerTest(KafkaStream a_stream, int a_threadNumber) { m_threadNumber = a_threadNumber; m_stream = a_stream; } public void run() { ConsumerIterator it = m_stream.iterator(); while (it.hasNext()){ System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message())); try { Thread.sleep(1000); }catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("Shutting down Thread: " + m_threadNumber); } } public class ConsumerGroupExample { private final ConsumerConnector consumer; private final String topic; private ExecutorService executor; public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) { consumer = kafka.consumer.Consumer.createJavaConsumerConnector( createConsumerConfig(a_zookeeper, a_groupId)); this.topic = a_topic; } public void shutdown() { if (consumer != null) consumer.shutdown(); if (executor != null) executor.shutdown(); try { if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly"); } } catch (InterruptedException e) { System.out.println("Interrupted during shutdown, exiting uncleanly"); } } public void run(int a_numThreads) { Map topicCountMap = new HashMap(); topicCountMap.put(topic, new Integer(a_numThreads)); Map>> consumerMap = consumer.createMessageStreams(topicCountMap); List> streams = consumerMap.get(topic); executor = Executors.newFixedThreadPool(a_numThreads); int threadNumber = 0; for (final KafkaStream stream : streams) { executor.submit(new ConsumerTest(stream, threadNumber)); threadNumber++; } } private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) { Properties props = new Properties(); props.put("zookeeper.connect", a_zookeeper); props.put("group.id", a_groupId); props.put("zookeeper.session.timeout.ms", "400"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); props.put("consumer.timeout.ms", "-1"); return new ConsumerConfig(props); } public static void main(String[] args) { String zooKeeper = args[0]; String groupId = args[1]; String topic = args[2]; int threads = Integer.parseInt(