[jira] [Commented] (KAFKA-598) decouple fetch size from max message size
[ https://issues.apache.org/jira/browse/KAFKA-598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13536705#comment-13536705 ]

Joel Koshy commented on KAFKA-598:

33.2: Not really, because that would violate the new config's semantics, i.e., each thread shouldn't exceed its allocated amount of memory.

That said, I just realized that this implementation has a couple of flaws and may need to be refactored or have its scope reduced. The max mem config is not always respected in this implementation. When we do the serial fetches, the queue will have larger chunks (than the fair partition fetch size). However, the function that computes the fair partition fetch size assumes that the blocking queue only has chunks of the fair fetch size. I think we can take care of this, but will think about it a bit more.

Another problem is that the aggregate fetch size is the fair size * num partitions assigned to the thread. So, for example, if partition assignment happens to be very skewed and a thread happens to have only one partition, a serial refetch will be pointless since it can't use a larger fetch size.

> decouple fetch size from max message size
>
> Key: KAFKA-598
> URL: https://issues.apache.org/jira/browse/KAFKA-598
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 0.8
> Reporter: Jun Rao
> Assignee: Joel Koshy
> Priority: Blocker
> Attachments: KAFKA-598-v1.patch, KAFKA-598-v2.patch, KAFKA-598-v3.patch
>
> Currently, a consumer has to set fetch size larger than the max message size. This increases the memory footprint on the consumer, especially when a large number of topic/partition is subscribed. By decoupling the fetch size from max message size, we can use a smaller fetch size for normal consumption and when hitting a large message (hopefully rare), we automatically increase fetch size to max message size temporarily.

-- This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
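To make the skew problem in the KAFKA-598 comment above concrete, here is a minimal sketch. It assumes the fair partition fetch size is simply the thread's memory budget divided evenly across its partitions; that formula, the function names, and the 8 MiB budget are illustrative assumptions, not the patch's actual code (which also has to account for chunks already sitting in the blocking queue).

```python
def fair_partition_fetch_size(max_mem_bytes: int, num_partitions: int) -> int:
    """Split a thread's memory budget evenly across its partitions (assumed formula)."""
    if num_partitions <= 0:
        raise ValueError("a thread must own at least one partition")
    return max_mem_bytes // num_partitions


def serial_refetch_headroom(max_mem_bytes: int, num_partitions: int) -> int:
    """Extra bytes a one-partition-at-a-time refetch could use beyond the fair size."""
    return max_mem_bytes - fair_partition_fetch_size(max_mem_bytes, num_partitions)


budget = 8 * 1024 * 1024  # hypothetical per-thread budget of 8 MiB

# With 8 partitions, a serial refetch can grow well beyond the fair size.
print(serial_refetch_headroom(budget, 8))
# With a single partition, the fair size already equals the budget, so there is no headroom.
print(serial_refetch_headroom(budget, 1))
```

Under this model the single-partition thread has nothing to gain from a serial refetch, which is the case the comment calls pointless.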
[jira] [Updated] (KAFKA-668) Controlled shutdown admin tool should not require controller JMX url/port to be supplied
[ https://issues.apache.org/jira/browse/KAFKA-668?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joel Koshy updated KAFKA-668:

Attachment: KAFKA-668-v1.patch

This is a pretty straightforward change. It's slightly hacky in that I'm appending the :jmxport to the zk string, and it is effectively ignored in the Broker class. I preferred this over adding a jmxPort field to the Broker class, as that would cause widespread edits.

> Controlled shutdown admin tool should not require controller JMX url/port to be supplied
>
> Key: KAFKA-668
> URL: https://issues.apache.org/jira/browse/KAFKA-668
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.8, 0.8.1
> Reporter: Joel Koshy
> Fix For: 0.8
> Attachments: KAFKA-668-v1.patch
>
> The controlled shutdown admin command takes a zookeeper string and also requires the user to supply the controller's jmx url/port. This is a bit annoying since the purpose of the zookeeper string is to discover the controller. The tool should require exactly one of these options. If zookeeper is supplied then discover the controller and its jmx port (which means we will need to add the jmx port information to zk).
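As a rough illustration of the "append :jmxport to the zk string" idea in the KAFKA-668 update above, the sketch below parses a colon-separated registration value with an optional trailing JMX port. The "host:port[:jmxport]" layout, the BrokerInfo type, and the function name are assumptions made for this example; the real registration format and the Broker class are not shown in this thread.

```python
from typing import NamedTuple, Optional


class BrokerInfo(NamedTuple):
    host: str
    port: int
    jmx_port: Optional[int]  # None when the value carries no JMX port


def parse_broker_info(value: str) -> BrokerInfo:
    """Parse a hypothetical 'host:port[:jmxport]' registration value."""
    parts = value.split(":")
    if len(parts) < 2:
        raise ValueError(f"expected host:port[:jmxport], got {value!r}")
    host, port = parts[0], int(parts[1])
    # Readers that only need host and port can ignore the third field entirely.
    jmx_port = int(parts[2]) if len(parts) > 2 else None
    return BrokerInfo(host, port, jmx_port)


print(parse_broker_info("broker1.example.com:9092:9999"))
print(parse_broker_info("broker1.example.com:9092"))
```

The appeal of this shape is that existing readers keep working unchanged, while the admin tool can pick up the extra field once it has discovered the controller.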
[jira] [Updated] (KAFKA-668) Controlled shutdown admin tool should not require controller JMX url/port to be supplied
[ https://issues.apache.org/jira/browse/KAFKA-668?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Koshy updated KAFKA-668: - Attachment: KAFKA-668-v2.patch Needed a rebase. > Controlled shutdown admin tool should not require controller JMX url/port to > be supplied > > > Key: KAFKA-668 > URL: https://issues.apache.org/jira/browse/KAFKA-668 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8, 0.8.1 >Reporter: Joel Koshy > Fix For: 0.8 > > Attachments: KAFKA-668-v1.patch, KAFKA-668-v2.patch > > > The controlled shutdown admin command takes a zookeeper string and also > requires the user to supply the controller's jmx url/port. This is a bit > annoying since the purpose of the zookeeper string is to discover the > controller. The tool should require exactly one of these options. If > zookeeper is supplied then discover the controller and its jmx port (which > means we will need to add the jmx port information to zk). -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-657) Add an API to commit offsets
[ https://issues.apache.org/jira/browse/KAFKA-657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13542390#comment-13542390 ]

Joel Koshy commented on KAFKA-657:

Hey David, the patch (and the wiki) looks great.

- For error handling: I think what Jun was referring to is the giant catch clause in handle, i.e., the new keys should be added as a case. That junk block of code really needs to be cleaned up :)
- KafkaApis: if(offsetStr == null): I don't think this can happen.
- Default client id should probably be "" in all the request/responses, i.e., to follow convention.
- It would be better to use 1.toShort or val CurrentVersion: Short = 1 (instead of 1.shortValue), although it's more or less a non-issue as it's in the object.

> Add an API to commit offsets
>
> Key: KAFKA-657
> URL: https://issues.apache.org/jira/browse/KAFKA-657
> Project: Kafka
> Issue Type: New Feature
> Reporter: Jay Kreps
> Labels: project
> Attachments: KAFKA-657v1.patch, KAFKA-657v2.patch, KAFKA-657v3.patch, KAFKA-657v4.patch, KAFKA-657v5.patch, KAFKA-657v6.patch
>
> Currently the consumer directly writes their offsets to zookeeper. Two problems with this: (1) This is a poor use of zookeeper, and we need to replace it with a more scalable offset store, and (2) it makes it hard to carry over to clients in other languages. A first step towards accomplishing that is to add a proper Kafka API for committing offsets. The initial version of this would just write to zookeeper as we do today, but in the future we would then have the option of changing this.
> This api likely needs to take a sequence of consumer-group/topic/partition/offset entries and commit them all.
> It would be good to do a wiki design on how this would work and consensus on that first.

-- This message is automatically generated by JIRA.
[jira] [Resolved] (KAFKA-668) Controlled shutdown admin tool should not require controller JMX url/port to be supplied
[ https://issues.apache.org/jira/browse/KAFKA-668?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Koshy resolved KAFKA-668. -- Resolution: Fixed Thanks for the review. Committed to 0.8. > Controlled shutdown admin tool should not require controller JMX url/port to > be supplied > > > Key: KAFKA-668 > URL: https://issues.apache.org/jira/browse/KAFKA-668 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8, 0.8.1 >Reporter: Joel Koshy > Fix For: 0.8 > > Attachments: KAFKA-668-v1.patch, KAFKA-668-v2.patch > > > The controlled shutdown admin command takes a zookeeper string and also > requires the user to supply the controller's jmx url/port. This is a bit > annoying since the purpose of the zookeeper string is to discover the > controller. The tool should require exactly one of these options. If > zookeeper is supplied then discover the controller and its jmx port (which > means we will need to add the jmx port information to zk). -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Closed] (KAFKA-668) Controlled shutdown admin tool should not require controller JMX url/port to be supplied
[ https://issues.apache.org/jira/browse/KAFKA-668?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Koshy closed KAFKA-668. > Controlled shutdown admin tool should not require controller JMX url/port to > be supplied > > > Key: KAFKA-668 > URL: https://issues.apache.org/jira/browse/KAFKA-668 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8, 0.8.1 >Reporter: Joel Koshy > Fix For: 0.8 > > Attachments: KAFKA-668-v1.patch, KAFKA-668-v2.patch > > > The controlled shutdown admin command takes a zookeeper string and also > requires the user to supply the controller's jmx url/port. This is a bit > annoying since the purpose of the zookeeper string is to discover the > controller. The tool should require exactly one of these options. If > zookeeper is supplied then discover the controller and its jmx port (which > means we will need to add the jmx port information to zk). -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-681) Unclean shutdown testing - truncateAndStartWithNewOffset is not invoked when it is expected to
[ https://issues.apache.org/jira/browse/KAFKA-681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13543601#comment-13543601 ] Joel Koshy commented on KAFKA-681: -- +1 > Unclean shutdown testing - truncateAndStartWithNewOffset is not invoked when > it is expected to > -- > > Key: KAFKA-681 > URL: https://issues.apache.org/jira/browse/KAFKA-681 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8 >Reporter: John Fung >Priority: Blocker > Labels: bugs > Attachments: kafka-681-reproduce-issue.patch, kafka-681_v1.patch > > -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-682) java.lang.OutOfMemoryError: Java heap space
[ https://issues.apache.org/jira/browse/KAFKA-682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13544119#comment-13544119 ] Joel Koshy commented on KAFKA-682: -- You might need to increase your heap size. What do you have it set to right now? Would you be able to run the broker with -XX:+HeapDumpOnOutOfMemoryError to get a heap-dump? In case you are overriding defaults - what's the replication factor for the topic, num-required-acks for the producer requests, and producer request timeout? Are any requests going through or are the produce requests expiring? > java.lang.OutOfMemoryError: Java heap space > --- > > Key: KAFKA-682 > URL: https://issues.apache.org/jira/browse/KAFKA-682 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 > Environment: $ uname -a > Linux rngadam-think 3.5.0-17-generic #28-Ubuntu SMP Tue Oct 9 19:32:08 UTC > 2012 i686 i686 i686 GNU/Linux > $ java -version > java version "1.7.0_09" > OpenJDK Runtime Environment (IcedTea7 2.3.3) (7u9-2.3.3-0ubuntu1~12.04.1) > OpenJDK Server VM (build 23.2-b09, mixed mode) >Reporter: Ricky Ng-Adam > > git pull (commit 32dae955d5e2e2dd45bddb628cb07c874241d856) > ...build... > ./sbt update > ./sbt package > ...run... > bin/zookeeper-server-start.sh config/zookeeper.properties > bin/kafka-server-start.sh config/server.properties > ...then configured fluentd with kafka plugin... > gem install fluentd --no-ri --no-rdoc > gem install fluent-plugin-kafka > fluentd -c ./fluent/fluent.conf -vv > ...then flood fluentd with messages inputted from syslog and outputted to > kafka. 
> results in (after about 1 messages of 1K each in 3s): > [2013-01-05 02:00:52,087] ERROR Closing socket for /127.0.0.1 because of > error (kafka.network.Processor) > java.lang.OutOfMemoryError: Java heap space > at > kafka.api.ProducerRequest$$anonfun$1$$anonfun$apply$1.apply(ProducerRequest.scala:45) > at > kafka.api.ProducerRequest$$anonfun$1$$anonfun$apply$1.apply(ProducerRequest.scala:42) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) > at scala.collection.immutable.Range$ByOne$class.foreach(Range.scala:282) > at scala.collection.immutable.Range$$anon$1.foreach(Range.scala:274) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:206) > at scala.collection.immutable.Range.map(Range.scala:39) > at kafka.api.ProducerRequest$$anonfun$1.apply(ProducerRequest.scala:42) > at kafka.api.ProducerRequest$$anonfun$1.apply(ProducerRequest.scala:38) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:227) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:227) > at scala.collection.immutable.Range$ByOne$class.foreach(Range.scala:282) > at scala.collection.immutable.Range$$anon$1.foreach(Range.scala:274) > at > scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:227) > at scala.collection.immutable.Range.flatMap(Range.scala:39) > at kafka.api.ProducerRequest$.readFrom(ProducerRequest.scala:38) > at kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys.scala:32) > at kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys.scala:32) > at kafka.network.RequestChannel$Request.(RequestChannel.scala:47) > at kafka.network.Processor.read(SocketServer.scala:298) > at kafka.network.Processor.run(SocketServer.scala:209) > at java.lang.Thread.run(Thread.java:722) -- This message is automatically generated by JIRA. 
[jira] [Commented] (KAFKA-682) java.lang.OutOfMemoryError: Java heap space
[ https://issues.apache.org/jira/browse/KAFKA-682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13544503#comment-13544503 ] Joel Koshy commented on KAFKA-682: -- I think that fix was merged into trunk (before 32da) so it should be there in trunk as well. > java.lang.OutOfMemoryError: Java heap space > --- > > Key: KAFKA-682 > URL: https://issues.apache.org/jira/browse/KAFKA-682 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 > Environment: $ uname -a > Linux rngadam-think 3.5.0-17-generic #28-Ubuntu SMP Tue Oct 9 19:32:08 UTC > 2012 i686 i686 i686 GNU/Linux > $ java -version > java version "1.7.0_09" > OpenJDK Runtime Environment (IcedTea7 2.3.3) (7u9-2.3.3-0ubuntu1~12.04.1) > OpenJDK Server VM (build 23.2-b09, mixed mode) >Reporter: Ricky Ng-Adam > > git pull (commit 32dae955d5e2e2dd45bddb628cb07c874241d856) > ...build... > ./sbt update > ./sbt package > ...run... > bin/zookeeper-server-start.sh config/zookeeper.properties > bin/kafka-server-start.sh config/server.properties > ...then configured fluentd with kafka plugin... > gem install fluentd --no-ri --no-rdoc > gem install fluent-plugin-kafka > fluentd -c ./fluent/fluent.conf -vv > ...then flood fluentd with messages inputted from syslog and outputted to > kafka. 
[jira] [Commented] (KAFKA-682) java.lang.OutOfMemoryError: Java heap space
[ https://issues.apache.org/jira/browse/KAFKA-682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13547129#comment-13547129 ]

Joel Koshy commented on KAFKA-682:

That's why I asked for the configured "num-required-acks for the producer requests". If it is the default (0), then the request shouldn't be added to the request purgatory, which rules out KAFKA-671, no?

> java.lang.OutOfMemoryError: Java heap space
>
> Key: KAFKA-682
> URL: https://issues.apache.org/jira/browse/KAFKA-682
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 0.8
> Environment: $ uname -a
> Linux rngadam-think 3.5.0-17-generic #28-Ubuntu SMP Tue Oct 9 19:32:08 UTC 2012 i686 i686 i686 GNU/Linux
> $ java -version
> java version "1.7.0_09"
> OpenJDK Runtime Environment (IcedTea7 2.3.3) (7u9-2.3.3-0ubuntu1~12.04.1)
> OpenJDK Server VM (build 23.2-b09, mixed mode)
> Reporter: Ricky Ng-Adam
> Attachments: java_pid22281.hprof.gz, java_pid22281_Leak_Suspects.zip
>
> git pull (commit 32dae955d5e2e2dd45bddb628cb07c874241d856)
> ...build...
> ./sbt update
> ./sbt package
> ...run...
> bin/zookeeper-server-start.sh config/zookeeper.properties
> bin/kafka-server-start.sh config/server.properties
> ...then configured fluentd with kafka plugin...
> gem install fluentd --no-ri --no-rdoc
> gem install fluent-plugin-kafka
> fluentd -c ./fluent/fluent.conf -vv
> ...then flood fluentd with messages inputted from syslog and outputted to kafka.
[jira] [Commented] (KAFKA-598) decouple fetch size from max message size
[ https://issues.apache.org/jira/browse/KAFKA-598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13551383#comment-13551383 ]

Joel Koshy commented on KAFKA-598:

The full scope should probably move out of 0.8, i.e., as described above, bounding the consumer's memory is basically a packing problem without knowledge of the message size on the broker. One possibility is for the broker to somehow communicate the size of the large message back to the client, but that would break our zero-copy property wrt fetches.

So I would suggest we don't do the full patch (i.e., bounding consumer memory && handling large messages). Instead we can go with the simpler implementation that requires a new config (which is not ideal, but better IMO than trying to half-implement the above packing problem).

I haven't had time to look at this lately, but if people are okay with the above, then I can revisit one of the earlier revisions of the patches.

> decouple fetch size from max message size
>
> Key: KAFKA-598
> URL: https://issues.apache.org/jira/browse/KAFKA-598
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 0.8
> Reporter: Jun Rao
> Assignee: Joel Koshy
> Priority: Blocker
> Attachments: KAFKA-598-v1.patch, KAFKA-598-v2.patch, KAFKA-598-v3.patch
>
> Currently, a consumer has to set fetch size larger than the max message size. This increases the memory footprint on the consumer, especially when a large number of topic/partition is subscribed. By decoupling the fetch size from max message size, we can use a smaller fetch size for normal consumption and when hitting a large message (hopefully rare), we automatically increase fetch size to max message size temporarily.
[jira] [Created] (KAFKA-702) Livelock between request handler/processor threads
Joel Koshy created KAFKA-702:

Summary: Livelock between request handler/processor threads
Key: KAFKA-702
URL: https://issues.apache.org/jira/browse/KAFKA-702
Project: Kafka
Issue Type: Bug
Affects Versions: 0.8
Reporter: Joel Koshy
Priority: Blocker
Fix For: 0.8

We have seen this a couple of times in the past few days in a test cluster. The processor thread enqueues requests into the request queue and dequeues responses from the response queue. The reverse is true for the request handler thread. This leads to the following livelock situation (all the processor/request handler threads show this in the thread-dump):

"kafka-processor-10251-7" prio=10 tid=0x7f4a0c3c9800 nid=0x4c39 waiting on condition [0x7f46f698e000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x7f48c9dd2698> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
    at java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:252)
    at kafka.network.RequestChannel.sendRequest(RequestChannel.scala:107)
    at kafka.network.Processor.read(SocketServer.scala:321)
    at kafka.network.Processor.run(SocketServer.scala:231)
    at java.lang.Thread.run(Thread.java:619)

"kafka-request-handler-7" daemon prio=10 tid=0x7f4a0c57f000 nid=0x4c47 waiting on condition [0x7f46f5b8]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x7f48c9dd6348> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
    at java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:252)
    at kafka.network.RequestChannel.sendResponse(RequestChannel.scala:112)
    at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:198)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:58)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:41)
    at java.lang.Thread.run(Thread.java:619)
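The circular wait in the KAFKA-702 thread dump can be reproduced in miniature with two bounded queues. The sketch below is only an analogy: it uses Python's queue.Queue in place of the ArrayBlockingQueue instances in the dump, a capacity of 1, and non-blocking puts so the example terminates instead of hanging. The queue names and sizes are illustrative assumptions.

```python
import queue

# Both bounded queues are already full, as they would be under sustained load.
request_queue = queue.Queue(maxsize=1)
response_queue = queue.Queue(maxsize=1)
request_queue.put("request-0")
response_queue.put("response-0")


def try_put(q: queue.Queue, item: str) -> bool:
    """Attempt a put without blocking; return False where a blocking put would park."""
    try:
        q.put_nowait(item)
        return True
    except queue.Full:
        return False


# The processor wants to enqueue another request before it drains any responses.
processor_progress = try_put(request_queue, "request-1")
# The handler wants to enqueue a response before it takes the next request.
handler_progress = try_put(response_queue, "response-1")

print(processor_progress, handler_progress)  # False False
```

Each side is the only consumer of the queue the other side is blocked on, so with blocking puts neither would ever wake up.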
[jira] [Commented] (KAFKA-705) Controlled shutdown doesn't seem to work on more than one broker in a cluster
[ https://issues.apache.org/jira/browse/KAFKA-705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13555352#comment-13555352 ] Joel Koshy commented on KAFKA-705: -- I set up a local cluster of three brokers and created a bunch of topics, replication factor = 2. I was able to do multiple iterations of rolling bounces without issue. Since this was local, I did not use your py script as it kills pid's returned by ps. Would you by any chance be able to provide a scenario to reproduce this locally? That said, I believe John Fung also tried to reproduce this in a distributed environment but was unable to do so; so I'll probably need to take a look at logs in your environment. > Controlled shutdown doesn't seem to work on more than one broker in a cluster > - > > Key: KAFKA-705 > URL: https://issues.apache.org/jira/browse/KAFKA-705 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Neha Narkhede >Assignee: Joel Koshy >Priority: Critical > Labels: bugs > Attachments: shutdown_brokers_eat.py, shutdown-command > > > I wrote a script (attached here) to basically round robin through the brokers > in a cluster doing the following 2 operations on each of them - > 1. Send the controlled shutdown admin command. If it succeeds > 2. Restart the broker > What I've observed is that only one broker is able to finish the above > successfully the first time around. For the rest of the iterations, no broker > is able to shutdown using the admin command and every single time it fails > with the error message stating the same number of leaders on every broker. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-705) Controlled shutdown doesn't seem to work on more than one broker in a cluster
[ https://issues.apache.org/jira/browse/KAFKA-705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13557481#comment-13557481 ]

Joel Koshy commented on KAFKA-705:

I think this is why it happens: https://github.com/apache/kafka/blob/03eb903ce223ab55c5acbcf4243ce805aaaf4fad/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala#L150

It could occur as follows. Suppose there's a partition 'P' assigned to brokers x and y; leaderAndIsr = y, {x, y}

1. Controlled shutdown of broker x; leaderAndIsr -> y, {y}
2. After above completes, kill -15 and then restart broker x
3. Immediately do a controlled shutdown of broker y; so now y is in the list of shutting down brokers.

Due to the above, x will not start its follower to 'P' on broker y. Adding sufficient wait time between (2) and (3) seems to address the issue (in your script there's no sleep), but we should handle it properly in the shutdown code. Will think about a fix for that.

> Controlled shutdown doesn't seem to work on more than one broker in a cluster
>
> Key: KAFKA-705
> URL: https://issues.apache.org/jira/browse/KAFKA-705
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 0.8
> Reporter: Neha Narkhede
> Assignee: Joel Koshy
> Priority: Critical
> Labels: bugs
> Attachments: shutdown_brokers_eat.py, shutdown-command
>
> I wrote a script (attached here) to basically round robin through the brokers in a cluster doing the following 2 operations on each of them -
> 1. Send the controlled shutdown admin command. If it succeeds
> 2. Restart the broker
> What I've observed is that only one broker is able to finish the above successfully the first time around. For the rest of the iterations, no broker is able to shutdown using the admin command and every single time it fails with the error message stating the same number of leaders on every broker.

-- This message is automatically generated by JIRA.
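The three-step sequence in the KAFKA-705 comment above can be walked through with a toy model. Everything here is a simplification for illustration: the broker ids, the two helper functions, and the assumption that a restarted broker rejoins the ISR as soon as its fetcher starts are all hypothetical, not the controller's actual logic.

```python
def can_start_follower(leader: int, shutting_down: set) -> bool:
    """Assumed guard: do not start a fetcher to a leader that is shutting down."""
    return leader not in shutting_down


def can_move_leadership(leader: int, isr: set) -> bool:
    """Controlled shutdown needs some other in-sync replica to take over."""
    return any(broker != leader for broker in isr)


x, y = 1, 2                 # hypothetical broker ids
leader, isr = y, {x, y}     # partition 'P': leaderAndIsr = y, {x, y}
shutting_down = set()

# 1. Controlled shutdown of broker x: leaderAndIsr -> y, {y}
shutting_down.add(x)
isr.discard(x)

# 2. Broker x is killed and restarted, so it is no longer shutting down.
shutting_down.discard(x)

# 3. Immediately start a controlled shutdown of broker y.
shutting_down.add(y)

# x only rejoins the ISR if it is allowed to fetch from the leader y.
if can_start_follower(leader, shutting_down):
    isr.add(x)

print(can_move_leadership(leader, isr))  # False: y has nowhere to hand off 'P'
```

In this model a pause between steps 2 and 3 would let x pass the guard and rejoin the ISR before y is marked as shutting down, which matches the observation that adding a sleep hides the problem.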
[jira] [Created] (KAFKA-712) Controlled shutdown tool should provide a meaningful message if a controller failover occurs during the operation
Joel Koshy created KAFKA-712:

Summary: Controlled shutdown tool should provide a meaningful message if a controller failover occurs during the operation
Key: KAFKA-712
URL: https://issues.apache.org/jira/browse/KAFKA-712
Project: Kafka
Issue Type: Bug
Affects Versions: 0.8, 0.8.1
Reporter: Joel Koshy
Priority: Minor
Fix For: 0.8.1

If the controller fails over before a jmx connection can be established, the tool shows the following exception:

javax.management.InstanceNotFoundException: kafka.controller:type=KafkaController,name=ControllerOps
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getMBean(DefaultMBeanServerInterceptor.java:1094)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getClassLoaderFor(DefaultMBeanServerInterceptor.java:1438)
    at com.sun.jmx.mbeanserver.JmxMBeanServer.getClassLoaderFor(JmxMBeanServer.java:1276)
    at javax.management.remote.rmi.RMIConnectionImpl$5.run(RMIConnectionImpl.java:1326)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.management.remote.rmi.RMIConnectionImpl.getClassLoaderFor(RMIConnectionImpl.java:1323)
    at javax.management.remote.rmi.RMIConnectionImpl.invoke(RMIConnectionImpl.java:771)
    at sun.reflect.GeneratedMethodAccessor42.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:305)
    at sun.rmi.transport.Transport$1.run(Transport.java:159)
    at java.security.AccessController.doPrivileged(Native Method)
    at sun.rmi.transport.Transport.serviceCall(Transport.java:155)
    at sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:535)
    at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:790)
    at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:649)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:619)
    at sun.rmi.transport.StreamRemoteCall.exceptionReceivedFromServer(StreamRemoteCall.java:255)
    at sun.rmi.transport.StreamRemoteCall.executeCall(StreamRemoteCall.java:233)
    at sun.rmi.server.UnicastRef.invoke(UnicastRef.java:142)
    at com.sun.jmx.remote.internal.PRef.invoke(Unknown Source)
    at javax.management.remote.rmi.RMIConnectionImpl_Stub.invoke(Unknown Source)
    at javax.management.remote.rmi.RMIConnector$RemoteMBeanServerConnection.invoke(RMIConnector.java:993)
    at kafka.admin.ShutdownBroker$.kafka$admin$ShutdownBroker$$invokeShutdown(ShutdownBroker.scala:50)
    at kafka.admin.ShutdownBroker$.main(ShutdownBroker.scala:105)
    at kafka.admin.ShutdownBroker.main(ShutdownBroker.scala)

Using the retry option on the tool would work, but we should provide a more meaningful message.

-- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators. For more information on JIRA, see: http://www.atlassian.com/software/jira
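For illustration only, here is a rough sketch of what a more meaningful message could look like. It is not the ShutdownBroker code: the class name, the "shutdownBroker" operation name and its "int" signature are assumptions made for the example; only the MBean name is taken from the exception above. The idea is simply to catch the InstanceNotFoundException around the JMX invoke call and turn it into a hint about controller failover.

```java
import java.lang.management.ManagementFactory;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

public class ShutdownMessageSketch {
    // MBean name copied from the exception in the report above.
    static final String CONTROLLER_MBEAN =
        "kafka.controller:type=KafkaController,name=ControllerOps";

    // Invokes a (hypothetical) "shutdownBroker" operation and maps a missing
    // controller MBean to a readable message instead of a raw stack trace.
    static String invokeShutdown(MBeanServerConnection conn, int brokerId) throws Exception {
        ObjectName name = new ObjectName(CONTROLLER_MBEAN);
        try {
            Object result = conn.invoke(
                name, "shutdownBroker", new Object[] {brokerId}, new String[] {"int"});
            return "Shutdown request for broker " + brokerId + " returned: " + result;
        } catch (InstanceNotFoundException e) {
            // The MBean is not registered on this JVM, e.g. the controller moved.
            return "Controller MBean " + name + " was not found on this broker; "
                + "the controller may have failed over. Please retry.";
        }
    }

    public static void main(String[] args) throws Exception {
        // The local platform MBean server has no Kafka controller MBean,
        // so this exercises the failover branch.
        System.out.println(invokeShutdown(ManagementFactory.getPlatformMBeanServer(), 0));
    }
}
```

Running main against the local platform MBean server takes the failover branch, since no controller MBean is registered there.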
[jira] [Updated] (KAFKA-705) Controlled shutdown doesn't seem to work on more than one broker in a cluster
[ https://issues.apache.org/jira/browse/KAFKA-705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Koshy updated KAFKA-705: - Attachment: kafka-705-v1.patch Here's a simple fix. I don't really see any good reason why we shouldn't allow starting a fetcher to a broker that is shutting down but not completely shut down yet if a leader still exists on that broker. > Controlled shutdown doesn't seem to work on more than one broker in a cluster > - > > Key: KAFKA-705 > URL: https://issues.apache.org/jira/browse/KAFKA-705 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Neha Narkhede >Assignee: Joel Koshy >Priority: Critical > Labels: bugs > Attachments: kafka-705-v1.patch, shutdown_brokers_eat.py, > shutdown-command > > > I wrote a script (attached here) to basically round robin through the brokers > in a cluster doing the following 2 operations on each of them - > 1. Send the controlled shutdown admin command. If it succeeds > 2. Restart the broker > What I've observed is that only one broker is able to finish the above > successfully the first time around. For the rest of the iterations, no broker > is able to shutdown using the admin command and every single time it fails > with the error message stating the same number of leaders on every broker. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-705) Controlled shutdown doesn't seem to work on more than one broker in a cluster
[ https://issues.apache.org/jira/browse/KAFKA-705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13559121#comment-13559121 ] Joel Koshy commented on KAFKA-705: I committed the fix to 0.8 with a small edit: it uses the liveOrShuttingDownBrokers field. Another small issue is that we send a stop replica request to the shutting-down broker even if controlled shutdown did not complete. This "prematurely" forces the broker out of the ISR of those partitions. I think it should be safe to avoid sending the stop replica request if controlled shutdown has not completely moved leadership of partitions off the shutting-down broker.
[jira] [Updated] (KAFKA-705) Controlled shutdown doesn't seem to work on more than one broker in a cluster
[ https://issues.apache.org/jira/browse/KAFKA-705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Koshy updated KAFKA-705: - Attachment: kafka-705-incremental-v2.patch Here is what I meant in my last comment.
[jira] [Commented] (KAFKA-705) Controlled shutdown doesn't seem to work on more than one broker in a cluster
[ https://issues.apache.org/jira/browse/KAFKA-705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13559968#comment-13559968 ] Joel Koshy commented on KAFKA-705: -- Thanks for reviewing. I checked-in the incremental patch as well. Will leave this jira open for now until it can be verified.
[jira] [Created] (KAFKA-817) Implement a zookeeper path-based controlled shutdown tool
Joel Koshy created KAFKA-817: Summary: Implement a zookeeper path-based controlled shutdown tool Key: KAFKA-817 URL: https://issues.apache.org/jira/browse/KAFKA-817 Project: Kafka Issue Type: Bug Components: controller, tools Affects Versions: 0.8 Reporter: Joel Koshy Assignee: Neha Narkhede The controlled shutdown tool currently depends on jmxremote.port being exposed. Apparently, this is often not exposed in production environments and makes the script unusable. We can move to a zk-based approach in which the controller watches a path that lists shutting down brokers. This will also make it consistent with the pattern used in some of the other replication-related tools. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-827) improve list topic output format
[ https://issues.apache.org/jira/browse/KAFKA-827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13614434#comment-13614434 ] Joel Koshy commented on KAFKA-827: -- While you are touching this, would it be reasonable to also switch from using the AdminUtil to a blank TopicMetadataRequest? It runs a lot quicker if there are a large number of topics and you run the tool from outside the ZK cluster's DC. Also, the topicOpt description has been misleading for a while. > improve list topic output format > > > Key: KAFKA-827 > URL: https://issues.apache.org/jira/browse/KAFKA-827 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Jun Rao >Assignee: Jun Rao >Priority: Blocker > Attachments: kafka-827.patch > > > We need to make the output of list topic command more readable. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-826) Make Kafka 0.8 depend on metrics 2.2.0 instead of 3.x
[ https://issues.apache.org/jira/browse/KAFKA-826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13616783#comment-13616783 ]

Joel Koshy commented on KAFKA-826:

Thank you for looking into this. Metrics 2.x had a few minor issues with the CsvReporter (which we use in the system tests) and this is why we used 3.x. The fixes that I'm aware of are:
- https://github.com/codahale/metrics/pull/225
- https://github.com/codahale/metrics/pull/290
- If a CSV file already exists, metrics throws an IOException and does not resume CSV reporting. This would be the case on a broker bounce, for example. Someone put out a patch for this (https://github.com/adagios/metrics/compare/2.x-maintenance...2.x-epoch-in-csv) but I'd have to check if that was pulled into metrics-3.x.

Unfortunately, although the above are small fixes, if we want to use the official 2.x metrics release we would need to copy over the code of the metrics CsvReporter (i.e., into a new implementation of metrics' AbstractReporter), patch in those fixes and plug that into KafkaMetricsCsvReporter. I don't think it is difficult, but it is a bit clunky (which is why at the time we preferred using 3.x).

> Make Kafka 0.8 depend on metrics 2.2.0 instead of 3.x
>
> Key: KAFKA-826
> URL: https://issues.apache.org/jira/browse/KAFKA-826
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 0.8
> Reporter: Neha Narkhede
> Assignee: Jun Rao
> Priority: Blocker
> Labels: build, kafka-0.8, metrics
>
> In order to mavenize Kafka 0.8, we have to depend on metrics 2.2.0 since
> metrics 3.x is a huge change as well as not an officially supported release.
[jira] [Created] (KAFKA-872) Socket server does not set send/recv buffer sizes
Joel Koshy created KAFKA-872: Summary: Socket server does not set send/recv buffer sizes Key: KAFKA-872 URL: https://issues.apache.org/jira/browse/KAFKA-872 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8 Reporter: Joel Koshy Assignee: Joel Koshy Fix For: 0.8 The socket server should set its send and receive socket buffer sizes - this is important in cross-DC mirroring setups where large buffer sizes are essential to enable the mirror-maker processes to do bulk consumption. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
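As a rough illustration of the kind of change this asks for (not the KAFKA-872 patch itself; the class and method names here are made up for the example), the sketch below sets the buffer sizes with plain java.nio. The receive buffer is set on the listening socket before it is bound so that accepted connections inherit it, and the send buffer is set on each accepted socket. The operating system may adjust the requested values, so the sketch only reports what was actually applied.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class SocketBufferSketch {
    // Opens a loopback listener, accepts one connection and returns
    // {sendBufferSize, receiveBufferSize} as reported by the accepted socket.
    static int[] configureAndReport(int sendBytes, int recvBytes) throws Exception {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            // Must happen before bind() for receive windows larger than 64K.
            server.socket().setReceiveBufferSize(recvBytes);
            server.socket().bind(new InetSocketAddress(loopback, 0));
            int port = server.socket().getLocalPort();
            try (SocketChannel client = SocketChannel.open(new InetSocketAddress(loopback, port));
                 SocketChannel accepted = server.accept()) {
                // The send buffer is a per-connection setting on the accepted socket.
                accepted.socket().setSendBufferSize(sendBytes);
                return new int[] {
                    accepted.socket().getSendBufferSize(),
                    accepted.socket().getReceiveBufferSize()
                };
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] sizes = configureAndReport(1 << 20, 1 << 20);
        System.out.println("send=" + sizes[0] + " recv=" + sizes[1]);
    }
}
```

The requested sizes are hints; on many systems the effective values are capped by kernel limits, which matters for cross-DC links with a large bandwidth-delay product.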
[jira] [Updated] (KAFKA-872) Socket server does not set send/recv buffer sizes
[ https://issues.apache.org/jira/browse/KAFKA-872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Koshy updated KAFKA-872: - Attachment: KAFKA-872-v1.patch
[jira] [Resolved] (KAFKA-872) Socket server does not set send/recv buffer sizes
[ https://issues.apache.org/jira/browse/KAFKA-872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Koshy resolved KAFKA-872. -- Resolution: Fixed Thanks for the review. Applied to 0.8.
[jira] [Created] (KAFKA-880) NoLeaderPartitionSet should be cleared before leader finder thread is started up
Joel Koshy created KAFKA-880:

Summary: NoLeaderPartitionSet should be cleared before leader finder thread is started up
Key: KAFKA-880
URL: https://issues.apache.org/jira/browse/KAFKA-880
Project: Kafka
Issue Type: Bug
Affects Versions: 0.8
Reporter: Joel Koshy
Fix For: 0.8

This was a recent regression. This could prevent the consumer from progressing because fetchers for the currently owned partitions may not be added (depending on the order that the map iterator yields). I think the fix should be simple - just clear the set after stopping the leader finder thread and stopping fetchers.

{code}
[2013-04-25 17:06:38,377] WARN [sometopic-somehost-1366909575615-f801367d-leader-finder-thread], Failed to find leader for Set([sometopic,11], [sometopic,25], [sometopic,24]) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
java.util.NoSuchElementException: key not found: [sometopic,24]
    at scala.collection.MapLike$class.default(MapLike.scala:223)
    at scala.collection.immutable.Map$Map2.default(Map.scala:110)
    at scala.collection.MapLike$class.apply(MapLike.scala:134)
    at scala.collection.immutable.Map$Map2.apply(Map.scala:110)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread$$anonfun$doWork$4.apply(ConsumerFetcherManager.scala:81)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread$$anonfun$doWork$4.apply(ConsumerFetcherManager.scala:79)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
    at scala.collection.Iterator$class.foreach(Iterator.scala:631)
    at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:79)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
{code}
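To make the failure mode a bit more concrete, here is a toy model of it. It is only a sketch under assumptions: the class, fields and method names below are hypothetical stand-ins and not the ConsumerFetcherManager code. It shows how a no-leader set that survives a stop/start cycle can still hold partitions the consumer no longer owns (the lookup that then fails with "key not found"), and how clearing the set on stop avoids that.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class NoLeaderSetSketch {
    // Hypothetical stand-ins for the fetcher manager's state.
    private final Set<String> noLeaderPartitions = new HashSet<>();
    private final Map<String, Long> ownedPartitions = new HashMap<>();

    // Called after a rebalance with the partitions this consumer now owns.
    void startConnections(Map<String, Long> owned) {
        ownedPartitions.clear();
        ownedPartitions.putAll(owned);
        noLeaderPartitions.addAll(owned.keySet());
    }

    // Called before a rebalance; the flag models the proposed fix.
    void stopConnections(boolean clearNoLeaderSet) {
        ownedPartitions.clear();
        if (clearNoLeaderSet) {
            noLeaderPartitions.clear();
        }
    }

    // Partitions the leader finder would look up but that are no longer owned.
    List<String> stalePartitions() {
        List<String> stale = new ArrayList<>();
        for (String partition : noLeaderPartitions) {
            if (!ownedPartitions.containsKey(partition)) {
                stale.add(partition);
            }
        }
        return stale;
    }

    // Owns two partitions, rebalances, then owns only one of them.
    static List<String> rebalance(boolean clearNoLeaderSet) {
        NoLeaderSetSketch manager = new NoLeaderSetSketch();
        Map<String, Long> before = new HashMap<>();
        before.put("sometopic-24", 0L);
        before.put("sometopic-25", 0L);
        manager.startConnections(before);
        manager.stopConnections(clearNoLeaderSet);
        Map<String, Long> after = new HashMap<>();
        after.put("sometopic-25", 0L);
        manager.startConnections(after);
        return manager.stalePartitions();
    }
}
```

With rebalance(false) the stale entry for the partition that was given up remains in the set; with rebalance(true) the set only contains currently owned partitions.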
[jira] [Updated] (KAFKA-880) NoLeaderPartitionSet should be cleared before leader finder thread is started up
[ https://issues.apache.org/jira/browse/KAFKA-880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joel Koshy updated KAFKA-880:

Description:

This was a recent regression. This could prevent the consumer from progressing because fetchers for the currently owned partitions may not be added (depending on the order that the map iterator yields). I think the fix should be simple - just clear the set after stopping the leader finder thread and stopping fetchers.

[2013-04-25 17:06:38,377] WARN [sometopic-somehost-1366909575615-f801367d-leader-finder-thread], Failed to find leader for Set([sometopic,11], [sometopic,25], [sometopic,24]) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
java.util.NoSuchElementException: key not found: [sometopic,24]
    at scala.collection.MapLike$class.default(MapLike.scala:223)
    at scala.collection.immutable.Map$Map2.default(Map.scala:110)
    at scala.collection.MapLike$class.apply(MapLike.scala:134)
    at scala.collection.immutable.Map$Map2.apply(Map.scala:110)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread$$anonfun$doWork$4.apply(ConsumerFetcherManager.scala:81)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread$$anonfun$doWork$4.apply(ConsumerFetcherManager.scala:79)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
    at scala.collection.Iterator$class.foreach(Iterator.scala:631)
    at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:79)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
[jira] [Commented] (KAFKA-890) The list of brokers for fetching metadata should be shuffled
[ https://issues.apache.org/jira/browse/KAFKA-890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13645736#comment-13645736 ] Joel Koshy commented on KAFKA-890: -- +1 It is worth noting that this is useful even in the presence of a VIP since the consumers don't currently use a VIP to look up metadata. > The list of brokers for fetching metadata should be shuffled > - > > Key: KAFKA-890 > URL: https://issues.apache.org/jira/browse/KAFKA-890 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Neha Narkhede >Assignee: Neha Narkhede >Priority: Blocker > Labels: kafka-0.8, p1 > Attachments: kafka-890.patch > > > The list of brokers in the metadata request is never shuffled. Which means > that if some clients are not using a VIP for metadata requests, the first > broker ends up servicing most metadata requests, leaving imbalanced load on > the brokers. This issue is even more pronounced when there are several > thousand clients talking to a cluster each using a broker list to fetch > metadata.
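A minimal sketch of the idea, assuming the client holds its configured brokers as a list of "host:port" strings (the class and method names are hypothetical and this is not the attached patch): shuffle a copy of the list before walking it, so that different clients start their metadata lookups at different brokers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

public class BrokerShuffleSketch {
    // Returns a shuffled copy; the configured list itself is left untouched.
    static List<String> shuffledBrokers(List<String> brokers, Random random) {
        List<String> copy = new ArrayList<>(brokers);
        Collections.shuffle(copy, random);
        return copy;
    }

    public static void main(String[] args) {
        List<String> configured = Arrays.asList("broker1:9092", "broker2:9092", "broker3:9092");
        // Each client would try the brokers in its own random order.
        System.out.println(shuffledBrokers(configured, new Random()));
    }
}
```

Shuffling a copy keeps the configured order available for logging while still spreading the first-attempt load across brokers.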
[jira] [Commented] (KAFKA-889) Add mbeans to track socket server's response queue size in addition to request queue size
[ https://issues.apache.org/jira/browse/KAFKA-889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13646080#comment-13646080 ] Joel Koshy commented on KAFKA-889: -- +1 > Add mbeans to track socket server's response queue size in addition to > request queue size > - > > Key: KAFKA-889 > URL: https://issues.apache.org/jira/browse/KAFKA-889 > Project: Kafka > Issue Type: Improvement > Components: network >Affects Versions: 0.8 >Reporter: Neha Narkhede >Assignee: Neha Narkhede >Priority: Critical > Labels: kafka-0.8 > Attachments: kafka-889.patch > > > We only track request queue size of the socket server. However, when response > send time is high, it is useful to know the response queue sizes as well -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-886) Update info on Controlled shutdown and Preferred replica election tool
[ https://issues.apache.org/jira/browse/KAFKA-886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13646092#comment-13646092 ]

Joel Koshy commented on KAFKA-886:

Thanks for the great write-up. A couple of comments:

1) We should probably add a note on the controlled shutdown tool (script) usage that it is currently JMX-based and depends on the jmxremote.port property being set (otherwise you won't be able to use the script and will need to poke JMX through other means). We can reference KAFKA-817, which will remedy this and make it zookeeper-based instead of JMX-based.

2) Due to the above, in case people need to use local JMX operations and essentially do manually what the script does automatically, it is best to do the controlled shutdown and bounce of the controller last (as otherwise there would be unnecessary controller re-elections).

3) For the ListTopicCommand tool - maybe we should mention that if there are a lot of topics and we list info for all topics it can take a while to run unless it is in the same datacenter as the ZK cluster. Actually I think the ListTopicCommand should really be using the SimpleConsumer or producer to fetch metadata instead of reading ZK directly. That way, people don't have to zip up Kafka and copy it over to their production environment. What do you think?

> Update info on Controlled shutdown and Preferred replica election tool
>
> Key: KAFKA-886
> URL: https://issues.apache.org/jira/browse/KAFKA-886
> Project: Kafka
> Issue Type: Sub-task
> Affects Versions: 0.8
> Reporter: Sriram Subramanian
> Assignee: Sriram Subramanian
> Priority: Blocker
> Labels: p1
[jira] [Commented] (KAFKA-892) Change request log to include request completion not handling
[ https://issues.apache.org/jira/browse/KAFKA-892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13646192#comment-13646192 ] Joel Koshy commented on KAFKA-892: -- +1 > Change request log to include request completion not handling > - > > Key: KAFKA-892 > URL: https://issues.apache.org/jira/browse/KAFKA-892 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 0.8 >Reporter: Neha Narkhede >Assignee: Neha Narkhede >Priority: Critical > Labels: kafka-0.8 > Attachments: kafka-892.patch > > > While troubleshooting a lot of 0.8 issues, what I've seen is that often times > than not, I've wanted the request processing latency breakdown to be part of > the request log. There are of course mbeans to expose this information, but > when you are trying to troubleshoot the root cause of few slow requests, this > is immensely helpful. Currently, we only include request reception in the > request log. We could include both but that would double the size of the > request log. So I'm proposing adding just the request completion information > in the request log. This will not just tell us which request came into the > server, but will also give us a breakdown of where it spent most of its > processing time. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-901) Kafka server can become unavailable if clients send several metadata requests
[ https://issues.apache.org/jira/browse/KAFKA-901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13654667#comment-13654667 ] Joel Koshy commented on KAFKA-901: -- Haven't looked at the patch yet, but went through the overview. An alternate approach that we may want to consider is to maintain a metadata cache at every broker. The cache can be kept consistent by having the controller send a (new) update-metadata request to all brokers whenever it sends out a leaderAndIsr request. A new request type would avoid needing to "overload" the leader and isr request. This would help avoid the herd effect of multiple clients flooding the controller with metadata requests (although these requests should return quickly with your patch). > Kafka server can become unavailable if clients send several metadata requests > - > > Key: KAFKA-901 > URL: https://issues.apache.org/jira/browse/KAFKA-901 > Project: Kafka > Issue Type: Bug > Components: replication >Affects Versions: 0.8 >Reporter: Neha Narkhede >Assignee: Neha Narkhede >Priority: Blocker > Attachments: metadata-request-improvement.patch > > > Currently, if a broker is bounced without controlled shutdown and there are > several clients talking to the Kafka cluster, each of the clients realize the > unavailability of leaders for some partitions. This leads to several metadata > requests sent to the Kafka brokers. Since metadata requests are pretty slow, > all the I/O threads quickly become busy serving the metadata requests. This > leads to a full request queue, that stalls handling of finished responses > since the same network thread handles requests as well as responses. In this > situation, clients timeout on metadata requests and send more metadata > requests. This quickly makes the Kafka cluster unavailable. -- This message is automatically generated by JIRA. 
If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Created] (KAFKA-914) Deadlock between initial rebalance and watcher-triggered rebalances
Joel Koshy created KAFKA-914:

Summary: Deadlock between initial rebalance and watcher-triggered rebalances
Key: KAFKA-914
URL: https://issues.apache.org/jira/browse/KAFKA-914
Project: Kafka
Issue Type: Bug
Affects Versions: 0.8
Reporter: Joel Koshy
Fix For: 0.8

The summary doesn't give the full picture, and the fetcher-manager/fetcher-thread code is very complex, so it's a bit hard to articulate the following very clearly. I will try to describe the sequence that results in a deadlock when starting up a large number of consumers at around the same time:

- When a consumer's createMessageStream method is called, it initiates an initial inline rebalance.
- However, before the above initial rebalance actually begins, a ZK watch may trigger (due to some other consumers starting up) and initiate a rebalance. This happens successfully, so fetchers start and start filling up the chunk queues.
- Another watch triggers and initiates yet another rebalance. This rebalance attempt tries to close the fetchers. Before the fetchers are stopped, we shut down the leader-finder-thread to prevent new fetchers from being started.
- The shutdown is accomplished by interrupting the leader-finder-thread and then awaiting its shutdown latch.
- If the leader-finder-thread still has a partition without a leader to process and tries to add a fetcher for it, it will get an exception (InterruptedException if acquiring the partitionMapLock or ClosedByInterruptException if performing an offset request). If we get an InterruptedException the thread's interrupted flag is cleared.
- However, the leader-finder-thread may have mu
[jira] [Commented] (KAFKA-914) Deadlock between initial rebalance and watcher-triggered rebalances
[ https://issues.apache.org/jira/browse/KAFKA-914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13663146#comment-13663146 ]

Joel Koshy commented on KAFKA-914:
----------------------------------

One more point: [td3] above does not need to originate from a watcher-triggered rebalance. The initial rebalance can also run into the same deadlock. i.e., as long as one or more watcher-triggered rebalances succeed and start fetchers prior to the initial rebalance, we may end up in this wedged state. E.g., on another instance I saw [td3] but on the main thread:

2013-05-21_17:07:14.34308 "main" prio=10 tid=0x7f5e34008000 nid=0x4e49 waiting on condition [0x7f5e3b41]
2013-05-21_17:07:14.34308    java.lang.Thread.State: WAITING (parking)
2013-05-21_17:07:14.34309 at sun.misc.Unsafe.park(Native Method)
2013-05-21_17:07:14.34309 - parking to wait for <0x7f5d36d99fa0> (a java.util.concurrent.CountDownLatch$Sync)
2013-05-21_17:07:14.34309 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
2013-05-21_17:07:14.34310 at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
2013-05-21_17:07:14.34310 at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
2013-05-21_17:07:14.34310 at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
2013-05-21_17:07:14.34311 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
2013-05-21_17:07:14.34312 at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
2013-05-21_17:07:14.34313 at kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:125)
2013-05-21_17:07:14.34313 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(ZookeeperConsumerConnector.scala:486)
2013-05-21_17:07:14.34313 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.closeFetchers(ZookeeperConsumerConnector.scala:523)
2013-05-21_17:07:14.34314 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:420)
2013-05-21_17:07:14.34314 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:373)
2013-05-21_17:07:14.34315 at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
2013-05-21_17:07:14.34315 at scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
2013-05-21_17:07:14.34316 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:368)
2013-05-21_17:07:14.34316 - locked <0x7f5d36d4b2e0> (a java.lang.Object)
2013-05-21_17:07:14.34317 at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:678)
2013-05-21_17:07:14.34317 at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:712)
2013-05-21_17:07:14.34318 at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:140)
2013-05-21_17:07:14.34318 at kafka.tools.MirrorMaker$$anonfun$4.apply(MirrorMaker.scala:118)
2013-05-21_17:07:14.34318 at kafka.tools.MirrorMaker$$anonfun$4.apply(MirrorMaker.scala:118)
2013-05-21_17:07:14.34319 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
2013-05-21_17:07:14.34319 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
2013-05-21_17:07:14.34319 at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
2013-05-21_17:07:14.34320 at scala.collection.immutable.List.foreach(List.scala:45)
2013-05-21_17:07:14.34320 at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
2013-05-21_17:07:14.34320 at scala.collection.immutable.List.map(List.scala:45)
2013-05-21_17:07:14.34321 at kafka.tools.MirrorMaker$.main(MirrorMaker.scala:118)
2013-05-21_17:07:14.34322 at kafka.tools.MirrorMaker.main(MirrorMaker.scala)

> Deadlock between initial rebalance and watcher-triggered rebalances
> -------------------------------------------------------------------
>
> Key: KAFKA-914
> URL: https://issues.apache.org/jira/browse/KAFKA-914
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.8
> Reporter: Joel Koshy
> Fix For: 0.8
[jira] [Updated] (KAFKA-914) Deadlock between initial rebalance and watcher-triggered rebalances
[ https://issues.apache.org/jira/browse/KAFKA-914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joel Koshy updated KAFKA-914:
-----------------------------
    Attachment: KAFKA-914-v1.patch

Patch with the mentioned fix.

1 - I added comments with some detail since the manager/fetcher/connector interaction is very tricky.
2 - Passing through throwables while shutting down. The isRunning check is probably unnecessary, but safer to keep.
3 - Made the following changes to the mirrormaker - I can put that in a separate jira as well.
    a - Currently if no streams are created, the mirrormaker doesn't quit. Setting streams to empty/nil fixes that issue.
    b - If a consumer-side exception (e.g., iterator timeout) gets thrown the mirror-maker does not exit. Addressed this by awaiting on the consumer threads at the end of the main method.

> Deadlock between initial rebalance and watcher-triggered rebalances
> -------------------------------------------------------------------
>
> Key: KAFKA-914
> URL: https://issues.apache.org/jira/browse/KAFKA-914
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.8
> Reporter: Joel Koshy
> Fix For: 0.8
> Attachments: KAFKA-914-v1.patch
>
> Summary doesn't give the full picture and the fetcher-manager/fetcher-thread code is very complex so it's a bit hard to articulate the following very clearly. I will try and describe the sequence that results in a deadlock when starting up a large number of consumers at around the same time:
>
> - When a consumer's createMessageStream method is called, it initiates an initial inline rebalance.
> - However, before the above initial rebalance actually begins, a ZK watch may trigger (due to some other consumers starting up) and initiate a rebalance. This happens successfully so fetchers start and start filling up the chunk queues.
> - Another watch triggers and initiates yet another rebalance. This rebalance attempt tries to close the fetchers. Before the fetchers are stopped, we shutdown the leader-finder-thread to prevent new fetchers from being started.
> - The shutdown is accomplished by interrupting the leader-finder-thread and then awaiting its shutdown latch.
> - If the leader-finder-thread still has
[jira] [Created] (KAFKA-916) Deadlock between fetcher shutdown and handling partitions with error
Joel Koshy created KAFKA-916:
--------------------------------

Summary: Deadlock between fetcher shutdown and handling partitions with error
Key: KAFKA-916
URL: https://issues.apache.org/jira/browse/KAFKA-916
Project: Kafka
Issue Type: Bug
Affects Versions: 0.8
Reporter: Joel Koshy
Fix For: 0.8

Here is another consumer deadlock that we encountered. All consumers are vulnerable to this during a rebalance if there happen to be partitions in error.

On a rebalance, the fetcher manager closes all fetchers and this holds on to the fetcher thread map's lock. (mapLock in AbstractFetcherManager). [t1]

While the fetcher manager is iterating over fetchers to stop them, a fetcher that is yet to be stopped hits an error on a partition and proceeds to handle partitions with error [t2]. This handling involves looking up the fetcher for that partition and then removing it from the fetcher's set of partitions to consume. This requires grabbing the same map lock in [t1], hence the deadlock.

[t1]
2013-05-22_20:23:11.95767 "main" prio=10 tid=0x7f1b24007800 nid=0x573b waiting on condition [0x7f1b2bd38000]
2013-05-22_20:23:11.95767    java.lang.Thread.State: WAITING (parking)
2013-05-22_20:23:11.95767 at sun.misc.Unsafe.park(Native Method)
2013-05-22_20:23:11.95767 - parking to wait for <0x7f1a25780598> (a java.util.concurrent.CountDownLatch$Sync)
2013-05-22_20:23:11.95767 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
2013-05-22_20:23:11.95767 at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
2013-05-22_20:23:11.95768 at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
2013-05-22_20:23:11.95768 at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
2013-05-22_20:23:11.95768 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
2013-05-22_20:23:11.95768 at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
2013-05-22_20:23:11.95769 at kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:68)
2013-05-22_20:23:11.95769 at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$1.apply(AbstractFetcherManager.scala:79)
2013-05-22_20:23:11.95769 at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$1.apply(AbstractFetcherManager.scala:78)
2013-05-22_20:23:11.95769 at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
2013-05-22_20:23:11.95769 at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
2013-05-22_20:23:11.95770 at scala.collection.Iterator$class.foreach(Iterator.scala:631)
2013-05-22_20:23:11.95770 at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
2013-05-22_20:23:11.95770 at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
2013-05-22_20:23:11.95770 at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
2013-05-22_20:23:11.95771 at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
2013-05-22_20:23:11.95771 at kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:78)
---> 2013-05-22_20:23:11.95771 - locked <0x7f1a2ae92510> (a java.lang.Object)
2013-05-22_20:23:11.95771 at kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:156)
2013-05-22_20:23:11.95771 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(ZookeeperConsumerConnector.scala:488)
2013-05-22_20:23:11.95772 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.closeFetchers(ZookeeperConsumerConnector.scala:525)
2013-05-22_20:23:11.95772 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:422)
2013-05-22_20:23:11.95772 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:374)
2013-05-22_20:23:11.95772 at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
2013-05-22_20:23:11.95773 at scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
2013-05-22_20:23:11.95773 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:369)
2013-05-22_20:23:11.95773 - locked <0x7f1a2a29b450> (a java.lang.Object)
2013-05-22_20:23:11.95773 at kafka.consumer.ZookeeperConsumerConn
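The lock interaction between [t1] and [t2] can be reproduced in miniature. What follows is only a sketch, written in Python rather than the Scala of the real code, and it uses timeouts so that the demo finishes instead of hanging. `Fetcher`, `FetcherManager`, `map_lock` and both `close_all_*` functions are hypothetical names. The second function shows one common way to avoid this shape of deadlock (copy the fetchers under the lock, await them outside it); it is not necessarily what the KAFKA-916 patch does.

```python
import threading


class Fetcher(threading.Thread):
    """Stand-in for a fetcher thread [t2]; not Kafka's AbstractFetcherThread."""

    def __init__(self, manager):
        super().__init__(daemon=True)
        self.manager = manager
        self.stop_requested = threading.Event()
        self.shutdown_latch = threading.Event()  # plays the role of the CountDownLatch

    def run(self):
        self.stop_requested.wait()
        # Simulate "handle partitions with error": it needs the manager's map lock.
        # The timeout exists only so the demo cannot block forever.
        if self.manager.map_lock.acquire(timeout=5.0):
            self.manager.map_lock.release()
        self.shutdown_latch.set()

    def shutdown(self, timeout):
        """Request a stop, then await the latch. Returns False if the wait timed out."""
        self.stop_requested.set()
        return self.shutdown_latch.wait(timeout)


class FetcherManager:
    def __init__(self, num_fetchers):
        self.map_lock = threading.Lock()
        self.fetchers = [Fetcher(self) for _ in range(num_fetchers)]
        for fetcher in self.fetchers:
            fetcher.start()


def close_all_holding_lock(manager, await_timeout=0.2):
    """Deadlock-prone shape [t1]: await each fetcher while still holding map_lock."""
    with manager.map_lock:
        return [f.shutdown(await_timeout) for f in manager.fetchers]


def close_all_after_snapshot(manager, await_timeout=5.0):
    """Safer shape: copy the fetchers under the lock, then await them without it."""
    with manager.map_lock:
        snapshot = list(manager.fetchers)
        manager.fetchers = []
    return [f.shutdown(await_timeout) for f in snapshot]


if __name__ == "__main__":
    print(close_all_holding_lock(FetcherManager(2)))    # every await times out
    print(close_all_after_snapshot(FetcherManager(2)))  # every fetcher stops
```

In the first variant no fetcher can count down its latch until the manager gives up the lock, so without the timeouts both sides would wait on each other indefinitely, which matches the parked main thread in the dump.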
[jira] [Resolved] (KAFKA-914) Deadlock between initial rebalance and watcher-triggered rebalances
[ https://issues.apache.org/jira/browse/KAFKA-914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joel Koshy resolved KAFKA-914.
------------------------------
    Resolution: Fixed

Thanks for the review. Committed after removing the unnecessary assignment in MirrorMaker.

> Deadlock between initial rebalance and watcher-triggered rebalances
> -------------------------------------------------------------------
>
> Key: KAFKA-914
> URL: https://issues.apache.org/jira/browse/KAFKA-914
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.8
> Reporter: Joel Koshy
> Fix For: 0.8
> Attachments: KAFKA-914-v1.patch
>
> Summary doesn't give the full picture and the fetcher-manager/fetcher-thread code is very complex so it's a bit hard to articulate the following very clearly. I will try and describe the sequence that results in a deadlock when starting up a large number of consumers at around the same time:
>
> - When a consumer's createMessageStream method is called, it initiates an initial inline rebalance.
> - However, before the above initial rebalance actually begins, a ZK watch may trigger (due to some other consumers starting up) and initiate a rebalance. This happens successfully so fetchers start and start filling up the chunk queues.
> - Another watch triggers and initiates yet another rebalance. This rebalance attempt tries to close the fetchers. Before the fetchers are stopped, we shutdown the leader-finder-thread to prevent new fetchers from being started.
> - The shutdown is accomplished by interrupting the leader-finder-thread and then awaiting its shutdown latch.
> - If the leader-finder-thread still has a partition without leader to process and tries to add a fetcher for it, it will get an exception (InterruptedException if acquiring the partitionMapLock or
[jira] [Closed] (KAFKA-914) Deadlock between initial rebalance and watcher-triggered rebalances
[ https://issues.apache.org/jira/browse/KAFKA-914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joel Koshy closed KAFKA-914.
----------------------------

> Deadlock between initial rebalance and watcher-triggered rebalances
> -------------------------------------------------------------------
>
> Key: KAFKA-914
> URL: https://issues.apache.org/jira/browse/KAFKA-914
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.8
> Reporter: Joel Koshy
> Fix For: 0.8
> Attachments: KAFKA-914-v1.patch
>
> Summary doesn't give the full picture and the fetcher-manager/fetcher-thread code is very complex so it's a bit hard to articulate the following very clearly. I will try and describe the sequence that results in a deadlock when starting up a large number of consumers at around the same time:
>
> - When a consumer's createMessageStream method is called, it initiates an initial inline rebalance.
> - However, before the above initial rebalance actually begins, a ZK watch may trigger (due to some other consumers starting up) and initiate a rebalance. This happens successfully so fetchers start and start filling up the chunk queues.
> - Another watch triggers and initiates yet another rebalance. This rebalance attempt tries to close the fetchers. Before the fetchers are stopped, we shutdown the leader-finder-thread to prevent new fetchers from being started.
> - The shutdown is accomplished by interrupting the leader-finder-thread and then awaiting its shutdown latch.
> - If the leader-finder-thread still has a partition without leader to process and tries to add a fetcher for it, it will get an exception (InterruptedException if acquiring the partitionMapLock or ClosedByInterruptException if performing an offset request). If we get an
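The "interrupt, then await the shutdown latch" step only returns if the worker eventually counts the latch down. The sketch below illustrates that general property of the handshake; it is not a reconstruction of the specific KAFKA-914 sequence. It is written in Python, which has no thread interrupt, so a stop flag stands in for the interrupt. `ShutdownableWorker` is a hypothetical name, not Kafka's ShutdownableThread, and counting down in a `finally` block is an assumption about how such a worker could be written.

```python
import threading
import time


class ShutdownableWorker(threading.Thread):
    """Runs `work` in a loop until asked to stop; hypothetical illustration only."""

    def __init__(self, work):
        super().__init__(daemon=True)
        self.work = work
        self.stop_requested = threading.Event()  # stands in for Thread.interrupt()
        self.shutdown_latch = threading.Event()  # stands in for the CountDownLatch
        self.failure = None

    def run(self):
        try:
            while not self.stop_requested.is_set():
                self.work()
        except Exception as exc:
            # Remember the failure instead of letting it kill the thread silently.
            self.failure = exc
        finally:
            # Always release whoever is waiting in shutdown(), even after an error.
            self.shutdown_latch.set()

    def shutdown(self, timeout=None):
        """Ask the worker to stop and wait for it. Returns False on timeout."""
        self.stop_requested.set()
        return self.shutdown_latch.wait(timeout)


if __name__ == "__main__":
    worker = ShutdownableWorker(lambda: time.sleep(0.01))
    worker.start()
    print(worker.shutdown(timeout=5.0))
```

If the `finally` block were missing and `work` raised, the latch would never be set and a caller using `shutdown()` without a timeout would park forever, which is the state the main thread is in in the dumps.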
[jira] [Commented] (KAFKA-911) Bug in controlled shutdown logic in controller leads to controller not sending out some state change request
[ https://issues.apache.org/jira/browse/KAFKA-911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13666532#comment-13666532 ]

Joel Koshy commented on KAFKA-911:
----------------------------------

I had to revisit the notes from KAFKA-340. I think this was touched upon. i.e., the fact that the current implementation's attempt to shrink ISR may be ineffective for partitions whose leadership has been moved from the current broker - https://issues.apache.org/jira/browse/KAFKA-340?focusedCommentId=13483478&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13483478

> 3.4 What is the point of sending leader and isr request at the end of shutdownBroker, since the OfflineReplica state change would've taken care of that anyway. It seems like you just need to send the stop replica request with the delete partitions flag turned off, no ?

I still need (as an optimization) to send the leader and isr request to the leaders of all partitions that are present on the shutting down broker so it can remove the shutting down broker from its inSyncReplicas cache (in Partition.scala) so it no longer waits for acks from the shutting down broker if a producer request's num-acks is set to -1. Otherwise, we have to wait for the leader to "organically" shrink the ISR.

This also applies to partitions which are moved (i.e., partitions for which the shutting down broker was the leader): the ControlledShutdownLeaderSelector needs to send the updated leaderAndIsr request to the shutting down broker as well (to tell it that it is no longer the leader) at which point it will start up a replica fetcher and re-enter the ISR. So in fact, there is actually not much point in removing the "current leader" from the ISR in the ControlledShutdownLeaderSelector.selectLeader.

and https://issues.apache.org/jira/browse/KAFKA-340?focusedCommentId=13484727&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13484727

(I don't think I actually filed that jira though.)

> Bug in controlled shutdown logic in controller leads to controller not sending out some state change request
> ------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-911
> URL: https://issues.apache.org/jira/browse/KAFKA-911
> Project: Kafka
> Issue Type: Bug
> Components: controller
> Affects Versions: 0.8
> Reporter: Neha Narkhede
> Assignee: Neha Narkhede
> Priority: Blocker
> Labels: kafka-0.8, p1
> Attachments: kafka-911-v1.patch
>
> The controlled shutdown logic in the controller first tries to move the leaders from the broker being shutdown. Then it tries to remove the broker from the isr list. During that operation, it does not synchronize on the controllerLock. This causes a race condition while dispatching data using the controller's channel manager.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
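The optimization discussed in the comment comes down to which brokers a leader waits on when a producer request uses num-acks -1. Here is a toy model in Python, assuming the leader waits for every broker in its in-sync replica set; `pending_acks` and the broker ids are purely illustrative and do not correspond to anything in Partition.scala.

```python
def pending_acks(in_sync_replicas, acked):
    """Brokers the leader is still waiting on for a num-acks = -1 produce request."""
    return set(in_sync_replicas) - set(acked)


if __name__ == "__main__":
    isr = {1, 2, 3}   # broker 3 is the one being shut down
    acked = {1, 2}    # broker 3 has stopped fetching, so it never acks

    # While broker 3 is still in the ISR, the request cannot complete.
    print(pending_acks(isr, acked))

    # Once the leader has been told to drop broker 3 from its ISR cache,
    # nothing is outstanding and the request can be answered.
    print(pending_acks(isr - {3}, acked))
```

This is why telling the leader about the shrunken ISR up front is described as an optimization: the alternative is waiting for the leader to notice the lagging replica by itself.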
[jira] [Updated] (KAFKA-916) Deadlock between fetcher shutdown and handling partitions with error
[ https://issues.apache.org/jira/browse/KAFKA-916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joel Koshy updated KAFKA-916:
-----------------------------
    Attachment: KAFKA-916-v1.patch

Agreed - I think that should fix the issue.

> Deadlock between fetcher shutdown and handling partitions with error
> --------------------------------------------------------------------
>
> Key: KAFKA-916
> URL: https://issues.apache.org/jira/browse/KAFKA-916
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.8
> Reporter: Joel Koshy
> Fix For: 0.8
> Attachments: KAFKA-916-v1.patch
>
> Here is another consumer deadlock that we encountered. All consumers are vulnerable to this during a rebalance if there happen to be partitions in error.
> On a rebalance, the fetcher manager closes all fetchers and this holds on to the fetcher thread map's lock. (mapLock in AbstractFetcherManager). [t1]
> While the fetcher manager is iterating over fetchers to stop them, a fetcher that is yet to be stopped hits an error on a partition and proceeds to handle partitions with error [t2]. This handling involves looking up the fetcher for that partition and then removing it from the fetcher's set of partitions to consume. This requires grabbing the same map lock in [t1], hence the deadlock.
[jira] [Closed] (KAFKA-916) Deadlock between fetcher shutdown and handling partitions with error
[ https://issues.apache.org/jira/browse/KAFKA-916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joel Koshy closed KAFKA-916.
----------------------------

Thanks for the review. Committed to 0.8

> Deadlock between fetcher shutdown and handling partitions with error
> --------------------------------------------------------------------
>
> Key: KAFKA-916
> URL: https://issues.apache.org/jira/browse/KAFKA-916
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.8
> Reporter: Joel Koshy
> Fix For: 0.8
> Attachments: KAFKA-916-v1.patch
>
> Here is another consumer deadlock that we encountered. All consumers are vulnerable to this during a rebalance if there happen to be partitions in error.
> On a rebalance, the fetcher manager closes all fetchers and this holds on to the fetcher thread map's lock. (mapLock in AbstractFetcherManager). [t1]
> While the fetcher manager is iterating over fetchers to stop them, a fetcher that is yet to be stopped hits an error on a partition and proceeds to handle partitions with error [t2]. This handling involves looking up the fetcher for that partition and then removing it from the fetcher's set of partitions to consume. This requires grabbing the same map lock in [t1], hence the deadlock.
[jira] [Resolved] (KAFKA-916) Deadlock between fetcher shutdown and handling partitions with error
[ https://issues.apache.org/jira/browse/KAFKA-916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joel Koshy resolved KAFKA-916.
------------------------------
    Resolution: Fixed

> Deadlock between fetcher shutdown and handling partitions with error
> --------------------------------------------------------------------
>
> Key: KAFKA-916
> URL: https://issues.apache.org/jira/browse/KAFKA-916
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.8
> Reporter: Joel Koshy
> Fix For: 0.8
> Attachments: KAFKA-916-v1.patch
>
> Here is another consumer deadlock that we encountered. All consumers are vulnerable to this during a rebalance if there happen to be partitions in error.
> On a rebalance, the fetcher manager closes all fetchers and this holds on to the fetcher thread map's lock. (mapLock in AbstractFetcherManager). [t1]
> While the fetcher manager is iterating over fetchers to stop them, a fetcher that is yet to be stopped hits an error on a partition and proceeds to handle partitions with error [t2]. This handling involves looking up the fetcher for that partition and then removing it from the fetcher's set of partitions to consume. This requires grabbing the same map lock in [t1], hence the deadlock.
[jira] [Created] (KAFKA-921) Expose max lag mbean for consumers and replica fetchers
Joel Koshy created KAFKA-921:
--------------------------------

Summary: Expose max lag mbean for consumers and replica fetchers
Key: KAFKA-921
URL: https://issues.apache.org/jira/browse/KAFKA-921
Project: Kafka
Issue Type: Bug
Affects Versions: 0.8
Reporter: Joel Koshy
Fix For: 0.8

We have a ton of consumer mbeans with names that are derived from the consumer id, broker being fetched from, fetcher id, etc. This makes it difficult to do basic monitoring of consumer/replica fetcher lag - since the mbean to monitor can change. A more useful metric for monitoring purposes is the maximum lag across all fetchers.
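The idea of a single, stably named metric can be sketched as a reduction over whatever per-fetcher lag values exist at the moment it is read. This is a minimal Python illustration, not the patch: `max_lag`, the key layout (consumer id, broker id, fetcher id, topic, partition) and the sample values are all assumptions made for the example.

```python
def max_lag(lag_by_source):
    """Largest lag over all currently known fetcher/partition entries (0 if none)."""
    return max(lag_by_source.values(), default=0)


if __name__ == "__main__":
    # The keys change as consumers rebalance and fetchers come and go;
    # the aggregate is read under one fixed name regardless.
    lags = {
        ("group1_host-a", 0, 0, "events", 0): 12,
        ("group1_host-a", 1, 0, "events", 1): 340,
        ("group1_host-a", 1, 1, "clicks", 0): 7,
    }
    print(max_lag(lags))
    print(max_lag({}))
```

A monitoring system then only needs to track one value instead of discovering per-fetcher names that can change.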
[jira] [Updated] (KAFKA-921) Expose max lag mbean for consumers and replica fetchers
[ https://issues.apache.org/jira/browse/KAFKA-921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joel Koshy updated KAFKA-921:
-----------------------------
    Attachment: KAFKA-921-v1.patch

This provides a max lag mbean for both consumer fetcher manager and replica fetcher manager; although I think it is more useful for monitoring consumers. For replica fetchers we need to closely monitor all replica fetchers anyway. i.e., the set of mbeans is static. I can reduce the scope to just consumers if others agree.

> Expose max lag mbean for consumers and replica fetchers
> -------------------------------------------------------
>
> Key: KAFKA-921
> URL: https://issues.apache.org/jira/browse/KAFKA-921
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.8
> Reporter: Joel Koshy
> Fix For: 0.8
> Attachments: KAFKA-921-v1.patch
>
> We have a ton of consumer mbeans with names that are derived from the consumer id, broker being fetched from, fetcher id, etc. This makes it difficult to do basic monitoring of consumer/replica fetcher lag - since the mbean to monitor can change. A more useful metric for monitoring purposes is the maximum lag across all fetchers.
[jira] [Updated] (KAFKA-921) Expose max lag mbean for consumers and replica fetchers
[ https://issues.apache.org/jira/browse/KAFKA-921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Koshy updated KAFKA-921: - Attachment: KAFKA-921-v2.patch Yes - I think that would be better. Moved it to AbstractFetcherManager. So depending on whether you are looking at replica fetchers or consumer fetchers, the MaxLag mbean will show up in ReplicaFetcherManager or ConsumerFetcherManager respectively.
[jira] [Updated] (KAFKA-921) Expose max lag mbean for consumers and replica fetchers
[ https://issues.apache.org/jira/browse/KAFKA-921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Koshy updated KAFKA-921: - Attachment: KAFKA-921-v3.patch One caveat in this approach is that if a fetcher is wedged for any reason, the reported lag is inaccurate, since it depends on getting the high watermark from fetch responses. So to check on the health of a consumer you would need to look at both the max lag and the min fetch rate across all fetchers.
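The caveat in the v3 comment (max lag alone can look fine when a fetcher is wedged) can be illustrated with a small sketch. This is not code from the patch: `FetcherStats`, `ConsumerHealth`, and the two thresholds are hypothetical names, and the sketch simply assumes that each fetcher exposes its last reported lag and its fetch rate.

```java
import java.util.List;

/** Hypothetical per-fetcher snapshot; not a Kafka class. */
final class FetcherStats {
    final long lag;          // lag as last computed from a fetch response
    final double fetchRate;  // fetch requests per second

    FetcherStats(long lag, double fetchRate) {
        this.lag = lag;
        this.fetchRate = fetchRate;
    }
}

/** Hypothetical health check combining max lag with min fetch rate. */
final class ConsumerHealth {
    static long maxLag(List<FetcherStats> fetchers) {
        long max = 0L;
        for (FetcherStats f : fetchers) {
            max = Math.max(max, f.lag);
        }
        return max;
    }

    static double minFetchRate(List<FetcherStats> fetchers) {
        if (fetchers.isEmpty()) {
            return 0.0;
        }
        double min = Double.POSITIVE_INFINITY;
        for (FetcherStats f : fetchers) {
            min = Math.min(min, f.fetchRate);
        }
        return min;
    }

    /** A wedged fetcher reports a stale (possibly small) lag, so the rate floor is checked too. */
    static boolean isHealthy(List<FetcherStats> fetchers, long lagLimit, double rateFloor) {
        return maxLag(fetchers) <= lagLimit && minFetchRate(fetchers) >= rateFloor;
    }
}
```

With one active fetcher and one wedged fetcher (small stale lag, zero fetch rate), `maxLag` stays low but `isHealthy` still returns false because of the rate floor.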
[jira] [Updated] (KAFKA-921) Expose max lag mbean for consumers and replica fetchers
[ https://issues.apache.org/jira/browse/KAFKA-921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Koshy updated KAFKA-921: - Status: Patch Available (was: Open)
[jira] [Closed] (KAFKA-921) Expose max lag mbean for consumers and replica fetchers
[ https://issues.apache.org/jira/browse/KAFKA-921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Koshy closed KAFKA-921.
[jira] [Updated] (KAFKA-921) Expose max lag mbean for consumers and replica fetchers
[ https://issues.apache.org/jira/browse/KAFKA-921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Koshy updated KAFKA-921: - Resolution: Fixed Status: Resolved (was: Patch Available) Thanks for the reviews. Committed with the minor change - i.e., Replica instead of Replica-
[jira] [Commented] (KAFKA-927) Integrate controlled shutdown into kafka shutdown hook
[ https://issues.apache.org/jira/browse/KAFKA-927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13674598#comment-13674598 ] Joel Koshy commented on KAFKA-927: -- +1 - sorry I got to this late. Small nit: the scaladoc for shutdown broker needs an edit which we will clean up later. We probably don't need the adminTest's testShutdownBroker given that the rolling bounce test exercises the same logic. Also, I think we can close KAFKA-817 - another approach with similar goals. > Integrate controlled shutdown into kafka shutdown hook > -- > > Key: KAFKA-927 > URL: https://issues.apache.org/jira/browse/KAFKA-927 > Project: Kafka > Issue Type: Bug >Reporter: Sriram Subramanian >Assignee: Sriram Subramanian > Fix For: 0.8 > > Attachments: KAFKA-927.patch, KAFKA-927-v2.patch, > KAFKA-927-v2-revised.patch, KAFKA-927-v3.patch, > KAFKA-927-v3-removeimports.patch, KAFKA-927-v4.patch > > > The controlled shutdown mechanism should be integrated into the software for > better operational benefits. Also, a few optimizations can be done to reduce > unnecessary rpc and zk calls. This patch has been tested on a prod-like > environment by doing rolling bounces continuously for a day. The average time > of doing a rolling bounce with controlled shutdown for a cluster with 7 nodes > without this patch is 340 seconds. With this patch it reduces to 220 seconds. > Also it ensures correctness in scenarios where the controller shrinks the isr > and the new leader could place the broker to be shut down back into the isr.
[jira] [Commented] (KAFKA-938) High CPU usage when more or less idle
[ https://issues.apache.org/jira/browse/KAFKA-938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13681479#comment-13681479 ] Joel Koshy commented on KAFKA-938: -- Excellent catch Sam! One comment: I think the DelayedItem class was intended to support arbitrary (non-millisecond) time units but that was buggy in two ways: (i) the getDelay's 'unit' parameter shadowed the DelayedItem's 'unit' member; (ii) the delayMs val assumes that the delay is always in ms (which prevents DelayedItem from supporting arbitrary time units). Also, I think we must have missed the bit of the DelayQueue documentation that says getDelay is called with TimeUnit.NANOSECONDS. I think we can tweak this a bit to make it support arbitrary time units - otherwise, the "unit" parameter of DelayedItem is of no use. I can attach a patch to make this clearer.
> High CPU usage when more or less idle
> Key: KAFKA-938 > URL: https://issues.apache.org/jira/browse/KAFKA-938 > Project: Kafka > Issue Type: Bug > Components: core > Affects Versions: 0.8 > Reporter: Sam Meder > Priority: Critical > Fix For: 0.8 > Attachments: timeunit.patch
>
> We've noticed Kafka using a lot of CPU in a pretty much idle environment and tracked it down to its DelayedItem implementation. In particular, the time conversion for how much longer to wait:
>   def getDelay(unit: TimeUnit): Long = {
>     val elapsedMs = (SystemTime.milliseconds - createdMs)
>     unit.convert(max(delayMs - elapsedMs, 0), unit)
>   }
> does not actually convert, so Kafka ends up treating a ms value like nanoseconds, e.g. waking up every 100 ns or so. The above code should really be:
>   def getDelay(unit: TimeUnit): Long = {
>     val elapsedMs = (SystemTime.milliseconds - createdMs)
>     unit.convert(max(delayMs - elapsedMs, 0), TimeUnit.MILLISECONDS)
>   }
> I'll attach a patch.
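The difference between the two `getDelay` variants in the report can be reproduced with `java.util.concurrent.TimeUnit` alone. The sketch below is a Java rendering for illustration only; `GetDelayDemo`, `buggyDelay`, and `fixedDelay` are hypothetical names, and the remaining time is passed in directly instead of being derived from a clock.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical demo class; isolates the conversion step discussed in the report. */
final class GetDelayDemo {
    /** Reported form: source unit equals target unit, so the value passes through unchanged. */
    static long buggyDelay(long remainingMs, TimeUnit unit) {
        return unit.convert(Math.max(remainingMs, 0L), unit);
    }

    /** Proposed form: the remaining time is declared to be in milliseconds. */
    static long fixedDelay(long remainingMs, TimeUnit unit) {
        return unit.convert(Math.max(remainingMs, 0L), TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        // A caller asking for nanoseconds sees 100 from the buggy form
        // and 100000000 from the fixed form for the same 100 ms remainder.
        System.out.println(buggyDelay(100L, TimeUnit.NANOSECONDS));
        System.out.println(fixedDelay(100L, TimeUnit.NANOSECONDS));
    }
}
```

When the caller requests nanoseconds, the buggy form hands back the millisecond count as if it were nanoseconds, which matches the "waking up every 100 ns or so" symptom described above.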
[jira] [Commented] (KAFKA-938) High CPU usage when more or less idle
[ https://issues.apache.org/jira/browse/KAFKA-938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13681497#comment-13681497 ] Joel Koshy commented on KAFKA-938: -- Actually, small correction: I overlooked that the delayMs val converts the given delay from its source unit to milliseconds. So the only caveat is that precision will be lost if the desired time unit is nanos - which we don't really need, so I don't think we need any further changes here. Thanks again!
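The precision caveat in the correction above follows from how `TimeUnit` conversions truncate when going from a finer to a coarser unit. A minimal sketch, assuming the delay is stored in milliseconds as the comment describes; `DelayPrecision` and `toDelayMs` are hypothetical names, not Kafka code.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper mirroring the idea of storing a delay as milliseconds. */
final class DelayPrecision {
    /** Converts a delay given in any unit to whole milliseconds; sub-millisecond parts are dropped. */
    static long toDelayMs(long delay, TimeUnit unit) {
        return TimeUnit.MILLISECONDS.convert(delay, unit);
    }
}
```

For example, a delay of 1,500,000 ns becomes 1 ms and a delay of 999,999 ns becomes 0 ms, while delays given in seconds or milliseconds are unaffected.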
[jira] [Commented] (KAFKA-937) ConsumerFetcherThread can deadlock
[ https://issues.apache.org/jira/browse/KAFKA-937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13681787#comment-13681787 ] Joel Koshy commented on KAFKA-937: -- +1 on the patch. Additionally, can you make this small (unrelated) change - make the console consumer's autoCommitIntervalOpt default to ConsumerConfig.AutoCommitInterval? I think it is worth documenting the typical path of getting into the above deadlock:
- Assume at least two fetchers F1, F2.
- One or more partitions on F1 go into error and leader finder thread L is notified.
- L unblocks and proceeds to handle partitions without leader. It holds the ConsumerFetcherManager's lock at this point.
- All partitions on F2 go into error.
- F2's handlePartitionsWithError removes partitions from its fetcher's partitionMap. (At this point, F2 is by definition an idle fetcher thread.)
- L tries to shut down idle fetcher threads - i.e., tries to shut down F2.
- However, F2 at this point is trying to addPartitionsWithError, which needs to acquire the ConsumerFetcherManager's lock (which is currently held by L).
It is relatively rare in the sense that it can happen only if all partitions on the fetcher are in error. This could happen, for example, if all the leaders for those partitions move or become unavailable. Another instance where this may be seen in practice is mirroring: we ran into it when running the mirror maker with a very large number of producers and ran out of file handles. Running out of file handles could easily lead to exceptions on most/all fetches and result in an error state for all partitions.
> ConsumerFetcherThread can deadlock
> Key: KAFKA-937 > URL: https://issues.apache.org/jira/browse/KAFKA-937 > Project: Kafka > Issue Type: Bug > Components: core > Affects Versions: 0.8 > Reporter: Jun Rao > Assignee: Jun Rao > Attachments: kafka-937.patch
>
> We have the following access pattern that can introduce a deadlock.
> AbstractFetcherThread.processPartitionsWithError() ->
> ConsumerFetcherThread.processPartitionsWithError() ->
> ConsumerFetcherManager.addPartitionsWithError() wait for lock ->
> LeaderFinderThread holding lock while calling AbstractFetcherManager.shutdownIdleFetcherThreads() ->
> AbstractFetcherManager calling fetcher.shutdown, which needs to wait until AbstractFetcherThread.processPartitionsWithError() completes.
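The lock cycle described in this thread can be sketched in isolation. The code below is not Kafka code and not the fix from kafka-937.patch: `FetcherDeadlockSketch` is a hypothetical class in which one thread plays the leader finder (holding a lock while waiting for the fetcher to finish) and another plays the fetcher (needing that same lock before it can finish). The fetcher uses `tryLock` with a timeout purely so the demo terminates; with a plain `lock()` both threads would wait on each other forever.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical, self-contained illustration of the lock cycle; names are stand-ins. */
final class FetcherDeadlockSketch {
    /**
     * Returns true if the "fetcher" thread obtained the manager lock while the
     * "leader finder" held it and waited for the fetcher to finish.
     */
    static boolean fetcherGotLock(long fetcherTimeoutMs) {
        final ReentrantLock managerLock = new ReentrantLock(); // stands in for the manager's lock
        final AtomicBoolean acquired = new AtomicBoolean(false);

        Thread fetcher = new Thread(() -> {
            try {
                // Stands in for addPartitionsWithError(); a plain lock() would block forever here.
                if (managerLock.tryLock(fetcherTimeoutMs, TimeUnit.MILLISECONDS)) {
                    acquired.set(true);
                    managerLock.unlock();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        managerLock.lock(); // the leader finder takes the lock first
        try {
            fetcher.start();
            fetcher.join(); // stands in for shutdown() waiting on the fetcher thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            managerLock.unlock();
        }
        return acquired.get();
    }
}
```

`fetcherGotLock(100)` returns false: the fetcher can never take the lock while the other thread holds it and waits, which is the cycle listed in the steps above.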
[jira] [Created] (KAFKA-940) Scala match error in javaapi.Implicits
Joel Koshy created KAFKA-940: Summary: Scala match error in javaapi.Implicits Key: KAFKA-940 URL: https://issues.apache.org/jira/browse/KAFKA-940 Project: Kafka Issue Type: Bug Affects Versions: 0.8 Reporter: Joel Koshy Fix For: 0.8
This would affect javaapi users who (correctly) test for null on API calls (e.g., if (partitionMetadata.leader == null)). Right now, we actually get a match error:
scala.MatchError: null
    at kafka.javaapi.Implicits$.optionToJavaRef(Implicits.scala:38)
    at kafka.javaapi.Implicits$.optionToJavaRef(Implicits.scala:40)
    at kafka.javaapi.PartitionMetadata.leader(TopicMetadata.scala:51)
[jira] [Updated] (KAFKA-940) Scala match error in javaapi.Implicits
[ https://issues.apache.org/jira/browse/KAFKA-940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Koshy updated KAFKA-940: - Attachment: KAFKA-940-v1.patch Simple fix.
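The contents of KAFKA-940-v1.patch are not shown in this thread, so the following is only an analogy in Java. A `scala.MatchError: null` means the value being matched was itself null, so a conversion helper needs to handle that case in addition to "present" and "empty". The sketch uses `java.util.Optional` as a stand-in for Scala's `Option`; `OptionConversions` is a hypothetical class and its `optionToJavaRef` merely borrows the name from the stack trace.

```java
import java.util.Optional;

/** Hypothetical helper; java.util.Optional stands in for scala.Option. */
final class OptionConversions {
    /** Maps a present value to itself, and both an empty and a null Optional to null. */
    static <T> T optionToJavaRef(Optional<T> opt) {
        if (opt == null) {
            return null; // the input that has no matching case in the reported error
        }
        return opt.orElse(null);
    }
}
```

With this shape, a Java caller can keep the plain `== null` test mentioned in the report, whichever of the three inputs the helper received.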
[jira] [Commented] (KAFKA-947) isr-expiration-thread may block LeaderAndIsr request for a relatively long period
[ https://issues.apache.org/jira/browse/KAFKA-947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13688719#comment-13688719 ] Joel Koshy commented on KAFKA-947: -- +1 thanks for the patch! > isr-expiration-thread may block LeaderAndIsr request for a relatively long > period > -- > > Key: KAFKA-947 > URL: https://issues.apache.org/jira/browse/KAFKA-947 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8.1 >Reporter: Jun Rao >Assignee: Jun Rao > Attachments: kafka-947.patch > > > If there are lots of partitions whose isr needs to be shrunk, > isr-expiration-thread will hold leaderPartitionsLock for a long time, which > will delay LeaderAndIsr requests.
[jira] [Assigned] (KAFKA-559) Garbage collect old consumer metadata entries
[ https://issues.apache.org/jira/browse/KAFKA-559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Koshy reassigned KAFKA-559: Assignee: Tejas Patil Assigning to Tejas, since he has done some work on this recently. > Garbage collect old consumer metadata entries > - > > Key: KAFKA-559 > URL: https://issues.apache.org/jira/browse/KAFKA-559 > Project: Kafka > Issue Type: New Feature >Reporter: Jay Kreps >Assignee: Tejas Patil > Labels: project > > Many use cases involve transient consumers. These consumers create entries > under their consumer group in zk and maintain offsets there as well. There is > currently no way to delete these entries. It would be good to have a tool > that did something like > bin/delete-obsolete-consumer-groups.sh [--topic t1] --since [date] > --zookeeper [zk_connect] > This would scan through consumer group entries and delete any that had no > offset update since the given date.
[jira] [Commented] (KAFKA-559) Garbage collect old consumer metadata entries
[ https://issues.apache.org/jira/browse/KAFKA-559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13698307#comment-13698307 ] Joel Koshy commented on KAFKA-559: -- One additional recommendation: support a --dry-run option.
[jira] [Assigned] (KAFKA-958) Please compile list of key metrics on the broker and client side and put it on a wiki
[ https://issues.apache.org/jira/browse/KAFKA-958?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Koshy reassigned KAFKA-958: Assignee: Joel Koshy > Please compile list of key metrics on the broker and client side and put it > on a wiki > - > > Key: KAFKA-958 > URL: https://issues.apache.org/jira/browse/KAFKA-958 > Project: Kafka > Issue Type: Bug > Components: website >Affects Versions: 0.8 >Reporter: Vadim >Assignee: Joel Koshy >Priority: Minor > > Please compile a list of important metrics that need to be monitored by > companies to ensure healthy operation of the kafka service
[jira] [Commented] (KAFKA-960) Upgrade Metrics to 3.x
[ https://issues.apache.org/jira/browse/KAFKA-960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13699211#comment-13699211 ] Joel Koshy commented on KAFKA-960: -- Given that there are API changes and mbean name changes between 2.x and 3.x my preference would be to defer this to a few months later (after the official 3.x release has proven to be stable). > Upgrade Metrics to 3.x > -- > > Key: KAFKA-960 > URL: https://issues.apache.org/jira/browse/KAFKA-960 > Project: Kafka > Issue Type: Improvement >Affects Versions: 0.8 >Reporter: Cosmin Lehene > Fix For: 0.8 > > > Now that metrics 3.0 has been released > (http://metrics.codahale.com/about/release-notes/) we can upgrade back
[jira] [Commented] (KAFKA-960) Upgrade Metrics to 3.x
[ https://issues.apache.org/jira/browse/KAFKA-960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13699545#comment-13699545 ] Joel Koshy commented on KAFKA-960: -- Not really. However, my point is that since going in either direction (upgrade or downgrade) is a bit painful due to the API and mbean changes, we should let 3.x prove itself to be stable in other contexts for a period of time before we switch to it.
[jira] [Commented] (KAFKA-961) state.change.logger: Error on broker 1 while processing LeaderAndIsr request correlationId 6 received from controller 1 epoch 1 for partition (page_visits,0)
[ https://issues.apache.org/jira/browse/KAFKA-961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13699567#comment-13699567 ] Joel Koshy commented on KAFKA-961: -- Passing in null for time would definitely lead to that NPE as you found. I think we only needed a time interface to support a mocktime for tests. Also, we probably didn't anticipate that KafkaServers would need to be embedded in Java code. If you are okay with your work-around, then great. Another (ugly) way to do it would be to pass in a dynamically instantiated SystemTime - so something like (Time) Class.forName(SystemTime.class.getName()).newInstance() - not sure if it will work though. We can also provide an explicit constructor without the time argument and get rid of the scala default arg.
> state.change.logger: Error on broker 1 while processing LeaderAndIsr request correlationId 6 received from controller 1 epoch 1 for partition (page_visits,0)
> Key: KAFKA-961 > URL: https://issues.apache.org/jira/browse/KAFKA-961 > Project: Kafka > Issue Type: Bug > Affects Versions: 0.8 > Environment: Linux gman-minty 3.8.0-19-generic #29-Ubuntu SMP Wed Apr 17 18:16:28 UTC 2013 x86_64 x86_64 x86_64 GNU/Linux > Reporter: Garrett Barton
>
> Been having issues embedding 0.8 servers into some Yarn stuff I'm doing. I just pulled the latest from git, did a ./sbt +package, followed by ./sbt assembly-package-dependency. And pushed core/target/scala-2.8.0/kafka_2.8.0-0.8.0-beta1.jar into my local mvn repo. Here is sample code ripped out to little classes to show my error:
> Starting up a broker embedded in java, with the following code:
> ...
>   Properties props = new Properties();
>   // dont set so it binds to all interfaces
>   // props.setProperty("hostname", hostName);
>   props.setProperty("port", );
>   props.setProperty("broker.id", "1");
>   props.setProperty("log.dir", "/tmp/embeddedkafka/" + randId);
>   // TODO: hardcode bad
>   props.setProperty("zookeeper.connect", "localhost:2181/" + randId);
>   KafkaConfig kconf = new KafkaConfig(props);
>
>   server = new KafkaServer(kconf, null);
>   server.startup();
>   LOG.info("Broker online");
> Sample Producer has the following code:
> ...
>   Properties props = new Properties();
>   props.put("metadata.broker.list", "gman-minty:");
>   props.put("serializer.class", "kafka.serializer.StringEncoder");
>   props.put("partitioner.class", "com.gman.broker.SimplePartitioner");
>   props.put("request.required.acks", "1");
>   ProducerConfig config = new ProducerConfig(props);
>
>   Producer<String, String> producer = new Producer<String, String>(config);
>   LOG.info("producer created");
>   KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", "key1", "value1");
>   producer.send(data);
>   LOG.info("wrote message: " + data);
> And here is the server log:
> INFO 2013-07-03 13:47:30,538 [Thread-0] kafka.utils.VerifiableProperties: Verifying properties
> INFO 2013-07-03 13:47:30,568 [Thread-0] kafka.utils.VerifiableProperties: Property port is overridden to
> INFO 2013-07-03 13:47:30,568 [Thread-0] kafka.utils.VerifiableProperties: Property broker.id is overridden to 1
> INFO 2013-07-03 13:47:30,568 [Thread-0] kafka.utils.VerifiableProperties: Property zookeeper.connect is overridden to localhost:2181/kafkatest
> INFO 2013-07-03 13:47:30,569 [Thread-0] kafka.utils.VerifiableProperties: Property log.dir is overridden to \tmp\embeddedkafka\1372873650268
> INFO 2013-07-03 13:47:30,574 [Thread-0] kafka.server.KafkaServer: [Kafka Server 1], Starting
> INFO 2013-07-03 13:47:30,609 [Thread-0] kafka.log.LogManager: [Log Manager on Broker 1] Log directory '/home/gman/workspace/distributed_parser/\tmp\embeddedkafka\1372873650268' not found, creating it.
> INFO 2013-07-03 13:47:30,619 [Thread-0] kafka.log.LogManager: [Log Manager on Broker 1] Starting log cleaner every 60 ms
> INFO 2013-07-03 13:47:30,630 [Thread-0] kafka.log.LogManager: [Log Manager on Broker 1] Starting log flusher every 3000 ms with the following overrides Map()
> INFO 2013-07-03 13:47:30,687 [Thread-0] kafka.network.Acceptor: Awaiting socket connections on 0.0.0.0:.
> INFO 2013-07-03 13:47:30,688 [Thread-0] kafka.network.SocketServer: [Socket
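The last suggestion in the KAFKA-961 comment (an explicit constructor without the time argument, so that Java callers never have to pass null) could look roughly like the sketch below. None of these types are Kafka's: `Clock`, `SystemClock`, `MockClock`, and `EmbeddedServer` are hypothetical stand-ins chosen so the example is self-contained.

```java
import java.util.Objects;

/** Hypothetical clock abstraction; a stand-in, not Kafka's interface. */
interface Clock {
    long milliseconds();
}

/** Wall-clock implementation used by default. */
final class SystemClock implements Clock {
    @Override
    public long milliseconds() {
        return System.currentTimeMillis();
    }
}

/** Manually advanced clock of the kind a test would use. */
final class MockClock implements Clock {
    private long nowMs;

    MockClock(long startMs) {
        this.nowMs = startMs;
    }

    @Override
    public long milliseconds() {
        return nowMs;
    }

    void sleep(long ms) {
        nowMs += ms;
    }
}

/** Hypothetical server with an overload that spares Java callers the clock argument. */
final class EmbeddedServer {
    private final Clock clock;
    private final long startedAtMs;

    /** Convenience constructor: defaults to the system clock. */
    EmbeddedServer() {
        this(new SystemClock());
    }

    /** Rejects null up front instead of failing later with an NPE deep in the server. */
    EmbeddedServer(Clock clock) {
        this.clock = Objects.requireNonNull(clock, "clock must not be null");
        this.startedAtMs = clock.milliseconds();
    }

    long uptimeMs() {
        return clock.milliseconds() - startedAtMs;
    }
}
```

Tests can still inject `MockClock` through the second constructor, while embedding code simply calls `new EmbeddedServer()`.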
[jira] [Commented] (KAFKA-915) System Test - Mirror Maker testcase_5001 failed
[ https://issues.apache.org/jira/browse/KAFKA-915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13699641#comment-13699641 ] Joel Koshy commented on KAFKA-915: -- This failure is due to the fact that the leaderAndIsr request does not make it to the brokers until after the mirror maker's rebalance completes. This is related to the issue reported in KAFKA-956. Previously (before we started caching metadata at the brokers) the partition information was retrieved directly from zk. The fix for now would be to use the create topic admin before starting the mirror maker (or move the producer performance start up to well before the mirror maker startup).
> System Test - Mirror Maker testcase_5001 failed
> Key: KAFKA-915 > URL: https://issues.apache.org/jira/browse/KAFKA-915 > Project: Kafka > Issue Type: Bug > Reporter: John Fung > Assignee: Joel Koshy > Priority: Critical > Labels: kafka-0.8, replication-testing > Attachments: testcase_5001_debug_logs.tar.gz
>
> This case passes if brokers are set to partition = 1, replicas = 1
> It fails if brokers are set to partition = 5, replicas = 3 (consistently reproducible)
> This test case is set up as shown below.
> 1. Start 2 ZK as a cluster in Source
> 2. Start 2 ZK as a cluster in Target
> 3. Start 3 brokers as a cluster in Source (partition = 1, replicas = 1)
> 4. Start 3 brokers as a cluster in Target (partition = 1, replicas = 1)
> 5. Start 1 MM
> 6. Start ProducerPerformance to send some data
> 7. After Producer is done, start ConsoleConsumer to consume data
> 8. Stop all processes and validate if there is any data loss.
> 9. No failure is introduced to any process in this test
> Attached a tar file which contains the logs and system test output for both cases.
[jira] [Commented] (KAFKA-559) Garbage collect old consumer metadata entries
[ https://issues.apache.org/jira/browse/KAFKA-559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13703007#comment-13703007 ] Joel Koshy commented on KAFKA-559: -- Thanks for the patch. Overall, looks good. A couple of comments, mostly minor, in no particular order:
* I think dry-run does not need any further qualifier such as withOptionalArg, describedAs, ofType - it's just a flag.
* For --since, I prefer the seconds since epoch over some fixed input format, which brings in ambiguity such as timezone, 24h vs 12h, etc. A better alternative would be to accept date strings and use the DateFormat class with lenient parsing turned on or something like that. --before may be more intuitive than --since.
* Can use CommandLineUtils.checkRequiredArgs
* deleteBy matching - prefer to use case match and thereby avoid the explicit check on valid values. Also the message on invalid value of deleteBy should inform what the valid values are.
* Right now you support the following modes: delete stale topics across all groups, delete stale topics in a specific group. I think it would be useful to make deleteBy optional - if unspecified, it scans all groups and gets rid of stale groups.
* line 75: warn ("msg", e)
* line 101: should provide a reason for aborting
* line 110: doesn't groupdirs have an offset path? If not, maybe we should add it.
* Logging should include last mtime as that may be useful information reported by the dry-run
* No need to add a wrapper shell script for the tool.
* Make all of the methods except main private.
* The return statements can be dropped - i.e., just write the return value.
* Several vars can be vals instead.
* removeBrokerPartitionpairs: I don't think you would want to do a partial delete under a topic directory. You can check that all the partition offset paths are <= since and if so, just delete the topic path. With that, the method would be better named something like deleteUnconsumedTopicsFromGroup?
* Finally, you are probably aware that there are a bunch of race conditions - e.g., checkIfLiveConsumers is a helpful check to have but not guaranteed to be correct as some consumers may creep in while the tool is running. However, I think it is reasonable for a tool like this to ignore that since a "since" value way back would mean the probability of that occurring is very low. Similar note for deleteGroupIfNoTopicExists.
> Garbage collect old consumer metadata entries
> Key: KAFKA-559 > URL: https://issues.apache.org/jira/browse/KAFKA-559 > Project: Kafka > Issue Type: New Feature > Reporter: Jay Kreps > Assignee: Tejas Patil > Labels: project > Attachments: KAFKA-559.v1.patch, KAFKA-559.v2.patch
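Two of the review points above (accepting either epoch seconds or a leniently parsed date for the cutoff, and deleting a topic path only when every partition's offset path is old enough) can be sketched as follows. This is not the code from KAFKA-559.v2.patch: `StaleOffsetCheck`, `parseBeforeMs`, `isTopicStale`, the `yyyy-MM-dd` pattern, and the UTC interpretation are all assumptions made for illustration.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.TimeZone;

/** Hypothetical helper; names, date pattern and UTC choice are assumptions. */
final class StaleOffsetCheck {
    /**
     * Accepts either seconds since the epoch (all digits) or a yyyy-MM-dd date,
     * interpreted as UTC with lenient parsing, and returns milliseconds since the epoch.
     */
    static long parseBeforeMs(String arg) {
        if (arg.matches("\\d+")) {
            return Long.parseLong(arg) * 1000L;
        }
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd");
        fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
        fmt.setLenient(true);
        try {
            return fmt.parse(arg).getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException(
                "Expected epoch seconds or a yyyy-MM-dd date, got: " + arg, e);
        }
    }

    /**
     * A topic counts as stale only if every partition's last offset update is at or
     * before the cutoff, which avoids a partial delete under the topic directory.
     */
    static boolean isTopicStale(Collection<Long> partitionMtimesMs, long beforeMs) {
        if (partitionMtimesMs.isEmpty()) {
            return false; // nothing known about the topic: leave it alone
        }
        for (long mtimeMs : partitionMtimesMs) {
            if (mtimeMs > beforeMs) {
                return false;
            }
        }
        return true;
    }
}
```

One design consequence of the all-digits rule is that a compact date such as 20130703 would be read as epoch seconds, so a real tool would likely want separate options or a stricter format; a dry run would log the result of `isTopicStale` together with the last mtime instead of deleting.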
[jira] [Commented] (KAFKA-966) Allow high level consumer to 'nak' a message and force Kafka to close the KafkaStream without losing that message
[ https://issues.apache.org/jira/browse/KAFKA-966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13703755#comment-13703755 ]

Joel Koshy commented on KAFKA-966:
--

One way to accomplish this is to turn off autocommit and checkpoint offsets only after a message (or batch of messages) has been written to the DB. One caveat, though, is that rebalances (e.g., if a new consumer instance shows up) will result in offsets being committed, so there would be an issue if the DB is unavailable, a rebalance occurs at the same time, and there are unprocessed messages that have already been pulled out of the iterator.

> Allow high level consumer to 'nak' a message and force Kafka to close the
> KafkaStream without losing that message
> -
>
> Key: KAFKA-966
> URL: https://issues.apache.org/jira/browse/KAFKA-966
> Project: Kafka
> Issue Type: Improvement
> Components: consumer
> Affects Versions: 0.8
> Reporter: Chris Curtin
> Assignee: Neha Narkhede
> Priority: Minor
>
> Enhancement request.
> The high level consumer is very close to handling a lot of situations a
> 'typical' client would need. Except for when the message received from Kafka
> is valid, but the business logic that wants to consume it has a problem.
> For example if I want to write the value to a MongoDB or Cassandra database
> and the database is not available. I won't know until I go to do the write
> that the database isn't available, but by then it is too late to NOT read the
> message from Kafka. Thus if I call shutdown() to stop reading, that message
> is lost since the offset Kafka writes to ZooKeeper is the next offset.
> Ideally I'd like to be able to tell Kafka: close the KafkaStream but set the
> next offset to read for this partition to this message when I start up again.
> And if there are any messages in the BlockingQueue for other partitions, find
> the lowest # and use it for that partition's offset since I haven't consumed
> them yet.
> Thus I can cleanly shutdown my processing, resolve whatever the issue is and > restart the process. > Another idea might be to allow a 'peek' into the next message and if I > succeed in writing to the database call 'next' to remove it from the queue. > I understand this won't deal with a 'kill -9' or hard failure of the JVM > leading to the latest offsets not being written to ZooKeeper but it addresses > a likely common scenario for consumers. Nor will it add true transactional > support since the ZK update could fail. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
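Editorial sketch of the "checkpoint only after the write succeeds" idea from the comment above. It is an in-memory simulation and uses no Kafka classes: the log is a plain list, and Sink and process are hypothetical names. In a real 0.8 high-level consumer this would presumably correspond to setting auto.commit.enable=false and calling commitOffsets() on the connector after a successful write, but that wiring is not shown and the rebalance caveat from the comment still applies.

```java
import java.util.Arrays;
import java.util.List;

// Sketch only: Sink and CheckpointAfterWrite are hypothetical names; no Kafka classes are used.
class CheckpointAfterWrite {

    /** Stand-in for the database write; returns false when the DB is unavailable. */
    interface Sink {
        boolean write(String message);
    }

    /**
     * Processes messages in order starting at committedOffset and returns the
     * new checkpoint: the offset of the first message that has NOT been written.
     */
    static long process(List<String> log, long committedOffset, Sink sink) {
        long checkpoint = committedOffset;
        for (int i = (int) committedOffset; i < log.size(); i++) {
            if (!sink.write(log.get(i))) {
                break; // stop without advancing: this message is re-read on restart
            }
            checkpoint = i + 1; // advance only after a successful write
        }
        return checkpoint;
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("m0", "m1", "m2", "m3");
        // The sink rejects "m2", simulating a database outage.
        long checkpoint = process(log, 0, msg -> !msg.equals("m2"));
        System.out.println("resume from offset " + checkpoint);
    }
}
```

Because the checkpoint never moves past a failed write, a restart re-delivers the failed message rather than losing it; the trade-off is that a crash between the write and the checkpoint can produce a duplicate write.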
[jira] [Commented] (KAFKA-966) Allow high level consumer to 'nak' a message and force Kafka to close the KafkaStream without losing that message
[ https://issues.apache.org/jira/browse/KAFKA-966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13705272#comment-13705272 ] Joel Koshy commented on KAFKA-966: -- Yes if you need to implement support for transactions across partitions that are potentially owned by different consumer instances then this approach wouldn't work. Not sure if it is feasible in your case but if there are a group of messages that need to be committed together then you could send them with a key and partition those messages into the same partition. So exactly one consumer thread will be responsible for those messages. > Allow high level consumer to 'nak' a message and force Kafka to close the > KafkaStream without losing that message > - > > Key: KAFKA-966 > URL: https://issues.apache.org/jira/browse/KAFKA-966 > Project: Kafka > Issue Type: Improvement > Components: consumer >Affects Versions: 0.8 >Reporter: Chris Curtin >Assignee: Neha Narkhede >Priority: Minor > > Enhancement request. > The high level consumer is very close to handling a lot of situations a > 'typical' client would need. Except for when the message received from Kafka > is valid, but the business logic that wants to consume it has a problem. > For example if I want to write the value to a MongoDB or Cassandra database > and the database is not available. I won't know until I go to do the write > that the database isn't available, but by then it is too late to NOT read the > message from Kafka. Thus if I call shutdown() to stop reading, that message > is lost since the offset Kafka writes to ZooKeeper is the next offset. > Ideally I'd like to be able to tell Kafka: close the KafkaStream but set the > next offset to read for this partition to this message when I start up again. > And if there are any messages in the BlockingQueue for other partitions, find > the lowest # and use it for that partitions offset since I haven't consumed > them yet. 
> Thus I can cleanly shutdown my processing, resolve whatever the issue is and > restart the process. > Another idea might be to allow a 'peek' into the next message and if I > succeed in writing to the database call 'next' to remove it from the queue. > I understand this won't deal with a 'kill -9' or hard failure of the JVM > leading to the latest offsets not being written to ZooKeeper but it addresses > a likely common scenario for consumers. Nor will it add true transactional > support since the ZK update could fail. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-969) Need to prevent failure of rebalance when there are no brokers available when consumer comes up
[ https://issues.apache.org/jira/browse/KAFKA-969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13705340#comment-13705340 ] Joel Koshy commented on KAFKA-969: -- This seems reasonable, but I'm not fully convinced about it. E.g., a test framework should ensure external dependencies are up before attempting to make service calls to those dependencies. That said, it is perhaps also reasonable from a consumer's perspective to expect that returned streams be empty at first, and whenever brokers and topics show up, then events should just show up. I'm +1 on this patch except for the if-else formatting issue. Also, I think this patch alone would be insufficient to meet the above. There are two other issues: - We should register a watcher under the topics path (currently done only if a wildcard is specified) - KAFKA-956 is also related. I need to give that one some thought. > Need to prevent failure of rebalance when there are no brokers available when > consumer comes up > --- > > Key: KAFKA-969 > URL: https://issues.apache.org/jira/browse/KAFKA-969 > Project: Kafka > Issue Type: Bug >Reporter: Sriram Subramanian >Assignee: Sriram Subramanian > Attachments: emptybrokeronrebalance-1.patch > > > There are some rare instances when a consumer would be up before bringing up > the Kafka brokers. This would usually happen in a test scenario. In such > conditions, during rebalance instead of failing the rebalance we just log the > error and subscribe to broker changes. When the broker comes back up, we > trigger the rebalance. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-969) Need to prevent failure of rebalance when there are no brokers available when consumer comes up
[ https://issues.apache.org/jira/browse/KAFKA-969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13705490#comment-13705490 ] Joel Koshy commented on KAFKA-969: -- As I already said, I'm +1 on this patch for what it intends to address. Those two issues I mentioned are orthogonal. By "above" in my comment I was referring to the possible expectation from consumers: ".. from a consumer's perspective to expect that returned streams be empty at first, and whenever brokers and topics show up, then events should just show up." - not the "failed to rebalance issue". > Need to prevent failure of rebalance when there are no brokers available when > consumer comes up > --- > > Key: KAFKA-969 > URL: https://issues.apache.org/jira/browse/KAFKA-969 > Project: Kafka > Issue Type: Bug >Reporter: Sriram Subramanian >Assignee: Sriram Subramanian > Fix For: 0.8 > > Attachments: emptybrokeronrebalance-1.patch > > > There are some rare instances when a consumer would be up before bringing up > the Kafka brokers. This would usually happen in a test scenario. In such > conditions, during rebalance instead of failing the rebalance we just log the > error and subscribe to broker changes. When the broker comes back up, we > trigger the rebalance. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Resolved] (KAFKA-705) Controlled shutdown doesn't seem to work on more than one broker in a cluster
[ https://issues.apache.org/jira/browse/KAFKA-705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Koshy resolved KAFKA-705. -- Resolution: Fixed > Controlled shutdown doesn't seem to work on more than one broker in a cluster > - > > Key: KAFKA-705 > URL: https://issues.apache.org/jira/browse/KAFKA-705 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Neha Narkhede >Assignee: Joel Koshy >Priority: Critical > Labels: bugs > Attachments: kafka-705-incremental-v2.patch, kafka-705-v1.patch, > shutdown_brokers_eat.py, shutdown-command > > > I wrote a script (attached here) to basically round robin through the brokers > in a cluster doing the following 2 operations on each of them - > 1. Send the controlled shutdown admin command. If it succeeds > 2. Restart the broker > What I've observed is that only one broker is able to finish the above > successfully the first time around. For the rest of the iterations, no broker > is able to shutdown using the admin command and every single time it fails > with the error message stating the same number of leaders on every broker. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-705) Controlled shutdown doesn't seem to work on more than one broker in a cluster
[ https://issues.apache.org/jira/browse/KAFKA-705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13709525#comment-13709525 ] Joel Koshy commented on KAFKA-705: -- Yes we can close this. > Controlled shutdown doesn't seem to work on more than one broker in a cluster > - > > Key: KAFKA-705 > URL: https://issues.apache.org/jira/browse/KAFKA-705 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Neha Narkhede >Assignee: Joel Koshy >Priority: Critical > Labels: bugs > Attachments: kafka-705-incremental-v2.patch, kafka-705-v1.patch, > shutdown_brokers_eat.py, shutdown-command > > > I wrote a script (attached here) to basically round robin through the brokers > in a cluster doing the following 2 operations on each of them - > 1. Send the controlled shutdown admin command. If it succeeds > 2. Restart the broker > What I've observed is that only one broker is able to finish the above > successfully the first time around. For the rest of the iterations, no broker > is able to shutdown using the admin command and every single time it fails > with the error message stating the same number of leaders on every broker. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Closed] (KAFKA-705) Controlled shutdown doesn't seem to work on more than one broker in a cluster
[ https://issues.apache.org/jira/browse/KAFKA-705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Koshy closed KAFKA-705. > Controlled shutdown doesn't seem to work on more than one broker in a cluster > - > > Key: KAFKA-705 > URL: https://issues.apache.org/jira/browse/KAFKA-705 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Neha Narkhede >Assignee: Joel Koshy >Priority: Critical > Labels: bugs > Attachments: kafka-705-incremental-v2.patch, kafka-705-v1.patch, > shutdown_brokers_eat.py, shutdown-command > > > I wrote a script (attached here) to basically round robin through the brokers > in a cluster doing the following 2 operations on each of them - > 1. Send the controlled shutdown admin command. If it succeeds > 2. Restart the broker > What I've observed is that only one broker is able to finish the above > successfully the first time around. For the rest of the iterations, no broker > is able to shutdown using the admin command and every single time it fails > with the error message stating the same number of leaders on every broker. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-973) Messages From Producer Not being Partitioned
[ https://issues.apache.org/jira/browse/KAFKA-973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13709531#comment-13709531 ] Joel Koshy commented on KAFKA-973: -- Can you try sending more messages? The default partitioner is random so all the partitions should get messages (as long as you send enough messages - three messages ending up on one partition can happen). > Messages From Producer Not being Partitioned > - > > Key: KAFKA-973 > URL: https://issues.apache.org/jira/browse/KAFKA-973 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.8 > Environment: Linux >Reporter: Subbu Srinivasan >Assignee: Neha Narkhede > Labels: newbie > > I created a two node cluster. > 2 zoo keepers > 2 brokers > 1 topic with replication factor (2) and no of partition 2. > my consumer group has two threads > 1) From my Java client - I send few messages to the topic. I have set > multiple brokers > kafka2:9092,kafka1:9092. > Only one thread in my consumer always gets the messages. It looks like > producer is not > partitioning the requests properly. > 2) However if I send some sample using the simple console producer, I see > multiple threads getting > requests and is load balanced. > What am I doing wrong in my client? 
> public class KafkaProducer {
>
>     private final Properties props = new Properties();
>     private static AtomicLong counter = new AtomicLong(0);
>     kafka.javaapi.producer.Producer<String, String> producer = null;
>
>     public KafkaProducer()
>     {
>         props.put("serializer.class", "kafka.serializer.StringEncoder");
>         props.put("metadata.broker.list", ConfigurationUtility.getKafkaHost());
>         producer = new kafka.javaapi.producer.Producer<String, String>(new ProducerConfig(props));
>     }
>
>     public void sendMessage(String msg) throws Exception
>     {
>         producer.send(new KeyedMessage<String, String>(ConfigurationUtility.getTopicName(), msg));
>     }
>
>     public static void main(String arg[]) throws Exception
>     {
>         ConfigurationUtility.setKafkaHost("kafka2:9092,kafka1:9092");
>         ConfigurationUtility.setTopicName("dnslog");
>         ConfigurationUtility.setZooKeeperHost("kafka1:2181,kafka2:2181");
>         ConfigurationUtility.setConsumerGroupId("dnslog");
>
>         for(int i = 0 ; i < 2 ; ++i)
>         {
>             (new KafkaProducer()).sendMessage(UUID.randomUUID().toString());
>         }
>     }
> }
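Editorial sketch quantifying the "send more messages" advice in the comment on this issue. It assumes, as a simplification, that every message independently picks one of the partitions uniformly at random; the real producer's behaviour may differ, and the class and method names are made up for this illustration.

```java
// Sketch only: assumes each message independently picks a partition uniformly at random.
class SamePartitionOdds {

    /** Probability that all `messages` land on one (any) partition out of `partitions`. */
    static double allOnOnePartition(int partitions, int messages) {
        if (partitions < 1 || messages < 1) {
            throw new IllegalArgumentException("partitions and messages must be >= 1");
        }
        // The first message can go anywhere; each later one must match it: (1/k)^(n-1).
        return Math.pow(1.0 / partitions, messages - 1);
    }

    public static void main(String[] args) {
        System.out.println(allOnOnePartition(2, 2));  // two messages, as in the quoted loop
        System.out.println(allOnOnePartition(2, 3));  // three messages
        System.out.println(allOnOnePartition(2, 20)); // a larger sample
    }
}
```

Under that assumption, two messages on a two-partition topic share a partition half the time and three messages a quarter of the time, so seeing a single busy consumer thread after only a few sends is not by itself evidence of a partitioning bug.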
[jira] [Commented] (KAFKA-957) MirrorMaker needs to preserve the key in the source cluster
[ https://issues.apache.org/jira/browse/KAFKA-957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13710571#comment-13710571 ]

Joel Koshy commented on KAFKA-957:
--

Thanks for incorporating 5 and 6. A couple of additional comments:
- For the two match statements you have, it is probably sufficient and clearer to just use if (key == null) and if (props.contains(..))
- I'm not so sure if the trace is required but it could be useful. Would prefer the following format: "Sending message with key " - no need to show the payload. Also, may want to use java.util.Arrays.toString on the byte array.
- Per Jay's offline comments, hashCode in general is a bit unsafe to rely on. E.g., it could have a non-uniform distribution or the underlying function could change. That said, your usage is safe. Still, it should be straightforward to write a custom hash function that we can rely on for consistency.

> MirrorMaker needs to preserve the key in the source cluster
> ---
>
> Key: KAFKA-957
> URL: https://issues.apache.org/jira/browse/KAFKA-957
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 0.8
> Reporter: Jun Rao
> Assignee: Guozhang Wang
> Attachments: KAFKA-957.v1.patch, KAFKA-957.v2.patch,
> KAFKA-957.v2.patch
>
>
> Currently, MirrorMaker only propagates the message to the target cluster, but
> not the associated key.
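Editorial sketch of what a "custom hash function that we can rely on for consistency" could look like for byte-array keys. The thread does not name a hash function; 32-bit FNV-1a is used here purely as an example of a function whose output depends only on the key bytes. The class and method names are hypothetical and this is not the code that was committed.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Sketch only: FNV-1a is an illustrative choice; the thread does not name a hash function.
class StableKeyPartitioner {

    /** 32-bit FNV-1a over the raw key bytes; the result depends only on the bytes. */
    static int fnv1a32(byte[] key) {
        int hash = 0x811c9dc5;      // FNV offset basis
        for (byte b : key) {
            hash ^= (b & 0xff);     // mix in the next byte as an unsigned value
            hash *= 0x01000193;     // multiply by the FNV prime (wraps mod 2^32)
        }
        return hash;
    }

    /** Maps a key to a partition in [0, numPartitions). */
    static int partition(byte[] key, int numPartitions) {
        return (fnv1a32(key) & 0x7fffffff) % numPartitions; // mask keeps it non-negative
    }

    public static void main(String[] args) {
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        System.out.println("key " + Arrays.toString(key) + " -> partition " + partition(key, 8));
    }
}
```

Unlike Object.hashCode on an array, which is identity-based, a function over the bytes gives the same partition for equal keys across processes and restarts. Uniformity of the distribution would still need to be checked for the actual key population.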
[jira] [Commented] (KAFKA-967) Use key range in ProducerPerformance
[ https://issues.apache.org/jira/browse/KAFKA-967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13710589#comment-13710589 ]

Joel Koshy commented on KAFKA-967:
--

+1 - thanks for the patch.

> Use key range in ProducerPerformance
>
>
> Key: KAFKA-967
> URL: https://issues.apache.org/jira/browse/KAFKA-967
> Project: Kafka
> Issue Type: Improvement
> Reporter: Guozhang Wang
> Assignee: Guozhang Wang
> Attachments: KAFKA-967.v1.patch, KAFKA-967.v2.patch
>
>
> Currently in ProducerPerformance, the key of the message is set to MessageID.
> It would be better to set it to a specific key within a key range (Integer type)
> so that we can test the semantic partitioning case. This is related to
> KAFKA-957.
[jira] [Commented] (KAFKA-347) change number of partitions of a topic online
[ https://issues.apache.org/jira/browse/KAFKA-347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13713045#comment-13713045 ]

Joel Koshy commented on KAFKA-347:
--

Thank you for the patch. Couple of comments, all very minor:

AddPartitionsCommand:
- IMO it is more intuitive for the option to be: "total partitions desired" as opposed to "num partitions to add"
- It is a bit odd that we can allow some partitions with a different replication factor from what's already there. I don't see any issues with it though. I just think it's a bit odd. One potential issue is if producers explicitly want to set acks to say 3 when there are some partitions with a replication factor of 2 and some with 3 (However, producers really should be using -1 in which case it would be fine).
- I think the command can currently allow an unintentional reassignment of replicas since the persistent path is always updated. (or no?) I think this can be easily checked for and avoided.
- Apart from start partition id I think getManualReplicaAssignment is identical to CreateTopicCommand's - maybe that code can be moved into AdminUtils?

KafkaController:
- nitpick: ZkUtils.getAllTopics(zkClient).foreach(p => partitionStateMachine.registerPartitionChangeListener(p)) (can you change p to t :) - p really looks like a partition but it is a topic )

AdminUtils:
- the //"for testing only" comment is now misplaced.
- This code is pre-existing, but would prefer changing secondReplicaShift to nextReplicaShift.

- Any reason why AddPartitionsTest should not be within AdminTest?
- Finally, can you rebase?
Sorry for not getting to this patch sooner :( > change number of partitions of a topic online > - > > Key: KAFKA-347 > URL: https://issues.apache.org/jira/browse/KAFKA-347 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 0.8 >Reporter: Jun Rao >Assignee: Sriram Subramanian > Labels: features > Fix For: 0.8.1 > > Attachments: kafka-347.patch > > > We will need an admin tool to change the number of partitions of a topic > online. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-347) change number of partitions of a topic online
[ https://issues.apache.org/jira/browse/KAFKA-347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13714259#comment-13714259 ]

Joel Koshy commented on KAFKA-347:
--

Thanks for patch v2. I'm +1 on this as is, but if you can address some of these minor comments that would be great.

v2.1 - For "num partitions to add" vs "partitions desired" - all I meant was that most of the time users would think of "desired number of partitions" vs "how many more to add". E.g., I have eight partitions for a topic, I now want 20 instead. It is more convenient to just say I want "20" partitions instead of thinking of how many to add. OTOH since we don't support reducing partitions treating it as a "num partitions to add" is safer. So I don't feel very strongly about it either way.
v2.2 - Re: unintentional reassignment of partitions. Yes you are right.
v2.3 - Your patch still has ZookeeperConsumerConnector changes in it, so it did not apply cleanly.
v2.4 - On checking the replication factor: if we don't allow having a different replication factor for the new partitions we should not even expose it as an option.
v2.5 - AddPartitionsListener: no need to change it now, but just a comment: we can directly parse the replica assignment from the data object (instead of reading from zookeeper again) right?
v2.6 - On moving getManualReplicaAssignment to AdminUtils - I think it would be good to do that here, but either way is fine.

> change number of partitions of a topic online
> -
>
> Key: KAFKA-347
> URL: https://issues.apache.org/jira/browse/KAFKA-347
> Project: Kafka
> Issue Type: Improvement
> Components: core
> Affects Versions: 0.8
> Reporter: Jun Rao
> Assignee: Sriram Subramanian
> Labels: features
> Fix For: 0.8.1
>
> Attachments: kafka-347.patch, kafka-347-v2.patch
>
>
> We will need an admin tool to change the number of partitions of a topic
> online.
[jira] [Commented] (KAFKA-982) Logo for Kafka
[ https://issues.apache.org/jira/browse/KAFKA-982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13716285#comment-13716285 ] Joel Koshy commented on KAFKA-982: -- +1 for 298, and I like Jakob's recursive suggestion as well (can you add feedback on that on the 99designs contest?). 294 seems interesting/deep (pen, two k's, I kind of see a person with hands raised, etc.), but I just prefer 298 wrt overall appearance. 296 is also good, but between 296 and 298 I prefer 298. > Logo for Kafka > -- > > Key: KAFKA-982 > URL: https://issues.apache.org/jira/browse/KAFKA-982 > Project: Kafka > Issue Type: Improvement >Reporter: Jay Kreps > Attachments: 289.jpeg, 294.jpeg, 296.png, 298.jpeg > > > We should have a logo for kafka. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-925) Add optional partition key override in producer
[ https://issues.apache.org/jira/browse/KAFKA-925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13716593#comment-13716593 ] Joel Koshy commented on KAFKA-925: -- +1 , looks good to me. DefaultPartitioner: Do we need the type parameter anymore? Guozhang has a good point about tools such as mirror maker not having access to the original partitioning key. However, I can see that it would be clunky as we would then need a partition key serializer as well. Also, for something like offset-preserving mirrors we would anyway have the source cluster's partition available, so I don't see it as a major issue. ConsoleProducer: the enqueue timeout change seems reasonable - I'm assuming it was done to avoid dropping messages when piping into ConsoleProducer. Correct? > Add optional partition key override in producer > --- > > Key: KAFKA-925 > URL: https://issues.apache.org/jira/browse/KAFKA-925 > Project: Kafka > Issue Type: New Feature > Components: producer >Affects Versions: 0.8.1 >Reporter: Jay Kreps >Assignee: Jay Kreps > Attachments: KAFKA-925-v1.patch, KAFKA-925-v2.patch > > > We have a key that is used for partitioning in the producer and stored with > the message. Actually these uses, though often the same, could be different. > The two meanings are effectively: > 1. Assignment to a partition > 2. Deduplication within a partition > In cases where we want to allow the client to take advantage of both of these > and they aren't the same it would be nice to allow them to be specified > separately. > To implement this I added an optional partition key to KeyedMessage. When > specified this key is used for partitioning rather than the message key. This > key is of type Any and the parametric typing is removed from the partitioner > to allow it to work with either key. > An alternative would be to allow the partition id to specified in the > KeyedMessage. 
This would be slightly more convenient in the case where there > is no partition key but instead you know a priori the partition number--this > case must be handled by giving the partition id as the partition key and > using an identity partitioner which is slightly more roundabout. However this > is inconsistent with the normal partitioning which requires a key in the case > where the partition is determined by a key--in that case you would be > manually calling your partitioner in user code. It seems best to me to either > use a key or always a partition and since we currently take a key I stuck > with that. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Updated] (KAFKA-957) MirrorMaker needs to preserve the key in the source cluster
[ https://issues.apache.org/jira/browse/KAFKA-957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Koshy updated KAFKA-957: - Resolution: Fixed Status: Resolved (was: Patch Available) Committed to 0.8 > MirrorMaker needs to preserve the key in the source cluster > --- > > Key: KAFKA-957 > URL: https://issues.apache.org/jira/browse/KAFKA-957 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Jun Rao >Assignee: Guozhang Wang > Attachments: KAFKA-957.v1.patch, KAFKA-957.v2.patch, > KAFKA-957.v2.patch, KAFKA-957.v3.patch, KAFKA-957.v4.patch > > > Currently, MirrorMaker only propagates the message to the target cluster, but > not the associated key. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Closed] (KAFKA-957) MirrorMaker needs to preserve the key in the source cluster
[ https://issues.apache.org/jira/browse/KAFKA-957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Koshy closed KAFKA-957. > MirrorMaker needs to preserve the key in the source cluster > --- > > Key: KAFKA-957 > URL: https://issues.apache.org/jira/browse/KAFKA-957 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Jun Rao >Assignee: Guozhang Wang > Attachments: KAFKA-957.v1.patch, KAFKA-957.v2.patch, > KAFKA-957.v2.patch, KAFKA-957.v3.patch, KAFKA-957.v4.patch > > > Currently, MirrorMaker only propagates the message to the target cluster, but > not the associated key. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-957) MirrorMaker needs to preserve the key in the source cluster
[ https://issues.apache.org/jira/browse/KAFKA-957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13718690#comment-13718690 ] Joel Koshy commented on KAFKA-957: -- +1 > MirrorMaker needs to preserve the key in the source cluster > --- > > Key: KAFKA-957 > URL: https://issues.apache.org/jira/browse/KAFKA-957 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Jun Rao >Assignee: Guozhang Wang > Attachments: KAFKA-957.v1.patch, KAFKA-957.v2.patch, > KAFKA-957.v2.patch, KAFKA-957.v3.patch, KAFKA-957.v4.patch > > > Currently, MirrorMaker only propagates the message to the target cluster, but > not the associated key. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-983) Expose cleanshutdown method in MirrorMaker at the object level
[ https://issues.apache.org/jira/browse/KAFKA-983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13719121#comment-13719121 ] Joel Koshy commented on KAFKA-983: -- +1 - can you rebase? Also, may be better to have an if null check in the shutdown statements. > Expose cleanshutdown method in MirrorMaker at the object level > -- > > Key: KAFKA-983 > URL: https://issues.apache.org/jira/browse/KAFKA-983 > Project: Kafka > Issue Type: Bug >Reporter: Swapnil Ghike >Assignee: Swapnil Ghike > Labels: bugs > Attachments: KAFKA-983.patch > > > Making clean shutdown in MirrorMaker public at the object level will be > useful. Currently if MirrorMaker is run in a container process, the only way > to stop it seems to be triggering the shutdown hook (System.exit(0)) which > may have unwarranted side effects on the other threads running in that > container process. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
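Editorial sketch of the "if null check in the shutdown statements" suggestion from the comment above. It does not reproduce MirrorMaker code: Stoppable and shutdownAll are hypothetical names standing in for whatever connector and producer objects a clean-shutdown method would need to stop.

```java
// Sketch only: Stoppable and shutdownAll are hypothetical names, not MirrorMaker code.
class NullSafeShutdown {

    /** Minimal stand-in for anything with a shutdown() method. */
    interface Stoppable {
        void shutdown();
    }

    /** Shuts down every non-null component and returns how many were stopped. */
    static int shutdownAll(Stoppable... components) {
        int stopped = 0;
        for (Stoppable component : components) {
            if (component != null) { // may be null if startup failed part-way
                component.shutdown();
                stopped++;
            }
        }
        return stopped;
    }

    public static void main(String[] args) {
        Stoppable connector = () -> System.out.println("connector stopped");
        Stoppable producer = null; // never created
        System.out.println("stopped " + shutdownAll(connector, producer));
    }
}
```

The guard matters most when the shutdown method is public, as the issue proposes, because a container process might call it before every component has been initialised.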
[jira] [Updated] (KAFKA-983) Expose cleanshutdown method in MirrorMaker at the object level
[ https://issues.apache.org/jira/browse/KAFKA-983?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Koshy updated KAFKA-983: - Resolution: Fixed Status: Resolved (was: Patch Available) > Expose cleanshutdown method in MirrorMaker at the object level > -- > > Key: KAFKA-983 > URL: https://issues.apache.org/jira/browse/KAFKA-983 > Project: Kafka > Issue Type: Bug >Reporter: Swapnil Ghike >Assignee: Swapnil Ghike > Labels: bugs > Attachments: KAFKA-983.patch, KAFKA-983-rebased.patch > > > Making clean shutdown in MirrorMaker public at the object level will be > useful. Currently if MirrorMaker is run in a container process, the only way > to stop it seems to be triggering the shutdown hook (System.exit(0)) which > may have unwarranted side effects on the other threads running in that > container process. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Closed] (KAFKA-983) Expose cleanshutdown method in MirrorMaker at the object level
[ https://issues.apache.org/jira/browse/KAFKA-983?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Koshy closed KAFKA-983. Thanks for the patch. Committed to 0.8 > Expose cleanshutdown method in MirrorMaker at the object level > -- > > Key: KAFKA-983 > URL: https://issues.apache.org/jira/browse/KAFKA-983 > Project: Kafka > Issue Type: Bug >Reporter: Swapnil Ghike >Assignee: Swapnil Ghike > Labels: bugs > Attachments: KAFKA-983.patch, KAFKA-983-rebased.patch > > > Making clean shutdown in MirrorMaker public at the object level will be > useful. Currently if MirrorMaker is run in a container process, the only way > to stop it seems to be triggering the shutdown hook (System.exit(0)) which > may have unwarranted side effects on the other threads running in that > container process. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-991) Reduce the queue size in hadoop producer
[ https://issues.apache.org/jira/browse/KAFKA-991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13726958#comment-13726958 ]

Joel Koshy commented on KAFKA-991:
--

+1. Committed to 0.8.

Minor comments:
- The name "queue size" is unintuitive: it sounds like a number of messages, but it is in bytes.
- The totalSize > queueSize check should ideally be done before adding the message to msgList.

> Reduce the queue size in hadoop producer
>
> Key: KAFKA-991
> URL: https://issues.apache.org/jira/browse/KAFKA-991
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.8
> Reporter: Swapnil Ghike
> Assignee: Swapnil Ghike
> Labels: bugs
> Fix For: 0.8
> Attachments: kafka-991-v1.patch
>
> Currently the queue.size in hadoop producer is 10MB. This means that the KafkaRecordWriter will hit the send button on kafka producer after the size of uncompressed queued messages becomes greater than 10MB. (The other condition on which the messages are sent is if their number exceeds SHORT.MAX_VALUE).
> Considering that the server accepts a (compressed) batch of messages of size up to 1 million bytes minus the log overhead, we should probably reduce the queue size in hadoop producer. We should do two things:
> 1. change max message size on the broker to 1 million + log overhead, because that will make the client message size easy to remember. Right now the maximum number of bytes that can be accepted from a client in a batch of messages is an awkward 88. (I don't have a stronger reason). We have set fetch size on the consumer to 1MB, this gives us a lot of room even if the log overhead increased with further versions.
> 2. Set the default number of bytes on hadoop producer to 1 million bytes. Anyone who wants higher throughput can override this config using kafka.output.queue.size

--
This message is automatically generated by JIRA.
[jira] [Commented] (KAFKA-991) Reduce the queue size in hadoop producer
[ https://issues.apache.org/jira/browse/KAFKA-991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13727092#comment-13727092 ]

Joel Koshy commented on KAFKA-991:
--

Thanks for the follow-up patch. totalBytes is set to zero in sendMsgList, so the next batch's totalBytes will be too low (incorrect) by valBytes.

> Reduce the queue size in hadoop producer
>
> Key: KAFKA-991
> URL: https://issues.apache.org/jira/browse/KAFKA-991
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.8
> Reporter: Swapnil Ghike
> Assignee: Swapnil Ghike
> Labels: bugs
> Fix For: 0.8
> Attachments: kafka-991-followup.patch, kafka-991-followup-v2.patch, kafka-991-v1.patch
>
> Currently the queue.size in hadoop producer is 10MB. This means that the KafkaRecordWriter will hit the send button on kafka producer after the size of uncompressed queued messages becomes greater than 10MB. (The other condition on which the messages are sent is if their number exceeds SHORT.MAX_VALUE).
> Considering that the server accepts a (compressed) batch of messages of size up to 1 million bytes minus the log overhead, we should probably reduce the queue size in hadoop producer. We should do two things:
> 1. change max message size on the broker to 1 million + log overhead, because that will make the client message size easy to remember. Right now the maximum number of bytes that can be accepted from a client in a batch of messages is an awkward 88. (I don't have a stronger reason). We have set fetch size on the consumer to 1MB, this gives us a lot of room even if the log overhead increased with further versions.
> 2. Set the default number of bytes on hadoop producer to 1 million bytes. Anyone who wants higher throughput can override this config using kafka.output.queue.size

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
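The two review points on KAFKA-991 (check the byte limit before adding a message, and keep the byte counter consistent with what is actually queued) can be illustrated with a minimal sketch. It is not the KafkaRecordWriter code: `ByteBoundedBatcher`, `maxBatchBytes`, and the in-memory `sentBatches` list (a stand-in for the real producer send) are assumptions made for the example; only `msgList`, `totalBytes`, `valBytes`, and `sendMsgList` echo names from the comments.

```java
import java.util.ArrayList;
import java.util.List;

public class ByteBoundedBatcher {
    private final int maxBatchBytes;                                  // limit in bytes, not messages
    private final List<byte[]> msgList = new ArrayList<>();
    private final List<List<byte[]>> sentBatches = new ArrayList<>(); // stand-in for producer.send
    private int totalBytes = 0;

    ByteBoundedBatcher(int maxBatchBytes) {
        this.maxBatchBytes = maxBatchBytes;
    }

    void write(byte[] value) {
        int valBytes = value.length;
        // Check BEFORE adding, so a queued batch never exceeds the limit
        // (unless a single message is itself larger than the limit).
        if (!msgList.isEmpty() && totalBytes + valBytes > maxBatchBytes) {
            sendMsgList();
        }
        msgList.add(value);
        // Counted after any flush, so the reset in sendMsgList cannot lose these bytes.
        totalBytes += valBytes;
    }

    void sendMsgList() {
        if (msgList.isEmpty()) {
            return;
        }
        sentBatches.add(new ArrayList<>(msgList));
        msgList.clear();
        totalBytes = 0;
    }

    int batchesSent() { return sentBatches.size(); }

    int pendingBytes() { return totalBytes; }

    public static void main(String[] args) {
        ByteBoundedBatcher batcher = new ByteBoundedBatcher(10);
        batcher.write(new byte[6]);
        batcher.write(new byte[6]); // would exceed 10 bytes, so the first message is flushed first
        batcher.write(new byte[3]);
        System.out.println(batcher.batchesSent() + " batch(es) sent, "
                + batcher.pendingBytes() + " byte(s) pending");
    }
}
```

The ordering matters: if the counter were incremented before the flush, resetting it to zero inside `sendMsgList` would drop the bytes of the message that stays in the list, which is the off-by-valBytes problem described in the follow-up comment.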
[jira] [Updated] (KAFKA-991) Reduce the queue size in hadoop producer
[ https://issues.apache.org/jira/browse/KAFKA-991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Koshy updated KAFKA-991: - Resolution: Fixed Status: Resolved (was: Patch Available) > Reduce the queue size in hadoop producer > > > Key: KAFKA-991 > URL: https://issues.apache.org/jira/browse/KAFKA-991 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8 >Reporter: Swapnil Ghike >Assignee: Swapnil Ghike > Labels: bugs > Fix For: 0.8 > > Attachments: kafka-991-followup-3.patch, kafka-991-followup.patch, > kafka-991-followup-v2.patch, kafka-991-v1.patch > > > Currently the queue.size in hadoop producer is 10MB. This means that the > KafkaRecordWriter will hit the send button on kafka producer after the size > of uncompressed queued messages becomes greater than 10MB. (The other > condition on which the messages are sent is if their number exceeds > SHORT.MAX_VALUE). > Considering that the server accepts a (compressed) batch of messages of > sizeupto 1 million bytes minus the log overhead, we should probably reduce > the queue size in hadoop producer. We should do two things: > 1. change max message size on the broker to 1 million + log overhead, because > that will make the client message size easy to remember. Right now the > maximum number of bytes that can be accepted from a client in a batch of > messages is an awkward 88. (I don't have a stronger reason). We have set > fetch size on the consumer to 1MB, this gives us a lot of room even if the > log overhead increased with further versions. > 2. Set the default number of bytes on hadoop producer to 1 million bytes. > Anyone who wants higher throughput can override this config using > kafka.output.queue.size -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Closed] (KAFKA-991) Reduce the queue size in hadoop producer
[ https://issues.apache.org/jira/browse/KAFKA-991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Koshy closed KAFKA-991. +1 Committed to 0.8 > Reduce the queue size in hadoop producer > > > Key: KAFKA-991 > URL: https://issues.apache.org/jira/browse/KAFKA-991 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8 >Reporter: Swapnil Ghike >Assignee: Swapnil Ghike > Labels: bugs > Fix For: 0.8 > > Attachments: kafka-991-followup-3.patch, kafka-991-followup.patch, > kafka-991-followup-v2.patch, kafka-991-v1.patch > > > Currently the queue.size in hadoop producer is 10MB. This means that the > KafkaRecordWriter will hit the send button on kafka producer after the size > of uncompressed queued messages becomes greater than 10MB. (The other > condition on which the messages are sent is if their number exceeds > SHORT.MAX_VALUE). > Considering that the server accepts a (compressed) batch of messages of > sizeupto 1 million bytes minus the log overhead, we should probably reduce > the queue size in hadoop producer. We should do two things: > 1. change max message size on the broker to 1 million + log overhead, because > that will make the client message size easy to remember. Right now the > maximum number of bytes that can be accepted from a client in a batch of > messages is an awkward 88. (I don't have a stronger reason). We have set > fetch size on the consumer to 1MB, this gives us a lot of room even if the > log overhead increased with further versions. > 2. Set the default number of bytes on hadoop producer to 1 million bytes. > Anyone who wants higher throughput can override this config using > kafka.output.queue.size -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-984) Avoid a full rebalance in cases when a new topic is discovered but container/broker set stay the same
[ https://issues.apache.org/jira/browse/KAFKA-984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13728209#comment-13728209 ]

Joel Koshy commented on KAFKA-984:
--

Thanks for the patch - this will help a *lot*, especially for mirroring. However, I share Jun's concern about making such a non-trivial change to 0.8. In any event, here are some comments on scala.kafka.consumer.ZookeeperConsumerConnector:
- We should definitely abstract out the common code in syncedPartialRebalance and WildcardStreamsHandler. I think with some thought we can refactor it; otherwise we end up with copies of relatively complex code.
- The filters on lines 432/433 will not have any effect (I think) since the maps are immutable. You should probably apply the filter on the assignments on lines 428/429. As it stands, metadata for other topics will be fetched unnecessarily, and fetchers for other topics may be stopped unnecessarily.
- Also, there are topic variables inside the method that shadow the parameter, which makes it harder to determine which variable is in effect in which scope.
- Logging can be improved and made more concise: there are a few typos and inconsistencies in capitalization.
- Why do this only if # added topics == 1? Can it accept a list of topics to rebalance instead?

I do see your note on Sriram's comments, but I don't see them in this jira. Can you include those comments?

> Avoid a full rebalance in cases when a new topic is discovered but container/broker set stay the same
>
> Key: KAFKA-984
> URL: https://issues.apache.org/jira/browse/KAFKA-984
> Project: Kafka
> Issue Type: Bug
> Reporter: Guozhang Wang
> Assignee: Guozhang Wang
> Fix For: 0.8
> Attachments: KAFKA-984.v1.patch, KAFKA-984.v2.patch, KAFKA-984.v2.patch
>
> Currently a full rebalance will be triggered on high level consumers even when just a new topic is added to ZK. Better avoid this behavior but only rebalance on this newly added topic.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
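The point about filters having no effect on immutable maps can be shown with a small analogue. The code under review is Scala; the sketch below is a Java illustration of the same pitfall, and the class `FilterResultSketch`, the method `onlyTopic`, and the topic names are all made up for the example.

```java
import java.util.Map;
import java.util.stream.Collectors;

public class FilterResultSketch {
    /** Returns a NEW map holding only the given topic; the input map is left untouched. */
    static Map<String, Integer> onlyTopic(Map<String, Integer> partitionsPerTopic, String topic) {
        return partitionsPerTopic.entrySet().stream()
                .filter(entry -> entry.getKey().equals(topic))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public static void main(String[] args) {
        Map<String, Integer> all = Map.of("topicA", 2, "topicB", 4); // immutable map

        // Pitfall: the filtered result is discarded, so 'all' still contains both topics.
        onlyTopic(all, "topicA");
        System.out.println("unfiltered size: " + all.size());

        // Fix: apply the filter where the value is assigned and use the returned map.
        Map<String, Integer> filtered = onlyTopic(all, "topicA");
        System.out.println("filtered keys: " + filtered.keySet());
    }
}
```

Filtering an immutable collection always produces a new value, so the filter has to sit on the assignment itself, which is what the comment about lines 428/429 is asking for.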
[jira] [Commented] (KAFKA-915) System Test - Mirror Maker testcase_5001 failed
[ https://issues.apache.org/jira/browse/KAFKA-915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13728247#comment-13728247 ]

Joel Koshy commented on KAFKA-915:
--

+1 on the patch. I actually could not reproduce the other failures, so I'll check this in.

_test_case_name : testcase_5001
_test_class_name : MirrorMakerTest
arg : bounce_leader : false
arg : bounce_mirror_maker : false
arg : message_producing_free_time_sec : 15
arg : num_iteration : 1
arg : num_messages_to_produce_per_producer_call : 50
arg : num_partition : 1
arg : replica_factor : 3
arg : sleep_seconds_between_producer_calls : 1
validation_status :
  Unique messages from consumer on [test_1] : 500
  Unique messages from producer on [test_1] : 500
  Validate for data matched on topic [test_1] : PASSED
  Validate for merged log segment checksum in cluster [source] : PASSED
  Validate for merged log segment checksum in cluster [target] : PASSED

_test_case_name : testcase_5002
_test_class_name : MirrorMakerTest
validation_status :

_test_case_name : testcase_5003
_test_class_name : MirrorMakerTest
arg : bounce_leader : false
arg : bounce_mirror_maker : true
arg : bounced_entity_downtime_sec : 30
arg : message_producing_free_time_sec : 15
arg : num_iteration : 1
arg : num_messages_to_produce_per_producer_call : 50
arg : num_partition : 1
arg : replica_factor : 3
arg : sleep_seconds_between_producer_calls : 1
validation_status :
  Unique messages from consumer on [test_1] : 2200
  Unique messages from producer on [test_1] : 2200
  Validate for data matched on topic [test_1] : PASSED
  Validate for merged log segment checksum in cluster [source] : PASSED
  Validate for merged log segment checksum in cluster [target] : PASSED

_test_case_name : testcase_5004
_test_class_name : MirrorMakerTest
validation_status :

_test_case_name : testcase_5005
_test_class_name : MirrorMakerTest
arg : bounce_leader : false
arg : bounce_mirror_maker : true
arg : bounced_entity_downtime_sec : 30
arg : message_producing_free_time_sec : 15
arg : num_iteration : 1
arg : num_messages_to_produce_per_producer_call : 50
arg : num_partition : 2
arg : replica_factor : 3
arg : sleep_seconds_between_producer_calls : 1
validation_status :
  Unique messages from consumer on [test_1] : 1400
  Unique messages from consumer on [test_2] : 1400
  Unique messages from producer on [test_1] : 1400
  Unique messages from producer on [test_2] : 1400
  Validate for data matched on topic [test_1] : PASSED
  Validate for data matched on topic [test_2] : PASSED
  Validate for merged log segment checksum in cluster [source] : PASSED
  Validate for merged log segment checksum in cluster [target] : PASSED

> System Test - Mirror Maker testcase_5001 failed
> ---
>
> Key: KAFKA-915
> URL: https://issues.apache.org/jira/browse/KAFKA-915
> Project: Kafka
> Issue Type: Bug
> Reporter: John Fung
> Assignee: Joel Koshy
> Priority: Critical
> Labels: kafka-0.8, replication-testing
> Attachments: kafka-915-v1.patch, testcase_5001_debug_logs.tar.gz
>
> This case passes if brokers are set to partition = 1, replicas = 1
> It fails if brokers are set to partition = 5, replicas = 3 (consistently reproducible)
> This test case is set up as shown below.
> 1. Start 2 ZK as a cluster in Source
> 2. Start 2 ZK as a cluster in Target
> 3. Start 3 brokers as a cluster in Source (partition = 1, replicas = 1)
> 4. Start 3 brokers as a cluster in Target (partition = 1, replicas = 1)
> 5. Start 1 MM
> 6. Start ProducerPerformance to send some data
> 7. After Producer is done, start ConsoleConsumer to consume data
> 8. Stop all processes and validate if there is any data loss.
> 9. No failure is introduced to any process in this test
> Attached a tar file which contains the logs and system test output for both cases.

--
This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Closed] (KAFKA-915) System Test - Mirror Maker testcase_5001 failed
[ https://issues.apache.org/jira/browse/KAFKA-915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Koshy closed KAFKA-915. > System Test - Mirror Maker testcase_5001 failed > --- > > Key: KAFKA-915 > URL: https://issues.apache.org/jira/browse/KAFKA-915 > Project: Kafka > Issue Type: Bug >Reporter: John Fung >Assignee: Joel Koshy >Priority: Critical > Labels: kafka-0.8, replication-testing > Attachments: kafka-915-v1.patch, testcase_5001_debug_logs.tar.gz > > > This case passes if brokers are set to partition = 1, replicas = 1 > It fails if brokers are set to partition = 5, replicas = 3 (consistently > reproducible) > This test case is set up as shown below. > 1. Start 2 ZK as a cluster in Source > 2. Start 2 ZK as a cluster in Target > 3. Start 3 brokers as a cluster in Source (partition = 1, replicas = 1) > 4. Start 3 brokers as a cluster in Target (partition = 1, replicas = 1) > 5. Start 1 MM > 6. Start ProducerPerformance to send some data > 7. After Producer is done, start ConsoleConsumer to consume data > 8. Stop all processes and validate if there is any data loss. > 9. No failure is introduced to any process in this test > Attached a tar file which contains the logs and system test output for both > cases. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Resolved] (KAFKA-915) System Test - Mirror Maker testcase_5001 failed
[ https://issues.apache.org/jira/browse/KAFKA-915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Koshy resolved KAFKA-915. -- Resolution: Fixed > System Test - Mirror Maker testcase_5001 failed > --- > > Key: KAFKA-915 > URL: https://issues.apache.org/jira/browse/KAFKA-915 > Project: Kafka > Issue Type: Bug >Reporter: John Fung >Assignee: Joel Koshy >Priority: Critical > Labels: kafka-0.8, replication-testing > Attachments: kafka-915-v1.patch, testcase_5001_debug_logs.tar.gz > > > This case passes if brokers are set to partition = 1, replicas = 1 > It fails if brokers are set to partition = 5, replicas = 3 (consistently > reproducible) > This test case is set up as shown below. > 1. Start 2 ZK as a cluster in Source > 2. Start 2 ZK as a cluster in Target > 3. Start 3 brokers as a cluster in Source (partition = 1, replicas = 1) > 4. Start 3 brokers as a cluster in Target (partition = 1, replicas = 1) > 5. Start 1 MM > 6. Start ProducerPerformance to send some data > 7. After Producer is done, start ConsoleConsumer to consume data > 8. Stop all processes and validate if there is any data loss. > 9. No failure is introduced to any process in this test > Attached a tar file which contains the logs and system test output for both > cases. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Created] (KAFKA-998) Producer should not retry on non-recoverable error codes
Joel Koshy created KAFKA-998: Summary: Producer should not retry on non-recoverable error codes Key: KAFKA-998 URL: https://issues.apache.org/jira/browse/KAFKA-998 Project: Kafka Issue Type: Bug Affects Versions: 0.8, 0.8.1 Reporter: Joel Koshy Based on a discussion with Guozhang. The producer currently retries on all error codes (including messagesizetoolarge which is pointless to retry on). This can slow down the producer unnecessarily. If at all we want to retry on that error code we would need to retry with a smaller batch size, but that's a separate discussion. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
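A minimal sketch of the behaviour proposed in KAFKA-998, assuming a simple classification of results into retriable and non-retriable. The `SendResult` enum, the `NON_RETRIABLE` set, and `sendWithRetries` are hypothetical names for illustration; they are not the producer's actual error-code mapping or retry loop.

```java
import java.util.EnumSet;
import java.util.Set;
import java.util.function.Supplier;

public class RetryPolicySketch {
    /** Hypothetical result codes; the real producer uses its own error-code mapping. */
    enum SendResult { OK, LEADER_NOT_AVAILABLE, REQUEST_TIMED_OUT, MESSAGE_SIZE_TOO_LARGE }

    /** Errors for which resending the same batch cannot succeed. */
    static final Set<SendResult> NON_RETRIABLE = EnumSet.of(SendResult.MESSAGE_SIZE_TOO_LARGE);

    static final class Outcome {
        final SendResult result;
        final int attempts;

        Outcome(SendResult result, int attempts) {
            this.result = result;
            this.attempts = attempts;
        }
    }

    /** Tries once, then retries up to maxRetries times, but only on retriable errors. */
    static Outcome sendWithRetries(Supplier<SendResult> attempt, int maxRetries) {
        int attempts = 0;
        SendResult result;
        do {
            result = attempt.get();
            attempts++;
        } while (result != SendResult.OK
                && !NON_RETRIABLE.contains(result)
                && attempts <= maxRetries);
        return new Outcome(result, attempts);
    }

    public static void main(String[] args) {
        Outcome tooLarge = sendWithRetries(() -> SendResult.MESSAGE_SIZE_TOO_LARGE, 3);
        System.out.println(tooLarge.result + " after " + tooLarge.attempts + " attempt(s)");
    }
}
```

Retrying an oversized batch with a smaller batch size, as mentioned at the end of the description, would need the batch to be split between attempts and is deliberately left out of this sketch.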
[jira] [Commented] (KAFKA-992) Double Check on Broker Registration to Avoid False NodeExist Exception
[ https://issues.apache.org/jira/browse/KAFKA-992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13731187#comment-13731187 ] Joel Koshy commented on KAFKA-992: -- Delayed review - looks good to me, although I still don't see a benefit in storing the timestamp. i.e., the approach to retry on nodeexists if the host and port are the same would remain the same. i.e., it seems more for informative purposes. Let me know if I'm missing something. @Jun, you have a point about the controller. It seems it may not be a problem there since controller re-election will happen only after the data is actually deleted. For consumers it may not be an issue either given that the consumer id string includes a random uuid. > Double Check on Broker Registration to Avoid False NodeExist Exception > -- > > Key: KAFKA-992 > URL: https://issues.apache.org/jira/browse/KAFKA-992 > Project: Kafka > Issue Type: Bug >Reporter: Neha Narkhede >Assignee: Guozhang Wang > Attachments: KAFKA-992.v1.patch, KAFKA-992.v2.patch, > KAFKA-992.v3.patch, KAFKA-992.v4.patch > > > The current behavior of zookeeper for ephemeral nodes is that session > expiration and ephemeral node deletion is not an atomic operation. > The side-effect of the above zookeeper behavior in Kafka, for certain corner > cases, is that ephemeral nodes can be lost even if the session is not > expired. The sequence of events that can lead to lossy ephemeral nodes is as > follows - > 1. The session expires on the client, it assumes the ephemeral nodes are > deleted, so it establishes a new session with zookeeper and tries to > re-create the ephemeral nodes. > 2. However, when it tries to re-create the ephemeral node,zookeeper throws > back a NodeExists error code. Now this is legitimate during a session > disconnect event (since zkclient automatically retries the > operation and raises a NodeExists error). 
Also by design, Kafka server > doesn't have multiple zookeeper clients create the same ephemeral node, so > Kafka server assumes the NodeExists is normal. > 3. However, after a few seconds zookeeper deletes that ephemeral node. So > from the client's perspective, even though the client has a new valid > session, its ephemeral node is gone. > This behavior is triggered due to very long fsync operations on the zookeeper > leader. When the leader wakes up from such a long fsync operation, it has > several sessions to expire. And the time between the session expiration and > the ephemeral node deletion is magnified. Between these 2 operations, a > zookeeper client can issue a ephemeral node creation operation, that could've > appeared to have succeeded, but the leader later deletes the ephemeral node > leading to permanent ephemeral node loss from the client's perspective. > Thread from zookeeper mailing list: > http://zookeeper.markmail.org/search/?q=Zookeeper+3.3.4#query:Zookeeper%203.3.4%20date%3A201307%20+page:1+mid:zma242a2qgp6gxvx+state:results -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
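The approach discussed in the comment above (on NodeExists, retry only if the existing node carries the same host and port) might look roughly like the sketch below. It runs against a hypothetical `NodeStore` interface rather than the real ZooKeeper or zkclient API, and `register`, `maxAttempts`, and `backoffMs` are assumed names; the actual patch may identify the owner differently, for example by also storing a timestamp.

```java
public class EphemeralRegistrationSketch {
    /** Hypothetical minimal view of a ZooKeeper-like store; not the real client API. */
    interface NodeStore {
        boolean createIfAbsent(String path, String data); // false when the node already exists
        String read(String path);                         // null when the node is absent
    }

    /**
     * Registers hostPort at path. If the node already exists with the same host:port,
     * it is assumed to be this client's stale node from the expired session, so we
     * wait for the server to delete it and retry. Any other owner is a real conflict.
     */
    static boolean register(NodeStore store, String path, String hostPort,
                            int maxAttempts, long backoffMs) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (store.createIfAbsent(path, hostPort)) {
                return true;
            }
            String existing = store.read(path);
            if (existing == null) {
                continue; // deleted between the two calls: retry immediately
            }
            if (!existing.equals(hostPort)) {
                throw new IllegalStateException("Path " + path + " is owned by " + existing);
            }
            try {
                Thread.sleep(backoffMs); // give the server time to delete the stale node
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }
}
```

The design choice here is to treat a NodeExists conflict as suspicious instead of normal: the client keeps retrying until its own create actually succeeds, so a later deletion of the stale node cannot silently remove its registration.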
[jira] [Commented] (KAFKA-992) Double Check on Broker Registration to Avoid False NodeExist Exception
[ https://issues.apache.org/jira/browse/KAFKA-992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13731282#comment-13731282 ] Joel Koshy commented on KAFKA-992: -- ok nm the comment about timestamp. I had forgotten that nodeexists wouldn't be thrown if the data is the same. > Double Check on Broker Registration to Avoid False NodeExist Exception > -- > > Key: KAFKA-992 > URL: https://issues.apache.org/jira/browse/KAFKA-992 > Project: Kafka > Issue Type: Bug >Reporter: Neha Narkhede >Assignee: Guozhang Wang > Attachments: KAFKA-992.v1.patch, KAFKA-992.v2.patch, > KAFKA-992.v3.patch, KAFKA-992.v4.patch > > > The current behavior of zookeeper for ephemeral nodes is that session > expiration and ephemeral node deletion is not an atomic operation. > The side-effect of the above zookeeper behavior in Kafka, for certain corner > cases, is that ephemeral nodes can be lost even if the session is not > expired. The sequence of events that can lead to lossy ephemeral nodes is as > follows - > 1. The session expires on the client, it assumes the ephemeral nodes are > deleted, so it establishes a new session with zookeeper and tries to > re-create the ephemeral nodes. > 2. However, when it tries to re-create the ephemeral node,zookeeper throws > back a NodeExists error code. Now this is legitimate during a session > disconnect event (since zkclient automatically retries the > operation and raises a NodeExists error). Also by design, Kafka server > doesn't have multiple zookeeper clients create the same ephemeral node, so > Kafka server assumes the NodeExists is normal. > 3. However, after a few seconds zookeeper deletes that ephemeral node. So > from the client's perspective, even though the client has a new valid > session, its ephemeral node is gone. > This behavior is triggered due to very long fsync operations on the zookeeper > leader. When the leader wakes up from such a long fsync operation, it has > several sessions to expire. 
And the time between the session expiration and > the ephemeral node deletion is magnified. Between these 2 operations, a > zookeeper client can issue a ephemeral node creation operation, that could've > appeared to have succeeded, but the leader later deletes the ephemeral node > leading to permanent ephemeral node loss from the client's perspective. > Thread from zookeeper mailing list: > http://zookeeper.markmail.org/search/?q=Zookeeper+3.3.4#query:Zookeeper%203.3.4%20date%3A201307%20+page:1+mid:zma242a2qgp6gxvx+state:results -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-992) Double Check on Broker Registration to Avoid False NodeExist Exception
[ https://issues.apache.org/jira/browse/KAFKA-992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13731337#comment-13731337 ]

Joel Koshy commented on KAFKA-992:
--

And never mind my comments about the controller/consumers as well. For consumers, we don't regenerate the consumer id string. For the controller, what can end up happening is:
- the controller's session expires and it becomes the controller again (with the stale ephemeral node)
- another broker (whose session may not have expired) receives a watch when the stale ephemeral node is actually deleted
- so we can end up with two controllers in this scenario.

> Double Check on Broker Registration to Avoid False NodeExist Exception
>
> Key: KAFKA-992
> URL: https://issues.apache.org/jira/browse/KAFKA-992
> Project: Kafka
> Issue Type: Bug
> Reporter: Neha Narkhede
> Assignee: Guozhang Wang
> Attachments: KAFKA-992.v1.patch, KAFKA-992.v2.patch, KAFKA-992.v3.patch, KAFKA-992.v4.patch
>
> The current behavior of zookeeper for ephemeral nodes is that session expiration and ephemeral node deletion are not an atomic operation.
> The side-effect of the above zookeeper behavior in Kafka, for certain corner cases, is that ephemeral nodes can be lost even if the session is not expired. The sequence of events that can lead to lossy ephemeral nodes is as follows -
> 1. The session expires on the client, it assumes the ephemeral nodes are deleted, so it establishes a new session with zookeeper and tries to re-create the ephemeral nodes.
> 2. However, when it tries to re-create the ephemeral node, zookeeper throws back a NodeExists error code. Now this is legitimate during a session disconnect event (since zkclient automatically retries the operation and raises a NodeExists error). Also by design, Kafka server doesn't have multiple zookeeper clients create the same ephemeral node, so Kafka server assumes the NodeExists is normal.
> 3. However, after a few seconds zookeeper deletes that ephemeral node. So from the client's perspective, even though the client has a new valid session, its ephemeral node is gone.
> This behavior is triggered due to very long fsync operations on the zookeeper leader. When the leader wakes up from such a long fsync operation, it has several sessions to expire. And the time between the session expiration and the ephemeral node deletion is magnified. Between these 2 operations, a zookeeper client can issue an ephemeral node creation operation, that could've appeared to have succeeded, but the leader later deletes the ephemeral node leading to permanent ephemeral node loss from the client's perspective.
> Thread from zookeeper mailing list:
> http://zookeeper.markmail.org/search/?q=Zookeeper+3.3.4#query:Zookeeper%203.3.4%20date%3A201307%20+page:1+mid:zma242a2qgp6gxvx+state:results

--
This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-990) Fix ReassignPartitionCommand and improve usability
[ https://issues.apache.org/jira/browse/KAFKA-990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13733837#comment-13733837 ]

Joel Koshy commented on KAFKA-990:
--

- The JSON file format for the topics to move seems unnecessarily complicated. Why not just a JSON array?
- Use CommandLineUtils.checkRequiredArgs.
- It may be helpful to also print out the existing partition assignment and the final assignment.
- Rename "dryrun" to "dry-run", which I think is the spelling that unix tools like patch tend to use.
- line 88: use head instead of assuming 0 exists (the start partition id could be != 0).

I did not finish going through all the changes in the controller, but thought I would put in my comments so far :)

> Fix ReassignPartitionCommand and improve usability
>
> Key: KAFKA-990
> URL: https://issues.apache.org/jira/browse/KAFKA-990
> Project: Kafka
> Issue Type: Bug
> Reporter: Sriram Subramanian
> Assignee: Sriram Subramanian
> Attachments: KAFKA-990-v1.patch, KAFKA-990-v1-rebased.patch
>
> 1. The tool does not register for IsrChangeListener on controller failover.
> 2. There is a race condition where the previous listener can fire on controller failover and the replicas can be in ISR. Even after re-registering the ISR listener after failover, it will never be triggered.
> 3. The input to the tool is a static list which is very hard to use. To improve this, as a first step the tool needs to take a list of topics and list of brokers to do the assignment to and then generate the reassignment plan.

--
This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-990) Fix ReassignPartitionCommand and improve usability
[ https://issues.apache.org/jira/browse/KAFKA-990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13735120#comment-13735120 ] Joel Koshy commented on KAFKA-990: -- Can you elaborate on the change to shutdownBroker in KafkaController? I think we need to include shutting down brokers because the previous shutdown attempt may have been incomplete due to no other brokers in ISR for some partition which would have prevented leader movement. Subsequent attempts would now be rejected. Good catches on the controller failover. Agree with Neha that #2 is not a problem for replicas that are in ISR, however, we do need to re-register the ISR change listener for those replicas that are in ISR. Finally, we should probably open a separate jira to implement a feature to cancel an ongoing reassignment given that it is a long-running operation. The dry-run option reduces the need for this but nevertheless I think it's a good feature to support in the future. > Fix ReassignPartitionCommand and improve usability > -- > > Key: KAFKA-990 > URL: https://issues.apache.org/jira/browse/KAFKA-990 > Project: Kafka > Issue Type: Bug >Reporter: Sriram Subramanian >Assignee: Sriram Subramanian > Attachments: KAFKA-990-v1.patch, KAFKA-990-v1-rebased.patch > > > 1. The tool does not register for IsrChangeListener on controller failover. > 2. There is a race condition where the previous listener can fire on > controller failover and the replicas can be in ISR. Even after re-registering > the ISR listener after failover, it will never be triggered. > 3. The input the tool is a static list which is very hard to use. To improve > this, as a first step the tool needs to take a list of topics and list of > brokers to do the assignment to and then generate the reassignment plan. -- This message is automatically generated by JIRA. 
[jira] [Commented] (KAFKA-990) Fix ReassignPartitionCommand and improve usability
[ https://issues.apache.org/jira/browse/KAFKA-990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13735653#comment-13735653 ]

Joel Koshy commented on KAFKA-990:
----------------------------------

Looks like I might have looked at the wrong patch. I'll review this again this weekend.
[jira] [Commented] (KAFKA-990) Fix ReassignPartitionCommand and improve usability
[ https://issues.apache.org/jira/browse/KAFKA-990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13737010#comment-13737010 ]

Joel Koshy commented on KAFKA-990:
----------------------------------

The rebased patch looks good - the shutdown changes I was referring to were in v1. +1 on the rebased patch - we can fix the minor comments either on check-in or in a separate jira.