Re: git wip incubator-kafka.git
Chris, I plan to submit an infra ticket for the post graduation items. One of the subtasks will be to move our svn repo to a new location (w/o incubator). We could either reuse your ticket and ask for a new repo name or we could close your ticket and create a new one. Which one would you prefer? Thanks, Jun On Tue, Sep 25, 2012 at 8:09 PM, Chris Burroughs wrote: > https://issues.apache.org/jira/browse/INFRA-5111 > > Please check out > https://git-wip-us.apache.org/repos/asf/incubator-kafka.git (currently > read only) and report on any issues. Joe and I were looking at the git > mirror the other day (which this is based on) and didn't notice > anything, so any bugs must be super vigilant. > > (I defer to Joe if he would rather wait until after the rc VOTE to go > read/write, or make 0.7.2 the first git release.) >
[jira] [Commented] (KAFKA-635) Producer error when trying to send not displayed unless in DEBUG logging level
[ https://issues.apache.org/jira/browse/KAFKA-635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13505222#comment-13505222 ] Jun Rao commented on KAFKA-635: --- Chris, Thanks for reporting this. Do you really need to turn on debug level logging? The extra logging that you showed is at WARN level. > Producer error when trying to send not displayed unless in DEBUG logging level > -- > > Key: KAFKA-635 > URL: https://issues.apache.org/jira/browse/KAFKA-635 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8 > Environment: Java client >Reporter: Chris Curtin >Priority: Minor > > When trying to figure out how to connect with the 0.8.0 Producer, I was only > seeing exceptions: > Exception in thread "main" kafka.common.FailedToSendMessageException: Failed > to send messages after 3 tries. > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:70) > at kafka.producer.Producer.send(Producer.scala:75) > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > at com.spop.kafka.playproducer.TestProducer.main(TestProducer.java:40) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) > at java.lang.reflect.Method.invoke(Method.java:597) > at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120) > Changing log4j level to DEBUG showed the actual error: > 878 [main] WARN kafka.producer.async.DefaultEventHandler - failed to send > to broker 3 with data Map([test1,0] -> > ByteBufferMessageSet(MessageAndOffset(Message(magic = 2, attributes = 0, crc > = 1906312613, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=22 > cap=22]),0), )) > java.lang.NoSuchMethodError: com.yammer.metrics.core.TimerContext.stop()J > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:36) > at 
kafka.producer.SyncProducer.send(SyncProducer.scala:94) > at > kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:221) > at > kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:87) > at > kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:81) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80) > at scala.collection.Iterator$class.foreach(Iterator.scala:631) > at > scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161) > at > scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194) > at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) > at scala.collection.mutable.HashMap.foreach(HashMap.scala:80) > at > kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:81) > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:57) > at kafka.producer.Producer.send(Producer.scala:75) > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > at com.spop.kafka.playproducer.TestProducer.main(TestProducer.java:40) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) > at java.lang.reflect.Method.invoke(Method.java:597) > at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120) > Submitted per Jay's request in a mailing list thread. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
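The masking behavior in this report can be illustrated with a stripped-down retry loop (class and method names below are illustrative stand-ins, not Kafka's actual `DefaultEventHandler`): each per-attempt failure, including a linkage error like the `NoSuchMethodError` above (the `()J` descriptor means a `long`-returning `stop()` was expected but missing at runtime, typically a mismatched metrics jar), is only logged, and the exception that finally reaches the caller carries no cause, so the real error is invisible unless logging for the producer package is enabled.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class RetrySketch {
    static final Logger LOG = Logger.getLogger("producer");

    // Try an attempt up to `retries` times; log each failure, then give up
    // with a generic exception that carries no cause.
    static void send(Runnable attempt, int retries) {
        for (int i = 0; i < retries; i++) {
            try {
                attempt.run();
                return;
            } catch (Throwable t) {
                // The underlying error (e.g. a NoSuchMethodError from a
                // mismatched jar) is swallowed here, visible only via the log.
                LOG.log(Level.WARNING, "failed to send to broker", t);
            }
        }
        // All the caller ever sees: a generic failure with no cause attached.
        throw new RuntimeException("Failed to send messages after " + retries + " tries.");
    }

    public static void main(String[] args) {
        try {
            send(() -> { throw new NoSuchMethodError("TimerContext.stop()J"); }, 3);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With the logger silenced, only the final generic message survives, which is exactly the symptom described in the issue.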
[jira] [Created] (KAFKA-633) AdminTest.testShutdownBroker fails
Jun Rao created KAFKA-633: - Summary: AdminTest.testShutdownBroker fails Key: KAFKA-633 URL: https://issues.apache.org/jira/browse/KAFKA-633 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8 Reporter: Jun Rao [error] Test Failed: testShutdownBroker(kafka.admin.AdminTest) junit.framework.AssertionFailedError: expected:<2> but was:<3> at junit.framework.Assert.fail(Assert.java:47) at junit.framework.Assert.failNotEquals(Assert.java:277) at junit.framework.Assert.assertEquals(Assert.java:64) at junit.framework.Assert.assertEquals(Assert.java:195) at junit.framework.Assert.assertEquals(Assert.java:201) at kafka.admin.AdminTest.testShutdownBroker(AdminTest.scala:381) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at junit.framework.TestCase.runTest(TestCase.java:164) at junit.framework.TestCase.runBare(TestCase.java:130) at junit.framework.TestResult$1.protect(TestResult.java:110) at junit.framework.TestResult.runProtected(TestResult.java:128) at junit.framework.TestResult.run(TestResult.java:113) at junit.framework.TestCase.run(TestCase.java:120) at junit.framework.TestSuite.runTest(TestSuite.java:228) at junit.framework.TestSuite.run(TestSuite.java:223) at junit.framework.TestSuite.runTest(TestSuite.java:228) at junit.framework.TestSuite.run(TestSuite.java:223) at org.scalatest.junit.JUnit3Suite.run(JUnit3Suite.scala:309) at org.scalatest.tools.ScalaTestFramework$ScalaTestRunner.run(ScalaTestFramework.scala:40) at sbt.TestRunner.run(TestFramework.scala:53) at sbt.TestRunner.runTest$1(TestFramework.scala:67) at sbt.TestRunner.run(TestFramework.scala:76) at sbt.TestFramework$$anonfun$10$$anonfun$apply$11.runTest$2(TestFramework.scala:194) at 
sbt.TestFramework$$anonfun$10$$anonfun$apply$11$$anonfun$apply$12.apply(TestFramework.scala:205) at sbt.TestFramework$$anonfun$10$$anonfun$apply$11$$anonfun$apply$12.apply(TestFramework.scala:205) at sbt.NamedTestTask.run(TestFramework.scala:92) at sbt.ScalaProject$$anonfun$sbt$ScalaProject$$toTask$1.apply(ScalaProject.scala:193) at sbt.ScalaProject$$anonfun$sbt$ScalaProject$$toTask$1.apply(ScalaProject.scala:193) at sbt.TaskManager$Task.invoke(TaskManager.scala:62) at sbt.impl.RunTask.doRun$1(RunTask.scala:77) at sbt.impl.RunTask.runTask(RunTask.scala:85) at sbt.impl.RunTask.sbt$impl$RunTask$$runIfNotRoot(RunTask.scala:60) at sbt.impl.RunTask$$anonfun$runTasksExceptRoot$2.apply(RunTask.scala:48) at sbt.impl.RunTask$$anonfun$runTasksExceptRoot$2.apply(RunTask.scala:48) at sbt.Distributor$Run$Worker$$anonfun$2.apply(ParallelRunner.scala:131) at sbt.Distributor$Run$Worker$$anonfun$2.apply(ParallelRunner.scala:131) at sbt.Control$.trapUnit(Control.scala:19) at sbt.Distributor$Run$Worker.run(ParallelRunner.scala:131) -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Created] (KAFKA-632) ProducerRequest should take ByteBufferMessageSet instead of MessageSet
Jun Rao created KAFKA-632: - Summary: ProducerRequest should take ByteBufferMessageSet instead of MessageSet Key: KAFKA-632 URL: https://issues.apache.org/jira/browse/KAFKA-632 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.8 Reporter: Jun Rao Currently, ProducerRequest takes MessageSet in the constructor and casts it to ByteBufferMessageSet. It should take ByteBufferMessageSet directly. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
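The improvement proposed in KAFKA-632 is a standard strengthen-the-signature refactor. A minimal sketch with stand-in classes (not Kafka's real ones) shows the difference: accepting the abstract type and downcasting fails at runtime for any other subtype, while accepting the concrete type moves the check to compile time.

```java
// Stand-in type hierarchy, purely illustrative.
abstract class MessageSet {}
class ByteBufferMessageSet extends MessageSet {}
class FileMessageSet extends MessageSet {}

// "Before": the constructor accepts any MessageSet but immediately downcasts,
// so passing a FileMessageSet compiles and then blows up at runtime.
class ProducerRequestBefore {
    final ByteBufferMessageSet messages;
    ProducerRequestBefore(MessageSet m) {
        this.messages = (ByteBufferMessageSet) m; // ClassCastException for other subtypes
    }
}

// "After": the signature states the real requirement; a wrong subtype is
// rejected by the compiler, so no cast is needed.
class ProducerRequestAfter {
    final ByteBufferMessageSet messages;
    ProducerRequestAfter(ByteBufferMessageSet m) {
        this.messages = m;
    }
}

public class CastSketch {
    public static void main(String[] args) {
        try {
            new ProducerRequestBefore(new FileMessageSet());
        } catch (ClassCastException e) {
            System.out.println("runtime failure: " + e.getClass().getSimpleName());
        }
        new ProducerRequestAfter(new ByteBufferMessageSet());
        System.out.println("compile-time-checked constructor: ok");
    }
}
```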
trunk and branches update
Hi, Since we are getting close to the 0.8 release, I have made the following svn changes in our repository. moved 0.7 branch to 0.7.0 moved trunk to 0.7 copied 0.8 to trunk Now trunk is available for post 0.8 development. 0.8 changes will still be checked into the 0.8 branch and we will merge the 0.8 changes to trunk periodically. Thanks, Jun
Kafka 0.8 trial version
Hi, Everyone, Thanks to people who contributed to the project, we have made significant progress in Kafka replication. Now, we'd like people to give it a try. Please check out revision 1411070 from https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8 and follow the quick start wiki below. https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.8+Quick+Start The code is expected to be stable, though we are still looking into some performance issues with hundreds of topics/partitions. If you see any issues, please let us know. Thanks, Jun
[jira] [Closed] (KAFKA-612) move shutting down of fetcher thread out of critical path
[ https://issues.apache.org/jira/browse/KAFKA-612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao closed KAFKA-612. - > move shutting down of fetcher thread out of critical path > - > > Key: KAFKA-612 > URL: https://issues.apache.org/jira/browse/KAFKA-612 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Jun Rao >Assignee: Jun Rao > Fix For: 0.8 > > Attachments: kafka-612.patch, kafka-612-v2.patch > > > Shutting down a fetch thread seems to take more than 200ms since we need to > interrupt the thread. Currently, we shutdown fetcher threads while processing > a leaderAndIsr request. This can delay some of the partitions to become a > leader. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Updated] (KAFKA-612) move shutting down of fetcher thread out of critical path
[ https://issues.apache.org/jira/browse/KAFKA-612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-612: -- Resolution: Fixed Fix Version/s: 0.8 Status: Resolved (was: Patch Available) Thanks for the review. Changed the method name and committed to 0.8. > move shutting down of fetcher thread out of critical path > - > > Key: KAFKA-612 > URL: https://issues.apache.org/jira/browse/KAFKA-612 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Jun Rao >Assignee: Jun Rao > Fix For: 0.8 > > Attachments: kafka-612.patch, kafka-612-v2.patch > > > Shutting down a fetch thread seems to take more than 200ms since we need to > interrupt the thread. Currently, we shutdown fetcher threads while processing > a leaderAndIsr request. This can delay some of the partitions to become a > leader. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Updated] (KAFKA-612) move shutting down of fetcher thread out of critical path
[ https://issues.apache.org/jira/browse/KAFKA-612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-612: -- Attachment: kafka-612-v2.patch Thanks for the review. Attach patch v2. 1.1 This is to handle the case that a partition is removed from the fetchMap while a fetch request is issued to the broker (since we don't hold the lock when making fetch requests). When this happens, we just need to ignore this partition. 1.2 Good suggestion. Fixed. 2. That info message is really used to track how long it takes a broker to complete the leaderAndIsr request. From the broker's perspective, once the leader is set in a partition, the partition can serve read/write requests. 3.1 Agreed that the naming is confusing. Changed it to partitionMap. 3.2 Done. > move shutting down of fetcher thread out of critical path > - > > Key: KAFKA-612 > URL: https://issues.apache.org/jira/browse/KAFKA-612 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Jun Rao >Assignee: Jun Rao > Attachments: kafka-612.patch, kafka-612-v2.patch > > > Shutting down a fetch thread seems to take more than 200ms since we need to > interrupt the thread. Currently, we shutdown fetcher threads while processing > a leaderAndIsr request. This can delay some of the partitions to become a > leader. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
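Point 1.1 in the review reply above — a partition may be removed from the fetch map while a fetch request is already in flight, because the lock is not held while the request is on the wire — can be sketched as follows. Names are hypothetical (the real code is Scala inside AbstractFetcherThread); the point is only that the response handler must tolerate a missing map entry rather than treat it as an error.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class FetchResponseSketch {
    // Partition -> next offset to fetch; mutated concurrently with in-flight requests.
    static final Map<String, Long> partitionMap = new ConcurrentHashMap<>();

    static void handleResponse(String partition, long newOffset) {
        Long current = partitionMap.get(partition);
        if (current == null) {
            // The partition was removed while the fetch was in flight:
            // simply ignore the stale data for it.
            System.out.println("ignoring removed partition " + partition);
            return;
        }
        partitionMap.put(partition, newOffset);
    }

    public static void main(String[] args) {
        partitionMap.put("topic1-0", 0L);
        handleResponse("topic1-0", 42L);   // still tracked: offset advances
        partitionMap.remove("topic1-0");   // removed concurrently with a fetch
        handleResponse("topic1-0", 50L);   // stale response tolerated, not an error
        System.out.println(partitionMap);
    }
}
```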
[jira] [Commented] (KAFKA-532) Multiple controllers can co-exist during soft failures
[ https://issues.apache.org/jira/browse/KAFKA-532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13499941#comment-13499941 ] Jun Rao commented on KAFKA-532: --- 50. My feeling is that request level error code conveys the same meaning that every partition fails with the same error code and my preference is to keep all response format consistent. However, if you prefer, it's ok to check in the patch as it is and revisit it when we finalize the wire format. So, +1 from me on the patch. > Multiple controllers can co-exist during soft failures > -- > > Key: KAFKA-532 > URL: https://issues.apache.org/jira/browse/KAFKA-532 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8 >Reporter: Neha Narkhede >Assignee: Neha Narkhede >Priority: Blocker > Labels: bugs > Attachments: kafka-532-v1.patch, kafka-532-v2.patch, > kafka-532-v3.patch, kafka-532-v4.patch, kafka-532-v5.patch > > Original Estimate: 48h > Remaining Estimate: 48h > > If the current controller experiences an intermittent soft failure (GC pause) > in the middle of leader election or partition reassignment, a new controller > might get elected and start communicating new state change decisions to the > brokers. After recovering from the soft failure, the old controller might > continue sending some stale state change decisions to the brokers, resulting > in unexpected failures. We need to introduce a controller generation id that > increments with controller election. The brokers should reject any state > change requests by a controller with an older generation id. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-532) Multiple controllers can co-exist during soft failures
[ https://issues.apache.org/jira/browse/KAFKA-532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13499860#comment-13499860 ] Jun Rao commented on KAFKA-532: --- Thanks for patch v5. Looks good. Just a couple of minor comments. 50. LeaderAndIsrResponse and StopReplicaResponse: Currently, for all types of response, we have moved to the model that there is no global error code at the response level. Instead, if a request can't be processed for any partition, we just set the same error code for each partition in the response. This achieves the same effect, but makes the handling of the response easier. One just has to deal with the error code per partition. 51. Are the changes in test/resources/log4j.properties intended? > Multiple controllers can co-exist during soft failures > -- > > Key: KAFKA-532 > URL: https://issues.apache.org/jira/browse/KAFKA-532 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8 >Reporter: Neha Narkhede >Assignee: Neha Narkhede >Priority: Blocker > Labels: bugs > Attachments: kafka-532-v1.patch, kafka-532-v2.patch, > kafka-532-v3.patch, kafka-532-v4.patch, kafka-532-v5.patch > > Original Estimate: 48h > Remaining Estimate: 48h > > If the current controller experiences an intermittent soft failure (GC pause) > in the middle of leader election or partition reassignment, a new controller > might get elected and start communicating new state change decisions to the > brokers. After recovering from the soft failure, the old controller might > continue sending some stale state change decisions to the brokers, resulting > in unexpected failures. We need to introduce a controller generation id that > increments with controller election. The brokers should reject any state > change requests by a controller with an older generation id. -- This message is automatically generated by JIRA. 
If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
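The response convention described in comment 50 above — no global error code; a request-level failure is expressed by repeating the same error code on every partition entry — can be sketched like this. The map shape and the error-code value are illustrative, not the actual 0.8 wire format.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PerPartitionErrors {
    static final short NO_ERROR = 0;
    static final short STALE_CONTROLLER_EPOCH = 11; // hypothetical error code

    // A request-level failure: stamp the same code on every partition,
    // so the caller has exactly one error-handling path (per partition).
    static Map<String, Short> failAll(Iterable<String> partitions, short code) {
        Map<String, Short> response = new LinkedHashMap<>();
        for (String p : partitions) {
            response.put(p, code);
        }
        return response;
    }

    public static void main(String[] args) {
        Map<String, Short> resp =
            failAll(List.of("topic1-0", "topic1-1"), STALE_CONTROLLER_EPOCH);
        // Callers iterate partitions uniformly, whether the failure was
        // request-wide or specific to one partition.
        resp.forEach((p, code) -> System.out.println(p + " -> " + code));
    }
}
```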
[jira] [Resolved] (KAFKA-613) MigrationTool should disable shallow iteration in the 0.7 consumer
[ https://issues.apache.org/jira/browse/KAFKA-613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-613. --- Resolution: Fixed Fix Version/s: 0.8 Thanks for patch v2. +1. Committed to 0.8. > MigrationTool should disable shallow iteration in the 0.7 consumer > -- > > Key: KAFKA-613 > URL: https://issues.apache.org/jira/browse/KAFKA-613 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Jun Rao > Fix For: 0.8 > > Attachments: kafka_613_v1.patch, kafka_613_v2.patch > > > If shallow iteration is enabled, we should override it and log a warning. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Closed] (KAFKA-613) MigrationTool should disable shallow iteration in the 0.7 consumer
[ https://issues.apache.org/jira/browse/KAFKA-613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao closed KAFKA-613. - > MigrationTool should disable shallow iteration in the 0.7 consumer > -- > > Key: KAFKA-613 > URL: https://issues.apache.org/jira/browse/KAFKA-613 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Jun Rao > Fix For: 0.8 > > Attachments: kafka_613_v1.patch, kafka_613_v2.patch > > > If shallow iteration is enabled, we should override it and log a warning. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-618) Deadlock between leader-finder-thread and consumer-fetcher-thread during broker failure
[ https://issues.apache.org/jira/browse/KAFKA-618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13499309#comment-13499309 ] Jun Rao commented on KAFKA-618: --- Thanks for the patch. +1 > Deadlock between leader-finder-thread and consumer-fetcher-thread during > broker failure > --- > > Key: KAFKA-618 > URL: https://issues.apache.org/jira/browse/KAFKA-618 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8 >Reporter: Joel Koshy >Priority: Blocker > Fix For: 0.8 > > Attachments: KAFKA-618-v1.patch > > > This causes the test failure reported in KAFKA-607. This affects high-level > consumers - if they hit the deadlock then they would get wedged (or at least > until the consumer timeout). > Here is the threaddump output that shows the issue: > Found one Java-level deadlock: > = > "ConsumerFetcherThread-console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-0-1": > waiting for ownable synchronizer 0x7f2283ad, (a > java.util.concurrent.locks.ReentrantLock$NonfairSync), > which is held by > "console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-leader-finder-thread" > "console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-leader-finder-thread": > waiting to lock monitor 0x7f2288297190 (object 0x7f2283ab01d0, a > java.lang.Object), > which is held by > "ConsumerFetcherThread-console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-0-1" > Java stack information for the threads listed above: > === > "ConsumerFetcherThread-console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-0-1": > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x7f2283ad> (a > java.util.concurrent.locks.ReentrantLock$NonfairSync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842) > at > 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178) > at > java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186) > at > java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262) > at > kafka.consumer.ConsumerFetcherManager.getPartitionTopicInfo(ConsumerFetcherManager.scala:131) > at > kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:43) > at > kafka.server.AbstractFetcherThread$$anonfun$doWork$5.apply(AbstractFetcherThread.scala:116) > at > kafka.server.AbstractFetcherThread$$anonfun$doWork$5.apply(AbstractFetcherThread.scala:99) > at scala.collection.immutable.Map$Map1.foreach(Map.scala:105) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:99) > - locked <0x7f2283ab01d0> (a java.lang.Object) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:50) > "console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-leader-finder-thread": > at > kafka.server.AbstractFetcherThread.addPartition(AbstractFetcherThread.scala:142) > - waiting to lock <0x7f2283ab01d0> (a java.lang.Object) > at > kafka.server.AbstractFetcherManager.addFetcher(AbstractFetcherManager.scala:49) > - locked <0x7f2283ab0338> (a java.lang.Object) > at > kafka.consumer.ConsumerFetcherManager$$anon$1$$anonfun$doWork$5.apply(ConsumerFetcherManager.scala:81) > at > kafka.consumer.ConsumerFetcherManager$$anon$1$$anonfun$doWork$5.apply(ConsumerFetcherManager.scala:76) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80) > at scala.collection.Iterator$class.foreach(Iterator.scala:631) > at > scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161) > at > scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194) > at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) > at 
scala.collection.mutable.HashMap.foreach(HashMap.scala:80) >
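The lock cycle in the thread dump above can be reproduced in miniature. This is a self-contained sketch with stand-in names (not Kafka's classes): one thread holds the fetcher's monitor and then needs the manager's ReentrantLock (as in getPartitionTopicInfo), while the other holds the manager lock and would then need the fetcher's monitor (as in addPartition). `tryLock` stands in for the real blocking `lock()` call so the demo reports the cycle instead of hanging.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

public class DeadlockSketch {
    static final ReentrantLock managerLock = new ReentrantLock(); // plays ConsumerFetcherManager's lock
    static final Object fetcherMonitor = new Object();            // plays AbstractFetcherThread's monitor

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch managerLockHeld = new CountDownLatch(1);
        CountDownLatch fetcherAttempted = new CountDownLatch(1);

        // Plays the leader-finder-thread: takes the manager lock first; in the
        // real code it would then block entering the fetcher's monitor.
        Thread leaderFinder = new Thread(() -> {
            managerLock.lock();
            try {
                managerLockHeld.countDown();
                fetcherAttempted.await(); // hold the lock until the fetcher has tried it
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                managerLock.unlock();
            }
        });

        // Plays the consumer-fetcher-thread: holds its own monitor, then needs
        // the manager lock while the leader-finder already holds it.
        Thread fetcher = new Thread(() -> {
            synchronized (fetcherMonitor) {
                try {
                    managerLockHeld.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                if (!managerLock.tryLock()) {
                    System.out.println("fetcher cannot take manager lock while leader-finder holds it");
                } else {
                    managerLock.unlock();
                }
                fetcherAttempted.countDown();
            }
        });

        leaderFinder.start();
        fetcher.start();
        leaderFinder.join();
        fetcher.join();
        System.out.println("done (with blocking lock() on both sides this would deadlock)");
    }
}
```

The fix Jun proposes removes one edge of this cycle: by handing the fetcher thread an immutable view of partitionMap at construction time, processPartitionData never has to call back into the manager, so the fetcher never acquires the manager lock while holding its own monitor.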
[jira] [Resolved] (KAFKA-614) DumpLogSegment offset verification is incorrect for compressed messages
[ https://issues.apache.org/jira/browse/KAFKA-614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-614. --- Resolution: Fixed Fix Version/s: 0.8 Thanks for the patch. +1 and committed to 0.8. > DumpLogSegment offset verification is incorrect for compressed messages > --- > > Key: KAFKA-614 > URL: https://issues.apache.org/jira/browse/KAFKA-614 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Jun Rao > Labels: newbie > Fix For: 0.8 > > Attachments: kafka_614_v1.patch > > > During verification, DumpLogSegment tries to make sure that offsets are > consecutive. However, this won't be true for compressed messages since > FileMessageSet only does shallow iteration. The simplest fix is to skip the > verification for compressed messages. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Closed] (KAFKA-614) DumpLogSegment offset verification is incorrect for compressed messages
[ https://issues.apache.org/jira/browse/KAFKA-614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao closed KAFKA-614. - > DumpLogSegment offset verification is incorrect for compressed messages > --- > > Key: KAFKA-614 > URL: https://issues.apache.org/jira/browse/KAFKA-614 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Jun Rao > Labels: newbie > Fix For: 0.8 > > Attachments: kafka_614_v1.patch > > > During verification, DumpLogSegment tries to make sure that offsets are > consecutive. However, this won't be true for compressed messages since > FileMessageSet only does shallow iteration. The simplest fix is to skip the > verification for compressed messages. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-613) MigrationTool should disable shallow iteration in the 0.7 consumer
[ https://issues.apache.org/jira/browse/KAFKA-613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13499269#comment-13499269 ] Jun Rao commented on KAFKA-613: --- Thanks for patch v1. It would be good to also log a warning that "shallow iteration is not supported" if shallowiteration is set to true in the 0.7 consumer property file. > MigrationTool should disable shallow iteration in the 0.7 consumer > -- > > Key: KAFKA-613 > URL: https://issues.apache.org/jira/browse/KAFKA-613 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Jun Rao > Attachments: kafka_613_v1.patch > > > If shallow iteration is enabled, we should override it and log a warning. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-620) UnknownHostError looking for a ZK node crashes the broker
[ https://issues.apache.org/jira/browse/KAFKA-620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13499043#comment-13499043 ] Jun Rao commented on KAFKA-620: --- Do you have other ZK hosts in your connection string? > UnknownHostError looking for a ZK node crashes the broker > - > > Key: KAFKA-620 > URL: https://issues.apache.org/jira/browse/KAFKA-620 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.7.1 > Environment: linux. Amazon's AMI >Reporter: Matthew Rathbone > > If you totally kill a zookeeper node so that its hostname no longer resolves > to anything, the broker will die with a java.net.UnknownHostException. > You will then be unable to restart the broker until the unknown host(s) is > removed from the server.properties. > We ran into this issue while testing our resilience to widespread AWS > outages. If you can point me to the right place, I could have a go at fixing > it? Unfortunately, I suspect the issue might be in the non-standard Zookeeper > library that kafka uses. 
> Here's the stack trace: > org.I0Itec.zkclient.exception.ZkException: Unable to connect to [list of > zookeepers] > at org.I0Itec.zkclient.ZkConnection.connect(ZkConnection.java:66) > at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:872) > at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:98) > at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:84) > at kafka.server.KafkaZooKeeper.startup(KafkaZooKeeper.scala:44) > at kafka.log.LogManager.<init>(LogManager.scala:87) > at kafka.server.KafkaServer.startup(KafkaServer.scala:58) > at > kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34) > at kafka.Kafka$.main(Kafka.scala:50) > at kafka.Kafka.main(Kafka.scala) > Caused by: java.net.UnknownHostException: zk-101 > at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method) > at java.net.InetAddress$1.lookupAllHostAddr(InetAddress.java:850) > at java.net.InetAddress.getAddressFromNameService(InetAddress.java:1201) > at java.net.InetAddress.getAllByName0(InetAddress.java:1154) > at java.net.InetAddress.getAllByName(InetAddress.java:1084) > at java.net.InetAddress.getAllByName(InetAddress.java:1020) > at org.apache.zookeeper.ClientCnxn.<init>(ClientCnxn.java:387) > at org.apache.zookeeper.ClientCnxn.<init>(ClientCnxn.java:332) > at org.apache.zookeeper.ZooKeeper.<init>(ZooKeeper.java:383) > at org.I0Itec.zkclient.ZkConnection.connect(ZkConnection.java:64) > ... 9 more -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
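The crash above happens because the ZooKeeper client resolves every host in the connect string up front and lets UnknownHostException propagate. One defensive approach (a sketch of the idea only, not what zkclient or Kafka actually does) is to probe each host and keep only the ones that currently resolve; ".invalid" is a reserved TLD that is guaranteed never to resolve, standing in for the dead "zk-101" host from the report.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;

public class ResolvableHosts {
    // Return only the hosts whose names currently resolve, instead of
    // letting one dead DNS entry abort startup.
    static List<String> resolvable(List<String> hosts) {
        List<String> ok = new ArrayList<>();
        for (String h : hosts) {
            try {
                InetAddress.getByName(h); // throws for a host like the dead "zk-101"
                ok.add(h);
            } catch (UnknownHostException e) {
                System.out.println("skipping unresolvable host: " + h);
            }
        }
        return ok;
    }

    public static void main(String[] args) {
        List<String> ok = resolvable(List.of("localhost", "no-such-host.invalid"));
        System.out.println(ok);
    }
}
```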
[jira] [Updated] (KAFKA-605) System Test - Log Retention Cases should wait longer before getting the common starting offset in replica log segments
[ https://issues.apache.org/jira/browse/KAFKA-605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-605: -- Resolution: Fixed Fix Version/s: 0.8 Status: Resolved (was: Patch Available) Thanks for the patch. +1 and committed to 0.8. > System Test - Log Retention Cases should wait longer before getting the > common starting offset in replica log segments > -- > > Key: KAFKA-605 > URL: https://issues.apache.org/jira/browse/KAFKA-605 > Project: Kafka > Issue Type: Task >Reporter: John Fung > Fix For: 0.8 > > Attachments: kafka-605-v1.patch, kafka-605-v2.patch > > -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Closed] (KAFKA-605) System Test - Log Retention Cases should wait longer before getting the common starting offset in replica log segments
[ https://issues.apache.org/jira/browse/KAFKA-605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao closed KAFKA-605. - > System Test - Log Retention Cases should wait longer before getting the > common starting offset in replica log segments > -- > > Key: KAFKA-605 > URL: https://issues.apache.org/jira/browse/KAFKA-605 > Project: Kafka > Issue Type: Task >Reporter: John Fung > Fix For: 0.8 > > Attachments: kafka-605-v1.patch, kafka-605-v2.patch > > -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-618) Deadlock between leader-finder-thread and consumer-fetcher-thread during broker failure
[ https://issues.apache.org/jira/browse/KAFKA-618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13498555#comment-13498555 ] Jun Rao commented on KAFKA-618: --- This is a very good finding. The following is one way of breaking the deadlock. In ConsumerFetcherManager, don't expose getPartitionTopicInfo(). Instead, pass partitionMap (which is immutable) to each newly created ConsumerFetcherThread. This way, ConsumerFetcherThread.processPartitionData() and ConsumerFetcherThread.handleOffsetOutOfRange() won't depend on ConsumerFetcherManager any more. If we do that, we can improve ConsumerFetcherManager.stopAllConnections() a bit too. The clearing of noLeaderPartitionSet and partitionMap can be done together before calling closeAllFetchers(). Before, we have to clear partitionMap last because before all fetchers are stopped, the processing of the fetch request still needs to read partitionMap and expects it to be non-null. > Deadlock between leader-finder-thread and consumer-fetcher-thread during > broker failure > --- > > Key: KAFKA-618 > URL: https://issues.apache.org/jira/browse/KAFKA-618 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8 >Reporter: Joel Koshy >Priority: Blocker > Fix For: 0.8 > > > This causes the test failure reported in KAFKA-607. This affects high-level > consumers - if they hit the deadlock then they would get wedged (or at least > until the consumer timeout). 
> Here is the threaddump output that shows the issue: > Found one Java-level deadlock: > = > "ConsumerFetcherThread-console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-0-1": > waiting for ownable synchronizer 0x7f2283ad, (a > java.util.concurrent.locks.ReentrantLock$NonfairSync), > which is held by > "console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-leader-finder-thread" > "console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-leader-finder-thread": > waiting to lock monitor 0x7f2288297190 (object 0x7f2283ab01d0, a > java.lang.Object), > which is held by > "ConsumerFetcherThread-console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-0-1" > Java stack information for the threads listed above: > === > "ConsumerFetcherThread-console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-0-1": > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x7f2283ad> (a > java.util.concurrent.locks.ReentrantLock$NonfairSync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178) > at > java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186) > at > java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262) > at > kafka.consumer.ConsumerFetcherManager.getPartitionTopicInfo(ConsumerFetcherManager.scala:131) > at > kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:43) > at > kafka.server.AbstractFetcherThread$$anonfun$doWork$5.apply(AbstractFetcherThread.scala:116) > at > kafka.server.AbstractFetcherThread$$anonfun$doWork$5.apply(AbstractFetcherThread.scala:99) > at scala.collection.immutable.Map$Map1.foreach(Map.scala:105) > at > 
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:99) > - locked <0x7f2283ab01d0> (a java.lang.Object) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:50) > "console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-leader-finder-thread": > at > kafka.server.AbstractFetcherThread.addPartition(AbstractFetcherThread.scala:142) > - waiting to lock <0x7f2283ab01d0> (a java.lang.Object) > at > kafka.server.AbstractFetcherManager.addFetcher(AbstractFetcherManager.scala:49) > - locked <0x7f2283ab0338> (a java.lang.Object) > at > kafka.consumer.ConsumerFetcherManager$$anon$1$$anonfun$doWork$5.apply(ConsumerFetcherManager.scala:81) > at > kafka.consumer
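The fix proposed in the comment above (inject the partition map into each fetcher thread at construction time, so the thread never calls back into the manager and never needs the manager's lock) can be illustrated with a minimal, language-neutral sketch. This is not the actual Scala patch; the class and method names below are hypothetical stand-ins for ConsumerFetcherManager and ConsumerFetcherThread:

```python
import threading

# Sketch: the manager hands each fetcher thread its own reference to the
# partition state at creation time. The fetcher's hot path then reads that
# state directly instead of re-entering the manager (which would require
# taking the manager's lock while the fetcher already holds its own lock,
# the inverted ordering behind the reported deadlock).

class FetcherThread(threading.Thread):
    def __init__(self, partition_map):
        super().__init__()
        self.partition_map = partition_map  # injected; no manager call-back
        self.results = []

    def run(self):
        # processPartitionData analogue: consume partition state directly,
        # with no second lock acquisition on this path
        for partition, offset in sorted(self.partition_map.items()):
            self.results.append((partition, offset))

class FetcherManager:
    def __init__(self):
        self.partition_map = {}
        self.lock = threading.Lock()

    def add_partition(self, partition, offset):
        with self.lock:
            self.partition_map[partition] = offset

    def create_fetcher(self):
        with self.lock:
            # hand over an immutable snapshot; the thread never locks back
            return FetcherThread(dict(self.partition_map))

mgr = FetcherManager()
mgr.add_partition(("test1", 0), 42)
t = mgr.create_fetcher()
t.start()
t.join()
print(t.results)  # [(('test1', 0), 42)]
```

Because only the manager ever takes its own lock, there is a single lock order and the leader-finder-thread/fetcher-thread cycle from the thread dump cannot form.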
[jira] [Created] (KAFKA-614) DumpLogSegment offset verification is incorrect for compressed messages
Jun Rao created KAFKA-614: - Summary: DumpLogSegment offset verification is incorrect for compressed messages Key: KAFKA-614 URL: https://issues.apache.org/jira/browse/KAFKA-614 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8 Reporter: Jun Rao During verification, DumpLogSegment tries to make sure that offsets are consecutive. However, this won't be true for compressed messages since FileMessageSet only does shallow iteration. The simplest fix is to skip the verification for compressed messages. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
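The fix suggested in KAFKA-614 (skip the consecutive-offset check for compressed messages, since shallow iteration only sees the wrapper message's offset) can be sketched as follows. This is an illustrative model, not the DumpLogSegment code; the function name and the (offset, compressed) tuple shape are assumptions:

```python
# Sketch of the proposed verification: offsets must be consecutive for
# uncompressed messages, but a compressed wrapper carries the offset of its
# last inner message, so shallow iteration legitimately sees a jump there
# and the check is skipped.
def verify_offsets(messages):
    """messages: list of (offset, is_compressed) pairs from shallow iteration."""
    errors = []
    prev = None
    for offset, compressed in messages:
        if prev is not None and not compressed and offset != prev + 1:
            errors.append((prev, offset))  # genuine gap in uncompressed data
        prev = offset
    return errors

# A real gap between uncompressed messages is flagged; the jump into a
# compressed wrapper (offset 6) is not.
print(verify_offsets([(0, False), (2, False), (6, True), (7, False)]))
# [(0, 2)]
```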
[jira] [Created] (KAFKA-613) MigrationTool should disable shallow iteration in the 0.7 consumer
Jun Rao created KAFKA-613: - Summary: MigrationTool should disable shallow iteration in the 0.7 consumer Key: KAFKA-613 URL: https://issues.apache.org/jira/browse/KAFKA-613 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8 Reporter: Jun Rao If shallow iteration is enabled, we should override it and log a warning. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-544) Retain key in producer and expose it in the consumer
[ https://issues.apache.org/jira/browse/KAFKA-544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13497755#comment-13497755 ] Jun Rao commented on KAFKA-544: --- +1 on patch v5. For 30, could you add the same constructor comment for Partitioner too? > Retain key in producer and expose it in the consumer > > > Key: KAFKA-544 > URL: https://issues.apache.org/jira/browse/KAFKA-544 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8 >Reporter: Jay Kreps >Assignee: Jay Kreps >Priority: Blocker > Labels: bugs > Attachments: KAFKA-544-v1.patch, KAFKA-544-v2.patch, > KAFKA-544-v3.patch, KAFKA-544-v4.patch, KAFKA-544-v5.patch > > > KAFKA-506 added support for retaining a key in the messages, however this > field is not yet set by the producer. > The proposal for doing this is to change the producer api to change > ProducerData to allow only a single key/value pair so it has a one-to-one > mapping to Message. That is change from > ProducerData(topic: String, key: K, data: Seq[V]) > to > ProducerData(topic: String, key: K, data: V) > The key itself needs to be encoded. There are several ways this could be > handled. A few of the options: > 1. Change the Encoder and Decoder to be MessageEncoder and MessageDecoder and > have them take both a key and value. > 2. Another option is to change the type of the encoder/decoder to not refer > to Message so it could be used for both the key and value. > I favor the second option but am open to feedback. > One concern with our current approach to serialization as well as both of > these proposals is that they are inefficient. We go from > Object=>byte[]=>Message=>MessageSet with a copy at each step. In the case of > compression there are a bunch of intermediate steps. 
We could theoretically > clean this up by instead having an interface for the encoder that was > something like >Encoder.writeTo(buffer: ByteBuffer, object: AnyRef) > and >Decoder.readFrom(buffer:ByteBuffer): AnyRef > However there are two problems with this. The first is that we don't actually > know the size of the data until it is serialized so we can't really allocate > the bytebuffer properly and might need to resize it. The second is that in > the case of compression there is a whole other path to consider. Originally I > thought maybe it would be good to try to fix this, but now I think it should > be out-of-scope and we should revisit the efficiency issue in a future > release in conjunction with our internal handling of compression. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
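Option 2 above (an encoder/decoder typed on raw bytes rather than on Message, so one interface serves both the key and the value) can be sketched outside Scala like this. The class names are hypothetical illustrations of the proposal, not the KAFKA-544 patch:

```python
# Sketch of option 2: encoder/decoder do not refer to Message at all, so
# the same pair can serialize both keys and values. ProducerData carries a
# single key/value pair, mapping one-to-one to a Message.
class StringEncoder:
    def to_bytes(self, obj):
        return obj.encode("utf-8")

class StringDecoder:
    def from_bytes(self, data):
        return data.decode("utf-8")

class ProducerData:
    def __init__(self, topic, key, value):
        self.topic, self.key, self.value = topic, key, value

enc, dec = StringEncoder(), StringDecoder()
record = ProducerData("test1", "user-7", "hello")
wire_key = enc.to_bytes(record.key)      # same encoder works for the key...
wire_value = enc.to_bytes(record.value)  # ...and for the value
assert dec.from_bytes(wire_key) == "user-7"
assert dec.from_bytes(wire_value) == "hello"
```

The copy-per-stage inefficiency the comment raises (Object => byte[] => Message => MessageSet) is unaffected by this sketch, matching the decision to leave it out of scope.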
[jira] [Commented] (KAFKA-544) Retain key in producer and expose it in the consumer
[ https://issues.apache.org/jira/browse/KAFKA-544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13497680#comment-13497680 ] Jun Rao commented on KAFKA-544: --- Thanks for patch v3. Looks good overall. Some minor comments: 30. Encoder: It seems that we require the constructor of Encoder and Partitioner to take a VerifiableProperty. It would be good if we can add a comment on that in the trait. 31. ConsumerConnector: Can we have a version of createMessageStreamsByFilter without the decoders? 32. ConsumerFetcherManager: no change is needed. 33. BlockingChannel: logger.debug() should be just debug(). 34. ChecksumMessageFormatter: We probably can't remove it since it may be used in our tests. > Retain key in producer and expose it in the consumer > > > Key: KAFKA-544 > URL: https://issues.apache.org/jira/browse/KAFKA-544 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8 >Reporter: Jay Kreps >Assignee: Jay Kreps >Priority: Blocker > Labels: bugs > Attachments: KAFKA-544-v1.patch, KAFKA-544-v2.patch, > KAFKA-544-v3.patch, KAFKA-544-v4.patch > > > KAFKA-506 added support for retaining a key in the messages, however this > field is not yet set by the producer. > The proposal for doing this is to change the producer api to change > ProducerData to allow only a single key/value pair so it has a one-to-one > mapping to Message. That is change from > ProducerData(topic: String, key: K, data: Seq[V]) > to > ProducerData(topic: String, key: K, data: V) > The key itself needs to be encoded. There are several ways this could be > handled. A few of the options: > 1. Change the Encoder and Decoder to be MessageEncoder and MessageDecoder and > have them take both a key and value. > 2. Another option is to change the type of the encoder/decoder to not refer > to Message so it could be used for both the key and value. > I favor the second option but am open to feedback. 
> One concern with our current approach to serialization as well as both of > these proposals is that they are inefficient. We go from > Object=>byte[]=>Message=>MessageSet with a copy at each step. In the case of > compression there are a bunch of intermediate steps. We could theoretically > clean this up by instead having an interface for the encoder that was > something like >Encoder.writeTo(buffer: ByteBuffer, object: AnyRef) > and >Decoder.readFrom(buffer:ByteBuffer): AnyRef > However there are two problems with this. The first is that we don't actually > know the size of the data until it is serialized so we can't really allocate > the bytebuffer properly and might need to resize it. The second is that in > the case of compression there is a whole other path to consider. Originally I > thought maybe it would be good to try to fix this, but now I think it should > be out-of-scope and we should revisit the efficiency issue in a future > release in conjunction with our internal handling of compression. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Updated] (KAFKA-612) move shutting down of fetcher thread out of critical path
[ https://issues.apache.org/jira/browse/KAFKA-612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-612: -- Status: Patch Available (was: Open) > move shutting down of fetcher thread out of critical path > - > > Key: KAFKA-612 > URL: https://issues.apache.org/jira/browse/KAFKA-612 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Jun Rao >Assignee: Jun Rao > Attachments: kafka-612.patch > > > Shutting down a fetch thread seems to take more than 200ms since we need to > interrupt the thread. Currently, we shutdown fetcher threads while processing > a leaderAndIsr request. This can delay some of the partitions to become a > leader. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Updated] (KAFKA-612) move shutting down of fetcher thread out of critical path
[ https://issues.apache.org/jira/browse/KAFKA-612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-612: -- Attachment: kafka-612.patch Attach a patch. It introduces a separate method for shutting down empty fetcher threads and that method will be called at the end of processing a leaderAndIsr request. Because of this change, a fetcher thread could have an empty fetch map. To avoid sending empty fetch requests to the broker, the patch uses a condition to coordinate the fetch. Also fixed a bug when removing items from a scala map (can't do the removal while iterating the map since the behaviour is not deterministic). > move shutting down of fetcher thread out of critical path > - > > Key: KAFKA-612 > URL: https://issues.apache.org/jira/browse/KAFKA-612 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Jun Rao >Assignee: Jun Rao > Attachments: kafka-612.patch > > > Shutting down a fetch thread seems to take more than 200ms since we need to > interrupt the thread. Currently, we shutdown fetcher threads while processing > a leaderAndIsr request. This can delay some of the partitions to become a > leader. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
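The two mechanisms the patch comment describes (a condition that parks a fetcher whose fetch map is empty so it never sends empty fetch requests, and removing map entries from a snapshot of the keys rather than while iterating the live map) can be sketched generically. Names below are illustrative, not the actual AbstractFetcherThread code:

```python
import threading

# Sketch of the two fixes from the KAFKA-612 patch description:
# (1) an empty fetch map makes the fetcher wait on a condition instead of
#     issuing empty fetch requests to the broker;
# (2) removals go through a snapshot of the keys, because mutating a map
#     while iterating it is not deterministic.
class FetchMap:
    def __init__(self):
        self.partitions = {}
        self.cond = threading.Condition()

    def add(self, partition, offset):
        with self.cond:
            self.partitions[partition] = offset
            self.cond.notify_all()        # wake any fetcher waiting for work

    def remove_matching(self, predicate):
        with self.cond:
            for p in list(self.partitions):   # snapshot keys before deleting
                if predicate(p):
                    del self.partitions[p]

    def wait_for_work(self, timeout=None):
        with self.cond:
            while not self.partitions:        # no work => no fetch request
                if not self.cond.wait(timeout):
                    return {}                 # timed out with nothing to fetch
            return dict(self.partitions)

fm = FetchMap()
fm.add(("test1", 0), 0)
fm.add(("test1", 1), 0)
fm.remove_matching(lambda p: p[1] == 1)       # drop partition 1
print(fm.wait_for_work(timeout=0.1))          # {('test1', 0): 0}
```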
[jira] [Created] (KAFKA-612) move shutting down of fetcher thread out of critical path
Jun Rao created KAFKA-612: - Summary: move shutting down of fetcher thread out of critical path Key: KAFKA-612 URL: https://issues.apache.org/jira/browse/KAFKA-612 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8 Reporter: Jun Rao Shutting down a fetch thread seems to take more than 200ms since we need to interrupt the thread. Currently, we shutdown fetcher threads while processing a leaderAndIsr request. This can delay some of the partitions to become a leader. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Assigned] (KAFKA-612) move shutting down of fetcher thread out of critical path
[ https://issues.apache.org/jira/browse/KAFKA-612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao reassigned KAFKA-612: - Assignee: Jun Rao > move shutting down of fetcher thread out of critical path > - > > Key: KAFKA-612 > URL: https://issues.apache.org/jira/browse/KAFKA-612 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Jun Rao >Assignee: Jun Rao > > Shutting down a fetch thread seems to take more than 200ms since we need to > interrupt the thread. Currently, we shutdown fetcher threads while processing > a leaderAndIsr request. This can delay some of the partitions to become a > leader. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-585) Remove custom metrics jar and replace with latest from metrics HEAD
[ https://issues.apache.org/jira/browse/KAFKA-585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13495462#comment-13495462 ] Jun Rao commented on KAFKA-585: --- Thanks for patch v1. +1. Just make sure that the basic system tests still work before checking in. > Remove custom metrics jar and replace with latest from metrics HEAD > --- > > Key: KAFKA-585 > URL: https://issues.apache.org/jira/browse/KAFKA-585 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8 >Reporter: Joel Koshy > Attachments: KAFKA-585-v1.patch, > metrics-annotation-3.0.0-c0c8be71.jar, metrics-core-3.0.0-c0c8be71.jar > > > This is for at least until metrics 3.x is mavenized. > Also: > The KafkaCSVMetricsReporter object may be better named as > KafkaMetricsReporter since startCSVMetricsReporter > potentially starts up other (non-CSV) reporters (if any) as well - in which > case KafkaMetricsReporter.scala would be a > better place for it. Or, you can just filter out non-CSV reporters. > Also, the top-level/config/server.properties need not enable the csv > reporter. I thought the system test replication > suite's server.properties would need to be patched, but it isn't. Should look > into whether the test suite picks up the top-level > config as a template. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-606) System Test Transient Failure (case 0302 GC Pause) - Log segments mismatched across replicas
[ https://issues.apache.org/jira/browse/KAFKA-606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13495387#comment-13495387 ] Jun Rao commented on KAFKA-606: --- There seems to be 2 issues. (1) Since we only do shallow iteration in DumpLogSegments, the offsets may not be consecutive when messages are compressed. (2) There is a bug that we cached the offsets as Int. It should be long. > System Test Transient Failure (case 0302 GC Pause) - Log segments mismatched > across replicas > > > Key: KAFKA-606 > URL: https://issues.apache.org/jira/browse/KAFKA-606 > Project: Kafka > Issue Type: Bug >Reporter: John Fung > Attachments: testcase_0302_data_and_log4j.tar.gz > > -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
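Issue (2) above, caching offsets as Int instead of Long, can be illustrated with a small model of 32-bit truncation (a hypothetical helper, not Kafka code):

```python
# Illustration of why a log offset cached as a 32-bit Int is a bug: once
# the offset passes 2^31 - 1 the cached value wraps negative, while a
# 64-bit long holds any realistic offset.
def as_int32(n):
    n &= 0xFFFFFFFF
    return n - (1 << 32) if n >= (1 << 31) else n

offset = 3_000_000_000           # plausible offset in a long-lived partition
print(as_int32(offset))          # -1294967296: corrupted when cached as Int
print(offset)                    # 3000000000: intact as a long
```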
[jira] [Closed] (KAFKA-591) Add test cases to test log size retention and more
[ https://issues.apache.org/jira/browse/KAFKA-591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao closed KAFKA-591. - > Add test cases to test log size retention and more > -- > > Key: KAFKA-591 > URL: https://issues.apache.org/jira/browse/KAFKA-591 > Project: Kafka > Issue Type: Bug >Reporter: John Fung >Assignee: John Fung > Fix For: 0.8 > > Attachments: kafka-591-v1.patch, kafka-591-v2.patch > > > Add test cases to test the followings: > 1. Log Size Retention > 2. Replica Factor < no. of brokers in a cluster > 3. Multiple instances of Migration Tool > 4. Multiple instances of Mirror Maker > 5. Set "log.index.interval.bytes" to be slightly smaller than message size to > force the indexing to be performed for each message -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Updated] (KAFKA-591) Add test cases to test log size retention and more
[ https://issues.apache.org/jira/browse/KAFKA-591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-591: -- Resolution: Fixed Fix Version/s: 0.8 Status: Resolved (was: Patch Available) Thanks for patch v2. +1 Committed to 0.8 > Add test cases to test log size retention and more > -- > > Key: KAFKA-591 > URL: https://issues.apache.org/jira/browse/KAFKA-591 > Project: Kafka > Issue Type: Bug >Reporter: John Fung >Assignee: John Fung > Fix For: 0.8 > > Attachments: kafka-591-v1.patch, kafka-591-v2.patch > > > Add test cases to test the followings: > 1. Log Size Retention > 2. Replica Factor < no. of brokers in a cluster > 3. Multiple instances of Migration Tool > 4. Multiple instances of Mirror Maker > 5. Set "log.index.interval.bytes" to be slightly smaller than message size to > force the indexing to be performed for each message -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Created] (KAFKA-604) Add missing metrics in 0.8
Jun Rao created KAFKA-604: - Summary: Add missing metrics in 0.8 Key: KAFKA-604 URL: https://issues.apache.org/jira/browse/KAFKA-604 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8 Reporter: Jun Rao It would be good if we add the following metrics: Producer: droppedMessageRate per topic ReplicaManager: partition count on the broker FileMessageSet: logFlushTimer per log (i.e., partition). Also, logFlushTime should probably be moved to LogSegment since the flush now includes index flush time. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Resolved] (KAFKA-546) Fix commit() in zk consumer for compressed messages
[ https://issues.apache.org/jira/browse/KAFKA-546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-546. --- Resolution: Fixed Fix Version/s: 0.8 Thanks for patch v5. +1. Committed to 0.8. > Fix commit() in zk consumer for compressed messages > --- > > Key: KAFKA-546 > URL: https://issues.apache.org/jira/browse/KAFKA-546 > Project: Kafka > Issue Type: New Feature >Affects Versions: 0.8 >Reporter: Jay Kreps >Assignee: Swapnil Ghike > Fix For: 0.8 > > Attachments: kafka-546-v1.patch, kafka-546-v2.patch, > kafka-546-v3.patch, kafka-546-v4.patch, kafka-546-v5.patch > > > In 0.7.x and earlier versions offsets were assigned by the byte location in > the file. Because it wasn't possible to directly decompress from the middle > of a compressed block, messages inside a compressed message set effectively > had no offset. As a result the offset given to the consumer was always the > offset of the wrapper message set. > In 0.8 after the logical offsets patch messages in a compressed set do have > offsets. However the server still needs to fetch from the beginning of the > compressed messageset (otherwise it can't be decompressed). As a result a > commit() which occurs in the middle of a message set will still result in > some duplicates. > This can be fixed in the ConsumerIterator by discarding messages smaller than > the fetch offset rather than giving them to the consumer. This will make > commit work correctly in the presence of compressed messages (finally). -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Closed] (KAFKA-546) Fix commit() in zk consumer for compressed messages
[ https://issues.apache.org/jira/browse/KAFKA-546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao closed KAFKA-546. - > Fix commit() in zk consumer for compressed messages > --- > > Key: KAFKA-546 > URL: https://issues.apache.org/jira/browse/KAFKA-546 > Project: Kafka > Issue Type: New Feature >Affects Versions: 0.8 >Reporter: Jay Kreps >Assignee: Swapnil Ghike > Fix For: 0.8 > > Attachments: kafka-546-v1.patch, kafka-546-v2.patch, > kafka-546-v3.patch, kafka-546-v4.patch, kafka-546-v5.patch > > > In 0.7.x and earlier versions offsets were assigned by the byte location in > the file. Because it wasn't possible to directly decompress from the middle > of a compressed block, messages inside a compressed message set effectively > had no offset. As a result the offset given to the consumer was always the > offset of the wrapper message set. > In 0.8 after the logical offsets patch messages in a compressed set do have > offsets. However the server still needs to fetch from the beginning of the > compressed messageset (otherwise it can't be decompressed). As a result a > commit() which occurs in the middle of a message set will still result in > some duplicates. > This can be fixed in the ConsumerIterator by discarding messages smaller than > the fetch offset rather than giving them to the consumer. This will make > commit work correctly in the presence of compressed messages (finally). -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-574) KafkaController unnecessarily reads leaderAndIsr info from ZK
[ https://issues.apache.org/jira/browse/KAFKA-574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13492975#comment-13492975 ] Jun Rao commented on KAFKA-574: --- Thanks for patch v4. Patch looks good and system tests pass for me. Can't see the failures that you and Neha are seeing. So, +1 from me. > KafkaController unnecessarily reads leaderAndIsr info from ZK > - > > Key: KAFKA-574 > URL: https://issues.apache.org/jira/browse/KAFKA-574 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Jun Rao >Assignee: Prashanth Menon >Priority: Blocker > Labels: bugs > Attachments: KAFKA-574-v1.patch, KAFKA-574-v2.patch, > KAFKA-574-v3.patch, KAFKA-574-v4.patch > > Original Estimate: 48h > Remaining Estimate: 48h > > KafkaController calls updateLeaderAndIsrCache() in onBrokerFailure(). This is > unnecessary since in onBrokerFailure(), we will make leader and isr change > anyway so there is no need to first read that information from ZK. Latency is > critical in onBrokerFailure() since it determines how quickly a leader can be > made online. > Similarly, updateLeaderAndIsrCache() is called in onBrokerStartup() > unnecessarily. In this case, the controller does not change the leader or the > isr. It just needs to send the current leader and the isr info to the newly > started broker. We already cache leader in the controller. Isr in theory > could change any time by the leader. So, reading from ZK doesn't guarantee > that we can get the latest isr anyway. Instead, we just need to get the isr > last selected by the controller (which can be cached together with the leader > in the controller). If the leader epoc in a broker is at or larger than the > epoc in the leaderAndIsr request, the broker can just ignore it. Otherwise, > the leader and the isr selected by the controller should be used. -- This message is automatically generated by JIRA. 
If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-546) Fix commit() in zk consumer for compressed messages
[ https://issues.apache.org/jira/browse/KAFKA-546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13492552#comment-13492552 ] Jun Rao commented on KAFKA-546: --- Thanks for patch v3. A couple of more comments: 30. ConsumerIterator.next(): In the following code, to be safe, we need to check if localCurrent hasNext in the while loop. // reject the messages that have already been consumed while (item.offset < currentTopicInfo.getConsumeOffset) { item = localCurrent.next() 31. ConsumerIteratorTest: Not sure if the test really does what it intends to. To simulate reading from the middle of a compressed messageset, we need to put in a consume offset larger than 0 in PartitionTopicInfo, right? > Fix commit() in zk consumer for compressed messages > --- > > Key: KAFKA-546 > URL: https://issues.apache.org/jira/browse/KAFKA-546 > Project: Kafka > Issue Type: New Feature >Affects Versions: 0.8 >Reporter: Jay Kreps >Assignee: Swapnil Ghike > Attachments: kafka-546-v1.patch, kafka-546-v2.patch, > kafka-546-v3.patch > > > In 0.7.x and earlier versions offsets were assigned by the byte location in > the file. Because it wasn't possible to directly decompress from the middle > of a compressed block, messages inside a compressed message set effectively > had no offset. As a result the offset given to the consumer was always the > offset of the wrapper message set. > In 0.8 after the logical offsets patch messages in a compressed set do have > offsets. However the server still needs to fetch from the beginning of the > compressed messageset (otherwise it can't be decompressed). As a result a > commit() which occurs in the middle of a message set will still result in > some duplicates. > This can be fixed in the ConsumerIterator by discarding messages smaller than > the fetch offset rather than giving them to the consumer. This will make > commit work correctly in the presence of compressed messages (finally). 
-- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
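The ConsumerIterator fix discussed above (discard decompressed messages whose offsets precede the committed consume offset, instead of redelivering them) can be sketched as a simple filter. The function below is an illustrative model, not the Scala ConsumerIterator:

```python
# Sketch of the KAFKA-546 fix: a fetch must start at the beginning of a
# compressed message set, so after decompression any inner message whose
# offset is below the committed consume offset has already been consumed
# and is rejected rather than handed to the consumer.
def iter_from(messages, consume_offset):
    """messages: decompressed (offset, payload) pairs in offset order."""
    for offset, payload in messages:
        if offset < consume_offset:
            continue  # reject messages that have already been consumed
        yield offset, payload

batch = [(10, "a"), (11, "b"), (12, "c"), (13, "d")]
print(list(iter_from(batch, 12)))  # [(12, 'c'), (13, 'd')]
```

A commit at offset 12 followed by a restart therefore replays nothing, which is exactly the duplicate-on-commit behavior the ticket set out to fix.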
[jira] [Commented] (KAFKA-574) KafkaController unnecessarily reads leaderAndIsr info from ZK
[ https://issues.apache.org/jira/browse/KAFKA-574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13491642#comment-13491642 ] Jun Rao commented on KAFKA-574: --- Thanks for patch v3. Just one more comment: 30. KafkaController.removeReplicaFromIsr(): Shouldn't we only update newLeaderAndIsr in the cache if updateSucceeded is true? 20. Ignore my comment on leaderAndIsrIsEmpty. It is fine. I ran system tests with your patch and they seem to pass. Did you build the Kafka jar before running the test? > KafkaController unnecessarily reads leaderAndIsr info from ZK > - > > Key: KAFKA-574 > URL: https://issues.apache.org/jira/browse/KAFKA-574 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Jun Rao >Assignee: Prashanth Menon >Priority: Blocker > Labels: bugs > Attachments: KAFKA-574-v1.patch, KAFKA-574-v2.patch, > KAFKA-574-v3.patch > > Original Estimate: 48h > Remaining Estimate: 48h > > KafkaController calls updateLeaderAndIsrCache() in onBrokerFailure(). This is > unnecessary since in onBrokerFailure(), we will make leader and isr change > anyway so there is no need to first read that information from ZK. Latency is > critical in onBrokerFailure() since it determines how quickly a leader can be > made online. > Similarly, updateLeaderAndIsrCache() is called in onBrokerStartup() > unnecessarily. In this case, the controller does not change the leader or the > isr. It just needs to send the current leader and the isr info to the newly > started broker. We already cache leader in the controller. Isr in theory > could change any time by the leader. So, reading from ZK doesn't guarantee > that we can get the latest isr anyway. Instead, we just need to get the isr > last selected by the controller (which can be cached together with the leader > in the controller). If the leader epoc in a broker is at or larger than the > epoc in the leaderAndIsr request, the broker can just ignore it. 
Otherwise, > the leader and the isr selected by the controller should be used. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-532) Multiple controllers can co-exist during soft failures
[ https://issues.apache.org/jira/browse/KAFKA-532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13491566#comment-13491566 ] Jun Rao commented on KAFKA-532: --- Thanks for patch v4. A few more comments: 40. PartitionStateInfo: It seems that we need to send the controllerEpoch associated with this partition. Note that this epoch is different from the controllerEpoch in LeaderAndIsrRequest. The former is the epoch of the controller that last changed the leader or isr and will be used when the broker updates the isr. The latter is the epoch of the controller that sends the request and will be used in ReplicaManager to decide which controller's decision to follow. We will need to change the controllerEpoch passed to makeLeader and makeFollower in ReplicaManager accordingly. 41. ReplicaManager: In stopReplicas() and becomeLeaderOrFollower(), it would be better to only update controllerEpoch when it's truly necessary, i.e., the new controllerEpoch is larger than the cached one (not equal). This is because updating a volatile variable is a bit more expensive than updating a local variable since the update has to be exposed to other threads. 42. KafkaController: The approach in the new patch works. There are a few corner cases that we need to cover. 42.1. incrementControllerEpoch(): If the controllerEpoch path doesn't exist, we create the path using the initial epoch version without using conditional update. It is possible for 2 controllers to execute this logic simultaneously and both get the initial epoch version. One solution is to make sure the controller epoch path exists during context initialization. Then we can always use conditional update here. 42.2. ControllerContext: We need to initialize controllerEpoch by reading from ZK. We also need to make sure that we subscribe to the controllerEpoch path first and then read its value from ZK for initialization. 42.3. 
ControllerEpochListener: It's safer to set both the epoch and the ZK version using the value from ZkUtils.readData. 43. ControllerMovedException is missing from the patch > Multiple controllers can co-exist during soft failures > -- > > Key: KAFKA-532 > URL: https://issues.apache.org/jira/browse/KAFKA-532 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8 >Reporter: Neha Narkhede >Assignee: Neha Narkhede >Priority: Blocker > Labels: bugs > Attachments: kafka-532-v1.patch, kafka-532-v2.patch, > kafka-532-v3.patch, kafka-532-v4.patch > > Original Estimate: 48h > Remaining Estimate: 48h > > If the current controller experiences an intermittent soft failure (GC pause) > in the middle of leader election or partition reassignment, a new controller > might get elected and start communicating new state change decisions to the > brokers. After recovering from the soft failure, the old controller might > continue sending some stale state change decisions to the brokers, resulting > in unexpected failures. We need to introduce a controller generation id that > increments with controller election. The brokers should reject any state > change requests by a controller with an older generation id. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
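The strictly-greater update rule in comment 41 can be sketched as follows. This is a minimal illustration in Java (Kafka itself is Scala), and the class and method names are invented for the example, not Kafka's actual API:

```java
// Sketch of comment 41: cache the controller epoch in a volatile field and
// write it only when the incoming epoch is strictly larger, so the relatively
// expensive volatile write is skipped for repeat requests from the same controller.
class ControllerEpochCache {
    private volatile int controllerEpoch = 0;

    // Returns true if the request should be honored (epoch is current or newer).
    boolean maybeUpdate(int requestEpoch) {
        if (requestEpoch < controllerEpoch)
            return false;                   // stale controller: reject the request
        if (requestEpoch > controllerEpoch)
            controllerEpoch = requestEpoch; // volatile write only when strictly larger
        return true;
    }

    int current() { return controllerEpoch; }
}
```

A request carrying an epoch equal to the cached one is accepted without touching the volatile field, which is the point of the "larger, not equal" condition.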
[jira] [Resolved] (KAFKA-577) extend DumpLogSegments to verify consistency btw data and index
[ https://issues.apache.org/jira/browse/KAFKA-577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-577. --- Resolution: Fixed Thanks for patch v6. +1. Committed to 0.8 with a minor change: using ::= on a list instead of .:+= > extend DumpLogSegments to verify consistency btw data and index > --- > > Key: KAFKA-577 > URL: https://issues.apache.org/jira/browse/KAFKA-577 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Jun Rao > Labels: newbie, tools > Fix For: 0.8 > > Attachments: kafka_577_v1.diff, kafka_577_v2.diff, kafka_577_v3.diff, > kafka_577_v4.diff, kafka_577_v5.diff, kafka_577_v6.diff > > Original Estimate: 24h > Remaining Estimate: 24h > > It would be good to extend DumpLogSegments to do the following verification: > 1. The offsets stored in the index match those in the log data. > 2. The offsets in the data log is consecutive. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Closed] (KAFKA-577) extend DumpLogSegments to verify consistency btw data and index
[ https://issues.apache.org/jira/browse/KAFKA-577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao closed KAFKA-577. - > extend DumpLogSegments to verify consistency btw data and index > --- > > Key: KAFKA-577 > URL: https://issues.apache.org/jira/browse/KAFKA-577 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Jun Rao > Labels: newbie, tools > Fix For: 0.8 > > Attachments: kafka_577_v1.diff, kafka_577_v2.diff, kafka_577_v3.diff, > kafka_577_v4.diff, kafka_577_v5.diff, kafka_577_v6.diff > > Original Estimate: 24h > Remaining Estimate: 24h > > It would be good to extend DumpLogSegments to do the following verification: > 1. The offsets stored in the index match those in the log data. > 2. The offsets in the data log is consecutive. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Resolved] (KAFKA-596) LogSegment.firstAppendTime not reset after truncate to
[ https://issues.apache.org/jira/browse/KAFKA-596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-596. --- Resolution: Fixed Fix Version/s: 0.8 Thanks for patch v3. +1. Committed to 0.8. > LogSegment.firstAppendTime not reset after truncate to > -- > > Key: KAFKA-596 > URL: https://issues.apache.org/jira/browse/KAFKA-596 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Jun Rao >Assignee: Swapnil Ghike > Labels: bugs > Fix For: 0.8 > > Attachments: kafka-596.patch, kafka-596-v2.patch, kafka-596-v3.patch > > Original Estimate: 24h > Remaining Estimate: 24h > > Currently, we don't reset LogSegment.firstAppendTime after the segment is > truncated. What can happen is that we truncate the segment to size 0 and on > next append, a new log segment with the same starting offset is rolled > because the time-based rolling is triggered. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Closed] (KAFKA-596) LogSegment.firstAppendTime not reset after truncate to
[ https://issues.apache.org/jira/browse/KAFKA-596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao closed KAFKA-596. - > LogSegment.firstAppendTime not reset after truncate to > -- > > Key: KAFKA-596 > URL: https://issues.apache.org/jira/browse/KAFKA-596 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Jun Rao >Assignee: Swapnil Ghike > Labels: bugs > Fix For: 0.8 > > Attachments: kafka-596.patch, kafka-596-v2.patch, kafka-596-v3.patch > > Original Estimate: 24h > Remaining Estimate: 24h > > Currently, we don't reset LogSegment.firstAppendTime after the segment is > truncated. What can happen is that we truncate the segment to size 0 and on > next append, a new log segment with the same starting offset is rolled > because the time-based rolling is triggered. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-546) Fix commit() in zk consumer for compressed messages
[ https://issues.apache.org/jira/browse/KAFKA-546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13490977#comment-13490977 ] Jun Rao commented on KAFKA-546: --- Thanks for patch v2. Some comments: 20. ConsumerIterator.next(): The following code depends on no gaps in offsets. This is true at this moment, but may not be true in the future when we have a different retention policy. A safer way is to keep iterating the messageSet until we get an offset that reaches or passes ctiConsumeOffset. for (i <- 0L until (ctiConsumeOffset - cdcFetchOffset)) { localCurrent.next() } 21. PartitionTopicInfo: In startOffset(), unfortunately, we can't use the shallow iterator. This is because when messages are compressed, the offset of the top level message has the offset of the last message (instead of the first one) in the compressed unit. Also, iterating messages here may not be ideal since it forces us to decompress. An alternative way is to do the logic in ConsumerIterator.next(). Every time we get a new chunk of messages, we keep iterating it until the message offset reaches or passes the consumed offset. This way, if we are doing shallow iteration, we don't have to decompress messages. 22. ConsumerIteratorTest: 22.1 zkConsumerConnector is not used. 22.2 We probably should set consumerOffset to a value >0 in PartitionTopicInfo. 22.3 Also, could we add a test that covers a compressed message set? > Fix commit() in zk consumer for compressed messages > --- > > Key: KAFKA-546 > URL: https://issues.apache.org/jira/browse/KAFKA-546 > Project: Kafka > Issue Type: New Feature >Affects Versions: 0.8 >Reporter: Jay Kreps >Assignee: Swapnil Ghike > Attachments: kafka-546-v1.patch, kafka-546-v2.patch > > > In 0.7.x and earlier versions offsets were assigned by the byte location in > the file. Because it wasn't possible to directly decompress from the middle > of a compressed block, messages inside a compressed message set effectively > had no offset. 
As a result the offset given to the consumer was always the > offset of the wrapper message set. > In 0.8 after the logical offsets patch messages in a compressed set do have > offsets. However the server still needs to fetch from the beginning of the > compressed messageset (otherwise it can't be decompressed). As a result a > commit() which occurs in the middle of a message set will still result in > some duplicates. > This can be fixed in the ConsumerIterator by discarding messages smaller than > the fetch offset rather than giving them to the consumer. This will make > commit work correctly in the presence of compressed messages (finally). -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
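The fix described in the issue (discard messages below the fetch offset instead of handing them to the consumer) amounts to the following. This is a Java sketch over plain offset lists; the real code would walk a ByteBufferMessageSet, and the class and method names here are invented:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the KAFKA-546 fix: a fetched chunk of a compressed message set
// always starts at the beginning of the wrapper set, so after a commit in the
// middle of the set, the iterator must silently skip every message whose
// offset is below the consumed offset rather than redelivering it.
class ChunkSkipper {
    // Returns only the offsets that should actually reach the consumer.
    static List<Long> deliverable(List<Long> chunkOffsets, long consumedOffset) {
        List<Long> out = new ArrayList<>();
        for (long offset : chunkOffsets) {
            if (offset < consumedOffset)
                continue;   // already consumed before the last commit: discard
            out.add(offset);
        }
        return out;
    }
}
```

Note that this matches comment 20's advice: it compares offsets directly rather than counting a fixed number of next() calls, so it stays correct even if offsets ever have gaps.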
[jira] [Commented] (KAFKA-593) Empty log index file created when it shouldn't be empty
[ https://issues.apache.org/jira/browse/KAFKA-593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13490770#comment-13490770 ] Jun Rao commented on KAFKA-593: --- Thanks for patch v3. Looks good. Some minor comments. 30. Log.rollToOffset(): segmentsView.last.index.file.getName.split("\\.")(0).toLong can just be segmentsView.last.start. 31. Log.loadSegments(): Just to be consistent, should we use index.maxIndexSize instead of maxIndexSize in the following statement? logSegments.get(logSegments.size() - 1).index.resetSizeTo(maxIndexSize) > Empty log index file created when it shouldn't be empty > --- > > Key: KAFKA-593 > URL: https://issues.apache.org/jira/browse/KAFKA-593 > Project: Kafka > Issue Type: Bug >Reporter: Yang Ye > Attachments: kafka_583_zk_kafka_data.tar.gz, kafka_593_v1.diff, > kafka_593_v2.diff, kafka_593_v3.diff > > > We have met empty index file during system test when it shouldn't be empty. > In this case, there're around 100 messages in each segment, each of size > around 100 bytes, given the "logIndexIntervalBytes" 4096, there should be at > least 2 log index entries, but we see empty index file. 
The kafka and > zookeeper logs are attached > [yye@yye-ld kafka_server_3_logs]$ cd test_1-2/ > [yye@yye-ld test_1-2]$ ls -l > total 84 > -rw-r--r-- 1 yye eng 8 Oct 29 15:22 .index > -rw-r--r-- 1 yye eng 10248 Oct 29 15:22 .log > -rw-r--r-- 1 yye eng 8 Oct 29 15:22 0100.index > -rw-r--r-- 1 yye eng 10296 Oct 29 15:22 0100.log > -rw-r--r-- 1 yye eng 0 Oct 29 15:23 0200.index > -rw-r--r-- 1 yye eng 10293 Oct 29 15:23 0200.log > -rw-r--r-- 1 yye eng 0 Oct 29 15:23 0300.index > -rw-r--r-- 1 yye eng 10274 Oct 29 15:23 0300.log > -rw-r--r-- 1 yye eng 0 Oct 29 15:23 0399.index > -rw-r--r-- 1 yye eng 10276 Oct 29 15:23 0399.log > -rw-r--r-- 1 yye eng 0 Oct 29 15:23 0498.index > -rw-r--r-- 1 yye eng 10256 Oct 29 15:23 0498.log > -rw-r--r-- 1 yye eng 10485760 Oct 29 15:23 0596.index > -rw-r--r-- 1 yye eng 3564 Oct 29 15:23 0596.log -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-546) Fix commit() in zk consumer for compressed messages
[ https://issues.apache.org/jira/browse/KAFKA-546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13489481#comment-13489481 ] Jun Rao commented on KAFKA-546: --- Can't seem to apply the patch cleanly to 0.8. Could you rebase? $ patch -p0 < ~/Downloads/kafka-546-v1.patch patching file core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala patching file core/src/main/scala/kafka/message/ByteBufferMessageSet.scala Reversed (or previously applied) patch detected! Assume -R? [n] ^C > Fix commit() in zk consumer for compressed messages > --- > > Key: KAFKA-546 > URL: https://issues.apache.org/jira/browse/KAFKA-546 > Project: Kafka > Issue Type: New Feature >Affects Versions: 0.8 >Reporter: Jay Kreps >Assignee: Swapnil Ghike > Attachments: kafka-546-v1.patch > > > In 0.7.x and earlier versions offsets were assigned by the byte location in > the file. Because it wasn't possible to directly decompress from the middle > of a compressed block, messages inside a compressed message set effectively > had no offset. As a result the offset given to the consumer was always the > offset of the wrapper message set. > In 0.8 after the logical offsets patch messages in a compressed set do have > offsets. However the server still needs to fetch from the beginning of the > compressed messageset (otherwise it can't be decompressed). As a result a > commit() which occurs in the middle of a message set will still result in > some duplicates. > This can be fixed in the ConsumerIterator by discarding messages smaller than > the fetch offset rather than giving them to the consumer. This will make > commit work correctly in the presence of compressed messages (finally). -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-596) LogSegment.firstAppendTime not reset after truncate to
[ https://issues.apache.org/jira/browse/KAFKA-596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13489473#comment-13489473 ] Jun Rao commented on KAFKA-596: --- I agree that setting firstAppendTime to None in LogSegment.truncateTo() is necessary. However, I don't think this is necessary in Log.maybeRoll() and Log.markedDeletedWhile(). In both cases, we are not changing the log segment. So whoever changed the segment last to make its size 0 (either through truncation or creation) would have set firstAppendTime properly. Note that maybeRoll is called on every log append. We don't want to add unnecessary overhead. > LogSegment.firstAppendTime not reset after truncate to > -- > > Key: KAFKA-596 > URL: https://issues.apache.org/jira/browse/KAFKA-596 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Jun Rao >Assignee: Swapnil Ghike > Labels: bugs > Attachments: kafka-596.patch > > Original Estimate: 24h > Remaining Estimate: 24h > > Currently, we don't reset LogSegment.firstAppendTime after the segment is > truncated. What can happen is that we truncate the segment to size 0 and on > next append, a new log segment with the same starting offset is rolled > because the time-based rolling is triggered. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
Re: Maintainer system?
I think that's a good idea. It will be good to have at least 2 maintainers per component. I'd encourage more people to review patches. The more patches one reviews, the more familiar he/she is with the components. Thanks, Jun On Thu, Oct 4, 2012 at 1:13 PM, Jay Kreps wrote: > Hey guys, > > The number of developers and code base size for Kafka is getting larger. > > One way to help scale things gracefully would be to have an official idea > of "subsystems" and have official maintainers for those. The duties of a > maintainer would be > 1. Be the final word on API design in that area > 2. Ensure sufficient documentation and test coverage for that subsystem > 3. Review all code changes in that sub-system area > 4. Ensure that patches in that area get applied in a timely fashion > > In particular I think we could do a better job of getting patches in in a > timely manner. > > Here are what I see as logically distinct systems or areas: > >- Producer (java and scala) >- Consumer (java and scala) >- Network layer (kafka.network.*) >- Log (kafka.log.*) >- Replication (controller, fetcher threads, hw mark stuff, etc) >- Kafka API impl (basically just KafkaApi.scala) >- Hadoop stuff >- Perf tools and system tests >- Misc other small things: metrics, utils, etc. > > Obviously many features will cut across these layers, but the idea is that > by having a real owner that is responsible for that area we will get higher > quality. > > I think we are doing this informally already, but making it formal would > help ensure you knew the right people to get input from. I think it > probably wouldn't make sense to start doing this until post-0.8 since we > are in the middle of so many things right now, but I wanted to see what > people thought...? > > -Jay >
[jira] [Commented] (KAFKA-596) LogSegment.firstAppendTime not reset after truncate to
[ https://issues.apache.org/jira/browse/KAFKA-596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13489266#comment-13489266 ] Jun Rao commented on KAFKA-596: --- Thanks for the patch. A couple of comments: 1. Log.maybeRoll(): Are the following lines needed since we are not creating a new segment? if (segment.messageSet.sizeInBytes == 0) segment.firstAppendTime = None 2. Log.markedDeletedWhile(): Is the following line needed since we are not creating a new segment? view(numToDelete - 1).firstAppendTime = None > LogSegment.firstAppendTime not reset after truncate to > -- > > Key: KAFKA-596 > URL: https://issues.apache.org/jira/browse/KAFKA-596 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Jun Rao >Assignee: Swapnil Ghike > Labels: bugs > Attachments: kafka-596.patch > > Original Estimate: 24h > Remaining Estimate: 24h > > Currently, we don't reset LogSegment.firstAppendTime after the segment is > truncated. What can happen is that we truncate the segment to size 0 and on > next append, a new log segment with the same starting offset is rolled > because the time-based rolling is triggered. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
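The agreed shape of the KAFKA-596 fix (reset firstAppendTime only in truncateTo(), and only when the segment shrinks to size 0) can be sketched like this. Java for illustration only; the field and method names mirror but are not Kafka's real Scala ones:

```java
import java.util.OptionalLong;

// Sketch of the KAFKA-596 fix: firstAppendTime is cleared when truncation
// empties the segment, so a later time-based roll check cannot fire on an
// empty segment and create a duplicate-named segment. maybeRoll() and the
// delete path never touch the field.
class SegmentSketch {
    long sizeInBytes = 0;
    OptionalLong firstAppendTime = OptionalLong.empty();

    void append(int bytes, long nowMs) {
        if (!firstAppendTime.isPresent())
            firstAppendTime = OptionalLong.of(nowMs); // first append since creation/truncation
        sizeInBytes += bytes;
    }

    void truncateTo(long targetSize) {
        sizeInBytes = Math.min(sizeInBytes, targetSize);
        if (sizeInBytes == 0)
            firstAppendTime = OptionalLong.empty();   // the fix: no stale append time survives
    }

    // Time-based roll check: an empty (never-appended) segment must never roll.
    boolean shouldRoll(long nowMs, long rollMs) {
        return firstAppendTime.isPresent() && nowMs - firstAppendTime.getAsLong() >= rollMs;
    }
}
```

Without the reset in truncateTo(), the first append after truncating to size 0 would see a stale firstAppendTime and roll a new segment with the same starting offset, which is exactly the reported bug.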
[jira] [Commented] (KAFKA-593) Empty log index file created when it shouldn't be empty
[ https://issues.apache.org/jira/browse/KAFKA-593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13489260#comment-13489260 ] Jun Rao commented on KAFKA-593: --- Thanks for the patch. A couple of comments: 1. Log.rollToOffset(): We just need to verify that the starting offset of the last segment doesn't equal the new offset. 2. OffsetIndex: When loading the log segment on broker startup, it is possible to have a segment with empty data and empty index. So, we will hit the same issue of rolling a segment with a duplicated name. We probably should extend the index size to max index size in the constructor of OffsetIndex. > Empty log index file created when it shouldn't be empty > --- > > Key: KAFKA-593 > URL: https://issues.apache.org/jira/browse/KAFKA-593 > Project: Kafka > Issue Type: Bug >Reporter: Yang Ye > Attachments: kafka_583_zk_kafka_data.tar.gz, kafka_593_v1.diff > > > We have met empty index file during system test when it shouldn't be empty. > In this case, there're around 100 messages in each segment, each of size > around 100 bytes, given the "logIndexIntervalBytes" 4096, there should be at > least 2 log index entries, but we see empty index file. 
The kafka and > zookeeper logs are attached > [yye@yye-ld kafka_server_3_logs]$ cd test_1-2/ > [yye@yye-ld test_1-2]$ ls -l > total 84 > -rw-r--r-- 1 yye eng 8 Oct 29 15:22 .index > -rw-r--r-- 1 yye eng 10248 Oct 29 15:22 .log > -rw-r--r-- 1 yye eng 8 Oct 29 15:22 0100.index > -rw-r--r-- 1 yye eng 10296 Oct 29 15:22 0100.log > -rw-r--r-- 1 yye eng 0 Oct 29 15:23 0200.index > -rw-r--r-- 1 yye eng 10293 Oct 29 15:23 0200.log > -rw-r--r-- 1 yye eng 0 Oct 29 15:23 0300.index > -rw-r--r-- 1 yye eng 10274 Oct 29 15:23 0300.log > -rw-r--r-- 1 yye eng 0 Oct 29 15:23 0399.index > -rw-r--r-- 1 yye eng 10276 Oct 29 15:23 0399.log > -rw-r--r-- 1 yye eng 0 Oct 29 15:23 0498.index > -rw-r--r-- 1 yye eng 10256 Oct 29 15:23 0498.log > -rw-r--r-- 1 yye eng 10485760 Oct 29 15:23 0596.index > -rw-r--r-- 1 yye eng 3564 Oct 29 15:23 0596.log -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
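The expectation in the report above is simple arithmetic: index entries are added roughly once per logIndexIntervalBytes of appended data, so a ~10 KB segment with a 4096-byte interval should carry at least two entries. A tiny illustrative helper (not Kafka's actual OffsetIndex logic):

```java
// Rough lower bound on index entries for a segment: one entry is expected for
// every full indexIntervalBytes of log data appended. Kafka's real OffsetIndex
// accounting differs in detail; this just checks the back-of-envelope claim.
class IndexEntryEstimate {
    static int minEntries(long segmentBytes, long indexIntervalBytes) {
        return (int) (segmentBytes / indexIntervalBytes);
    }
}
```

For the 10248-byte segments in the listing this gives 2, which is why the empty (0-byte) index files above are a bug; only the final 3564-byte segment could legitimately have no entries yet.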
[jira] [Commented] (KAFKA-577) extend DumpLogSegments to verify consistency btw data and index
[ https://issues.apache.org/jira/browse/KAFKA-577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13489255#comment-13489255 ] Jun Rao commented on KAFKA-577: --- A couple more comments for patch v4. 40. LinkedList is supposed to be used for manipulating links directly. We should just use List (immutable) instead. Also, instead of the following, misMatchesSeq = misMatchesSeq.:+((entry.offset + index.baseOffset, messageAndOffset.offset).asInstanceOf[(Int, Int)]) we should write misMatchesSeq :+= ((entry.offset + index.baseOffset, messageAndOffset.offset).asInstanceOf[(Int, Int)]) > extend DumpLogSegments to verify consistency btw data and index > --- > > Key: KAFKA-577 > URL: https://issues.apache.org/jira/browse/KAFKA-577 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Jun Rao > Labels: newbie, tools > Fix For: 0.8 > > Attachments: kafka_577_v1.diff, kafka_577_v2.diff, kafka_577_v3.diff, > kafka_577_v4.diff > > Original Estimate: 24h > Remaining Estimate: 24h > > It would be good to extend DumpLogSegments to do the following verification: > 1. The offsets stored in the index match those in the log data. > 2. The offsets in the data log is consecutive. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Closed] (KAFKA-491) KafkaRequestHandler needs to handle exceptions
[ https://issues.apache.org/jira/browse/KAFKA-491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao closed KAFKA-491. - > KafkaRequestHandler needs to handle exceptions > -- > > Key: KAFKA-491 > URL: https://issues.apache.org/jira/browse/KAFKA-491 > Project: Kafka > Issue Type: Bug > Components: core > Reporter: Jun Rao > Labels: bugs > Fix For: 0.8 > > Attachments: kafka-491-v1.diff, kafka-491-v2.diff > > > Currently, if apis.handle() throws an exception (e.g., if the broker receives > an invalid request), KafkaRequestHandler will die. We need to handle > exceptions properly. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Resolved] (KAFKA-491) KafkaRequestHandler needs to handle exceptions
[ https://issues.apache.org/jira/browse/KAFKA-491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-491. --- Resolution: Fixed Thanks for patch v2. +1. Committed to 0.8. > KafkaRequestHandler needs to handle exceptions > -- > > Key: KAFKA-491 > URL: https://issues.apache.org/jira/browse/KAFKA-491 > Project: Kafka > Issue Type: Bug > Components: core > Reporter: Jun Rao > Labels: bugs > Fix For: 0.8 > > Attachments: kafka-491-v1.diff, kafka-491-v2.diff > > > Currently, if apis.handle() throws an exception (e.g., if the broker receives > an invalid request), KafkaRequestHandler will die. We need to handle > exceptions properly. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-188) Support multiple data directories
[ https://issues.apache.org/jira/browse/KAFKA-188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13489184#comment-13489184 ] Jun Rao commented on KAFKA-188: --- +1 on patch v8. Thanks, > Support multiple data directories > - > > Key: KAFKA-188 > URL: https://issues.apache.org/jira/browse/KAFKA-188 > Project: Kafka > Issue Type: New Feature >Reporter: Jay Kreps > Attachments: KAFKA-188.patch, KAFKA-188-v2.patch, KAFKA-188-v3.patch, > KAFKA-188-v4.patch, KAFKA-188-v5.patch, KAFKA-188-v6.patch, > KAFKA-188-v7.patch, KAFKA-188-v8.patch > > > Currently we allow only a single data directory. This means that a multi-disk > configuration needs to be a RAID array or LVM volume or something like that > to be mounted as a single directory. > For a high-throughput low-reliability configuration this would mean RAID0 > striping. Common wisdom in Hadoop land has it that a JBOD setup that just > mounts each disk as a separate directory and does application-level balancing > over these results in about 30% write-improvement. For example see this claim > here: > http://old.nabble.com/Re%3A-RAID-vs.-JBOD-p21466110.html > It is not clear to me why this would be the case--it seems the RAID > controller should be able to balance writes as well as the application so it > may depend on the details of the setup. > Nonetheless this would be really easy to implement, all you need to do is add > multiple data directories and balance partition creation over these disks. > One problem this might cause is if a particular topic is much larger than the > others it might unbalance the load across the disks. The partition->disk > assignment policy should probably attempt to evenly spread each topic to > avoid this, rather than just trying keep the number of partitions balanced > between disks. -- This message is automatically generated by JIRA. 
If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
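The topic-aware assignment policy suggested in the last paragraph could look roughly like this. A Java sketch with invented names, not the policy Kafka ended up shipping: each topic's partitions are spread round-robin over the data directories, with a rotating starting directory so different topics don't all begin on disk 0:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the suggested policy: spread each topic's partitions evenly across
// data directories rather than only balancing total partition counts, so one
// unusually large topic cannot concentrate its load on a single disk.
class PartitionDirAssigner {
    // Maps "topic-partition" -> directory index; startDir rotates per topic.
    static Map<String, Integer> assign(String topic, int partitions, int dirs, int startDir) {
        Map<String, Integer> assignment = new LinkedHashMap<>();
        for (int p = 0; p < partitions; p++)
            assignment.put(topic + "-" + p, (startDir + p) % dirs);
        return assignment;
    }
}
```

With 4 partitions over 3 directories starting at directory 1, the partitions land on directories 1, 2, 0, 1: every directory gets at most one more partition of the topic than any other.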
[jira] [Commented] (KAFKA-532) Multiple controllers can co-exist during soft failures
[ https://issues.apache.org/jira/browse/KAFKA-532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13488893#comment-13488893 ] Jun Rao commented on KAFKA-532: --- Thanks for patch v3. The overall approach seems to work. Some comments: 30. PartitionStateInfo and LeaderAndIsrRequest: When calculating size in sizeInBytes(), it's more readable if we put each number to be added in a separate line. 31. Partition.updateIsr(): I am thinking about what controllerEpoch the leader should use when updating the leaderAndIsr path. There is probably nothing wrong with using the controllerEpoch in replicaManager. However, it seems to make more sense to use the controllerEpoch in the leaderAndIsr path itself, since this update is actually not made by the controller. 32. ReplicaManager.controllerEpoch: Since this variable can be accessed from different threads, it needs to be volatile. Also, we only need to update controllerEpoch if the one from the request is larger (but not equal). It probably should be initialized to 0 or -1? 33. LeaderElectionTest.testLeaderElectionWithStaleControllerEpoch(): I wonder if we really need to start a new broker. Can we just send a stale controller epoch using the controllerChannelManager in the current controller? 34. KafkaController: There seems to be a tricky issue with incrementing the controller epoch. We increment the epoch in onControllerFailover() after the broker becomes a controller. What could happen is that broker 1 becomes the controller and goes into GC before we increment the epoch. Broker 2 becomes the new controller and increments the epoch. Broker 1 comes back from GC and increments the epoch again. Now, broker 1's controller epoch is actually larger. Not sure what's the best way to address this. One thought is that immediately after the controller epoch is incremented in onControllerFailover(), we check if this broker is still the controller (by reading the controller path in ZK). If not, we throw an exception. 
Also, the epoch probably should be initialized to 0 if we want the first controller to have epoch 1. 35. We use an int to represent both the controller and leader epoch. There is a potential issue if the number wraps. We probably don't need to worry about it now. > Multiple controllers can co-exist during soft failures > -- > > Key: KAFKA-532 > URL: https://issues.apache.org/jira/browse/KAFKA-532 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8 >Reporter: Neha Narkhede >Assignee: Neha Narkhede >Priority: Blocker > Labels: bugs > Attachments: kafka-532-v1.patch, kafka-532-v2.patch, > kafka-532-v3.patch > > Original Estimate: 48h > Remaining Estimate: 48h > > If the current controller experiences an intermittent soft failure (GC pause) > in the middle of leader election or partition reassignment, a new controller > might get elected and start communicating new state change decisions to the > brokers. After recovering from the soft failure, the old controller might > continue sending some stale state change decisions to the brokers, resulting > in unexpected failures. We need to introduce a controller generation id that > increments with controller election. The brokers should reject any state > change requests by a controller with an older generation id. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
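The increment-then-verify guard proposed in comment 34 can be sketched as follows. The ZooKeeper state is faked with in-memory fields here purely for illustration; real code would use a conditional ZK update and Kafka's ZkUtils:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the comment-34 guard: a controller that was paused (e.g. in GC)
// across another broker's election must not keep the epoch it just bumped.
// Immediately after incrementing, it re-reads the controller path and
// abdicates if another broker now owns it.
class EpochGuard {
    final AtomicInteger epochInZk = new AtomicInteger(0); // stand-in for the epoch path
    volatile int controllerIdInZk = -1;                   // stand-in for the controller path

    // Returns the new epoch, or -1 if this broker is no longer the controller.
    int incrementEpoch(int myBrokerId) {
        int newEpoch = epochInZk.incrementAndGet();
        if (controllerIdInZk != myBrokerId)
            return -1; // someone else won the election while we were paused
        return newEpoch;
    }
}
```

The check doesn't undo the stray increment, but it stops the stale controller from acting with the larger epoch, which is the failure the comment worries about.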
[jira] [Commented] (KAFKA-574) KafkaController unnecessarily reads leaderAndIsr info from ZK
[ https://issues.apache.org/jira/browse/KAFKA-574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13488810#comment-13488810 ] Jun Rao commented on KAFKA-574: --- Thanks for patch v2. Just one more comment: 20. ReplicaStateMachine.handleStateChange(): In the OfflineReplica state, after the isr is updated, we need to update the leaderAndIsr cache in controller context. Also, is leaderAndIsrIsEmpty better than leaderAndIsrOpt? Could you run the basic system tests and make sure that they pass? /system_test/ $ python -u -B system_test_runner.py 2>&1 | tee system_test_output_`date +%s`.log > KafkaController unnecessarily reads leaderAndIsr info from ZK > - > > Key: KAFKA-574 > URL: https://issues.apache.org/jira/browse/KAFKA-574 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Jun Rao >Assignee: Prashanth Menon >Priority: Blocker > Labels: bugs > Attachments: KAFKA-574-v1.patch, KAFKA-574-v2.patch > > Original Estimate: 48h > Remaining Estimate: 48h > > KafkaController calls updateLeaderAndIsrCache() in onBrokerFailure(). This is > unnecessary since in onBrokerFailure(), we will make leader and isr change > anyway so there is no need to first read that information from ZK. Latency is > critical in onBrokerFailure() since it determines how quickly a leader can be > made online. > Similarly, updateLeaderAndIsrCache() is called in onBrokerStartup() > unnecessarily. In this case, the controller does not change the leader or the > isr. It just needs to send the current leader and the isr info to the newly > started broker. We already cache leader in the controller. Isr in theory > could change any time by the leader. So, reading from ZK doesn't guarantee > that we can get the latest isr anyway. Instead, we just need to get the isr > last selected by the controller (which can be cached together with the leader > in the controller). 
If the leader epoch in a broker is at or larger than the > epoch in the leaderAndIsr request, the broker can just ignore it. Otherwise, > the leader and the isr selected by the controller should be used. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Created] (KAFKA-598) decouple fetch size from max message size
Jun Rao created KAFKA-598: - Summary: decouple fetch size from max message size Key: KAFKA-598 URL: https://issues.apache.org/jira/browse/KAFKA-598 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8 Reporter: Jun Rao Currently, a consumer has to set the fetch size larger than the max message size. This increases the memory footprint on the consumer, especially when a large number of topic/partitions are subscribed. By decoupling the fetch size from the max message size, we can use a smaller fetch size for normal consumption and, when hitting a large message (hopefully rare), automatically increase the fetch size to the max message size temporarily. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
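The proposed behaviour is essentially a retry path: fetch with the small buffer, and only when an oversized message is hit, retry with a buffer large enough for the biggest allowed message. A sketch under assumed names (not the actual consumer implementation):

```python
class MessageTooLargeError(Exception):
    """Raised when a message does not fit in the current fetch buffer."""

def fetch_with_fallback(fetch, normal_fetch_size, max_message_size):
    """Normal consumption uses the small buffer; the rare large message
    temporarily pays the memory cost of max_message_size."""
    try:
        return fetch(normal_fetch_size)
    except MessageTooLargeError:
        # Rare path: grow the buffer just for this fetch.
        return fetch(max_message_size)
```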
[jira] [Commented] (KAFKA-596) LogSegment.firstAppendTime not reset after truncate to
[ https://issues.apache.org/jira/browse/KAFKA-596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13487947#comment-13487947 ] Jun Rao commented on KAFKA-596: --- The fix is to set LogSegment.firstAppendTime to None if we truncate the segment to size 0. However, this brings up the deeper question of how we prevent segments with identical starting offsets from being created during log roll. Maybe we should add a check in log.roll to guard against this. > LogSegment.firstAppendTime not reset after truncate to > -- > > Key: KAFKA-596 > URL: https://issues.apache.org/jira/browse/KAFKA-596 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Jun Rao > Labels: bugs > Original Estimate: 24h > Remaining Estimate: 24h > > Currently, we don't reset LogSegment.firstAppendTime after the segment is > truncated. What can happen is that we truncate the segment to size 0 and on > next append, a new log segment with the same starting offset is rolled > because the time-based rolling is triggered. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
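The proposed fix can be sketched as follows: clear firstAppendTime whenever a truncate empties the segment, so the next append restarts the time-based rolling clock. This is a simplified model, not the actual Log/LogSegment code:

```python
class LogSegment:
    """Simplified model of a segment with time-based roll bookkeeping."""

    def __init__(self, start_offset):
        self.start_offset = start_offset
        self.size = 0
        self.first_append_time = None  # drives time-based rolling

    def append(self, nbytes, now):
        if self.first_append_time is None:
            self.first_append_time = now
        self.size += nbytes

    def truncate_to(self, size):
        self.size = min(self.size, size)
        if self.size == 0:
            # The fix: an empty segment has no first append, so reset the
            # timestamp; otherwise the next append can trigger a time-based
            # roll that creates a segment with the same starting offset.
            self.first_append_time = None
```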
[jira] [Created] (KAFKA-596) LogSegment.firstAppendTime not reset after truncate to
Jun Rao created KAFKA-596: - Summary: LogSegment.firstAppendTime not reset after truncate to Key: KAFKA-596 URL: https://issues.apache.org/jira/browse/KAFKA-596 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8 Reporter: Jun Rao Currently, we don't reset LogSegment.firstAppendTime after the segment is truncated. What can happen is that we truncate the segment to size 0 and on next append, a new log segment with the same starting offset is rolled because the time-based rolling is triggered. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Updated] (KAFKA-586) system test configs are broken
[ https://issues.apache.org/jira/browse/KAFKA-586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-586: -- Resolution: Fixed Fix Version/s: 0.8 Status: Resolved (was: Patch Available) Thanks for the patch. +1. Committed to 0.8. > system test configs are broken > -- > > Key: KAFKA-586 > URL: https://issues.apache.org/jira/browse/KAFKA-586 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8 >Reporter: Neha Narkhede >Assignee: John Fung >Priority: Critical > Labels: replication-testing > Fix For: 0.8 > > Attachments: kafka-586-v1.patch > > Original Estimate: 24h > Remaining Estimate: 24h > > system test suite has a set of default config values that are picked up from > the testsuite/config directory. One can override the value of a config in the > testcase_properties.json file. This is great, but the assumption is that the > config property that is being overridden should also be present in the > testsuite/config/*.properties file. > Currently, there are a number of properties in KafkaConfig that are not in > the testsuite/config/*.properties file. So the tests might intend to override > some properties, but that will be ignored. > Let's either add all the configs in the testsuite/config/*.properties file or > remove this dependency and override the property specified in > testcase_properties.json. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Closed] (KAFKA-586) system test configs are broken
[ https://issues.apache.org/jira/browse/KAFKA-586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao closed KAFKA-586. - > system test configs are broken > -- > > Key: KAFKA-586 > URL: https://issues.apache.org/jira/browse/KAFKA-586 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8 >Reporter: Neha Narkhede >Assignee: John Fung >Priority: Critical > Labels: replication-testing > Fix For: 0.8 > > Attachments: kafka-586-v1.patch > > Original Estimate: 24h > Remaining Estimate: 24h > > system test suite has a set of default config values that are picked up from > the testsuite/config directory. One can override the value of a config in the > testcase_properties.json file. This is great, but the assumption is that the > config property that is being overridden should also be present in the > testsuite/config/*.properties file. > Currently, there are a number of properties in KafkaConfig that are not in > the testsuite/config/*.properties file. So the tests might intend to override > some properties, but that will be ignored. > Let's either add all the configs in the testsuite/config/*.properties file or > remove this dependency and override the property specified in > testcase_properties.json. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Closed] (KAFKA-594) Update System Test due to new argument "--sync" in ProducerPerformance
[ https://issues.apache.org/jira/browse/KAFKA-594?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao closed KAFKA-594. - > Update System Test due to new argument "--sync" in ProducerPerformance > -- > > Key: KAFKA-594 > URL: https://issues.apache.org/jira/browse/KAFKA-594 > Project: Kafka > Issue Type: Task >Reporter: John Fung >Assignee: John Fung > Fix For: 0.8 > > Attachments: kafka-594-v1.patch > > -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Updated] (KAFKA-594) Update System Test due to new argument "--sync" in ProducerPerformance
[ https://issues.apache.org/jira/browse/KAFKA-594?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-594: -- Resolution: Fixed Fix Version/s: 0.8 Assignee: John Fung Status: Resolved (was: Patch Available) Thanks for the patch. Committed to 0.8. > Update System Test due to new argument "--sync" in ProducerPerformance > -- > > Key: KAFKA-594 > URL: https://issues.apache.org/jira/browse/KAFKA-594 > Project: Kafka > Issue Type: Task >Reporter: John Fung >Assignee: John Fung > Fix For: 0.8 > > Attachments: kafka-594-v1.patch > > -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-593) Empty log index file created when it shouldn't be empty
[ https://issues.apache.org/jira/browse/KAFKA-593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13487841#comment-13487841 ] Jun Rao commented on KAFKA-593: --- Here is the issue. We rolled a new segment in the follower. The follower in one fetch gets 10k bytes of data and appends to its log. This won't add any index entry since it's the very first append to this segment. After the append, the log rolled since the max segment size was reached. This leaves an empty index. Technically, the logic is still correct. It does mean that index entries may not be generated as frequently as one expects, depending on the fetch size used in the follower fetcher thread and how far behind a follower is. This may impact consumer performance a bit. > Empty log index file created when it shouldn't be empty > --- > > Key: KAFKA-593 > URL: https://issues.apache.org/jira/browse/KAFKA-593 > Project: Kafka > Issue Type: Bug >Reporter: Yang Ye > Attachments: kafka_583_zk_kafka_data.tar.gz > > > We have seen an empty index file during system tests when it shouldn't be empty. > In this case, there are around 100 messages in each segment, each of size > around 100 bytes; given the "logIndexIntervalBytes" of 4096, there should be at > least 2 log index entries, but we see an empty index file. 
The kafka and > zookeeper logs are attached > [yye@yye-ld kafka_server_3_logs]$ cd test_1-2/ > [yye@yye-ld test_1-2]$ ls -l > total 84 > -rw-r--r-- 1 yye eng 8 Oct 29 15:22 .index > -rw-r--r-- 1 yye eng 10248 Oct 29 15:22 .log > -rw-r--r-- 1 yye eng 8 Oct 29 15:22 0100.index > -rw-r--r-- 1 yye eng 10296 Oct 29 15:22 0100.log > -rw-r--r-- 1 yye eng 0 Oct 29 15:23 0200.index > -rw-r--r-- 1 yye eng 10293 Oct 29 15:23 0200.log > -rw-r--r-- 1 yye eng 0 Oct 29 15:23 0300.index > -rw-r--r-- 1 yye eng 10274 Oct 29 15:23 0300.log > -rw-r--r-- 1 yye eng 0 Oct 29 15:23 0399.index > -rw-r--r-- 1 yye eng 10276 Oct 29 15:23 0399.log > -rw-r--r-- 1 yye eng 0 Oct 29 15:23 0498.index > -rw-r--r-- 1 yye eng 10256 Oct 29 15:23 0498.log > -rw-r--r-- 1 yye eng 10485760 Oct 29 15:23 0596.index > -rw-r--r-- 1 yye eng 3564 Oct 29 15:23 0596.log -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
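The behaviour Jun describes follows from how sparse index entries are generated: an entry is added on append only once enough bytes have accumulated since the previous entry, so the very first append to a segment adds none, even when that single append exceeds the interval. A toy model of the assumed semantics (illustrative names, not the actual OffsetIndex code):

```python
class SparseIndex:
    """Offset index that records an entry at most every `interval` bytes."""

    def __init__(self, interval):
        self.interval = interval
        self.entries = []  # offsets that received an index entry
        self.bytes_since_last_entry = 0

    def on_append(self, offset, nbytes):
        # The check happens before counting this append's bytes, so the
        # first append to a fresh segment never creates an entry.
        if self.bytes_since_last_entry > self.interval:
            self.entries.append(offset)
            self.bytes_since_last_entry = 0
        self.bytes_since_last_entry += nbytes
```

With interval 4096, a single 10k follower append produces an empty index (the entry would only appear on the *next* append), while 100 small sync-producer appends of ~100 bytes produce the expected 2 entries.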
[jira] [Commented] (KAFKA-188) Support multiple data directories
[ https://issues.apache.org/jira/browse/KAFKA-188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13487307#comment-13487307 ] Jun Rao commented on KAFKA-188: --- Now, some unit tests fail with exceptions like the following. Most of them seem to be transient, but they show up more frequently now. [0m[[31merror[0m] [0mTest Failed: testCleanShutdown(kafka.server.ServerShutdownTest)[0m kafka.common.KafkaException: Failed to acquire lock on file .lock in /tmp/kafka-246675. A Kafka instance in another process or thread is using this directory. > Support multiple data directories > - > > Key: KAFKA-188 > URL: https://issues.apache.org/jira/browse/KAFKA-188 > Project: Kafka > Issue Type: New Feature >Reporter: Jay Kreps > Attachments: KAFKA-188.patch, KAFKA-188-v2.patch, KAFKA-188-v3.patch, > KAFKA-188-v4.patch, KAFKA-188-v5.patch, KAFKA-188-v6.patch, KAFKA-188-v7.patch > > > Currently we allow only a single data directory. This means that a multi-disk > configuration needs to be a RAID array or LVM volume or something like that > to be mounted as a single directory. > For a high-throughput low-reliability configuration this would mean RAID0 > striping. Common wisdom in Hadoop land has it that a JBOD setup that just > mounts each disk as a separate directory and does application-level balancing > over these results in about 30% write-improvement. For example see this claim > here: > http://old.nabble.com/Re%3A-RAID-vs.-JBOD-p21466110.html > It is not clear to me why this would be the case--it seems the RAID > controller should be able to balance writes as well as the application so it > may depend on the details of the setup. > Nonetheless this would be really easy to implement, all you need to do is add > multiple data directories and balance partition creation over these disks. > One problem this might cause is if a particular topic is much larger than the > others it might unbalance the load across the disks. 
The partition->disk > assignment policy should probably attempt to evenly spread each topic to > avoid this, rather than just trying to keep the number of partitions balanced > between disks. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
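The lock failure in the test output above comes from a per-data-directory `.lock` file. A minimal sketch of the mechanism in Python (the real code is `kafka.utils.FileLock`, written in Scala on `java.nio`; the `fcntl`-based helper here is illustrative and Unix-only):

```python
import fcntl
import os

def try_lock_dir(dir_path):
    """Take an exclusive, non-blocking lock on <dir_path>/.lock.
    Returns the open fd on success (keep it open for the lock's
    lifetime), or None if another process already holds the lock."""
    os.makedirs(dir_path, exist_ok=True)  # the directory must exist first
    fd = os.open(os.path.join(dir_path, ".lock"), os.O_CREAT | os.O_RDWR)
    try:
        fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        return fd
    except OSError:
        os.close(fd)
        return None
```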
[jira] [Commented] (KAFKA-593) Empty log index file created when it shouldn't be empty
[ https://issues.apache.org/jira/browse/KAFKA-593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13486990#comment-13486990 ] Jun Rao commented on KAFKA-593: --- Producer used sync mode. So, there is 1 message per batch and each segment has about 100 messages. I expect at least 2 index entries to be added per segment. > Empty log index file created when it shouldn't be empty > --- > > Key: KAFKA-593 > URL: https://issues.apache.org/jira/browse/KAFKA-593 > Project: Kafka > Issue Type: Bug >Reporter: Yang Ye > Attachments: kafka_583_zk_kafka_data.tar.gz > > > We have seen an empty index file during system tests when it shouldn't be empty. > In this case, there are around 100 messages in each segment, each of size > around 100 bytes; given the "logIndexIntervalBytes" of 4096, there should be at > least 2 log index entries, but we see an empty index file. The kafka and > zookeeper logs are attached > [yye@yye-ld kafka_server_3_logs]$ cd test_1-2/ > [yye@yye-ld test_1-2]$ ls -l > total 84 > -rw-r--r-- 1 yye eng 8 Oct 29 15:22 .index > -rw-r--r-- 1 yye eng 10248 Oct 29 15:22 .log > -rw-r--r-- 1 yye eng 8 Oct 29 15:22 0100.index > -rw-r--r-- 1 yye eng 10296 Oct 29 15:22 0100.log > -rw-r--r-- 1 yye eng 0 Oct 29 15:23 0200.index > -rw-r--r-- 1 yye eng 10293 Oct 29 15:23 0200.log > -rw-r--r-- 1 yye eng 0 Oct 29 15:23 0300.index > -rw-r--r-- 1 yye eng 10274 Oct 29 15:23 0300.log > -rw-r--r-- 1 yye eng 0 Oct 29 15:23 0399.index > -rw-r--r-- 1 yye eng 10276 Oct 29 15:23 0399.log > -rw-r--r-- 1 yye eng 0 Oct 29 15:23 0498.index > -rw-r--r-- 1 yye eng 10256 Oct 29 15:23 0498.log > -rw-r--r-- 1 yye eng 10485760 Oct 29 15:23 0596.index > -rw-r--r-- 1 yye eng 3564 Oct 29 15:23 0596.log -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-188) Support multiple data directories
[ https://issues.apache.org/jira/browse/KAFKA-188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13486921#comment-13486921 ] Jun Rao commented on KAFKA-188: --- Our system tests fail with the latest patch. python -B system_test_runner.py 2>&1 | tee test.out Saw the following in broker log. [2012-10-30 07:40:19,682] FATAL Fatal error during KafkaServerStable startup. Prepare to shutdown (kafka.server.KafkaServerStartable) java.io.IOException: No such file or directory at java.io.UnixFileSystem.createFileExclusively(Native Method) at java.io.File.createNewFile(File.java:883) at kafka.utils.FileLock.<init>(FileLock.scala:12) at kafka.log.LogManager$$anonfun$10.apply(LogManager.scala:64) at kafka.log.LogManager$$anonfun$10.apply(LogManager.scala:64) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34) at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34) at scala.collection.TraversableLike$class.map(TraversableLike.scala:206) at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34) at kafka.log.LogManager.<init>(LogManager.scala:64) at kafka.server.KafkaServer.startup(KafkaServer.scala:60) at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34) at kafka.Kafka$.main(Kafka.scala:46) at kafka.Kafka.main(Kafka.scala) [2012-10-30 07:40:19,683] INFO [Kafka Server 1], shutting down (kafka.server.KafkaServer) > Support multiple data directories > - > > Key: KAFKA-188 > URL: https://issues.apache.org/jira/browse/KAFKA-188 > Project: Kafka > Issue Type: New Feature >Reporter: Jay Kreps > Attachments: KAFKA-188.patch, KAFKA-188-v2.patch, KAFKA-188-v3.patch, > KAFKA-188-v4.patch, KAFKA-188-v5.patch, KAFKA-188-v6.patch > > > Currently we allow only a single data directory. 
This means that a multi-disk > configuration needs to be a RAID array or LVM volume or something like that > to be mounted as a single directory. > For a high-throughput low-reliability configuration this would mean RAID0 > striping. Common wisdom in Hadoop land has it that a JBOD setup that just > mounts each disk as a separate directory and does application-level balancing > over these results in about 30% write-improvement. For example see this claim > here: > http://old.nabble.com/Re%3A-RAID-vs.-JBOD-p21466110.html > It is not clear to me why this would be the case--it seems the RAID > controller should be able to balance writes as well as the application so it > may depend on the details of the setup. > Nonetheless this would be really easy to implement, all you need to do is add > multiple data directories and balance partition creation over these disks. > One problem this might cause is if a particular topic is much larger than the > others it might unbalance the load across the disks. The partition->disk > assignment policy should probably attempt to evenly spread each topic to > avoid this, rather than just trying to keep the number of partitions balanced > between disks. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Closed] (KAFKA-592) Register metrics beans at kafka server startup
[ https://issues.apache.org/jira/browse/KAFKA-592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao closed KAFKA-592. - > Register metrics beans at kafka server startup > --- > > Key: KAFKA-592 > URL: https://issues.apache.org/jira/browse/KAFKA-592 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8 >Reporter: Swapnil Ghike >Assignee: Swapnil Ghike >Priority: Blocker > Labels: bugs > Fix For: 0.8 > > Attachments: kafka-592-v1.patch > > > jmx beans are not registered until the corresponding part of the code > executes. To set alerts on some of the server side beans, they need to be > registered at server startup. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Resolved] (KAFKA-592) Register metrics beans at kafka server startup
[ https://issues.apache.org/jira/browse/KAFKA-592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-592. --- Resolution: Fixed Thanks for the patch. +1. Committed to 0.8 by making registerStats private and adding a comment. > Register metrics beans at kafka server startup > --- > > Key: KAFKA-592 > URL: https://issues.apache.org/jira/browse/KAFKA-592 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8 >Reporter: Swapnil Ghike >Assignee: Swapnil Ghike >Priority: Blocker > Labels: bugs > Fix For: 0.8 > > Attachments: kafka-592-v1.patch > > > jmx beans are not registered until the corresponding part of the code > executes. To set alerts on some of the server side beans, they need to be > registered at server startup. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Reopened] (KAFKA-577) extend DumpLogSegments to verify consistency btw data and index
[ https://issues.apache.org/jira/browse/KAFKA-577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao reopened KAFKA-577: --- > extend DumpLogSegments to verify consistency btw data and index > --- > > Key: KAFKA-577 > URL: https://issues.apache.org/jira/browse/KAFKA-577 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Jun Rao > Labels: newbie, tools > Fix For: 0.8 > > Attachments: kafka_577_v1.diff, kafka_577_v2.diff > > Original Estimate: 24h > Remaining Estimate: 24h > > It would be good to extend DumpLogSegments to do the following verification: > 1. The offsets stored in the index match those in the log data. > 2. The offsets in the data log are consecutive. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Resolved] (KAFKA-575) Partition.makeFollower() reads broker info from ZK
[ https://issues.apache.org/jira/browse/KAFKA-575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-575. --- Resolution: Fixed Fix Version/s: 0.8 Thanks for patch v3. +1. Committed to 0.8. > Partition.makeFollower() reads broker info from ZK > -- > > Key: KAFKA-575 > URL: https://issues.apache.org/jira/browse/KAFKA-575 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Jun Rao >Assignee: Swapnil Ghike >Priority: Blocker > Labels: bugs > Fix For: 0.8 > > Attachments: kafka-575-v1.patch, kafka-575-v2.patch, > kafka-575-v3-correct.patch, kafka-575-v3.patch > > Original Estimate: 48h > Remaining Estimate: 48h > > To follow a new leader, Partition.makeFollower() has to obtain the broker > info of the new leader. Currently, it reads that info from ZK for every > affected partition. This increases the time for a leader to become truly available. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Closed] (KAFKA-575) Partition.makeFollower() reads broker info from ZK
[ https://issues.apache.org/jira/browse/KAFKA-575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao closed KAFKA-575. - > Partition.makeFollower() reads broker info from ZK > -- > > Key: KAFKA-575 > URL: https://issues.apache.org/jira/browse/KAFKA-575 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Jun Rao >Assignee: Swapnil Ghike >Priority: Blocker > Labels: bugs > Fix For: 0.8 > > Attachments: kafka-575-v1.patch, kafka-575-v2.patch, > kafka-575-v3-correct.patch, kafka-575-v3.patch > > Original Estimate: 48h > Remaining Estimate: 48h > > To follow a new leader, Partition.makeFollower() has to obtain the broker > info of the new leader. Currently, it reads that info from ZK for every > affected partition. This increases the time for a leader to become truly available. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-588) Index truncation doesn't seem to remove the last entry properly
[ https://issues.apache.org/jira/browse/KAFKA-588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13486567#comment-13486567 ] Jun Rao commented on KAFKA-588: --- Thanks for the patch. In LogSegmentTest, could we set indexIntervalSize such that we force an index entry to be created for every message? > Index truncation doesn't seem to remove the last entry properly > --- > > Key: KAFKA-588 > URL: https://issues.apache.org/jira/browse/KAFKA-588 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Jun Rao >Assignee: Jay Kreps >Priority: Blocker > Labels: bugs > Attachments: KAFKA-588.patch > > > [2012-10-26 08:04:13,333] INFO [Kafka Log on Broker 3], Truncated log segment > /tmp/kafka_server_3_logs/test_1-0/00130500.log to target offset > 429050 (kafka.log. > Log) > [2012-10-26 08:04:13,333] INFO [ReplicaFetcherManager on broker 3] adding > fetcher on topic test_1, partion 0, initOffset 429050 to broker 2 with > fetcherId 0 (kafka.server.R > eplicaFetcherManager) > [2012-10-26 08:04:13,335] INFO Replica Manager on Broker 3: Handling leader > and isr request LeaderAndIsrRequest(1,,1000,Map((test_1,1) -> > PartitionStateInfo({ "ISR":"2,3","leader":"2","leaderEpoch":"2" },3), > (test_1,0) -> PartitionStateInfo({ "ISR":"2,3","leader":"2","leaderEpoch":"2" > },3))) (kafka.server.ReplicaManager) > [2012-10-26 08:04:13,335] INFO Replica Manager on Broker 3: Starting the > follower state transition to follow leader 2 for topic test_1 partition 1 > (kafka.server.ReplicaManager) > [2012-10-26 08:04:13,335] INFO Partition [test_1, 1] on broker 3: Current > leader epoch [2] is larger or equal to the requested leader epoch [2], > discard the become follower request (kafka.cluster.Partition) > [2012-10-26 08:04:13,336] INFO Replica Manager on Broker 3: Starting the > follower state transition to follow leader 2 for topic test_1 partition 0 > (kafka.server.ReplicaManager) > [2012-10-26 08:04:13,336] INFO 
Partition [test_1, 0] on broker 3: Current > leader epoch [2] is larger or equal to the requested leader epoch [2], > discard the become follower request (kafka.cluster.Partition) > [2012-10-26 08:04:13,588] ERROR [ReplicaFetcherThread-2-0-on-broker-3], Error > due to (kafka.server.ReplicaFetcherThread) > java.lang.IllegalArgumentException: Attempt to append an offset (429050) no > larger than the last offset appended (429050). > at kafka.log.OffsetIndex.append(OffsetIndex.scala:180) > at kafka.log.LogSegment.append(LogSegment.scala:56) > at kafka.log.Log.append(Log.scala:273) > at > kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:51) > at > kafka.server.AbstractFetcherThread$$anonfun$doWork$5.apply(AbstractFetcherThread.scala:116) > at > kafka.server.AbstractFetcherThread$$anonfun$doWork$5.apply(AbstractFetcherThread.scala:99) > at scala.collection.immutable.Map$Map2.foreach(Map.scala:127) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:99) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:50) -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-577) extend DumpLogSegments to verify consistency btw data and index
[ https://issues.apache.org/jira/browse/KAFKA-577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13486548#comment-13486548 ] Jun Rao commented on KAFKA-577: --- Thanks for patch v2. Instead of exiting on the first verification failure, it would be better if we dump the whole log and report all failed verifications at the very end. > extend DumpLogSegments to verify consistency btw data and index > --- > > Key: KAFKA-577 > URL: https://issues.apache.org/jira/browse/KAFKA-577 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Jun Rao > Labels: newbie, tools > Fix For: 0.8 > > Attachments: kafka_577_v1.diff, kafka_577_v2.diff > > Original Estimate: 24h > Remaining Estimate: 24h > > It would be good to extend DumpLogSegments to do the following verification: > 1. The offsets stored in the index match those in the log data. > 2. The offsets in the data log are consecutive. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
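The suggested behaviour -- scan everything and report all mismatches at the end instead of stopping at the first one -- can be sketched like this (a hypothetical helper, not the actual DumpLogSegments code):

```python
def verify_segment(log_offsets, index_offsets):
    """Check the two properties listed in the ticket, collecting every
    failure instead of exiting on the first one."""
    failures = []
    log_set = set(log_offsets)
    # 1. Every index entry must point at an offset present in the log data.
    for off in index_offsets:
        if off not in log_set:
            failures.append("index offset %d not found in log" % off)
    # 2. Offsets in the data log must be consecutive.
    for prev, cur in zip(log_offsets, log_offsets[1:]):
        if cur != prev + 1:
            failures.append("gap in log: %d -> %d" % (prev, cur))
    return failures  # report everything at the very end
```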
[jira] [Updated] (KAFKA-267) Enhance ProducerPerformance to generate unique random Long value for payload
[ https://issues.apache.org/jira/browse/KAFKA-267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-267: -- Resolution: Fixed Fix Version/s: 0.8 Status: Resolved (was: Patch Available) +1 on patch v6. Committed to 0.8. > Enhance ProducerPerformance to generate unique random Long value for payload > > > Key: KAFKA-267 > URL: https://issues.apache.org/jira/browse/KAFKA-267 > Project: Kafka > Issue Type: Improvement >Reporter: John Fung >Assignee: Yang Ye > Fix For: 0.8 > > Attachments: kafka-267-v1.patch, kafka-267-v2.patch, > kafka-267-v3.patch, kafka-267-v4.patch, kafka-267-v5.patch, > kafka-267-v6.patch, kafka-267-v7.patch > > > This is achieved by: > 1. Adding a new class UniqueRandom to shuffle a range of numbers. > 2. An optional new argument "start-index" is added to specify the starting > number of the range to be shuffled. If this argument is omitted, it defaults > to 1, so it is backward compatible with the existing argument options. > 3. The ending number of the range is the starting number + number of messages > - 1. > Other ProducerPerformance enhancements: > 1. producing to multiple topics > 2. supporting multiple instances of producer performance (and distinguishing > them) > 3. allowing the producer to wait some time after sending a request -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
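The UniqueRandom idea -- payload values that look random but are guaranteed unique because they are a shuffled contiguous range -- can be sketched as follows (illustrative, not the committed patch):

```python
import random

def unique_random_payloads(num_messages, start_index=1, seed=None):
    """Return the range [start_index, start_index + num_messages - 1]
    in shuffled order: every value is unique, and start_index defaults
    to 1 as described in the ticket."""
    values = list(range(start_index, start_index + num_messages))
    random.Random(seed).shuffle(values)
    return values
```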
[jira] [Updated] (KAFKA-577) extend DumpLogSegments to verify consistency btw data and index
[ https://issues.apache.org/jira/browse/KAFKA-577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-577: -- Comment: was deleted (was: +1 on patch v6. Committed to 0.8.) > extend DumpLogSegments to verify consistency btw data and index > --- > > Key: KAFKA-577 > URL: https://issues.apache.org/jira/browse/KAFKA-577 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Jun Rao > Labels: newbie, tools > Fix For: 0.8 > > Attachments: kafka_577_v1.diff, kafka_577_v2.diff > > Original Estimate: 24h > Remaining Estimate: 24h > > It would be good to extend DumpLogSegments to do the following verification: > 1. The offsets stored in the index match those in the log data. > 2. The offsets in the data log are consecutive. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Resolved] (KAFKA-577) extend DumpLogSegments to verify consistency btw data and index
[ https://issues.apache.org/jira/browse/KAFKA-577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-577. --- Resolution: Fixed Fix Version/s: 0.8 +1 on patch v6. Committed to 0.8. > extend DumpLogSegments to verify consistency btw data and index > --- > > Key: KAFKA-577 > URL: https://issues.apache.org/jira/browse/KAFKA-577 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Jun Rao > Labels: newbie, tools > Fix For: 0.8 > > Attachments: kafka_577_v1.diff, kafka_577_v2.diff > > Original Estimate: 24h > Remaining Estimate: 24h > > It would be good to extend DumpLogSegments to do the following verification: > 1. The offsets stored in the index match those in the log data. > 2. The offsets in the data log are consecutive. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-188) Support multiple data directories
[ https://issues.apache.org/jira/browse/KAFKA-188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13486145#comment-13486145 ] Jun Rao commented on KAFKA-188: --- Thanks for patch v5. Some more comments: 50. LogManager.nextLogDir(): zeros should only include dirs not already used, right? Currently, it seems to include all log dirs. 51. ReplicaManager.checkpointHighWatermarks(): When handling a leaderAndIsr request, we first create a partition and then create a local replica (which creates the local log). So, there is a slight possibility that a partition in allPartitions may not have a local log. The simplest way is to ignore such partition when checkpointing HW. 52. VerifiableProperties: The following constructor doesn't seem to be used. def this() = this(new Properties) > Support multiple data directories > - > > Key: KAFKA-188 > URL: https://issues.apache.org/jira/browse/KAFKA-188 > Project: Kafka > Issue Type: New Feature >Reporter: Jay Kreps > Attachments: KAFKA-188.patch, KAFKA-188-v2.patch, KAFKA-188-v3.patch, > KAFKA-188-v4.patch, KAFKA-188-v5.patch > > > Currently we allow only a single data directory. This means that a multi-disk > configuration needs to be a RAID array or LVM volume or something like that > to be mounted as a single directory. > For a high-throughput low-reliability configuration this would mean RAID0 > striping. Common wisdom in Hadoop land has it that a JBOD setup that just > mounts each disk as a separate directory and does application-level balancing > over these results in about 30% write-improvement. For example see this claim > here: > http://old.nabble.com/Re%3A-RAID-vs.-JBOD-p21466110.html > It is not clear to me why this would be the case--it seems the RAID > controller should be able to balance writes as well as the application so it > may depend on the details of the setup. 
> Nonetheless this would be really easy to implement, all you need to do is add > multiple data directories and balance partition creation over these disks. > One problem this might cause is if a particular topic is much larger than the > others it might unbalance the load across the disks. The partition->disk > assignment policy should probably attempt to evenly spread each topic to > avoid this, rather than just trying keep the number of partitions balanced > between disks. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
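[Editorial sketch] The per-topic spreading policy suggested at the end of the ticket can be sketched as follows. The names and the flat round-robin scheme are illustrative assumptions, not Kafka's actual LogManager logic:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of the assignment policy Jay describes: spread
// each topic's partitions round-robin across the data directories, so a
// single very large topic cannot pile up on one disk.
public class DiskAssigner {

    // Returns a map from partition name to data-directory index.
    public static Map<String, Integer> assign(String topic, int numPartitions, int numDisks) {
        Map<String, Integer> assignment = new LinkedHashMap<>();
        for (int p = 0; p < numPartitions; p++) {
            // Round-robin per topic: partition p of this topic goes to disk p mod numDisks.
            assignment.put(topic + "-" + p, p % numDisks);
        }
        return assignment;
    }
}
```

Under this policy each topic is spread evenly by construction, rather than only keeping the total partition count per disk balanced.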
[jira] [Commented] (KAFKA-574) KafkaController unnecessarily reads leaderAndIsr info from ZK
[ https://issues.apache.org/jira/browse/KAFKA-574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13486091#comment-13486091 ] Jun Rao commented on KAFKA-574: --- Thanks for the patch. Some comments: 1. ReplicaStateMachine.handleStateChange(): There is one more optimization to consider. When handling the OnlineReplica case, we don't really need to read Isr from ZK and can read it from in-memory cache. To do that, we can extend ControllerContext.allLeaders to store LeaderAndIsr, instead of just the broker Id of the leader. This leaderAndIsr cache will be updated every time the controller makes a leader change. 2. 0.8 has moved since you uploaded the patch. Could you rebase? > KafkaController unnecessarily reads leaderAndIsr info from ZK > - > > Key: KAFKA-574 > URL: https://issues.apache.org/jira/browse/KAFKA-574 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Jun Rao >Assignee: Prashanth Menon >Priority: Blocker > Labels: bugs > Attachments: KAFKA-574-v1.patch > > Original Estimate: 48h > Remaining Estimate: 48h > > KafkaController calls updateLeaderAndIsrCache() in onBrokerFailure(). This is > unnecessary since in onBrokerFailure(), we will make leader and isr change > anyway so there is no need to first read that information from ZK. Latency is > critical in onBrokerFailure() since it determines how quickly a leader can be > made online. > Similarly, updateLeaderAndIsrCache() is called in onBrokerStartup() > unnecessarily. In this case, the controller does not change the leader or the > isr. It just needs to send the current leader and the isr info to the newly > started broker. We already cache leader in the controller. Isr in theory > could change any time by the leader. So, reading from ZK doesn't guarantee > that we can get the latest isr anyway. Instead, we just need to get the isr > last selected by the controller (which can be cached together with the leader > in the controller). 
If the leader epoch in a broker is at or larger than the > epoch in the leaderAndIsr request, the broker can just ignore it. Otherwise, > the leader and the isr selected by the controller should be used. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-575) Partition.makeFollower() reads broker info from ZK
[ https://issues.apache.org/jira/browse/KAFKA-575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13486079#comment-13486079 ] Jun Rao commented on KAFKA-575: --- Patch looks good. Just one comment: 1. ControllerBrokerRequestBatch.sendRequestsToBrokers(): Instead of including all live brokers in the leaderAndIsr request, we probably can just include brokers that are leaders in each LeaderAndIsr request. > Partition.makeFollower() reads broker info from ZK > -- > > Key: KAFKA-575 > URL: https://issues.apache.org/jira/browse/KAFKA-575 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Jun Rao >Assignee: Swapnil Ghike >Priority: Blocker > Labels: bugs > Attachments: kafka-575-v1.patch > > Original Estimate: 48h > Remaining Estimate: 48h > > To follow a new leader, Partition.makeFollower() has to obtain the broker > info of the new leader. Currently, it reads that info from ZK for every > affected partition. This increases the time for a leader to be truly available. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Updated] (KAFKA-589) Clean shutdown after startup connection failure
[ https://issues.apache.org/jira/browse/KAFKA-589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-589: -- Affects Version/s: 0.8 Labels: bugs newbie (was: ) > Clean shutdown after startup connection failure > --- > > Key: KAFKA-589 > URL: https://issues.apache.org/jira/browse/KAFKA-589 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8, 0.7.2 >Reporter: Jason Rosenberg >Priority: Minor > Labels: bugs, newbie > > Hi, > I'm embedding the kafka server (0.7.2) in an application container. I've > noticed that if I try to start the server without zookeeper being available, > by default it gets a zk connection timeout after 6 seconds, and then throws > an Exception out of KafkaServer.startup(). E.g., I see this stack trace: > Exception in thread "main" org.I0Itec.zkclient.exception.ZkTimeoutException: > Unable to connect to zookeeper server within timeout: 6000 > at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:876) > at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:98) > at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:84) > at kafka.server.KafkaZooKeeper.startup(KafkaZooKeeper.scala:44) > at kafka.log.LogManager.<init>(LogManager.scala:93) > at kafka.server.KafkaServer.startup(KafkaServer.scala:58) > > > So that's ok, I can catch the exception, and then shut everything down > gracefully, in this case. However, when I do this, it seems there is a > daemon thread still around, which doesn't quit, and so the server never > actually exits the jvm. 
Specifically, this thread seems to hang around: > "kafka-logcleaner-0" prio=5 tid=7fd9b48b1000 nid=0x112c08000 waiting on > condition [112c07000] >java.lang.Thread.State: TIMED_WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <7f40d4be8> (a > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) > at > java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:196) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025) > at java.util.concurrent.DelayQueue.take(DelayQueue.java:164) > at > java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:609) > at > java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:602) > at > java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907) > at java.lang.Thread.run(Thread.java:680) > Looking at the code in kafka.log.LogManager(), it does seem like it starts up > the scheduler to clean logs, before then trying to connect to zk (and in this > case fail): > /* Schedule the cleanup task to delete old logs */ > if(scheduler != null) { > info("starting log cleaner every " + logCleanupIntervalMs + " ms") > scheduler.scheduleWithRate(cleanupLogs, 60 * 1000, logCleanupIntervalMs) > } > So this scheduler does not appear to be stopped if startup fails. However, > if I catch the above RuntimeException, and then call KafkaServer.shutdown(), > then it will stop the scheduler, and all is good. > However, it seems odd that if I get an exception when calling > KafkaServer.startup(), that I should still have to do a > KafkaServer.shutdown(). Rather, wouldn't it be better to have it internally > cleanup after itself if startup() gets an exception? 
I'm not sure I can > reliably call shutdown() after a failed startup() -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-589) Clean shutdown after startup connection failure
[ https://issues.apache.org/jira/browse/KAFKA-589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13485798#comment-13485798 ] Jun Rao commented on KAFKA-589: --- This problem exists in 0.8 too. What we need to do is to add a try/catch in KafkaServer.start() and call shutdown if we hit any exceptions. > Clean shutdown after startup connection failure > --- > > Key: KAFKA-589 > URL: https://issues.apache.org/jira/browse/KAFKA-589 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8, 0.7.2 >Reporter: Jason Rosenberg >Priority: Minor > Labels: bugs, newbie > > Hi, > I'm embedding the kafka server (0.7.2) in an application container. I've > noticed that if I try to start the server without zookeeper being available, > by default it gets a zk connection timeout after 6 seconds, and then throws > an Exception out of KafkaServer.startup(). E.g., I see this stack trace: > Exception in thread "main" org.I0Itec.zkclient.exception.ZkTimeoutException: > Unable to connect to zookeeper server within timeout: 6000 > at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:876) > at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:98) > at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:84) > at kafka.server.KafkaZooKeeper.startup(KafkaZooKeeper.scala:44) > at kafka.log.LogManager.<init>(LogManager.scala:93) > at kafka.server.KafkaServer.startup(KafkaServer.scala:58) > > > So that's ok, I can catch the exception, and then shut everything down > gracefully, in this case. However, when I do this, it seems there is a > daemon thread still around, which doesn't quit, and so the server never > actually exits the jvm. 
Specifically, this thread seems to hang around: > "kafka-logcleaner-0" prio=5 tid=7fd9b48b1000 nid=0x112c08000 waiting on > condition [112c07000] >java.lang.Thread.State: TIMED_WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <7f40d4be8> (a > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) > at > java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:196) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025) > at java.util.concurrent.DelayQueue.take(DelayQueue.java:164) > at > java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:609) > at > java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:602) > at > java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907) > at java.lang.Thread.run(Thread.java:680) > Looking at the code in kafka.log.LogManager(), it does seem like it starts up > the scheduler to clean logs, before then trying to connect to zk (and in this > case fail): > /* Schedule the cleanup task to delete old logs */ > if(scheduler != null) { > info("starting log cleaner every " + logCleanupIntervalMs + " ms") > scheduler.scheduleWithRate(cleanupLogs, 60 * 1000, logCleanupIntervalMs) > } > So this scheduler does not appear to be stopped if startup fails. However, > if I catch the above RuntimeException, and then call KafkaServer.shutdown(), > then it will stop the scheduler, and all is good. > However, it seems odd that if I get an exception when calling > KafkaServer.startup(), that I should still have to do a > KafkaServer.shutdown(). Rather, wouldn't it be better to have it internally > cleanup after itself if startup() gets an exception? 
I'm not sure I can > reliably call shutdown() after a failed startup() -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
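[Editorial sketch] The fix Jun suggests (a try/catch around startup that triggers shutdown) can be sketched like this. The class is a hypothetical stand-in for KafkaServer, not the real implementation:

```java
// Hypothetical sketch of the suggested fix: if any step of startup throws,
// clean up internally before rethrowing, so callers never have to call
// shutdown() after a failed startup().
public class Server {
    private boolean schedulerStarted = false;

    public void startup() {
        try {
            startScheduler();      // may leave a daemon thread running
            connectToZookeeper();  // may throw, e.g. a ZK connection timeout
        } catch (RuntimeException e) {
            shutdown();            // release everything started so far
            throw e;               // propagate the original failure
        }
    }

    void startScheduler() { schedulerStarted = true; }

    // Stand-in for the failing ZK connection in the reported scenario.
    void connectToZookeeper() {
        throw new RuntimeException("Unable to connect to zookeeper server within timeout: 6000");
    }

    public void shutdown() { schedulerStarted = false; }

    public boolean isRunning() { return schedulerStarted; }
}
```

With this shape, the lingering "kafka-logcleaner-0" style thread would be stopped by the internal shutdown() even when startup() fails.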
Re: move 0.8 back to trunk
I think it's better if we do this before moving to git. I am thinking of making the change next week. We likely need a new repository when we graduate. We can probably move to git then. Thanks, Jun On Sat, Oct 27, 2012 at 9:58 PM, Joe Stein wrote: > should we do this before or after we move to git ? > > On Fri, Oct 26, 2012 at 7:41 PM, Jun Rao wrote: > > > The 0.8 branch has stabilized quite a bit now. We still have to look into > > some corner cases and performance, but most system tests are now > passing. I > > think it's time to bring 0.8 back to trunk and let people start trying > it. > > I think we should just move trunk to an 0.7 branch and then move 0.8 to > > trunk. Any objections? > > > > Thanks, > > > > Jun > > > > > > -- > > /* > Joe Stein > http://www.linkedin.com/in/charmalloc > Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop> > */ >
[jira] [Commented] (KAFKA-577) extend DumpLogSegments to verify consistency btw data and index
[ https://issues.apache.org/jira/browse/KAFKA-577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13485306#comment-13485306 ] Jun Rao commented on KAFKA-577: --- Thanks for the patch. Some comments: 1. Could you add a --verifyOnly option so that we only do the verification but not print out the content? 2. The text in the following statement is too verbose. It doesn't need to print the index file since it's provided in the command line. It can just say "Index position %d doesn't match log position at offset %d". System.err.println(("The offset in index file [%s] does not match the offset stored in " + "log file [%s], they're %d and %d separately").format(entry.offset + index.baseOffset, messageAndOffset.offset)) 3. Shouldn't we use %d instead of %l in the following line? System.err.println("The offset in the data log file [%s] is not consecutive, [%l] follows [%l]".format(file.getName, messageAndOffset.offset, lastOffset)) 4. Could you add a shell script for DumpLogSegments in bin/ ? > extend DumpLogSegments to verify consistency btw data and index > --- > > Key: KAFKA-577 > URL: https://issues.apache.org/jira/browse/KAFKA-577 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Jun Rao > Labels: newbie, tools > Attachments: kafka_577_v1.diff > > Original Estimate: 24h > Remaining Estimate: 24h > > It would be good to extend DumpLogSegments to do the following verification: > 1. The offsets stored in the index match those in the log data. > 2. The offsets in the data log is consecutive. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-267) Enhance ProducerPerformance to generate unique random Long value for payload
[ https://issues.apache.org/jira/browse/KAFKA-267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13485042#comment-13485042 ] Jun Rao commented on KAFKA-267: --- What we can do is that, in system tests, tune #retries and backoff time in ProducerPerformance according to the failure scenario so that we expect no data loss on the producer side. Both knobs are exposed in ProducerPerformance now. > Enhance ProducerPerformance to generate unique random Long value for payload > > > Key: KAFKA-267 > URL: https://issues.apache.org/jira/browse/KAFKA-267 > Project: Kafka > Issue Type: Improvement >Reporter: John Fung >Assignee: Yang Ye > Attachments: kafka-267-v1.patch, kafka-267-v2.patch, > kafka-267-v3.patch, kafka-267-v4.patch, kafka-267-v5.patch, > kafka-267-v6.patch, kafka-267-v7.patch > > > This is achieved by: > 1. Adding a new class UniqueRandom to shuffle a range of numbers. > 2. An optional new argument "start-index" is added to specify the starting > number of the range to be shuffled. If this argument is omitted, it is > defaulted to 1. So it is backward compatible with the argument options. > 3. The ending number of the range is the starting number + number of messages > - 1. > Other ProducerPerformance advancement: > 1. producing to multiple topics > 2. supporting multiple instances of producer performance ( and distinguishes > them) > 3. allowing waiting some time after sending a request -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
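[Editorial sketch] The shuffled-range idea behind UniqueRandom can be sketched as follows. This is an illustrative reconstruction from the ticket description, not the patched ProducerPerformance code:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical sketch of the KAFKA-267 idea: shuffle the range
// [startIndex, startIndex + numMessages - 1] so every payload value is
// unique and random, letting a consumer detect loss or duplication.
public class UniqueRandom {

    public static List<Long> shuffledRange(long startIndex, int numMessages, long seed) {
        List<Long> values = new ArrayList<>(numMessages);
        for (int i = 0; i < numMessages; i++) {
            values.add(startIndex + i);
        }
        // Fisher-Yates shuffle under the hood; a fixed seed makes runs reproducible.
        Collections.shuffle(values, new Random(seed));
        return values;
    }
}
```

The default start-index of 1 in the ticket corresponds to calling this with startIndex = 1.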
[jira] [Commented] (KAFKA-532) Multiple controllers can co-exist during soft failures
[ https://issues.apache.org/jira/browse/KAFKA-532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13485030#comment-13485030 ] Jun Rao commented on KAFKA-532: --- The purpose of controller epoch is to prevent an older controller from overriding the data already updated by a newer controller. What we can do is that when a controller wants to update leaderAndIsr, it first checks and makes sure that the controller epoch stored in the path is less than or equal to the current controller epoch. Otherwise, the controller won't update the path. This way, the controller epoch associated with leaderAndIsr is only updated when it's truly needed, i.e., when the controller wants to update the leader or the isr. So we don't need to rewrite leaderAndIsr during controller failover. When sending leaderAndIsr requests, we just need to send the controller epoch stored in the leaderAndIsr path. > Multiple controllers can co-exist during soft failures > -- > > Key: KAFKA-532 > URL: https://issues.apache.org/jira/browse/KAFKA-532 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8 >Reporter: Neha Narkhede >Assignee: Neha Narkhede >Priority: Blocker > Labels: bugs > Attachments: kafka-532-v1.patch, kafka-532-v2.patch > > Original Estimate: 48h > Remaining Estimate: 48h > > If the current controller experiences an intermittent soft failure (GC pause) > in the middle of leader election or partition reassignment, a new controller > might get elected and start communicating new state change decisions to the > brokers. After recovering from the soft failure, the old controller might > continue sending some stale state change decisions to the brokers, resulting > in unexpected failures. We need to introduce a controller generation id that > increments with controller election. The brokers should reject any state > change requests by a controller with an older generation id. -- This message is automatically generated by JIRA. 
If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
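[Editorial sketch] The epoch check Jun describes can be sketched in isolation. The class below is a hypothetical stand-in for the conditional leaderAndIsr update, not the actual controller code:

```java
// Hypothetical illustration of the KAFKA-532 rule: a controller only
// overwrites the stored leaderAndIsr state if the epoch already stored
// there is less than or equal to its own epoch, so writes from a stale
// (older-epoch) controller are rejected.
public class EpochGuard {
    private int storedEpoch = 0;  // epoch recorded in the leaderAndIsr path

    // Attempts the update on behalf of a controller with the given epoch.
    // Returns true if the update was applied, false if it was rejected.
    public boolean tryUpdate(int controllerEpoch) {
        if (storedEpoch > controllerEpoch) {
            return false;  // a newer controller already wrote here
        }
        storedEpoch = controllerEpoch;
        return true;
    }

    public int storedEpoch() { return storedEpoch; }
}
```

In the real system this check would be a conditional ZK write rather than an in-memory comparison, but the acceptance rule is the same.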
[jira] [Commented] (KAFKA-267) Enhance ProducerPerformance to generate unique random Long value for payload
[ https://issues.apache.org/jira/browse/KAFKA-267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13485024#comment-13485024 ] Jun Rao commented on KAFKA-267: --- Thanks for patch v7. Not sure if printing messages in syncProducer is a good idea. In general, messages sent in syncProducer are not necessarily strings and may not be printable. ProducerPerformance, on the other hand, always sends string messages, which are printable. So, I suggest that we keep the message printing in ProducerPerformance. > Enhance ProducerPerformance to generate unique random Long value for payload > > > Key: KAFKA-267 > URL: https://issues.apache.org/jira/browse/KAFKA-267 > Project: Kafka > Issue Type: Improvement >Reporter: John Fung >Assignee: Yang Ye > Attachments: kafka-267-v1.patch, kafka-267-v2.patch, > kafka-267-v3.patch, kafka-267-v4.patch, kafka-267-v5.patch, > kafka-267-v6.patch, kafka-267-v7.patch > > > This is achieved by: > 1. Adding a new class UniqueRandom to shuffle a range of numbers. > 2. An optional new argument "start-index" is added to specify the starting > number of the range to be shuffled. If this argument is omitted, it is > defaulted to 1. So it is backward compatible with the argument options. > 3. The ending number of the range is the starting number + number of messages > - 1. > Other ProducerPerformance advancement: > 1. producing to multiple topics > 2. supporting multiple instances of producer performance ( and distinguishes > them) > 3. allowing waiting some time after sending a request -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-588) Index truncation doesn't seem to remove the last entry properly
[ https://issues.apache.org/jira/browse/KAFKA-588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13485020#comment-13485020 ] Jun Rao commented on KAFKA-588: --- From the log, the broker wants to truncate log to offset 429050. The last entry in the log is the following and looks correct. offset: 429049 position: 155842578 isvalid: true payloadsize: 500 magic: 2 compresscodec: NoCompressionCodec crc: 3220325748 However, the last index entry is the following, which is off by 1. offset: 429050 position: 155843100 > Index truncation doesn't seem to remove the last entry properly > --- > > Key: KAFKA-588 > URL: https://issues.apache.org/jira/browse/KAFKA-588 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Jun Rao >Priority: Blocker > Labels: bugs > > [2012-10-26 08:04:13,333] INFO [Kafka Log on Broker 3], Truncated log segment > /tmp/kafka_server_3_logs/test_1-0/00130500.log to target offset > 429050 (kafka.log. 
> Log) > [2012-10-26 08:04:13,333] INFO [ReplicaFetcherManager on broker 3] adding > fetcher on topic test_1, partion 0, initOffset 429050 to broker 2 with > fetcherId 0 (kafka.server.R > eplicaFetcherManager) > [2012-10-26 08:04:13,335] INFO Replica Manager on Broker 3: Handling leader > and isr request LeaderAndIsrRequest(1,,1000,Map((test_1,1) -> > PartitionStateInfo({ "ISR":"2,3","leader":"2","leaderEpoch":"2" },3), > (test_1,0) -> PartitionStateInfo({ "ISR":"2,3","leader":"2","leaderEpoch":"2" > },3))) (kafka.server.ReplicaManager) > [2012-10-26 08:04:13,335] INFO Replica Manager on Broker 3: Starting the > follower state transition to follow leader 2 for topic test_1 partition 1 > (kafka.server.ReplicaManager) > [2012-10-26 08:04:13,335] INFO Partition [test_1, 1] on broker 3: Current > leader epoch [2] is larger or equal to the requested leader epoch [2], > discard the become follower request (kafka.cluster.Partition) > [2012-10-26 08:04:13,336] INFO Replica Manager on Broker 3: Starting the > follower state transition to follow leader 2 for topic test_1 partition 0 > (kafka.server.ReplicaManager) > [2012-10-26 08:04:13,336] INFO Partition [test_1, 0] on broker 3: Current > leader epoch [2] is larger or equal to the requested leader epoch [2], > discard the become follower request (kafka.cluster.Partition) > [2012-10-26 08:04:13,588] ERROR [ReplicaFetcherThread-2-0-on-broker-3], Error > due to (kafka.server.ReplicaFetcherThread) > java.lang.IllegalArgumentException: Attempt to append an offset (429050) no > larger than the last offset appended (429050). 
> at kafka.log.OffsetIndex.append(OffsetIndex.scala:180) > at kafka.log.LogSegment.append(LogSegment.scala:56) > at kafka.log.Log.append(Log.scala:273) > at > kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:51) > at > kafka.server.AbstractFetcherThread$$anonfun$doWork$5.apply(AbstractFetcherThread.scala:116) > at > kafka.server.AbstractFetcherThread$$anonfun$doWork$5.apply(AbstractFetcherThread.scala:99) > at scala.collection.immutable.Map$Map2.foreach(Map.scala:127) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:99) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:50) -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Updated] (KAFKA-588) Index truncation doesn't seem to remove the last entry properly
[ https://issues.apache.org/jira/browse/KAFKA-588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-588: -- Labels: bugs (was: ) > Index truncation doesn't seem to remove the last entry properly > --- > > Key: KAFKA-588 > URL: https://issues.apache.org/jira/browse/KAFKA-588 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Jun Rao >Priority: Blocker > Labels: bugs > > [2012-10-26 08:04:13,333] INFO [Kafka Log on Broker 3], Truncated log segment > /tmp/kafka_server_3_logs/test_1-0/00130500.log to target offset > 429050 (kafka.log. > Log) > [2012-10-26 08:04:13,333] INFO [ReplicaFetcherManager on broker 3] adding > fetcher on topic test_1, partion 0, initOffset 429050 to broker 2 with > fetcherId 0 (kafka.server.R > eplicaFetcherManager) > [2012-10-26 08:04:13,335] INFO Replica Manager on Broker 3: Handling leader > and isr request LeaderAndIsrRequest(1,,1000,Map((test_1,1) -> > PartitionStateInfo({ "ISR":"2,3","leader":"2","leaderEpoch":"2" },3), > (test_1,0) -> PartitionStateInfo({ "ISR":"2,3","leader":"2","leaderEpoch":"2" > },3))) (kafka.server.ReplicaManager) > [2012-10-26 08:04:13,335] INFO Replica Manager on Broker 3: Starting the > follower state transition to follow leader 2 for topic test_1 partition 1 > (kafka.server.ReplicaManager) > [2012-10-26 08:04:13,335] INFO Partition [test_1, 1] on broker 3: Current > leader epoch [2] is larger or equal to the requested leader epoch [2], > discard the become follower request (kafka.cluster.Partition) > [2012-10-26 08:04:13,336] INFO Replica Manager on Broker 3: Starting the > follower state transition to follow leader 2 for topic test_1 partition 0 > (kafka.server.ReplicaManager) > [2012-10-26 08:04:13,336] INFO Partition [test_1, 0] on broker 3: Current > leader epoch [2] is larger or equal to the requested leader epoch [2], > discard the become follower request (kafka.cluster.Partition) > [2012-10-26 08:04:13,588] ERROR 
[ReplicaFetcherThread-2-0-on-broker-3], Error > due to (kafka.server.ReplicaFetcherThread) > java.lang.IllegalArgumentException: Attempt to append an offset (429050) no > larger than the last offset appended (429050). > at kafka.log.OffsetIndex.append(OffsetIndex.scala:180) > at kafka.log.LogSegment.append(LogSegment.scala:56) > at kafka.log.Log.append(Log.scala:273) > at > kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:51) > at > kafka.server.AbstractFetcherThread$$anonfun$doWork$5.apply(AbstractFetcherThread.scala:116) > at > kafka.server.AbstractFetcherThread$$anonfun$doWork$5.apply(AbstractFetcherThread.scala:99) > at scala.collection.immutable.Map$Map2.foreach(Map.scala:127) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:99) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:50) -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Created] (KAFKA-588) Index truncation doesn't seem to remove the last entry properly
Jun Rao created KAFKA-588: - Summary: Index truncation doesn't seem to remove the last entry properly Key: KAFKA-588 URL: https://issues.apache.org/jira/browse/KAFKA-588 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8 Reporter: Jun Rao Priority: Blocker [2012-10-26 08:04:13,333] INFO [Kafka Log on Broker 3], Truncated log segment /tmp/kafka_server_3_logs/test_1-0/00130500.log to target offset 429050 (kafka.log. Log) [2012-10-26 08:04:13,333] INFO [ReplicaFetcherManager on broker 3] adding fetcher on topic test_1, partion 0, initOffset 429050 to broker 2 with fetcherId 0 (kafka.server.R eplicaFetcherManager) [2012-10-26 08:04:13,335] INFO Replica Manager on Broker 3: Handling leader and isr request LeaderAndIsrRequest(1,,1000,Map((test_1,1) -> PartitionStateInfo({ "ISR":"2,3","leader":"2","leaderEpoch":"2" },3), (test_1,0) -> PartitionStateInfo({ "ISR":"2,3","leader":"2","leaderEpoch":"2" },3))) (kafka.server.ReplicaManager) [2012-10-26 08:04:13,335] INFO Replica Manager on Broker 3: Starting the follower state transition to follow leader 2 for topic test_1 partition 1 (kafka.server.ReplicaManager) [2012-10-26 08:04:13,335] INFO Partition [test_1, 1] on broker 3: Current leader epoch [2] is larger or equal to the requested leader epoch [2], discard the become follower request (kafka.cluster.Partition) [2012-10-26 08:04:13,336] INFO Replica Manager on Broker 3: Starting the follower state transition to follow leader 2 for topic test_1 partition 0 (kafka.server.ReplicaManager) [2012-10-26 08:04:13,336] INFO Partition [test_1, 0] on broker 3: Current leader epoch [2] is larger or equal to the requested leader epoch [2], discard the become follower request (kafka.cluster.Partition) [2012-10-26 08:04:13,588] ERROR [ReplicaFetcherThread-2-0-on-broker-3], Error due to (kafka.server.ReplicaFetcherThread) java.lang.IllegalArgumentException: Attempt to append an offset (429050) no larger than the last offset appended (429050). 
at kafka.log.OffsetIndex.append(OffsetIndex.scala:180) at kafka.log.LogSegment.append(LogSegment.scala:56) at kafka.log.Log.append(Log.scala:273) at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:51) at kafka.server.AbstractFetcherThread$$anonfun$doWork$5.apply(AbstractFetcherThread.scala:116) at kafka.server.AbstractFetcherThread$$anonfun$doWork$5.apply(AbstractFetcherThread.scala:99) at scala.collection.immutable.Map$Map2.foreach(Map.scala:127) at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:99) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:50) -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Updated] (KAFKA-584) produce/fetch remote time metric not set correctly when num.acks = 1
[ https://issues.apache.org/jira/browse/KAFKA-584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-584: -- Resolution: Fixed Status: Resolved (was: Patch Available) Thanks for the review. Changed the logging to trace level and committed to 0.8. > produce/fetch remote time metric not set correctly when num.acks = 1 > > > Key: KAFKA-584 > URL: https://issues.apache.org/jira/browse/KAFKA-584 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8 >Reporter: Neha Narkhede >Assignee: Jun Rao > Labels: bugs > Fix For: 0.8 > > Attachments: kafka-584.patch > > > When num.acks = 1, the produce/fetch remote time is set to a very high value > (several hours). This is due to a race condition on the apiLocalTime, which > is initialized to -1, that makes the (responseTime - apiLocalTime) a very > large value. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Closed] (KAFKA-584) produce/fetch remote time metric not set correctly when num.acks = 1
[ https://issues.apache.org/jira/browse/KAFKA-584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao closed KAFKA-584. - > produce/fetch remote time metric not set correctly when num.acks = 1 > > > Key: KAFKA-584 > URL: https://issues.apache.org/jira/browse/KAFKA-584 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8 >Reporter: Neha Narkhede >Assignee: Jun Rao > Labels: bugs > Fix For: 0.8 > > Attachments: kafka-584.patch > > > When num.acks = 1, the produce/fetch remote time is set to a very high value > (several hours). This is due to a race condition on the apiLocalTime, which > is initialized to -1, that makes the (responseTime - apiLocalTime) a very > large value. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-584) produce/fetch remote time metric not set correctly when num.acks = 1
[ https://issues.apache.org/jira/browse/KAFKA-584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13484251#comment-13484251 ]

Jun Rao commented on KAFKA-584:
-------------------------------

Also, marking the field volatile makes reads and writes of a long value atomic, according to the following article, which is what we want here: http://gee.cs.oswego.edu/dl/cpj/jmm.html
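A minimal sketch of the guarantee the comment above relies on (class and field names here are illustrative, not Kafka's actual code): the Java Memory Model allows a plain 64-bit `long` to be read and written in two 32-bit halves, so a reader on another thread could observe a torn value. Declaring the field `volatile` guarantees the 64-bit access is atomic and that the write is visible to other threads.

```java
public class VolatileLongDemo {
    // Volatile long: reads and writes are atomic (no word tearing) and
    // establish visibility across threads, per the Java Memory Model.
    static volatile long value = -1L;

    static long writeAndRead() {
        // Write the value from a separate thread, then read it back here.
        Thread writer = new Thread(() -> value = 0x1234_5678_9ABC_DEF0L);
        writer.start();
        try {
            writer.join();  // join() also establishes happens-before
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // The reader observes the full 64-bit value, never half of it.
        return value;
    }

    public static void main(String[] args) {
        System.out.println(writeAndRead() == 0x1234_5678_9ABC_DEF0L);  // prints "true"
    }
}
```

Without `volatile`, the same code would usually still print `true` in practice, which is exactly why this class of bug is hard to catch by testing; the JMM simply does not guarantee it.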
[jira] [Updated] (KAFKA-584) produce/fetch remote time metric not set correctly when num.acks = 1
[ https://issues.apache.org/jira/browse/KAFKA-584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao updated KAFKA-584:
--------------------------
    Fix Version/s: 0.8
         Assignee: Jun Rao
           Status: Patch Available  (was: Open)
[jira] [Updated] (KAFKA-584) produce/fetch remote time metric not set correctly when num.acks = 1
[ https://issues.apache.org/jira/browse/KAFKA-584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao updated KAFKA-584:
--------------------------
    Attachment: kafka-584.patch

Attached a patch. The problem is that with ack = 1, the response can be sent before apiLocalTime is updated. Fixed it by setting apiLocalTime to responseTime if it is not yet set. Also, since those times are updated in different threads, the fields are made volatile so that updates are visible to other threads. With the patch, remote times are 0 with ack = 1.
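The fix described above can be sketched as follows. This is a hypothetical illustration of the pattern, not Kafka's actual code (method and field names are invented): with ack = 1 the response can complete before the request thread records apiLocalTime, leaving it at its -1 sentinel, so the computed remote time becomes (responseTime - (-1)), i.e. enormous. Falling back to responseTime when the field is unset makes the remote time come out as 0 instead.

```java
public class RequestMetrics {
    // Updated by different threads, so declared volatile for visibility.
    private volatile long apiLocalTimeMs = -1;
    private volatile long responseTimeMs = -1;

    public void onApiLocalComplete(long nowMs) {
        apiLocalTimeMs = nowMs;
    }

    public void onResponseComplete(long nowMs) {
        responseTimeMs = nowMs;
        // The fix: if the local phase was never recorded (the response won
        // the race), treat it as having completed at response time, so the
        // remote time is reported as 0 rather than a multi-hour value.
        if (apiLocalTimeMs < 0)
            apiLocalTimeMs = nowMs;
    }

    public long remoteTimeMs() {
        return responseTimeMs - apiLocalTimeMs;
    }
}
```

In the racy ack = 1 case, `onResponseComplete` fires without `onApiLocalComplete` ever running, and `remoteTimeMs()` returns 0; in the normal case it returns the actual gap between the two timestamps.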
[jira] [Commented] (KAFKA-581) provides windows batch script for starting Kafka/Zookeeper
[ https://issues.apache.org/jira/browse/KAFKA-581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13484233#comment-13484233 ]

Jun Rao commented on KAFKA-581:
-------------------------------

Since we have quite a few scripts in bin/, I think it's actually better to have the windows versions in a separate directory. Perhaps we can update our quickstart with the windows script dir when 0.8 is released.

> provides windows batch script for starting Kafka/Zookeeper
> ----------------------------------------------------------
>
>                 Key: KAFKA-581
>                 URL: https://issues.apache.org/jira/browse/KAFKA-581
>             Project: Kafka
>          Issue Type: Improvement
>          Components: config
>    Affects Versions: 0.8
>        Environment: Windows
>            Reporter: antoine vianey
>            Priority: Trivial
>              Labels: features, run, windows
>             Fix For: 0.8
>
>         Attachments: kafka-console-consumer.bat, kafka-console-producer.bat,
> kafka-run-class.bat, kafka-server-start.bat, sbt.bat,
> zookeeper-server-start.bat
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Provide a port for quickstarting Kafka dev on Windows:
> - kafka-run-class.bat
> - kafka-server-start.bat
> - zookeeper-server-start.bat
> This will help the Kafka community grow.
[jira] [Commented] (KAFKA-340) Implement clean shutdown in 0.8
[ https://issues.apache.org/jira/browse/KAFKA-340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13483894#comment-13483894 ]

Jun Rao commented on KAFKA-340:
-------------------------------

21.4 I looked again. Yes, your code is correct.

One more minor comment.

22. ControllerBrokerRequestBatch.sendRequestsToBrokers(): In m.foreach, instead of using r, could we use case (brokerId, partitionsToBeStopped)? Then we can refer to those names directly, instead of r._1 and r._2.

Other than that, the patch looks good.

> Implement clean shutdown in 0.8
> -------------------------------
>
>                 Key: KAFKA-340
>                 URL: https://issues.apache.org/jira/browse/KAFKA-340
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: core
>    Affects Versions: 0.8
>            Reporter: Jun Rao
>            Assignee: Joel Koshy
>            Priority: Blocker
>              Labels: bugs
>         Attachments: KAFKA-340-v1.patch, KAFKA-340-v2.patch,
> KAFKA-340-v3.patch
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> If we are shutting down a broker when the ISR of a partition includes only
> that broker, we could lose some messages that have been previously committed.
> For clean shutdown, we need to guarantee that there is at least 1 other
> broker in ISR after the broker is shut down.
[jira] [Commented] (KAFKA-340) Implement clean shutdown in 0.8
[ https://issues.apache.org/jira/browse/KAFKA-340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13483657#comment-13483657 ]

Jun Rao commented on KAFKA-340:
-------------------------------

21.2 My confusion is that I thought replicatedPartitionsBrokerLeads was a val instead of a method. Could you change replicatedPartitionsBrokerLeads.toSet to replicatedPartitionsBrokerLeads().toSet to indicate that there is a side effect? Also, could you add a comment to shutdownBroker() describing what it does and what the return value is?

21.3 Looked again. Yes, you are right. We don't need to lock there.

21.4 The code still doesn't send StopReplicaRequests for partitions whose leader is on the broker to be shut down. This is not strictly needed, since the leader won't issue any fetch requests. However, it would be better to stop those replicas too.
[jira] [Closed] (KAFKA-571) Add more test cases to System Test
[ https://issues.apache.org/jira/browse/KAFKA-571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao closed KAFKA-571.
-------------------------

> Add more test cases to System Test
> ----------------------------------
>
>                 Key: KAFKA-571
>                 URL: https://issues.apache.org/jira/browse/KAFKA-571
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: John Fung
>            Assignee: John Fung
>             Fix For: 0.8
>
>         Attachments: kafka-0.7.0.jar, kafka-571-SimpleConsumerShell.patch,
> kafka-571-v1.patch, kafka-571-v2.patch, kafka-571-v3.patch,
> kafka-perf-0.7.0.jar, zkclient-0.1.jar