[jira] [Commented] (KAFKA-735) Add looping and JSON output for ConsumerOffsetChecker

2013-05-29 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13670061#comment-13670061
 ] 

Jun Rao commented on KAFKA-735:
---

Sorry for the late review. This probably should be fixed in trunk. The patch no 
longer applies. Could you rebase? Thanks.

> Add looping and JSON output for ConsumerOffsetChecker
> -
>
> Key: KAFKA-735
> URL: https://issues.apache.org/jira/browse/KAFKA-735
> Project: Kafka
>  Issue Type: New Feature
>  Components: tools
>Affects Versions: 0.8
>Reporter: Dave DeMaagd
>  Labels: patch
> Attachments: KAFKA-735-2.patch, KAFKA-735.patch
>
>
> New options for kafka.tools.ConsumerOffsetChecker:
> --asjson - JSON output
> --loop N - Loop interval (in seconds, greater than 0)
> Both are optional/independent:
> --asjson w/o --loop => Output as JSON, once and terminates
> --loop w/o --asjson => Output in default tabular format, repeats on loop 
> defined interval
> --loop w/ --asjson => Output as JSON, repeats on loop defined interval
> neither --asjson nor --loop => Current behavior, output in tabular format 
> once and terminates
> Patch to be attached.
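As a hedged illustration of the option matrix above (hypothetical ZooKeeper 
address and group name; option names as described in the patch summary), the 
combined mode might be invoked as:

    bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \
        --zkconnect localhost:2181 --group my-group --asjson --loop 30

i.e., print the group's offsets as JSON every 30 seconds.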

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-733) Fat jar option for build, or override for ivy cache location

2013-05-29 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-733:
--

   Resolution: Duplicate
Fix Version/s: 0.8
   Status: Resolved  (was: Patch Available)

This is already fixed as part of the sbt enhancement.

> Fat jar option for build, or override for ivy cache location 
> -
>
> Key: KAFKA-733
> URL: https://issues.apache.org/jira/browse/KAFKA-733
> Project: Kafka
>  Issue Type: Improvement
>  Components: packaging
>Affects Versions: 0.8
>Reporter: Dave DeMaagd
>Assignee: Dave DeMaagd
>  Labels: bugs
> Fix For: 0.8
>
> Attachments: KAFKA-733-2.patch, KAFKA-733.patch
>
>
> Need some kind of self-contained mechanism for running kafka to get around 
> the following:
> 1) The location of the source checkout/build is not necessarily the same 
> place where it will be running (the build location and that user's ivy cache 
> dir) potentially leading to sync problems (forgetting the ivy dir) or just 
> adding overhead to the deployment process (additional steps to remember 
> introduce more chances for mistakes)
> 2) The user running the kafka service in a production setting may not even 
> have a real home directory
> I think something like a 'fat jar' packaging (something that contains all 
> necessary jar versions in one convenient place) would simplify deployment and 
> reduce the chance for error (only one lib package to worry about, and it 
> contains everything needed) and would be a little more production friendly.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-852) Remove clientId from OffsetFetchResponse and OffsetCommitResponse

2013-05-29 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13670055#comment-13670055
 ] 

Jun Rao commented on KAFKA-852:
---

Sorry for the late review. The patch no longer applies to trunk. Could you 
rebase? Thanks.

> Remove clientId from OffsetFetchResponse and OffsetCommitResponse
> -
>
> Key: KAFKA-852
> URL: https://issues.apache.org/jira/browse/KAFKA-852
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.1
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Minor
> Fix For: 0.8.1
>
> Attachments: 
> 0001-KAFKA-852-remove-clientId-from-Offset-Fetch-Commit-R.patch, 
> KAFKA-852.diff
>
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> These are not needed and conflict with the API documentation. They should be 
> removed to be consistent with the other APIs.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-856) Correlation id for OffsetFetch request (#2) always responds with 0

2013-05-29 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-856:
--

   Resolution: Fixed
Fix Version/s: 0.8
   Status: Resolved  (was: Patch Available)

Thanks for the patch. +1. Committed to 0.8.

> Correlation id for OffsetFetch request (#2) always responds with 0
> --
>
> Key: KAFKA-856
> URL: https://issues.apache.org/jira/browse/KAFKA-856
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Affects Versions: 0.8, 0.8.1
> Environment: Linux & Mac
>Reporter: Milosz Tanski
>Assignee: Jun Rao
> Fix For: 0.8
>
> Attachments: KAFKA-856.patch
>
>
> In the new Kafka, when making an OffsetFetch request, the correlation id in 
> the response is always sent as 0. It doesn't matter if the client request 
> specifies a correlation id other than 0.
> Example wireshark capture:
>   00 00 00 31 00 07 00 00  00 00 00 2a 00 03 66 6f ...1 ...*..fo
> 0010  6f 00 0a 74 65 73 74 2d  67 72 6f 75 70 00 00 00 o..test- group...
> 0020  01 00 0a 74 65 73 74 5f  74 6f 70 69 63 00 00 00 ...test_ topic...
> 0030  01 00 00 00 00   .
>  
> Request #1
> 
> len:00 00 00 31
> api:00 07
> ver:00 00
> cor:00 00 00 2a
> cid:00 03 66 6f 6f
> 
>  
>   00 00 00 2d 00 00 00 2a  00 03 66 6f 6f 00 00 00 ...-...* ..foo...
> 0010  01 00 0a 74 65 73 74 5f  74 6f 70 69 63 00 00 00 ...test_ topic...
> 0020  01 00 00 00 00 ff ff ff  ff ff ff ff ff 00 00 00  
> 0030  03
>  
> Response #1
> 
> len:00 00 00 2d
> cor:00 00 00 2a
> cid:00 03 66 6f 6f
> 
>  
> 0035  00 00 00 35 00 02 00 00  00 00 00 2a 00 03 66 6f ...5 ...*..fo
> 0045  6f ff ff ff ff 00 00 00  01 00 0a 74 65 73 74 5f o... ...test_
> 0055  74 6f 70 69 63 00 00 00  01 00 00 00 00 ff ff ff topic... 
> 0065  ff ff ff ff fe 00 00 00  01   .
>  
> Request #2
> -
> len:00 00 00 35
> api:00 02
> ver:00 00
> cor:00 00 00 2a
> cid:00 03 66 6f 6f
>  
> 0031  00 00 00 2a 00 00 00 00  00 00 00 01 00 0a 74 65 ...* ..te
> 0041  73 74 5f 74 6f 70 69 63  00 00 00 01 00 00 00 00 st_topic 
> 0051  00 00 00 00 00 01 00 00  00 00 00 00 00 00    ..
>  
> Response #2:
> --
> len:00 00 00 2a
> cor:00 00 00 00
> alen:   00 00 00 01
> 
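A minimal sketch of the check the decode above implies (hypothetical helper in 
plain Scala, not Kafka's actual response parser; it assumes the header layout 
shown in the capture, a 4-byte length followed by a 4-byte correlation id):

    import java.nio.ByteBuffer

    // Hypothetical check: the correlation id echoed in a response must match
    // the one sent in the request (0x2a = 42 in the capture above).
    def correlationIdMatches(response: ByteBuffer, expected: Int): Boolean = {
      response.getInt()                     // skip the 4-byte length field
      val correlationId = response.getInt() // should echo the request's id
      correlationId == expected             // Response #2 above carries 0, so this fails
    }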

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-897) NullPointerException in ConsoleConsumer

2013-05-29 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-897:
--

   Resolution: Fixed
Fix Version/s: 0.8.1
   Status: Resolved  (was: Patch Available)

Thanks for the patch. Committed to trunk.

> NullPointerException in ConsoleConsumer
> ---
>
> Key: KAFKA-897
> URL: https://issues.apache.org/jira/browse/KAFKA-897
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8
>Reporter: Colin B.
>Assignee: Neha Narkhede
>Priority: Minor
> Fix For: 0.8.1
>
> Attachments: Kafka897-v1.patch, KAFKA-897-v2.patch
>
>
> The protocol document [1] mentions that keys and values in message sets can 
> be null. However the ConsoleConsumer throws a NPE when a null is passed for 
> the value.
> java.lang.NullPointerException
> at kafka.utils.Utils$.readBytes(Utils.scala:141)
> at 
> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:106)
> at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
> at 
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:61)
> at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:53)
> at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> at kafka.consumer.KafkaStream.foreach(KafkaStream.scala:25)
> at kafka.consumer.ConsoleConsumer$.main(ConsoleConsumer.scala:195)
> at kafka.consumer.ConsoleConsumer.main(ConsoleConsumer.scala)
> [1] 
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets
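A hedged sketch of the kind of guard that would avoid the NPE (hypothetical 
helper, not the attached patches; per the protocol document, a null value is 
encoded with size -1, so the buffer handed to the consumer can be null):

    import java.nio.ByteBuffer

    // Hypothetical null-safe variant of reading message bytes: a null buffer
    // (a value encoded with size -1) yields an empty array instead of an NPE.
    def readBytesSafely(buffer: ByteBuffer): Array[Byte] = {
      if (buffer == null) Array.empty[Byte]
      else {
        val bytes = new Array[Byte](buffer.remaining)
        buffer.duplicate.get(bytes) // copy without moving the original position
        bytes
      }
    }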

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-881) Kafka broker not respecting log.roll.hours

2013-05-29 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13670028#comment-13670028
 ] 

Jun Rao commented on KAFKA-881:
---

Sorry for the delay. I can't apply the patch to the 0.7 branch. Could you 
follow https://cwiki.apache.org/confluence/display/KAFKA/Git+Workflow and 
submit a new patch?

> Kafka broker not respecting log.roll.hours
> --
>
> Key: KAFKA-881
> URL: https://issues.apache.org/jira/browse/KAFKA-881
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.7.2
>Reporter: Dan F
>Assignee: Jay Kreps
> Attachments: kafka_roll.patch
>
>
> We are running Kafka 0.7.2. We set log.roll.hours=1. I hoped that meant logs 
> would be rolled at least every hour. Only, sometimes logs that are many hours 
> (sometimes days) old have more data added to them. This perturbs our systems 
> for reasons I won't get into.
> I don't know Scala or Kafka well, but I have a proposal for why this might 
> happen: upon restart, a broker forgets when its log files have been appended 
> to ("firstAppendTime"). Then a potentially infinite amount of time later, the 
> restarted broker receives another message for the particular (topic, 
> partition), and starts the clock again. It will then roll over that log after 
> an hour.
> https://svn.apache.org/repos/asf/kafka/branches/0.7/core/src/main/scala/kafka/server/KafkaConfig.scala
>  says:
>   /* the maximum time before a new log segment is rolled out */
>   val logRollHours = Utils.getIntInRange(props, "log.roll.hours", 24*7, (1, 
> Int.MaxValue))
> https://svn.apache.org/repos/asf/kafka/branches/0.7/core/src/main/scala/kafka/log/Log.scala
>  has maybeRoll, which needs segment.firstAppendTime defined. It also has 
> updateFirstAppendTime(), which sets it if it is empty.
> If my hypothesis about why this happens is correct, here is a case where 
> rolling takes longer than an hour, even on a high-volume topic:
> - write to a topic for 20 minutes
> - restart the broker
> - wait for 5 days
> - write to a topic for 20 minutes
> - restart the broker
> - write to a topic for an hour
> The rollover time was now 5 days, 1 hour, 40 minutes. You can make it as long 
> as you want.
> Proposed solution:
> The easiest thing to do would be to have Kafka re-initialize firstAppendTime 
> from the file creation time. Unfortunately, there is no file creation time in 
> UNIX. There is ctime, the change time, which is updated when a file's inode 
> information is changed.
> One solution is to embed the firstAppendTime in the filename (say, seconds 
> since epoch). Then when you open it you could reset firstAppendTime to 
> exactly what it really was. This ignores clock drift or resetting. One could 
> set firstAppendTime to min(filename-based time, current time).
> A second solution is to make the Kafka log roll over at specific times, 
> regardless of when the file was created. Conceptually, time can be divided 
> into windows of size log.rollover.hours since epoch (UNIX time 0, 1970). So, 
> when firstAppendTime is empty, compute the next rollover time (say, next = 
> (hours since epoch) - ((hours since epoch) % log.rollover.hours) + 
> log.rollover.hours). If the file 
> mtime (last modified) is before the current rollover window ( 
> (next-log.rollover.hours) .. next ), roll it over right away. Otherwise, roll 
> over when you cross next, and reset next.
> A third solution (not perfect, but an approximation at least) would be not 
> to write to a segment if firstAppendTime is not defined and the timestamp on 
> the file is more than log.roll.hours old.
> There are probably other solutions.
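A sketch of the fixed-window arithmetic in the second proposal (hypothetical 
names, plain Scala, not the attached kafka_roll.patch):

    // Hypothetical fixed-window rollover: time is divided into windows of
    // rollHours since the epoch. A segment whose last-modified hour falls
    // before the current window is rolled immediately; otherwise it rolls
    // when "next" is crossed.
    def nextRolloverHour(hoursSinceEpoch: Long, rollHours: Long): Long =
      hoursSinceEpoch - (hoursSinceEpoch % rollHours) + rollHours

    def shouldRollNow(mtimeHours: Long, hoursSinceEpoch: Long, rollHours: Long): Boolean = {
      val next = nextRolloverHour(hoursSinceEpoch, rollHours)
      mtimeHours < next - rollHours // mtime predates the current window
    }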

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-259) Give better error message when trying to run shell scripts without having built/downloaded the jars yet

2013-05-29 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-259:
--

   Resolution: Fixed
Fix Version/s: 0.8
   Status: Resolved  (was: Patch Available)

Thanks for patch v2. +1. Committed to 0.8.

> Give better error message when trying to run shell scripts without having 
> built/downloaded the jars yet
> ---
>
> Key: KAFKA-259
> URL: https://issues.apache.org/jira/browse/KAFKA-259
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
> Environment: Mac OSX Lion
>Reporter: Ross Crawford-d'Heureuse
>Priority: Minor
>  Labels: newbie
> Fix For: 0.8
>
> Attachments: KAFKA-259-v1.patch, KAFKA-259-v2.patch
>
>
> Hi there, I've cloned from the kafka github repo and tried to run the start 
> server script:
>  ./bin/kafka-server-start.sh config/server.properties 
> Which results in:
> Exception in thread "main" java.lang.NoClassDefFoundError: kafka/Kafka
> Caused by: java.lang.ClassNotFoundException: kafka.Kafka
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
> It seems that I'm missing a build step? What have I forgotten to do?
> Thanks in advance and I look forward to using kafka.
> regards
> rcdh

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


Re: Contribution info

2013-05-29 Thread Jun Rao
You just need to create an Apache jira account so that you can attach
patches.

Thanks,

Jun


On Wed, May 29, 2013 at 11:21 AM, Krishna Kiran M wrote:

> Hi, do I need to register as a member to be able to contribute as a
> developer for this project?
>
> Regards,
> Krishna
> 214-326-8816
>


[jira] [Updated] (KAFKA-899) LeaderNotAvailableException the first time a new message for a partition is processed.

2013-05-29 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-899?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-899:
--

Attachment: kafka-899_v3.patch

Thanks for the review. Attached patch v3, which addresses the above issue. 
Changed the logging level from error to warning, since the real error will be 
reported when all retries have failed.

> LeaderNotAvailableException the first time a new message for a partition is 
> processed.
> --
>
> Key: KAFKA-899
> URL: https://issues.apache.org/jira/browse/KAFKA-899
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jason Rosenberg
>Assignee: Jun Rao
> Attachments: kafka-899.patch, kafka-899_v2.patch, kafka-899_v3.patch
>
>
> I'm porting some unit tests from 0.7.2 to 0.8.0.  The test does the 
> following, all embedded in the same java process:
> -- spins up a zk instance
> -- spins up a kafka server using a fresh log directory
> -- creates a producer and sends a message
> -- creates a high-level consumer and verifies that it can consume the message
> -- shuts down the consumer
> -- stops the kafka server
> -- stops zk
> The test seems to be working fine now, however, I consistently see the 
> following exceptions (which from poking around the mailing list seem to be 
> expected?).  If these are expected, can we suppress the logging of these 
> exceptions, since it clutters the output of tests, and presumably, clutters 
> the logs of the running server/consumers, during clean startup and 
> shutdown..
> When I call producer.send(), I get:
> 1071 [main] WARN kafka.producer.BrokerPartitionInfo  - Error while fetching 
> metadata  partition 0 leader: nonereplicas:   isr:
> isUnderReplicated: false for topic partition [test-topic,0]: [class 
> kafka.common.LeaderNotAvailableException]
> 1081 [main] WARN kafka.producer.async.DefaultEventHandler  - Failed to 
> collate messages by topic,partition due to
> kafka.common.LeaderNotAvailableException: No leader for any partition
>   at 
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartition(DefaultEventHandler.scala:212)
>   at 
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
>   at 
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:148)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
>   at 
> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:148)
>   at 
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:94)
>   at 
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
>   at kafka.producer.Producer.send(Producer.scala:74)
>   at kafka.javaapi.producer.Producer.send(Producer.scala:32)
>   at 
> com.squareup.kafka.server.KafkaServerTest.produceAndConsumeMessage(KafkaServerTest.java:98)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>   at java.lang.reflect.Method.invoke(Method.java:597)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:69)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:48)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:292)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:157)
>   at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunn

[jira] [Updated] (KAFKA-259) Give better error message when trying to run shell scripts without having built/downloaded the jars yet

2013-05-29 Thread Ashwanth Fernando (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ashwanth Fernando updated KAFKA-259:


Attachment: KAFKA-259-v2.patch

> Give better error message when trying to run shell scripts without having 
> built/downloaded the jars yet
> ---
>
> Key: KAFKA-259
> URL: https://issues.apache.org/jira/browse/KAFKA-259
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
> Environment: Mac OSX Lion
>Reporter: Ross Crawford-d'Heureuse
>Priority: Minor
>  Labels: newbie
> Attachments: KAFKA-259-v1.patch, KAFKA-259-v2.patch
>
>
> Hi there, I've cloned from the kafka github repo and tried to run the start 
> server script:
>  ./bin/kafka-server-start.sh config/server.properties 
> Which results in:
> Exception in thread "main" java.lang.NoClassDefFoundError: kafka/Kafka
> Caused by: java.lang.ClassNotFoundException: kafka.Kafka
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
> It seems that I'm missing a build step? What have I forgotten to do?
> Thanks in advance and I look forward to using kafka.
> regards
> rcdh

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-259) Give better error message when trying to run shell scripts without having built/downloaded the jars yet

2013-05-29 Thread Ashwanth Fernando (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13669862#comment-13669862
 ] 

Ashwanth Fernando commented on KAFKA-259:
-

[~junrao] - I executed the simple contributor workflow on this page 
(https://cwiki.apache.org/confluence/display/KAFKA/Git+Workflow) again. 
Attached the patch. Can you please try again?

> Give better error message when trying to run shell scripts without having 
> built/downloaded the jars yet
> ---
>
> Key: KAFKA-259
> URL: https://issues.apache.org/jira/browse/KAFKA-259
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
> Environment: Mac OSX Lion
>Reporter: Ross Crawford-d'Heureuse
>Priority: Minor
>  Labels: newbie
> Attachments: KAFKA-259-v1.patch, KAFKA-259-v2.patch
>
>
> Hi there, I've cloned from the kafka github repo and tried to run the start 
> server script:
>  ./bin/kafka-server-start.sh config/server.properties 
> Which results in:
> Exception in thread "main" java.lang.NoClassDefFoundError: kafka/Kafka
> Caused by: java.lang.ClassNotFoundException: kafka.Kafka
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
> It seems that I'm missing a build step? What have I forgotten to do?
> Thanks in advance and I look forward to using kafka.
> regards
> rcdh

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-921) Expose max lag mbean for consumers and replica fetchers

2013-05-29 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-921:
-

Attachment: KAFKA-921-v2.patch

Yes - I think that would be better. Moved it to AbstractFetcherManager. So 
depending on whether you are looking at replica fetchers or consumer fetchers, 
the MaxLag mbean will show up in ReplicaFetcherManager or 
ConsumerFetcherManager, respectively.

> Expose max lag mbean for consumers and replica fetchers
> ---
>
> Key: KAFKA-921
> URL: https://issues.apache.org/jira/browse/KAFKA-921
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Joel Koshy
> Fix For: 0.8
>
> Attachments: KAFKA-921-v1.patch, KAFKA-921-v2.patch
>
>
> We have a ton of consumer mbeans with names that are derived from the 
> consumer id, broker being fetched from, fetcher id, etc. This makes it 
> difficult to do basic monitoring of consumer/replica fetcher lag - since the 
> mbean to monitor can change. A more useful metric for monitoring purposes is 
> the maximum lag across all fetchers.
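As a hedged sketch of the metric's semantics (plain Scala over a hypothetical 
lag map, not the patch's actual mbean wiring), the manager reports the maximum 
lag across all fetchers, so monitoring needs only one stable mbean name:

    // Hypothetical: per-partition lags collected from all fetcher threads.
    val lagsByPartition: Map[String, Long] =
      Map("topic-a:0" -> 12L, "topic-a:1" -> 0L, "topic-b:0" -> 347L)

    // The single value a MaxLag gauge would expose (0 if nothing is fetched).
    val maxLag: Long =
      if (lagsByPartition.isEmpty) 0L else lagsByPartition.values.max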

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


Contribution info

2013-05-29 Thread Krishna Kiran M
Hi, do I need to register as a member to be able to contribute as a
developer for this project?

Regards,
Krishna
214-326-8816


[jira] [Commented] (KAFKA-917) Expose zk.session.timeout.ms in console consumer

2013-05-29 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13669450#comment-13669450
 ] 

Neha Narkhede commented on KAFKA-917:
-

Can you please attach a patch for the 0.7 branch?

> Expose zk.session.timeout.ms in console consumer
> 
>
> Key: KAFKA-917
> URL: https://issues.apache.org/jira/browse/KAFKA-917
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.7, 0.8
>Reporter: Swapnil Ghike
>Assignee: Swapnil Ghike
>Priority: Blocker
>  Labels: bugs
> Fix For: 0.8
>
> Attachments: kafka-917.patch
>
>
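A hedged sketch of what exposing the setting could look like (illustrative, 
not the attached kafka-917.patch; the console tools parse options with 
joptsimple, but the option name and default below are assumptions):

    import joptsimple.OptionParser

    val parser = new OptionParser
    // Hypothetical flag: surface the consumer's zk session timeout on the CLI.
    val zkSessionTimeoutOpt = parser
      .accepts("zk.session.timeout.ms", "ZooKeeper session timeout for the consumer")
      .withRequiredArg
      .ofType(classOf[java.lang.Integer])
      .defaultsTo(6000)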


--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Closed] (KAFKA-919) Disabling of auto commit is ignored during consumer group rebalancing

2013-05-29 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede closed KAFKA-919.
---


> Disabling of auto commit is ignored during consumer group rebalancing
> -
>
> Key: KAFKA-919
> URL: https://issues.apache.org/jira/browse/KAFKA-919
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8
> Environment: java 7/linux
>Reporter: Phil Hargett
> Fix For: 0.8
>
> Attachments: kafka-919.patch
>
>
> From the mailing list:
> In one of our applications using Kafka, we are using the high-level consumer 
> to pull messages from our topic.
> Because we pull messages from topics in discrete units (e.g., an hour's worth 
> of messages), we want to control explicitly when offsets are committed.
> Even though "auto.commit.enable" is set to false, during consumer group 
> rebalancing, offsets are committed anyway, regardless of the setting of this 
> flag.
> Is this a bug? Or just a known gap in offset management? I do see plenty of 
> notes on the wiki suggesting future releases may enable applications using 
> the high-level consumer to have more fine-grained control over offset 
> management.
> I also fully realize that different applications have different needs, and 
> meeting all of them with a clean API can be challenging.
> In the case of this application, the high-level consumer solves the problem 
> of locating the correct broker in a cluster for a given topic, so there are 
> advantages to using it, even if we are not using it to balance fetch load 
> across multiple consumers. We ideally have only 1 consumer active per 
> consumer group, and can tolerate some duplicate messages. But, the consumer 
> groups make it easy for 1 consumer to recover at the correct starting point, 
> should the prior consumer in the group have failed before doing a commit.
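For reference, a minimal sketch of the configuration in question (hypothetical 
ZooKeeper address and group id; auto.commit.enable is the flag the report says 
is ignored during rebalancing):

    import java.util.Properties
    import kafka.consumer.{Consumer, ConsumerConfig}

    val props = new Properties()
    props.put("zookeeper.connect", "localhost:2181") // hypothetical ZK address
    props.put("group.id", "hourly-batch")            // hypothetical group id
    props.put("auto.commit.enable", "false")         // commits should be explicit...
    val connector = Consumer.create(new ConsumerConfig(props))
    // ...yet, per this report, a rebalance commits offsets anyway.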

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Created] (KAFKA-926) Error in consumer when the leader for some partitions is failing over to another replica.

2013-05-29 Thread BalajiSeshadri (JIRA)
BalajiSeshadri created KAFKA-926:


 Summary: Error in consumer when the leader for some partitions is 
failing over to another replica.
 Key: KAFKA-926
 URL: https://issues.apache.org/jira/browse/KAFKA-926
 Project: Kafka
  Issue Type: Bug
Reporter: BalajiSeshadri
 Fix For: 0.8.1


Error in consumer when the leader for some partitions is failing over to 
another replica. Created as per request from Neha.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-926) Error in consumer when the leader for some partitions is failing over to another replica.

2013-05-29 Thread BalajiSeshadri (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13669448#comment-13669448
 ] 

BalajiSeshadri commented on KAFKA-926:
--

Originally, this was part of KAFKA-816.

> Error in consumer when the leader for some partitions is failing over to 
> another replica.
> -
>
> Key: KAFKA-926
> URL: https://issues.apache.org/jira/browse/KAFKA-926
> Project: Kafka
>  Issue Type: Bug
>Reporter: BalajiSeshadri
> Fix For: 0.8.1
>
>
> Error in consumer when the leader for some partitions is failing over to 
> another replica. Created as per request from Neha.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-649) Cleanup log4j logging

2013-05-29 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13669440#comment-13669440
 ] 

Jun Rao commented on KAFKA-649:
---

Thanks for the review. Addressed the comment and committed the extra patch to 
0.8.

> Cleanup log4j logging
> -
>
> Key: KAFKA-649
> URL: https://issues.apache.org/jira/browse/KAFKA-649
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.8
>Reporter: Jay Kreps
>Assignee: Jun Rao
>Priority: Blocker
> Attachments: kafka-649_extra.patch, kafka-649.patch
>
>
> Review the logs and do the following:
> 1. Fix confusing or duplicative messages
> 2. Assess that messages are at the right level (TRACE/DEBUG/INFO/WARN/ERROR)
> It would also be nice to add a log4j logger for the request logging (i.e. the 
> access log) and another for the controller state change log, since these 
> really have their own use cases.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-816) Reduce noise in Kafka server logs due to NotLeaderForPartitionException

2013-05-29 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13669434#comment-13669434
 ] 

Neha Narkhede commented on KAFKA-816:
-

Balaji, 

That fix was to reduce the noise in the kafka server logs. The error you see is 
on the consumer when the leader for some partitions is failing over to another 
replica. Can you please move the issue to a separate JIRA?

> Reduce noise in Kafka server logs due to NotLeaderForPartitionException
> ---
>
> Key: KAFKA-816
> URL: https://issues.apache.org/jira/browse/KAFKA-816
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Neha Narkhede
>Assignee: Neha Narkhede
>Priority: Blocker
>  Labels: kafka-0.8, p2
> Attachments: KAFKA-816.jpg, kafka-816.patch, kafka-816-v2.patch
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> NotLeaderForPartitionException is logged at the ERROR level with a full stack 
> trace. But really this is just an informational message on the server when a 
> client with stale metadata sends requests to the wrong leader for a 
> partition. This floods the logs either if there are many clients or few 
> clients sending many topics (migration tool or mirror maker). This should 
> probably be logged at WARN and without the stack trace

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-900) ClosedByInterruptException when high-level consumer shutdown normally

2013-05-29 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-900:
--

   Resolution: Fixed
Fix Version/s: 0.8
   Status: Resolved  (was: Patch Available)

Thanks for the review. Committed to 0.8.

> ClosedByInterruptException when high-level consumer shutdown normally
> -
>
> Key: KAFKA-900
> URL: https://issues.apache.org/jira/browse/KAFKA-900
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8
>Reporter: Jason Rosenberg
>Assignee: Jun Rao
> Fix For: 0.8
>
> Attachments: kafka-900.patch
>
>
> I'm porting some unit tests from 0.7.2 to 0.8.0. The test does the following, 
> all embedded in the same java process: 
> -- spins up a zk instance 
> -- spins up a kafka server using a fresh log directory 
> -- creates a producer and sends a message 
> -- creates a high-level consumer and verifies that it can consume the message 
> -- shuts down the consumer 
> -- stops the kafka server 
> -- stops zk 
> The test seems to be working fine now, however, I consistently see the 
> following exception, when the consumer connector is shutdown:
> 1699 
> [ConsumerFetcherThread-group1_square-1a7ac0.local-1368076598439-d66bb2eb-0-1946108683]
>  WARN kafka.consumer.ConsumerFetcherThread  - 
> [ConsumerFetcherThread-group1_square-1a7ac0.local-1368076598439-d66bb2eb-0-1946108683],
>  Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 1; ClientId: 
> group1-ConsumerFetcherThread-group1_square-1a7ac0.local-1368076598439-d66bb2eb-0-1946108683;
>  ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo: 
> [test-topic,0] -> PartitionFetchInfo(1,1048576)
> java.nio.channels.ClosedByInterruptException
>   at 
> java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
>   at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:543)
>   at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>   at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:47)
>   at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:60)
>   at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
>   at 
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:73)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>   at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
>   at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
>   at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> 1721 [Thread-12] INFO com.squareup.kafka.server.KafkaServer  - Shutting down 
> KafkaServer
> 2030 [main] INFO com.squareup.kafka.server.KafkaServer  - Shut down complete 
> for KafkaServer
> Disconnected from the target VM, address: '127.0.0.1:49243', transport: 
> 'socket'
> It would be great if instead, something meaningful was logged, like:
> "Consumer connector has been shutdown"

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-649) Cleanup log4j logging

2013-05-29 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13669433#comment-13669433
 ] 

Neha Narkhede commented on KAFKA-649:
-

Actually, there is another error message that needs to be fixed in KafkaApis, 
in handleTopicMetadataRequest():

  error("Error while fetching metadata for partition %s".format(topicAndPartition), e)

Here, we should special-case LeaderNotAvailable and ReplicaNotAvailable, and 
only log an error if it is anything other than those. This will reduce the log 
pollution due to metadata requests during leader failover.
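A hedged sketch of that special-casing (assuming the 0.8 exception classes and 
Kafka's Logging trait; illustrative, not the committed change):

    import kafka.common.{LeaderNotAvailableException, ReplicaNotAvailableException}
    import kafka.utils.Logging

    // Hypothetical helper: demote expected transient metadata failures to
    // debug, keep everything else at error.
    object MetadataErrorLogger extends Logging {
      def log(topicAndPartition: Any, e: Throwable): Unit = e match {
        case _: LeaderNotAvailableException | _: ReplicaNotAvailableException =>
          debug("Transient metadata error for partition %s".format(topicAndPartition))
        case _ =>
          error("Error while fetching metadata for partition %s".format(topicAndPartition), e)
      }
    }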

> Cleanup log4j logging
> -
>
> Key: KAFKA-649
> URL: https://issues.apache.org/jira/browse/KAFKA-649
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.8
>Reporter: Jay Kreps
>Assignee: Jun Rao
>Priority: Blocker
> Attachments: kafka-649_extra.patch, kafka-649.patch
>
>
> Review the logs and do the following:
> 1. Fix confusing or duplicative messages
> 2. Assess that messages are at the right level (TRACE/DEBUG/INFO/WARN/ERROR)
> It would also be nice to add a log4j logger for the request logging (i.e. the 
> access log) and another for the controller state change log, since these 
> really have their own use cases.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-259) Give better error message when trying to run shell scripts without having built/downloaded the jars yet

2013-05-29 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13669425#comment-13669425
 ] 

Jun Rao commented on KAFKA-259:
---

Thanks for the patch. It doesn't apply to 0.8 though. Could you provide another 
patch?

git apply ~/Downloads/KAFKA-259-v1.patch 
/Users/jrao/Downloads/KAFKA-259-v1.patch:21: trailing whitespace.
if [ $exitval -eq "1" ] ; then 
/Users/jrao/Downloads/KAFKA-259-v1.patch:27: trailing whitespace.
if [[ -n "$match" ]]; then 
error: patch failed: bin/kafka-run-class.sh:81
error: bin/kafka-run-class.sh: patch does not apply
error: patch failed: bin/kafka-run-class.sh:93
error: bin/kafka-run-class.sh: patch does not apply


> Give better error message when trying to run shell scripts without having 
> built/downloaded the jars yet
> ---
>
> Key: KAFKA-259
> URL: https://issues.apache.org/jira/browse/KAFKA-259
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
> Environment: Mac OSX Lion
>Reporter: Ross Crawford-d'Heureuse
>Priority: Minor
>  Labels: newbie
> Attachments: KAFKA-259-v1.patch
>
>
> Hi there, I've cloned from the kafka github repo and tried to run the start 
> server script:
>  ./bin/kafka-server-start.sh config/server.properties 
> Which results in:
> Exception in thread "main" java.lang.NoClassDefFoundError: kafka/Kafka
> Caused by: java.lang.ClassNotFoundException: kafka.Kafka
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
> It seems that I'm missing a build step? What have I forgotten to do?
> Thanks in advance and I look forward to using kafka.
> regards
> rcdh

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-649) Cleanup log4j logging

2013-05-29 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13669426#comment-13669426
 ] 

Neha Narkhede commented on KAFKA-649:
-

+1. Just one comment -

KafkaApis: Can we please change "fails" to "failed"?

> Cleanup log4j logging
> -
>
> Key: KAFKA-649
> URL: https://issues.apache.org/jira/browse/KAFKA-649
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.8
>Reporter: Jay Kreps
>Assignee: Jun Rao
>Priority: Blocker
> Attachments: kafka-649_extra.patch, kafka-649.patch
>
>
> Review the logs and do the following:
> 1. Fix confusing or duplicative messages
> 2. Assess that messages are at the right level (TRACE/DEBUG/INFO/WARN/ERROR)
> It would also be nice to add a log4j logger for the request logging (i.e. the 
> access log) and another for the controller state change log, since these 
> really have their own use cases.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


Re: KAFKA-259

2013-05-29 Thread Jun Rao
Ashwanth,

Thanks for the patch. Commented on the jira.

Jun


On Tue, May 28, 2013 at 5:51 PM, Ashwanth Fernando <
aferna...@walmartlabs.com> wrote:

> Hi,
> I have submitted a patch for KAFKA-259 and its been approved by legal. Can
> you please go ahead and verify it and let me know how to proceed?
>
> https://issues.apache.org/jira/browse/KAFKA-259
>
> Thanks for all your help,
> Ashwanth
>


[jira] [Commented] (KAFKA-915) System Test - Mirror Maker testcase_5001 failed

2013-05-29 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13669421#comment-13669421
 ] 

Neha Narkhede commented on KAFKA-915:
-

Joel, could you take a quick look?

> System Test - Mirror Maker testcase_5001 failed
> ---
>
> Key: KAFKA-915
> URL: https://issues.apache.org/jira/browse/KAFKA-915
> Project: Kafka
>  Issue Type: Bug
>Reporter: John Fung
>Assignee: Joel Koshy
>Priority: Critical
>  Labels: kafka-0.8, replication-testing
> Attachments: testcase_5001_debug_logs.tar.gz
>
>
> This case passes if brokers are set to partition = 1, replicas = 1
> It fails if brokers are set to partition = 5, replicas = 3 (consistently 
> reproducible)
> This test case is set up as shown below.
> 1. Start 2 ZK as a cluster in Source
> 2. Start 2 ZK as a cluster in Target
> 3. Start 3 brokers as a cluster in Source (partition = 1, replicas = 1)
> 4. Start 3 brokers as a cluster in Target (partition = 1, replicas = 1)
> 5. Start 1 MM
> 6. Start ProducerPerformance to send some data
> 7. After Producer is done, start ConsoleConsumer to consume data
> 8. Stop all processes and validate if there is any data loss.
> 9. No failure is introduced to any process in this test
> Attached a tar file which contains the logs and system test output for both 
> cases.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-915) System Test - Mirror Maker testcase_5001 failed

2013-05-29 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-915:


Assignee: Joel Koshy

> System Test - Mirror Maker testcase_5001 failed
> ---
>
> Key: KAFKA-915
> URL: https://issues.apache.org/jira/browse/KAFKA-915
> Project: Kafka
>  Issue Type: Bug
>Reporter: John Fung
>Assignee: Joel Koshy
>Priority: Critical
>  Labels: kafka-0.8, replication-testing
> Attachments: testcase_5001_debug_logs.tar.gz
>
>
> This case passes if brokers are set to partition = 1, replicas = 1
> It fails if brokers are set to partition = 5, replicas = 3 (consistently 
> reproducible)
> This test case is set up as shown below.
> 1. Start 2 ZK as a cluster in Source
> 2. Start 2 ZK as a cluster in Target
> 3. Start 3 brokers as a cluster in Source (partition = 1, replicas = 1)
> 4. Start 3 brokers as a cluster in Target (partition = 1, replicas = 1)
> 5. Start 1 MM
> 6. Start ProducerPerformance to send some data
> 7. After Producer is done, start ConsoleConsumer to consume data
> 8. Stop all processes and validate if there is any data loss.
> 9. No failure is introduced to any process in this test
> Attached a tar file which contains the logs and system test output for both 
> cases.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-923) Improve controller failover latency

2013-05-29 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13669417#comment-13669417
 ] 

Jun Rao commented on KAFKA-923:
---

Thanks for patch v2. Looks good. +1.

> Improve controller failover latency
> ---
>
> Key: KAFKA-923
> URL: https://issues.apache.org/jira/browse/KAFKA-923
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller
>Affects Versions: 0.8
>Reporter: Neha Narkhede
>Assignee: Neha Narkhede
>Priority: Critical
>  Labels: kafka-0.8
> Attachments: kafka-923-v1.patch, kafka-923-v2.patch
>
>
> During controller failover, we do the following things -
> 1. Increment controller epoch 
> 2. Initialize controller context
> 3. Initialize replica state machine
> 4. Initialize partition state machine
> During step 2 above, we read the information of all topics and partitions, 
> the replica assignments and leadership information. During step 3 and 4, we 
> re-read some of this information from zookeeper. Since the zookeeper reads 
> are proportional to the number of topics and the reads are serial, it is 
> important to optimize this. The zookeeper reads in steps 3 and 4 are not 
> required.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-900) ClosedByInterruptException when high-level consumer shutdown normally

2013-05-29 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13669414#comment-13669414
 ] 

Neha Narkhede commented on KAFKA-900:
-

+1, thanks for the patch!

> ClosedByInterruptException when high-level consumer shutdown normally
> -
>
> Key: KAFKA-900
> URL: https://issues.apache.org/jira/browse/KAFKA-900
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8
>Reporter: Jason Rosenberg
>Assignee: Jun Rao
> Attachments: kafka-900.patch
>
>
> I'm porting some unit tests from 0.7.2 to 0.8.0. The test does the following, 
> all embedded in the same java process: 
> -- spins up a zk instance 
> -- spins up a kafka server using a fresh log directory 
> -- creates a producer and sends a message 
> -- creates a high-level consumer and verifies that it can consume the message 
> -- shuts down the consumer 
> -- stops the kafka server 
> -- stops zk 
> The test seems to be working fine now, however, I consistently see the 
> following exception, when the consumer connector is shutdown:
> 1699 
> [ConsumerFetcherThread-group1_square-1a7ac0.local-1368076598439-d66bb2eb-0-1946108683]
>  WARN kafka.consumer.ConsumerFetcherThread  - 
> [ConsumerFetcherThread-group1_square-1a7ac0.local-1368076598439-d66bb2eb-0-1946108683],
>  Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 1; ClientId: 
> group1-ConsumerFetcherThread-group1_square-1a7ac0.local-1368076598439-d66bb2eb-0-1946108683;
>  ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo: 
> [test-topic,0] -> PartitionFetchInfo(1,1048576)
> java.nio.channels.ClosedByInterruptException
>   at 
> java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
>   at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:543)
>   at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>   at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:47)
>   at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:60)
>   at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
>   at 
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:73)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>   at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
>   at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
>   at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> 1721 [Thread-12] INFO com.squareup.kafka.server.KafkaServer  - Shutting down 
> KafkaServer
> 2030 [main] INFO com.squareup.kafka.server.KafkaServer  - Shut down complete 
> for KafkaServer
> Disconnected from the target VM, address: '127.0.0.1:49243', transport: 
> 'socket'
> It would be great if instead, something meaningful was logged, like:
> "Consumer connector has been shutdown"

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-922) System Test - set retry.backoff.ms=300 to testcase_0119

2013-05-29 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13669407#comment-13669407
 ] 

Neha Narkhede commented on KAFKA-922:
-

+1, thanks John!

> System Test - set retry.backoff.ms=300 to testcase_0119
> ---
>
> Key: KAFKA-922
> URL: https://issues.apache.org/jira/browse/KAFKA-922
> Project: Kafka
>  Issue Type: Task
>Reporter: John Fung
> Attachments: kafka-922-v1.patch
>
>


--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-899) LeaderNotAvailableException the first time a new message for a partition is processed.

2013-05-29 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13669404#comment-13669404
 ] 

Neha Narkhede commented on KAFKA-899:
-

Thanks for the patch!

1. Another place where we can make it easier for the user to know the reason 
for the send failure -

error("Produce request with correlation id %d failed due to 
response %s. List of failed topic partitions is %s"
  .format(currentCorrelationId, response.toString, 
failedTopicPartitions.mkString(",")))

Here, we print the entire response. Rather we should only print the partition 
and corresponding error status for partitions with non-zero error code

2. When we do print the error status above, we should print the text name of 
the error instead of the integer error code. For this, we can override 
toString() in ProducerResponseStatus.
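A hedged sketch of both points (hypothetical stand-in types, not the real 
ProducerResponseStatus; error code 5 is LeaderNotAvailable in the 0.8 error 
mapping):

    // Hypothetical mapping from an error code to a readable name.
    def errorName(code: Int): String = code match {
      case 0     => "NoError"
      case 5     => "LeaderNotAvailable"
      case other => "ErrorCode(" + other + ")"
    }

    // Hypothetical stand-in for ProducerResponseStatus with a readable toString.
    case class ResponseStatus(error: Int, offset: Long) {
      override def toString = "error: %s, offset: %d".format(errorName(error), offset)
    }

    // Summarize only the partitions whose error code is non-zero.
    def failedPartitionSummary(statuses: Map[String, ResponseStatus]): String =
      statuses.filter { case (_, s) => s.error != 0 }
              .map { case (tp, s) => tp + " -> " + s }
              .mkString(", ")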

> LeaderNotAvailableException the first time a new message for a partition is 
> processed.
> --
>
> Key: KAFKA-899
> URL: https://issues.apache.org/jira/browse/KAFKA-899
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jason Rosenberg
>Assignee: Jun Rao
> Attachments: kafka-899.patch, kafka-899_v2.patch
>
>
> I'm porting some unit tests from 0.7.2 to 0.8.0.  The test does the 
> following, all embedded in the same java process:
> -- spins up a zk instance
> -- spins up a kafka server using a fresh log directory
> -- creates a producer and sends a message
> -- creates a high-level consumer and verifies that it can consume the message
> -- shuts down the consumer
> -- stops the kafka server
> -- stops zk
> The test seems to be working fine now, however, I consistently see the 
> following exceptions (which from poking around the mailing list seem to be 
> expected?).  If these are expected, can we suppress the logging of these 
> exceptions, since it clutters the output of tests, and presumably, clutters 
> the logs of the running server/consumers, during clean startup and 
> shutdown..
> When I call producer.send(), I get:
> 1071 [main] WARN kafka.producer.BrokerPartitionInfo  - Error while fetching 
> metadata  partition 0 leader: nonereplicas:   isr:
> isUnderReplicated: false for topic partition [test-topic,0]: [class 
> kafka.common.LeaderNotAvailableException]
> 1081 [main] WARN kafka.producer.async.DefaultEventHandler  - Failed to 
> collate messages by topic,partition due to
> kafka.common.LeaderNotAvailableException: No leader for any partition
>   at 
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartition(DefaultEventHandler.scala:212)
>   at 
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
>   at 
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:148)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
>   at 
> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:148)
>   at 
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:94)
>   at 
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
>   at kafka.producer.Producer.send(Producer.scala:74)
>   at kafka.javaapi.producer.Producer.send(Producer.scala:32)
>   at 
> com.squareup.kafka.server.KafkaServerTest.produceAndConsumeMessage(KafkaServerTest.java:98)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>   at java.lang.reflect.Method.invoke(Method.java:597)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:69)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:48)
>   at org.junit.runners.ParentRunner$3.run(Paren

[jira] [Updated] (KAFKA-923) Improve controller failover latency

2013-05-29 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-923:


Attachment: kafka-923-v2.patch

Thanks for the review.

1. Fixed it.
2. Good point; removed the extra ZK read.

> Improve controller failover latency
> ---
>
> Key: KAFKA-923
> URL: https://issues.apache.org/jira/browse/KAFKA-923
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller
>Affects Versions: 0.8
>Reporter: Neha Narkhede
>Assignee: Neha Narkhede
>Priority: Critical
>  Labels: kafka-0.8
> Attachments: kafka-923-v1.patch, kafka-923-v2.patch
>
>
> During controller failover, we do the following things -
> 1. Increment controller epoch 
> 2. Initialize controller context
> 3. Initialize replica state machine
> 4. Initialize partition state machine
> During step 2 above, we read the information for all topics and partitions: 
> the replica assignments and the leadership information. During steps 3 and 4, 
> we re-read some of this information from zookeeper. Since the zookeeper reads 
> are proportional to the number of topics and the reads are serial, it is 
> important to optimize this. The zookeeper reads in steps 3 and 4 are not 
> required.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-899) LeaderNotAvailableException the first time a new message for a partition is processed.

2013-05-29 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-899?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-899:
--

Attachment: kafka-899_v2.patch

Attaching patch v2 after the rebase.

> LeaderNotAvailableException the first time a new message for a partition is 
> processed.
> --
>
> Key: KAFKA-899
> URL: https://issues.apache.org/jira/browse/KAFKA-899
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jason Rosenberg
>Assignee: Jun Rao
> Attachments: kafka-899.patch, kafka-899_v2.patch
>
>
> I'm porting some unit tests from 0.7.2 to 0.8.0.  The test does the 
> following, all embedded in the same java process:
> -- spins up a zk instance
> -- spins up a kafka server using a fresh log directory
> -- creates a producer and sends a message
> -- creates a high-level consumer and verifies that it can consume the message
> -- shuts down the consumer
> -- stops the kafka server
> -- stops zk
> The test seems to be working fine now; however, I consistently see the 
> following exceptions (which, from poking around the mailing list, seem to be 
> expected?). If these are expected, can we suppress the logging of these 
> exceptions, since they clutter the output of tests and, presumably, the logs 
> of the running server/consumers during clean startup and shutdown?
> When I call producer.send(), I get:
> 1071 [main] WARN kafka.producer.BrokerPartitionInfo  - Error while fetching 
> metadata  partition 0 leader: none  replicas:   isr:
> isUnderReplicated: false for topic partition [test-topic,0]: [class 
> kafka.common.LeaderNotAvailableException]
> 1081 [main] WARN kafka.producer.async.DefaultEventHandler  - Failed to 
> collate messages by topic,partition due to
> kafka.common.LeaderNotAvailableException: No leader for any partition
>   at 
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartition(DefaultEventHandler.scala:212)
>   at 
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
>   at 
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:148)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
>   at 
> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:148)
>   at 
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:94)
>   at 
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
>   at kafka.producer.Producer.send(Producer.scala:74)
>   at kafka.javaapi.producer.Producer.send(Producer.scala:32)
>   at 
> com.squareup.kafka.server.KafkaServerTest.produceAndConsumeMessage(KafkaServerTest.java:98)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>   at java.lang.reflect.Method.invoke(Method.java:597)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:69)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:48)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:292)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:157)
>   at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:77)
>   at 
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:195)
>   at 
> com.intellij.rt

[jira] [Updated] (KAFKA-925) Add optional partition key override in producer

2013-05-29 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-925:


Attachment: KAFKA-925-v1.patch

> Add optional partition key override in producer
> ---
>
> Key: KAFKA-925
> URL: https://issues.apache.org/jira/browse/KAFKA-925
> Project: Kafka
>  Issue Type: New Feature
>  Components: producer 
>Affects Versions: 0.8.1
>Reporter: Jay Kreps
>Assignee: Jay Kreps
> Attachments: KAFKA-925-v1.patch
>
>
> We have a key that is used for partitioning in the producer and stored with 
> the message. These two uses, though often the same, can be different. The two 
> meanings are effectively:
> 1. Assignment to a partition
> 2. Deduplication within a partition
> In cases where we want to allow the client to take advantage of both of these 
> and they aren't the same, it would be nice to allow them to be specified 
> separately.
> To implement this I added an optional partition key to KeyedMessage. When 
> specified, this key is used for partitioning rather than the message key. 
> This key is of type Any, and the parametric typing is removed from the 
> partitioner to allow it to work with either key.
> An alternative would be to allow the partition id to be specified in the 
> KeyedMessage. This would be slightly more convenient in the case where there 
> is no partition key but you instead know the partition number a priori--that 
> case must currently be handled by giving the partition id as the partition 
> key and using an identity partitioner, which is slightly roundabout. However, 
> this is inconsistent with the normal case, where the partition is determined 
> by a key--there you would be manually calling your partitioner in user code. 
> It seems best to consistently take either a key or a partition, and since we 
> currently take a key I stuck with that.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Created] (KAFKA-925) Add optional partition key override in producer

2013-05-29 Thread Jay Kreps (JIRA)
Jay Kreps created KAFKA-925:
---

 Summary: Add optional partition key override in producer
 Key: KAFKA-925
 URL: https://issues.apache.org/jira/browse/KAFKA-925
 Project: Kafka
  Issue Type: New Feature
  Components: producer 
Affects Versions: 0.8.1
Reporter: Jay Kreps
Assignee: Jay Kreps


We have a key that is used for partitioning in the producer and stored with the 
message. These two uses, though often the same, can be different. The two 
meanings are effectively:
1. Assignment to a partition
2. Deduplication within a partition

In cases where we want to allow the client to take advantage of both of these 
and they aren't the same, it would be nice to allow them to be specified 
separately.

To implement this I added an optional partition key to KeyedMessage. When 
specified, this key is used for partitioning rather than the message key. This 
key is of type Any, and the parametric typing is removed from the partitioner 
to allow it to work with either key.

An alternative would be to allow the partition id to be specified in the 
KeyedMessage. This would be slightly more convenient in the case where there is 
no partition key but you instead know the partition number a priori--that case 
must currently be handled by giving the partition id as the partition key and 
using an identity partitioner, which is slightly roundabout. However, this is 
inconsistent with the normal case, where the partition is determined by a 
key--there you would be manually calling your partitioner in user code. It 
seems best to consistently take either a key or a partition, and since we 
currently take a key I stuck with that.
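
A standalone sketch of the idea (the field and method names are modeled on the 
attached patch but are assumptions here, not a quote of it):

  // The message key (key) is kept for storage/deduplication; an optional
  // partition key (partKey) overrides it for partition assignment only.
  case class KeyedMessage[K, V](topic: String, key: K, partKey: Any, message: V) {
    def this(topic: String, key: K, message: V) = this(topic, key, key, message)
    def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], null, message)
    // The key handed to the partitioner: the explicit partition key when
    // present, otherwise the message key.
    def partitionKey: Any = if (partKey != null) partKey else key
  }

  // The partitioner is untyped (Any) so it can accept either kind of key.
  trait Partitioner {
    def partition(key: Any, numPartitions: Int): Int
  }

  // Treats the partition key as the partition number itself, which is the
  // roundabout route for the know-the-partition-up-front case above.
  class IdentityPartitioner extends Partitioner {
    def partition(key: Any, numPartitions: Int): Int = key.asInstanceOf[Int] % numPartitions
  }

Under this shape, new KeyedMessage("events", "dedup-key", 3, payload) paired 
with an IdentityPartitioner pins the message to partition 3 while still storing 
"dedup-key" with the message.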

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-923) Improve controller failover latency

2013-05-29 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13669340#comment-13669340
 ] 

Jun Rao commented on KAFKA-923:
---

Thanks for the patch. Looks good. Some minor comments:

1. ReplicaStateMachine.getAllReplicasOnBroker(): We have the following 
statement:
  Set.empty[PartitionAndReplica] ++ brokerIds.map { ... }.flatten
Is it simpler to write it as 
  brokerIds.map { ... }.flatten.toSet?
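
As a quick standalone check of the equivalence (PartitionAndReplica and the 
lookup are stubbed here; the real ones live in kafka.controller):

  case class PartitionAndReplica(topic: String, partition: Int, replica: Int)

  // Stand-in for the real per-broker replica lookup.
  def replicasOnBroker(brokerId: Int): Seq[PartitionAndReplica] =
    Seq(PartitionAndReplica("t1", 0, brokerId), PartitionAndReplica("t2", 1, brokerId))

  val brokerIds = Seq(1, 2, 3)
  val verbose = Set.empty[PartitionAndReplica] ++ brokerIds.map(replicasOnBroker).flatten
  val concise = brokerIds.map(replicasOnBroker).flatten.toSet
  assert(verbose == concise)  // same Set either way; the second form reads more directly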

2. In KafkaController: there is another place that we could optimize. In 
updateLeaderAndIsrCache(), we make the following call:
  val leaderAndIsrInfo = ZkUtils.getPartitionLeaderAndIsrForTopics(zkClient, 
    controllerContext.allTopics.toSeq)
which will call getPartitionsForTopics and read all the partition assignments 
from ZK again. Since at this point we already know all the partitions, we could 
just pass the partition set in to getPartitionLeaderAndIsrForTopics() to avoid 
reading each topic again (see the sketch below).
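
A sketch of the shape of that change (all signatures here are assumptions for 
illustration, not the actual ZkUtils API):

  object ZkReadSketch {
    type TopicAndPartition = (String, Int)

    // Simulated read of /brokers/topics/<topic>; this is the per-topic ZK
    // read the comment above wants to avoid during failover.
    def readPartitionsForTopic(topic: String): Seq[TopicAndPartition] =
      Seq((topic, 0), (topic, 1))

    // Topic-based variant: rediscovers every topic's partitions from ZK
    // before fetching leader/isr state.
    def leaderAndIsrForTopics(topics: Seq[String]): Map[TopicAndPartition, String] =
      leaderAndIsrForPartitions(topics.flatMap(readPartitionsForTopic).toSet)

    // Partition-based variant: takes the set the controller context already
    // holds, so only the per-partition state reads remain.
    def leaderAndIsrForPartitions(partitions: Set[TopicAndPartition]): Map[TopicAndPartition, String] =
      partitions.map(tp => tp -> ("leaderAndIsr@" + tp)).toMap
  }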



> Improve controller failover latency
> ---
>
> Key: KAFKA-923
> URL: https://issues.apache.org/jira/browse/KAFKA-923
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller
>Affects Versions: 0.8
>Reporter: Neha Narkhede
>Assignee: Neha Narkhede
>Priority: Critical
>  Labels: kafka-0.8
> Attachments: kafka-923-v1.patch
>
>
> During controller failover, we do the following things -
> 1. Increment controller epoch 
> 2. Initialize controller context
> 3. Initialize replica state machine
> 4. Initialize partition state machine
> During step 2 above, we read the information for all topics and partitions: 
> the replica assignments and the leadership information. During steps 3 and 4, 
> we re-read some of this information from zookeeper. Since the zookeeper reads 
> are proportional to the number of topics and the reads are serial, it is 
> important to optimize this. The zookeeper reads in steps 3 and 4 are not 
> required.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira