Re: kafka mirroring ...!
I think it would be very useful if we could mirror to a different topic name on the destination side. We have a use case to merge data from multiple colos into one central colo. SunilKalva

On Mon, Mar 9, 2015 at 4:29 PM, tao xiao xiaotao...@gmail.com wrote: I don't think you can mirror messages to a different topic name in the current mirror maker implementation. Mirror maker sends each message to the destination topic based on the topic name it reads from the source.

On Mon, Mar 9, 2015 at 5:00 PM, sunil kalva sambarc...@gmail.com wrote: Can I configure a different topic name in the destination cluster, i.e. can I have different topic names for the source and destination clusters when mirroring? If yes, how can I map a source topic to a destination topic name? SunilKalva

On Mon, Mar 9, 2015 at 6:41 AM, tao xiao xiaotao...@gmail.com wrote: Ctrl+C is a clean shutdown. kill -9 is not.

On Mon, Mar 9, 2015 at 2:32 AM, Alex Melville amelvi...@g.hmc.edu wrote: What does a clean shutdown of the MM entail? So far I've just been using Ctrl+C to send an interrupt to kill it. Alex

On Sat, Mar 7, 2015 at 10:59 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: If auto.offset.reset is set to smallest, it does not mean the consumer will always consume from the smallest offset. It means that if no previous offset commit is found for this consumer group, it will consume from the smallest. So for mirror maker, you probably want to always use the same consumer group id. This can be configured in the consumer config file you pass into mirror maker. Another thing about duplicate messages: if mirror maker is shut down cleanly, the next time you start it with the same consumer group id there should be no duplicates. But if mirror maker shuts down uncleanly (e.g. by a kill -9), then the next time it starts up you might still see duplicate messages after the last committed offsets. Jiangjie (Becket) Qin

On 3/7/15, 11:45 PM, sunil kalva sambarc...@gmail.com wrote: Qin, the partition problem is solved by passing the --new.producer true option on the command line, but with the auto.offset.reset=smallest config, every time I restart the mirror tool it copies from the beginning and ends up with lots of duplicate messages in the destination cluster. Could you please tell me how to configure it so that the destination cluster is always in sync with the source cluster? SunilKalva

On Sun, Mar 8, 2015 at 12:54 AM, Jiangjie Qin j...@linkedin.com.invalid wrote: For data not showing up, you need to make sure the mirror maker consumer's auto.offset.reset is set to smallest; otherwise, when you run mirror maker for the first time, none of the pre-existing messages will be consumed. For partition sticking, can you verify whether your messages are keyed? If they are not keyed, can you check whether you are using the old producer or the new producer? For the old producer, the default behavior is to stick to one partition for 10 minutes and then move to the next partition. So if you wait for more than 10 minutes, you should see messages in two different partitions. Jiangjie (Becket) Qin

On 3/7/15, 8:28 AM, sunil kalva sambarc...@gmail.com wrote: I also observed that all the data is moving to one partition in the destination cluster, though I have multiple partitions for that topic in both the source and destination clusters. SunilKalva

On Sat, Mar 7, 2015 at 9:54 PM, sunil kalva sambarc...@gmail.com wrote: I ran the kafka mirroring tool after producing data in the source cluster, and that data was not copied to the destination cluster. If I produce data after running the tool, that data is copied to the destination cluster. Am I missing something? SunilKalva
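For reference, a minimal sketch of the consumer config this thread converges on (host and group names are placeholders); this is the file passed to mirror maker via --consumer.config:

    # consumer.properties for mirror maker
    zookeeper.connect=source-zk:2181
    # Keep the same group id across restarts so committed offsets are reused
    # and a restart does not re-copy from the beginning.
    group.id=mirror-maker-group
    # Only used when no committed offset exists for this group.
    auto.offset.reset=smallest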
Topics are not evenly distributed to streams using Range partition assignment
Hi, I created a message stream in my consumer using connector.createMessageStreamsByFilter(new Whitelist("mm-benchmark-test\\w*"), 5); I have 5 topics in my cluster and each topic has only one partition. My understanding of a wildcard stream is that the streams are shared between the selected topics, so in my case 5 streams should be shared between 5 different topics. But when I looked at the log it showed a different story:

2015-03-09 19:02:36 INFO kafka.utils.Logging$class:68 - [test12345667_LM-SHC-00950667-1425898953590-d99e2d75], test12345667_LM-SHC-00950667-1425898953590-d99e2d75-0 successfully owned partition 0 for topic mm-benchmark-test2
2015-03-09 19:02:36 INFO kafka.utils.Logging$class:68 - [test12345667_LM-SHC-00950667-1425898953590-d99e2d75], test12345667_LM-SHC-00950667-1425898953590-d99e2d75-0 successfully owned partition 0 for topic mm-benchmark-test
2015-03-09 19:02:36 INFO kafka.utils.Logging$class:68 - [test12345667_LM-SHC-00950667-1425898953590-d99e2d75], test12345667_LM-SHC-00950667-1425898953590-d99e2d75-0 successfully owned partition 0 for topic mm-benchmark-test1
2015-03-09 19:02:36 INFO kafka.utils.Logging$class:68 - [test12345667_LM-SHC-00950667-1425898953590-d99e2d75], test12345667_LM-SHC-00950667-1425898953590-d99e2d75-0 successfully owned partition 0 for topic mm-benchmark-test4
2015-03-09 19:02:36 INFO kafka.utils.Logging$class:68 - [test12345667_LM-SHC-00950667-1425898953590-d99e2d75], test12345667_LM-SHC-00950667-1425898953590-d99e2d75-0 successfully owned partition 0 for topic mm-benchmark-test3

As the log indicates, only one stream was assigned to all topics. I just wanted to know whether this is expected behavior, and if so, how do we evenly distribute topics across different streams? By using the roundrobin assigner? -- Regards, Tao
kafka topic information
I am wondering where a kafka cluster keeps the topic metadata (name, partitions, replication, etc.). How does a server recover a topic's metadata and messages after a restart, and what data will be lost? Thanks to anyone who can answer my questions. best, Yuheng
Re: Does the Kafka 0.8.2 producer have a lower throughput in sync mode, compared with 0.8.1.x?
If a send request in the middle of the list fails, will all send requests that follow it fail? Or do only the messages that were put in the same batch by the underlying transport layer fail?

On Mon, Mar 9, 2015 at 1:31 AM, Manikumar Reddy ku...@nmsworks.co.in wrote: 1. We can send a list of messages and wait on the returned futures:

    List<Future<RecordMetadata>> responses = new ArrayList<Future<RecordMetadata>>();
    for (ProducerRecord<byte[], byte[]> record : recordBatch)
        responses.add(producer.send(record));
    for (Future<RecordMetadata> response : responses)
        response.get();

2. Messages will be sent in submission order.

On Mon, Mar 9, 2015 at 1:03 PM, Yu Yang yuyan...@gmail.com wrote: The confluent blog http://blog.confluent.io/2014/12/02/whats-coming-in-apache-kafka-0-8-2/ mentions that batching is now done whenever possible: the sync producer, under load, can get performance as good as the async producer. Does this mean that kafka 0.8.2 guarantees that the sequence in which the broker receives messages is the same as the sequence of async calls made by the producer? If I have code as follows sending messages to the same topic partition:

    producer.send(msg1);
    producer.send(msg2);
    producer.send(msg3);

If all three calls succeed, is it possible that the broker received msg3 before msg2?

On Mon, Mar 9, 2015 at 12:17 AM, Yu Yang yuyan...@gmail.com wrote: Hi, Kafka 0.8.1.1 allows us to send a list of messages in sync mode:

    public void send(List<KeyedMessage<K,V>> messages);

I did not find a counterpart of this API in the new producer introduced in kafka 0.8.2. It seems that we can use the following method to do a sync send in kafka 0.8.2:

    producer.send(new ProducerRecord(...)).get();

My understanding is that we can then only send one message at a time in sync mode, which will limit the throughput of the kafka producer. Is there a way to send a batch of messages in sync mode using the kafka 0.8.2 producer? Is there any study of the throughput of the kafka 0.8.2 producer compared with kafka 0.8.1.1? Thanks! Regards, Yu
Re: kafka topic information
Yuheng, kafka keeps cluster metadata in zookeeper, along with topic metadata. You can use zookeeper-shell.sh or zkCli.sh to inspect the zk nodes; /brokers/topics will give you the list of topics. -- Harsha On March 9, 2015 at 8:20:59 AM, Yuheng Du (yuheng.du.h...@gmail.com) wrote: I am wondering where a kafka cluster keeps the topic metadata (name, partitions, replication, etc.). How does a server recover a topic's metadata and messages after a restart, and what data will be lost? Thanks to anyone who can answer my questions. best, Yuheng
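For example, a quick way to browse these paths interactively (the zookeeper address is a placeholder):

    bin/zookeeper-shell.sh localhost:2181
    ls /brokers/topics
    get /brokers/topics/my-topic
    ls /brokers/ids

The get on a topic node returns the JSON partition-to-replica assignment for that topic.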
Re: kafka topic information
Harsha, thanks for the reply. So what if the zookeeper cluster fails? Will the topic information be lost? What fault-tolerance mechanism does zookeeper offer? best, On Mon, Mar 9, 2015 at 11:36 AM, Harsha ka...@harsha.io wrote: Yuheng, kafka keeps cluster metadata in zookeeper, along with topic metadata. You can use zookeeper-shell.sh or zkCli.sh to inspect the zk nodes; /brokers/topics will give you the list of topics. -- Harsha On March 9, 2015 at 8:20:59 AM, Yuheng Du (yuheng.du.h...@gmail.com) wrote: I am wondering where a kafka cluster keeps the topic metadata (name, partitions, replication, etc.). How does a server recover a topic's metadata and messages after a restart, and what data will be lost? best, Yuheng
Re: kafka topic information
In general users are expected to run a zookeeper cluster of 3 or 5 nodes. Zookeeper requires a quorum of servers to be running, which means a majority (floor(n/2) + 1) of the servers must be up. With 3 zookeeper nodes, at least 2 zk nodes must be up at any time, i.e. your cluster can function fine in case of 1 machine failure; with 5 nodes, at least 3 must be up and running. For more info on zookeeper you can look here: http://zookeeper.apache.org/doc/r3.4.6/ http://zookeeper.apache.org/doc/r3.4.6/zookeeperAdmin.html -- Harsha On March 9, 2015 at 8:39:00 AM, Yuheng Du (yuheng.du.h...@gmail.com) wrote: Harsha, thanks for the reply. So what if the zookeeper cluster fails? Will the topic information be lost? What fault-tolerance mechanism does zookeeper offer? best, Yuheng
Re: [kafka-clients] Re: [VOTE] 0.8.2.1 Candidate 2
Any timeline on an official 0.8.2.1 release? Were there any issues found with rc2? Just checking in because we are anxious to update our brokers but are waiting for the patch release. Thanks.

On Thu, Mar 5, 2015 at 12:01 AM, Neha Narkhede n...@confluent.io wrote: +1. Verified quick start, unit tests.

On Tue, Mar 3, 2015 at 12:09 PM, Joe Stein joe.st...@stealth.ly wrote: Ok, let's fix the transient test failure on trunk; agreed, not a blocker. +1 quick start passed, verified artifacts, updates in scala https://github.com/stealthly/scala-kafka/tree/0.8.2.1 and go https://github.com/stealthly/go_kafka_client/tree/0.8.2.1 look good ~ Joe Stein http://www.stealth.ly

On Tue, Mar 3, 2015 at 12:30 PM, Jun Rao j...@confluent.io wrote: Hi, Joe, Yes, that unit test does have transient failures from time to time. The issue seems to be with the unit test itself and not the actual code, so this is not a blocker for the 0.8.2.1 release. I think we can just fix it in trunk. Thanks, Jun

On Tue, Mar 3, 2015 at 9:08 AM, Joe Stein joe.st...@stealth.ly wrote: Jun, almost everything looks good, except I keep getting test failures from:

    wget https://people.apache.org/~junrao/kafka-0.8.2.1-candidate2/kafka-0.8.2.1-src.tgz
    tar -xvf kafka-0.8.2.1-src.tgz
    cd kafka-0.8.2.1-src
    gradle
    ./gradlew test

    kafka.api.ProducerFailureHandlingTest > testNotEnoughReplicasAfterBrokerShutdown FAILED
        org.scalatest.junit.JUnitTestFailedError: Expected NotEnoughReplicasException when producing to topic with fewer brokers than min.insync.replicas
            at org.scalatest.junit.AssertionsForJUnit$class.newAssertionFailedException(AssertionsForJUnit.scala:101)
            at org.scalatest.junit.JUnit3Suite.newAssertionFailedException(JUnit3Suite.scala:149)
            at org.scalatest.Assertions$class.fail(Assertions.scala:711)
            at org.scalatest.junit.JUnit3Suite.fail(JUnit3Suite.scala:149)
            at kafka.api.ProducerFailureHandlingTest.testNotEnoughReplicasAfterBrokerShutdown(ProducerFailureHandlingTest.scala:355)

This happens to me all the time on a few different machines. ~ Joe Stein

On Mon, Mar 2, 2015 at 7:36 PM, Jun Rao j...@confluent.io wrote: +1 from me. Verified quickstart and unit tests. Thanks, Jun

On Thu, Feb 26, 2015 at 2:59 PM, Jun Rao j...@confluent.io wrote: This is the second candidate for release of Apache Kafka 0.8.2.1. It fixes 4 critical issues in 0.8.2.0.

Release Notes for the 0.8.2.1 release: https://people.apache.org/~junrao/kafka-0.8.2.1-candidate2/RELEASE_NOTES.html

*** Please download, test and vote by Monday, Mar 2, 3pm PT

Kafka's KEYS file containing the PGP keys we use to sign the release: http://kafka.apache.org/KEYS (in addition to the md5, sha1 and sha2 (SHA256) checksums).

* Release artifacts to be voted upon (source and binary): https://people.apache.org/~junrao/kafka-0.8.2.1-candidate2/
* Maven artifacts to be voted upon prior to release: https://repository.apache.org/content/groups/staging/
* scala-doc: https://people.apache.org/~junrao/kafka-0.8.2.1-candidate2/scaladoc/
* java-doc: https://people.apache.org/~junrao/kafka-0.8.2.1-candidate2/javadoc/
* The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2.1 tag: https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=bd1bfb63ec73c10d08432ac893a23f28281ea021 (git commit ee1267b127f3081db491fa1bf9a287084c324e36)

Thanks, Jun
Fwd: Versioning
I'm new to Kafka and I'm trying to understand the version semantics. We want to use Kafka with Spark, but our version of Spark is tied to Kafka 0.8.0. We were wondering what guarantees are made about backwards compatibility across 0.8.x.x. At first glance, given the 3 digits used for versions, I figured 0.8.x would be a bugfix release and fully version-compatible, but I'm noticing newer versions released with 4 digits, which leads me to believe there are fewer guarantees between 0.8.0, 0.8.1.x and 0.8.2.x.
Re: Broker Exceptions
Hmm, that sounds like a bug. Can you paste the log of the leader rebalance here?

Thanks for your suggestions. It looks like the rebalance actually happened only once, soon after I started with a clean cluster and data was pushed; it didn't happen again so far, and I see the partition leader counts on the brokers have not changed since then. One of the brokers was constantly showing 0 for its partition leader count. Is that normal? Also, I still see lots of the errors below (~69k) in the logs since the restart. Is there any reason other than a rebalance for these errors?

[2015-03-07 14:23:28,963] ERROR [ReplicaFetcherThread-2-5], Error for partition [Topic-11,7] to broker 5:class kafka.common.NotLeaderForPartitionException (kafka.server.ReplicaFetcherThread)
[2015-03-07 14:23:28,963] ERROR [ReplicaFetcherThread-1-5], Error for partition [Topic-2,25] to broker 5:class kafka.common.NotLeaderForPartitionException (kafka.server.ReplicaFetcherThread)
[2015-03-07 14:23:28,963] ERROR [ReplicaFetcherThread-2-5], Error for partition [Topic-2,21] to broker 5:class kafka.common.NotLeaderForPartitionException (kafka.server.ReplicaFetcherThread)
[2015-03-07 14:23:28,963] ERROR [ReplicaFetcherThread-1-5], Error for partition [Topic-22,9] to broker 5:class kafka.common.NotLeaderForPartitionException (kafka.server.ReplicaFetcherThread)

Some other things to check are: 1. The actual property name is auto.leader.rebalance.enable, not auto.leader.rebalance. You've probably known this, just to double confirm.

Yes.

2. In the zookeeper path, can you verify /admin/preferred_replica_election does not exist?

ls /admin
[delete_topics]
ls /admin/preferred_replica_election
Node does not exist: /admin/preferred_replica_election

Thanks Zakee

On Mar 7, 2015, at 10:49 PM, Jiangjie Qin j...@linkedin.com.INVALID wrote: Hmm, that sounds like a bug. Can you paste the log of the leader rebalance here? Some other things to check are: 1. The actual property name is auto.leader.rebalance.enable, not auto.leader.rebalance. You've probably known this, just to double confirm. 2. In the zookeeper path, can you verify /admin/preferred_replica_election does not exist? Jiangjie (Becket) Qin

On 3/7/15, 10:24 PM, Zakee kzak...@netzero.net wrote: I started with a clean cluster and started to push data. It still does the rebalance at random intervals, even though auto.leader.rebalance is set to false. Thanks Zakee

On Mar 6, 2015, at 3:51 PM, Jiangjie Qin j...@linkedin.com.INVALID wrote: Yes, the rebalance should not happen in that case. That is a little bit strange. Could you try to launch a clean Kafka cluster with auto leader election disabled and try to push data? When leader migration occurs, a NotLeaderForPartition exception is expected. Jiangjie (Becket) Qin

On 3/6/15, 3:14 PM, Zakee kzak...@netzero.net wrote: Yes, Jiangjie, I do see lots of these "Starting preferred replica leader election for partitions" entries in the logs. I also see a lot of produce request failure warnings with the NotLeader exception. I tried switching auto.leader.rebalance to false. I am still noticing the rebalance happening. My understanding was that the rebalance would not happen when this is set to false. Thanks Zakee

On Feb 25, 2015, at 5:17 PM, Jiangjie Qin j...@linkedin.com.INVALID wrote: I don't think num.replica.fetchers will help in this case. Increasing the number of fetcher threads only helps in cases where a large amount of data is coming into a broker and more replica fetcher threads help keep up. We usually use only 1-2 for each broker. But in your case, it looks like leader migration causes the issue. Do you see anything else in the log, like a preferred leader election? Jiangjie (Becket) Qin

On 2/25/15, 5:02 PM, Zakee kzak...@netzero.net wrote: Thanks, Jiangjie. Yes, I do see under-replicated partitions, usually shooting up every hour. Anything I could try to reduce it? How does num.replica.fetchers affect replica sync? I currently have 7 configured on each of 5 brokers. -Zakee

On Wed, Feb 25, 2015 at 4:17 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: These messages are usually caused by leader migration. I think as long as you don't see this lasting forever together with a bunch of under-replicated partitions, it should be fine. Jiangjie (Becket) Qin

On 2/25/15, 4:07 PM, Zakee kzak...@netzero.net wrote: I need to know whether I should be worried about these or ignore them. I see tons of these exceptions/warnings in the broker logs, and I'm not sure what causes them or what could be done to fix them:

ERROR [ReplicaFetcherThread-3-5], Error for partition [TestTopic] to broker 5:class kafka.common.NotLeaderForPartitionException (kafka.server.ReplicaFetcherThread)
[2015-02-25 11:01:41,785] ERROR [ReplicaFetcherThread-3-5], Error for partition [TestTopic] to broker 5:class
Re: [kafka-clients] Re: [VOTE] 0.8.2.1 Candidate 2
I was trying to see if KAFKA-2010 is a blocker to the 0.8.2.1 release. It doesn't seem to be, since it won't affect the common usage where controlled shutdown is enabled (the default). I will wrap up the 0.8.2.1 release. Thanks, Jun

On Mon, Mar 9, 2015 at 8:25 AM, Solon Gordon so...@knewton.com wrote: Any timeline on an official 0.8.2.1 release? Were there any issues found with rc2? Just checking in because we are anxious to update our brokers but are waiting for the patch release. Thanks.
Kafka issue #2011: https://issues.apache.org/jira/browse/KAFKA-2011
Opened a kafka issue for the rebalance happening with auto.leader.rebalance.enable set to false: https://issues.apache.org/jira/browse/KAFKA-2011

Logs for the rebalance:

[2015-03-07 16:52:48,969] INFO [Controller 2]: Resuming preferred replica election for partitions: (kafka.controller.KafkaController)
[2015-03-07 16:52:48,969] INFO [Controller 2]: Partitions that completed preferred replica election: (kafka.controller.KafkaController)
...
[2015-03-07 12:07:06,783] INFO [Controller 4]: Resuming preferred replica election for partitions: (kafka.controller.KafkaController)
...
[2015-03-07 09:10:41,850] INFO [Controller 3]: Resuming preferred replica election for partitions: (kafka.controller.KafkaController)
...
[2015-03-07 08:26:56,396] INFO [Controller 1]: Starting preferred replica leader election for partitions (kafka.controller.KafkaController)
...
[2015-03-06 16:52:59,506] INFO [Controller 2]: Partitions undergoing preferred replica election: (kafka.controller.KafkaController)

Also, I still see lots of the errors below (~69k) in the logs since the restart. Is there any reason other than a rebalance for these errors?

[2015-03-07 14:23:28,963] ERROR [ReplicaFetcherThread-2-5], Error for partition [Topic-11,7] to broker 5:class kafka.common.NotLeaderForPartitionException (kafka.server.ReplicaFetcherThread)
[2015-03-07 14:23:28,963] ERROR [ReplicaFetcherThread-1-5], Error for partition [Topic-2,25] to broker 5:class kafka.common.NotLeaderForPartitionException (kafka.server.ReplicaFetcherThread)
[2015-03-07 14:23:28,963] ERROR [ReplicaFetcherThread-2-5], Error for partition [Topic-2,21] to broker 5:class kafka.common.NotLeaderForPartitionException (kafka.server.ReplicaFetcherThread)
[2015-03-07 14:23:28,963] ERROR [ReplicaFetcherThread-1-5], Error for partition [Topic-22,9] to broker 5:class kafka.common.NotLeaderForPartitionException (kafka.server.ReplicaFetcherThread)

Thanks Zakee
Re: kafka mirroring ...!
Hi Sunilkalva, We are rewriting mirror maker in KAFKA-1997 with a handful of enhancements. With the new mirror maker, you will be able to mirror to a different topic by using the message handler. Jiangjie (Becket) Qin

On 3/9/15, 4:41 AM, sunil kalva sambarc...@gmail.com wrote: I think it would be very useful if we could mirror to a different topic name on the destination side. We have a use case to merge data from multiple colos into one central colo. SunilKalva
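To illustrate the idea: at the time of this thread the KAFKA-1997 handler interface was still under review, so the sketch below only shows the shape of a topic-renaming hook. The class name, method signature, and topic mapping are assumptions for illustration, not the final API.

    // Hypothetical mirror maker message handler that renames topics,
    // e.g. prefixing each source topic with its colo of origin.
    import org.apache.kafka.clients.producer.ProducerRecord;
    import java.util.Collections;
    import java.util.List;

    public class RenamingHandler {
        private final String prefix;

        public RenamingHandler(String prefix) { this.prefix = prefix; }

        // Assumed hook: receives the consumed topic/key/value and returns
        // the record(s) to produce to the target cluster.
        public List<ProducerRecord<byte[], byte[]>> handle(String topic, byte[] key, byte[] value) {
            String targetTopic = prefix + "." + topic; // e.g. "colo1.events"
            return Collections.singletonList(new ProducerRecord<byte[], byte[]>(targetTopic, key, value));
        }
    }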
Re: kafka topic information
Thanks, got it! best, Yuheng On Mon, Mar 9, 2015 at 11:52 AM, Harsha ka...@harsha.io wrote: In general users are expected to run a zookeeper cluster of 3 or 5 nodes. Zookeeper requires a quorum of servers to be running, which means a majority (floor(n/2) + 1) of the servers must be up. With 3 zookeeper nodes, at least 2 zk nodes must be up at any time, i.e. your cluster can function fine in case of 1 machine failure; with 5 nodes, at least 3 must be up and running. For more info on zookeeper you can look here: http://zookeeper.apache.org/doc/r3.4.6/ http://zookeeper.apache.org/doc/r3.4.6/zookeeperAdmin.html -- Harsha On March 9, 2015 at 8:39:00 AM, Yuheng Du (yuheng.du.h...@gmail.com) wrote: Harsha, thanks for the reply. So what if the zookeeper cluster fails? Will the topic information be lost? What fault-tolerance mechanism does zookeeper offer? best, Yuheng
Fwd: Batching at the socket layer
I'm curious what type of batching Kafka producers do at the socket layer. For instance, if I have a partitioner that round-robins n messages to n different partitions, am I guaranteed to get n separate messages sent over the socket, or is there some micro-batching going on underneath? I am trying to understand the semantics of the default partitioner and why it sticks to a partition for 10 minutes. If I were to lower that interval to 1 second, would I achieve better batching than I would by round-robining each message to a different partition?
RE: Multiple consumer groups with same group id on a single topic
Hi, I have two separate consumer groups on different JVM processes, but both have the same group.id. They are high-level consumer groups, each containing 3 consumers. Only one group consumes at a given time, and I would like both groups with the same id to share the load and common offsets. Jiangjie - are you saying I need to run both consumer groups on completely separate JVMs on different hosts?

-----Original Message----- From: Mayuresh Gharat [mailto:gharatmayures...@gmail.com] Sent: 09 March 2015 17:07 To: users@kafka.apache.org Subject: Re: Multiple consumer groups with same group id on a single topic

If you have 2 consumer groups, each group will read from all partitions automatically if you are using the high-level consumer (in your case each consumer would get 2 partitions). You don't have to specify the partitions it should read from. Thanks, Mayuresh

On Mon, Mar 9, 2015 at 9:59 AM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hi Phill, Do you mean you are using 6 consumers with the same group id? Or do you have 3 consumers using one group id and another 3 using a different group id? For the example you mentioned, what you can do is run several consumers on different physical machines with the same group id; they will balance the partitions among themselves if you are using the high-level consumer. Jiangjie (Becket) Qin

On 3/9/15, 1:27 AM, Phill Tomlinson philltomlin...@fico.com wrote: Hi, I have a topic with 6 partitions. I have two consumer groups with 3 consumers each, both with the same group.id. However, only one group appears to consume from the topic. Is this expected behaviour? I would expect to be able to concurrently use two consumer groups on the same topic to provide better throughput across multiple nodes (if I wanted 100 partitions for a topic, for example, I don't want 100 threads running on a single 4-core processor). It would be ideal if each consumer group decided which partitions it would read from and auto-balanced between them. Currently one group just waits in the background and will only consume if the other group fails. Thanks, Phill

-- -Regards, Mayuresh R. Gharat (862) 250-7125
Re: Multiple consumer groups with same group id on a single topic
If you have 2 consumer groups, each group will read from all partitions automatically if you are using the high-level consumer (in your case each consumer would get 2 partitions). You don't have to specify the partitions it should read from. Thanks, Mayuresh

On Mon, Mar 9, 2015 at 9:59 AM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hi Phill, Do you mean you are using 6 consumers with the same group id? Or do you have 3 consumers using one group id and another 3 using a different group id? For the example you mentioned, what you can do is run several consumers on different physical machines with the same group id; they will balance the partitions among themselves if you are using the high-level consumer. Jiangjie (Becket) Qin

On 3/9/15, 1:27 AM, Phill Tomlinson philltomlin...@fico.com wrote: Hi, I have a topic with 6 partitions. I have two consumer groups with 3 consumers each, both with the same group.id. However, only one group appears to consume from the topic. Is this expected behaviour? I would expect to be able to concurrently use two consumer groups on the same topic to provide better throughput across multiple nodes (if I wanted 100 partitions for a topic, for example, I don't want 100 threads running on a single 4-core processor). It would be ideal if each consumer group decided which partitions it would read from and auto-balanced between them. Currently one group just waits in the background and will only consume if the other group fails. Thanks, Phill

-- -Regards, Mayuresh R. Gharat (862) 250-7125
Re: Broker Exceptions
Correction: the rebalance was actually quiet until 24 hours after the start, and that is when the errors below were found. Ideally the rebalance should not have happened at all. Thanks Zakee

On Mar 9, 2015, at 10:28 AM, Zakee kzak...@netzero.net wrote: Thanks for your suggestions. It looks like the rebalance actually happened only once, soon after I started with a clean cluster and data was pushed; it didn't happen again so far, and I see the partition leader counts on the brokers have not changed since then. One of the brokers was constantly showing 0 for its partition leader count. Is that normal? Also, I still see lots of the NotLeaderForPartitionException replica fetcher errors (~69k) in the logs since the restart. Is there any reason other than a rebalance for these errors?
Group name while consuming in 0.8.2
Hi, how do I specify the group name when using kafka-console-consumer.sh in 0.8.2? Kafka 0.8.1 had a --group option for this script. I need the group name to run the offset checker after running the consumer. Thanks, Tushar
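One workaround, assuming your 0.8.2 console consumer accepts a consumer properties file via --consumer.config (file name and hosts are placeholders):

    echo "group.id=my-group" > consumer.properties
    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic my-topic \
        --consumer.config consumer.properties

The same group name can then be passed to kafka.tools.ConsumerOffsetChecker via its --group option.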
Re: [VOTE] 0.8.2.1 Candidate 2
The following are the results of the vote:

+1 binding = 3 votes
+1 non-binding = 2 votes
-1 = 0 votes
0 = 0 votes

The vote passes. I will release the artifacts to maven central and update the dist svn and the download site, and will send out an announcement after that. Thanks to everyone who contributed to the work in 0.8.2.1! Jun

On Thu, Feb 26, 2015 at 2:59 PM, Jun Rao j...@confluent.io wrote: This is the second candidate for release of Apache Kafka 0.8.2.1. It fixes 4 critical issues in 0.8.2.0.
Re: Multiple consumer groups with same group id on a single topic
Hi Phill, Do you mean you are using 6 consumers with the same group id? Or do you have 3 consumers using one group id and another 3 using a different group id? For the example you mentioned, what you can do is run several consumers on different physical machines with the same group id; they will balance the partitions among themselves if you are using the high-level consumer. Jiangjie (Becket) Qin

On 3/9/15, 1:27 AM, Phill Tomlinson philltomlin...@fico.com wrote: Hi, I have a topic with 6 partitions. I have two consumer groups with 3 consumers each, both with the same group.id. However, only one group appears to consume from the topic. Is this expected behaviour? I would expect to be able to concurrently use two consumer groups on the same topic to provide better throughput across multiple nodes (if I wanted 100 partitions for a topic, for example, I don't want 100 threads running on a single 4-core processor). It would be ideal if each consumer group decided which partitions it would read from and auto-balanced between them. Currently one group just waits in the background and will only consume if the other group fails. Thanks, Phill
Re: Topics are not evenly distributed to streams using Range partition assignment
Hi Tao, That is expected behavior. You can set partition.assignment.strategy=roundrobin in the consumer config. Round robin takes all the partitions from all topics and does a round-robin assignment, whereas range only considers the partitions of each individual topic for assignment. Jiangjie (Becket) Qin

On 3/9/15, 4:10 AM, tao xiao xiaotao...@gmail.com wrote: Hi, I created a message stream in my consumer using connector.createMessageStreamsByFilter(new Whitelist("mm-benchmark-test\\w*"), 5); I have 5 topics in my cluster and each topic has only one partition. My understanding of a wildcard stream is that the streams are shared between the selected topics, so in my case 5 streams should be shared between 5 different topics. But when I looked at the log it showed a different story:

2015-03-09 19:02:36 INFO kafka.utils.Logging$class:68 - [test12345667_LM-SHC-00950667-1425898953590-d99e2d75], test12345667_LM-SHC-00950667-1425898953590-d99e2d75-0 successfully owned partition 0 for topic mm-benchmark-test2
2015-03-09 19:02:36 INFO kafka.utils.Logging$class:68 - [test12345667_LM-SHC-00950667-1425898953590-d99e2d75], test12345667_LM-SHC-00950667-1425898953590-d99e2d75-0 successfully owned partition 0 for topic mm-benchmark-test
2015-03-09 19:02:36 INFO kafka.utils.Logging$class:68 - [test12345667_LM-SHC-00950667-1425898953590-d99e2d75], test12345667_LM-SHC-00950667-1425898953590-d99e2d75-0 successfully owned partition 0 for topic mm-benchmark-test1
2015-03-09 19:02:36 INFO kafka.utils.Logging$class:68 - [test12345667_LM-SHC-00950667-1425898953590-d99e2d75], test12345667_LM-SHC-00950667-1425898953590-d99e2d75-0 successfully owned partition 0 for topic mm-benchmark-test4
2015-03-09 19:02:36 INFO kafka.utils.Logging$class:68 - [test12345667_LM-SHC-00950667-1425898953590-d99e2d75], test12345667_LM-SHC-00950667-1425898953590-d99e2d75-0 successfully owned partition 0 for topic mm-benchmark-test3

As the log indicates, only one stream was assigned to all topics. I just wanted to know whether this is expected behavior, and if so, how do we evenly distribute topics across different streams? By using the roundrobin assigner? -- Regards, Tao
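For reference, a minimal consumer config sketch for this (the zookeeper address is a placeholder; the group id is taken from the logs above). Note that the roundrobin strategy requires every consumer instance in the group to subscribe to the same topics with the same number of streams:

    zookeeper.connect=localhost:2181
    group.id=test12345667
    partition.assignment.strategy=roundrobin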
Re: Does the Kafka 0.8.2 producer have a lower throughput in sync mode, compared with 0.8.1.x?
Hi Yang, In the code suggested by Manikumar, yes, it is possible that message 3 is still sent even if message 2 failed. There is no single-line call to send a batch of messages synchronously now, but after KAFKA-1660 is checked in, you may be able to achieve this as follows: set a callback for each send; in the callback, if an exception occurs, call producer.close(-1) - this will prevent further messages from being sent. Then you can use the following code:

    for (record : records)
        producer.send(record, callback);
    producer.flush();

Jiangjie (Becket) Qin

On 3/9/15, 8:49 AM, Yu Yang yuyan...@gmail.com wrote: If a send request in the middle of the list fails, will all send requests that follow it fail? Or do only the messages that were put in the same batch by the underlying transport layer fail?

On Mon, Mar 9, 2015 at 1:31 AM, Manikumar Reddy ku...@nmsworks.co.in wrote: 1. We can send a list of messages and wait on the returned futures:

    List<Future<RecordMetadata>> responses = new ArrayList<Future<RecordMetadata>>();
    for (ProducerRecord<byte[], byte[]> record : recordBatch)
        responses.add(producer.send(record));
    for (Future<RecordMetadata> response : responses)
        response.get();

2. Messages will be sent in submission order.

On Mon, Mar 9, 2015 at 1:03 PM, Yu Yang yuyan...@gmail.com wrote: The confluent blog http://blog.confluent.io/2014/12/02/whats-coming-in-apache-kafka-0-8-2/ mentions that batching is now done whenever possible: the sync producer, under load, can get performance as good as the async producer. Does this mean that kafka 0.8.2 guarantees that the sequence in which the broker receives messages is the same as the sequence of async calls made by the producer? If I have code as follows sending messages to the same topic partition:

    producer.send(msg1);
    producer.send(msg2);
    producer.send(msg3);

If all three calls succeed, is it possible that the broker received msg3 before msg2?

On Mon, Mar 9, 2015 at 12:17 AM, Yu Yang yuyan...@gmail.com wrote: Hi, Kafka 0.8.1.1 allows us to send a list of messages in sync mode:

    public void send(List<KeyedMessage<K,V>> messages);

I did not find a counterpart of this API in the new producer introduced in kafka 0.8.2. It seems that we can use the following method to do a sync send in kafka 0.8.2:

    producer.send(new ProducerRecord(...)).get();

My understanding is that we can then only send one message at a time in sync mode, which will limit the throughput of the kafka producer. Is there a way to send a batch of messages in sync mode using the kafka 0.8.2 producer? Is there any study of the throughput of the kafka 0.8.2 producer compared with kafka 0.8.1.1? Thanks! Regards, Yu
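A minimal sketch of this pattern, assuming producer.flush() lands as described in KAFKA-1660 (it was not yet released at the time of this thread). Here the callback records the first failure instead of closing the producer, a simpler variant of the close-from-callback approach above:

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicReference;

    public class BatchSyncSend {

        // Send a whole batch, then block until every record is acknowledged.
        static void sendBatch(KafkaProducer<byte[], byte[]> producer,
                              List<ProducerRecord<byte[], byte[]>> records) throws Exception {
            final AtomicReference<Exception> firstError = new AtomicReference<Exception>();
            Callback callback = new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null)
                        firstError.compareAndSet(null, exception); // remember the first failure
                }
            };
            for (ProducerRecord<byte[], byte[]> record : records)
                producer.send(record, callback);
            producer.flush(); // assumed from KAFKA-1660: blocks until all buffered sends complete
            if (firstError.get() != null)
                throw firstError.get(); // surface the first failure to the caller
        }
    }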
Re: Multiple consumer groups with same group id on a single topic
On Mon, Mar 9, 2015 at 10:38 AM, Phill Tomlinson philltomlin...@fico.com wrote: Hi, I have two separate consumer groups on different JVM processes, but both have the same group.id. You've said this twice, and I think it's creating some confusion, because the group.id is exactly what determines the members of a consumer group. How many consumer threads are you providing total? Could you provide code snippets to clarify how you're setting things up?
Kafka Mailing List for General Questions
Hi, I would like to subscribe to the Kafka mailing list for general questions. Please let me know what I need to do in order to submit questions to the Kafka general mailing list. Thanks. Regards, Mark Flores Project Manager, Enterprise Technology Direct 206-576-2675 Email mark.flo...@expeditors.com Global Headquarters, Seattle 1015 Third Avenue, 12th Floor Seattle, WA 98104
Re: Kafka Mailing List for General Questions
Looks like you subscribed. Just start a new thread and ask away. Otis -- Monitoring * Alerting * Anomaly Detection * Centralized Log Management Solr & Elasticsearch Support * http://sematext.com/

On Mon, Mar 9, 2015 at 4:27 PM, Mark Flores mark.flo...@expeditors.com wrote: Hi, I would like to subscribe to the Kafka mailing list for general questions. Please let me know what I need to do in order to submit questions to the Kafka general mailing list. Thanks. Regards, Mark Flores Project Manager, Enterprise Technology Direct 206-576-2675 Email mark.flo...@expeditors.com Global Headquarters, Seattle 1015 Third Avenue, 12th Floor Seattle, WA 98104
Re: integrate Camus and Hive?
If I understood your question correctly, you want to be able to read the output of Camus in Hive and know the partition values. If my understanding is right, you can do the following. Hive provides the ability to specify custom patterns for partitions. You can use this in combination with MSCK REPAIR TABLE to automatically detect and load the partitions into the metastore. Take a look at this SO post: http://stackoverflow.com/questions/24289571/hive-0-13-external-table-dynamic-partitioning-custom-pattern Does that help?

On Mon, Mar 9, 2015 at 1:42 PM, Yang tedd...@gmail.com wrote: I believe many users like us would export the output from camus as a hive external table, but the dir structure of camus is like YYYY/MM/DD/xx, while hive generally expects /year=YYYY/month=MM/day=DD/xx if you define the table to be partitioned by (year, month, day). Otherwise you'd have to add the partitions created by camus through a separate command, but in the latter case, would a camus job create >1 partitions? How would we find out the YYYY/MM/DD values from outside? Well, you could always do something with hadoop dfs -ls and then grep the output, but that's kind of not clean. thanks yang
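If the custom-pattern route doesn't work out, a sketch of the manual alternative mentioned above, registering Camus's YYYY/MM/DD directories as partitions explicitly (table name, columns, and paths are placeholders):

    CREATE EXTERNAL TABLE camus_events (payload STRING)
    PARTITIONED BY (year STRING, month STRING, day STRING)
    LOCATION '/camus/topics/events';

    ALTER TABLE camus_events ADD IF NOT EXISTS
    PARTITION (year='2015', month='03', day='09')
    LOCATION '/camus/topics/events/2015/03/09';

Each new Camus run then needs one ALTER TABLE per new date directory, which can be scripted from hadoop fs -ls output.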
kafka log ERROR Closing socket for IP -- Connection reset by peer
I'm calling ConsumerConnector.shutdown to close a consumer connection, and kafka's log reports an error. I don't see a similar error when using SimpleConsumer. Is there a way to close a ConsumerConnector so that these errors aren't reported in the kafka log? (This is making it very difficult to sift through the log and find real errors.) Found this, but I didn't see a fix: https://github.com/claudemamo/kafka-web-console/issues/37

Here's what the kafka log produces:

[2015-03-09 13:47:40,308] ERROR Closing socket for /172.18.251.1 because of error (kafka.network.Processor)
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:197)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
    at kafka.utils.Utils$.read(Utils.scala:375)
    at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
    at kafka.network.Processor.read(SocketServer.scala:347)
    at kafka.network.Processor.run(SocketServer.scala:245)
    at java.lang.Thread.run(Thread.java:744)

Here's how I'm using a consumer connector (in scala) -- did I forget to close something?

    @Test
    def consumerCloseTest(): Unit = {
      val topic = "UserEvent"
      val consumerConfig = KafkaUtil.makeConsumerConfig("UserEventTest", testProperties)
      val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
      try {
        // Read one item
        val numThreads: Int = 1
        val topicCountMap = Map[String, Int](topic -> numThreads)
        val topicMessageStreams: scala.collection.Map[String, List[KafkaStream[Array[Byte], Array[Byte]]]] =
          consumerConnector.createMessageStreams(topicCountMap)
        val streams = topicMessageStreams(topic)
        val stream = streams.head
        val it = stream.iterator()
        while (it.hasNext()) {
          logger.info("DONE")
          return
        }
      } finally {
        consumerConnector.shutdown
      }
    }
Re: Kafka Mailing List for General Questions
Hi Mark, You've already asked a question in the right place: sending email to users@kafka.apache.org is the right way. If it is a development question, you can send it to d...@kafka.apache.org. Jiangjie (Becket) Qin

From: Mark Flores mark.flo...@expeditors.com Reply-To: users@kafka.apache.org Date: Monday, March 9, 2015 at 12:27 PM To: users@kafka.apache.org Subject: Kafka Mailing List for General Questions

Hi, I would like to subscribe to the Kafka mailing list for general questions. Please let me know what I need to do in order to submit questions to the Kafka general mailing list. Thanks. Regards, Mark Flores Project Manager, Enterprise Technology Direct 206-576-2675 Email mark.flo...@expeditors.com Global Headquarters, Seattle 1015 Third Avenue, 12th Floor Seattle, WA 98104
Kafka Questions
One of our development teams is considering implementing a Kafka solution. If the development team were to implement 6 separate regional Kafka clusters:

* How could we implement global Pub/Sub between clusters?
* Can we do bi-directional replication with MirrorMaker between each cluster?
* How do we prevent a message published at one cluster from being replicated to another cluster and into an infinite loop (assuming we use a MirrorMaker whitelist like Global.* at each cluster)?

Regards, Mark Flores Project Manager, Enterprise Technology Direct 206-576-2675 Email mark.flo...@expeditors.com Global Headquarters, Seattle 1015 Third Avenue, 12th Floor Seattle, WA 98104
Re: Broker Exceptions
Is there anything wrong with the brokers around that time? E.g. a broker restart? The log you pasted is actually from the replica fetchers. Could you paste the related logs in controller.log? Thanks. Jiangjie (Becket) Qin On 3/9/15, 10:32 AM, Zakee kzak...@netzero.net wrote: Correction: Actually the rebalance kept happening until about 24 hours after the start, and that's where the errors below were found. Ideally rebalance should not have happened at all. Thanks Zakee On Mar 9, 2015, at 10:28 AM, Zakee kzak...@netzero.net wrote: Hmm, that sounds like a bug. Can you paste the log of leader rebalance here? Thanks for your suggestions. It looks like the rebalance actually happened only once, soon after I started with a clean cluster and data was pushed; it didn't happen again so far, and I see the partition leader counts on brokers did not change since then. One of the brokers was constantly showing 0 for partition leader count. Is that normal? Also, I still see lots of the errors below (~69k) going on in the logs since the restart. Is there any reason other than rebalance for these errors?

[2015-03-07 14:23:28,963] ERROR [ReplicaFetcherThread-2-5], Error for partition [Topic-11,7] to broker 5:class kafka.common.NotLeaderForPartitionException (kafka.server.ReplicaFetcherThread)
[2015-03-07 14:23:28,963] ERROR [ReplicaFetcherThread-1-5], Error for partition [Topic-2,25] to broker 5:class kafka.common.NotLeaderForPartitionException (kafka.server.ReplicaFetcherThread)
[2015-03-07 14:23:28,963] ERROR [ReplicaFetcherThread-2-5], Error for partition [Topic-2,21] to broker 5:class kafka.common.NotLeaderForPartitionException (kafka.server.ReplicaFetcherThread)
[2015-03-07 14:23:28,963] ERROR [ReplicaFetcherThread-1-5], Error for partition [Topic-22,9] to broker 5:class kafka.common.NotLeaderForPartitionException (kafka.server.ReplicaFetcherThread)

Some other things to check are: 1. The actual property name is auto.leader.rebalance.enable, not auto.leader.rebalance. You've probably known this, just to double confirm. Yes 2. In zookeeper path, can you verify /admin/preferred_replica_election does not exist? ls /admin [delete_topics] ls /admin/preferred_replica_election Node does not exist: /admin/preferred_replica_election Thanks Zakee On Mar 7, 2015, at 10:49 PM, Jiangjie Qin j...@linkedin.com.INVALID wrote: Hmm, that sounds like a bug. Can you paste the log of leader rebalance here? Some other things to check are: 1. The actual property name is auto.leader.rebalance.enable, not auto.leader.rebalance. You've probably known this, just to double confirm. 2. In zookeeper path, can you verify /admin/preferred_replica_election does not exist? Jiangjie (Becket) Qin On 3/7/15, 10:24 PM, Zakee kzak...@netzero.net wrote: I started with a clean cluster and started to push data. It still does the rebalance at random durations even though auto.leader.rebalance is set to false. Thanks Zakee On Mar 6, 2015, at 3:51 PM, Jiangjie Qin j...@linkedin.com.INVALID wrote: Yes, the rebalance should not happen in that case. That is a little bit strange. Could you try to launch a clean Kafka cluster with auto leader rebalancing disabled and try pushing data? When leader migration occurs, the NotLeaderForPartition exception is expected. Jiangjie (Becket) Qin On 3/6/15, 3:14 PM, Zakee kzak...@netzero.net wrote: Yes, Jiangjie, I do see lots of these "Starting preferred replica leader election for partitions" entries in the logs. I also see a lot of produce request failure warnings with the NotLeader exception.
I tried switching auto.leader.rebalance off by setting it to false. I am still noticing the rebalance happening. My understanding was that the rebalance will not happen when this is set to false. Thanks Zakee On Feb 25, 2015, at 5:17 PM, Jiangjie Qin j...@linkedin.com.INVALID wrote: I don't think num.replica.fetchers will help in this case. Increasing the number of fetcher threads only helps in cases where you have a large amount of data coming into a broker and more replica fetcher threads help keep up. We usually only use 1-2 for each broker. But in your case, it looks like leader migration caused the issue. Do you see anything else in the log? Like preferred leader election? Jiangjie (Becket) Qin On 2/25/15, 5:02 PM, Zakee kzak...@netzero.net wrote: Thanks, Jiangjie. Yes, I do see under-replicated partitions, usually shooting up every hour. Anything I could try to reduce it? How does num.replica.fetchers affect the replica sync? Currently I have 7 configured on each of 5 brokers. -Zakee On Wed, Feb 25, 2015 at 4:17 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: These messages are usually caused by leader migration. I think as long as you don't see this lasting forever together with a bunch of under-replicated partitions, it should be fine. Jiangjie (Becket) Qin On 2/25/15, 4:07 PM, Zakee
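For readers following along, the two broker settings discussed in this thread live in server.properties. A minimal sketch, with illustrative values only (these are not the posters' actual configs):

# server.properties (excerpt)
# Disable automatic preferred-leader rebalancing; note the full property name.
auto.leader.rebalance.enable=false
# Threads fetching from leaders for replication; 1-2 per broker is typical.
num.replica.fetchers=2

The zookeeper check suggested above can be run from the ZooKeeper shell, e.g. ls /admin/preferred_replica_election.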
Re: Is it actually a bad idea to set 'consumer.id' explicitly?
Hi Kevin, You can use partition.assignment.strategy=roundrobin. This will balance all the partitions of all the topics across consumer threads. I think the rationale behind using the default consumer id is that you will have better information to identify a consumer. But if you want to have some specific value in the consumer id, I think you can just do it. Jiangjie (Becket) Qin On 3/9/15, 11:40 AM, Kevin Scaldeferri ke...@scaldeferri.com wrote: https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/consumer/ConsumerConfig.scala#L101 suggests that 'consumer.id' should only be set explicitly for testing purposes. Is there a reason that it would be a bad idea to set it ourselves for production use? The reason I am asking is that the standard value, which starts with the hostname, produces somewhat sub-optimal distribution of partitions under the lexicographical sort. If the number of partitions is not an exact multiple of the number of consumers, the surplus or deficit tends to be concentrated on just one or two machines. We'd much prefer the extra partitions to be evenly striped across our cluster. (Also, in addition to the above concern, we'd find it useful in debugging situations if we could include some application-specific values in the consumer ID beyond just the hostname.) Do other people run into this? Are there problems with setting consumer.id in order to affect the distribution of partitions? Thanks, -kevin
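A minimal sketch of the consumer config being suggested here (property names per the 0.8.x high-level consumer; the group and consumer ids are made-up placeholders):

# consumer.properties (excerpt)
group.id=my-app-group
# Stripe partitions of all subscribed topics round-robin across consumer threads.
# Note that in 0.8.2 this strategy requires every consumer in the group to
# subscribe to the same topics with the same number of streams.
partition.assignment.strategy=roundrobin
# Optional explicit id, e.g. to embed application-specific debugging info
consumer.id=my-app-host1-worker3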
Re: Broker Exceptions
No broker restarts. Created a kafka issue: https://issues.apache.org/jira/browse/KAFKA-2011 Logs for rebalance:

[2015-03-07 16:52:48,969] INFO [Controller 2]: Resuming preferred replica election for partitions: (kafka.controller.KafkaController)
[2015-03-07 16:52:48,969] INFO [Controller 2]: Partitions that completed preferred replica election: (kafka.controller.KafkaController)
...
[2015-03-07 12:07:06,783] INFO [Controller 4]: Resuming preferred replica election for partitions: (kafka.controller.KafkaController)
...
[2015-03-07 09:10:41,850] INFO [Controller 3]: Resuming preferred replica election for partitions: (kafka.controller.KafkaController)
...
[2015-03-07 08:26:56,396] INFO [Controller 1]: Starting preferred replica leader election for partitions (kafka.controller.KafkaController)
...
[2015-03-06 16:52:59,506] INFO [Controller 2]: Partitions undergoing preferred replica election: (kafka.controller.KafkaController)

Also, I still see lots of the errors below (~69k) going on in the logs since the restart. Is there any reason other than rebalance for these errors?

[2015-03-07 14:23:28,963] ERROR [ReplicaFetcherThread-2-5], Error for partition [Topic-11,7] to broker 5:class kafka.common.NotLeaderForPartitionException (kafka.server.ReplicaFetcherThread)
[2015-03-07 14:23:28,963] ERROR [ReplicaFetcherThread-1-5], Error for partition [Topic-2,25] to broker 5:class kafka.common.NotLeaderForPartitionException (kafka.server.ReplicaFetcherThread)
[2015-03-07 14:23:28,963] ERROR [ReplicaFetcherThread-2-5], Error for partition [Topic-2,21] to broker 5:class kafka.common.NotLeaderForPartitionException (kafka.server.ReplicaFetcherThread)
[2015-03-07 14:23:28,963] ERROR [ReplicaFetcherThread-1-5], Error for partition [Topic-22,9] to broker 5:class kafka.common.NotLeaderForPartitionException (kafka.server.ReplicaFetcherThread)

Could you paste the related logs in controller.log? What specifically should I search for in the logs? Thanks, Zakee On Mar 9, 2015, at 11:35 AM, Jiangjie Qin j...@linkedin.com.INVALID wrote: Is there anything wrong with the brokers around that time? E.g. a broker restart? The log you pasted is actually from the replica fetchers. Could you paste the related logs in controller.log? Thanks. Jiangjie (Becket) Qin On 3/9/15, 10:32 AM, Zakee kzak...@netzero.net wrote: Correction: Actually the rebalance kept happening until about 24 hours after the start, and that's where the errors below were found. Ideally rebalance should not have happened at all. Thanks Zakee On Mar 9, 2015, at 10:28 AM, Zakee kzak...@netzero.net wrote: Hmm, that sounds like a bug. Can you paste the log of leader rebalance here? Thanks for your suggestions. It looks like the rebalance actually happened only once, soon after I started with a clean cluster and data was pushed; it didn't happen again so far, and I see the partition leader counts on brokers did not change since then. One of the brokers was constantly showing 0 for partition leader count. Is that normal? Also, I still see lots of the errors below (~69k) going on in the logs since the restart. Is there any reason other than rebalance for these errors?

[2015-03-07 14:23:28,963] ERROR [ReplicaFetcherThread-2-5], Error for partition [Topic-11,7] to broker 5:class kafka.common.NotLeaderForPartitionException (kafka.server.ReplicaFetcherThread)
[2015-03-07 14:23:28,963] ERROR [ReplicaFetcherThread-1-5], Error for partition [Topic-2,25] to broker 5:class kafka.common.NotLeaderForPartitionException (kafka.server.ReplicaFetcherThread)
[2015-03-07 14:23:28,963] ERROR [ReplicaFetcherThread-2-5], Error for partition [Topic-2,21] to broker 5:class kafka.common.NotLeaderForPartitionException (kafka.server.ReplicaFetcherThread)
[2015-03-07 14:23:28,963] ERROR [ReplicaFetcherThread-1-5], Error for partition [Topic-22,9] to broker 5:class kafka.common.NotLeaderForPartitionException (kafka.server.ReplicaFetcherThread)

Some other things to check are: 1. The actual property name is auto.leader.rebalance.enable, not auto.leader.rebalance. You've probably known this, just to double confirm. Yes 2. In zookeeper path, can you verify /admin/preferred_replica_election does not exist? ls /admin [delete_topics] ls /admin/preferred_replica_election Node does not exist: /admin/preferred_replica_election Thanks Zakee On Mar 7, 2015, at 10:49 PM, Jiangjie Qin j...@linkedin.com.INVALID wrote: Hmm, that sounds like a bug. Can you paste the log of leader rebalance here? Some other things to check are: 1. The actual property name is auto.leader.rebalance.enable, not auto.leader.rebalance. You've probably known this, just to double confirm. 2. In zookeeper path, can you verify
Re: Batching at the socket layer
The stickiness of partitions only applies to the old producer. In the new producer, round robin is done per message. Batching in the new producer is per topic partition; the batch size is controlled by both the max batch size and linger time configs. Jiangjie (Becket) Qin On 3/9/15, 10:10 AM, Corey Nolet cjno...@gmail.com wrote: I'm curious what type of batching Kafka producers do at the socket layer. For instance, if I have a partitioner that round-robins n messages across different partitions, am I guaranteed to get n different messages sent over the socket, or is there some micro-batching going on underneath? I am trying to understand the semantics of the default partitioner and why it sticks to partitions for 10 minutes. If I were to lower that interval to 1 sec, would I achieve better batching than I would if I were to completely round-robin each message to a different partition?
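For reference, the two new-producer knobs Jiangjie mentions map to these config keys (a minimal sketch; the values are illustrative only, not recommendations):

# new producer config (0.8.2), e.g. producer.properties
# Maximum bytes batched per partition before a send is triggered
batch.size=16384
# How long to wait for more records to fill a batch before sending anyway
linger.ms=5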
Re: Multiple consumer groups with same group id on a single topic
Yes, Kevin is right. It does not matter whether you run the consumers from the same JVM or not; as long as the consumers have the same group id, they are in the same group. So in your case, you have 6 consumers in the same consumer group. Since you have 6 partitions in the topic, assuming you have only one consumer thread for each high-level consumer, each high-level consumer will consume from one partition. Jiangjie (Becket) Qin On 3/9/15, 11:13 AM, Kevin Scaldeferri ke...@scaldeferri.com wrote: On Mon, Mar 9, 2015 at 10:38 AM, Phill Tomlinson philltomlin...@fico.com wrote: Hi, I have two separate consumer groups on different JVM processes, but both have the same group.id. You've said this twice, and I think it's creating some confusion, because the group.id is exactly what determines the members of a consumer group. How many consumer threads are you providing in total? Could you provide code snippets to clarify how you're setting things up?
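A minimal sketch of what each of those six consumers looks like with the 0.8.x high-level consumer API (the zookeeper address, group id, and topic name are placeholders):

import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig}

val props = new Properties()
props.put("zookeeper.connect", "zk1:2181")  // placeholder address
props.put("group.id", "shared-group")       // same id in every JVM => one logical group
val connector = Consumer.create(new ConsumerConfig(props))
// One stream (thread) per consumer; with six such consumers and six
// partitions, each consumer ends up owning exactly one partition.
val streams = connector.createMessageStreams(Map("my-topic" -> 1))("my-topic")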
Working DR patterns for Kafka
There are various prior questions, including http://search-hadoop.com/m/4TaT4ts2oz1/disaster+recovery/v=threaded Is there a clear document on disaster recovery patterns for Kafka and their respective trade-offs? How are actual prod deployments dealing with this? For instance, I want my topics replicated to another site/cluster and I want the offsets in the topic to match so that a consuming client can fail over. Does anyone have something like this working really reliably in prod? I can tolerate the reliant being a second behind the primary, but not differing offsets. What are others managing to achieve with Kafka?
Is it actually a bad idea to set 'consumer.id' explicitly?
https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/consumer/ConsumerConfig.scala#L101 suggests that 'consumer.id' should only be set explicitly for testing purposes. Is there a reason that it would be a bad idea to set it ourselves for production use? The reason I am asking is that the standard value, which starts with the hostname, produces somewhat sub-optimal distribution of partitions under the lexicographical sort. If the number of partitions is not an exact multiple of the number of consumers, the surplus or deficit tends to be concentrated on just one or two machines. We'd much prefer the extra partitions to be evenly striped across our cluster. (Also, in addition to the above concern, we'd find it useful in debugging situations if we could include some application-specific values in the consumer ID beyond just the hostname.) Do other people run into this? Are there problems with setting consumer.id in order to affect the distribution of partitions? Thanks, -kevin
Re: Working DR patterns for Kafka
There was a typo in the question - should have been ... I can tolerate the [replicant]
Re: kafka mirroring ...!
I don't think you can mirror messages to a different topic name in the current mirror maker implementation. Mirror maker sends the message to the destination topic based on the topic name it reads from the source. On Mon, Mar 9, 2015 at 5:00 PM, sunil kalva sambarc...@gmail.com wrote: Can I configure a different topic name in the destination cluster, i.e. can I have different topic names for the source and destination clusters for mirroring? If yes, how can I map the source topic to the destination topic name? SunilKalva On Mon, Mar 9, 2015 at 6:41 AM, tao xiao xiaotao...@gmail.com wrote: Ctrl+C is clean shutdown. kill -9 is not. On Mon, Mar 9, 2015 at 2:32 AM, Alex Melville amelvi...@g.hmc.edu wrote: What does a clean shutdown of the MM entail? So far I've just been using Ctrl + C to send an interrupt to kill it. Alex On Sat, Mar 7, 2015 at 10:59 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: If auto.offset.reset is set to smallest, it does not mean the consumer will always consume from the smallest. It means that if no previous offset commit is found for this consumer group, then it will consume from the smallest. So for mirror maker, you probably want to always use the same consumer group id. This can be configured in the consumer config file you pass into mirror maker. Another thing about duplicate messages: if mirror maker is shut down cleanly, next time you start it again with the same consumer group id, there should be no duplicates. But if mirror maker shuts down uncleanly (e.g. by a kill -9), then the next time it starts up you might still have duplicate messages after the last committed offsets. Jiangjie (Becket) Qin On 3/7/15, 11:45 PM, sunil kalva sambarc...@gmail.com wrote: Qin, the partition problem is solved by passing the --new.producer true option on the command line, but after adding the auto.offset.reset=smallest config, every time I restart the Mirror tool it copies from the start and ends up with lots of duplicate messages in the destination cluster. Could you please tell me how to configure it so that the destination cluster is always in sync with the source cluster? SunilKalva On Sun, Mar 8, 2015 at 12:54 AM, Jiangjie Qin j...@linkedin.com.invalid wrote: For data not showing up, you need to make sure the mirror maker consumer's auto.offset.reset is set to smallest, otherwise when you run mirror maker for the first time, all the pre-existing messages won't be consumed. For partition sticking, can you verify whether your messages are keyed messages or not? If they are not keyed messages, can you check if you are using the old producer or the new producer? For the old producer, the default behavior is sticking to one partition for 10 min and then moving to the next partition. So if you wait for more than 10 min, you should see messages in two different partitions. Jiangjie (Becket) Qin On 3/7/15, 8:28 AM, sunil kalva sambarc...@gmail.com wrote: And I also observed all the data is moving to one partition in the destination cluster, though I have multiple partitions for that topic in the source and destination clusters. SunilKalva On Sat, Mar 7, 2015 at 9:54 PM, sunil kalva sambarc...@gmail.com wrote: I ran the kafka mirroring tool after producing data in the source cluster, and this is not copied to the destination cluster. If I produce data after running the tool, those data are copied to the destination cluster. Am I missing something? -- SunilKalva -- Regards, Tao
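Pulling the advice in this thread together, the setup being described looks roughly like this (a sketch only; the file names, group id, and whitelist pattern are placeholders, with flags per the 0.8.x MirrorMaker tool):

# consumer.properties (points at the source cluster)
zookeeper.connect=source-zk:2181
group.id=mirror-group          # keep this stable across restarts to avoid re-consuming
auto.offset.reset=smallest     # only used when the group has no committed offset

# launch with the new producer so messages spread across partitions
bin/kafka-run-class.sh kafka.tools.MirrorMaker \
  --consumer.config consumer.properties \
  --producer.config producer.properties \
  --new.producer true \
  --whitelist 'my-topic.*'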
Re: How replicas catch up the leader
Hi, tao xiao and Jiangjie Qin, I encountered the same issue; my node has recovered from a high-load problem (caused by another application). This is what the kafka-topics tool shows:

Topic:ad_click_sts  PartitionCount:6  ReplicationFactor:2  Configs:
  Topic: ad_click_sts  Partition: 0  Leader: 1  Replicas: 1,0  Isr: 1
  Topic: ad_click_sts  Partition: 1  Leader: 0  Replicas: 0,1  Isr: 0
  Topic: ad_click_sts  Partition: 2  Leader: 1  Replicas: 1,0  Isr: 1
  Topic: ad_click_sts  Partition: 3  Leader: 0  Replicas: 0,1  Isr: 0
  Topic: ad_click_sts  Partition: 4  Leader: 1  Replicas: 1,0  Isr: 1
  Topic: ad_click_sts  Partition: 5  Leader: 0  Replicas: 0,1  Isr: 0

ReplicaFetcherThread info extracted from the kafka server.log:

[2015-03-09 21:06:05,450] ERROR [ReplicaFetcherThread-0-0], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 7331; ClientId: ReplicaFetcherThread-0-0; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [ad_click_sts,5] -> PartitionFetchInfo(6149699,1048576),[ad_click_sts,3] -> PartitionFetchInfo(6147835,1048576),[ad_click_sts,1] -> PartitionFetchInfo(6235071,1048576) (kafka.server.ReplicaFetcherThread)
java.net.SocketTimeoutException
at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:201)
at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:86)
...
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
[2015-03-09 21:06:05,450] WARN Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)
[2015-03-09 21:05:57,116] INFO Partition [ad_click_sts,4] on broker 1: Cached zkVersion [556] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2015-03-09 21:06:05,772] INFO Partition [ad_click_sts,2] on broker 1: Shrinking ISR for partition [ad_click_sts,2] from 1,0 to 1 (kafka.cluster.Partition)

How can I fix this ISR problem? Is there some command that can be run? Regards, sy.pan
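For readers hitting the same symptom: once the brokers are healthy again, followers normally catch up and re-enter the ISR on their own. If leadership stays skewed afterwards, the preferred replica election tool that ships with Kafka can move leaders back to the preferred replicas (a sketch; the zookeeper address is a placeholder, and whether this helps depends on the root cause):

bin/kafka-preferred-replica-election.sh --zookeeper zk1:2181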
Multiple consumer groups with same group id on a single topic
Hi, I have a topic with 6 partitions. I have two consumer groups with 3 consumers each, both with the same group.id. However, only one group appears to consume from the topic. Is this expected behaviour? I would expect to be able to concurrently use two consumer groups on the same topic to provide better throughput across multiple nodes (if I wanted 100 partitions for a topic, for example, I don't want 100 threads running on a single 4-core processor). It would be ideal if each consumer group decided which partitions they would read from and auto-balanced between them. Currently one group just waits in the background and will only consume if the other group fails. Thanks, Phill
Does the Kafka 0.8.2 producer have lower throughput in sync mode, compared with 0.8.1.x?
Hi, Kafka 0.8.1.1 allows us to send a list of messages in sync mode: public void send(List<KeyedMessage<K,V>> messages); I did not find a counterpart of this API in the new producer introduced in Kafka 0.8.2. It seems that we can use the following method to do a sync send in Kafka 0.8.2: producer.send(new ProducerRecord(...)).get(); My understanding is that we can only send one message at a time in sync mode. This will limit the throughput of the Kafka producer. Is there a way to send a batch of messages in sync mode using the Kafka 0.8.2 producer? Is there any study on the throughput of the Kafka 0.8.2 producer, compared with Kafka 0.8.1.1? Thanks! Regards, Yu
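A common pattern for this (a sketch only, not an answer given in this thread; the broker address, topic name, and payloads are placeholders) is to send the whole batch asynchronously and then block on the returned futures, so the new producer can still batch on the wire while the caller gets sync semantics:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "broker1:9092")  // placeholder address
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)

val batch = Seq("m1", "m2", "m3").map(_.getBytes("UTF-8"))
// Fire off the whole batch; each send returns a java.util.concurrent.Future[RecordMetadata].
val futures = batch.map(m => producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("my-topic", m)))
// Now wait for all of them; get() throws if any individual send failed.
futures.foreach(_.get())
producer.close()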