[jira] [Commented] (KAFKA-706) broker appears to be encoding ProduceResponse, but never sending it

2013-02-07 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13574275#comment-13574275
 ] 

Neha Narkhede commented on KAFKA-706:
-

Great catch, Sriram! I think the v2 patch on KAFKA-736 might solve this 
problem.

> broker appears to be encoding ProduceResponse, but never sending it
> ---
>
> Key: KAFKA-706
> URL: https://issues.apache.org/jira/browse/KAFKA-706
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
> Environment: reproduced on both Mac OS and RH linux, via private 
> node.js client
>Reporter: ben fleis
>Assignee: Sriram Subramanian
>
> By all appearances, I seem to be able to convince a broker to periodically 
> encode, but never transmit, a ProduceResponse.  Unfortunately my client is 
> proprietary, but I will share it with Neha via LI channels.  But I will 
> describe what's going on in the hopes that there's another trivial way to 
> reproduce it.  (I did search through JIRA, and haven't found anything that 
> looks like this.)
> I am running a single-instance ZooKeeper and a single broker.  I have a client 
> that generates configurable amounts of data, tracking what is produced (both 
> sent and ACK'd), and what is consumed.  I was noticing that when using high 
> transfer rates via high frequency single messages, my unack'd queue appeared 
> to be getting continuously larger.  So, I outfitted my client to log more 
> information about correlation IDs at various stages, and modified the Kafka 
> ProducerRequest/ProducerResponse to log (de)serialization of the same.  I 
> then used tcpdump to intercept all communications between my client and the 
> broker.  Finally, I configured my client to generate 1 message per ~10ms, 
> each payload being approximately 33 bytes; requestAckTimeout was set to 
> 2000ms, and requestAcksRequired was set to 1.  I used 10ms as I found that 
> 5ms or less caused my unacked queue to build up due to system speed -- it 
> simply couldn't keep up.  10ms keeps the load high, but just manageable.  
> YMMV with that param.  All of this is done on a single host, over loopback.  
> I ran it on both my airbook and a well-set-up RH Linux box, and found the 
> same problem.
> At startup, my system logged "expired" requests - meaning reqs that were 
> sent, but for which no ACK, positive or negative, was seen from the broker, 
> within 1.25x the requestAckTimeout (i.e., 2500ms).  I would let it settle until 
> the unacked queue was stable at or around 0.
> What I found is this: ACKs are normally generated within milliseconds.  This 
> was demonstrated by my logging added to the Scala ProducerRe* classes, and 
> they are normally seen quickly by my client.  But when the actual error 
> occurs, namely that a request is ignored, the ProducerResponse class *does* 
> encode the correct correlationId; however, a response containing that ID is 
> never sent over the network, as evidenced by my tcpdump traces.  In my 
> experience this would take anywhere from 3-15 seconds to occur after the 
> system was warm, meaning that it's 1 out of several hundred on average that 
> shows the condition.
> While I can't attach my client code, I could attach logs; but since my 
> intention is to share the code with LI people, I will wait to see if that's 
> useful here.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-734) Migration tool needs a revamp, it was poorly written and has many performance bugs

2013-02-07 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13574273#comment-13574273
 ] 

Neha Narkhede commented on KAFKA-734:
-

A few problems that are not solved yet -

1. If the consumer threads shut down on their own, there is no way to shut down the 
entire tool.
2. If the producer threads shut down, there is no way to shut down the entire tool.
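
One possible way to address both points, sketched below purely for illustration (this is 
not the tool's actual code, and the names are made up): let the main thread block on a 
latch that every worker thread counts down when it exits, so the death of any single 
consumer or producer thread shuts down the whole tool.

  import java.util.concurrent.CountDownLatch

  object MigrationToolShutdownSketch {
    def main(args: Array[String]) {
      val shutdownLatch = new CountDownLatch(1)

      // wrap each worker so that its exit, normal or not, releases the latch
      def worker(name: String)(body: => Unit): Thread = {
        val t = new Thread(new Runnable {
          def run() {
            try body
            finally shutdownLatch.countDown()
          }
        }, name)
        t.start()
        t
      }

      val consumers = (1 to 2).map(i => worker("consumer-" + i) { /* consume from the source cluster (placeholder) */ })
      val producers = (1 to 2).map(i => worker("producer-" + i) { /* produce to the target cluster (placeholder) */ })

      shutdownLatch.await()                           // returns as soon as any worker dies
      (consumers ++ producers).foreach(_.interrupt()) // then stop the remaining threads
    }
  }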

> Migration tool needs a revamp, it was poorly written and has many performance 
> bugs
> --
>
> Key: KAFKA-734
> URL: https://issues.apache.org/jira/browse/KAFKA-734
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.8
>Reporter: Neha Narkhede
>Assignee: Neha Narkhede
>Priority: Blocker
>  Labels: p1
> Attachments: kafka-734-v1.patch, kafka-734-v2.patch, 
> kafka-734-v3.patch, kafka-734-v4.patch
>
>
> The migration tool has a number of problems, ranging from poor logging to poor 
> design. This needs to be thought through again.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-734) Migration tool needs a revamp, it was poorly written and has many performance bugs

2013-02-07 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-734:


Attachment: kafka-734-v4.patch

More improvements to the migration tool -

1. Added a shutdown hook and shutdown logic.

2. Changed the design of the migration tool as per Jun's suggestion. Basically, it 
looks more like the request channel idea from the socket server. The migration 
threads are consumers that add to a common producer channel. The producer 
threads pull from the common channel and send the data across. This ensures that if 
one of the producers slows down, the data keeps flowing through the rest of the 
producers. (See the sketch after this list.)

3. Didn't get a chance to test this on a large workload, so there might be bugs.
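
For illustration only, a rough sketch of the channel-based design described in point 2 
(assumed names and types, not the actual patch): migration threads enqueue onto one 
shared blocking queue and all producer threads drain that same queue, so a slow 
producer does not stall the consumers feeding the others.

  import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

  // the "common producer channel": bounded, so consumers block instead of
  // buffering without limit when every producer falls behind
  class ProducerChannel(capacity: Int) {
    private val queue = new ArrayBlockingQueue[Array[Byte]](capacity)
    def send(message: Array[Byte]) { queue.put(message) }
    def receive(timeoutMs: Long): Array[Byte] = queue.poll(timeoutMs, TimeUnit.MILLISECONDS)
  }

  // migration thread: consumes from the source cluster and feeds the shared channel
  class MigrationThread(channel: ProducerChannel, fetch: () => Array[Byte]) extends Runnable {
    def run() {
      while (!Thread.currentThread.isInterrupted)
        channel.send(fetch())
    }
  }

  // producer thread: drains the shared channel and sends to the target cluster
  class ProducerThread(channel: ProducerChannel, sendToTarget: Array[Byte] => Unit) extends Runnable {
    def run() {
      while (!Thread.currentThread.isInterrupted) {
        val message = channel.receive(100)
        if (message != null) sendToTarget(message)
      }
    }
  }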

> Migration tool needs a revamp, it was poorly written and has many performance 
> bugs
> --
>
> Key: KAFKA-734
> URL: https://issues.apache.org/jira/browse/KAFKA-734
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.8
>Reporter: Neha Narkhede
>Assignee: Neha Narkhede
>Priority: Blocker
>  Labels: p1
> Attachments: kafka-734-v1.patch, kafka-734-v2.patch, 
> kafka-734-v3.patch, kafka-734-v4.patch
>
>
> The migration tool has a number of problems, ranging from poor logging to poor 
> design. This needs to be thought through again.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


Re: kafka replication blog

2013-02-07 Thread Jun Rao
We have fixed this issue in 0.8. With replication factor 1, if the producer
doesn't care about partitioning by key, messages will be sent to partitions
that are currently available.

Thanks,

Jun

On Thu, Feb 7, 2013 at 3:11 PM, Michal Haris wrote:

> Same here, a summary was needed as we have a fairly large ecosystem of multiple
> 0.7.2 clusters and I am planning to test an upgrade to 0.8.
> However, one thing creeping at the back of my mind regarding 0.8 is
> something I spotted in a thread a few weeks ago, namely that the
> rebalance behaviour of producers is not as robust as in 0.7.x without
> replication, and I remember there was no designed solution at the time - any
> news here? Basically, our use case doesn't require replication, but logical
> offsets and some other things introduced would solve some problems.
> On Feb 7, 2013 7:11 PM, "Vaibhav Puranik"  wrote:
>
> > Same here. Thanks a lot Jun.
> >
> > Regards,
> > Vaibhav
> >
> > On Thu, Feb 7, 2013 at 10:38 AM, Felix GV  wrote:
> >
> > > Thanks Jun!
> > >
> > > I hadn't been following the discussions regarding 0.8 and replication
> > for a
> > > little while and this was a great post to refresh my memory and get up
> to
> > > speed on the current replication architecture's design.
> > >
> > > --
> > > Felix
> > >
> > >
> > > On Tue, Feb 5, 2013 at 2:21 PM, Jun Rao  wrote:
> > >
> > > > I just posted the following blog on Kafka replication. This may
> answer
> > > some
> > > > of the questions that a few people have asked in the mailing list
> > before.
> > > >
> > > >
> > > >
> > >
> >
> http://engineering.linkedin.com/kafka/intra-cluster-replication-apache-kafka
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > >
> >
>


Re: kafka replication blog

2013-02-07 Thread Vaibhav Puranik
Same here. Thanks a lot Jun.

Regards,
Vaibhav

On Thu, Feb 7, 2013 at 10:38 AM, Felix GV  wrote:

> Thanks Jun!
>
> I hadn't been following the discussions regarding 0.8 and replication for a
> little while and this was a great post to refresh my memory and get up to
> speed on the current replication architecture's design.
>
> --
> Felix
>
>
> On Tue, Feb 5, 2013 at 2:21 PM, Jun Rao  wrote:
>
> > I just posted the following blog on Kafka replication. This may answer
> some
> > of the questions that a few people have asked in the mailing list before.
> >
> >
> >
> http://engineering.linkedin.com/kafka/intra-cluster-replication-apache-kafka
> >
> > Thanks,
> >
> > Jun
> >
>


Re: kafka replication blog

2013-02-07 Thread Felix GV
Thanks Jun!

I hadn't been following the discussions regarding 0.8 and replication for a
little while and this was a great post to refresh my memory and get up to
speed on the current replication architecture's design.

--
Felix


On Tue, Feb 5, 2013 at 2:21 PM, Jun Rao  wrote:

> I just posted the following blog on Kafka replication. This may answer some
> of the questions that a few people have asked in the mailing list before.
>
>
> http://engineering.linkedin.com/kafka/intra-cluster-replication-apache-kafka
>
> Thanks,
>
> Jun
>


[jira] [Updated] (KAFKA-748) Append to index fails due to invalid offset

2013-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-748:


Attachment: KAFKA-748-v1.patch

Attached is a patch that retains the index position on resize. It also includes 
a few useful logging and assert statements.

> Append to index fails due to invalid offset
> ---
>
> Key: KAFKA-748
> URL: https://issues.apache.org/jira/browse/KAFKA-748
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Sriram Subramanian
>Assignee: Jay Kreps
>Priority: Blocker
>  Labels: p1
> Fix For: 0.8
>
> Attachments: KAFKA-748-v1.patch, outindex, outmsg
>
>
> When appending to the index, there is a check that the offset being appended is 
> larger than the last offset already appended. We seem to be trying to insert 
> offset = 1 into the index while lastOffset is 24463. This seems to get fixed on 
> restarting the broker.
> java.lang.IllegalArgumentException: Attempt to append an offset (1) to 
> position 21703 no larger than the last offset appended (24463).
> at kafka.log.OffsetIndex.append(OffsetIndex.scala:183)
> at kafka.log.LogSegment.append(LogSegment.scala:60)
> at kafka.log.Log.append(Log.scala:286)
> at 
> kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:188)
> at 
> kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:181)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-750) inconsistent index offset during broker startup

2013-02-07 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13574094#comment-13574094
 ] 

Jay Kreps commented on KAFKA-750:
-

Item number 1 could actually be caused by KAFKA-748, I think.

> inconsistent index offset during broker startup
> ---
>
> Key: KAFKA-750
> URL: https://issues.apache.org/jira/browse/KAFKA-750
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.8
>Reporter: Jun Rao
>Assignee: Jay Kreps
>Priority: Blocker
>  Labels: bugs, p1
>
> Saw the following log during a clean restart of a broker.
> 2013/01/29 19:18:12.073 INFO [FileMessageSet] [main] [kafka] []  Creating or 
> reloading log segment /export/content/kafka/i001_caches/topic1-3/.log
> 2013/01/29 19:18:12.074 INFO [OffsetIndex] [main] [kafka] []  Created index file 
> /export/content/kafka/i001_caches/topic1-3/.index with 
> maxEntries = 655360, maxIndexSize = 10485760, entries = 655360, lastOffset = 0
> A couple of things are weird.
> 1. There are entries in the index, but lastOffset is 0.
> 2. maxIndexSize/maxEntries = 16, instead of 8.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-706) broker appears to be encoding ProduceResponse, but never sending it

2013-02-07 Thread Sriram Subramanian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13574089#comment-13574089
 ] 

Sriram Subramanian commented on KAFKA-706:
--

I think I know what is happening here.

Our current server is not yet suitable for asynchronous I/O from the client. As part of 
the processor thread, we continuously invoke processNewResponse on each 
iteration. processNewResponse does the following:

1. dequeue the response from the response queue
2. set the interest bit of the selector key to write
3. attach the response to the key

The problem is that we don't check whether the previous response attached to the key 
has already been sent. We just replace the response and hence drop 
arbitrary responses. This should not happen with the v2 patch for KAFKA-736, 
since we would serialize the requests from a client.
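
A simplified sketch of that sequence (not the actual SocketServer code, just the pattern 
being described): each pass attaches the next response to the selection key and flags 
OP_WRITE without checking whether a previously attached response has finished writing, 
so that earlier response can be silently replaced.

  import java.nio.channels.SelectionKey
  import java.util.concurrent.ConcurrentLinkedQueue

  class ProcessorSketch(responseQueue: ConcurrentLinkedQueue[(SelectionKey, AnyRef)]) {
    def processNewResponses() {
      var curr = responseQueue.poll()
      while (curr != null) {
        val (key, response) = curr
        // the race described above: key.attachment() may still hold a response
        // whose bytes have not been flushed; attaching the new one overwrites it
        key.attach(response)
        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE)
        curr = responseQueue.poll()
      }
    }
  }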

> broker appears to be encoding ProduceResponse, but never sending it
> ---
>
> Key: KAFKA-706
> URL: https://issues.apache.org/jira/browse/KAFKA-706
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
> Environment: reproduced on both Mac OS and RH linux, via private 
> node.js client
>Reporter: ben fleis
>Assignee: Sriram Subramanian
>
> By all appearances, I seem to be able to convince a broker to periodically 
> encode, but never transmit, a ProduceResponse.  Unfortunately my client is 
> proprietary, but I will share it with Neha via LI channels.  But I will 
> describe what's going on in the hopes that there's another trivial way to 
> reproduce it.  (I did search through JIRA, and haven't found anything that 
> looks like this.)
> I am running a single-instance ZooKeeper and a single broker.  I have a client 
> that generates configurable amounts of data, tracking what is produced (both 
> sent and ACK'd), and what is consumed.  I was noticing that when using high 
> transfer rates via high frequency single messages, my unack'd queue appeared 
> to be getting continuously larger.  So, I outfitted my client to log more 
> information about correlation IDs at various stages, and modified the Kafka 
> ProducerRequest/ProducerResponse to log (de)serialization of the same.  I 
> then used tcpdump to intercept all communications between my client and the 
> broker.  Finally, I configured my client to generate 1 message per ~10ms, 
> each payload being approximately 33 bytes; requestAckTimeout was set to 
> 2000ms, and requestAcksRequired was set to 1.  I used 10ms as I found that 
> 5ms or less caused my unacked queue to build up due to system speed -- it 
> simply couldn't keep up.  10ms keeps the load high, but just manageable.  
> YMMV with that param.  All of this is done on a single host, over loopback.  
> I ran it on both my airbook and a well-set-up RH Linux box, and found the 
> same problem.
> At startup, my system logged "expired" requests - meaning reqs that were 
> sent, but for which no ACK, positive or negative, was seen from the broker, 
> within 1.25x the requestAckTimeout (i.e., 2500ms).  I would let it settle until 
> the unacked queue was stable at or around 0.
> What I found is this: ACKs are normally generated within milliseconds.  This 
> was demonstrated by my logging added to the Scala ProducerRe* classes, and 
> they are normally seen quickly by my client.  But when the actual error 
> occurs, namely that a request is ignored, the ProducerResponse class *does* 
> encode the correct correlationId; however, a response containing that ID is 
> never sent over the network, as evidenced by my tcpdump traces.  In my 
> experience this would take anywhere from 3-15 seconds to occur after the 
> system was warm, meaning that it's 1 out of several hundred on average that 
> shows the condition.
> While I can't attach my client code, I could attach logs; but since my 
> intention is to share the code with LI people, I will wait to see if that's 
> useful here.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Assigned] (KAFKA-706) broker appears to be encoding ProduceResponse, but never sending it

2013-02-07 Thread Sriram Subramanian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sriram Subramanian reassigned KAFKA-706:


Assignee: Sriram Subramanian

> broker appears to be encoding ProduceResponse, but never sending it
> ---
>
> Key: KAFKA-706
> URL: https://issues.apache.org/jira/browse/KAFKA-706
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
> Environment: reproduced on both Mac OS and RH linux, via private 
> node.js client
>Reporter: ben fleis
>Assignee: Sriram Subramanian
>
> By all appearances, I seem to be able to convince a broker to periodically 
> encode, but never transmit, a ProduceResponse.  Unfortunately my client is 
> proprietary, but I will share it with Neha via LI channels.  But I will 
> describe what's going on in the hopes that there's another trivial way to 
> reproduce it.  (I did search through JIRA, and haven't found anything that 
> looks like this.)
> I am running a single-instance ZooKeeper and a single broker.  I have a client 
> that generates configurable amounts of data, tracking what is produced (both 
> sent and ACK'd), and what is consumed.  I was noticing that when using high 
> transfer rates via high frequency single messages, my unack'd queue appeared 
> to be getting continuously larger.  So, I outfitted my client to log more 
> information about correlation IDs at various stages, and modified the Kafka 
> ProducerRequest/ProducerResponse to log (de)serialization of the same.  I 
> then used tcpdump to intercept all communications between my client and the 
> broker.  Finally, I configured my client to generate 1 message per ~10ms, 
> each payload being approximately 33 bytes; requestAckTimeout was set to 
> 2000ms, and requestAcksRequired was set to 1.  I used 10ms as I found that 
> 5ms or less caused my unacked queue to build up due to system speed -- it 
> simply couldn't keep up.  10ms keeps the load high, but just manageable.  
> YMMV with that param.  All of this is done on a single host, over loopback.  
> I ran it on both my airbook and a well-set-up RH Linux box, and found the 
> same problem.
> At startup, my system logged "expired" requests - meaning reqs that were 
> sent, but for which no ACK, positive or negative, was seen from the broker, 
> within 1.25x the requestAckTimeout (i.e., 2500ms).  I would let it settle until 
> the unacked queue was stable at or around 0.
> What I found is this: ACKs are normally generated within milliseconds.  This 
> was demonstrated by my logging added to the Scala ProducerRe* classes, and 
> they are normally seen quickly by my client.  But when the actual error 
> occurs, namely that a request is ignored, the ProducerResponse class *does* 
> encode the correct correlationId; however, a response containing that ID is 
> never sent over the network, as evidenced by my tcpdump traces.  In my 
> experience this would take anywhere from 3-15 seconds to occur after the 
> system was warm, meaning that it's 1 out of several hundred on average that 
> shows the condition.
> While I can't attach my client code, I could attach logs; but since my 
> intention is to share the code with LI people, I will wait to see if that's 
> useful here.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-748) Append to index fails due to invalid offset

2013-02-07 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13574067#comment-13574067
 ] 

Jay Kreps commented on KAFKA-748:
-

Okay, the problem here is the resize method. We generalized index.trimInvalid to 
index.resize to be able to enlarge the index when we load a segment. This was 
done to avoid rolling the log on a full index, I think, which had other 
problems with empty indexes. However, it looks like this path was never actually 
exercised, because doing it resets the position in the mmap to 0, so we start 
overwriting the entries in the index from the beginning while expanding what we 
think the valid segment of the memory map is. When we close, we then have a 
bunch of zeros at the end of the index and have overwritten the front of the 
file.
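
A sketch of one way to preserve the position on resize (assumed structure and names, 
not Kafka's actual OffsetIndex code): remember the logical position before remapping 
and restore it on the new map, so growing the index keeps appending after the existing 
entries instead of restarting at slot 0.

  import java.io.{File, RandomAccessFile}
  import java.nio.MappedByteBuffer
  import java.nio.channels.FileChannel

  object IndexResizeSketch {
    def resize(file: File, mmap: MappedByteBuffer, newSizeBytes: Int): MappedByteBuffer = {
      val savedPosition = mmap.position()  // without this, the fresh map starts at 0
      val raf = new RandomAccessFile(file, "rw")
      try {
        raf.setLength(newSizeBytes)
        val newMmap = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, newSizeBytes)
        newMmap.position(savedPosition)    // keep appending after the existing entries
        newMmap
      } finally {
        raf.close()
      }
    }
  }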

> Append to index fails due to invalid offset
> ---
>
> Key: KAFKA-748
> URL: https://issues.apache.org/jira/browse/KAFKA-748
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Sriram Subramanian
>Assignee: Jay Kreps
>Priority: Blocker
>  Labels: p1
> Fix For: 0.8
>
> Attachments: outindex, outmsg
>
>
> When appending to the index, there is a check that the offset being appended is 
> larger than the last offset already appended. We seem to be trying to insert 
> offset = 1 into the index while lastOffset is 24463. This seems to get fixed on 
> restarting the broker.
> java.lang.IllegalArgumentException: Attempt to append an offset (1) to 
> position 21703 no larger than the last offset appended (24463).
> at kafka.log.OffsetIndex.append(OffsetIndex.scala:183)
> at kafka.log.LogSegment.append(LogSegment.scala:60)
> at kafka.log.Log.append(Log.scala:286)
> at 
> kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:188)
> at 
> kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:181)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


Re: kafka replication blog

2013-02-07 Thread Michal Haris
Same here, a summary was needed as we have a fairly large ecosystem of multiple
0.7.2 clusters and I am planning to test an upgrade to 0.8.
However, one thing creeping at the back of my mind regarding 0.8 is
something I spotted in a thread a few weeks ago, namely that the
rebalance behaviour of producers is not as robust as in 0.7.x without
replication, and I remember there was no designed solution at the time - any
news here? Basically, our use case doesn't require replication, but logical
offsets and some other things introduced would solve some problems.
On Feb 7, 2013 7:11 PM, "Vaibhav Puranik"  wrote:

> Same here. Thanks a lot Jun.
>
> Regards,
> Vaibhav
>
> On Thu, Feb 7, 2013 at 10:38 AM, Felix GV  wrote:
>
> > Thanks Jun!
> >
> > I hadn't been following the discussions regarding 0.8 and replication
> for a
> > little while and this was a great post to refresh my memory and get up to
> > speed on the current replication architecture's design.
> >
> > --
> > Felix
> >
> >
> > On Tue, Feb 5, 2013 at 2:21 PM, Jun Rao  wrote:
> >
> > > I just posted the following blog on Kafka replication. This may answer
> > some
> > > of the questions that a few people have asked in the mailing list
> before.
> > >
> > >
> > >
> >
> http://engineering.linkedin.com/kafka/intra-cluster-replication-apache-kafka
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> >
>


[jira] [Commented] (KAFKA-748) Append to index fails due to invalid offset

2013-02-07 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13574021#comment-13574021
 ] 

Jay Kreps commented on KAFKA-748:
-

This is actually very easy to reproduce. Just do a clean shutdown on the broker 
while running the producer perf test. Haven't debugged it yet.

> Append to index fails due to invalid offset
> ---
>
> Key: KAFKA-748
> URL: https://issues.apache.org/jira/browse/KAFKA-748
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Sriram Subramanian
>Assignee: Jay Kreps
>Priority: Blocker
>  Labels: p1
> Fix For: 0.8
>
> Attachments: outindex, outmsg
>
>
> When appending to the index, there is a check that the offset being appended is 
> larger than the last offset already appended. We seem to be trying to insert 
> offset = 1 into the index while lastOffset is 24463. This seems to get fixed on 
> restarting the broker.
> java.lang.IllegalArgumentException: Attempt to append an offset (1) to 
> position 21703 no larger than the last offset appended (24463).
> at kafka.log.OffsetIndex.append(OffsetIndex.scala:183)
> at kafka.log.LogSegment.append(LogSegment.scala:60)
> at kafka.log.Log.append(Log.scala:286)
> at 
> kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:188)
> at 
> kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:181)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-750) inconsistent index offset during broker startup

2013-02-07 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13573996#comment-13573996
 ] 

Jay Kreps commented on KAFKA-750:
-

Are you sure this is a clean shutdown? Do we have the log from the broker? 
Without that it is pretty hard to figure out.

For issue (1), the lastOffset == 0:

Seeing last offset = 0 should happen in the event of an unclean shutdown. It is 
not possible to append an actual 0 to the index; we guard against that. If the 
segment was from an unclean shutdown, this is expected and gets repaired when we 
run recovery on the segment.

I checked the logic for clean shutdown. The only time we create a clean 
shutdown file is in LogManager.shutdown:
  def shutdown() {
    debug("Shutting down.")
    try {
      // close the logs
      allLogs.foreach(_.close())
      // mark that the shutdown was clean by creating the clean shutdown marker file
      logDirs.foreach(dir => Utils.swallow(new File(dir, CleanShutdownFile).createNewFile()))
    } finally {
      // regardless of whether the close succeeded, we need to unlock the data directories
      dirLocks.foreach(_.destroy())
    }
    debug("Shutdown complete.")
  }
I don't think this can fail to call close on the index (which truncates the 
file) and still write an index file. The close() call is indeed truncating the 
index.

There is an issue here which is that our resize() call does not call flush 
after truncating the file. This means that a hard OS crash after a clean 
shutdown could lead to a corrupt index on disk (the truncated file bits could 
re-appear) but also a clean shutdown file. This is a fairly unlikely problem, 
though, as it requires a hard OS crash to coincide with a clean shutdown. I 
don't think that happened here.

That leaves the possibility that the size is somehow getting out of whack with 
the position in the index. This can be modified in truncateTo or append, and 
both seem to correctly manage the size and position.

The second issue, maxEntries vs. maxIndexSize, is even more curious. The 
maxIndexSize is a configuration parameter, so it takes whatever is configured at 
startup time. The maxEntries is set to file_size/8. So if the file was newly 
allocated this would be impossible, because it would definitely be set exactly. 
In the event of a clean shutdown any value is possible. The weird thing here is 
that the observed value happens to be exactly half the expected maxEntries value. 
That would be a remarkable coincidence. I cannot explain this.
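
For reference, a quick check of the numbers in the report, assuming 8-byte index entries 
(which is what the maxEntries = file_size/8 rule implies):

  val maxIndexSize       = 10485760              // configured index size in bytes, from the log line
  val expectedMaxEntries = maxIndexSize / 8      // 1310720
  val reportedMaxEntries = 655360                // from the log line: exactly expectedMaxEntries / 2
  // hence maxIndexSize / reportedMaxEntries == 16 rather than the expected 8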




> inconsistent index offset during broker startup
> ---
>
> Key: KAFKA-750
> URL: https://issues.apache.org/jira/browse/KAFKA-750
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.8
>Reporter: Jun Rao
>Assignee: Jay Kreps
>Priority: Blocker
>  Labels: bugs, p1
>
> Saw the following log during a clean restart of a broker.
> 2013/01/29 19:18:12.073 INFO [FileMessageSet] [main] [kafka] []  Creating or 
> reloading log segment /export/content/kafka/i001_caches/topic1-3/.log
> 2013/01/29 19:18:12.074 INFO [OffsetIndex] [main] [kafka] []  Created index file 
> /export/content/kafka/i001_caches/topic1-3/.index with 
> maxEntries = 655360, maxIndexSize = 10485760, entries = 655360, lastOffset = 0
> A couple of things are weird.
> 1. There are entries in the index, but lastOffset is 0.
> 2. maxIndexSize/maxEntries = 16, instead of 8.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] Subscription: outstanding kafka patches

2013-02-07 Thread jira
Issue Subscription
Filter: outstanding kafka patches (60 issues)
The list of outstanding kafka patches
Subscriber: kafka-mailing-list

Key Summary
KAFKA-751   Fix windows build script - kafka-run-class.bat
https://issues.apache.org/jira/browse/KAFKA-751
KAFKA-748   Append to index fails due to invalid offset
https://issues.apache.org/jira/browse/KAFKA-748
KAFKA-745   Remove getShutdownReceive() and other kafka specific code from the 
RequestChannel
https://issues.apache.org/jira/browse/KAFKA-745
KAFKA-743   PreferredReplicaLeaderElectionCommand has command line error
https://issues.apache.org/jira/browse/KAFKA-743
KAFKA-735   Add looping and JSON output for ConsumerOffsetChecker
https://issues.apache.org/jira/browse/KAFKA-735
KAFKA-734   Migration tool needs a revamp, it was poorly written and has many 
performance bugs
https://issues.apache.org/jira/browse/KAFKA-734
KAFKA-733   Fat jar option for build, or override for ivy cache location 
https://issues.apache.org/jira/browse/KAFKA-733
KAFKA-705   Controlled shutdown doesn't seem to work on more than one broker in 
a cluster
https://issues.apache.org/jira/browse/KAFKA-705
KAFKA-682   java.lang.OutOfMemoryError: Java heap space
https://issues.apache.org/jira/browse/KAFKA-682
KAFKA-677   Retention process gives exception if an empty segment is chosen for 
collection
https://issues.apache.org/jira/browse/KAFKA-677
KAFKA-674   Clean Shutdown Testing - Log segments checksums mismatch
https://issues.apache.org/jira/browse/KAFKA-674
KAFKA-671   DelayedProduce requests should not hold full producer request data
https://issues.apache.org/jira/browse/KAFKA-671
KAFKA-652   Create testcases for clean shut-down
https://issues.apache.org/jira/browse/KAFKA-652
KAFKA-645   Create a shell script to run System Test with DEBUG details and 
"tee" console output to a file
https://issues.apache.org/jira/browse/KAFKA-645
KAFKA-637   Separate log4j environment variable from KAFKA_OPTS in 
kafka-run-class.sh
https://issues.apache.org/jira/browse/KAFKA-637
KAFKA-621   System Test 9051 : ConsoleConsumer doesn't receives any data for 20 
topics but works for 10
https://issues.apache.org/jira/browse/KAFKA-621
KAFKA-607   System Test Transient Failure (case 4011 Log Retention) - 
ConsoleConsumer receives less data
https://issues.apache.org/jira/browse/KAFKA-607
KAFKA-606   System Test Transient Failure (case 0302 GC Pause) - Log segments 
mismatched across replicas
https://issues.apache.org/jira/browse/KAFKA-606
KAFKA-604   Add missing metrics in 0.8
https://issues.apache.org/jira/browse/KAFKA-604
KAFKA-598   decouple fetch size from max message size
https://issues.apache.org/jira/browse/KAFKA-598
KAFKA-583   SimpleConsumerShell may receive less data inconsistently
https://issues.apache.org/jira/browse/KAFKA-583
KAFKA-552   No error messages logged for those failing-to-send messages from 
Producer
https://issues.apache.org/jira/browse/KAFKA-552
KAFKA-547   The ConsumerStats MBean name should include the groupid
https://issues.apache.org/jira/browse/KAFKA-547
KAFKA-530   kafka.server.KafkaApis: kafka.common.OffsetOutOfRangeException
https://issues.apache.org/jira/browse/KAFKA-530
KAFKA-493   High CPU usage on inactive server
https://issues.apache.org/jira/browse/KAFKA-493
KAFKA-479   ZK EPoll taking 100% CPU usage with Kafka Client
https://issues.apache.org/jira/browse/KAFKA-479
KAFKA-465   Performance test scripts - refactoring leftovers from tools to perf 
package
https://issues.apache.org/jira/browse/KAFKA-465
KAFKA-438   Code cleanup in MessageTest
https://issues.apache.org/jira/browse/KAFKA-438
KAFKA-419   Updated PHP client library to support kafka 0.7+
https://issues.apache.org/jira/browse/KAFKA-419
KAFKA-414   Evaluate mmap-based writes for Log implementation
https://issues.apache.org/jira/browse/KAFKA-414
KAFKA-411   Message Error in high cocurrent environment
https://issues.apache.org/jira/browse/KAFKA-411
KAFKA-404   When using chroot path, create chroot on startup if it doesn't exist
https://issues.apache.org/jira/browse/KAFKA-404
KAFKA-399   0.7.1 seems to show less performance than 0.7.0
https://issues.apache.org/jira/browse/KAFKA-399
KAFKA-398   Enhance SocketServer to Enable Sending Requests
https://issues.apache.org/jira/browse/KAFKA-398
KAFKA-397   kafka.common.InvalidMessageSizeException: null
https://issues.apache.org/jira/browse/KAFKA-397
KAFKA-388   Add a highly available consumer co-ordinator to a Kafka cluster
https://issues.apache.org/jira/browse/KAFKA-388
KAFKA-346   Don't call commitOffsets() during rebalance
  

[jira] [Assigned] (KAFKA-753) Kafka broker shuts down while loading segments

2013-02-07 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede reassigned KAFKA-753:
---

Assignee: Sriram Subramanian  (was: Jay Kreps)

> Kafka broker shuts down while loading segments
> --
>
> Key: KAFKA-753
> URL: https://issues.apache.org/jira/browse/KAFKA-753
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.8
>Reporter: Neha Narkhede
>Assignee: Sriram Subramanian
>Priority: Blocker
>  Labels: bugs, p1
>
> 2013/02/07 16:22:15.862 FATAL [KafkaServerStartable] [main] [kafka] []  Fatal 
> error during KafkaServerStable startup. Prepare to shutdown
> java.lang.IllegalArgumentException: Negative position
> at sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:610)
> at kafka.log.FileMessageSet.searchFor(FileMessageSet.scala:83)
> at kafka.log.LogSegment.translateOffset(LogSegment.scala:76)
> at kafka.log.LogSegment.read(LogSegment.scala:91)
> at kafka.log.LogSegment.nextOffset(LogSegment.scala:141)
> at kafka.log.Log.<init>(Log.scala:128)
> at 
> kafka.log.LogManager$$anonfun$loadLogs$1$$anonfun$apply$3.apply(LogManager.scala:115)
> at 
> kafka.log.LogManager$$anonfun$loadLogs$1$$anonfun$apply$3.apply(LogManager.scala:109)
> at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> at 
> kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:109)
> at 
> kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:101)
> at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> at 
> scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:32)
> at kafka.log.LogManager.loadLogs(LogManager.scala:101)
> at kafka.log.LogManager.<init>(LogManager.scala:62)
> at kafka.server.KafkaServer.startup(KafkaServer.scala:59)
> at 
> kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-753) Kafka broker shuts down while loading segments

2013-02-07 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13573696#comment-13573696
 ] 

Neha Narkhede commented on KAFKA-753:
-

I'm not too sure about this, but one way this can happen is if indexSlotFor() 
returns a negative number (other than -1). It can return a negative number if 
this operation overflows -
 val mid = ceil(hi/2.0 + lo/2.0).toInt

This can lead to a negative physical position that gets passed to 
messageSet.searchFor(offset, mapping.position) in translateOffset() and leads 
to this error.
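
For background, a generic illustration of how a binary-search midpoint can go negative 
through integer overflow (this uses the classic integer form, not the double-based 
expression quoted above, and is not the actual indexSlotFor() code):

  val lo = 1
  val hi = Int.MaxValue
  val naiveMid = (lo + hi) / 2      // Int.MaxValue + 1 wraps to Int.MinValue, so this is negative
  val safeMid  = lo + (hi - lo) / 2 // 1073741824, always in range when lo <= hi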



> Kafka broker shuts down while loading segments
> --
>
> Key: KAFKA-753
> URL: https://issues.apache.org/jira/browse/KAFKA-753
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.8
>Reporter: Neha Narkhede
>Assignee: Jay Kreps
>Priority: Blocker
>  Labels: bugs, p1
>
> 2013/02/07 16:22:15.862 FATAL [KafkaServerStartable] [main] [kafka] []  Fatal 
> error during KafkaServerStable startup. Prepare to shutdown
> java.lang.IllegalArgumentException: Negative position
> at sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:610)
> at kafka.log.FileMessageSet.searchFor(FileMessageSet.scala:83)
> at kafka.log.LogSegment.translateOffset(LogSegment.scala:76)
> at kafka.log.LogSegment.read(LogSegment.scala:91)
> at kafka.log.LogSegment.nextOffset(LogSegment.scala:141)
> at kafka.log.Log.<init>(Log.scala:128)
> at 
> kafka.log.LogManager$$anonfun$loadLogs$1$$anonfun$apply$3.apply(LogManager.scala:115)
> at 
> kafka.log.LogManager$$anonfun$loadLogs$1$$anonfun$apply$3.apply(LogManager.scala:109)
> at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> at 
> kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:109)
> at 
> kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:101)
> at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> at 
> scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:32)
> at kafka.log.LogManager.loadLogs(LogManager.scala:101)
> at kafka.log.LogManager.<init>(LogManager.scala:62)
> at kafka.server.KafkaServer.startup(KafkaServer.scala:59)
> at 
> kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Created] (KAFKA-753) Kafka broker shuts down while loading segments

2013-02-07 Thread Neha Narkhede (JIRA)
Neha Narkhede created KAFKA-753:
---

 Summary: Kafka broker shuts down while loading segments
 Key: KAFKA-753
 URL: https://issues.apache.org/jira/browse/KAFKA-753
 Project: Kafka
  Issue Type: Bug
  Components: log
Affects Versions: 0.8
Reporter: Neha Narkhede
Assignee: Jay Kreps
Priority: Blocker


2013/02/07 16:22:15.862 FATAL [KafkaServerStartable] [main] [kafka] []  Fatal 
error during KafkaServerStable startup. Prepare to shutdown
java.lang.IllegalArgumentException: Negative position
at sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:610)
at kafka.log.FileMessageSet.searchFor(FileMessageSet.scala:83)
at kafka.log.LogSegment.translateOffset(LogSegment.scala:76)
at kafka.log.LogSegment.read(LogSegment.scala:91)
at kafka.log.LogSegment.nextOffset(LogSegment.scala:141)
at kafka.log.Log.<init>(Log.scala:128)
at 
kafka.log.LogManager$$anonfun$loadLogs$1$$anonfun$apply$3.apply(LogManager.scala:115)
at 
kafka.log.LogManager$$anonfun$loadLogs$1$$anonfun$apply$3.apply(LogManager.scala:109)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
at kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:109)
at kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:101)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:32)
at kafka.log.LogManager.loadLogs(LogManager.scala:101)
at kafka.log.LogManager.<init>(LogManager.scala:62)
at kafka.server.KafkaServer.startup(KafkaServer.scala:59)
at 
kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)


--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira