Re: git wip incubator-kafka.git

2012-11-27 Thread Jun Rao
Chris,

I plan to submit an infra ticket for the post-graduation items. One of the
subtasks will be to move our svn repo to a new location (w/o incubator). We
could either reuse your ticket and ask for a new repo name or we could
close your ticket and create a new one. Which one would you prefer?

Thanks,

Jun

On Tue, Sep 25, 2012 at 8:09 PM, Chris Burroughs
wrote:

> https://issues.apache.org/jira/browse/INFRA-5111
>
> Please check out
> https://git-wip-us.apache.org/repos/asf/incubator-kafka.git (currently
> read only) and report on any issues.  Joe and I were looking at the git
> mirror the other day (which this is based on) and didn't notice
> anything, so any remaining bugs must be hiding well.
>
> (I defer to Joe if he would rather wait until after the rc VOTE to go
> read/write, or make 0.7.2 the first git release.)
>


[jira] [Commented] (KAFKA-635) Producer error when trying to send not displayed unless in DEBUG logging level

2012-11-27 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13505222#comment-13505222
 ] 

Jun Rao commented on KAFKA-635:
---

Chris,

Thanks for reporting this. Do you really need to turn on debug level logging? 
The extra logging that you showed is at WARN level.

> Producer error when trying to send not displayed unless in DEBUG logging level
> --
>
> Key: KAFKA-635
> URL: https://issues.apache.org/jira/browse/KAFKA-635
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
> Environment: Java client
>Reporter: Chris Curtin
>Priority: Minor
>
> When trying to figure out how to connect with the 0.8.0 Producer, I was only 
> seeing exceptions:
> Exception in thread "main" kafka.common.FailedToSendMessageException: Failed 
> to send messages after 3 tries.
>   at 
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:70)
>   at kafka.producer.Producer.send(Producer.scala:75)
>   at kafka.javaapi.producer.Producer.send(Producer.scala:32)
>   at com.spop.kafka.playproducer.TestProducer.main(TestProducer.java:40)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>   at java.lang.reflect.Method.invoke(Method.java:597)
>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)
> Changing log4j level to DEBUG showed the actual error:
> 878  [main] WARN  kafka.producer.async.DefaultEventHandler  - failed to send 
> to broker 3 with data Map([test1,0] -> 
> ByteBufferMessageSet(MessageAndOffset(Message(magic = 2, attributes = 0, crc 
> = 1906312613, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=22 
> cap=22]),0), ))
> java.lang.NoSuchMethodError: com.yammer.metrics.core.TimerContext.stop()J
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:36)
>   at kafka.producer.SyncProducer.send(SyncProducer.scala:94)
>   at 
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:221)
>   at 
> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:87)
>   at 
> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:81)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>   at 
> scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
>   at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
>   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>   at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
>   at 
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:81)
>   at 
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:57)
>   at kafka.producer.Producer.send(Producer.scala:75)
>   at kafka.javaapi.producer.Producer.send(Producer.scala:32)
>   at com.spop.kafka.playproducer.TestProducer.main(TestProducer.java:40)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>   at java.lang.reflect.Method.invoke(Method.java:597)
>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)
> Submitted per Jay's request in a mailing list thread.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Created] (KAFKA-633) AdminTest.testShutdownBroker fails

2012-11-26 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-633:
-

 Summary: AdminTest.testShutdownBroker fails
 Key: KAFKA-633
 URL: https://issues.apache.org/jira/browse/KAFKA-633
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
Reporter: Jun Rao


[error] Test Failed: testShutdownBroker(kafka.admin.AdminTest)
junit.framework.AssertionFailedError: expected:<2> but was:<3>
at junit.framework.Assert.fail(Assert.java:47)
at junit.framework.Assert.failNotEquals(Assert.java:277)
at junit.framework.Assert.assertEquals(Assert.java:64)
at junit.framework.Assert.assertEquals(Assert.java:195)
at junit.framework.Assert.assertEquals(Assert.java:201)
at kafka.admin.AdminTest.testShutdownBroker(AdminTest.scala:381)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at junit.framework.TestCase.runTest(TestCase.java:164)
at junit.framework.TestCase.runBare(TestCase.java:130)
at junit.framework.TestResult$1.protect(TestResult.java:110)
at junit.framework.TestResult.runProtected(TestResult.java:128)
at junit.framework.TestResult.run(TestResult.java:113)
at junit.framework.TestCase.run(TestCase.java:120)
at junit.framework.TestSuite.runTest(TestSuite.java:228)
at junit.framework.TestSuite.run(TestSuite.java:223)
at junit.framework.TestSuite.runTest(TestSuite.java:228)
at junit.framework.TestSuite.run(TestSuite.java:223)
at org.scalatest.junit.JUnit3Suite.run(JUnit3Suite.scala:309)
at 
org.scalatest.tools.ScalaTestFramework$ScalaTestRunner.run(ScalaTestFramework.scala:40)
at sbt.TestRunner.run(TestFramework.scala:53)
at sbt.TestRunner.runTest$1(TestFramework.scala:67)
at sbt.TestRunner.run(TestFramework.scala:76)
at 
sbt.TestFramework$$anonfun$10$$anonfun$apply$11.runTest$2(TestFramework.scala:194)
at 
sbt.TestFramework$$anonfun$10$$anonfun$apply$11$$anonfun$apply$12.apply(TestFramework.scala:205)
at 
sbt.TestFramework$$anonfun$10$$anonfun$apply$11$$anonfun$apply$12.apply(TestFramework.scala:205)
at sbt.NamedTestTask.run(TestFramework.scala:92)
at 
sbt.ScalaProject$$anonfun$sbt$ScalaProject$$toTask$1.apply(ScalaProject.scala:193)
at 
sbt.ScalaProject$$anonfun$sbt$ScalaProject$$toTask$1.apply(ScalaProject.scala:193)
at sbt.TaskManager$Task.invoke(TaskManager.scala:62)
at sbt.impl.RunTask.doRun$1(RunTask.scala:77)
at sbt.impl.RunTask.runTask(RunTask.scala:85)
at sbt.impl.RunTask.sbt$impl$RunTask$$runIfNotRoot(RunTask.scala:60)
at 
sbt.impl.RunTask$$anonfun$runTasksExceptRoot$2.apply(RunTask.scala:48)
at 
sbt.impl.RunTask$$anonfun$runTasksExceptRoot$2.apply(RunTask.scala:48)
at sbt.Distributor$Run$Worker$$anonfun$2.apply(ParallelRunner.scala:131)
at sbt.Distributor$Run$Worker$$anonfun$2.apply(ParallelRunner.scala:131)
at sbt.Control$.trapUnit(Control.scala:19)
at sbt.Distributor$Run$Worker.run(ParallelRunner.scala:131)

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Created] (KAFKA-632) ProducerRequest should take ByteBufferMessageSet instead of MessageSet

2012-11-26 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-632:
-

 Summary: ProducerRequest should take ByteBufferMessageSet instead 
of MessageSet
 Key: KAFKA-632
 URL: https://issues.apache.org/jira/browse/KAFKA-632
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.8
Reporter: Jun Rao


Currently, ProducerRequest takes MessageSet in the constructor and casts it to 
ByteBufferMessageSet. It should take ByteBufferMessageSet directly.
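
For illustration only, a hedged sketch of the intended change (the class shape and type 
names below are simplified stand-ins, not the actual 0.8 request definition):

    // Stand-ins for the real kafka.message types, just so the sketch compiles on its own.
    trait MessageSet
    class ByteBufferMessageSet extends MessageSet

    // Before: the constructor accepts the general trait and casts internally.
    class ProducerRequestBefore(val topic: String, messages: MessageSet) {
      private val byteBufferSet = messages.asInstanceOf[ByteBufferMessageSet] // unsafe cast
    }

    // After: require the concrete type up front, so the cast (and its failure mode) goes away.
    class ProducerRequestAfter(val topic: String, val messages: ByteBufferMessageSet)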

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


trunk and branches update

2012-11-26 Thread Jun Rao
Hi,

Since we are getting close to the 0.8 release, I have made the following
svn changes in our repository.

moved 0.7 branch to 0.7.0
moved trunk to 0.7
copied 0.8 to trunk

Now trunk is available for post 0.8 development. 0.8 changes will still be
checked into the 0.8 branch and we will merge the 0.8 changes to trunk
periodically.

Thanks,

Jun


Kafka 0.8 trial version

2012-11-26 Thread Jun Rao
Hi, Everyone,

Thanks to people who contributed to the project, we have made significant
progress in Kafka replication. Now, we'd like people to give it a try.
Please check out revision 1411070 from
https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8 and follow
the wiki below.

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.8+Quick+Start

The code is expected to be stable, but we are still looking into some
performance issues with hundreds of topics/partitions. If you see any
issues, please let us know.

Thanks,

Jun


[jira] [Closed] (KAFKA-612) move shutting down of fetcher thread out of critical path

2012-11-18 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao closed KAFKA-612.
-


> move shutting down of fetcher thread out of critical path
> -
>
> Key: KAFKA-612
> URL: https://issues.apache.org/jira/browse/KAFKA-612
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>Assignee: Jun Rao
> Fix For: 0.8
>
> Attachments: kafka-612.patch, kafka-612-v2.patch
>
>
> Shutting down a fetch thread seems to take more than 200ms since we need to 
> interrupt the thread. Currently, we shutdown fetcher threads while processing 
> a leaderAndIsr request. This can delay some of the partitions to become a 
> leader.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-612) move shutting down of fetcher thread out of critical path

2012-11-18 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-612:
--

   Resolution: Fixed
Fix Version/s: 0.8
   Status: Resolved  (was: Patch Available)

Thanks for the review. Changed the method name and committed to 0.8.

> move shutting down of fetcher thread out of critical path
> -
>
> Key: KAFKA-612
> URL: https://issues.apache.org/jira/browse/KAFKA-612
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>Assignee: Jun Rao
> Fix For: 0.8
>
> Attachments: kafka-612.patch, kafka-612-v2.patch
>
>
> Shutting down a fetch thread seems to take more than 200ms since we need to 
> interrupt the thread. Currently, we shutdown fetcher threads while processing 
> a leaderAndIsr request. This can delay some of the partitions to become a 
> leader.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-612) move shutting down of fetcher thread out of critical path

2012-11-18 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-612:
--

Attachment: kafka-612-v2.patch

Thanks for the review. Attached patch v2.

1.1 This is to handle the case that a partition is removed from the fetchMap 
while a fetch request is issued to the broker (since we don't hold the lock 
when making fetch requests). When this happens, we just need to ignore this 
partition.
1.2 Good suggestion. Fixed.

2. That info message is really used to track how long it takes a broker to 
complete the leaderAndIsr request. From the broker's perspective, once the 
leader is set in a partition, the partition can serve read/write requests.

3.1 Agreed that the naming is confusing. Changed it to partitionMap.
3.2 Done.

> move shutting down of fetcher thread out of critical path
> -
>
> Key: KAFKA-612
> URL: https://issues.apache.org/jira/browse/KAFKA-612
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>Assignee: Jun Rao
> Attachments: kafka-612.patch, kafka-612-v2.patch
>
>
> Shutting down a fetch thread seems to take more than 200ms since we need to 
> interrupt the thread. Currently, we shutdown fetcher threads while processing 
> a leaderAndIsr request. This can delay some of the partitions to become a 
> leader.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-532) Multiple controllers can co-exist during soft failures

2012-11-18 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13499941#comment-13499941
 ] 

Jun Rao commented on KAFKA-532:
---

50. My feeling is that a request-level error code conveys the same meaning as 
setting the same error code on every partition, and my preference is to keep all 
response formats consistent. However, if you prefer, it's ok to check in the 
patch as it is and revisit this when we finalize the wire format. So, +1 from me 
on the patch.

> Multiple controllers can co-exist during soft failures
> --
>
> Key: KAFKA-532
> URL: https://issues.apache.org/jira/browse/KAFKA-532
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Neha Narkhede
>Assignee: Neha Narkhede
>Priority: Blocker
>  Labels: bugs
> Attachments: kafka-532-v1.patch, kafka-532-v2.patch, 
> kafka-532-v3.patch, kafka-532-v4.patch, kafka-532-v5.patch
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> If the current controller experiences an intermittent soft failure (GC pause) 
> in the middle of leader election or partition reassignment, a new controller 
> might get elected and start communicating new state change decisions to the 
> brokers. After recovering from the soft failure, the old controller might 
> continue sending some stale state change decisions to the brokers, resulting 
> in unexpected failures. We need to introduce a controller generation id that 
> increments with controller election. The brokers should reject any state 
> change requests by a controller with an older generation id.
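
For illustration, a self-contained sketch of the generation check proposed above (the 
types and names are simplified stand-ins, not the actual controller protocol):

    case class StateChangeRequest(controllerEpoch: Int)

    class Broker {
      @volatile private var currentEpoch: Int = 0

      // Reject any state change request carrying an older controller generation.
      def handle(req: StateChangeRequest): Boolean =
        if (req.controllerEpoch < currentEpoch) false      // stale controller: ignore
        else { currentEpoch = req.controllerEpoch; true }   // accept and remember the epoch
    }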

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-532) Multiple controllers can co-exist during soft failures

2012-11-18 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13499860#comment-13499860
 ] 

Jun Rao commented on KAFKA-532:
---

Thanks for patch v5. Looks good. Just a couple of minor comments.

50. LeaderAndIsrResponse and StopReplicaResponse: For all response types, we have 
moved to a model where there is no global error code at the response level. 
Instead, if a request can't be processed for any partition, we just set the same 
error code for each partition in the response. This achieves the same effect, but 
makes handling the response easier: one just has to deal with the error code per 
partition (see the sketch below).

51. Are the changes in test/resources/log4j.properties intended?
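
Regarding point 50, a self-contained sketch of the per-partition error-code model (the 
type and field names are simplified stand-ins, not the actual response classes):

    case class TopicAndPartition(topic: String, partition: Int)

    // If the whole request fails (e.g. a stale controller epoch), every partition in the
    // response carries the same error code instead of a single response-level code.
    case class LeaderAndIsrResponse(errorCodes: Map[TopicAndPartition, Short])

    def failAll(partitions: Seq[TopicAndPartition], errorCode: Short): LeaderAndIsrResponse =
      LeaderAndIsrResponse(partitions.map(p => p -> errorCode).toMap)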


> Multiple controllers can co-exist during soft failures
> --
>
> Key: KAFKA-532
> URL: https://issues.apache.org/jira/browse/KAFKA-532
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Neha Narkhede
>Assignee: Neha Narkhede
>Priority: Blocker
>  Labels: bugs
> Attachments: kafka-532-v1.patch, kafka-532-v2.patch, 
> kafka-532-v3.patch, kafka-532-v4.patch, kafka-532-v5.patch
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> If the current controller experiences an intermittent soft failure (GC pause) 
> in the middle of leader election or partition reassignment, a new controller 
> might get elected and start communicating new state change decisions to the 
> brokers. After recovering from the soft failure, the old controller might 
> continue sending some stale state change decisions to the brokers, resulting 
> in unexpected failures. We need to introduce a controller generation id that 
> increments with controller election. The brokers should reject any state 
> change requests by a controller with an older generation id.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Resolved] (KAFKA-613) MigrationTool should disable shallow iteration in the 0.7 consumer

2012-11-16 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-613.
---

   Resolution: Fixed
Fix Version/s: 0.8

Thanks for patch v2. +1. Committed to 0.8.

> MigrationTool should disable shallow iteration in the 0.7 consumer
> --
>
> Key: KAFKA-613
> URL: https://issues.apache.org/jira/browse/KAFKA-613
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
> Fix For: 0.8
>
> Attachments: kafka_613_v1.patch, kafka_613_v2.patch
>
>
> If shallow iteration is enabled, we should override it and log a warning.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Closed] (KAFKA-613) MigrationTool should disable shallow iteration in the 0.7 consumer

2012-11-16 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao closed KAFKA-613.
-


> MigrationTool should disable shallow iteration in the 0.7 consumer
> --
>
> Key: KAFKA-613
> URL: https://issues.apache.org/jira/browse/KAFKA-613
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
> Fix For: 0.8
>
> Attachments: kafka_613_v1.patch, kafka_613_v2.patch
>
>
> If shallow iteration is enabled, we should override it and log a warning.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-618) Deadlock between leader-finder-thread and consumer-fetcher-thread during broker failure

2012-11-16 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13499309#comment-13499309
 ] 

Jun Rao commented on KAFKA-618:
---

Thanks for the patch. +1

> Deadlock between leader-finder-thread and consumer-fetcher-thread during 
> broker failure
> ---
>
> Key: KAFKA-618
> URL: https://issues.apache.org/jira/browse/KAFKA-618
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Joel Koshy
>Priority: Blocker
> Fix For: 0.8
>
> Attachments: KAFKA-618-v1.patch
>
>
> This causes the test failure reported in KAFKA-607. This affects high-level 
> consumers - if they hit the deadlock then they would get wedged (or at least 
> until the consumer timeout).
> Here is the threaddump output that shows the issue:
> Found one Java-level deadlock:
> =
> "ConsumerFetcherThread-console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-0-1":
>   waiting for ownable synchronizer 0x7f2283ad, (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync),
>   which is held by 
> "console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-leader-finder-thread"
> "console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-leader-finder-thread":
>   waiting to lock monitor 0x7f2288297190 (object 0x7f2283ab01d0, a 
> java.lang.Object),
>   which is held by 
> "ConsumerFetcherThread-console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-0-1"
> Java stack information for the threads listed above:
> ===
> "ConsumerFetcherThread-console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-0-1":
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x7f2283ad> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
> at 
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
> at 
> java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
> at 
> kafka.consumer.ConsumerFetcherManager.getPartitionTopicInfo(ConsumerFetcherManager.scala:131)
> at 
> kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:43)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$doWork$5.apply(AbstractFetcherThread.scala:116)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$doWork$5.apply(AbstractFetcherThread.scala:99)
> at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:99)
> - locked <0x7f2283ab01d0> (a java.lang.Object)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:50)
> "console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-leader-finder-thread":
> at 
> kafka.server.AbstractFetcherThread.addPartition(AbstractFetcherThread.scala:142)
> - waiting to lock <0x7f2283ab01d0> (a java.lang.Object)
> at 
> kafka.server.AbstractFetcherManager.addFetcher(AbstractFetcherManager.scala:49)
> - locked <0x7f2283ab0338> (a java.lang.Object)
> at 
> kafka.consumer.ConsumerFetcherManager$$anon$1$$anonfun$doWork$5.apply(ConsumerFetcherManager.scala:81)
> at 
> kafka.consumer.ConsumerFetcherManager$$anon$1$$anonfun$doWork$5.apply(ConsumerFetcherManager.scala:76)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> at 
> scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
> at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
>  

[jira] [Resolved] (KAFKA-614) DumpLogSegment offset verification is incorrect for compressed messages

2012-11-16 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-614.
---

   Resolution: Fixed
Fix Version/s: 0.8

Thanks for the patch. +1 and committed to 0.8.

> DumpLogSegment offset verification is incorrect for compressed messages
> ---
>
> Key: KAFKA-614
> URL: https://issues.apache.org/jira/browse/KAFKA-614
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>  Labels: newbie
> Fix For: 0.8
>
> Attachments: kafka_614_v1.patch
>
>
> During verification, DumpLogSegment tries to make sure that offsets are 
> consecutive. However, this won't be true for compressed messages since 
> FileMessageSet only does shallow iteration. The simplest fix is to skip the 
> verification for compressed messages.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Closed] (KAFKA-614) DumpLogSegment offset verification is incorrect for compressed messages

2012-11-16 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao closed KAFKA-614.
-


> DumpLogSegment offset verification is incorrect for compressed messages
> ---
>
> Key: KAFKA-614
> URL: https://issues.apache.org/jira/browse/KAFKA-614
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>  Labels: newbie
> Fix For: 0.8
>
> Attachments: kafka_614_v1.patch
>
>
> During verification, DumpLogSegment tries to make sure that offsets are 
> consecutive. However, this won't be true for compressed messages since 
> FileMessageSet only does shallow iteration. The simplest fix is to skip the 
> verification for compressed messages.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-613) MigrationTool should disable shallow iteration in the 0.7 consumer

2012-11-16 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13499269#comment-13499269
 ] 

Jun Rao commented on KAFKA-613:
---

Thanks for patch v1. It would be good to also log a warning that "shallow 
iteration is not supported" if shallow iteration is set to true in the 0.7 
consumer property file.
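
A minimal sketch of how such an override-plus-warning might look (the property key and 
the plain println are assumptions, not the actual MigrationTool code):

    import java.io.FileInputStream
    import java.util.Properties

    // Load the 0.7 consumer properties, warn if shallow iteration was requested,
    // and force it off before handing the config to the consumer.
    def loadConsumerProps(path: String): Properties = {
      val props = new Properties()
      props.load(new FileInputStream(path))
      if (props.getProperty("shallow.iterator.enable", "false").toBoolean)
        println("WARN: shallow iteration is not supported by the MigrationTool; disabling it")
      props.setProperty("shallow.iterator.enable", "false")
      props
    }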

> MigrationTool should disable shallow iteration in the 0.7 consumer
> --
>
> Key: KAFKA-613
> URL: https://issues.apache.org/jira/browse/KAFKA-613
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
> Attachments: kafka_613_v1.patch
>
>
> If shallow iteration is enabled, we should override it and log a warning.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-620) UnknownHostError looking for a ZK node crashes the broker

2012-11-16 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13499043#comment-13499043
 ] 

Jun Rao commented on KAFKA-620:
---

Do you have other ZK hosts in your connection string?

> UnknownHostError looking for a ZK node crashes the broker
> -
>
> Key: KAFKA-620
> URL: https://issues.apache.org/jira/browse/KAFKA-620
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.7.1
> Environment: linux. Amazon's AMI
>Reporter: Matthew Rathbone
>
> If you totally kill a zookeeper node so that its hostname no longer resolves 
> to anything, the broker will die with a java.net.UnknownHostException.
> You will then be unable to restart the broker until the unknown host(s) is 
> removed from the server.properties.
> We ran into this issue while testing our resilience to widespread AWS 
> outages, if you can point me to the right place, I could have a go at fixing 
> it? Unfortunately, I suspect the issue might be in the non-standard Zookeeper 
> library that kafka uses.
> Here's the stack trace:
> org.I0Itec.zkclient.exception.ZkException: Unable to connect to [list of 
> zookeepers]
>   at org.I0Itec.zkclient.ZkConnection.connect(ZkConnection.java:66)
>   at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:872)
>   at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:98)
>   at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:84)
>   at kafka.server.KafkaZooKeeper.startup(KafkaZooKeeper.scala:44)
>   at kafka.log.LogManager.<init>(LogManager.scala:87)
>   at kafka.server.KafkaServer.startup(KafkaServer.scala:58)
>   at 
> kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
>   at kafka.Kafka$.main(Kafka.scala:50)
>   at kafka.Kafka.main(Kafka.scala)
> Caused by: java.net.UnknownHostException: zk-101
>   at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
>   at java.net.InetAddress$1.lookupAllHostAddr(InetAddress.java:850)
>   at java.net.InetAddress.getAddressFromNameService(InetAddress.java:1201)
>   at java.net.InetAddress.getAllByName0(InetAddress.java:1154)
>   at java.net.InetAddress.getAllByName(InetAddress.java:1084)
>   at java.net.InetAddress.getAllByName(InetAddress.java:1020)
>   at org.apache.zookeeper.ClientCnxn.<init>(ClientCnxn.java:387)
>   at org.apache.zookeeper.ClientCnxn.<init>(ClientCnxn.java:332)
>   at org.apache.zookeeper.ZooKeeper.<init>(ZooKeeper.java:383)
>   at org.I0Itec.zkclient.ZkConnection.connect(ZkConnection.java:64)
>   ... 9 more

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-605) System Test - Log Retention Cases should wait longer before getting the common starting offset in replica log segments

2012-11-16 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-605:
--

   Resolution: Fixed
Fix Version/s: 0.8
   Status: Resolved  (was: Patch Available)

Thanks for the patch. +1 and committed to 0.8.

> System Test - Log Retention Cases should wait longer before getting the 
> common starting offset in replica log segments
> --
>
> Key: KAFKA-605
> URL: https://issues.apache.org/jira/browse/KAFKA-605
> Project: Kafka
>  Issue Type: Task
>Reporter: John Fung
> Fix For: 0.8
>
> Attachments: kafka-605-v1.patch, kafka-605-v2.patch
>
>


--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Closed] (KAFKA-605) System Test - Log Retention Cases should wait longer before getting the common starting offset in replica log segments

2012-11-16 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao closed KAFKA-605.
-


> System Test - Log Retention Cases should wait longer before getting the 
> common starting offset in replica log segments
> --
>
> Key: KAFKA-605
> URL: https://issues.apache.org/jira/browse/KAFKA-605
> Project: Kafka
>  Issue Type: Task
>Reporter: John Fung
> Fix For: 0.8
>
> Attachments: kafka-605-v1.patch, kafka-605-v2.patch
>
>


--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-618) Deadlock between leader-finder-thread and consumer-fetcher-thread during broker failure

2012-11-15 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13498555#comment-13498555
 ] 

Jun Rao commented on KAFKA-618:
---

This is a very good finding. The following is one way of breaking the deadlock.

In ConsumerFetcherManager, don't expose getPartitionTopicInfo(). Instead, pass 
partitionMap (which is immutable) to each newly created ConsumerFetcherThread. 
This way, ConsumerFetcherThread.processPartitionData() and 
ConsumerFetcherThread.handleOffsetOutOfRange() won't depend on 
ConsumerFetcherManager any more. If we do that, we can improve 
ConsumerFetcherManager.stopAllConnections() a bit too. The clearing of 
noLeaderPartitionSet and partitionMap can be done together before calling 
closeAllFetchers(). Previously, we had to clear partitionMap last because, until 
all fetchers are stopped, the processing of fetch requests still needs to 
read partitionMap and expects it to be non-null. 
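
A self-contained model of the idea (all names below are simplified stand-ins for the 
Kafka classes mentioned above): the fetcher thread receives the partition map at 
construction time, so processPartitionData() no longer calls back into the manager and 
never needs its lock.

    case class TopicAndPartition(topic: String, partition: Int)
    class PartitionTopicInfo { def enqueue(bytes: Array[Byte]): Unit = () }

    // The map is handed over when the thread is created, instead of being looked up via
    // the manager's getPartitionTopicInfo() on every chunk of fetched data.
    class FetcherThread(partitionMap: Map[TopicAndPartition, PartitionTopicInfo]) {
      def processPartitionData(tp: TopicAndPartition, data: Array[Byte]): Unit =
        partitionMap.get(tp).foreach(_.enqueue(data)) // local lookup, no manager lock
    }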

> Deadlock between leader-finder-thread and consumer-fetcher-thread during 
> broker failure
> ---
>
> Key: KAFKA-618
> URL: https://issues.apache.org/jira/browse/KAFKA-618
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Joel Koshy
>Priority: Blocker
> Fix For: 0.8
>
>
> This causes the test failure reported in KAFKA-607. This affects high-level 
> consumers - if they hit the deadlock then they would get wedged (or at least 
> until the consumer timeout).
> Here is the threaddump output that shows the issue:
> Found one Java-level deadlock:
> =
> "ConsumerFetcherThread-console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-0-1":
>   waiting for ownable synchronizer 0x7f2283ad, (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync),
>   which is held by 
> "console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-leader-finder-thread"
> "console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-leader-finder-thread":
>   waiting to lock monitor 0x7f2288297190 (object 0x7f2283ab01d0, a 
> java.lang.Object),
>   which is held by 
> "ConsumerFetcherThread-console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-0-1"
> Java stack information for the threads listed above:
> ===
> "ConsumerFetcherThread-console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-0-1":
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x7f2283ad> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
> at 
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
> at 
> java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
> at 
> kafka.consumer.ConsumerFetcherManager.getPartitionTopicInfo(ConsumerFetcherManager.scala:131)
> at 
> kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:43)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$doWork$5.apply(AbstractFetcherThread.scala:116)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$doWork$5.apply(AbstractFetcherThread.scala:99)
> at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:99)
> - locked <0x7f2283ab01d0> (a java.lang.Object)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:50)
> "console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-leader-finder-thread":
> at 
> kafka.server.AbstractFetcherThread.addPartition(AbstractFetcherThread.scala:142)
> - waiting to lock <0x7f2283ab01d0> (a java.lang.Object)
> at 
> kafka.server.AbstractFetcherManager.addFetcher(AbstractFetcherManager.scala:49)
> - locked <0x7f2283ab0338> (a java.lang.Object)
> at 
> kafka.consumer.ConsumerFetcherManager$$anon$1$$anonfun$doWork$5.apply(ConsumerFetcherManager.scala:81)
> at 
> kafka.consumer

[jira] [Created] (KAFKA-614) DumpLogSegment offset verification is incorrect for compressed messages

2012-11-15 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-614:
-

 Summary: DumpLogSegment offset verification is incorrect for 
compressed messages
 Key: KAFKA-614
 URL: https://issues.apache.org/jira/browse/KAFKA-614
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
Reporter: Jun Rao


During verification, DumpLogSegment tries to make sure that offsets are 
consecutive. However, this won't be true for compressed messages since 
FileMessageSet only does shallow iteration. The simplest fix is to skip the 
verification for compressed messages.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Created] (KAFKA-613) MigrationTool should disable shallow iteration in the 0.7 consumer

2012-11-15 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-613:
-

 Summary: MigrationTool should disable shallow iteration in the 0.7 
consumer
 Key: KAFKA-613
 URL: https://issues.apache.org/jira/browse/KAFKA-613
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
Reporter: Jun Rao


If shallow iteration is enabled, we should override it and log a warning.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-544) Retain key in producer and expose it in the consumer

2012-11-14 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13497755#comment-13497755
 ] 

Jun Rao commented on KAFKA-544:
---

+1 on patch v5. For 30, could you add the same constructor comment for 
Partitioner too?


> Retain key in producer and expose it in the consumer
> 
>
> Key: KAFKA-544
> URL: https://issues.apache.org/jira/browse/KAFKA-544
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Jay Kreps
>Assignee: Jay Kreps
>Priority: Blocker
>  Labels: bugs
> Attachments: KAFKA-544-v1.patch, KAFKA-544-v2.patch, 
> KAFKA-544-v3.patch, KAFKA-544-v4.patch, KAFKA-544-v5.patch
>
>
> KAFKA-506 added support for retaining a key in the messages, however this 
> field is not yet set by the producer.
> The proposal for doing this is to change the producer api to change 
> ProducerData to allow only a single key/value pair so it has a one-to-one 
> mapping to Message. That is change from
>   ProducerData(topic: String, key: K, data: Seq[V])
> to
>   ProducerData(topic: String, key: K, data: V)
> The key itself needs to be encoded. There are several ways this could be 
> handled. A few of the options:
> 1. Change the Encoder and Decoder to be MessageEncoder and MessageDecoder and 
> have them take both a key and value.
> 2. Another option is to change the type of the encoder/decoder to not refer 
> to Message so it could be used for both the key and value.
> I favor the second option but am open to feedback.
> One concern with our current approach to serialization as well as both of 
> these proposals is that they are inefficient. We go from 
> Object=>byte[]=>Message=>MessageSet with a copy at each step. In the case of 
> compression there are a bunch of intermediate steps. We could theoretically 
> clean this up by instead having an interface for the encoder that was 
> something like
>Encoder.writeTo(buffer: ByteBuffer, object: AnyRef)
> and
>Decoder.readFrom(buffer:ByteBuffer): AnyRef
> However there are two problems with this. The first is that we don't actually 
> know the size of the data until  it is serialized so we can't really allocate 
> the bytebuffer properly and might need to resize it. The second is that in 
> the case of compression there is a whole other path to consider. Originally I 
> thought maybe it would be good to try to fix this, but now I think it should 
> be out-of-scope and we should revisit the efficiency issue in a future 
> release in conjunction with our internal handling of compression.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-544) Retain key in producer and expose it in the consumer

2012-11-14 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13497680#comment-13497680
 ] 

Jun Rao commented on KAFKA-544:
---

Thanks for patch v3. Looks good overall. Some minor comments:

30. Encoder: It seems that we require the constructor of Encoder and 
Partitioner to take a VerifiableProperties. It would be good if we can add a 
comment on that in the trait (see the sketch below).

31. ConsumerConnector: Can we have a version of createMessageStreamsByFilter 
without the decoders?

32. ConsumerFetcherManager: no change is needed.

33. BlockingChannel: logger.debug() should be just debug().

34. ChecksumMessageFormatter: We probably can't remove it since it may be used 
in our tests.
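
To illustrate point 30, a self-contained sketch of the constructor convention (the trait 
and the properties class are simplified stand-ins, not the committed kafka.serializer 
API): since encoders are instantiated reflectively, every implementation is expected to 
expose a constructor that accepts the properties object, even if it ignores them.

    import java.util.Properties

    class VerifiableProperties(val props: Properties = new Properties())

    trait Encoder[T] {
      // Note for implementors: provide a constructor that takes VerifiableProperties.
      def toBytes(t: T): Array[Byte]
    }

    class StringEncoder(props: VerifiableProperties = new VerifiableProperties())
        extends Encoder[String] {
      def toBytes(s: String): Array[Byte] = s.getBytes("UTF-8")
    }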


> Retain key in producer and expose it in the consumer
> 
>
> Key: KAFKA-544
> URL: https://issues.apache.org/jira/browse/KAFKA-544
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Jay Kreps
>Assignee: Jay Kreps
>Priority: Blocker
>  Labels: bugs
> Attachments: KAFKA-544-v1.patch, KAFKA-544-v2.patch, 
> KAFKA-544-v3.patch, KAFKA-544-v4.patch
>
>
> KAFKA-506 added support for retaining a key in the messages, however this 
> field is not yet set by the producer.
> The proposal for doing this is to change the producer api to change 
> ProducerData to allow only a single key/value pair so it has a one-to-one 
> mapping to Message. That is change from
>   ProducerData(topic: String, key: K, data: Seq[V])
> to
>   ProducerData(topic: String, key: K, data: V)
> The key itself needs to be encoded. There are several ways this could be 
> handled. A few of the options:
> 1. Change the Encoder and Decoder to be MessageEncoder and MessageDecoder and 
> have them take both a key and value.
> 2. Another option is to change the type of the encoder/decoder to not refer 
> to Message so it could be used for both the key and value.
> I favor the second option but am open to feedback.
> One concern with our current approach to serialization as well as both of 
> these proposals is that they are inefficient. We go from 
> Object=>byte[]=>Message=>MessageSet with a copy at each step. In the case of 
> compression there are a bunch of intermediate steps. We could theoretically 
> clean this up by instead having an interface for the encoder that was 
> something like
>Encoder.writeTo(buffer: ByteBuffer, object: AnyRef)
> and
>Decoder.readFrom(buffer:ByteBuffer): AnyRef
> However there are two problems with this. The first is that we don't actually 
> know the size of the data until  it is serialized so we can't really allocate 
> the bytebuffer properly and might need to resize it. The second is that in 
> the case of compression there is a whole other path to consider. Originally I 
> thought maybe it would be good to try to fix this, but now I think it should 
> be out-of-scope and we should revisit the efficiency issue in a future 
> release in conjunction with our internal handling of compression.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-612) move shutting down of fetcher thread out of critical path

2012-11-14 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-612:
--

Status: Patch Available  (was: Open)

> move shutting down of fetcher thread out of critical path
> -
>
> Key: KAFKA-612
> URL: https://issues.apache.org/jira/browse/KAFKA-612
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>Assignee: Jun Rao
> Attachments: kafka-612.patch
>
>
> Shutting down a fetch thread seems to take more than 200ms since we need to 
> interrupt the thread. Currently, we shutdown fetcher threads while processing 
> a leaderAndIsr request. This can delay some of the partitions to become a 
> leader.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-612) move shutting down of fetcher thread out of critical path

2012-11-14 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-612:
--

Attachment: kafka-612.patch

Attached a patch. It introduces a separate method for shutting down empty fetcher 
threads and that method will be called at the end of processing a leaderAndIsr 
request. Because of this change, a fetcher thread could have an empty fetch 
map. To avoid sending empty fetch requests to the broker, the patch uses a 
condition to coordinate the fetch. Also fixed a bug when removing items from a 
Scala map (entries can't be removed while iterating over the map since the 
behaviour is not deterministic).
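
A small self-contained illustration of the map pitfall mentioned above (the keys and 
values are made up): snapshot the keys first, then remove them in a second pass.

    import scala.collection.mutable

    val fetchMap = mutable.Map("t1-0" -> 10L, "t1-1" -> 20L, "t2-0" -> 30L)

    // Risky: mutating fetchMap while iterating over it has unspecified behaviour.
    // fetchMap.foreach { case (k, _) => if (k.startsWith("t1")) fetchMap.remove(k) }

    // Safer: collect the keys to delete, then remove them after the iteration.
    val toRemove = fetchMap.keys.filter(_.startsWith("t1")).toList
    toRemove.foreach(fetchMap.remove)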

> move shutting down of fetcher thread out of critical path
> -
>
> Key: KAFKA-612
> URL: https://issues.apache.org/jira/browse/KAFKA-612
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>Assignee: Jun Rao
> Attachments: kafka-612.patch
>
>
> Shutting down a fetch thread seems to take more than 200ms since we need to 
> interrupt the thread. Currently, we shutdown fetcher threads while processing 
> a leaderAndIsr request. This can delay some of the partitions to become a 
> leader.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Created] (KAFKA-612) move shutting down of fetcher thread out of critical path

2012-11-14 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-612:
-

 Summary: move shutting down of fetcher thread out of critical path
 Key: KAFKA-612
 URL: https://issues.apache.org/jira/browse/KAFKA-612
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
Reporter: Jun Rao


Shutting down a fetch thread seems to take more than 200ms since we need to 
interrupt the thread. Currently, we shutdown fetcher threads while processing a 
leaderAndIsr request. This can delay some of the partitions to become a leader.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Assigned] (KAFKA-612) move shutting down of fetcher thread out of critical path

2012-11-14 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao reassigned KAFKA-612:
-

Assignee: Jun Rao

> move shutting down of fetcher thread out of critical path
> -
>
> Key: KAFKA-612
> URL: https://issues.apache.org/jira/browse/KAFKA-612
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>Assignee: Jun Rao
>
> Shutting down a fetch thread seems to take more than 200ms since we need to 
> interrupt the thread. Currently, we shutdown fetcher threads while processing 
> a leaderAndIsr request. This can delay some of the partitions to become a 
> leader.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-585) Remove custom metrics jar and replace with latest from metrics HEAD

2012-11-12 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13495462#comment-13495462
 ] 

Jun Rao commented on KAFKA-585:
---

Thanks for patch v1. +1. Just make sure that the basic system tests still work 
before checking in.

> Remove custom metrics jar and replace with latest from metrics HEAD
> ---
>
> Key: KAFKA-585
> URL: https://issues.apache.org/jira/browse/KAFKA-585
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Joel Koshy
> Attachments: KAFKA-585-v1.patch, 
> metrics-annotation-3.0.0-c0c8be71.jar, metrics-core-3.0.0-c0c8be71.jar
>
>
> This is for at least until metrics 3.x is mavenized.
> Also:
> The KafkaCSVMetricsReporter object may be better named as 
> KafkaMetricsReporter since startCSVMetricsReporter
> potentially starts up other (non-CSV) reporters (if any) as well - in which 
> case KafkaMetricsReporter.scala would be a
> better place for it. Or, you can just filter out non-CSV reporters.
> Also, the top-level/config/server.properties need not enable the csv 
> reporter. I thought the system test replication
> suite's server.properties would need to be patched, but it isn't. Should look 
> into whether the test suite picks up the top-level
> config as a template.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-606) System Test Transient Failure (case 0302 GC Pause) - Log segments mismatched across replicas

2012-11-12 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13495387#comment-13495387
 ] 

Jun Rao commented on KAFKA-606:
---

There seem to be 2 issues. (1) Since we only do shallow iteration in 
DumpLogSegments, the offsets may not be consecutive when messages are 
compressed. (2) There is a bug where we cache the offsets as Int; they should be 
Long.
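
A tiny illustration of issue (2) (the offset value is made up): caching a 64-bit log 
offset in an Int silently truncates once it exceeds Int.MaxValue.

    val offset: Long = 3000000000L      // a plausible log offset, larger than Int.MaxValue
    val cachedAsInt: Int = offset.toInt // truncates to a negative number
    println(cachedAsInt)                // -1294967296, no longer matches the real offset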

> System Test Transient Failure (case 0302 GC Pause) - Log segments mismatched 
> across replicas
> 
>
> Key: KAFKA-606
> URL: https://issues.apache.org/jira/browse/KAFKA-606
> Project: Kafka
>  Issue Type: Bug
>Reporter: John Fung
> Attachments: testcase_0302_data_and_log4j.tar.gz
>
>


--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Closed] (KAFKA-591) Add test cases to test log size retention and more

2012-11-09 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao closed KAFKA-591.
-


> Add test cases to test log size retention and more
> --
>
> Key: KAFKA-591
> URL: https://issues.apache.org/jira/browse/KAFKA-591
> Project: Kafka
>  Issue Type: Bug
>Reporter: John Fung
>Assignee: John Fung
> Fix For: 0.8
>
> Attachments: kafka-591-v1.patch, kafka-591-v2.patch
>
>
> Add test cases to test the following:
> 1. Log Size Retention
> 2. Replica Factor < no. of brokers in a cluster
> 3. Multiple instances of Migration Tool
> 4. Multiple instances of Mirror Maker
> 5. Set "log.index.interval.bytes" to be slightly smaller than message size to 
> force the indexing to be performed for each message

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-591) Add test cases to test log size retention and more

2012-11-09 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-591:
--

   Resolution: Fixed
Fix Version/s: 0.8
   Status: Resolved  (was: Patch Available)

Thanks for patch v2. +1 Committed to 0.8

> Add test cases to test log size retention and more
> --
>
> Key: KAFKA-591
> URL: https://issues.apache.org/jira/browse/KAFKA-591
> Project: Kafka
>  Issue Type: Bug
>Reporter: John Fung
>Assignee: John Fung
> Fix For: 0.8
>
> Attachments: kafka-591-v1.patch, kafka-591-v2.patch
>
>
> Add test cases to test the following:
> 1. Log Size Retention
> 2. Replica Factor < no. of brokers in a cluster
> 3. Multiple instances of Migration Tool
> 4. Multiple instances of Mirror Maker
> 5. Set "log.index.interval.bytes" to be slightly smaller than message size to 
> force the indexing to be performed for each message

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Created] (KAFKA-604) Add missing metrics in 0.8

2012-11-08 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-604:
-

 Summary: Add missing metrics in 0.8
 Key: KAFKA-604
 URL: https://issues.apache.org/jira/browse/KAFKA-604
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
Reporter: Jun Rao


It would be good if we add the following metrics:

Producer: droppedMessageRate per topic

ReplicaManager: partition count on the broker

FileMessageSet: logFlushTimer per log (i.e., partition). Also, logFlushTime 
should probably be moved to LogSegment since the flush now includes index flush 
time.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Resolved] (KAFKA-546) Fix commit() in zk consumer for compressed messages

2012-11-07 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-546.
---

   Resolution: Fixed
Fix Version/s: 0.8

Thanks for patch v5. +1. Committed to 0.8.

> Fix commit() in zk consumer for compressed messages
> ---
>
> Key: KAFKA-546
> URL: https://issues.apache.org/jira/browse/KAFKA-546
> Project: Kafka
>  Issue Type: New Feature
>Affects Versions: 0.8
>Reporter: Jay Kreps
>Assignee: Swapnil Ghike
> Fix For: 0.8
>
> Attachments: kafka-546-v1.patch, kafka-546-v2.patch, 
> kafka-546-v3.patch, kafka-546-v4.patch, kafka-546-v5.patch
>
>
> In 0.7.x and earlier versions offsets were assigned by the byte location in 
> the file. Because it wasn't possible to directly decompress from the middle 
> of a compressed block, messages inside a compressed message set effectively 
> had no offset. As a result the offset given to the consumer was always the 
> offset of the wrapper message set.
> In 0.8 after the logical offsets patch messages in a compressed set do have 
> offsets. However the server still needs to fetch from the beginning of the 
> compressed messageset (otherwise it can't be decompressed). As a result a 
> commit() which occurs in the middle of a message set will still result in 
> some duplicates.
> This can be fixed in the ConsumerIterator by discarding messages smaller than 
> the fetch offset rather than giving them to the consumer. This will make 
> commit work correctly in the presence of compressed messages (finally).
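
For illustration, a self-contained sketch of the consumer-side fix described above (the 
types are simplified): any message decompressed with an offset below the requested fetch 
offset is dropped instead of being handed to the caller.

    case class Msg(offset: Long, payload: String)

    // Messages decompressed from the start of a compressed set may precede the commit
    // point; filtering by the fetch offset prevents re-delivering them as duplicates.
    def iteratorFrom(fetchOffset: Long, decompressed: Seq[Msg]): Iterator[Msg] =
      decompressed.iterator.filter(_.offset >= fetchOffset)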

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Closed] (KAFKA-546) Fix commit() in zk consumer for compressed messages

2012-11-07 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao closed KAFKA-546.
-


> Fix commit() in zk consumer for compressed messages
> ---
>
> Key: KAFKA-546
> URL: https://issues.apache.org/jira/browse/KAFKA-546
> Project: Kafka
>  Issue Type: New Feature
>Affects Versions: 0.8
>Reporter: Jay Kreps
>Assignee: Swapnil Ghike
> Fix For: 0.8
>
> Attachments: kafka-546-v1.patch, kafka-546-v2.patch, 
> kafka-546-v3.patch, kafka-546-v4.patch, kafka-546-v5.patch
>
>
> In 0.7.x and earlier versions offsets were assigned by the byte location in 
> the file. Because it wasn't possible to directly decompress from the middle 
> of a compressed block, messages inside a compressed message set effectively 
> had no offset. As a result the offset given to the consumer was always the 
> offset of the wrapper message set.
> In 0.8 after the logical offsets patch messages in a compressed set do have 
> offsets. However the server still needs to fetch from the beginning of the 
> compressed messageset (otherwise it can't be decompressed). As a result a 
> commit() which occurs in the middle of a message set will still result in 
> some duplicates.
> This can be fixed in the ConsumerIterator by discarding messages smaller than 
> the fetch offset rather than giving them to the consumer. This will make 
> commit work correctly in the presence of compressed messages (finally).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-574) KafkaController unnecessarily reads leaderAndIsr info from ZK

2012-11-07 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13492975#comment-13492975
 ] 

Jun Rao commented on KAFKA-574:
---

Thanks for patch v4. Patch looks good and system tests pass for me. Can't see 
the failures that you and Neha are seeing. So, +1 from me.

> KafkaController unnecessarily reads leaderAndIsr info from ZK
> -
>
> Key: KAFKA-574
> URL: https://issues.apache.org/jira/browse/KAFKA-574
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>Assignee: Prashanth Menon
>Priority: Blocker
>  Labels: bugs
> Attachments: KAFKA-574-v1.patch, KAFKA-574-v2.patch, 
> KAFKA-574-v3.patch, KAFKA-574-v4.patch
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> KafkaController calls updateLeaderAndIsrCache() in onBrokerFailure(). This is 
> unnecessary since in onBrokerFailure(), we will make leader and isr change 
> anyway so there is no need to first read that information from ZK. Latency is 
> critical in onBrokerFailure() since it determines how quickly a leader can be 
> made online.
> Similarly, updateLeaderAndIsrCache() is called in onBrokerStartup() 
> unnecessarily. In this case, the controller does not change the leader or the 
> isr. It just needs to send the current leader and the isr info to the newly 
> started broker. We already cache the leader in the controller. The isr in theory 
> could be changed at any time by the leader. So, reading from ZK doesn't guarantee 
> that we can get the latest isr anyway. Instead, we just need to get the isr 
> last selected by the controller (which can be cached together with the leader 
> in the controller). If the leader epoch in a broker is at or larger than the 
> epoch in the leaderAndIsr request, the broker can just ignore it. Otherwise, 
> the leader and the isr selected by the controller should be used. 
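
The epoch comparison described above can be sketched as follows; LeaderAndIsrInfo and BrokerPartitionState are made-up stand-ins, not the real Kafka classes.

// Apply a controller decision only if it carries a strictly newer leader epoch.
case class LeaderAndIsrInfo(leader: Int, isr: List[Int], leaderEpoch: Int)

class BrokerPartitionState(var current: LeaderAndIsrInfo) {
  def maybeApply(incoming: LeaderAndIsrInfo): Boolean = {
    if (incoming.leaderEpoch <= current.leaderEpoch) {
      false            // the broker's epoch is at or above the request's: ignore it
    } else {
      current = incoming
      true
    }
  }
}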

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-546) Fix commit() in zk consumer for compressed messages

2012-11-07 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13492552#comment-13492552
 ] 

Jun Rao commented on KAFKA-546:
---

Thanks for patch v3. A couple of more comments:
30. ConsumerIterator.next(): In the following code, to be safe, we need to 
check if localCurrent hasNext in the while loop.
// reject the messages that have already been consumed
while (item.offset < currentTopicInfo.getConsumeOffset) {
  item = localCurrent.next()

31. ConsumerIteratorTest: Not sure if the test really does what it intends to. 
To simulate reading from the middle of a compressed messageset, we need to put 
in a consume offset larger than 0 in PartitionTopicInfo, right?
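
A minimal sketch of the guarded skip loop that point 30 asks for, using a stand-in MessageAndOffset case class rather than the real kafka.message classes:

case class MessageAndOffset(offset: Long, payload: String)

// Skip messages below the consume offset; guard every advance with hasNext so
// a chunk that ends before reaching the consume offset can never throw.
def firstNotYetConsumed(chunk: Iterator[MessageAndOffset],
                        consumeOffset: Long): Option[MessageAndOffset] = {
  var item: Option[MessageAndOffset] = None
  while (chunk.hasNext && (item.isEmpty || item.get.offset < consumeOffset))
    item = Some(chunk.next())
  item.filter(_.offset >= consumeOffset)
}

If the chunk is exhausted before reaching consumeOffset, the caller simply gets None and can fetch the next chunk instead of hitting a NoSuchElementException.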

> Fix commit() in zk consumer for compressed messages
> ---
>
> Key: KAFKA-546
> URL: https://issues.apache.org/jira/browse/KAFKA-546
> Project: Kafka
>  Issue Type: New Feature
>Affects Versions: 0.8
>Reporter: Jay Kreps
>Assignee: Swapnil Ghike
> Attachments: kafka-546-v1.patch, kafka-546-v2.patch, 
> kafka-546-v3.patch
>
>
> In 0.7.x and earlier versions offsets were assigned by the byte location in 
> the file. Because it wasn't possible to directly decompress from the middle 
> of a compressed block, messages inside a compressed message set effectively 
> had no offset. As a result the offset given to the consumer was always the 
> offset of the wrapper message set.
> In 0.8 after the logical offsets patch messages in a compressed set do have 
> offsets. However the server still needs to fetch from the beginning of the 
> compressed messageset (otherwise it can't be decompressed). As a result a 
> commit() which occurs in the middle of a message set will still result in 
> some duplicates.
> This can be fixed in the ConsumerIterator by discarding messages smaller than 
> the fetch offset rather than giving them to the consumer. This will make 
> commit work correctly in the presence of compressed messages (finally).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-574) KafkaController unnecessarily reads leaderAndIsr info from ZK

2012-11-06 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13491642#comment-13491642
 ] 

Jun Rao commented on KAFKA-574:
---

Thanks for patch v3. Just one more comment:

30. KafkaController.removeReplicaFromIsr(): Shouldn't we only update 
newLeaderAndIsr in the cache if updateSucceeded is true?

20. Ignore my comment on leaderAndIsrIsEmpty. It is fine.

I ran system tests with your patch and they seem to pass. Did you build the 
Kafka jar before running the test?




> KafkaController unnecessarily reads leaderAndIsr info from ZK
> -
>
> Key: KAFKA-574
> URL: https://issues.apache.org/jira/browse/KAFKA-574
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>Assignee: Prashanth Menon
>Priority: Blocker
>  Labels: bugs
> Attachments: KAFKA-574-v1.patch, KAFKA-574-v2.patch, 
> KAFKA-574-v3.patch
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> KafkaController calls updateLeaderAndIsrCache() in onBrokerFailure(). This is 
> unnecessary since in onBrokerFailure(), we will make leader and isr change 
> anyway so there is no need to first read that information from ZK. Latency is 
> critical in onBrokerFailure() since it determines how quickly a leader can be 
> made online.
> Similarly, updateLeaderAndIsrCache() is called in onBrokerStartup() 
> unnecessarily. In this case, the controller does not change the leader or the 
> isr. It just needs to send the current leader and the isr info to the newly 
> started broker. We already cache the leader in the controller. The isr in theory 
> could be changed at any time by the leader. So, reading from ZK doesn't guarantee 
> that we can get the latest isr anyway. Instead, we just need to get the isr 
> last selected by the controller (which can be cached together with the leader 
> in the controller). If the leader epoch in a broker is at or larger than the 
> epoch in the leaderAndIsr request, the broker can just ignore it. Otherwise, 
> the leader and the isr selected by the controller should be used. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-532) Multiple controllers can co-exist during soft failures

2012-11-06 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13491566#comment-13491566
 ] 

Jun Rao commented on KAFKA-532:
---

Thanks for patch v4. A few more comments:

40. PartitionStateInfo: It seems that we need to send the controllerEpoch 
associated with this partition. Note that this epoch is different from the 
controllerEpoch in LeaderAndIsrRequest. The former is the epoch of the controller 
that last changed the leader or isr and will be used when the broker updates the 
isr. The latter is the epoch of the controller that sends the request and will 
be used in ReplicaManager to decide which controller's decision to follow. We 
will need to change the controllerEpoch passed to makeLeader and makeFollower in 
ReplicaManager accordingly.

41. ReplicaManager: In stopReplicas() and becomeLeaderOrFollower(), it would be 
better to only update controllerEpoch when it's truly necessary, i.e., the new 
controllerEpoch is larger than the cached one (not equal). This is because 
updating a volatile variable is a bit more expensive than updating a local variable 
since the update has to be exposed to other threads.

42. KafkaController: The approach in the new patch works. There are a few 
corner cases that we need to cover.
42.1. incrementControllerEpoch(): If the controllerEpoch path doesn't exist, we 
create the path using the initial epoch version without using a conditional 
update. It is possible for 2 controllers to execute this logic simultaneously 
and both get the initial epoch version. One solution is to make sure the 
controller epoch path exists during context initialization. Then we can always 
use a conditional update here.
42.2. ControllerContext: We need to initialize controllerEpoch by reading from 
ZK. We also need to make sure that we subscribe to the controllerEpoch path 
first and then read its value from ZK for initialization.
42.3. ControllerEpochListener: It's safer to set both the epoch and the ZK 
version using the value from ZkUtils.readData.

43. ControllerMovedException is missing in the patch.
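
A toy sketch of the caching suggested in 41/42.2/42.3: keep the epoch together with its ZK version and only accept strictly larger epochs. The class below only simulates the bookkeeping; it does not touch ZooKeeper.

class ControllerEpochCache {
  @volatile private var epoch: Int = 0
  @volatile private var zkVersion: Int = 0

  def currentEpoch: Int = epoch
  def currentZkVersion: Int = zkVersion

  // Point 41: write the volatile fields only when the new epoch is strictly larger.
  def maybeAdvance(newEpoch: Int, newZkVersion: Int): Boolean = synchronized {
    if (newEpoch > epoch) {
      epoch = newEpoch
      zkVersion = newZkVersion
      true
    } else false
  }
}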

> Multiple controllers can co-exist during soft failures
> --
>
> Key: KAFKA-532
> URL: https://issues.apache.org/jira/browse/KAFKA-532
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Neha Narkhede
>Assignee: Neha Narkhede
>Priority: Blocker
>  Labels: bugs
> Attachments: kafka-532-v1.patch, kafka-532-v2.patch, 
> kafka-532-v3.patch, kafka-532-v4.patch
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> If the current controller experiences an intermittent soft failure (GC pause) 
> in the middle of leader election or partition reassignment, a new controller 
> might get elected and start communicating new state change decisions to the 
> brokers. After recovering from the soft failure, the old controller might 
> continue sending some stale state change decisions to the brokers, resulting 
> in unexpected failures. We need to introduce a controller generation id that 
> increments with controller election. The brokers should reject any state 
> change requests by a controller with an older generation id.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Resolved] (KAFKA-577) extend DumpLogSegments to verify consistency btw data and index

2012-11-05 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-577.
---

Resolution: Fixed

Thanks for patch v6. +1. Committed to 0.8 with a minor change: using ::= on a 
list instead of .:+=
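
For readers less familiar with the two operators, both desugar into reassignment of the var holding an immutable List, but ::= prepends at the head in constant time while :+= copies the list to append at the end (values below are arbitrary):

var prepended: List[(Long, Long)] = Nil
prepended ::= ((5L, 4L))      // List((5,4))
prepended ::= ((7L, 6L))      // List((7,6), (5,4))

var appended: List[(Long, Long)] = Nil
appended :+= ((5L, 4L))       // List((5,4))
appended :+= ((7L, 6L))       // List((5,4), (7,6))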

> extend DumpLogSegments to verify consistency btw data and index
> ---
>
> Key: KAFKA-577
> URL: https://issues.apache.org/jira/browse/KAFKA-577
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>  Labels: newbie, tools
> Fix For: 0.8
>
> Attachments: kafka_577_v1.diff, kafka_577_v2.diff, kafka_577_v3.diff, 
> kafka_577_v4.diff, kafka_577_v5.diff, kafka_577_v6.diff
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> It would be good to extend DumpLogSegments to do the following verification:
> 1. The offsets stored in the index match those in the log data.
> 2. The offsets in the data log are consecutive.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Closed] (KAFKA-577) extend DumpLogSegments to verify consistency btw data and index

2012-11-05 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao closed KAFKA-577.
-


> extend DumpLogSegments to verify consistency btw data and index
> ---
>
> Key: KAFKA-577
> URL: https://issues.apache.org/jira/browse/KAFKA-577
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>  Labels: newbie, tools
> Fix For: 0.8
>
> Attachments: kafka_577_v1.diff, kafka_577_v2.diff, kafka_577_v3.diff, 
> kafka_577_v4.diff, kafka_577_v5.diff, kafka_577_v6.diff
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> It would be good to extend DumpLogSegments to do the following verification:
> 1. The offsets stored in the index match those in the log data.
> 2. The offsets in the data log are consecutive.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Resolved] (KAFKA-596) LogSegment.firstAppendTime not reset after truncate to

2012-11-05 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-596.
---

   Resolution: Fixed
Fix Version/s: 0.8

Thanks for patch v3. +1. Committed to 0.8.

> LogSegment.firstAppendTime not reset after truncate to
> --
>
> Key: KAFKA-596
> URL: https://issues.apache.org/jira/browse/KAFKA-596
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>Assignee: Swapnil Ghike
>  Labels: bugs
> Fix For: 0.8
>
> Attachments: kafka-596.patch, kafka-596-v2.patch, kafka-596-v3.patch
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Currently, we don't reset LogSegment.firstAppendTime after the segment is 
> truncated. What can happen is that we truncate the segment to size 0 and on 
> next append, a new log segment with the same starting offset is rolled 
> because the time-based rolling is triggered.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Closed] (KAFKA-596) LogSegment.firstAppendTime not reset after truncate to

2012-11-05 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao closed KAFKA-596.
-


> LogSegment.firstAppendTime not reset after truncate to
> --
>
> Key: KAFKA-596
> URL: https://issues.apache.org/jira/browse/KAFKA-596
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>Assignee: Swapnil Ghike
>  Labels: bugs
> Fix For: 0.8
>
> Attachments: kafka-596.patch, kafka-596-v2.patch, kafka-596-v3.patch
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Currently, we don't reset LogSegment.firstAppendTime after the segment is 
> truncated. What can happen is that we truncate the segment to size 0 and on 
> next append, a new log segment with the same starting offset is rolled 
> because the time-based rolling is triggered.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-546) Fix commit() in zk consumer for compressed messages

2012-11-05 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13490977#comment-13490977
 ] 

Jun Rao commented on KAFKA-546:
---

Thanks for patch v2. Some comments:

20. ConsumerIterator.next(): The following code depends on no gaps in offsets. 
This is true at this moment, but may not be true in the future when we have a 
different retention policy. A safer way is to keep iterating the messageSet 
until we get an offset that reaches or passes ctiConsumeOffset.
for (i <- 0L until (ctiConsumeOffset - cdcFetchOffset)) {
  localCurrent.next()

21. PartitionTopicInfo: In startOffset(), unfortunately, we can't use the 
shallow iterator. This is because when messages are compressed, the offset of 
the top level message has the offset of the last message (instead of the first 
one) in the compressed unit. Also, iterating messages here may not be ideal 
since it forces us to decompress. An alternative way is to do the logic in 
ConsumerIterator.next(). Every time that we get a new chunk of messageset, we 
keep iterating it until the message offset reaches or passes the 
consume offset. This way, if we are doing shallow iteration, we don't have to 
decompress messages.

22. ConsumerIteratorTest: 
22.1 zkConsumerConnector is not used.
22.2 We probably should set consumerOffset to a value >0 in PartitionTopicInfo.
22.3 Also, could we add a test that covers compressed messageset?
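
A script-style illustration of the concern in point 20, with made-up offsets: if offsets ever have gaps (e.g. under a future retention policy), skipping by count lands on the wrong message, while comparing offsets does not.

val consumeOffset = 13L

// Count-based skip over offsets 10, 11, 14, 15: advancing (13 - 10) = 3 times
// leaves the iterator on offset 15 and silently loses the message at offset 14.
// Offset-based skip: stop at the first offset that reaches or passes the target.
def skipTo(offsets: Iterator[Long], target: Long): Option[Long] =
  offsets.find(_ >= target)

println(skipTo(Iterator(10L, 11L, 14L, 15L), consumeOffset))   // Some(14)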


> Fix commit() in zk consumer for compressed messages
> ---
>
> Key: KAFKA-546
> URL: https://issues.apache.org/jira/browse/KAFKA-546
> Project: Kafka
>  Issue Type: New Feature
>Affects Versions: 0.8
>Reporter: Jay Kreps
>Assignee: Swapnil Ghike
> Attachments: kafka-546-v1.patch, kafka-546-v2.patch
>
>
> In 0.7.x and earlier versions offsets were assigned by the byte location in 
> the file. Because it wasn't possible to directly decompress from the middle 
> of a compressed block, messages inside a compressed message set effectively 
> had no offset. As a result the offset given to the consumer was always the 
> offset of the wrapper message set.
> In 0.8 after the logical offsets patch messages in a compressed set do have 
> offsets. However the server still needs to fetch from the beginning of the 
> compressed messageset (otherwise it can't be decompressed). As a result a 
> commit() which occurs in the middle of a message set will still result in 
> some duplicates.
> This can be fixed in the ConsumerIterator by discarding messages smaller than 
> the fetch offset rather than giving them to the consumer. This will make 
> commit work correctly in the presence of compressed messages (finally).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-593) Empty log index file created when it shouldn't be empty

2012-11-05 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13490770#comment-13490770
 ] 

Jun Rao commented on KAFKA-593:
---

Thanks for patch v3. Looks good. Some minor comments.

30. Log.rollToOffset():  
segmentsView.last.index.file.getName.split("\\.")(0).toLong can just be 
segmentsView.last.start.

31. Log.loadSegments(): Just to be consistent. Should we use index.maxIndexSize 
instead of maxIndexSize in the following statement?
  logSegments.get(logSegments.size() - 1).index.resetSizeTo(maxIndexSize)


> Empty log index file created when it shouldn't be empty
> ---
>
> Key: KAFKA-593
> URL: https://issues.apache.org/jira/browse/KAFKA-593
> Project: Kafka
>  Issue Type: Bug
>Reporter: Yang Ye
> Attachments: kafka_583_zk_kafka_data.tar.gz, kafka_593_v1.diff, 
> kafka_593_v2.diff, kafka_593_v3.diff
>
>
> We have seen an empty index file during system tests when it shouldn't be empty. 
> In this case, there are around 100 messages in each segment, each of size 
> around 100 bytes. Given the "logIndexIntervalBytes" of 4096, there should be at 
> least 2 log index entries, but we see an empty index file. The kafka and 
> zookeeper logs are attached.
> [yye@yye-ld kafka_server_3_logs]$ cd test_1-2/
> [yye@yye-ld test_1-2]$ ls -l
> total 84
> -rw-r--r-- 1 yye eng 8 Oct 29 15:22 .index
> -rw-r--r-- 1 yye eng 10248 Oct 29 15:22 .log
> -rw-r--r-- 1 yye eng 8 Oct 29 15:22 0100.index
> -rw-r--r-- 1 yye eng 10296 Oct 29 15:22 0100.log
> -rw-r--r-- 1 yye eng 0 Oct 29 15:23 0200.index
> -rw-r--r-- 1 yye eng 10293 Oct 29 15:23 0200.log
> -rw-r--r-- 1 yye eng 0 Oct 29 15:23 0300.index
> -rw-r--r-- 1 yye eng 10274 Oct 29 15:23 0300.log
> -rw-r--r-- 1 yye eng 0 Oct 29 15:23 0399.index
> -rw-r--r-- 1 yye eng 10276 Oct 29 15:23 0399.log
> -rw-r--r-- 1 yye eng 0 Oct 29 15:23 0498.index
> -rw-r--r-- 1 yye eng 10256 Oct 29 15:23 0498.log
> -rw-r--r-- 1 yye eng 10485760 Oct 29 15:23 0596.index
> -rw-r--r-- 1 yye eng 3564 Oct 29 15:23 0596.log

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-546) Fix commit() in zk consumer for compressed messages

2012-11-02 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13489481#comment-13489481
 ] 

Jun Rao commented on KAFKA-546:
---

Can't seem to apply the patch cleanly to 0.8. Could you rebase?

$ patch -p0 < ~/Downloads/kafka-546-v1.patch 
patching file core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
patching file core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
Reversed (or previously applied) patch detected!  Assume -R? [n] ^C


> Fix commit() in zk consumer for compressed messages
> ---
>
> Key: KAFKA-546
> URL: https://issues.apache.org/jira/browse/KAFKA-546
> Project: Kafka
>  Issue Type: New Feature
>Affects Versions: 0.8
>Reporter: Jay Kreps
>Assignee: Swapnil Ghike
> Attachments: kafka-546-v1.patch
>
>
> In 0.7.x and earlier versions offsets were assigned by the byte location in 
> the file. Because it wasn't possible to directly decompress from the middle 
> of a compressed block, messages inside a compressed message set effectively 
> had no offset. As a result the offset given to the consumer was always the 
> offset of the wrapper message set.
> In 0.8 after the logical offsets patch messages in a compressed set do have 
> offsets. However the server still needs to fetch from the beginning of the 
> compressed messageset (otherwise it can't be decompressed). As a result a 
> commit() which occurs in the middle of a message set will still result in 
> some duplicates.
> This can be fixed in the ConsumerIterator by discarding messages smaller than 
> the fetch offset rather than giving them to the consumer. This will make 
> commit work correctly in the presence of compressed messages (finally).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-596) LogSegment.firstAppendTime not reset after truncate to

2012-11-02 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13489473#comment-13489473
 ] 

Jun Rao commented on KAFKA-596:
---

I agree that setting firstAppendTime to None in LogSegment.truncateTo() is 
necessary. However, I don't think this is necessary in Log.maybeRoll() and 
Log.markedDeletedWhile(). In both cases, we are not changing the log segment. 
So whoever changed the segment last to make its size 0 (either through 
truncation or creation) would have set firstAppendTime properly. Note that 
maybeRoll is called on every log append. We don't want to add unnecessary 
overhead.
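
A toy model of the intended behaviour (not the real LogSegment): firstAppendTime is cleared only in truncateTo when the segment shrinks to size 0, so the time-based roll check never fires on a segment that was truncated empty, and maybeRoll itself does no extra work.

class ToySegment(val start: Long) {
  var sizeInBytes: Long = 0L
  var firstAppendTime: Option[Long] = None

  def append(bytes: Long, now: Long) {
    if (firstAppendTime.isEmpty) firstAppendTime = Some(now)
    sizeInBytes += bytes
  }

  def truncateTo(targetSize: Long) {
    sizeInBytes = math.min(sizeInBytes, targetSize)
    if (sizeInBytes == 0) firstAppendTime = None   // the reset discussed in this ticket
  }

  // Time-based roll trigger: never fires for a segment truncated back to empty.
  def shouldRoll(now: Long, rollMs: Long): Boolean =
    firstAppendTime.exists(t => now - t > rollMs)
}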

> LogSegment.firstAppendTime not reset after truncate to
> --
>
> Key: KAFKA-596
> URL: https://issues.apache.org/jira/browse/KAFKA-596
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>Assignee: Swapnil Ghike
>  Labels: bugs
> Attachments: kafka-596.patch
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Currently, we don't reset LogSegment.firstAppendTime after the segment is 
> truncated. What can happen is that we truncate the segment to size 0 and on 
> next append, a new log segment with the same starting offset is rolled 
> because the time-based rolling is triggered.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


Re: Maintainer system?

2012-11-01 Thread Jun Rao
I think that's a good idea. It will be good to have at least 2 maintainers
per component.

I'd encourage more people to review patches. The more patches one reviews,
the more familiar he/she is with the components.

Thanks,

Jun

On Thu, Oct 4, 2012 at 1:13 PM, Jay Kreps  wrote:

> Hey guys,
>
> The number of developers and code base size for Kafka is getting larger.
>
> One way to help scale things gracefully would be to have an official idea
> of "subsystems" and have official maintainers for those. The duties of a
> maintainer would be
> 1. Be the final word on API design in that area
> 2. Ensure sufficient documentation and test coverage for that subsystem
> 3. Review all code changes in that sub-system area
> 4. Ensure that patches in that area get applied in a timely fashion
>
> In particular I think we could do a better job of getting patches in in a
> timely manner.
>
> Here are what I see as logically distinct systems or areas:
>
>- Producer (java and scala)
>- Consumer (java and scala)
>- Network layer (kafka.network.*)
>- Log (kafka.log.*)
>- Replication (controller, fetcher threads, hw mark stuff, etc)
>- Kafka API impl (basically just KafkaApi.scala)
>- Hadoop stuff
>- Perf tools and system tests
>- Misc other small things: metrics, utils, etc.
>
> Obviously many features will cut across these layers, but the idea is that
> by having a real owner that is responsible for that area we will get higher
> quality.
>
> I think we are doing this informally already, but making it formal would
> help ensure you knew the right people to get input from. I think it
> probably wouldn't make sense to start doing this until post-0.8 since we
> are in the middle of so many things right now, but I wanted to see what
> people thought...?
>
> -Jay
>


[jira] [Commented] (KAFKA-596) LogSegment.firstAppendTime not reset after truncate to

2012-11-01 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13489266#comment-13489266
 ] 

Jun Rao commented on KAFKA-596:
---

Thanks for the patch. A couple of comments:

1.  Log.maybeRoll(): Are the following lines needed since we are not creating a 
new segment?
  if (segment.messageSet.sizeInBytes == 0)
segment.firstAppendTime = None

2. Log.markedDeletedWhile(): Is the following line needed since we are not 
creating a new segment?
  view(numToDelete - 1).firstAppendTime = None



> LogSegment.firstAppendTime not reset after truncate to
> --
>
> Key: KAFKA-596
> URL: https://issues.apache.org/jira/browse/KAFKA-596
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>Assignee: Swapnil Ghike
>  Labels: bugs
> Attachments: kafka-596.patch
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Currently, we don't reset LogSegment.firstAppendTime after the segment is 
> truncated. What can happen is that we truncate the segment to size 0 and on 
> next append, a new log segment with the same starting offset is rolled 
> because the time-based rolling is triggered.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-593) Empty log index file created when it shouldn't be empty

2012-11-01 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13489260#comment-13489260
 ] 

Jun Rao commented on KAFKA-593:
---

Thanks for the patch. A couple of comments:

1. Log.rollToOffset(): We just need to verify that the starting offset of the 
last segment doesn't equal the new offset.

2. OffsetIndex: When loading the log segment on broker startup, it is possible 
to have a segment with empty data and empty index. So, we will hit the same 
issue of rolling a segment with a duplicated name. We probably should extend 
the index size to max index size in the constructor of OffsetIndex.
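
The guard in comment 1 amounts to comparing the requested starting offset with the start of the current last segment before rolling; a self-contained sketch with simplified segment bookkeeping:

// Refuse to roll a new segment whose starting offset already exists.
def rollToOffset(segmentStarts: List[Long], newOffset: Long): List[Long] = {
  require(segmentStarts.isEmpty || segmentStarts.last != newOffset,
          "trying to roll a new segment with an existing starting offset: " + newOffset)
  segmentStarts :+ newOffset
}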

> Empty log index file created when it shouldn't be empty
> ---
>
> Key: KAFKA-593
> URL: https://issues.apache.org/jira/browse/KAFKA-593
> Project: Kafka
>  Issue Type: Bug
>Reporter: Yang Ye
> Attachments: kafka_583_zk_kafka_data.tar.gz, kafka_593_v1.diff
>
>
> We have seen an empty index file during system tests when it shouldn't be empty. 
> In this case, there are around 100 messages in each segment, each of size 
> around 100 bytes. Given the "logIndexIntervalBytes" of 4096, there should be at 
> least 2 log index entries, but we see an empty index file. The kafka and 
> zookeeper logs are attached.
> [yye@yye-ld kafka_server_3_logs]$ cd test_1-2/
> [yye@yye-ld test_1-2]$ ls -l
> total 84
> -rw-r--r-- 1 yye eng 8 Oct 29 15:22 .index
> -rw-r--r-- 1 yye eng 10248 Oct 29 15:22 .log
> -rw-r--r-- 1 yye eng 8 Oct 29 15:22 0100.index
> -rw-r--r-- 1 yye eng 10296 Oct 29 15:22 0100.log
> -rw-r--r-- 1 yye eng 0 Oct 29 15:23 0200.index
> -rw-r--r-- 1 yye eng 10293 Oct 29 15:23 0200.log
> -rw-r--r-- 1 yye eng 0 Oct 29 15:23 0300.index
> -rw-r--r-- 1 yye eng 10274 Oct 29 15:23 0300.log
> -rw-r--r-- 1 yye eng 0 Oct 29 15:23 0399.index
> -rw-r--r-- 1 yye eng 10276 Oct 29 15:23 0399.log
> -rw-r--r-- 1 yye eng 0 Oct 29 15:23 0498.index
> -rw-r--r-- 1 yye eng 10256 Oct 29 15:23 0498.log
> -rw-r--r-- 1 yye eng 10485760 Oct 29 15:23 0596.index
> -rw-r--r-- 1 yye eng 3564 Oct 29 15:23 0596.log

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-577) extend DumpLogSegments to verify consistency btw data and index

2012-11-01 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13489255#comment-13489255
 ] 

Jun Rao commented on KAFKA-577:
---

A couple of more comments for patch v4.

40. LinkedList is supposed to be used for manipulating links directly. We 
should just use List (immutable) instead. Also, instead of the following,

misMatchesSeq = misMatchesSeq.:+((entry.offset + index.baseOffset, 
messageAndOffset.offset).asInstanceOf[(Int, Int)])

we should write

misMatchesSeq :+= ((entry.offset + index.baseOffset, 
messageAndOffset.offset).asInstanceOf[(Int, Int)])


> extend DumpLogSegments to verify consistency btw data and index
> ---
>
> Key: KAFKA-577
> URL: https://issues.apache.org/jira/browse/KAFKA-577
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>  Labels: newbie, tools
> Fix For: 0.8
>
> Attachments: kafka_577_v1.diff, kafka_577_v2.diff, kafka_577_v3.diff, 
> kafka_577_v4.diff
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> It would be good to extend DumpLogSegments to do the following verification:
> 1. The offsets stored in the index match those in the log data.
> 2. The offsets in the data log are consecutive.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Closed] (KAFKA-491) KafkaRequestHandler needs to handle exceptions

2012-11-01 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao closed KAFKA-491.
-


> KafkaRequestHandler needs to handle exceptions
> --
>
> Key: KAFKA-491
> URL: https://issues.apache.org/jira/browse/KAFKA-491
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>    Reporter: Jun Rao
>  Labels: bugs
> Fix For: 0.8
>
> Attachments: kafka-491-v1.diff, kafka-491-v2.diff
>
>
> Currently, if apis.handle() throws an exception (e.g., if the broker receives 
> an invalid request), KafkaRequestHandler will die. We need to handle 
> exceptions properly.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Resolved] (KAFKA-491) KafkaRequestHandler needs to handle exceptions

2012-11-01 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-491.
---

Resolution: Fixed

Thanks for patch v2. +1. Committed to 0.8.

> KafkaRequestHandler needs to handle exceptions
> --
>
> Key: KAFKA-491
> URL: https://issues.apache.org/jira/browse/KAFKA-491
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>    Reporter: Jun Rao
>  Labels: bugs
> Fix For: 0.8
>
> Attachments: kafka-491-v1.diff, kafka-491-v2.diff
>
>
> Currently, if apis.handle() throws an exception (e.g., if the broker receives 
> an invalid request), KafkaRequestHandler will die. We need to handle 
> exceptions properly.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-188) Support multiple data directories

2012-11-01 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13489184#comment-13489184
 ] 

Jun Rao commented on KAFKA-188:
---

+1 on patch v8. Thanks,

> Support multiple data directories
> -
>
> Key: KAFKA-188
> URL: https://issues.apache.org/jira/browse/KAFKA-188
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Jay Kreps
> Attachments: KAFKA-188.patch, KAFKA-188-v2.patch, KAFKA-188-v3.patch, 
> KAFKA-188-v4.patch, KAFKA-188-v5.patch, KAFKA-188-v6.patch, 
> KAFKA-188-v7.patch, KAFKA-188-v8.patch
>
>
> Currently we allow only a single data directory. This means that a multi-disk 
> configuration needs to be a RAID array or LVM volume or something like that 
> to be mounted as a single directory.
> For a high-throughput low-reliability configuration this would mean RAID0 
> striping. Common wisdom in Hadoop land has it that a JBOD setup that just 
> mounts each disk as a separate directory and does application-level balancing 
> over these results in about 30% write-improvement. For example see this claim 
> here:
>   http://old.nabble.com/Re%3A-RAID-vs.-JBOD-p21466110.html
> It is not clear to me why this would be the case--it seems the RAID 
> controller should be able to balance writes as well as the application so it 
> may depend on the details of the setup.
> Nonetheless this would be really easy to implement, all you need to do is add 
> multiple data directories and balance partition creation over these disks.
> One problem this might cause is if a particular topic is much larger than the 
> others it might unbalance the load across the disks. The partition->disk 
> assignment policy should probably attempt to evenly spread each topic to 
> avoid this, rather than just trying to keep the number of partitions balanced 
> between disks.
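
One simple way to do the balancing described above is to place each new log in the data directory that currently holds the fewest logs; the snippet below is only an illustration of that policy, not the code committed for this ticket.

// Pick the least-loaded data directory for the next partition's log.
def pickDataDir(logCountPerDir: Map[String, Int]): String =
  logCountPerDir.minBy(_._2)._1

println(pickDataDir(Map("/data1" -> 12, "/data2" -> 9, "/data3" -> 11)))   // /data2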

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-532) Multiple controllers can co-exist during soft failures

2012-11-01 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13488893#comment-13488893
 ] 

Jun Rao commented on KAFKA-532:
---

Thanks for patch v3. The overall approach seems to work. Some comments:

30. PartitionStateInfo and LeaderAndIsrRequest: When calculating size in 
sizeInBytes(), it's more readable if we put each number to be added in a 
separate line.

31. Partition.updateIsr(): I am thinking about what controllerEpoch the leader 
should use when updating the leaderAndIsr path. There is probably nothing wrong 
with using the controllerEpoch in replicaManager. However, it seems to make more 
sense to use the controllerEpoch in the leaderAndIsr path itself, since this 
update is actually not made by the controller.

32. ReplicaManager.controllerEpoch: Since this variable can be accessed from 
different threads, it needs to be a volatile. Also, we only need to update 
controllerEpoch if the one from the request is larger (but not equal). It 
probably should be initialized to 0 or -1?

33. LeaderElectionTest.testLeaderElectionWithStaleControllerEpoch(): I wonder 
if we really need to start a new broker. Can we just send a stale controller 
epoch using the controllerChannelManager in the current controller?

34. KafkaController: There seems to be a tricky issue with incrementing the 
controller epoch. We increment the epoch in onControllerFailover() after the broker 
becomes a controller. What could happen is that broker 1 becomes the controller 
and goes into GC before we increment the epoch. Broker 2 becomes the new 
controller and increments the epoch. Broker 1 comes back from GC and increments 
the epoch again. Now, broker 1's controller epoch is actually larger. Not sure what's 
the best way to address this. One thought is that immediately after the controller 
epoch is incremented in onControllerFailover(), we check if this broker is still 
the controller (by reading the controller path in ZK). If not, we throw an 
exception. Also, the epoch probably should be initialized to 0 if we want the first 
controller to have epoch 1.

35. We use an int to represent both the controller and leader epoch. There is a 
potential issue if the number wraps. We probably don't need to worry about it 
now.
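
Point 34's safeguard, sketched with stand-in functions for the ZK calls (zkIncrementEpoch and zkReadController are hypothetical): bump the epoch, then re-read the controller path and give up if another broker has taken over in the meantime.

class ControllerMovedException(msg: String) extends RuntimeException(msg)

def becomeController(brokerId: Int,
                     zkIncrementEpoch: () => Int,
                     zkReadController: () => Int): Int = {
  val newEpoch = zkIncrementEpoch()
  if (zkReadController() != brokerId)
    throw new ControllerMovedException(
      "broker " + brokerId + " lost the controller election after bumping the epoch")
  newEpoch
}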




> Multiple controllers can co-exist during soft failures
> --
>
> Key: KAFKA-532
> URL: https://issues.apache.org/jira/browse/KAFKA-532
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Neha Narkhede
>Assignee: Neha Narkhede
>Priority: Blocker
>  Labels: bugs
> Attachments: kafka-532-v1.patch, kafka-532-v2.patch, 
> kafka-532-v3.patch
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> If the current controller experiences an intermittent soft failure (GC pause) 
> in the middle of leader election or partition reassignment, a new controller 
> might get elected and start communicating new state change decisions to the 
> brokers. After recovering from the soft failure, the old controller might 
> continue sending some stale state change decisions to the brokers, resulting 
> in unexpected failures. We need to introduce a controller generation id that 
> increments with controller election. The brokers should reject any state 
> change requests by a controller with an older generation id.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-574) KafkaController unnecessarily reads leaderAndIsr info from ZK

2012-11-01 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13488810#comment-13488810
 ] 

Jun Rao commented on KAFKA-574:
---

Thanks for patch v2. Just one more comment:

20. ReplicaSatemachine.handleStateChange(): In the OfflineReplica state, after 
the isr is updated, we need to update the leaderAndIsr cache in controller 
context. Also, is leaderAndIsrIsEmpty better than leaderAndIsrOpt?

Could you run the basic system tests and make sure that they pass?

/system_test/ $ python -u -B system_test_runner.py 2>&1 | tee 
system_test_output_`date +%s`.log

> KafkaController unnecessarily reads leaderAndIsr info from ZK
> -
>
> Key: KAFKA-574
> URL: https://issues.apache.org/jira/browse/KAFKA-574
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>Assignee: Prashanth Menon
>Priority: Blocker
>  Labels: bugs
> Attachments: KAFKA-574-v1.patch, KAFKA-574-v2.patch
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> KafkaController calls updateLeaderAndIsrCache() in onBrokerFailure(). This is 
> unnecessary since in onBrokerFailure(), we will make leader and isr change 
> anyway so there is no need to first read that information from ZK. Latency is 
> critical in onBrokerFailure() since it determines how quickly a leader can be 
> made online.
> Similarly, updateLeaderAndIsrCache() is called in onBrokerStartup() 
> unnecessarily. In this case, the controller does not change the leader or the 
> isr. It just needs to send the current leader and the isr info to the newly 
> started broker. We already cache the leader in the controller. The isr in theory 
> could be changed at any time by the leader. So, reading from ZK doesn't guarantee 
> that we can get the latest isr anyway. Instead, we just need to get the isr 
> last selected by the controller (which can be cached together with the leader 
> in the controller). If the leader epoch in a broker is at or larger than the 
> epoch in the leaderAndIsr request, the broker can just ignore it. Otherwise, 
> the leader and the isr selected by the controller should be used. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Created] (KAFKA-598) decouple fetch size from max message size

2012-11-01 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-598:
-

 Summary: decouple fetch size from max message size
 Key: KAFKA-598
 URL: https://issues.apache.org/jira/browse/KAFKA-598
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
Reporter: Jun Rao


Currently, a consumer has to set fetch size larger than the max message size. 
This increases the memory footprint on the consumer, especially when a large 
number of topics/partitions are subscribed. By decoupling the fetch size from max 
message size, we can use a smaller fetch size for normal consumption and when 
hitting a large message (hopefully rare), we automatically increase fetch size 
to max message size temporarily.
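
The control flow being proposed, with made-up names throughout: fetch with the normal size, and only when a fetch fails because a single message is larger than that size, retry that one fetch with the configured maximum.

case class Fetched(messages: Seq[Array[Byte]], oversized: Boolean)

def fetchAdaptively(doFetch: Int => Fetched,
                    normalFetchSize: Int,
                    maxMessageSize: Int): Seq[Array[Byte]] = {
  val first = doFetch(normalFetchSize)
  if (first.oversized)
    doFetch(maxMessageSize).messages   // temporary escalation for the large message
  else
    first.messages
}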

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-596) LogSegment.firstAppendTime not reset after truncate to

2012-10-31 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13487947#comment-13487947
 ] 

Jun Rao commented on KAFKA-596:
---

The fix is to set LogSegment.firstAppendTime to None if we truncate the segment 
to size 0. However, this brings up the deeper question of how we prevent 
segments with identical starting offsets from being created during a log roll. 
Maybe we should add a check in log.roll to guard against this.

> LogSegment.firstAppendTime not reset after truncate to
> --
>
> Key: KAFKA-596
> URL: https://issues.apache.org/jira/browse/KAFKA-596
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>  Labels: bugs
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Currently, we don't reset LogSegment.firstAppendTime after the segment is 
> truncated. What can happen is that we truncate the segment to size 0 and on 
> next append, a new log segment with the same starting offset is rolled 
> because the time-based rolling is triggered.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Created] (KAFKA-596) LogSegment.firstAppendTime not reset after truncate to

2012-10-31 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-596:
-

 Summary: LogSegment.firstAppendTime not reset after truncate to
 Key: KAFKA-596
 URL: https://issues.apache.org/jira/browse/KAFKA-596
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
Reporter: Jun Rao


Currently, we don't reset LogSegment.firstAppendTime after the segment is 
truncated. What can happen is that we truncate the segment to size 0 and on 
next append, a new log segment with the same starting offset is rolled because 
the time-based rolling is triggered.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-586) system test configs are broken

2012-10-31 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-586:
--

   Resolution: Fixed
Fix Version/s: 0.8
   Status: Resolved  (was: Patch Available)

Thanks for the patch. +1. Committed to 0.8.

> system test configs are broken
> --
>
> Key: KAFKA-586
> URL: https://issues.apache.org/jira/browse/KAFKA-586
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Neha Narkhede
>Assignee: John Fung
>Priority: Critical
>  Labels: replication-testing
> Fix For: 0.8
>
> Attachments: kafka-586-v1.patch
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> system test suite has a set of default config values that are picked up from 
> the testsuite/config directory. One can override the value of a config in the 
> testcase_properties.json file. This is great, but the assumption is that the 
> config property that is being overridden should also be present in the 
> testsuite/config/*.properties file. 
> Currently, there are a number of properties in KafkaConfig that are not in 
> the testsuite/config/*.properties file. So the tests might intend to override 
> some properties, but that will be ignored. 
> Let's either add all the configs in the testsuite/config/*.properties file or 
> remove this dependency and override the property specified in 
> testcase_properties.json.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Closed] (KAFKA-586) system test configs are broken

2012-10-31 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao closed KAFKA-586.
-


> system test configs are broken
> --
>
> Key: KAFKA-586
> URL: https://issues.apache.org/jira/browse/KAFKA-586
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Neha Narkhede
>Assignee: John Fung
>Priority: Critical
>  Labels: replication-testing
> Fix For: 0.8
>
> Attachments: kafka-586-v1.patch
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> system test suite has a set of default config values that are picked up from 
> the testsuite/config directory. One can override the value of a config in the 
> testcase_properties.json file. This is great, but the assumption is that the 
> config property that is being overridden should also be present in the 
> testsuite/config/*.properties file. 
> Currently, there are a number of properties in KafkaConfig that are not in 
> the testsuite/config/*.properties file. So the tests might intend to override 
> some properties, but that will be ignored. 
> Let's either add all the configs in the testsuite/config/*.properties file or 
> remove this dependency and override the property specified in 
> testcase_properties.json.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Closed] (KAFKA-594) Update System Test due to new argument "--sync" in ProducerPerformance

2012-10-31 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-594?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao closed KAFKA-594.
-


> Update System Test due to new argument "--sync" in ProducerPerformance
> --
>
> Key: KAFKA-594
> URL: https://issues.apache.org/jira/browse/KAFKA-594
> Project: Kafka
>  Issue Type: Task
>Reporter: John Fung
>Assignee: John Fung
> Fix For: 0.8
>
> Attachments: kafka-594-v1.patch
>
>


--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-594) Update System Test due to new argument "--sync" in ProducerPerformance

2012-10-31 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-594?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-594:
--

   Resolution: Fixed
Fix Version/s: 0.8
 Assignee: John Fung
   Status: Resolved  (was: Patch Available)

Thanks for the patch. Committed to 0.8.

> Update System Test due to new argument "--sync" in ProducerPerformance
> --
>
> Key: KAFKA-594
> URL: https://issues.apache.org/jira/browse/KAFKA-594
> Project: Kafka
>  Issue Type: Task
>Reporter: John Fung
>Assignee: John Fung
> Fix For: 0.8
>
> Attachments: kafka-594-v1.patch
>
>


--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-593) Empty log index file created when it shouldn't be empty

2012-10-31 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13487841#comment-13487841
 ] 

Jun Rao commented on KAFKA-593:
---

Here is the issue. We rolled a new segment in the follower. The follower in one 
fetch gets 10k bytes of data and appends to its log. This won't add any index 
entry since it's the very first append to this segment. After the append, the 
log is rolled since the max segment size is reached. This leaves an empty index.

Technically, the logic is still correct. It does mean that index entries may 
not be generated as frequently as one expects, depending on the fetch size used 
in the follower fetcher thread and how far behind a follower is. This may 
impact consumer performance a bit.
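
A toy model of why the index stayed empty (numbers from the scenario above; the class is not the real LogSegment): an index entry is written only when more than logIndexIntervalBytes have accumulated since the last entry, and the counter starts at 0 for a fresh segment, so a follower that appends one 10k chunk and then rolls never writes an entry.

class IndexingSegment(indexIntervalBytes: Int) {
  var indexEntries = 0
  private var bytesSinceLastIndexEntry = 0

  def append(bytes: Int) {
    if (bytesSinceLastIndexEntry > indexIntervalBytes) {
      indexEntries += 1
      bytesSinceLastIndexEntry = 0
    }
    bytesSinceLastIndexEntry += bytes
  }
}

val seg = new IndexingSegment(4096)
seg.append(10240)            // one large follower append
println(seg.indexEntries)    // 0 -- no index entry, then the segment rolls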

> Empty log index file created when it shouldn't be empty
> ---
>
> Key: KAFKA-593
> URL: https://issues.apache.org/jira/browse/KAFKA-593
> Project: Kafka
>  Issue Type: Bug
>Reporter: Yang Ye
> Attachments: kafka_583_zk_kafka_data.tar.gz
>
>
> We have seen an empty index file during system tests when it shouldn't be empty. 
> In this case, there are around 100 messages in each segment, each of size 
> around 100 bytes. Given the "logIndexIntervalBytes" of 4096, there should be at 
> least 2 log index entries, but we see an empty index file. The kafka and 
> zookeeper logs are attached.
> [yye@yye-ld kafka_server_3_logs]$ cd test_1-2/
> [yye@yye-ld test_1-2]$ ls -l
> total 84
> -rw-r--r-- 1 yye eng 8 Oct 29 15:22 .index
> -rw-r--r-- 1 yye eng 10248 Oct 29 15:22 .log
> -rw-r--r-- 1 yye eng 8 Oct 29 15:22 0100.index
> -rw-r--r-- 1 yye eng 10296 Oct 29 15:22 0100.log
> -rw-r--r-- 1 yye eng 0 Oct 29 15:23 0200.index
> -rw-r--r-- 1 yye eng 10293 Oct 29 15:23 0200.log
> -rw-r--r-- 1 yye eng 0 Oct 29 15:23 0300.index
> -rw-r--r-- 1 yye eng 10274 Oct 29 15:23 0300.log
> -rw-r--r-- 1 yye eng 0 Oct 29 15:23 0399.index
> -rw-r--r-- 1 yye eng 10276 Oct 29 15:23 0399.log
> -rw-r--r-- 1 yye eng 0 Oct 29 15:23 0498.index
> -rw-r--r-- 1 yye eng 10256 Oct 29 15:23 0498.log
> -rw-r--r-- 1 yye eng 10485760 Oct 29 15:23 0596.index
> -rw-r--r-- 1 yye eng 3564 Oct 29 15:23 0596.log

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-188) Support multiple data directories

2012-10-30 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13487307#comment-13487307
 ] 

Jun Rao commented on KAFKA-188:
---

Now, some unit tests fail with exceptions like the following. Most of them seem 
to be transient, but they show up more frequently now.

[error] Test Failed: 
testCleanShutdown(kafka.server.ServerShutdownTest)
kafka.common.KafkaException: Failed to acquire lock on file .lock in 
/tmp/kafka-246675. A Kafka instance in another process or thread is using this 
directory.


> Support multiple data directories
> -
>
> Key: KAFKA-188
> URL: https://issues.apache.org/jira/browse/KAFKA-188
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Jay Kreps
> Attachments: KAFKA-188.patch, KAFKA-188-v2.patch, KAFKA-188-v3.patch, 
> KAFKA-188-v4.patch, KAFKA-188-v5.patch, KAFKA-188-v6.patch, KAFKA-188-v7.patch
>
>
> Currently we allow only a single data directory. This means that a multi-disk 
> configuration needs to be a RAID array or LVM volume or something like that 
> to be mounted as a single directory.
> For a high-throughput low-reliability configuration this would mean RAID0 
> striping. Common wisdom in Hadoop land has it that a JBOD setup that just 
> mounts each disk as a separate directory and does application-level balancing 
> over these results in about 30% write-improvement. For example see this claim 
> here:
>   http://old.nabble.com/Re%3A-RAID-vs.-JBOD-p21466110.html
> It is not clear to me why this would be the case--it seems the RAID 
> controller should be able to balance writes as well as the application so it 
> may depend on the details of the setup.
> Nonetheless this would be really easy to implement, all you need to do is add 
> multiple data directories and balance partition creation over these disks.
> One problem this might cause is if a particular topic is much larger than the 
> others it might unbalance the load across the disks. The partition->disk 
> assignment policy should probably attempt to evenly spread each topic to 
> avoid this, rather than just trying to keep the number of partitions balanced 
> between disks.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-593) Empty log index file created when it shouldn't be empty

2012-10-30 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13486990#comment-13486990
 ] 

Jun Rao commented on KAFKA-593:
---

Producer used sync mode. So, there is 1 message per batch and each segment has 
about 100 messages. I expect at least 2 index entries to be added per segment.

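As a back-of-the-envelope check of that expectation (illustrative arithmetic, not
project code):

    // ~100 messages of ~100 bytes per segment, one index entry every 4096 bytes.
    val messagesPerSegment = 100
    val bytesPerMessage    = 100                                    // approximate
    val indexIntervalBytes = 4096                                   // logIndexIntervalBytes
    val segmentBytes       = messagesPerSegment * bytesPerMessage   // ~10000 bytes
    val expectedEntries    = segmentBytes / indexIntervalBytes      // = 2
    println("expect at least " + expectedEntries + " index entries per segment")
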
> Empty log index file created when it shouldn't be empty
> ---
>
> Key: KAFKA-593
> URL: https://issues.apache.org/jira/browse/KAFKA-593
> Project: Kafka
>  Issue Type: Bug
>Reporter: Yang Ye
> Attachments: kafka_583_zk_kafka_data.tar.gz
>
>
> We have seen an empty index file during system test when it shouldn't be empty. 
> In this case, there're around 100 messages in each segment, each of size 
> around 100 bytes, given the "logIndexIntervalBytes" 4096, there should be at 
> least 2 log index entries, but we see empty index file. The kafka and 
> zookeeper logs are attached
> [yye@yye-ld kafka_server_3_logs]$ cd test_1-2/
> [yye@yye-ld test_1-2]$ ls -l
> total 84
> -rw-r--r-- 1 yye eng        8 Oct 29 15:22 .index
> -rw-r--r-- 1 yye eng    10248 Oct 29 15:22 .log
> -rw-r--r-- 1 yye eng        8 Oct 29 15:22 0100.index
> -rw-r--r-- 1 yye eng    10296 Oct 29 15:22 0100.log
> -rw-r--r-- 1 yye eng        0 Oct 29 15:23 0200.index
> -rw-r--r-- 1 yye eng    10293 Oct 29 15:23 0200.log
> -rw-r--r-- 1 yye eng        0 Oct 29 15:23 0300.index
> -rw-r--r-- 1 yye eng    10274 Oct 29 15:23 0300.log
> -rw-r--r-- 1 yye eng        0 Oct 29 15:23 0399.index
> -rw-r--r-- 1 yye eng    10276 Oct 29 15:23 0399.log
> -rw-r--r-- 1 yye eng        0 Oct 29 15:23 0498.index
> -rw-r--r-- 1 yye eng    10256 Oct 29 15:23 0498.log
> -rw-r--r-- 1 yye eng 10485760 Oct 29 15:23 0596.index
> -rw-r--r-- 1 yye eng     3564 Oct 29 15:23 0596.log

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-188) Support multiple data directories

2012-10-30 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13486921#comment-13486921
 ] 

Jun Rao commented on KAFKA-188:
---

Our system tests fail with the latest patch. 

python -B system_test_runner.py 2>&1 | tee test.out

Saw the following in broker log.

[2012-10-30 07:40:19,682] FATAL Fatal error during KafkaServerStable startup. 
Prepare to shutdown (kafka.server.KafkaServerStartable)
java.io.IOException: No such file or directory
at java.io.UnixFileSystem.createFileExclusively(Native Method)
at java.io.File.createNewFile(File.java:883)
at kafka.utils.FileLock.<init>(FileLock.scala:12)
at kafka.log.LogManager$$anonfun$10.apply(LogManager.scala:64)
at kafka.log.LogManager$$anonfun$10.apply(LogManager.scala:64)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
at kafka.log.LogManager.<init>(LogManager.scala:64)
at kafka.server.KafkaServer.startup(KafkaServer.scala:60)
at 
kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
at kafka.Kafka$.main(Kafka.scala:46)
at kafka.Kafka.main(Kafka.scala)
[2012-10-30 07:40:19,683] INFO [Kafka Server 1], shutting down 
(kafka.server.KafkaServer)
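
The trace suggests the .lock file is being created inside a data directory that does
not exist yet. A hedged sketch of the guard one would expect before taking the lock
(illustrative only, not the committed fix):

    import java.io.{File, IOException}

    // Sketch: make sure each configured data dir exists before creating .lock in it.
    def ensureDirAndLockFile(dirPath: String): File = {
      val dir = new File(dirPath)
      if (!dir.exists() && !dir.mkdirs())
        throw new IOException("Could not create data directory " + dir.getAbsolutePath)
      val lockFile = new File(dir, ".lock")
      lockFile.createNewFile()   // the call that otherwise fails with "No such file or directory"
      lockFile
    }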


> Support multiple data directories
> -
>
> Key: KAFKA-188
> URL: https://issues.apache.org/jira/browse/KAFKA-188
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Jay Kreps
> Attachments: KAFKA-188.patch, KAFKA-188-v2.patch, KAFKA-188-v3.patch, 
> KAFKA-188-v4.patch, KAFKA-188-v5.patch, KAFKA-188-v6.patch
>
>
> Currently we allow only a single data directory. This means that a multi-disk 
> configuration needs to be a RAID array or LVM volume or something like that 
> to be mounted as a single directory.
> For a high-throughput low-reliability configuration this would mean RAID0 
> striping. Common wisdom in Hadoop land has it that a JBOD setup that just 
> mounts each disk as a separate directory and does application-level balancing 
> over these results in about 30% write-improvement. For example see this claim 
> here:
>   http://old.nabble.com/Re%3A-RAID-vs.-JBOD-p21466110.html
> It is not clear to me why this would be the case--it seems the RAID 
> controller should be able to balance writes as well as the application so it 
> may depend on the details of the setup.
> Nonetheless this would be really easy to implement, all you need to do is add 
> multiple data directories and balance partition creation over these disks.
> One problem this might cause is if a particular topic is much larger than the 
> others it might unbalance the load across the disks. The partition->disk 
> assignment policy should probably attempt to evenly spread each topic to 
> avoid this, rather than just trying to keep the number of partitions balanced 
> between disks.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Closed] (KAFKA-592) Register metrics beans at kafka server startup

2012-10-29 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao closed KAFKA-592.
-


> Register metrics beans at kafka server startup 
> ---
>
> Key: KAFKA-592
> URL: https://issues.apache.org/jira/browse/KAFKA-592
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Swapnil Ghike
>Assignee: Swapnil Ghike
>Priority: Blocker
>  Labels: bugs
> Fix For: 0.8
>
> Attachments: kafka-592-v1.patch
>
>
> jmx beans are not registered until the corresponding part of the code 
> executes. To set alerts on some of the server side beans, they need to be 
> registered at server startup.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Resolved] (KAFKA-592) Register metrics beans at kafka server startup

2012-10-29 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-592.
---

Resolution: Fixed

Thanks for the patch. +1. Committed to 0.8 by making registerStats private and 
adding a comment.
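
The general idea (a sketch with assumed names and a placeholder gauge, not the
committed patch): have server startup call into the stats objects once so their
JMX beans exist before the code paths that normally create them run.

    import com.yammer.metrics.Metrics
    import com.yammer.metrics.core.Gauge

    object StartupStatsSketch {
      // Calling this from server startup forces the bean to be registered eagerly.
      def registerStats() {
        Metrics.newGauge(this.getClass, "ExampleServerGauge",
          new Gauge[Int] { def value = 0 })   // placeholder value, sketch only
      }
    }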

> Register metrics beans at kafka server startup 
> ---
>
> Key: KAFKA-592
> URL: https://issues.apache.org/jira/browse/KAFKA-592
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Swapnil Ghike
>Assignee: Swapnil Ghike
>Priority: Blocker
>  Labels: bugs
> Fix For: 0.8
>
> Attachments: kafka-592-v1.patch
>
>
> jmx beans are not registered until the corresponding part of the code 
> executes. To set alerts on some of the server side beans, they need to be 
> registered at server startup.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Reopened] (KAFKA-577) extend DumpLogSegments to verify consistency btw data and index

2012-10-29 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao reopened KAFKA-577:
---


> extend DumpLogSegments to verify consistency btw data and index
> ---
>
> Key: KAFKA-577
> URL: https://issues.apache.org/jira/browse/KAFKA-577
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>  Labels: newbie, tools
> Fix For: 0.8
>
> Attachments: kafka_577_v1.diff, kafka_577_v2.diff
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> It would be good to extend DumpLogSegments to do the following verification:
> 1. The offsets stored in the index match those in the log data.
> 2. The offsets in the data log are consecutive.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Resolved] (KAFKA-575) Partition.makeFollower() reads broker info from ZK

2012-10-29 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-575.
---

   Resolution: Fixed
Fix Version/s: 0.8

Thanks for patch v3. +1. Committed to 0.8.

> Partition.makeFollower() reads broker info from ZK
> --
>
> Key: KAFKA-575
> URL: https://issues.apache.org/jira/browse/KAFKA-575
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>Assignee: Swapnil Ghike
>Priority: Blocker
>  Labels: bugs
> Fix For: 0.8
>
> Attachments: kafka-575-v1.patch, kafka-575-v2.patch, 
> kafka-575-v3-correct.patch, kafka-575-v3.patch
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> To follow a new leader, Partition.makeFollower() has to obtain the broker 
> info of the new leader. Currently, it reads that info from ZK for every 
> affected partition. This increases the time for a leader to be truly available. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Closed] (KAFKA-575) Partition.makeFollower() reads broker info from ZK

2012-10-29 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao closed KAFKA-575.
-


> Partition.makeFollower() reads broker info from ZK
> --
>
> Key: KAFKA-575
> URL: https://issues.apache.org/jira/browse/KAFKA-575
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>Assignee: Swapnil Ghike
>Priority: Blocker
>  Labels: bugs
> Fix For: 0.8
>
> Attachments: kafka-575-v1.patch, kafka-575-v2.patch, 
> kafka-575-v3-correct.patch, kafka-575-v3.patch
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> To follow a new leader, Partition.makeFollower() has to obtain the broker 
> info of the new leader. Currently, it reads that info from ZK for every 
> affected partition. This increases the time for a leader to be truly available. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-588) Index truncation doesn't seem to remove the last entry properly

2012-10-29 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13486567#comment-13486567
 ] 

Jun Rao commented on KAFKA-588:
---

Thanks for the patch. In LogSegmentTest, could we set indexIntervalSize such 
that we force an index entry to be created for every message?

> Index truncation doesn't seem to remove the last entry properly
> ---
>
> Key: KAFKA-588
> URL: https://issues.apache.org/jira/browse/KAFKA-588
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>Assignee: Jay Kreps
>Priority: Blocker
>  Labels: bugs
> Attachments: KAFKA-588.patch
>
>
> [2012-10-26 08:04:13,333] INFO [Kafka Log on Broker 3], Truncated log segment 
> /tmp/kafka_server_3_logs/test_1-0/00130500.log to target offset 
> 429050 (kafka.log.
> Log)
> [2012-10-26 08:04:13,333] INFO [ReplicaFetcherManager on broker 3] adding 
> fetcher on topic test_1, partion 0, initOffset 429050 to broker 2 with 
> fetcherId 0 (kafka.server.R
> eplicaFetcherManager)
> [2012-10-26 08:04:13,335] INFO Replica Manager on Broker 3: Handling leader 
> and isr request LeaderAndIsrRequest(1,,1000,Map((test_1,1) -> 
> PartitionStateInfo({ "ISR":"2,3","leader":"2","leaderEpoch":"2" },3), 
> (test_1,0) -> PartitionStateInfo({ "ISR":"2,3","leader":"2","leaderEpoch":"2" 
> },3))) (kafka.server.ReplicaManager)
> [2012-10-26 08:04:13,335] INFO Replica Manager on Broker 3: Starting the 
> follower state transition to follow leader 2 for topic test_1 partition 1 
> (kafka.server.ReplicaManager)
> [2012-10-26 08:04:13,335] INFO Partition [test_1, 1] on broker 3: Current 
> leader epoch [2] is larger or equal to the requested leader epoch [2], 
> discard the become follower request (kafka.cluster.Partition)
> [2012-10-26 08:04:13,336] INFO Replica Manager on Broker 3: Starting the 
> follower state transition to follow leader 2 for topic test_1 partition 0 
> (kafka.server.ReplicaManager)
> [2012-10-26 08:04:13,336] INFO Partition [test_1, 0] on broker 3: Current 
> leader epoch [2] is larger or equal to the requested leader epoch [2], 
> discard the become follower request (kafka.cluster.Partition)
> [2012-10-26 08:04:13,588] ERROR [ReplicaFetcherThread-2-0-on-broker-3], Error 
> due to  (kafka.server.ReplicaFetcherThread)
> java.lang.IllegalArgumentException: Attempt to append an offset (429050) no 
> larger than the last offset appended (429050).
> at kafka.log.OffsetIndex.append(OffsetIndex.scala:180)
> at kafka.log.LogSegment.append(LogSegment.scala:56)
> at kafka.log.Log.append(Log.scala:273)
> at 
> kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:51)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$doWork$5.apply(AbstractFetcherThread.scala:116)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$doWork$5.apply(AbstractFetcherThread.scala:99)
> at scala.collection.immutable.Map$Map2.foreach(Map.scala:127)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:99)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:50)

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-577) extend DumpLogSegments to verify consistency btw data and index

2012-10-29 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13486548#comment-13486548
 ] 

Jun Rao commented on KAFKA-577:
---

Thanks for patch v2. Instead of exiting on the first verification failure, it would 
be better if we dump the whole log and report all failed verifications at the 
very end.

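In other words (a rough sketch of the suggested control flow, with made-up names):
collect mismatches while scanning and print them once the whole dump is finished,
rather than exiting on the first one.

    import scala.collection.mutable.ListBuffer

    // Sketch only: accumulate verification failures and report them at the end.
    val verificationFailures = new ListBuffer[String]

    def recordMismatch(file: String, offset: Long) {
      verificationFailures += ("index/log mismatch at offset " + offset + " in " + file)
    }

    def reportAtEnd() {
      if (verificationFailures.nonEmpty) {
        System.err.println(verificationFailures.size + " verification failure(s):")
        verificationFailures.foreach(f => System.err.println(f))
      }
    }
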
> extend DumpLogSegments to verify consistency btw data and index
> ---
>
> Key: KAFKA-577
> URL: https://issues.apache.org/jira/browse/KAFKA-577
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>  Labels: newbie, tools
> Fix For: 0.8
>
> Attachments: kafka_577_v1.diff, kafka_577_v2.diff
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> It would be good to extend DumpLogSegments to do the following verification:
> 1. The offsets stored in the index match those in the log data.
> 2. The offsets in the data log are consecutive.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-267) Enhance ProducerPerformance to generate unique random Long value for payload

2012-10-29 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-267:
--

   Resolution: Fixed
Fix Version/s: 0.8
   Status: Resolved  (was: Patch Available)

+1 on patch v6. Committed to 0.8. 

> Enhance ProducerPerformance to generate unique random Long value for payload
> 
>
> Key: KAFKA-267
> URL: https://issues.apache.org/jira/browse/KAFKA-267
> Project: Kafka
>  Issue Type: Improvement
>Reporter: John Fung
>Assignee: Yang Ye
> Fix For: 0.8
>
> Attachments: kafka-267-v1.patch, kafka-267-v2.patch, 
> kafka-267-v3.patch, kafka-267-v4.patch, kafka-267-v5.patch, 
> kafka-267-v6.patch, kafka-267-v7.patch
>
>
> This is achieved by:
> 1. Adding a new class UniqueRandom to shuffle a range of numbers.
> 2. An optional new argument "start-index" is added to specify the starting 
> number of the range to be shuffled. If this argument is omitted, it is 
> defaulted to 1. So it is backward compatible with the argument options.
> 3. The ending number of the range is the starting number + number of messages 
> - 1.
> Other ProducerPerformance advancement: 
> 1. producing to multiple topics
> 2. supporting multiple instances of producer performance ( and distinguishes 
> them)
> 3. allowing waiting some time after sending a request

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-577) extend DumpLogSegments to verify consistency btw data and index

2012-10-29 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-577:
--

Comment: was deleted

(was: +1 on patch v6. Committed to 0.8.)

> extend DumpLogSegments to verify consistency btw data and index
> ---
>
> Key: KAFKA-577
> URL: https://issues.apache.org/jira/browse/KAFKA-577
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>  Labels: newbie, tools
> Fix For: 0.8
>
> Attachments: kafka_577_v1.diff, kafka_577_v2.diff
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> It would be good to extend DumpLogSegments to do the following verification:
> 1. The offsets stored in the index match those in the log data.
> 2. The offsets in the data log are consecutive.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Resolved] (KAFKA-577) extend DumpLogSegments to verify consistency btw data and index

2012-10-29 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-577.
---

   Resolution: Fixed
Fix Version/s: 0.8

+1 on patch v6. Committed to 0.8.

> extend DumpLogSegments to verify consistency btw data and index
> ---
>
> Key: KAFKA-577
> URL: https://issues.apache.org/jira/browse/KAFKA-577
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>  Labels: newbie, tools
> Fix For: 0.8
>
> Attachments: kafka_577_v1.diff, kafka_577_v2.diff
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> It would be good to extend DumpLogSegments to do the following verification:
> 1. The offsets stored in the index match those in the log data.
> 2. The offsets in the data log are consecutive.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-188) Support multiple data directories

2012-10-29 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13486145#comment-13486145
 ] 

Jun Rao commented on KAFKA-188:
---

Thanks for patch v5. Some more comments:

50. LogManager.nextLogDir(): zeros should only include dirs not already used, 
right? Currently, it seems to include all log dirs (see the sketch after this list).

51. ReplicaManager.checkpointHighWatermarks(): When handling a leaderAndIsr 
request, we first create a partition and then create a local replica (which 
creates the local log). So, there is a slight possibility that a partition in 
allPartitions may not have a local log. The simplest way is to ignore such 
partitions when checkpointing HW.

52. VerifiableProperties:  The following constructor doesn't seem to be used.
def this() = this(new Properties)
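
For item 50, a hedged sketch of the intended selection (illustrative, not the patch
itself): prefer a configured directory that holds no partitions yet, otherwise take
the directory with the fewest.

    // Sketch only: pick a data dir with zero logs if one exists, else the least loaded one.
    def nextLogDirSketch(logDirs: Seq[String], logCountByDir: Map[String, Int]): String = {
      val counts = logDirs.map(dir => (dir, logCountByDir.getOrElse(dir, 0)))
      val unused = counts.filter(_._2 == 0)
      val candidates = if (unused.nonEmpty) unused else counts
      candidates.minBy(_._2)._1
    }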


> Support multiple data directories
> -
>
> Key: KAFKA-188
> URL: https://issues.apache.org/jira/browse/KAFKA-188
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Jay Kreps
> Attachments: KAFKA-188.patch, KAFKA-188-v2.patch, KAFKA-188-v3.patch, 
> KAFKA-188-v4.patch, KAFKA-188-v5.patch
>
>
> Currently we allow only a single data directory. This means that a multi-disk 
> configuration needs to be a RAID array or LVM volume or something like that 
> to be mounted as a single directory.
> For a high-throughput low-reliability configuration this would mean RAID0 
> striping. Common wisdom in Hadoop land has it that a JBOD setup that just 
> mounts each disk as a separate directory and does application-level balancing 
> over these results in about 30% write-improvement. For example see this claim 
> here:
>   http://old.nabble.com/Re%3A-RAID-vs.-JBOD-p21466110.html
> It is not clear to me why this would be the case--it seems the RAID 
> controller should be able to balance writes as well as the application so it 
> may depend on the details of the setup.
> Nonetheless this would be really easy to implement, all you need to do is add 
> multiple data directories and balance partition creation over these disks.
> One problem this might cause is if a particular topic is much larger than the 
> others it might unbalance the load across the disks. The partition->disk 
> assignment policy should probably attempt to evenly spread each topic to 
> avoid this, rather than just trying to keep the number of partitions balanced 
> between disks.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-574) KafkaController unnecessarily reads leaderAndIsr info from ZK

2012-10-29 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13486091#comment-13486091
 ] 

Jun Rao commented on KAFKA-574:
---

Thanks for the patch. Some comments:

1. ReplicaStateMachine.handleStateChange():  There is one more optimization to 
consider. When handling the OnlineReplica case, we don't really need to read 
Isr from ZK and can read it from in-memory cache. To do that, we can extend 
ControllerContext.allLeaders to store LeaderAndIsr, instead of just the broker 
Id of the leader. This leaderAndIsr cache will be updated every time the 
controller makes a leader change.

2. 0.8 has moved since you uploaded the patch. Could you rebase?
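
A sketch of the cache shape that point 1 suggests (illustrative types, not the actual
ControllerContext):

    // Sketch only: cache the full leader-and-isr decision per partition instead of
    // just the leader's broker id, so OnlineReplica handling can avoid the ZK read.
    case class LeaderAndIsrSketch(leader: Int, leaderEpoch: Int, isr: List[Int], zkVersion: Int)

    class ControllerCacheSketch {
      // (topic, partition) -> last leader and isr chosen by this controller
      var allLeaders = Map.empty[(String, Int), LeaderAndIsrSketch]

      def onLeaderChange(topic: String, partition: Int, state: LeaderAndIsrSketch) {
        allLeaders += ((topic, partition) -> state)
      }
    }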



> KafkaController unnecessarily reads leaderAndIsr info from ZK
> -
>
> Key: KAFKA-574
> URL: https://issues.apache.org/jira/browse/KAFKA-574
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>Assignee: Prashanth Menon
>Priority: Blocker
>  Labels: bugs
> Attachments: KAFKA-574-v1.patch
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> KafkaController calls updateLeaderAndIsrCache() in onBrokerFailure(). This is 
> unnecessary since in onBrokerFailure(), we will make leader and isr change 
> anyway so there is no need to first read that information from ZK. Latency is 
> critical in onBrokerFailure() since it determines how quickly a leader can be 
> made online.
> Similarly, updateLeaderAndIsrCache() is called in onBrokerStartup() 
> unnecessarily. In this case, the controller does not change the leader or the 
> isr. It just needs to send the current leader and the isr info to the newly 
> started broker. We already cache leader in the controller. Isr in theory 
> could change any time by the leader. So, reading from ZK doesn't guarantee 
> that we can get the latest isr anyway. Instead, we just need to get the isr 
> last selected by the controller (which can be cached together with the leader 
> in the controller). If the leader epoch in a broker is at or larger than the 
> epoch in the leaderAndIsr request, the broker can just ignore it. Otherwise, 
> the leader and the isr selected by the controller should be used. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-575) Partition.makeFollower() reads broker info from ZK

2012-10-29 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13486079#comment-13486079
 ] 

Jun Rao commented on KAFKA-575:
---

Patch looks good. Just one comment:

1. ControllerBrokerRequestBatch.sendRequestsToBrokers(): Instead of including 
all live brokers in the leaderAndIsr request, we probably can just include 
brokers that are leaders in each LeaderAndIsr request.
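
A hedged sketch of that filtering (made-up shapes, not the actual
ControllerBrokerRequestBatch):

    // Sketch only: include just the brokers that lead some partition in the request,
    // rather than every live broker in the cluster.
    case class BrokerSketch(id: Int, host: String, port: Int)

    def leadersOnly(liveBrokers: Set[BrokerSketch],
                    leaderIdByPartition: Map[(String, Int), Int]): Set[BrokerSketch] = {
      val leaderIds = leaderIdByPartition.values.toSet
      liveBrokers.filter(b => leaderIds.contains(b.id))
    }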

> Partition.makeFollower() reads broker info from ZK
> --
>
> Key: KAFKA-575
> URL: https://issues.apache.org/jira/browse/KAFKA-575
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>Assignee: Swapnil Ghike
>Priority: Blocker
>  Labels: bugs
> Attachments: kafka-575-v1.patch
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> To follow a new leader, Partition.makeFollower() has to obtain the broker 
> info of the new leader. Currently, it reads that info from ZK for every 
> affected partition. This increases the time for a leader to be truly available. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-589) Clean shutdown after startup connection failure

2012-10-28 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-589:
--

Affects Version/s: 0.8
   Labels: bugs newbie  (was: )

> Clean shutdown after startup connection failure
> ---
>
> Key: KAFKA-589
> URL: https://issues.apache.org/jira/browse/KAFKA-589
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8, 0.7.2
>Reporter: Jason Rosenberg
>Priority: Minor
>  Labels: bugs, newbie
>
> Hi,
> I'm embedding the kafka server (0.7.2) in an application container.   I've 
> noticed that if I try to start the server without zookeeper being available, 
> by default it gets a zk connection timeout after 6 seconds, and then throws 
> an Exception out of KafkaServer.startup(). E.g., I see this stack trace:
> Exception in thread "main" org.I0Itec.zkclient.exception.ZkTimeoutException: 
> Unable to connect to zookeeper server within timeout: 6000
>   at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:876)
>   at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:98)
>   at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:84)
>   at kafka.server.KafkaZooKeeper.startup(KafkaZooKeeper.scala:44)
>   at kafka.log.LogManager.<init>(LogManager.scala:93)
>   at kafka.server.KafkaServer.startup(KafkaServer.scala:58)
> 
> 
> So that's ok, I can catch the exception, and then shut everything down 
> gracefully, in this case.  However, when I do this, it seems there is a 
> daemon thread still around, which doesn't quit, and so the server never 
> actually exits the jvm.  Specifically, this thread seems to hang around:
> "kafka-logcleaner-0" prio=5 tid=7fd9b48b1000 nid=0x112c08000 waiting on 
> condition [112c07000]
>java.lang.Thread.State: TIMED_WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <7f40d4be8> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>   at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:196)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
>   at java.util.concurrent.DelayQueue.take(DelayQueue.java:164)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:609)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:602)
>   at 
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>   at java.lang.Thread.run(Thread.java:680)
> Looking at the code in kafka.log.LogManager(), it does seem like it starts up 
> the scheduler to clean logs, before then trying to connect to zk (and in this 
> case fail):
>   /* Schedule the cleanup task to delete old logs */
>   if(scheduler != null) {
> info("starting log cleaner every " + logCleanupIntervalMs + " ms")
> scheduler.scheduleWithRate(cleanupLogs, 60 * 1000, logCleanupIntervalMs)
>   }
> So this scheduler does not appear to be stopped if startup fails.  However, 
> if I catch the above RuntimeException, and then call KafkaServer.shutdown(), 
> then it will stop the scheduler, and all is good.
> However, it seems odd that if I get an exception when calling 
> KafkaServer.startup(), that I should still have to do a 
> KafkaServer.shutdown().  Rather, wouldn't it be better to have it internally 
> cleanup after itself if startup() gets an exception?  I'm not sure I can 
> reliably call shutdown() after a failed startup()

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-589) Clean shutdown after startup connection failure

2012-10-28 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13485798#comment-13485798
 ] 

Jun Rao commented on KAFKA-589:
---

This problem exists in 0.8 too. What we need to do is to add a try/catch in 
KafkaServer.startup() and call shutdown if we hit any exceptions.

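A minimal sketch of that shape (not the actual KafkaServer code): wrap the body of
startup and shut down whatever was already started before rethrowing, so threads
such as the log-cleaner scheduler do not outlive a failed startup.

    // Sketch only, assumed structure.
    class ServerSketch {
      def startup() {
        try {
          // ... start scheduler, log manager, zookeeper client, socket server ...
        } catch {
          case e: Exception =>
            shutdown()   // release anything already started (e.g. scheduler threads)
            throw e
        }
      }

      def shutdown() {
        // ... stop the scheduler and any other components that were started ...
      }
    }
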
> Clean shutdown after startup connection failure
> ---
>
> Key: KAFKA-589
> URL: https://issues.apache.org/jira/browse/KAFKA-589
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8, 0.7.2
>Reporter: Jason Rosenberg
>Priority: Minor
>  Labels: bugs, newbie
>
> Hi,
> I'm embedding the kafka server (0.7.2) in an application container.   I've 
> noticed that if I try to start the server without zookeeper being available, 
> by default it gets a zk connection timeout after 6 seconds, and then throws 
> an Exception out of KafkaServer.startup(). E.g., I see this stack trace:
> Exception in thread "main" org.I0Itec.zkclient.exception.ZkTimeoutException: 
> Unable to connect to zookeeper server within timeout: 6000
>   at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:876)
>   at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:98)
>   at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:84)
>   at kafka.server.KafkaZooKeeper.startup(KafkaZooKeeper.scala:44)
>   at kafka.log.LogManager.<init>(LogManager.scala:93)
>   at kafka.server.KafkaServer.startup(KafkaServer.scala:58)
> 
> 
> So that's ok, I can catch the exception, and then shut everything down 
> gracefully, in this case.  However, when I do this, it seems there is a 
> daemon thread still around, which doesn't quit, and so the server never 
> actually exits the jvm.  Specifically, this thread seems to hang around:
> "kafka-logcleaner-0" prio=5 tid=7fd9b48b1000 nid=0x112c08000 waiting on 
> condition [112c07000]
>java.lang.Thread.State: TIMED_WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <7f40d4be8> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>   at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:196)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
>   at java.util.concurrent.DelayQueue.take(DelayQueue.java:164)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:609)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:602)
>   at 
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>   at java.lang.Thread.run(Thread.java:680)
> Looking at the code in kafka.log.LogManager(), it does seem like it starts up 
> the scheduler to clean logs, before then trying to connect to zk (and in this 
> case fail):
>   /* Schedule the cleanup task to delete old logs */
>   if(scheduler != null) {
> info("starting log cleaner every " + logCleanupIntervalMs + " ms")
> scheduler.scheduleWithRate(cleanupLogs, 60 * 1000, logCleanupIntervalMs)
>   }
> So this scheduler does not appear to be stopped if startup fails.  However, 
> if I catch the above RuntimeException, and then call KafkaServer.shutdown(), 
> then it will stop the scheduler, and all is good.
> However, it seems odd that if I get an exception when calling 
> KafkaServer.startup(), that I should still have to do a 
> KafkaServer.shutdown().  Rather, wouldn't it be better to have it internally 
> cleanup after itself if startup() gets an exception?  I'm not sure I can 
> reliably call shutdown() after a failed startup()

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


Re: move 0.8 back to trunk

2012-10-28 Thread Jun Rao
I think it's better if we do this before moving to git. I am thinking of
making the change next week. We likely need a new repository when we
graduate. We can probably move to git then.

Thanks,

Jun

On Sat, Oct 27, 2012 at 9:58 PM, Joe Stein  wrote:

> should we do this before or after we move to git ?
>
> On Fri, Oct 26, 2012 at 7:41 PM, Jun Rao  wrote:
>
> > The 0.8 branch has stabilized quite a bit now. We still have to look into
> > some corner cases and performance, but most system tests are now
> passing. I
> > think it's time to bring 0.8 back to trunk and let people start trying
> it.
> > I think we should just move trunk to an 0.7 branch and then move 0.8 to
> > trunk. Any objections?
> >
> > Thanks,
> >
> > Jun
> >
>
>
>
> --
>
> /*
> Joe Stein
> http://www.linkedin.com/in/charmalloc
> Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> */
>


[jira] [Commented] (KAFKA-577) extend DumpLogSegments to verify consistency btw data and index

2012-10-26 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13485306#comment-13485306
 ] 

Jun Rao commented on KAFKA-577:
---

Thanks for the patch. Some comments:

1. Could you add a --verifyOnly option so that we only do the verification but 
not print out the content?
2. The text in the following statement is too verbose. It doesn't need to print 
the index file since it's provided in the command line. It can just say "Index 
position %d doesn't match log position at offset %d".
System.err.println(("The offset in index file [%s] does not match the offset stored in " +
  "log file [%s], they're %d and %d separately").format(entry.offset + index.baseOffset, messageAndOffset.offset))
3. Shouldn't we use %d instead of %l in the following line?
System.err.println("The offset in the data log file [%s] is not consecutive, [%l] follows [%l]"
  .format(file.getName, messageAndOffset.offset, lastOffset))
4. Could you add a shell script for DumpLogSegments in bin/ ?
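
Taking points 2 and 3 together, the messages might end up looking roughly like this
(a sketch of the suggested wording, not the committed patch):

    // Sketch only: shorter messages, with %d for the Long offsets.
    def reportIndexMismatch(indexOffset: Long, logOffset: Long) {
      System.err.println("Index position %d doesn't match log position at offset %d"
        .format(indexOffset, logOffset))
    }

    def reportNonConsecutive(offset: Long, lastOffset: Long) {
      System.err.println("Non-consecutive offsets in the data log: %d follows %d"
        .format(offset, lastOffset))
    }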


> extend DumpLogSegments to verify consistency btw data and index
> ---
>
> Key: KAFKA-577
> URL: https://issues.apache.org/jira/browse/KAFKA-577
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>  Labels: newbie, tools
> Attachments: kafka_577_v1.diff
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> It would be good to extend DumpLogSegments to do the following verification:
> 1. The offsets stored in the index match those in the log data.
> 2. The offsets in the data log are consecutive.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-267) Enhance ProducerPerformance to generate unique random Long value for payload

2012-10-26 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13485042#comment-13485042
 ] 

Jun Rao commented on KAFKA-267:
---

What we can do is, in system tests, tune #retries and backoff time in 
ProducerPerformance according to the failure scenario so that we expect no data 
loss on the producer side. Both knobs are exposed in ProducerPerformance now.
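
As a hedged example of the two knobs (the property names here are assumptions about
the 0.8 producer config of that time, not verified against this exact revision):

    import java.util.Properties

    val props = new Properties()
    props.put("message.send.max.retries", "10")   // assumed name: producer retry count
    props.put("retry.backoff.ms", "500")          // assumed name: pause between retries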

> Enhance ProducerPerformance to generate unique random Long value for payload
> 
>
> Key: KAFKA-267
> URL: https://issues.apache.org/jira/browse/KAFKA-267
> Project: Kafka
>  Issue Type: Improvement
>Reporter: John Fung
>Assignee: Yang Ye
> Attachments: kafka-267-v1.patch, kafka-267-v2.patch, 
> kafka-267-v3.patch, kafka-267-v4.patch, kafka-267-v5.patch, 
> kafka-267-v6.patch, kafka-267-v7.patch
>
>
> This is achieved by:
> 1. Adding a new class UniqueRandom to shuffle a range of numbers.
> 2. An optional new argument "start-index" is added to specify the starting 
> number of the range to be shuffled. If this argument is omitted, it is 
> defaulted to 1. So it is backward compatible with the argument options.
> 3. The ending number of the range is the starting number + number of messages 
> - 1.
> Other ProducerPerformance advancement: 
> 1. producing to multiple topics
> 2. supporting multiple instances of producer performance ( and distinguishes 
> them)
> 3. allowing waiting some time after sending a request

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-532) Multiple controllers can co-exist during soft failures

2012-10-26 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13485030#comment-13485030
 ] 

Jun Rao commented on KAFKA-532:
---

The purpose of the controller epoch is to prevent an older controller from 
overriding the data already updated by a newer controller. What we can do is 
that when a controller wants to update leaderAndIsr, it first checks and makes 
sure that the controller epoch stored in the path is less than or equal to the 
current controller epoch. Otherwise, the controller won't update the path. This 
way, the controller epoch associated with leaderAndIsr is only updated when it's 
truly needed, i.e., when the controller wants to update the leader or the isr. 
So we don't need to rewrite leaderAndIsr during controller failover. When 
sending leaderAndIsr requests, we just need to send the controller epoch stored 
in the leaderAndIsr path.

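A sketch of that check (illustrative shapes, not the actual controller code):

    // Sketch only: overwrite the leaderAndIsr path only if the epoch recorded there
    // was written by this controller's epoch or an older one.
    case class LeaderIsrSketch(leader: Int, isr: List[Int], controllerEpoch: Int)

    def maybeUpdate(current: LeaderIsrSketch,
                    proposed: LeaderIsrSketch,
                    myControllerEpoch: Int): Option[LeaderIsrSketch] = {
      if (current.controllerEpoch <= myControllerEpoch)
        Some(proposed.copy(controllerEpoch = myControllerEpoch))   // safe to overwrite
      else
        None   // a newer controller has written this path; do not touch it
    }
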
> Multiple controllers can co-exist during soft failures
> --
>
> Key: KAFKA-532
> URL: https://issues.apache.org/jira/browse/KAFKA-532
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Neha Narkhede
>Assignee: Neha Narkhede
>Priority: Blocker
>  Labels: bugs
> Attachments: kafka-532-v1.patch, kafka-532-v2.patch
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> If the current controller experiences an intermittent soft failure (GC pause) 
> in the middle of leader election or partition reassignment, a new controller 
> might get elected and start communicating new state change decisions to the 
> brokers. After recovering from the soft failure, the old controller might 
> continue sending some stale state change decisions to the brokers, resulting 
> in unexpected failures. We need to introduce a controller generation id that 
> increments with controller election. The brokers should reject any state 
> change requests by a controller with an older generation id.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-267) Enhance ProducerPerformance to generate unique random Long value for payload

2012-10-26 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13485024#comment-13485024
 ] 

Jun Rao commented on KAFKA-267:
---

Thanks for patch v7. Not sure if printing messages in syncProducer is a good 
idea. In general, messages sent in syncProducer are not necessarily strings and 
may not be printable. ProducerPerformance, on the other hand, always sends 
string messages, which are printable. So, I suggest that we keep the message 
printing in ProducerPerformance. 

> Enhance ProducerPerformance to generate unique random Long value for payload
> 
>
> Key: KAFKA-267
> URL: https://issues.apache.org/jira/browse/KAFKA-267
> Project: Kafka
>  Issue Type: Improvement
>Reporter: John Fung
>Assignee: Yang Ye
> Attachments: kafka-267-v1.patch, kafka-267-v2.patch, 
> kafka-267-v3.patch, kafka-267-v4.patch, kafka-267-v5.patch, 
> kafka-267-v6.patch, kafka-267-v7.patch
>
>
> This is achieved by:
> 1. Adding a new class UniqueRandom to shuffle a range of numbers.
> 2. An optional new argument "start-index" is added to specify the starting 
> number of the range to be shuffled. If this argument is omitted, it is 
> defaulted to 1. So it is backward compatible with the argument options.
> 3. The ending number of the range is the starting number + number of messages 
> - 1.
> Other ProducerPerformance advancement: 
> 1. producing to multiple topics
> 2. supporting multiple instances of producer performance ( and distinguishes 
> them)
> 3. allowing waiting some time after sending a request

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-588) Index truncation doesn't seem to remove the last entry properly

2012-10-26 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13485020#comment-13485020
 ] 

Jun Rao commented on KAFKA-588:
---

From the log, the broker wants to truncate log to offset 429050. The last 
entry in the log is the following and looks correct.
offset: 429049 position: 155842578 isvalid: true payloadsize: 500 magic: 2 
compresscodec: NoCompressionCodec crc: 3220325748

However, the last index entry is the following, which is off by 1.
offset: 429050 position: 155843100
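
The invariant being described, as a sketch (illustrative, not the OffsetIndex code):
truncating to target offset 429050 should drop every index entry at or above 429050,
so the surviving last entry must be strictly below the truncation point.

    // Sketch only: after truncateTo(entries, 429050) the last entry may be 429049,
    // but never 429050.
    case class IndexEntrySketch(offset: Long, position: Int)

    def truncateTo(entries: Vector[IndexEntrySketch], targetOffset: Long): Vector[IndexEntrySketch] =
      entries.takeWhile(_.offset < targetOffset)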


> Index truncation doesn't seem to remove the last entry properly
> ---
>
> Key: KAFKA-588
> URL: https://issues.apache.org/jira/browse/KAFKA-588
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>Priority: Blocker
>  Labels: bugs
>
> [2012-10-26 08:04:13,333] INFO [Kafka Log on Broker 3], Truncated log segment 
> /tmp/kafka_server_3_logs/test_1-0/00130500.log to target offset 
> 429050 (kafka.log.
> Log)
> [2012-10-26 08:04:13,333] INFO [ReplicaFetcherManager on broker 3] adding 
> fetcher on topic test_1, partion 0, initOffset 429050 to broker 2 with 
> fetcherId 0 (kafka.server.R
> eplicaFetcherManager)
> [2012-10-26 08:04:13,335] INFO Replica Manager on Broker 3: Handling leader 
> and isr request LeaderAndIsrRequest(1,,1000,Map((test_1,1) -> 
> PartitionStateInfo({ "ISR":"2,3","leader":"2","leaderEpoch":"2" },3), 
> (test_1,0) -> PartitionStateInfo({ "ISR":"2,3","leader":"2","leaderEpoch":"2" 
> },3))) (kafka.server.ReplicaManager)
> [2012-10-26 08:04:13,335] INFO Replica Manager on Broker 3: Starting the 
> follower state transition to follow leader 2 for topic test_1 partition 1 
> (kafka.server.ReplicaManager)
> [2012-10-26 08:04:13,335] INFO Partition [test_1, 1] on broker 3: Current 
> leader epoch [2] is larger or equal to the requested leader epoch [2], 
> discard the become follower request (kafka.cluster.Partition)
> [2012-10-26 08:04:13,336] INFO Replica Manager on Broker 3: Starting the 
> follower state transition to follow leader 2 for topic test_1 partition 0 
> (kafka.server.ReplicaManager)
> [2012-10-26 08:04:13,336] INFO Partition [test_1, 0] on broker 3: Current 
> leader epoch [2] is larger or equal to the requested leader epoch [2], 
> discard the become follower request (kafka.cluster.Partition)
> [2012-10-26 08:04:13,588] ERROR [ReplicaFetcherThread-2-0-on-broker-3], Error 
> due to  (kafka.server.ReplicaFetcherThread)
> java.lang.IllegalArgumentException: Attempt to append an offset (429050) no 
> larger than the last offset appended (429050).
> at kafka.log.OffsetIndex.append(OffsetIndex.scala:180)
> at kafka.log.LogSegment.append(LogSegment.scala:56)
> at kafka.log.Log.append(Log.scala:273)
> at 
> kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:51)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$doWork$5.apply(AbstractFetcherThread.scala:116)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$doWork$5.apply(AbstractFetcherThread.scala:99)
> at scala.collection.immutable.Map$Map2.foreach(Map.scala:127)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:99)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:50)

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-588) Index truncation doesn't seem to remove the last entry properly

2012-10-26 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-588:
--

Labels: bugs  (was: )

> Index truncation doesn't seem to remove the last entry properly
> ---
>
> Key: KAFKA-588
> URL: https://issues.apache.org/jira/browse/KAFKA-588
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>Priority: Blocker
>  Labels: bugs
>
> [2012-10-26 08:04:13,333] INFO [Kafka Log on Broker 3], Truncated log segment 
> /tmp/kafka_server_3_logs/test_1-0/00130500.log to target offset 
> 429050 (kafka.log.
> Log)
> [2012-10-26 08:04:13,333] INFO [ReplicaFetcherManager on broker 3] adding 
> fetcher on topic test_1, partion 0, initOffset 429050 to broker 2 with 
> fetcherId 0 (kafka.server.R
> eplicaFetcherManager)
> [2012-10-26 08:04:13,335] INFO Replica Manager on Broker 3: Handling leader 
> and isr request LeaderAndIsrRequest(1,,1000,Map((test_1,1) -> 
> PartitionStateInfo({ "ISR":"2,3","leader":"2","leaderEpoch":"2" },3), 
> (test_1,0) -> PartitionStateInfo({ "ISR":"2,3","leader":"2","leaderEpoch":"2" 
> },3))) (kafka.server.ReplicaManager)
> [2012-10-26 08:04:13,335] INFO Replica Manager on Broker 3: Starting the 
> follower state transition to follow leader 2 for topic test_1 partition 1 
> (kafka.server.ReplicaManager)
> [2012-10-26 08:04:13,335] INFO Partition [test_1, 1] on broker 3: Current 
> leader epoch [2] is larger or equal to the requested leader epoch [2], 
> discard the become follower request (kafka.cluster.Partition)
> [2012-10-26 08:04:13,336] INFO Replica Manager on Broker 3: Starting the 
> follower state transition to follow leader 2 for topic test_1 partition 0 
> (kafka.server.ReplicaManager)
> [2012-10-26 08:04:13,336] INFO Partition [test_1, 0] on broker 3: Current 
> leader epoch [2] is larger or equal to the requested leader epoch [2], 
> discard the become follower request (kafka.cluster.Partition)
> [2012-10-26 08:04:13,588] ERROR [ReplicaFetcherThread-2-0-on-broker-3], Error 
> due to  (kafka.server.ReplicaFetcherThread)
> java.lang.IllegalArgumentException: Attempt to append an offset (429050) no 
> larger than the last offset appended (429050).
> at kafka.log.OffsetIndex.append(OffsetIndex.scala:180)
> at kafka.log.LogSegment.append(LogSegment.scala:56)
> at kafka.log.Log.append(Log.scala:273)
> at 
> kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:51)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$doWork$5.apply(AbstractFetcherThread.scala:116)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$doWork$5.apply(AbstractFetcherThread.scala:99)
> at scala.collection.immutable.Map$Map2.foreach(Map.scala:127)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:99)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:50)

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Created] (KAFKA-588) Index truncation doesn't seem to remove the last entry properly

2012-10-26 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-588:
-

 Summary: Index truncation doesn't seem to remove the last entry 
properly
 Key: KAFKA-588
 URL: https://issues.apache.org/jira/browse/KAFKA-588
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
Reporter: Jun Rao
Priority: Blocker


[2012-10-26 08:04:13,333] INFO [Kafka Log on Broker 3], Truncated log segment 
/tmp/kafka_server_3_logs/test_1-0/00130500.log to target offset 
429050 (kafka.log.
Log)
[2012-10-26 08:04:13,333] INFO [ReplicaFetcherManager on broker 3] adding 
fetcher on topic test_1, partion 0, initOffset 429050 to broker 2 with 
fetcherId 0 (kafka.server.R
eplicaFetcherManager)
[2012-10-26 08:04:13,335] INFO Replica Manager on Broker 3: Handling leader and 
isr request LeaderAndIsrRequest(1,,1000,Map((test_1,1) -> PartitionStateInfo({ 
"ISR":"2,3","leader":"2","leaderEpoch":"2" },3), (test_1,0) -> 
PartitionStateInfo({ "ISR":"2,3","leader":"2","leaderEpoch":"2" },3))) 
(kafka.server.ReplicaManager)
[2012-10-26 08:04:13,335] INFO Replica Manager on Broker 3: Starting the 
follower state transition to follow leader 2 for topic test_1 partition 1 
(kafka.server.ReplicaManager)
[2012-10-26 08:04:13,335] INFO Partition [test_1, 1] on broker 3: Current 
leader epoch [2] is larger or equal to the requested leader epoch [2], discard 
the become follower request (kafka.cluster.Partition)
[2012-10-26 08:04:13,336] INFO Replica Manager on Broker 3: Starting the 
follower state transition to follow leader 2 for topic test_1 partition 0 
(kafka.server.ReplicaManager)
[2012-10-26 08:04:13,336] INFO Partition [test_1, 0] on broker 3: Current 
leader epoch [2] is larger or equal to the requested leader epoch [2], discard 
the become follower request (kafka.cluster.Partition)
[2012-10-26 08:04:13,588] ERROR [ReplicaFetcherThread-2-0-on-broker-3], Error 
due to  (kafka.server.ReplicaFetcherThread)
java.lang.IllegalArgumentException: Attempt to append an offset (429050) no 
larger than the last offset appended (429050).
at kafka.log.OffsetIndex.append(OffsetIndex.scala:180)
at kafka.log.LogSegment.append(LogSegment.scala:56)
at kafka.log.Log.append(Log.scala:273)
at 
kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:51)
at 
kafka.server.AbstractFetcherThread$$anonfun$doWork$5.apply(AbstractFetcherThread.scala:116)
at 
kafka.server.AbstractFetcherThread$$anonfun$doWork$5.apply(AbstractFetcherThread.scala:99)
at scala.collection.immutable.Map$Map2.foreach(Map.scala:127)
at 
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:99)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:50)


--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-584) produce/fetch remote time metric not set correctly when num.acks = 1

2012-10-25 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-584:
--

Resolution: Fixed
Status: Resolved  (was: Patch Available)

Thanks for the review. Changed the logging to trace level and committed to 0.8.

> produce/fetch remote time metric not set correctly when num.acks = 1
> 
>
> Key: KAFKA-584
> URL: https://issues.apache.org/jira/browse/KAFKA-584
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Neha Narkhede
>Assignee: Jun Rao
>  Labels: bugs
> Fix For: 0.8
>
> Attachments: kafka-584.patch
>
>
> When num.acks = 1, the produce/fetch remote time is set to a very high value 
> (several hours). This is due to a race condition on the apiLocalTime, which 
> is initialized to -1, that makes the (responseTime - apiLocalTime) a very 
> large value.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Closed] (KAFKA-584) produce/fetch remote time metric not set correctly when num.acks = 1

2012-10-25 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao closed KAFKA-584.
-


> produce/fetch remote time metric not set correctly when num.acks = 1
> 
>
> Key: KAFKA-584
> URL: https://issues.apache.org/jira/browse/KAFKA-584
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Neha Narkhede
>Assignee: Jun Rao
>  Labels: bugs
> Fix For: 0.8
>
> Attachments: kafka-584.patch
>
>
> When num.acks = 1, the produce/fetch remote time is set to a very high value 
> (several hours). This is due to a race condition on the apiLocalTime, which 
> is initialized to -1, that makes the (responseTime - apiLocalTime) a very 
> large value.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-584) produce/fetch remote time metric not set correctly when num.acks = 1

2012-10-25 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13484251#comment-13484251
 ] 

Jun Rao commented on KAFKA-584:
---

Also, volatile makes reads and writes of a long value atomic according to the 
following article, which is what we want.
http://gee.cs.oswego.edu/dl/cpj/jmm.html

> produce/fetch remote time metric not set correctly when num.acks = 1
> 
>
> Key: KAFKA-584
> URL: https://issues.apache.org/jira/browse/KAFKA-584
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Neha Narkhede
>  Labels: bugs
> Fix For: 0.8
>
> Attachments: kafka-584.patch
>
>
> When num.acks = 1, the produce/fetch remote time is set to a very high value 
> (several hours). This is due to a race condition on the apiLocalTime, which 
> is initialized to -1, that makes the (responseTime - apiLocalTime) a very 
> large value.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-584) produce/fetch remote time metric not set correctly when num.acks = 1

2012-10-25 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-584:
--

Fix Version/s: 0.8
 Assignee: Jun Rao
   Status: Patch Available  (was: Open)

> produce/fetch remote time metric not set correctly when num.acks = 1
> 
>
> Key: KAFKA-584
> URL: https://issues.apache.org/jira/browse/KAFKA-584
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Neha Narkhede
>Assignee: Jun Rao
>  Labels: bugs
> Fix For: 0.8
>
> Attachments: kafka-584.patch
>
>
> When num.acks = 1, the produce/fetch remote time is set to a very high value 
> (several hours). This is due to a race condition on apiLocalTime, which is 
> initialized to -1 and therefore makes (responseTime - apiLocalTime) a very 
> large value.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-584) produce/fetch remote time metric not set correctly when num.acks = 1

2012-10-25 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-584:
--

Attachment: kafka-584.patch

Attach a patch. The problem is that with ack = 1, the response could be sent before 
apiLocalTime is updated. Fixed it by setting apiLocalTime to responseTime if not 
already set. Also, since those times are updated in different threads, they are made 
volatile so that updates are visible to other threads. Now, remote times are 0 
with ack = 1.
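
(A minimal sketch of the idea, not the actual patch; the class and field names below 
are assumptions for illustration, the real change is in the request/response handling 
code.)

    // Illustrative sketch only -- names are assumptions, not the actual patch.
    class RequestTimingSketch(val startTimeMs: Long) {
      // Written by the API handler thread, read by the network thread that
      // sends the response, hence volatile for cross-thread visibility.
      @volatile var apiLocalCompleteTimeMs: Long = -1L
      @volatile var responseCompleteTimeMs: Long = -1L

      def updateMetrics(): (Long, Long) = {
        // With ack = 1 the response can go out before the handler records the
        // local completion time; fall back to the response time so the remote
        // time becomes 0 instead of (responseTime - (-1)).
        if (apiLocalCompleteTimeMs < 0)
          apiLocalCompleteTimeMs = responseCompleteTimeMs
        val localTimeMs  = apiLocalCompleteTimeMs - startTimeMs
        val remoteTimeMs = responseCompleteTimeMs - apiLocalCompleteTimeMs
        (localTimeMs, remoteTimeMs)
      }
    }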

> produce/fetch remote time metric not set correctly when num.acks = 1
> 
>
> Key: KAFKA-584
> URL: https://issues.apache.org/jira/browse/KAFKA-584
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Neha Narkhede
>  Labels: bugs
> Attachments: kafka-584.patch
>
>
> When num.acks = 1, the produce/fetch remote time is set to a very high value 
> (several hours). This is due to a race condition on apiLocalTime, which is 
> initialized to -1 and therefore makes (responseTime - apiLocalTime) a very 
> large value.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-581) provides windows batch script for starting Kafka/Zookeeper

2012-10-25 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13484233#comment-13484233
 ] 

Jun Rao commented on KAFKA-581:
---

Since we have quite a few scripts in bin/, I think it's actually better to have 
the Windows versions in a separate directory. Perhaps we can update our 
quickstart with the Windows script directory when 0.8 is released.

> provides windows batch script for starting Kafka/Zookeeper
> --
>
> Key: KAFKA-581
> URL: https://issues.apache.org/jira/browse/KAFKA-581
> Project: Kafka
>  Issue Type: Improvement
>  Components: config
>Affects Versions: 0.8
> Environment: Windows
>Reporter: antoine vianey
>Priority: Trivial
>  Labels: features, run, windows
> Fix For: 0.8
>
> Attachments: kafka-console-consumer.bat, kafka-console-producer.bat, 
> kafka-run-class.bat, kafka-server-start.bat, sbt.bat, 
> zookeeper-server-start.bat
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Provide a port for quickstarting Kafka dev on Windows:
> - kafka-run-class.bat
> - kafka-server-start.bat
> - zookeeper-server-start.bat
> This will help the Kafka community grow.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-340) Implement clean shutdown in 0.8

2012-10-24 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13483894#comment-13483894
 ] 

Jun Rao commented on KAFKA-340:
---

21.4 I looked again. Yes, your code is correct.

One more minor comment.
22. ControllerBrokerRequestBatch.sendRequestsToBrokers(): in m.foreach, instead 
of using r, could we use case (brokerId, partitionsToBeStopped)? Then we can 
refer to those names directly instead of r._1 and r._2 (a rough sketch follows below).

Other than that, the patch looks good.
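
A rough sketch of the suggestion in 22 (the map's exact key and value types in 
ControllerBrokerRequestBatch may differ; this is only to show the destructuring):

    // Illustrative only; the real value type in the batch is different.
    val m = Map(1 -> Set(("topic-a", 0)), 2 -> Set(("topic-b", 1)))

    // Current style: positional accessors.
    m.foreach { r => println("broker " + r._1 + " stops " + r._2) }

    // Suggested style: destructure the entry so the names read directly.
    m.foreach { case (brokerId, partitionsToBeStopped) =>
      println("broker " + brokerId + " stops " + partitionsToBeStopped)
    }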

> Implement clean shutdown in 0.8
> ---
>
> Key: KAFKA-340
> URL: https://issues.apache.org/jira/browse/KAFKA-340
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>Assignee: Joel Koshy
>Priority: Blocker
>  Labels: bugs
> Attachments: KAFKA-340-v1.patch, KAFKA-340-v2.patch, 
> KAFKA-340-v3.patch
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> If we are shutting down a broker when the ISR of a partition includes only 
> that broker, we could lose some messages that have been previously committed. 
> For clean shutdown, we need to guarantee that there is at least 1 other 
> broker in ISR after the broker is shut down.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-340) Implement clean shutdown in 0.8

2012-10-24 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13483657#comment-13483657
 ] 

Jun Rao commented on KAFKA-340:
---

21.2 My confusion is that I thought replicatedPartitionsBrokerLeads was a val 
rather than a method. Could you change replicatedPartitionsBrokerLeads.toSet to 
replicatedPartitionsBrokerLeads().toSet to indicate that there is a side effect? 
Also, could you add a comment to shutdownBroker() describing what it does and 
what the return value is? (A rough sketch of both is after 21.4.)

21.3 Looked again. Yes, you are right. We don't need to lock there.

21.4 The code still doesn't send stopReplicaRequests for partitions whose 
leader is on the broker to be shut down. This is not strictly needed since the 
leader won't issue any fetch requests. However, it would be better to stop those 
replicas too.
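
A rough sketch of what 21.2 asks for (signatures, types, and state are assumptions, 
not the actual patch):

    // Sketch only; the real names and types in the controller differ.
    class ControllerSketch {
      // Mutable controller state, updated by other threads/events (hypothetical).
      private var leadersOnBroker = Seq(("topic-a", 0), ("topic-b", 1))

      /** Declared with empty parens -- and called as replicatedPartitionsBrokerLeads() --
        * to signal that it reads mutable controller state rather than being a pure val. */
      def replicatedPartitionsBrokerLeads(): Seq[(String, Int)] = leadersOnBroker

      /** Initiates controlled shutdown of this broker: moves leadership away where possible.
        * @return the partitions this broker still leads after leader movement. */
      def shutdownBroker(): Set[(String, Int)] = {
        // Pretend leadership of all but the first partition could be moved.
        leadersOnBroker = leadersOnBroker.take(1)
        replicatedPartitionsBrokerLeads().toSet
      }
    }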

> Implement clean shutdown in 0.8
> ---
>
> Key: KAFKA-340
> URL: https://issues.apache.org/jira/browse/KAFKA-340
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>    Affects Versions: 0.8
>Reporter: Jun Rao
>Assignee: Joel Koshy
>Priority: Blocker
>  Labels: bugs
> Attachments: KAFKA-340-v1.patch, KAFKA-340-v2.patch, 
> KAFKA-340-v3.patch
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> If we are shutting down a broker when the ISR of a partition includes only 
> that broker, we could lose some messages that have been previously committed. 
> For clean shutdown, we need to guarantee that there is at least 1 other 
> broker in ISR after the broker is shut down.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Closed] (KAFKA-571) Add more test cases to System Test

2012-10-24 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao closed KAFKA-571.
-


> Add more test cases to System Test
> --
>
> Key: KAFKA-571
> URL: https://issues.apache.org/jira/browse/KAFKA-571
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Fung
>Assignee: John Fung
> Fix For: 0.8
>
> Attachments: kafka-0.7.0.jar, kafka-571-SimpleConsumerShell.patch, 
> kafka-571-v1.patch, kafka-571-v2.patch, kafka-571-v3.patch, 
> kafka-perf-0.7.0.jar, zkclient-0.1.jar
>
>


--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

