Re: Horizontally Scaling Kafka Consumers
If the 100 partitions are all for the same topic, you can have up to 100 consumers working as part of a single consumer group for that topic. You cannot have more consumers than there are partitions within a given consumer group. On 29 April 2015 at 08:41, Nimi Wariboko Jr n...@channelmeter.com wrote: Hi, I was wondering what options there are for horizontally scaling kafka consumers? Basically if I have 100 partitions and 10 consumers, and want to temporarily scale up to 50 consumers, what options do I have? So far I've thought of just simply tracking consumer membership somehow (either through Raft or zookeeper's znodes) on the consumers.
Re: Horizontally Scaling Kafka Consumers
Please correct me if I'm wrong, but I think it is not really a hard constraint that one cannot have more consumers (from the same group) than partitions on a single topic - the surplus consumers will not be assigned any partition to consume, but they can be there, and as soon as one active consumer from the same group goes offline (its connection to ZK is dropped), the consumers in the group will be rebalanced so that one passively waiting consumer becomes active. Kind regards, Stevo Slavic. On Wed, Apr 29, 2015 at 2:25 PM, David Corley davidcor...@gmail.com wrote: If the 100 partitions are all for the same topic, you can have up to 100 consumers working as part of a single consumer group for that topic. You cannot have more consumers than there are partitions within a given consumer group. On 29 April 2015 at 08:41, Nimi Wariboko Jr n...@channelmeter.com wrote: Hi, I was wondering what options there are for horizontally scaling kafka consumers? Basically if I have 100 partitions and 10 consumers, and want to temporarily scale up to 50 consumers, what options do I have? So far I've thought of just simply tracking consumer membership somehow (either through Raft or zookeeper's znodes) on the consumers.
Re: Horizontally Scaling Kafka Consumers
You're right Stevo, I should rephrase to say that there can be no more _active_ consumers than there are partitions (within a single consumer group). I'm guessing that's what Nimi is asking about, but perhaps he can elaborate on whether he's using consumer groups and/or whether the 100 partitions are all for a single topic or for multiple topics. On 29 April 2015 at 13:38, Stevo Slavić ssla...@gmail.com wrote: Please correct me if I'm wrong, but I think it is not really a hard constraint that one cannot have more consumers (from the same group) than partitions on a single topic - the surplus consumers will not be assigned any partition to consume, but they can be there, and as soon as one active consumer from the same group goes offline (its connection to ZK is dropped), the consumers in the group will be rebalanced so that one passively waiting consumer becomes active. Kind regards, Stevo Slavic. On Wed, Apr 29, 2015 at 2:25 PM, David Corley davidcor...@gmail.com wrote: If the 100 partitions are all for the same topic, you can have up to 100 consumers working as part of a single consumer group for that topic. You cannot have more consumers than there are partitions within a given consumer group. On 29 April 2015 at 08:41, Nimi Wariboko Jr n...@channelmeter.com wrote: Hi, I was wondering what options there are for horizontally scaling kafka consumers? Basically if I have 100 partitions and 10 consumers, and want to temporarily scale up to 50 consumers, what options do I have? So far I've thought of just simply tracking consumer membership somehow (either through Raft or zookeeper's znodes) on the consumers.
Horizontally Scaling Kafka Consumers
Hi, I was wondering what options there are for horizontally scaling kafka consumers? Basically if I have 100 partitions and 10 consumers, and want to temporarily scale up to 50 consumers, what options do I have? So far I've thought of just simply tracking consumer membership somehow (either through Raft or zookeeper's znodes) on the consumers.
Re: Why fetching meta-data for topic is done three times?
Hi Zakee, message.send.max.retries is 1. Regards, Madhukar On Tue, Apr 28, 2015 at 6:17 PM, Madhukar Bharti bhartimadhu...@gmail.com wrote: Hi Zakee, Thanks for your reply. message.send.max.retries=3, retry.backoff.ms=100, topic.metadata.refresh.interval.ms=600*1000. These are my properties. Regards, Madhukar On Tue, Apr 28, 2015 at 3:26 AM, Zakee kzak...@netzero.net wrote: What values do you have for the properties below? Or are these set to defaults? message.send.max.retries retry.backoff.ms topic.metadata.refresh.interval.ms Thanks Zakee On Apr 23, 2015, at 11:48 PM, Madhukar Bharti bhartimadhu...@gmail.com wrote: Hi All, Once I went through the code, I found that while the producer starts, it does three things: 1. Sends a metadata request. 2. Sends the message to the broker (fetching the broker list). 3. If the number of messages to be produced is greater than 0, it again tries to refresh metadata for the outstanding produce requests. Each of these requests takes the configured timeout before moving to the next, and only once all are done does it throw an exception (if step 3 also fails). The problem is that if we set the timeout to 1 sec, it takes 3 sec to throw the exception, so the user request hangs for 3 sec. That is comparatively high for our response time, and if all threads are blocked in producer sends, the whole application will be blocked for 3 sec. So we want to reduce the overall time to throw the exception to 1 sec. What is a possible way to do this? Thanks Madhukar On Thu, Apr 16, 2015 at 8:10 PM, Madhukar Bharti bhartimadhu...@gmail.com wrote: Hi All, I came across a problem: if we use a broker IP which is not reachable or is out of the network, it takes more than the configured time (request.timeout.ms). After checking the log, I found that it is trying to fetch topic metadata three times, changing the correlation id each time. Due to this, even though I keep request.timeout.ms=1000, it takes 3 sec to throw the exception. I am using Kafka 0.8.1.1 with the patch https://issues.apache.org/jira/secure/attachment/12678547/kafka-1733-add-connectTimeoutMs.patch I have attached the log. Please check this and clarify why it is behaving like this - whether it is by design, or whether I have to set some other property as well. Regards Madhukar
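For reference, a minimal sketch of the 0.8 (old Scala) producer settings under discussion - the broker address is hypothetical, and note that Madhukar reports the 3-second behavior even with retries set to 1, since the three metadata fetches described above are separate from send retries:

    Properties props = new Properties();
    props.put("metadata.broker.list", "broker1:9092"); // hypothetical address
    props.put("request.timeout.ms", "1000");           // per-request timeout
    props.put("message.send.max.retries", "1");        // resend attempts on failure
    props.put("retry.backoff.ms", "100");              // wait between retries
    ProducerConfig config = new ProducerConfig(props);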
Re: zookeeper restart fatal error
Unfortunately this sounds like a Zookeeper data corruption issue on the node in question: https://issues.apache.org/jira/browse/ZOOKEEPER-1546 The fix from the Jira is to clean out the Zookeeper data on the affected node (if that's possible). On 28 April 2015 at 16:59, Emley, Andrew andrew.em...@eon.com wrote: Hi I have had zk and kafka (kafka_2.8.0-0.8.1) set up nicely and running for a week or so. I decided to stop the zk and the kafka brokers and re-start them; since stopping zk I can't start it again!! It gives me this fatal exception that is related to one of my test topics, multinode1partition4reptopic!? Can anyone give me any pointers on how to resolve? Many thanks andy

[2015-04-28 16:31:05,282] ERROR Failed to increment parent cversion for: /consumers/console-consumer-22432/offsets/multinode1partition4reptopic (org.apache.zookeeper.server.persistence.FileTxnSnapLog)
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/console-consumer-22432/offsets/multinode1partition4reptopic
    at org.apache.zookeeper.server.DataTree.incrementCversion(DataTree.java:1218)
    at org.apache.zookeeper.server.persistence.FileTxnSnapLog.processTransaction(FileTxnSnapLog.java:222)
    at org.apache.zookeeper.server.persistence.FileTxnSnapLog.restore(FileTxnSnapLog.java:150)
    at org.apache.zookeeper.server.ZKDatabase.loadDataBase(ZKDatabase.java:222)
    at org.apache.zookeeper.server.ZooKeeperServer.loadData(ZooKeeperServer.java:239)
    at org.apache.zookeeper.server.ZooKeeperServer.startdata(ZooKeeperServer.java:366)
    at org.apache.zookeeper.server.NIOServerCnxn$Factory.startup(NIOServerCnxn.java:160)
    at org.apache.zookeeper.server.ZooKeeperServerMain.runFromConfig(ZooKeeperServerMain.java:110)
    at org.apache.zookeeper.server.ZooKeeperServerMain.initializeAndRun(ZooKeeperServerMain.java:85)
    at org.apache.zookeeper.server.ZooKeeperServerMain.main(ZooKeeperServerMain.java:51)
    at org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:108)
    at org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPeerMain.java:76)
[2015-04-28 16:31:05,289] FATAL Unexpected exception, exiting abnormally (org.apache.zookeeper.server.ZooKeeperServerMain)
java.io.IOException: Failed to process transaction type: 2 error: KeeperErrorCode = NoNode for /consumers/console-consumer-22432/offsets/multinode1partition4reptopic
    at org.apache.zookeeper.server.persistence.FileTxnSnapLog.restore(FileTxnSnapLog.java:152)
    at org.apache.zookeeper.server.ZKDatabase.loadDataBase(ZKDatabase.java:222)
    at org.apache.zookeeper.server.ZooKeeperServer.loadData(ZooKeeperServer.java:239)
    at org.apache.zookeeper.server.ZooKeeperServer.startdata(ZooKeeperServer.java:366)
    at org.apache.zookeeper.server.NIOServerCnxn$Factory.startup(NIOServerCnxn.java:160)
    at org.apache.zookeeper.server.ZooKeeperServerMain.runFromConfig(ZooKeeperServerMain.java:110)
    at org.apache.zookeeper.server.ZooKeeperServerMain.initializeAndRun(ZooKeeperServerMain.java:85)
    at org.apache.zookeeper.server.ZooKeeperServerMain.main(ZooKeeperServerMain.java:51)
    at org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:108)
    at org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPeerMain.java:76)
Re: Kafka client - 0.9
In the current high-level consumer, you can still manually control when you commit offsets (see this blog for details: http://ingest.tips/2014/10/12/kafka-high-level-consumer-frequently-missing-pieces/ ). While you can't explicitly roll back a commit, you can simply avoid committing when you have an exception (it's a tiny bit more complex than that - because the iterator stores a buffer and a location in the buffer, you need to maintain your own collection of events to retry; this is also explained in the blog above). Hope this helps. Gwen On Wed, Apr 29, 2015 at 9:50 AM, Bharath Srinivasan bharath...@gmail.com wrote: Any pointers on this feature? Thanks. On Thu, Apr 23, 2015 at 9:57 PM, Bharath Srinivasan bharath...@gmail.com wrote: Thanks Gwen. I'm specifically looking for the consumer rewrite API (org.apache.kafka.clients.consumer.KafkaConsumer). Based on the wiki, this feature is available only in 0.9. The specific use case is that I wanted to use the high-level consumer, but with the ability to roll back the offset in case of any exceptions. Based on the documentation, the current high-level consumer API does not seem to support it, at least not in a straightforward fashion. Appreciate any alternate solutions. On Thu, Apr 23, 2015 at 8:08 PM, Gwen Shapira gshap...@cloudera.com wrote: We don't normally plan dates for releases; when we are done with the features we want in the next release and happy with the quality, we release. Many Apache communities are like that. If you need firmer roadmaps and specific release dates, there are a few vendors selling Kafka distributions and support. Are there any specific features you are waiting for? Gwen On Thu, Apr 23, 2015 at 2:25 PM, Bharath Srinivasan bharath...@gmail.com wrote: Hi, I'm looking for the 0.9 client release plan. https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design Is there a planned date for this release? Thanks, Bharath
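For anyone landing here later, a minimal sketch of the commit-only-on-success pattern Gwen describes, assuming the 0.8 high-level consumer; the ZK address, group, topic name, and process() handler are hypothetical:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.MessageAndMetadata;

    Properties props = new Properties();
    props.put("zookeeper.connect", "localhost:2181"); // hypothetical
    props.put("group.id", "my-group");                // hypothetical
    props.put("auto.commit.enable", "false");         // we commit manually below

    ConsumerConnector consumer =
            kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put("my-topic", 1);
    KafkaStream<byte[], byte[]> stream =
            consumer.createMessageStreams(topicCountMap).get("my-topic").get(0);

    for (MessageAndMetadata<byte[], byte[]> record : stream) {
        try {
            process(record.message());  // hypothetical handler
            consumer.commitOffsets();   // commit only after successful processing
        } catch (Exception e) {
            // no commit; note the iterator has already advanced past this record,
            // so keep it in your own retry collection, as the blog post explains
        }
    }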
Re: MultiThreaded HLConsumer Exits before events are all consumed
Commenting out example.shutdown() did not seem to make a difference; I added the print statement below to highlight that fact. The other threads still shut down, and only one thread lives on; eventually that dies after a few minutes as well. Could this be because the producer's default partitioner isn't balancing data across all partitions? Thanks, Chris

Thread 0: 2015-04-29 12:55:54.292|3|13|Normal|-74.1892627|41.33900999753
Last Shutdown via example.shutDown called!
15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:, ZKConsumerConnector shutting down
15/04/29 13:09:38 INFO utils.KafkaScheduler: Forcing shutdown of Kafka scheduler
15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1430330968420] Stopping leader finder thread
15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: -leader-finder-thread], Shutting down
15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: -leader-finder-thread], Stopped
15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: -leader-finder-thread], Shutdown completed
15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1430330968420] Stopping all fetchers
15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-consumergroup], Shutting down
15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-], Stopped
15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-], Shutdown completed
15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-] All connections stopped
15/04/29 13:09:38 INFO zkclient.ZkEventThread: Terminate ZkClient event thread.
Shutting down Thread: 2
Shutting down Thread: 1
Shutting down Thread: 3
15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector: [consumergroup], ZKConsumerConnector shut down completed
Thread 0: 2015-04-29 12:55:55.302|4|14|Unsafe tail distance|-73.99021500035|40.6636611
15/04/29 13:09:39 INFO consumer.ZookeeperConsumerConnector: [consumergroup], stopping watcher executor thread for consumer consumergroup
Thread 0: 2015-04-29 12:55:56.313|1|11|Normal|-79.74165300042|42.1304580009

On Wed, Apr 29, 2015 at 10:11 AM, tao xiao xiaotao...@gmail.com wrote: example.shutdown(); in ConsumerGroupExample closes all consumer connections to Kafka. Remove this line and the consumer threads will run forever. On Wed, Apr 29, 2015 at 9:42 PM, christopher palm cpa...@gmail.com wrote: Hi All, I am trying to get a multi-threaded HL consumer working against a 2-broker Kafka cluster with a 4-partition, 2-replica topic. The consumer code is set to run with 4 threads, one for each partition. The producer code uses the default partitioner and loops indefinitely, feeding events into the topic. (I excluded the while loop in the paste below.) What I see is that the threads eventually all exit, even though the producer is still sending events into the topic. My understanding is that one consumer thread per partition is the correct setup. Any ideas why this code doesn't continue to consume events as they are pushed to the topic? I suspect I am configuring something wrong here, but am not sure what.
Thanks, Chris

*Topic Configuration*
Topic:kafkatopicRep4Part4 PartitionCount:4 ReplicationFactor:2 Configs:
Topic: kafkatopicRep4Part4 Partition: 0 Leader: 1 Replicas: 2,1 Isr: 1,2
Topic: kafkatopicRep4Part4 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: kafkatopicRep4Part4 Partition: 2 Leader: 1 Replicas: 2,1 Isr: 1,2
Topic: kafkatopicRep4Part4 Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2

*Producer Code:*
Properties props = new Properties();
props.put("metadata.broker.list", args[0]);
props.put("zk.connect", args[1]);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
String TOPIC = args[2];
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
finalEvent = new Timestamp(new Date().getTime()) + "|" + truckIds[0] + "|"
        + driverIds[0] + "|" + events[random.nextInt(evtCnt)] + "|"
        + getLatLong(arrayroute17[i]);
try {
    KeyedMessage<String, String> data =
            new KeyedMessage<String, String>(TOPIC, finalEvent);
    LOG.info("Sending Messge #: " + routeName[0] + ": " + i + ", msg: " + finalEvent);
    producer.send(data);
    Thread.sleep(1000);
} catch (Exception e) {
    e.printStackTrace();
}

*Consumer Code:*
public class ConsumerTest implements Runnable {
    private KafkaStream m_stream;
    private int m_threadNumber;

    public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
    }
Consuming keyed messages with null value
I have an application producing Avro-encoded keyed messages (Martin Kleppmann's new Bottled Water project). It encodes a delete as a keyed message with an id as the key and a null payload. I have log compaction turned on. The Avro console consumer correctly displays this as null in my terminal, but when I try to consume it using the high-level consumer in Java, the message is never consumed. Subsequent non-null messages that were produced after that null also aren't consumed. Do I need to do something to keep the iterator's hasNext() method (my code is pretty much exactly what appears in https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example ) from treating a null value as an invalid message to consume? Or am I misunderstanding what's going on and need to do something different? Thanks!
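For comparison, a minimal sketch of how a null payload (a compaction tombstone) can be handled in the 0.8 high-level consumer's iterator loop - handleDelete and handleUpsert are hypothetical, and this alone does not explain why the stream would stall:

    import kafka.consumer.ConsumerIterator;
    import kafka.message.MessageAndMetadata;

    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    while (it.hasNext()) {
        MessageAndMetadata<byte[], byte[]> record = it.next();
        byte[] key = record.key();
        byte[] value = record.message(); // null for a delete/tombstone
        if (value == null) {
            handleDelete(key);           // hypothetical delete handler
        } else {
            handleUpsert(key, value);    // hypothetical upsert handler
        }
    }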
Horizontally Scaling Kafka Consumers
Hi, I was wondering what options there are, and what other people are doing, for horizontally scaling kafka consumers? Basically if I have 100 partitions and 10 consumers, and want to temporarily scale up to 50 consumers, what can I do? So far I've thought of just simply tracking consumer membership somehow (either through zookeeper's ephemeral nodes or maybe using gossip) on the consumers to achieve consensus on who consumes what. Another option would be a router, possibly using something like nsq (I understand they are similar pieces of software, but what we are going for is a persistent, distributed, sharded queue, which is why I'm looking into Kafka).
Re: Kafka 0.8.2 consumer offset checker throwing kafka.common.NotCoordinatorForConsumerException
Update here: we resolved this by deleting the kafka-data directory on that host (which had file inconsistencies in the kafka-data dir, per last week's fsck run log) and restarting kafka. Note we also never reimaged this host (that was another host, which we had confused it with). Thanks, Kartheek On Tue, Apr 28, 2015 at 6:36 PM, Kartheek Karra kka...@salesforce.com wrote: We recently upgraded kafka in our production environment cluster of 5 brokers from 0.8.0 to 0.8.2. Since then the ConsumerOffsetChecker script is unable to fetch offsets due to kafka.common.NotCoordinatorForConsumerException. Note I'm able to run the ConsumerOffsetChecker from an older version, 0.8.0, successfully without any exceptions against the same upgraded cluster. Also, we haven't migrated to Kafka-based offset storage and are still using the default zookeeper storage. Another piece of information: kafka is always picking the same host as the coordinator for fetching offsets, and we had to reimage that host just after the upgrade. We haven't been able to reproduce this in our test environments yet. Any clue what might be wrong here? Let me know if more details are needed. Thanks, Kartheek
Kafka offset using kafka topic - not consuming messages
I am using Kafka 0.8.2 and I am using Kafka-based storage for offsets. Whenever I restart a consumer (high-level consumer API), it does not consume the messages that were posted while the consumer was down. I am using the following consumer properties:

Properties props = new Properties();
props.put("zookeeper.connect", zooKeeper);
props.put("group.id", consumerName);
props.put("zookeeper.session.timeout.ms", "6000");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.enable", "false");
props.put("offsets.storage", "kafka");
props.put("dual.commit.enabled", "false");
props.put("auto.offset.reset", "largest");

My offset manager is here: https://gist.github.com/gomathi/0d63e29385017577ce3a. I am not sure why the consumer is behaving weirdly. Please share any updates if you have them. Thanks Regards,
Re: Kafka offset using kafka topic - not consuming messages
OK, so you turned off auto offset commit and set auto.offset.reset to largest. That means when you consume: 1. If you do not commit offsets manually, no offsets will be committed to Kafka. 2. If you do not have an offset stored in Kafka, you will start from the log end and ignore the existing messages in the topic. Another thing you want to check: are you using the same group id all the time? Jiangjie (Becket) Qin On 4/29/15, 3:17 PM, Gomathivinayagam Muthuvinayagam sankarm...@gmail.com wrote: I am using Kafka 0.8.2 and I am using Kafka-based storage for offsets. Whenever I restart a consumer (high-level consumer API), it does not consume the messages that were posted while the consumer was down. I am using the following consumer properties:

Properties props = new Properties();
props.put("zookeeper.connect", zooKeeper);
props.put("group.id", consumerName);
props.put("zookeeper.session.timeout.ms", "6000");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.enable", "false");
props.put("offsets.storage", "kafka");
props.put("dual.commit.enabled", "false");
props.put("auto.offset.reset", "largest");

My offset manager is here: https://gist.github.com/gomathi/0d63e29385017577ce3a. I am not sure why the consumer is behaving weirdly. Please share any updates if you have them. Thanks Regards,
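A small illustrative tweak (an assumption about the desired behavior, not something Becket prescribed): if the intent is to pick up existing messages when no committed offset is found, the 0.8 consumer accepts smallest for auto.offset.reset:

    props.put("auto.offset.reset", "smallest"); // with no committed offset, start from
                                                // the earliest available message
                                                // instead of the log end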
MultiThreaded HLConsumer Exits before events are all consumed
Hi All, I am trying to get a multi threaded HL consumer working against a 2 broker Kafka cluster with a 4 partition 2 replica topic. The consumer code is set to run with 4 threads, one for each partition. The producer code uses the default partitioner and loops indefinitely feeding events into the topic.(I excluded the while loop in the paste below) What I see is the threads eventually all exit, even thought the producer is still sending events into the topic. My understanding is that the consumer thread per partition is the correct setup. Any ideas why this code doesn't continue to consume events at they are pushed to the topic? I suspect I am configuring something wrong here, but am not sure what. Thanks, Chris *T**opic Configuration* Topic:kafkatopicRep4Part4 PartitionCount:4 ReplicationFactor:2 Configs: Topic: kafkatopicRep4Part4 Partition: 0 Leader: 1 Replicas: 2,1 Isr: 1,2 Topic: kafkatopicRep4Part4 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2 Topic: kafkatopicRep4Part4 Partition: 2 Leader: 1 Replicas: 2,1 Isr: 1,2 Topic: kafkatopicRep4Part4 Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2 *Producer Code:* Properties props = new Properties(); props.put(metadata.broker.list, args[0]); props.put(zk.connect, args[1]); props.put(serializer.class, kafka.serializer.StringEncoder); props.put(request.required.acks, 1); String TOPIC = args[2]; ProducerConfig config = new ProducerConfig(props); ProducerString, String producer = new ProducerString, String( config); finalEvent = new Timestamp(new Date().getTime()) + | + truckIds[0] + | + driverIds[0] + | + events[random .nextInt(evtCnt)] + | + getLatLong(arrayroute17[i]); try { KeyedMessageString, String data = new KeyedMessageString, String(TOPIC, finalEvent); LOG.info(Sending Messge #: + routeName[0] + : + i +, msg: + finalEvent); producer.send(data); Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); } *Consumer Code:* public class ConsumerTest implements Runnable { private KafkaStream m_stream; private int m_threadNumber; public ConsumerTest(KafkaStream a_stream, int a_threadNumber) { m_threadNumber = a_threadNumber; m_stream = a_stream; } public void run() { ConsumerIteratorbyte[], byte[] it = m_stream.iterator(); while (it.hasNext()){ System.out.println(Thread + m_threadNumber + : + new String(it.next().message())); try { Thread.sleep(1000); }catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Shutting down Thread: + m_threadNumber); } } public class ConsumerGroupExample { private final ConsumerConnector consumer; private final String topic; private ExecutorService executor; public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) { consumer = kafka.consumer.Consumer.createJavaConsumerConnector( createConsumerConfig(a_zookeeper, a_groupId)); this.topic = a_topic; } public void shutdown() { if (consumer != null) consumer.shutdown(); if (executor != null) executor.shutdown(); try { if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { System.out.println(Timed out waiting for consumer threads to shut down, exiting uncleanly); } } catch (InterruptedException e) { System.out.println(Interrupted during shutdown, exiting uncleanly); } } public void run(int a_numThreads) { MapString, Integer topicCountMap = new HashMapString, Integer(); topicCountMap.put(topic, new Integer(a_numThreads)); MapString, ListKafkaStreambyte[], byte[] consumerMap = consumer.createMessageStreams(topicCountMap); ListKafkaStreambyte[], byte[] streams = consumerMap.get(topic); executor = 
Executors.newFixedThreadPool(a_numThreads); int threadNumber = 0; for (final KafkaStream stream : streams) { executor.submit(new ConsumerTest(stream, threadNumber)); threadNumber++; } } private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) { Properties props = new Properties(); props.put(zookeeper.connect, a_zookeeper); props.put(group.id, a_groupId); props.put(zookeeper.session.timeout.ms, 400); props.put(zookeeper.sync.time.ms, 200); props.put(auto.commit.interval.ms, 1000); props.put(consumer.timeout.ms, -1); return new ConsumerConfig(props); } public static void main(String[] args) { String zooKeeper = args[0];
Re: Kafka offset using kafka topic - not consuming messages
Thank you. I am using the same group id all the time. I printed the OffsetsMessageFormatter output for the consumer, and the output shows [async_force_consumers,force_msgs,9]::OffsetAndMetadata[2,associated metadata,1430277791077]. But if I restart the consumer, it starts consuming messages from offset 1 for partition 9, even though I have stored the offset as 2. I am not sure what I am missing here. Thanks Regards, On Wed, Apr 29, 2015 at 5:17 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: OK, so you turned off auto offset commit and set auto.offset.reset to largest. That means when you consume: 1. If you do not commit offsets manually, no offsets will be committed to Kafka. 2. If you do not have an offset stored in Kafka, you will start from the log end and ignore the existing messages in the topic. Another thing you want to check: are you using the same group id all the time? Jiangjie (Becket) Qin On 4/29/15, 3:17 PM, Gomathivinayagam Muthuvinayagam sankarm...@gmail.com wrote: I am using Kafka 0.8.2 and I am using Kafka-based storage for offsets. Whenever I restart a consumer (high-level consumer API), it does not consume the messages that were posted while the consumer was down. I am using the following consumer properties:

Properties props = new Properties();
props.put("zookeeper.connect", zooKeeper);
props.put("group.id", consumerName);
props.put("zookeeper.session.timeout.ms", "6000");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.enable", "false");
props.put("offsets.storage", "kafka");
props.put("dual.commit.enabled", "false");
props.put("auto.offset.reset", "largest");

My offset manager is here: https://gist.github.com/gomathi/0d63e29385017577ce3a. I am not sure why the consumer is behaving weirdly. Please share any updates if you have them. Thanks Regards,
Re: New Producer API - batched sync mode support
I'm starting to think that the old adage "if two people say you are drunk, lie down" applies here :) The current API seems perfectly clear, useful and logical to everyone who wrote it... but we are getting multiple users asking for the old batch behavior back. One reason to get it back is to make upgrades easier - people won't need to rethink their existing logic if they get an API with the same behavior in the new producer. The other reason is what Ewen mentioned earlier - rather than everyone re-implementing Joel's logic, we can provide something for that. How about getting the old batch send behavior back by adding a new API:

public void batchSend(List<ProducerRecord<K,V>> records)

With this implementation (mixes the old behavior with Joel's snippet):
* send records one by one
* flush
* iterate on futures and get them
* log a detailed message on each error
* throw an exception if any send failed.

It reproduces the old behavior - which apparently everyone really liked - and I don't think it is overly weird. It is very limited, but anyone who needs more control over their sends already has plenty of options. Thoughts? Gwen On Tue, Apr 28, 2015 at 5:29 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey guys, The locking argument is correct for very small records (< 50 bytes); batching will help here because for small records locking becomes the big bottleneck. I think these use cases are rare but not unreasonable. Overall I'd emphasize that the new producer is way faster at virtually all use cases. If there is a use case where that isn't true, let's look at it in a data-driven way, by comparing the old producer to the new producer and looking for any areas where things got worse. I suspect the "reducing allocations" argument to be not a big thing. We do a number of small per-message allocations and it didn't seem to have much impact. I do think there are a couple of big producer memory optimizations we could do by reusing the arrays in the accumulator in the serialization of the request, but I don't think this is one of them. I'd be skeptical of any api that was too weird - i.e. introduces a new way of partitioning, gives back errors on a per-partition rather than per-message basis (given that partitioning is transparent this is really hard to think about), etc. Bad apis end up causing a ton of churn and just don't end up being a good long-term commitment as we change how the underlying code works over time (i.e. we hyper-optimize for something, then have to maintain some super weird api as it becomes hyper-unoptimized for the client over time). Roshan - flush works as you would hope: it blocks on the completion of all outstanding requests. Calling get on the future for the request gives you the associated error code back. Flush doesn't throw any exceptions, because waiting for requests to complete doesn't error; the individual requests fail or succeed, which is always reported with each request. Ivan - the batches you send in the scala producer today actually aren't truly atomic, they just get sent in a single request. One tricky problem to solve when users do batching is size limits on requests. This can be very hard to manage since predicting the serialized size of a bunch of java objects is not always obvious. This was repeatedly a problem before. -Jay On Tue, Apr 28, 2015 at 4:51 PM, Ivan Balashov ibalas...@gmail.com wrote: I must agree with @Roshan - it's hard to imagine anything more intuitive and easy to use for atomic batching than the old sync batch api. Also, it's fast.
Coupled with a separate producer instance per broker:port:topic:partition, it works very well. I would be glad if it found its way into the new producer api. On a side-side-side note, could anyone confirm or deny whether SimpleConsumer's fetchSize must be set at least as large as the batch size in bytes (before or after compression?), otherwise the client risks not getting any messages?
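As a rough illustration (not an agreed-upon API - just a sketch of the five steps Gwen lists above, assuming the new java producer and its flush() method, which per Jay's note blocks on completion of all outstanding requests but was still landing on trunk at the time):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public static <K, V> void batchSend(Producer<K, V> producer,
                                        List<ProducerRecord<K, V>> records) {
        // 1. send records one by one
        List<Future<RecordMetadata>> futures =
                new ArrayList<Future<RecordMetadata>>(records.size());
        for (ProducerRecord<K, V> record : records) {
            futures.add(producer.send(record));
        }
        // 2. flush: block until all outstanding requests complete
        producer.flush();
        // 3. iterate on the futures and get them
        boolean anyFailed = false;
        for (int i = 0; i < futures.size(); i++) {
            try {
                futures.get(i).get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                anyFailed = true;
            } catch (ExecutionException e) {
                // 4. log a detailed message on each error
                System.err.println("Record " + i + " failed: " + e.getCause());
                anyFailed = true;
            }
        }
        // 5. throw an exception if any send failed
        if (anyFailed) {
            throw new RuntimeException("batchSend: one or more records failed");
        }
    }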
Re: Unclaimed partitions
Hi, would anyone be able to help me with this issue? Thanks. - Dave On Tue, Apr 28, 2015 at 1:32 PM -0700, Dave Hamilton dhamil...@nanigans.com wrote: 1. We're using version 0.8.1.1. 2. No failures in the consumer logs. 3. We're using the ConsumerOffsetChecker to see what partitions are assigned to the consumer group and what their offsets are. 8 of the 12 processes have each been assigned two partitions, and they're keeping up with the topic. The other 4 do not get assigned partitions, and no consumers in the group are consuming those 8 partitions. Thanks for your help, Dave On 4/28/15, 1:40 PM, Aditya Auradkar aaurad...@linkedin.com.INVALID wrote: A couple of questions: - What version of the consumer API are you using? - Are you seeing any rebalance failures in the consumer logs? - How do you determine that some partitions are unassigned? Just confirming that you have partitions that are not being consumed from, as opposed to consumer threads that aren't assigned any partitions. Aditya From: Dave Hamilton [dhamil...@nanigans.com] Sent: Tuesday, April 28, 2015 10:19 AM To: users@kafka.apache.org Subject: Re: Unclaimed partitions I'm sorry, I forgot to specify that these processes are in the same consumer group. Thanks, Dave On 4/28/15, 1:15 PM, Aditya Auradkar aaurad...@linkedin.com.INVALID wrote: Hi Dave, The simple consumer doesn't do any state management across consumer instances, so I'm not sure how you are assigning partitions in your application code. Did you mean to say that you are using the high-level consumer API? Thanks, Aditya From: Dave Hamilton [dhamil...@nanigans.com] Sent: Tuesday, April 28, 2015 7:58 AM To: users@kafka.apache.org Subject: Unclaimed partitions Hi, I am trying to consume a 24-partition topic across 12 processes. Each process is using the simple consumer API, and each is being assigned two consumer threads. I have noticed when starting these processes that sometimes some of my processes are not assigned any partitions, and no rebalance seems to ever be triggered, leaving some of the partitions unclaimed. When I first tried deploying this yesterday, I noticed 8 of the 24 partitions, for 4 of the consumer processes, went unclaimed. Redeploying shortly afterwards corrected the issue. I tried deploying again today, and now I see a different set of 4 processes not getting assigned partitions. The processes otherwise appear to be running normally; they are currently running in production and we are working to get the consumers quietly running before enabling them to do any work. I'm not sure if we might be looking at some sort of timing issue. Does anyone know what might be causing the issues we're observing? Thanks, Dave
Re: Kafka 0.8.2 beta - release
It has already been released, including a minor revision to fix some critical bugs. The latest release is 0.8.2.1. The downloads page has links and release notes: http://kafka.apache.org/downloads.html On Wed, Apr 29, 2015 at 10:22 PM, Gomathivinayagam Muthuvinayagam sankarm...@gmail.com wrote: I see a lot of interesting features in Kafka 0.8.2 beta. I am just wondering when it will be released. Is there a timeline for that? Thanks Regards, -- Thanks, Ewen
Re: Horizontally Scaling Kafka Consumers
You can do this with the existing Kafka consumer https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/consumer/SimpleConsumer.scala#L106 and probably any other Kafka client too (maybe with minor/major rework to do the offset management). The new consumer approach is more transparent about subscribing to specific partitions https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L200-L234 . Here is a Docker file (** pull request pending **) for wrapping kafka consumers (it doesn't have to be the go client; I need to abstract that out some more after more testing) https://github.com/stealthly/go_kafka_client/blob/mesos-marathon/consumers/Dockerfile Also a VM (** pull request pending **) to build the container, push it to a local docker repository and launch it on Apache Mesos https://github.com/stealthly/go_kafka_client/tree/mesos-marathon/mesos/vagrant as a working example of how to do it. All of this could be done without the Docker container and still work on Mesos ... or even without Mesos and on YARN. You might also want to check out how Samza integrates with execution frameworks http://samza.apache.org/learn/documentation/0.9/comparisons/introduction.html which has a Mesos patch https://issues.apache.org/jira/browse/SAMZA-375 and built-in YARN support. ~ Joe Stein - - - - - - - - - - - - - - - - - http://www.stealth.ly - - - - - - - - - - - - - - - - - On Wed, Apr 29, 2015 at 8:56 AM, David Corley davidcor...@gmail.com wrote: You're right Stevo, I should rephrase to say that there can be no more _active_ consumers than there are partitions (within a single consumer group). I'm guessing that's what Nimi is asking about, but perhaps he can elaborate on whether he's using consumer groups and/or whether the 100 partitions are all for a single topic or for multiple topics. On 29 April 2015 at 13:38, Stevo Slavić ssla...@gmail.com wrote: Please correct me if I'm wrong, but I think it is not really a hard constraint that one cannot have more consumers (from the same group) than partitions on a single topic - the surplus consumers will not be assigned any partition to consume, but they can be there, and as soon as one active consumer from the same group goes offline (its connection to ZK is dropped), the consumers in the group will be rebalanced so that one passively waiting consumer becomes active. Kind regards, Stevo Slavic. On Wed, Apr 29, 2015 at 2:25 PM, David Corley davidcor...@gmail.com wrote: If the 100 partitions are all for the same topic, you can have up to 100 consumers working as part of a single consumer group for that topic. You cannot have more consumers than there are partitions within a given consumer group. On 29 April 2015 at 08:41, Nimi Wariboko Jr n...@channelmeter.com wrote: Hi, I was wondering what options there are for horizontally scaling kafka consumers? Basically if I have 100 partitions and 10 consumers, and want to temporarily scale up to 50 consumers, what options do I have? So far I've thought of just simply tracking consumer membership somehow (either through Raft or zookeeper's znodes) on the consumers.
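For illustration, a minimal sketch of partition-specific subscription with the new consumer; the method names were still in flux on trunk at the time (this uses assign(), the name that eventually shipped, with a hypothetical broker address and topic):

    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    Properties props = new Properties();
    props.put("bootstrap.servers", "broker1:9092"); // hypothetical
    props.put("key.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put("value.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);

    // take ownership of exactly these partitions; no consumer-group
    // rebalancing is involved, so an external coordinator (ZK, Mesos,
    // YARN, ...) can decide which process owns which partitions
    consumer.assign(Arrays.asList(
            new TopicPartition("mytopic", 0),
            new TopicPartition("mytopic", 1)));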
Kafka 0.8.2 beta - release
I see a lot of interesting features in Kafka 0.8.2 beta. I am just wondering when it will be released. Is there a timeline for that? Thanks Regards,
RE: Unclaimed partitions
Hey Dave, It's hard to say why this is happening without more information. Even if there are no errors in the log, is there anything to indicate that the rebalance process on those hosts even started? Does this happen occasionally, or every time you start the consumer group? Can you paste the output of ConsumerOffsetChecker and of describe topic? Thanks, Aditya From: Dave Hamilton [dhamil...@nanigans.com] Sent: Wednesday, April 29, 2015 6:46 PM To: users@kafka.apache.org; users@kafka.apache.org Subject: Re: Unclaimed partitions Hi, would anyone be able to help me with this issue? Thanks. - Dave On Tue, Apr 28, 2015 at 1:32 PM -0700, Dave Hamilton dhamil...@nanigans.com wrote: 1. We're using version 0.8.1.1. 2. No failures in the consumer logs. 3. We're using the ConsumerOffsetChecker to see what partitions are assigned to the consumer group and what their offsets are. 8 of the 12 processes have each been assigned two partitions, and they're keeping up with the topic. The other 4 do not get assigned partitions, and no consumers in the group are consuming those 8 partitions. Thanks for your help, Dave On 4/28/15, 1:40 PM, Aditya Auradkar aaurad...@linkedin.com.INVALID wrote: A couple of questions: - What version of the consumer API are you using? - Are you seeing any rebalance failures in the consumer logs? - How do you determine that some partitions are unassigned? Just confirming that you have partitions that are not being consumed from, as opposed to consumer threads that aren't assigned any partitions. Aditya From: Dave Hamilton [dhamil...@nanigans.com] Sent: Tuesday, April 28, 2015 10:19 AM To: users@kafka.apache.org Subject: Re: Unclaimed partitions I'm sorry, I forgot to specify that these processes are in the same consumer group. Thanks, Dave On 4/28/15, 1:15 PM, Aditya Auradkar aaurad...@linkedin.com.INVALID wrote: Hi Dave, The simple consumer doesn't do any state management across consumer instances, so I'm not sure how you are assigning partitions in your application code. Did you mean to say that you are using the high-level consumer API? Thanks, Aditya From: Dave Hamilton [dhamil...@nanigans.com] Sent: Tuesday, April 28, 2015 7:58 AM To: users@kafka.apache.org Subject: Unclaimed partitions Hi, I am trying to consume a 24-partition topic across 12 processes. Each process is using the simple consumer API, and each is being assigned two consumer threads. I have noticed when starting these processes that sometimes some of my processes are not assigned any partitions, and no rebalance seems to ever be triggered, leaving some of the partitions unclaimed. When I first tried deploying this yesterday, I noticed 8 of the 24 partitions, for 4 of the consumer processes, went unclaimed. Redeploying shortly afterwards corrected the issue. I tried deploying again today, and now I see a different set of 4 processes not getting assigned partitions. The processes otherwise appear to be running normally; they are currently running in production and we are working to get the consumers quietly running before enabling them to do any work. I'm not sure if we might be looking at some sort of timing issue. Does anyone know what might be causing the issues we're observing? Thanks, Dave
Re: MultiThreaded HLConsumer Exits before events are all consumed
The log suggests that the shutdown method was still called:

Thread 0: 2015-04-29 12:55:54.292|3|13|Normal|-74.1892627|41.33900999753
Last Shutdown via example.shutDown called!
15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:, ZKConsumerConnector shutting down

Please ensure consumer.shutdown() and executor.shutdown() are not called during the course of your program. On Thu, Apr 30, 2015 at 2:23 AM, christopher palm cpa...@gmail.com wrote: Commenting out example.shutdown() did not seem to make a difference; I added the print statement below to highlight that fact. The other threads still shut down, and only one thread lives on; eventually that dies after a few minutes as well. Could this be because the producer's default partitioner isn't balancing data across all partitions? Thanks, Chris

Thread 0: 2015-04-29 12:55:54.292|3|13|Normal|-74.1892627|41.33900999753
Last Shutdown via example.shutDown called!
15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:, ZKConsumerConnector shutting down
15/04/29 13:09:38 INFO utils.KafkaScheduler: Forcing shutdown of Kafka scheduler
15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1430330968420] Stopping leader finder thread
15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: -leader-finder-thread], Shutting down
15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: -leader-finder-thread], Stopped
15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: -leader-finder-thread], Shutdown completed
15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1430330968420] Stopping all fetchers
15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-consumergroup], Shutting down
15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-], Stopped
15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-], Shutdown completed
15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-] All connections stopped
15/04/29 13:09:38 INFO zkclient.ZkEventThread: Terminate ZkClient event thread.
Shutting down Thread: 2
Shutting down Thread: 1
Shutting down Thread: 3
15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector: [consumergroup], ZKConsumerConnector shut down completed
Thread 0: 2015-04-29 12:55:55.302|4|14|Unsafe tail distance|-73.99021500035|40.6636611
15/04/29 13:09:39 INFO consumer.ZookeeperConsumerConnector: [consumergroup], stopping watcher executor thread for consumer consumergroup
Thread 0: 2015-04-29 12:55:56.313|1|11|Normal|-79.74165300042|42.1304580009

On Wed, Apr 29, 2015 at 10:11 AM, tao xiao xiaotao...@gmail.com wrote: example.shutdown(); in ConsumerGroupExample closes all consumer connections to Kafka. Remove this line and the consumer threads will run forever. On Wed, Apr 29, 2015 at 9:42 PM, christopher palm cpa...@gmail.com wrote: Hi All, I am trying to get a multi-threaded HL consumer working against a 2-broker Kafka cluster with a 4-partition, 2-replica topic. The consumer code is set to run with 4 threads, one for each partition. The producer code uses the default partitioner and loops indefinitely, feeding events into the topic. (I excluded the while loop in the paste below.) What I see is that the threads eventually all exit, even though the producer is still sending events into the topic. My understanding is that one consumer thread per partition is the correct setup.
Any ideas why this code doesn't continue to consume events as they are pushed to the topic? I suspect I am configuring something wrong here, but am not sure what. Thanks, Chris

*Topic Configuration*
Topic:kafkatopicRep4Part4 PartitionCount:4 ReplicationFactor:2 Configs:
Topic: kafkatopicRep4Part4 Partition: 0 Leader: 1 Replicas: 2,1 Isr: 1,2
Topic: kafkatopicRep4Part4 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: kafkatopicRep4Part4 Partition: 2 Leader: 1 Replicas: 2,1 Isr: 1,2
Topic: kafkatopicRep4Part4 Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2

*Producer Code:*
Properties props = new Properties();
props.put("metadata.broker.list", args[0]);
props.put("zk.connect", args[1]);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
String TOPIC = args[2];
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
finalEvent = new Timestamp(new Date().getTime()) + "|" + truckIds[0] + "|"
        + driverIds[0] + "|" + events[random.nextInt(evtCnt)] +
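A sketch of one way to act on tao xiao's advice, assuming the wiki-style ConsumerGroupExample from this thread: instead of sleeping and then calling example.shutdown() in main, let the consumers run until the process is terminated, and clean up via a JVM shutdown hook:

    public static void main(String[] args) {
        final ConsumerGroupExample example =
                new ConsumerGroupExample(args[0], args[1], args[2]); // zk, group, topic
        // close consumer connections only when the JVM exits
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                example.shutdown();
            }
        });
        example.run(Integer.parseInt(args[3])); // number of threads
        // note: no example.shutdown() here, so the threads keep consuming
    }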
Re: MultiThreaded HLConsumer Exits before events are all consumed
example.shutdown(); in ConsumerGroupExample closes all consumer connections to Kafka. Remove this line and the consumer threads will run forever. On Wed, Apr 29, 2015 at 9:42 PM, christopher palm cpa...@gmail.com wrote: Hi All, I am trying to get a multi-threaded HL consumer working against a 2-broker Kafka cluster with a 4-partition, 2-replica topic. The consumer code is set to run with 4 threads, one for each partition. The producer code uses the default partitioner and loops indefinitely, feeding events into the topic. (I excluded the while loop in the paste below.) What I see is that the threads eventually all exit, even though the producer is still sending events into the topic. My understanding is that one consumer thread per partition is the correct setup. Any ideas why this code doesn't continue to consume events as they are pushed to the topic? I suspect I am configuring something wrong here, but am not sure what. Thanks, Chris

*Topic Configuration*
Topic:kafkatopicRep4Part4 PartitionCount:4 ReplicationFactor:2 Configs:
Topic: kafkatopicRep4Part4 Partition: 0 Leader: 1 Replicas: 2,1 Isr: 1,2
Topic: kafkatopicRep4Part4 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: kafkatopicRep4Part4 Partition: 2 Leader: 1 Replicas: 2,1 Isr: 1,2
Topic: kafkatopicRep4Part4 Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2

*Producer Code:*
Properties props = new Properties();
props.put("metadata.broker.list", args[0]);
props.put("zk.connect", args[1]);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
String TOPIC = args[2];
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
finalEvent = new Timestamp(new Date().getTime()) + "|" + truckIds[0] + "|"
        + driverIds[0] + "|" + events[random.nextInt(evtCnt)] + "|"
        + getLatLong(arrayroute17[i]);
try {
    KeyedMessage<String, String> data =
            new KeyedMessage<String, String>(TOPIC, finalEvent);
    LOG.info("Sending Messge #: " + routeName[0] + ": " + i + ", msg: " + finalEvent);
    producer.send(data);
    Thread.sleep(1000);
} catch (Exception e) {
    e.printStackTrace();
}

*Consumer Code:*
public class ConsumerTest implements Runnable {
    private KafkaStream m_stream;
    private int m_threadNumber;

    public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
    }

    public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        while (it.hasNext()) {
            System.out.println("Thread " + m_threadNumber + ": "
                    + new String(it.next().message()));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("Shutting down Thread: " + m_threadNumber);
    }
}

public class ConsumerGroupExample {
    private final ConsumerConnector consumer;
    private final String topic;
    private ExecutorService executor;

    public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }

    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        if (executor != null) executor.shutdown();
        try {
            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
            }
        } catch (InterruptedException e) {
            System.out.println("Interrupted during shutdown, exiting uncleanly");
        }
    }

    public void run(int a_numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
        executor = Executors.newFixedThreadPool(a_numThreads);
        int threadNumber = 0;
        for (final KafkaStream stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber));
            threadNumber++;
        }
    }

    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);