compression performance

2013-08-02 Thread Jay Kreps
Chris commented in another thread about the poor compression performance in
0.8, even with snappy.

Indeed if I run the linear log write throughput test on my laptop I see
75MB/sec with no compression and 17MB/sec with snappy.

This is a little surprising as snappy claims around 200MB/sec round-trip
performance (compress + uncompress) from Java. So what is going on?

Well, you may remember I actually filed a bug a while back on all the
inefficient byte copying in the compression path (KAFKA-527). I didn't
think too much of it, other than that it was a bit sloppy, since after all
computers are good at copying bytes, right?

Turns out not so much. If you look at a profile of the standalone log test
you see that with no compression 80% of the time goes to FileChannel.write,
which is reasonable since that is what a log does.

But with snappy enabled only 5% goes to writing data, 50% of the time goes
to byte copying and allocation, and only about 22% goes to actual
compression and decompression (with lots of misc stuff in there I haven't
bothered to tally).

If someone were to optimize this code path I think we could take a patch in
0.8.1. It shouldn't be too hard: just use the backing array on the byte
buffer and avoid all the input streams, output streams, byte array
output streams, and intermediate message blobs.
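
To make that concrete, here is a minimal sketch of the array-oriented approach,
assuming snappy-java's maxCompressedLength/rawCompress calls and a heap
ByteBuffer (illustrative code, not the actual Kafka patch):

import java.nio.ByteBuffer
import org.xerial.snappy.Snappy

object BackingArrayCompression {
  // Compress the readable bytes of a heap ByteBuffer straight from its backing
  // array into a preallocated destination array: no streams, no intermediate copies.
  def compress(src: ByteBuffer): ByteBuffer = {
    val dest = new Array[Byte](Snappy.maxCompressedLength(src.remaining))
    val written = Snappy.rawCompress(src.array, src.arrayOffset + src.position, src.remaining, dest, 0)
    ByteBuffer.wrap(dest, 0, written)
  }

  def main(args: Array[String]): Unit = {
    val input = ByteBuffer.wrap(("some log data " * 1000).getBytes("UTF-8"))
    println("compressed " + input.remaining + " bytes down to " + compress(input).remaining)
  }
}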

I summarized this along with how to reproduce the test results here:
https://issues.apache.org/jira/browse/KAFKA-527

-Jay


[jira] [Commented] (KAFKA-527) Compression support does numerous byte copies

2013-08-02 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13728454#comment-13728454
 ] 

Jay Kreps commented on KAFKA-527:
-

I should mention that to run the above test you need to apply the patch from 
KAFKA-615 (v5) to get the compression option and log support in the throughput 
test.

> Compression support does numerous byte copies
> -
>
> Key: KAFKA-527
> URL: https://issues.apache.org/jira/browse/KAFKA-527
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jay Kreps
> Attachments: java.hprof.no-compression.txt, java.hprof.snappy.text
>
>
> The data path for compressing or decompressing messages is extremely 
> inefficient. We do something like 7 (?) complete copies of the data, often 
> for simple things like adding a 4 byte size to the front. I am not sure how 
> this went by unnoticed.
> This is likely the root cause of the performance issues we saw in doing bulk 
> recompression of data in mirror maker.
> The mismatch between the InputStream and OutputStream interfaces and the 
> Message/MessageSet interfaces which are based on byte buffers is the cause of 
> many of these.



[jira] [Updated] (KAFKA-615) Avoid fsync on log segment roll

2013-08-02 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-615:


Attachment: KAFKA-615-v5.patch

Attached updated patch v5. Rebased against trunk and added support for 
compression in the write throughput test.

> Avoid fsync on log segment roll
> ---
>
> Key: KAFKA-615
> URL: https://issues.apache.org/jira/browse/KAFKA-615
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jay Kreps
>Assignee: Neha Narkhede
> Attachments: KAFKA-615-v1.patch, KAFKA-615-v2.patch, 
> KAFKA-615-v3.patch, KAFKA-615-v4.patch, KAFKA-615-v5.patch
>
>
> It still isn't feasible to run without an application-level fsync policy. 
> This is a problem because fsync locks the file, and tuning such a policy is 
> very challenging: the flushes must not be so frequent that seeks reduce 
> throughput, yet not so infrequent that each fsync writes so much data that 
> there is a noticeable jump in latency.
> The remaining problem is the way that log recovery works. Our current policy 
> is that if a clean shutdown occurs we do no recovery. If an unclean shutdown 
> occurs we recover the last segment of all logs. To make this correct we need 
> to ensure that each segment is fsync'd before we create a new segment. Hence 
> the fsync during roll.
> Obviously if the fsync during roll is the only time fsync occurs then it will 
> potentially write out the entire segment, which for a 1GB segment at 50MB/sec 
> might take many seconds. The goal of this JIRA is to eliminate this and make 
> it possible to run with no application-level fsyncs at all, depending 
> entirely on replication and background writeback for durability.



[jira] [Commented] (KAFKA-527) Compression support does numerous byte copies

2013-08-02 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13728452#comment-13728452
 ] 

Jay Kreps commented on KAFKA-527:
-

To test performance, check out trunk and do:
./sbt package test:package
./bin/kafka-run-class.sh kafka.TestLinearWriteSpeed --bytes 1007483648 --files 1 --log --message-size 4096 --size 109600 --compression snappy
./bin/kafka-run-class.sh kafka.TestLinearWriteSpeed --bytes 1007483648 --files 1 --log --message-size 4096 --size 109600 --compression none

Performance (on my laptop):
none: 75 MB/sec
snappy: 18 MB/sec

Why is this so slow? After all snappy claims a roundtrip performance of around 
200MB/sec...

If you look at the attached hprof traces without compression you see the 
following:
   1 79.87% 79.87% 2004 300968 sun.nio.ch.FileDispatcher.write0
   2  5.38% 85.25%  135 300978 kafka.utils.Utils$.crc32
   3  5.06% 90.31%  127 301074 sun.nio.ch.FileChannelImpl.force0
   4  1.79% 92.11%   45 301075 java.nio.MappedByteBuffer.force0
I.e. 80% goes to writing, 5% goes to computing CRCs, about 7% goes to 
flushing, and then a long tail.

If you look at the same trace with compression you expect to see that all the 
time goes to compressing stuff, right? Wrong:
   1 16.44% 16.44%  807 301044 org.xerial.snappy.SnappyNative.arrayCopy
   2 14.81% 31.24%  727 301073 java.util.Arrays.copyOf
   3  9.61% 40.86%  472 301053 org.xerial.snappy.SnappyNative.rawCompress
   4  7.45% 48.31%  366 301084 org.xerial.snappy.SnappyNative.rawUncompress
   5  5.60% 53.91%  275 301063 java.io.ByteArrayOutputStream.<init>
   6  5.13% 59.04%  252 301090 sun.nio.ch.FileDispatcher.write0
   7  4.32% 63.36%  212 301074 java.nio.HeapByteBuffer.<init>
   8  3.97% 67.33%  195 301049 java.util.Arrays.copyOf
   9  3.83% 71.16%  188 301070 org.xerial.snappy.SnappyNative.arrayCopy
  10  3.22% 74.38%  158 301068 org.xerial.snappy.SnappyNative.rawUncompress
  11  2.10% 76.48%  103 301065 org.xerial.snappy.SnappyNative.arrayCopy
  12  1.91% 78.39%   94 301089 java.nio.HeapByteBuffer.<init>

If you tally this up you see that about 50% of the time is going to array 
copying and allocation, a mere 22% to compression and decompression, and the 
actual cost of the write is knocked down to 5%.

Pretty sad.

> Compression support does numerous byte copies
> -
>
> Key: KAFKA-527
> URL: https://issues.apache.org/jira/browse/KAFKA-527
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jay Kreps
> Attachments: java.hprof.no-compression.txt, java.hprof.snappy.text
>
>
> The data path for compressing or decompressing messages is extremely 
> inefficient. We do something like 7 (?) complete copies of the data, often 
> for simple things like adding a 4 byte size to the front. I am not sure how 
> this went by unnoticed.
> This is likely the root cause of the performance issues we saw in doing bulk 
> recompression of data in mirror maker.
> The mismatch between the InputStream and OutputStream interfaces and the 
> Message/MessageSet interfaces which are based on byte buffers is the cause of 
> many of these.



[jira] [Updated] (KAFKA-527) Compression support does numerous byte copies

2013-08-02 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-527:


Attachment: java.hprof.snappy.text

> Compression support does numerous byte copies
> -
>
> Key: KAFKA-527
> URL: https://issues.apache.org/jira/browse/KAFKA-527
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jay Kreps
> Attachments: java.hprof.no-compression.txt, java.hprof.snappy.text
>
>
> The data path for compressing or decompressing messages is extremely 
> inefficient. We do something like 7 (?) complete copies of the data, often 
> for simple things like adding a 4 byte size to the front. I am not sure how 
> this went by unnoticed.
> This is likely the root cause of the performance issues we saw in doing bulk 
> recompression of data in mirror maker.
> The mismatch between the InputStream and OutputStream interfaces and the 
> Message/MessageSet interfaces which are based on byte buffers is the cause of 
> many of these.



[jira] [Updated] (KAFKA-527) Compression support does numerous byte copies

2013-08-02 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-527:


Attachment: java.hprof.no-compression.txt

> Compression support does numerous byte copies
> -
>
> Key: KAFKA-527
> URL: https://issues.apache.org/jira/browse/KAFKA-527
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jay Kreps
> Attachments: java.hprof.no-compression.txt, java.hprof.snappy.text
>
>
> The data path for compressing or decompressing messages is extremely 
> inefficient. We do something like 7 (?) complete copies of the data, often 
> for simple things like adding a 4 byte size to the front. I am not sure how 
> this went by unnoticed.
> This is likely the root cause of the performance issues we saw in doing bulk 
> recompression of data in mirror maker.
> The mismatch between the InputStream and OutputStream interfaces and the 
> Message/MessageSet interfaces which are based on byte buffers is the cause of 
> many of these.



[jira] [Updated] (KAFKA-527) Compression support does numerous byte copies

2013-08-02 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-527:


Description: 
The data path for compressing or decompressing messages is extremely 
inefficient. We do something like 7 (?) complete copies of the data, often for 
simple things like adding a 4 byte size to the front. I am not sure how this 
went by unnoticed.

This is likely the root cause of the performance issues we saw in doing bulk 
recompression of data in mirror maker.

The mismatch between the InputStream and OutputStream interfaces and the 
Message/MessageSet interfaces which are based on byte buffers is the cause of 
many of these.



  was:
The data path for compressing or decompressing messages is extremely 
inefficient. We do something like 7 (?) complete copies of the data, often for 
simple things like adding a 4 byte size to the front. I am not sure how this 
went by unnoticed.

This is likely the root cause of the performance issues we saw in doing bulk 
recompression of data in mirror maker.

The mismatch between the InputStream and OutputStream interfaces and the 
Message/MessageSet interfaces which are based on byte buffers is the cause of 
many of these.

I believe the right thing to do is to rework the compression code so that it 
doesn't use the Stream interface. Snappy supports ByteBuffers directly. GZIP in 
java doesn't seem to, but I think GZIP is the wrong thing to be using. If I 
understand correctly GZIP = DEFLATE + HEADER + FOOTER. The header contains 
things like a version and checksum. Since we already record the compression 
type, using GZIP is redundant, and we should just be using DEFLATE which has 
direct support for byte arrays. With this change I think it should be possible 
to optimize the compression down to eliminate all copying in the common case.
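
As a rough illustration of the DEFLATE-over-byte-arrays idea
(java.util.zip.Deflater with nowrap = true produces the raw DEFLATE stream
without the GZIP header/footer; this is a sketch, not the proposed change):

import java.util.zip.Deflater

object RawDeflateSketch {
  // Compress a byte array with raw DEFLATE, skipping the GZIP/zlib header and footer.
  def deflate(input: Array[Byte]): Array[Byte] = {
    val deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true) // nowrap = true
    deflater.setInput(input)
    deflater.finish()
    val out = new Array[Byte](input.length + (input.length >> 3) + 64) // generous worst-case bound
    var written = 0
    while (!deflater.finished())
      written += deflater.deflate(out, written, out.length - written)
    deflater.end()
    java.util.Arrays.copyOf(out, written)
  }

  def main(args: Array[String]): Unit = {
    val data = ("message payload " * 500).getBytes("UTF-8")
    println(data.length + " bytes -> " + deflate(data).length + " bytes of raw DEFLATE")
  }
}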




> Compression support does numerous byte copies
> -
>
> Key: KAFKA-527
> URL: https://issues.apache.org/jira/browse/KAFKA-527
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jay Kreps
>
> The data path for compressing or decompressing messages is extremely 
> inefficient. We do something like 7 (?) complete copies of the data, often 
> for simple things like adding a 4 byte size to the front. I am not sure how 
> this went by unnoticed.
> This is likely the root cause of the performance issues we saw in doing bulk 
> recompression of data in mirror maker.
> The mismatch between the InputStream and OutputStream interfaces and the 
> Message/MessageSet interfaces which are based on byte buffers is the cause of 
> many of these.



Re: Client improvement discussion

2013-08-02 Thread Jay Kreps
Cool. With respect to compression performance, we definitely see the same
thing, no debate.

Of course if you want to just compress the message payloads you can do that
now without needing much help from kafka--just pass in the compressed data.
Whether or not it will do much depends on the size of the message body--for
small messages you basically need batch compression, but for large messages
just compressing the body is fine. Our extra effort was to get the better
compression ratio of compressing messages together in batches.
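
For example, compressing the payload yourself and handing the resulting bytes
to the producer as an ordinary (uncompressed, as far as Kafka is concerned)
message would look roughly like this sketch (producer plumbing omitted):

import org.xerial.snappy.Snappy

object ClientSideCompression {
  def main(args: Array[String]): Unit = {
    val payload = ("a reasonably large message body " * 200).getBytes("UTF-8")
    // Compress the body yourself; the producer just sees an opaque byte[] value.
    val compressed = Snappy.compress(payload)
    println(payload.length + " bytes -> " + compressed.length + " bytes")
    // The consumer does the inverse when it reads the message.
    val restored = Snappy.uncompress(compressed)
    assert(restored.sameElements(payload))
  }
}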

What I was saying about snappy performance is that I think it may be our
inefficiency in the compression code path rather than the underlying
slowness of snappy. For example on this page
  https://github.com/dain/snappy
The compression performance they list for jni (the library we use) tends to
be around 200MB per core-second, with decompression around 1GB per
core-second. So on a modern machine with umpteen cores that should not be a
bottleneck, right? I don't know this to be true, but I am wondering whether
the underlying bottleneck is the compression algorithm or our inefficient
code. If you look at kafka.message.ByteBufferMessageSet.{create,
decompress, and assignOffsets} it is pretty inefficient. I did a round of
improvement there but we are still recopying stuff over and over and
creating zillions of little buffers and objects. It is a little tricky to
clean up but probably just a 1-2 day project.
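
One way to sanity-check the raw library numbers independently of Kafka's code
path is a quick single-core micro-benchmark like the sketch below (numbers will
vary with the data and JIT warmup; this is not a rigorous benchmark):

import org.xerial.snappy.Snappy

object SnappyThroughputCheck {
  def main(args: Array[String]): Unit = {
    val block = ("some log line with a few fields 12345\n" * 27000).getBytes("UTF-8") // ~1MB
    val iterations = 2000 // roughly 2GB of input in total
    val start = System.nanoTime()
    var i = 0
    while (i < iterations) {
      Snappy.compress(block)
      i += 1
    }
    val seconds = (System.nanoTime() - start) / 1e9
    println(f"compressed at ${iterations.toLong * block.length / 1e6 / seconds}%.0f MB/sec on one core")
  }
}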

I would rather figure out that it is really the compression that is the
root cause, rather than just our inefficiency, before we do anything too
drastic design-wise. If this is really killing you guys, and if that turns
out to be the cause, we would definitely take a patch to optimize that path
now.

-Jay




On Fri, Aug 2, 2013 at 4:55 PM, Chris Hogue  wrote:

> Thanks for the responses. Additional follow-up inline.
>
>
> On Fri, Aug 2, 2013 at 2:21 PM, Jay Kreps  wrote:
>
> > Great comments, answers inline!
> >
> > On Fri, Aug 2, 2013 at 12:28 PM, Chris Hogue  wrote:
> >
> > > These sounds like great steps. A couple of votes and questions:
> > >
> > > 1.  Moving serialization out and basing it all off of byte[] for key
> and
> > > payload makes sense. Echoing a response below, we've ended up doing
> that
> > in
> > > some cases anyway, and the others do a trivial transform to bytes with
> an
> > > Encoder.
> > >
> >
> > Cool.
> >
> >
> > > 2. On the single producer thread, we're actually suffering a bit from
> > this
> > > in 0.8, but it's mostly because compression and the blocking send
> happen
> > on
> > > this thread. In 0.7 since there was a thread-per-broker, a nice
> > side-effect
> > > was that compression and the blocking could "go wide", at least to the
> > > number of brokers. If compression is moved out and the sends are now
> > > non-blocking then this sounds like a nice improvement.
> > >
> >
> > I think even in 0.7 there was only one thread, right?
> >
> >
> I believe it was actually 1 per broker. Producer.scala iterates the brokers
> and adds a new producer for each. The ProducerPool.addProducer() method
> adds a new AsyncProducer instance for the broker (assuming async mode), and
> each AsyncProducer creates and starts its own ProducerSendThread.
>
> In either case, going to multiplexed I/O and not having the compression on
> this thread probably solves any issue there.
>
>
>
> >
> > > 3. The wiki talks about static partition assignment for consumers. Just
> > > adding a vote for that as we're currently working through how to do
> that
> > > ourselves with the 0.8 consumer.
> > >
> >
> > Cool, yeah currently you must use the simple consumer to get that which
> is
> > a pain.
> >
> >
> > > 4. I'm curious how compression would interact with the new ByteBuffer
> > > buffering you've described. If I'm following correctly you've said that
> > > rather than queueing objects you'd end up doing in-place writes to the
> > > pre-allocated ByteBuffer. Presumably this means the compression has
> > already
> > > happened on the user thread. But if there's no batching/buffering
> except
> > in
> > > the ByteBuffer, is there somewhere that multiple messages will be
> > > compressed together (since it should result in better compression)?
> Maybe
> > > there's still batching before this and I read too much into it?
> > >
> >
> > I'm not 100% sure, but I believe the compression can still be done
> inline.
> > The compression algorithm will buffer a bit, of course. What we currently
> > do though is write out the full data uncompressed and then compress it.
> > This is pretty inefficient. Basically we are using Java's OutputStream
> apis
> > for compression but we need to be using the lower-level array oriented
> > algorithms like (Deflater). I haven't tried this but my assumption is
> that
> > we can compress the messages as they arrive into the destination buffer
> > instead of the current approach.
> >
>
>
> Right, was starting to think you may be looking at a way of doing the
> compression incrementally as they come in. Sounds like what you're pursuing.

[jira] [Comment Edited] (KAFKA-992) Double Check on Broker Registration to Avoid False NodeExist Exception

2013-08-02 Thread Swapnil Ghike (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13728358#comment-13728358
 ] 

Swapnil Ghike edited comment on KAFKA-992 at 8/3/13 1:39 AM:
-

Also, the while loop should be fixed; the first sleep will lead to a return. 

  was (Author: swapnilghike):
Also the while loop should be fixed, the first sleep will lead to return. 
Also I would use break rather than return.
  
> Double Check on Broker Registration to Avoid False NodeExist Exception
> --
>
> Key: KAFKA-992
> URL: https://issues.apache.org/jira/browse/KAFKA-992
> Project: Kafka
>  Issue Type: Bug
>Reporter: Neha Narkhede
>Assignee: Guozhang Wang
> Attachments: KAFKA-992.v1.patch, KAFKA-992.v2.patch, 
> KAFKA-992.v3.patch
>
>
> The current behavior of zookeeper for ephemeral nodes is that session 
> expiration and ephemeral node deletion is not an atomic operation. 
> The side-effect of the above zookeeper behavior in Kafka, for certain corner 
> cases, is that ephemeral nodes can be lost even if the session is not 
> expired. The sequence of events that can lead to lossy ephemeral nodes is as 
> follows -
> 1. The session expires on the client, it assumes the ephemeral nodes are 
> deleted, so it establishes a new session with zookeeper and tries to 
> re-create the ephemeral nodes. 
> 2. However, when it tries to re-create the ephemeral node,zookeeper throws 
> back a NodeExists error code. Now this is legitimate during a session 
> disconnect event (since zkclient automatically retries the
> operation and raises a NodeExists error). Also by design, Kafka server 
> doesn't have multiple zookeeper clients create the same ephemeral node, so 
> Kafka server assumes the NodeExists is normal. 
> 3. However, after a few seconds zookeeper deletes that ephemeral node. So 
> from the client's perspective, even though the client has a new valid 
> session, its ephemeral node is gone.
> This behavior is triggered due to very long fsync operations on the zookeeper 
> leader. When the leader wakes up from such a long fsync operation, it has 
> several sessions to expire. And the time between the session expiration and 
> the ephemeral node deletion is magnified. Between these 2 operations, a 
> zookeeper client can issue a ephemeral node creation operation, that could've 
> appeared to have succeeded, but the leader later deletes the ephemeral node 
> leading to permanent ephemeral node loss from the client's perspective. 
> Thread from zookeeper mailing list: 
> http://zookeeper.markmail.org/search/?q=Zookeeper+3.3.4#query:Zookeeper%203.3.4%20date%3A201307%20+page:1+mid:zma242a2qgp6gxvx+state:results



[jira] [Commented] (KAFKA-992) Double Check on Broker Registration to Avoid False NodeExist Exception

2013-08-02 Thread Swapnil Ghike (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13728358#comment-13728358
 ] 

Swapnil Ghike commented on KAFKA-992:
-

Also, the while loop should be fixed; the first sleep will lead to a return. I 
would also use break rather than return.
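
Purely to illustrate the shape being criticized (hypothetical code, not the
actual patch): sleeping and then returning inside the loop means the
registration is never actually retried, whereas the loop should keep going
until it succeeds:

object RetryLoopShape {
  val sessionTimeoutMs = 6000L
  def tryRegister(): Boolean = true // stand-in for the ephemeral node creation attempt

  // Flawed shape: the return after the first sleep exits the method, so there is no retry.
  def registerFlawed(): Unit = {
    while (true) {
      if (tryRegister()) return
      Thread.sleep(sessionTimeoutMs)
      return
    }
  }

  // Fixed shape: fall out of the loop only once registration has succeeded.
  def registerFixed(): Unit = {
    var registered = false
    while (!registered) {
      registered = tryRegister()
      if (!registered) Thread.sleep(sessionTimeoutMs)
    }
  }
}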

> Double Check on Broker Registration to Avoid False NodeExist Exception
> --
>
> Key: KAFKA-992
> URL: https://issues.apache.org/jira/browse/KAFKA-992
> Project: Kafka
>  Issue Type: Bug
>Reporter: Neha Narkhede
>Assignee: Guozhang Wang
> Attachments: KAFKA-992.v1.patch, KAFKA-992.v2.patch, 
> KAFKA-992.v3.patch
>
>
> The current behavior of zookeeper for ephemeral nodes is that session 
> expiration and ephemeral node deletion is not an atomic operation. 
> The side-effect of the above zookeeper behavior in Kafka, for certain corner 
> cases, is that ephemeral nodes can be lost even if the session is not 
> expired. The sequence of events that can lead to lossy ephemeral nodes is as 
> follows -
> 1. The session expires on the client, it assumes the ephemeral nodes are 
> deleted, so it establishes a new session with zookeeper and tries to 
> re-create the ephemeral nodes. 
> 2. However, when it tries to re-create the ephemeral node,zookeeper throws 
> back a NodeExists error code. Now this is legitimate during a session 
> disconnect event (since zkclient automatically retries the
> operation and raises a NodeExists error). Also by design, Kafka server 
> doesn't have multiple zookeeper clients create the same ephemeral node, so 
> Kafka server assumes the NodeExists is normal. 
> 3. However, after a few seconds zookeeper deletes that ephemeral node. So 
> from the client's perspective, even though the client has a new valid 
> session, its ephemeral node is gone.
> This behavior is triggered due to very long fsync operations on the zookeeper 
> leader. When the leader wakes up from such a long fsync operation, it has 
> several sessions to expire. And the time between the session expiration and 
> the ephemeral node deletion is magnified. Between these 2 operations, a 
> zookeeper client can issue a ephemeral node creation operation, that could've 
> appeared to have succeeded, but the leader later deletes the ephemeral node 
> leading to permanent ephemeral node loss from the client's perspective. 
> Thread from zookeeper mailing list: 
> http://zookeeper.markmail.org/search/?q=Zookeeper+3.3.4#query:Zookeeper%203.3.4%20date%3A201307%20+page:1+mid:zma242a2qgp6gxvx+state:results



[jira] [Commented] (KAFKA-992) Double Check on Broker Registration to Avoid False NodeExist Exception

2013-08-02 Thread Swapnil Ghike (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13728354#comment-13728354
 ] 

Swapnil Ghike commented on KAFKA-992:
-

Makes sense. Just one comment: you can use the session timeout from 
KafkaConfig; it will give you the value that is being used at runtime.


> Double Check on Broker Registration to Avoid False NodeExist Exception
> --
>
> Key: KAFKA-992
> URL: https://issues.apache.org/jira/browse/KAFKA-992
> Project: Kafka
>  Issue Type: Bug
>Reporter: Neha Narkhede
>Assignee: Guozhang Wang
> Attachments: KAFKA-992.v1.patch, KAFKA-992.v2.patch, 
> KAFKA-992.v3.patch
>
>
> The current behavior of zookeeper for ephemeral nodes is that session 
> expiration and ephemeral node deletion is not an atomic operation. 
> The side-effect of the above zookeeper behavior in Kafka, for certain corner 
> cases, is that ephemeral nodes can be lost even if the session is not 
> expired. The sequence of events that can lead to lossy ephemeral nodes is as 
> follows -
> 1. The session expires on the client, it assumes the ephemeral nodes are 
> deleted, so it establishes a new session with zookeeper and tries to 
> re-create the ephemeral nodes. 
> 2. However, when it tries to re-create the ephemeral node,zookeeper throws 
> back a NodeExists error code. Now this is legitimate during a session 
> disconnect event (since zkclient automatically retries the
> operation and raises a NodeExists error). Also by design, Kafka server 
> doesn't have multiple zookeeper clients create the same ephemeral node, so 
> Kafka server assumes the NodeExists is normal. 
> 3. However, after a few seconds zookeeper deletes that ephemeral node. So 
> from the client's perspective, even though the client has a new valid 
> session, its ephemeral node is gone.
> This behavior is triggered due to very long fsync operations on the zookeeper 
> leader. When the leader wakes up from such a long fsync operation, it has 
> several sessions to expire. And the time between the session expiration and 
> the ephemeral node deletion is magnified. Between these 2 operations, a 
> zookeeper client can issue a ephemeral node creation operation, that could've 
> appeared to have succeeded, but the leader later deletes the ephemeral node 
> leading to permanent ephemeral node loss from the client's perspective. 
> Thread from zookeeper mailing list: 
> http://zookeeper.markmail.org/search/?q=Zookeeper+3.3.4#query:Zookeeper%203.3.4%20date%3A201307%20+page:1+mid:zma242a2qgp6gxvx+state:results



[jira] [Created] (KAFKA-998) Producer should not retry on non-recoverable error codes

2013-08-02 Thread Joel Koshy (JIRA)
Joel Koshy created KAFKA-998:


 Summary: Producer should not retry on non-recoverable error codes
 Key: KAFKA-998
 URL: https://issues.apache.org/jira/browse/KAFKA-998
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8, 0.8.1
Reporter: Joel Koshy


Based on a discussion with Guozhang. The producer currently retries on all 
error codes (including MessageSizeTooLarge, which is pointless to retry on). 
This can slow down the producer unnecessarily.

If at all we want to retry on that error code we would need to retry with a 
smaller batch size, but that's a separate discussion.
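
A hedged sketch of the kind of check being suggested; the error-code names
mirror kafka.common.ErrorMapping in 0.8, but the constants and helper here are
assumptions for illustration, not the real producer code:

object RetryPolicySketch {
  // Illustrative error codes (values assumed for the sketch).
  val NoError: Short = 0
  val RequestTimedOutCode: Short = 7
  val MessageSizeTooLargeCode: Short = 10

  // Errors that retrying the same batch cannot fix.
  val nonRecoverable: Set[Short] = Set(MessageSizeTooLargeCode)

  def shouldRetry(errorCode: Short, attemptsLeft: Int): Boolean =
    errorCode != NoError && attemptsLeft > 0 && !nonRecoverable.contains(errorCode)

  def main(args: Array[String]): Unit = {
    println(shouldRetry(RequestTimedOutCode, attemptsLeft = 3))     // true: transient, worth retrying
    println(shouldRetry(MessageSizeTooLargeCode, attemptsLeft = 3)) // false: retrying cannot help
  }
}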



Re: Client improvement discussion

2013-08-02 Thread Chris Hogue
Thanks for the responses. Additional follow-up inline.


On Fri, Aug 2, 2013 at 2:21 PM, Jay Kreps  wrote:

> Great comments, answers inline!
>
> On Fri, Aug 2, 2013 at 12:28 PM, Chris Hogue  wrote:
>
> > These sounds like great steps. A couple of votes and questions:
> >
> > 1.  Moving serialization out and basing it all off of byte[] for key and
> > payload makes sense. Echoing a response below, we've ended up doing that
> in
> > some cases anyway, and the others do a trivial transform to bytes with an
> > Encoder.
> >
>
> Cool.
>
>
> > 2. On the single producer thread, we're actually suffering a bit from
> this
> > in 0.8, but it's mostly because compression and the blocking send happen
> on
> > this thread. In 0.7 since there was a thread-per-broker, a nice
> side-effect
> > was that compression and the blocking could "go wide", at least to the
> > number of brokers. If compression is moved out and the sends are now
> > non-blocking then this sounds like a nice improvement.
> >
>
> I think even in 0.7 there was only one thread, right?
>
>
I believe it was actually 1 per broker. Producer.scala iterates the brokers
and adds a new producer for each. The ProducerPool.addProducer() method
adds a new AsyncProducer instance for the broker (assuming async mode), and
each AsyncProducer creates and starts its own ProducerSendThread.

In either case, going to multiplexed I/O and not having the compression on
this thread probably solves any issue there.



>
> > 3. The wiki talks about static partition assignment for consumers. Just
> > adding a vote for that as we're currently working through how to do that
> > ourselves with the 0.8 consumer.
> >
>
> Cool, yeah currently you must use the simple consumer to get that which is
> a pain.
>
>
> > 4. I'm curious how compression would interact with the new ByteBuffer
> > buffering you've described. If I'm following correctly you've said that
> > rather than queueing objects you'd end up doing in-place writes to the
> > pre-allocated ByteBuffer. Presumably this means the compression has
> already
> > happened on the user thread. But if there's no batching/buffering except
> in
> > the ByteBuffer, is there somewhere that multiple messages will be
> > compressed together (since it should result in better compression)? Maybe
> > there's still batching before this and I read too much into it?
> >
>
> I'm not 100% sure, but I believe the compression can still be done inline.
> The compression algorithm will buffer a bit, of course. What we currently
> do though is write out the full data uncompressed and then compress it.
> This is pretty inefficient. Basically we are using Java's OutputStream apis
> for compression but we need to be using the lower-level array oriented
> algorithms like (Deflater). I haven't tried this but my assumption is that
> we can compress the messages as they arrive into the destination buffer
> instead of the current approach.
>


Right, was starting to think you may be looking at a way of doing the
compression incrementally as they come in. Sounds like what you're pursuing.
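
A minimal sketch of that incremental idea, assuming raw DEFLATE via
java.util.zip.Deflater and a preallocated destination array (illustrative only,
not the planned producer code):

import java.util.zip.Deflater

object IncrementalCompressSketch {
  def main(args: Array[String]): Unit = {
    val messages = (1 to 100).map(i => s"message-$i with some repetitive payload".getBytes("UTF-8"))
    val dest = new Array[Byte](64 * 1024)          // preallocated destination buffer
    val deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true)
    var written = 0

    // Feed each message as it "arrives" and drain whatever compressed output is ready.
    for (msg <- messages) {
      deflater.setInput(msg)
      while (!deflater.needsInput())
        written += deflater.deflate(dest, written, dest.length - written)
    }
    deflater.finish()
    while (!deflater.finished())
      written += deflater.deflate(dest, written, dest.length - written)
    deflater.end()

    println(s"${messages.map(_.length).sum} bytes of messages -> $written compressed bytes in place")
  }
}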



>
>
> > 5. I don't know if this is quite the right place to discuss it, but since
> > the producer has some involvement I'll throw it out there. The
> un-compress,
> > assign offsets, re-compress that happens on the broker with the built-in
> > compression API is a significant bottleneck that we're really trying to
> > avoid. As noted in another thread, we saw a throughput increase on the
> > order of 3x when we pre-batched and compressed the payloads before
> sending
> > it to the producer with 0.8.
> >
>
> Yes, it is a bummer. We think ultimately this does make sense though, for
> two reasons beyond offsets:
> 1. You have to validate the integrity of the data the client has sent to
> you or else one bad or buggy client can screw up all consumers.
> 2. The compression of the log should not be tied to the compression used by
> individual producers. We haven't made this change yet, but it is an easy
> one. The problem today is that if your producers send a variety of
> compression types your consumers need to handle the union of all types and
> you have no guarantee over what types producers may send in the future.
> Instead we think these should be decoupled. The topic should have a
> compression type property and that should be totally decoupled from the
> compression type the producer uses. In many cases there is no real need for
> the producer to use compression at all as the real thing you want to
> optimize is later inter-datacenter transfers, not the network send to the
> local broker so the producer can just send uncompressed and have the broker
> control the compression type.
>
> The performance really has two causes though:
> 1. GZIP is super slow, especially java's implementation. But snappy, for
> example, is actually quite fast. We should be able to do snappy at network
> speeds according to the perf data I have seen, but...
> 2. ...our current compression code is k

[jira] [Updated] (KAFKA-992) Double Check on Broker Registration to Avoid False NodeExist Exception

2013-08-02 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-992?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-992:


Attachment: KAFKA-992.v3.patch

Thanks for the comments Neha.

1,2,5: Done

3. Cases 3.1 and 3.2/3 are distinct, since for 3.1 the 
createEphemeralPathExpectConflict would not throw the ZkNodeExistsException.

4. It is not possible to get the session timeout from zkClient, so I use the 
default value (6000ms).

> Double Check on Broker Registration to Avoid False NodeExist Exception
> --
>
> Key: KAFKA-992
> URL: https://issues.apache.org/jira/browse/KAFKA-992
> Project: Kafka
>  Issue Type: Bug
>Reporter: Neha Narkhede
>Assignee: Guozhang Wang
> Attachments: KAFKA-992.v1.patch, KAFKA-992.v2.patch, 
> KAFKA-992.v3.patch
>
>
> The current behavior of zookeeper for ephemeral nodes is that session 
> expiration and ephemeral node deletion is not an atomic operation. 
> The side-effect of the above zookeeper behavior in Kafka, for certain corner 
> cases, is that ephemeral nodes can be lost even if the session is not 
> expired. The sequence of events that can lead to lossy ephemeral nodes is as 
> follows -
> 1. The session expires on the client, it assumes the ephemeral nodes are 
> deleted, so it establishes a new session with zookeeper and tries to 
> re-create the ephemeral nodes. 
> 2. However, when it tries to re-create the ephemeral node,zookeeper throws 
> back a NodeExists error code. Now this is legitimate during a session 
> disconnect event (since zkclient automatically retries the
> operation and raises a NodeExists error). Also by design, Kafka server 
> doesn't have multiple zookeeper clients create the same ephemeral node, so 
> Kafka server assumes the NodeExists is normal. 
> 3. However, after a few seconds zookeeper deletes that ephemeral node. So 
> from the client's perspective, even though the client has a new valid 
> session, its ephemeral node is gone.
> This behavior is triggered due to very long fsync operations on the zookeeper 
> leader. When the leader wakes up from such a long fsync operation, it has 
> several sessions to expire. And the time between the session expiration and 
> the ephemeral node deletion is magnified. Between these 2 operations, a 
> zookeeper client can issue a ephemeral node creation operation, that could've 
> appeared to have succeeded, but the leader later deletes the ephemeral node 
> leading to permanent ephemeral node loss from the client's perspective. 
> Thread from zookeeper mailing list: 
> http://zookeeper.markmail.org/search/?q=Zookeeper+3.3.4#query:Zookeeper%203.3.4%20date%3A201307%20+page:1+mid:zma242a2qgp6gxvx+state:results



[jira] [Closed] (KAFKA-915) System Test - Mirror Maker testcase_5001 failed

2013-08-02 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy closed KAFKA-915.



> System Test - Mirror Maker testcase_5001 failed
> ---
>
> Key: KAFKA-915
> URL: https://issues.apache.org/jira/browse/KAFKA-915
> Project: Kafka
>  Issue Type: Bug
>Reporter: John Fung
>Assignee: Joel Koshy
>Priority: Critical
>  Labels: kafka-0.8, replication-testing
> Attachments: kafka-915-v1.patch, testcase_5001_debug_logs.tar.gz
>
>
> This case passes if brokers are set to partition = 1, replicas = 1
> It fails if brokers are set to partition = 5, replicas = 3 (consistently 
> reproducible)
> This test case is set up as shown below.
> 1. Start 2 ZK as a cluster in Source
> 2. Start 2 ZK as a cluster in Target
> 3. Start 3 brokers as a cluster in Source (partition = 1, replicas = 1)
> 4. Start 3 brokers as a cluster in Target (partition = 1, replicas = 1)
> 5. Start 1 MM
> 6. Start ProducerPerformance to send some data
> 7. After Producer is done, start ConsoleConsumer to consume data
> 8. Stop all processes and validate if there is any data loss.
> 9. No failure is introduced to any process in this test
> Attached a tar file which contains the logs and system test output for both 
> cases.



[jira] [Resolved] (KAFKA-915) System Test - Mirror Maker testcase_5001 failed

2013-08-02 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy resolved KAFKA-915.
--

Resolution: Fixed

> System Test - Mirror Maker testcase_5001 failed
> ---
>
> Key: KAFKA-915
> URL: https://issues.apache.org/jira/browse/KAFKA-915
> Project: Kafka
>  Issue Type: Bug
>Reporter: John Fung
>Assignee: Joel Koshy
>Priority: Critical
>  Labels: kafka-0.8, replication-testing
> Attachments: kafka-915-v1.patch, testcase_5001_debug_logs.tar.gz
>
>
> This case passes if brokers are set to partition = 1, replicas = 1
> It fails if brokers are set to partition = 5, replicas = 3 (consistently 
> reproducible)
> This test case is set up as shown below.
> 1. Start 2 ZK as a cluster in Source
> 2. Start 2 ZK as a cluster in Target
> 3. Start 3 brokers as a cluster in Source (partition = 1, replicas = 1)
> 4. Start 3 brokers as a cluster in Target (partition = 1, replicas = 1)
> 5. Start 1 MM
> 6. Start ProducerPerformance to send some data
> 7. After Producer is done, start ConsoleConsumer to consume data
> 8. Stop all processes and validate if there is any data loss.
> 9. No failure is introduced to any process in this test
> Attached a tar file which contains the logs and system test output for both 
> cases.



[jira] [Comment Edited] (KAFKA-992) Double Check on Broker Registration to Avoid False NodeExist Exception

2013-08-02 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13728218#comment-13728218
 ] 

Neha Narkhede edited comment on KAFKA-992 at 8/2/13 11:15 PM:
--

Thanks for patch v2, Guozhang. Few review suggestions -

1. How about keeping the unix timestamp as is? All we have to make sure is that 
it is equal to what was written. I'm not sure there is an advantage to 
converting it to some date format. 
2. Typo => ephermeral
3. The following log statement is not completely correct -

info("I wrote this conflicted ephermeral node a while back in a 
different session, "
  + "hence I will backoff for this node to be deleted by 
Zookeeper after session timeout and retry")

The reason is that there are 3 cases when the broker might get NodeExists 
and the ephemeral node will have the same host and port -
3.1 It ran into one of the recoverable zookeeper errors while creating the 
ephemeral nodes, in which case ZkClient retried the operation under the covers, 
and it got a NodeExists error on the 2nd retry. In this case, the timestamp 
will be useful as it will match what was written and we do not need to retry.
3.2 It hit the zookeeper non-atomic session expiration problem. In this case, 
the timestamp will not match and we just have to retry.
3.3 The server was killed and restarted within the session timeout. In this 
case, it is useful to back off for session timeout and retry ephemeral node 
creation. 

It will be useful from a logging perspective if we can distinguish between 
these 3.1 & 3.2/3 cases and retry accordingly. Another way to look at this is 
to not store the timestamp and just retry on any NodeExists as that has to go 
through at some point, but we will not get meaningful logging which is not 
ideal.

4. Regarding the backoff time, I think it is better to back off for the session 
timeout.

5. Regarding the case where the broker host and port do not match -

throw new RuntimeException("A broker is already registered on 
the path " + brokerIdPath
  + ". This probably indicates that you either have configured 
a brokerid that is already in use, or "
  + "else you have shutdown this broker and restarted it faster 
than the zookeeper "
  + "timeout so it appears to be re-registering.")

The else part of this statement is incorrect since if you shut down and 
restarted the same broker, the broker host and port should in fact match. We 
should fix the exception message to reflect that another broker [host, port] is 
registered under that id.


  was (Author: nehanarkhede):
Thanks for patch v2, Guozhang. Few review suggestions -

1. How about keeping the unix timestamp as is. All we have to make sure is that 
it is the equal to what was written. I'm not sure there is an advantage to 
converting it to some date format. 
2. Typo => ephermeral
3. The following log statement is not completely correct -

info("I wrote this conflicted ephermeral node a while back in a 
different session, "
  + "hence I will backoff for this node to be deleted by 
Zookeeper after session timeout and retry")

The reason is because there are 2 cases when the broker might get NodeExists 
and the ephemeral node will have the same host and port -
3.1 It ran into one of the recoverable zookeeper errors while creating the 
ephemeral nodes, in which case ZkClient retried the operation under the covers, 
and it got a NodeExists error on the 2nd retry. In this case, the timestamp 
will be useful as it will match what was written and we do not need to retry.
3.2 It hit the zookeeper non-atomic session expiration problem. In this case, 
the timestamp will not match and we just have to retry.
3.3 The server was killed and restarted within the session timeout. In this 
case, it is useful to back off for session timeout and retry ephemeral node 
creation. 

It will be useful from a logging perspective if we can distinguish between 
these 3.1 & 3.2/3 cases and retry accordingly. Another way to look at this is 
to not store the timestamp and just retry on any NodeExists as that has to go 
through at some point, but we will not get meaningful logging which is not 
ideal.

4. Regarding the backoff time, I think it is better to backoff for the session 
timeout

5. Regarding the case where the broker host and port do not match -

throw new RuntimeException("A broker is already registered on 
the path " + brokerIdPath
  + ". This probably indicates that you either have configured 
a brokerid that is already in use, or "
  + "else you have shutdown this broker and restarted it faster 
than the zookeeper "
  + "timeout so it appears to be re-registering.")

The else part of this statement is incor

[jira] [Commented] (KAFKA-915) System Test - Mirror Maker testcase_5001 failed

2013-08-02 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13728247#comment-13728247
 ] 

Joel Koshy commented on KAFKA-915:
--

+1 on the patch. I actually could not reproduce the other failures, so I'll 
check this in.



_test_case_name  :  testcase_5001
_test_class_name  :  MirrorMakerTest
arg : bounce_leader  :  false
arg : bounce_mirror_maker  :  false
arg : message_producing_free_time_sec  :  15
arg : num_iteration  :  1
arg : num_messages_to_produce_per_producer_call  :  50
arg : num_partition  :  1
arg : replica_factor  :  3
arg : sleep_seconds_between_producer_calls  :  1
validation_status  :
 Unique messages from consumer on [test_1]  :  500
 Unique messages from producer on [test_1]  :  500
 Validate for data matched on topic [test_1]  :  PASSED
 Validate for merged log segment checksum in cluster [source]  :  PASSED
 Validate for merged log segment checksum in cluster [target]  :  PASSED



_test_case_name  :  testcase_5002
_test_class_name  :  MirrorMakerTest
validation_status  :



_test_case_name  :  testcase_5003
_test_class_name  :  MirrorMakerTest
arg : bounce_leader  :  false
arg : bounce_mirror_maker  :  true
arg : bounced_entity_downtime_sec  :  30
arg : message_producing_free_time_sec  :  15
arg : num_iteration  :  1
arg : num_messages_to_produce_per_producer_call  :  50
arg : num_partition  :  1
arg : replica_factor  :  3
arg : sleep_seconds_between_producer_calls  :  1
validation_status  :
 Unique messages from consumer on [test_1]  :  2200
 Unique messages from producer on [test_1]  :  2200
 Validate for data matched on topic [test_1]  :  PASSED
 Validate for merged log segment checksum in cluster [source]  :  PASSED
 Validate for merged log segment checksum in cluster [target]  :  PASSED



_test_case_name  :  testcase_5004
_test_class_name  :  MirrorMakerTest
validation_status  :



_test_case_name  :  testcase_5005
_test_class_name  :  MirrorMakerTest
arg : bounce_leader  :  false
arg : bounce_mirror_maker  :  true
arg : bounced_entity_downtime_sec  :  30
arg : message_producing_free_time_sec  :  15
arg : num_iteration  :  1
arg : num_messages_to_produce_per_producer_call  :  50
arg : num_partition  :  2
arg : replica_factor  :  3
arg : sleep_seconds_between_producer_calls  :  1
validation_status  :
 Unique messages from consumer on [test_1]  :  1400
 Unique messages from consumer on [test_2]  :  1400
 Unique messages from producer on [test_1]  :  1400
 Unique messages from producer on [test_2]  :  1400
 Validate for data matched on topic [test_1]  :  PASSED
 Validate for data matched on topic [test_2]  :  PASSED
 Validate for merged log segment checksum in cluster [source]  :  PASSED
 Validate for merged log segment checksum in cluster [target]  :  PASSED




> System Test - Mirror Maker testcase_5001 failed
> ---
>
> Key: KAFKA-915
> URL: https://issues.apache.org/jira/browse/KAFKA-915
> Project: Kafka
>  Issue Type: Bug
>Reporter: John Fung
>Assignee: Joel Koshy
>Priority: Critical
>  Labels: kafka-0.8, replication-testing
> Attachments: kafka-915-v1.patch, testcase_5001_debug_logs.tar.gz
>
>
> This case passes if brokers are set to partition = 1, replicas = 1
> It fails if brokers are set to partition = 5, replicas = 3 (consistently 
> reproducible)
> This test case is set up as shown below.
> 1. Start 2 ZK as a cluster in Source
> 2. Start 2 ZK as a cluster in Target
> 3. Start 3 brokers as a cluster in Source (partition = 1, replicas = 1)
> 4. Start 3 brokers as a cluster in Target (partition = 1, replicas = 1)
> 5. Start 1 MM
> 6. Start ProducerPerformance to send some data
> 7. After Producer is done, start ConsoleConsumer to consume data
> 8. Stop all processes and validate if there is any data loss.
> 9. No failure is introduced to any process in this test
> Attached a tar file which contains the logs and system test output for both 
> cases.



[jira] [Commented] (KAFKA-992) Double Check on Broker Registration to Avoid False NodeExist Exception

2013-08-02 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13728230#comment-13728230
 ] 

Guozhang Wang commented on KAFKA-992:
-

Swapnil, we also considered this option. The problem is that zkClient does not 
expose that kind of information. Hence we came up with the timestamp approach.

> Double Check on Broker Registration to Avoid False NodeExist Exception
> --
>
> Key: KAFKA-992
> URL: https://issues.apache.org/jira/browse/KAFKA-992
> Project: Kafka
>  Issue Type: Bug
>Reporter: Neha Narkhede
>Assignee: Guozhang Wang
> Attachments: KAFKA-992.v1.patch, KAFKA-992.v2.patch
>
>
> The current behavior of zookeeper for ephemeral nodes is that session 
> expiration and ephemeral node deletion is not an atomic operation. 
> The side-effect of the above zookeeper behavior in Kafka, for certain corner 
> cases, is that ephemeral nodes can be lost even if the session is not 
> expired. The sequence of events that can lead to lossy ephemeral nodes is as 
> follows -
> 1. The session expires on the client, it assumes the ephemeral nodes are 
> deleted, so it establishes a new session with zookeeper and tries to 
> re-create the ephemeral nodes. 
> 2. However, when it tries to re-create the ephemeral node,zookeeper throws 
> back a NodeExists error code. Now this is legitimate during a session 
> disconnect event (since zkclient automatically retries the
> operation and raises a NodeExists error). Also by design, Kafka server 
> doesn't have multiple zookeeper clients create the same ephemeral node, so 
> Kafka server assumes the NodeExists is normal. 
> 3. However, after a few seconds zookeeper deletes that ephemeral node. So 
> from the client's perspective, even though the client has a new valid 
> session, its ephemeral node is gone.
> This behavior is triggered due to very long fsync operations on the zookeeper 
> leader. When the leader wakes up from such a long fsync operation, it has 
> several sessions to expire. And the time between the session expiration and 
> the ephemeral node deletion is magnified. Between these 2 operations, a 
> zookeeper client can issue a ephemeral node creation operation, that could've 
> appeared to have succeeded, but the leader later deletes the ephemeral node 
> leading to permanent ephemeral node loss from the client's perspective. 
> Thread from zookeeper mailing list: 
> http://zookeeper.markmail.org/search/?q=Zookeeper+3.3.4#query:Zookeeper%203.3.4%20date%3A201307%20+page:1+mid:zma242a2qgp6gxvx+state:results



[jira] [Commented] (KAFKA-992) Double Check on Broker Registration to Avoid False NodeExist Exception

2013-08-02 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13728221#comment-13728221
 ] 

Neha Narkhede commented on KAFKA-992:
-

Swapnil,

- You are right in observing that zookeeper stores the session id as part of 
the znode. However, when a session is established, we don't have access to the 
session id through ZkClient. So even though session id comparison is the best 
way to fix the bug, we can't do that.
- There are a lot of things that will go wrong if zookeeper is not able to 
create or expire ephemeral nodes. In such cases, Kafka server will back off and 
retry registering, and the controller will trigger leader elections repeatedly. 
So we will know about this through the LeaderElectionRate and 
UnderReplicatedPartitionCount metrics. 

> Double Check on Broker Registration to Avoid False NodeExist Exception
> --
>
> Key: KAFKA-992
> URL: https://issues.apache.org/jira/browse/KAFKA-992
> Project: Kafka
>  Issue Type: Bug
>Reporter: Neha Narkhede
>Assignee: Guozhang Wang
> Attachments: KAFKA-992.v1.patch, KAFKA-992.v2.patch
>
>
> The current behavior of zookeeper for ephemeral nodes is that session 
> expiration and ephemeral node deletion is not an atomic operation. 
> The side-effect of the above zookeeper behavior in Kafka, for certain corner 
> cases, is that ephemeral nodes can be lost even if the session is not 
> expired. The sequence of events that can lead to lossy ephemeral nodes is as 
> follows -
> 1. The session expires on the client, it assumes the ephemeral nodes are 
> deleted, so it establishes a new session with zookeeper and tries to 
> re-create the ephemeral nodes. 
> 2. However, when it tries to re-create the ephemeral node,zookeeper throws 
> back a NodeExists error code. Now this is legitimate during a session 
> disconnect event (since zkclient automatically retries the
> operation and raises a NodeExists error). Also by design, Kafka server 
> doesn't have multiple zookeeper clients create the same ephemeral node, so 
> Kafka server assumes the NodeExists is normal. 
> 3. However, after a few seconds zookeeper deletes that ephemeral node. So 
> from the client's perspective, even though the client has a new valid 
> session, its ephemeral node is gone.
> This behavior is triggered due to very long fsync operations on the zookeeper 
> leader. When the leader wakes up from such a long fsync operation, it has 
> several sessions to expire. And the time between the session expiration and 
> the ephemeral node deletion is magnified. Between these 2 operations, a 
> zookeeper client can issue a ephemeral node creation operation, that could've 
> appeared to have succeeded, but the leader later deletes the ephemeral node 
> leading to permanent ephemeral node loss from the client's perspective. 
> Thread from zookeeper mailing list: 
> http://zookeeper.markmail.org/search/?q=Zookeeper+3.3.4#query:Zookeeper%203.3.4%20date%3A201307%20+page:1+mid:zma242a2qgp6gxvx+state:results



[jira] [Commented] (KAFKA-992) Double Check on Broker Registration to Avoid False NodeExist Exception

2013-08-02 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13728218#comment-13728218
 ] 

Neha Narkhede commented on KAFKA-992:
-

Thanks for patch v2, Guozhang. Few review suggestions -

1. How about keeping the unix timestamp as is? All we have to make sure is that 
it is equal to what was written. I'm not sure there is an advantage to 
converting it to some date format. 
2. Typo => ephermeral
3. The following log statement is not completely correct -

info("I wrote this conflicted ephermeral node a while back in a 
different session, "
  + "hence I will backoff for this node to be deleted by 
Zookeeper after session timeout and retry")

The reason is that there are 3 cases when the broker might get NodeExists 
and the ephemeral node will have the same host and port -
3.1 It ran into one of the recoverable zookeeper errors while creating the 
ephemeral nodes, in which case ZkClient retried the operation under the covers, 
and it got a NodeExists error on the 2nd retry. In this case, the timestamp 
will be useful as it will match what was written and we do not need to retry.
3.2 It hit the zookeeper non-atomic session expiration problem. In this case, 
the timestamp will not match and we just have to retry.
3.3 The server was killed and restarted within the session timeout. In this 
case, it is useful to back off for session timeout and retry ephemeral node 
creation. 

It will be useful from a logging perspective if we can distinguish between 
case 3.1 and cases 3.2/3.3 and retry accordingly. Another way to look at this is 
to not store the timestamp and just retry on any NodeExists, as that has to go 
through at some point, but then we will not get meaningful logging, which is not 
ideal.

4. Regarding the backoff time, I think it is better to back off for the session 
timeout.

5. Regarding the case where the broker host and port do not match -

throw new RuntimeException("A broker is already registered on 
the path " + brokerIdPath
  + ". This probably indicates that you either have configured 
a brokerid that is already in use, or "
  + "else you have shutdown this broker and restarted it faster 
than the zookeeper "
  + "timeout so it appears to be re-registering.")

The else part of this statement is incorrect, since if you shut down and 
restarted the same broker, the broker host and port should in fact match. We 
should fix the exception message to reflect that another broker [host, port] is 
registered under that id.
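
A rough Scala sketch of the double-check/backoff flow being discussed (the 
"host:port:timestamp" encoding and the helper names are assumptions made for 
illustration, not the v2 patch itself):

    import org.I0Itec.zkclient.ZkClient
    import org.I0Itec.zkclient.exception.ZkNodeExistsException

    object BrokerRegistrationSketch {
      // On NodeExists: a matching timestamp means our own retried write already
      // succeeded (3.1); a matching host/port with a different timestamp means a
      // stale node from a previous session (3.2/3.3), so back off and retry.
      def registerBroker(zk: ZkClient, path: String, host: String, port: Int,
                         timestamp: Long, sessionTimeoutMs: Long): Unit = {
        val info = host + ":" + port + ":" + timestamp
        try {
          zk.createEphemeral(path, info)
        } catch {
          case _: ZkNodeExistsException =>
            val existing = zk.readData[String](path)
            if (existing == info) {
              // 3.1: nothing to do, the registration is already in place.
            } else if (existing.startsWith(host + ":" + port + ":")) {
              // 3.2/3.3: wait for zookeeper to delete the stale node, then retry.
              Thread.sleep(sessionTimeoutMs)
              zk.createEphemeral(path, info)
            } else {
              // Another broker [host, port] is registered under this id.
              throw new RuntimeException("Broker id at " + path +
                " is already registered by " + existing)
            }
        }
      }
    }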


> Double Check on Broker Registration to Avoid False NodeExist Exception
> --
>
> Key: KAFKA-992
> URL: https://issues.apache.org/jira/browse/KAFKA-992
> Project: Kafka
>  Issue Type: Bug
>Reporter: Neha Narkhede
>Assignee: Guozhang Wang
> Attachments: KAFKA-992.v1.patch, KAFKA-992.v2.patch
>
>
> The current behavior of zookeeper for ephemeral nodes is that session 
> expiration and ephemeral node deletion is not an atomic operation. 
> The side-effect of the above zookeeper behavior in Kafka, for certain corner 
> cases, is that ephemeral nodes can be lost even if the session is not 
> expired. The sequence of events that can lead to lossy ephemeral nodes is as 
> follows -
> 1. The session expires on the client, it assumes the ephemeral nodes are 
> deleted, so it establishes a new session with zookeeper and tries to 
> re-create the ephemeral nodes. 
> 2. However, when it tries to re-create the ephemeral node, zookeeper throws 
> back a NodeExists error code. Now this is legitimate during a session 
> disconnect event (since zkclient automatically retries the
> operation and raises a NodeExists error). Also by design, Kafka server 
> doesn't have multiple zookeeper clients create the same ephemeral node, so 
> Kafka server assumes the NodeExists is normal. 
> 3. However, after a few seconds zookeeper deletes that ephemeral node. So 
> from the client's perspective, even though the client has a new valid 
> session, its ephemeral node is gone.
> This behavior is triggered due to very long fsync operations on the zookeeper 
> leader. When the leader wakes up from such a long fsync operation, it has 
> several sessions to expire. And the time between the session expiration and 
> the ephemeral node deletion is magnified. Between these 2 operations, a 
> zookeeper client can issue an ephemeral node creation operation that could've 
> appeared to have succeeded, but the leader later deletes the ephemeral node 
> leading to permanent ephemeral node loss from the client's perspective. 
> Thread from zookeeper mailing list: 
> http://zookeeper.markmail.org/search/?q=Zookeeper+3.3.4#query:Zookeeper%203.3.4%20date%3A20

[jira] [Updated] (KAFKA-649) Cleanup log4j logging

2013-08-02 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-649:


Attachment: KAFKA-649.v6.patch

Thanks for the comments. 

50,51,52: Done.

> Cleanup log4j logging
> -
>
> Key: KAFKA-649
> URL: https://issues.apache.org/jira/browse/KAFKA-649
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.8
>Reporter: Jay Kreps
>Assignee: Jun Rao
>Priority: Blocker
> Attachments: kafka-649_extra.patch, kafka-649.patch, 
> KAFKA-649.v3.patch, KAFKA-649.v4.patch, KAFKA-649.v5.patch, KAFKA-649.v6.patch
>
>
> Review the logs and do the following:
> 1. Fix confusing or duplicative messages
> 2. Assess that messages are at the right level (TRACE/DEBUG/INFO/WARN/ERROR)
> It would also be nice to add a log4j logger for the request logging (i.e. the 
> access log) and another for the controller state change log, since these 
> really have their own use cases.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-955) After a leader change, messages sent with ack=0 are lost

2013-08-02 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-955:


Attachment: KAFKA-955.v3.patch

Add testMessageSizeTooLargeWithAckZero to SyncProducerTest, which:

1. First sends a large message that will cause the MessageSizeTooLarge 
exception and hence close the socket. This message will be silently dropped 
and lost.

2. Then sends another large message whose size also exceeds the socket buffer 
size, so the buffer is flushed immediately; this send should fail since the 
socket has been closed.
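
A rough Scala shape of that scenario, with the producer abstracted to a send 
function (the names and parameters here are illustrative, not the actual 
SyncProducerTest code):

    import java.io.IOException

    object AckZeroTooLargeSketch {
      def run(sendAckZero: Array[Byte] => Unit,
              maxMessageBytes: Int, socketBufferBytes: Int): Unit = {
        // 1. Over message.max.bytes: the broker hits MessageSizeTooLarge and closes
        //    the socket, but with ack=0 nothing comes back, so the loss is silent.
        sendAckZero(new Array[Byte](maxMessageBytes + 1))

        // 2. Bigger than the socket send buffer, so the write is flushed right away;
        //    this send should fail because the connection is already closed.
        try {
          sendAckZero(new Array[Byte](math.max(maxMessageBytes, socketBufferBytes) + 1))
          throw new AssertionError("expected the second send to fail on the closed socket")
        } catch {
          case _: IOException => // expected
        }
      }
    }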

> After a leader change, messages sent with ack=0 are lost
> 
>
> Key: KAFKA-955
> URL: https://issues.apache.org/jira/browse/KAFKA-955
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Rosenberg
>Assignee: Guozhang Wang
> Attachments: KAFKA-955.v1.patch, KAFKA-955.v1.patch, 
> KAFKA-955.v2.patch, KAFKA-955.v3.patch
>
>
> If the leader changes for a partition, and a producer is sending messages 
> with ack=0, then messages will be lost, since the producer has no active way 
> of knowing that the leader has changed, until its next metadata refresh 
> update.
> The broker receiving the message, which is no longer the leader, logs a 
> message like this:
> Produce request with correlation id 7136261 from client  on partition 
> [mytopic,0] failed due to Leader not local for partition [mytopic,0] on 
> broker 508818741
> This is exacerbated by the controlled shutdown mechanism, which forces an 
> immediate leader change.
> A possible solution to this would be for a broker which receives a message 
> for a topic that it is no longer the leader for (and if the ack level is 0) 
> to just silently forward the message over to the current leader.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-984) Avoid a full rebalance in cases when a new topic is discovered but container/broker set stay the same

2013-08-02 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13728209#comment-13728209
 ] 

Joel Koshy commented on KAFKA-984:
--

Thanks for the patch - this will help a *lot* especially for mirroring.
However, I share Jun's concern about making such a non-trivial change to
0.8. In any event, here are some comments on
scala.kafka.consumer.ZookeeperConsumerConnector

- We should definitely abstract out the common code - syncedPartialRebalance
  and WildcardStreamsHandler. I think with some thought we can refactor it;
  otherwise we end up with copies of relatively complex code.
- The filters on lines 432/433 will not have any effect (I think) since the
  maps are immutable (see the short illustration after these comments). You
  should probably apply the filter to the assignments on lines 428/429. As it
  stands, metadata for other topics will be fetched unnecessarily, and fetchers
  for other topics may be stopped unnecessarily.
- Also, there are topic variables inside the method that shadow the
  parameter, which makes it harder to determine which variable is in effect
  for which scope.
- Logging can be improved/made more concise: few typos and inconsistencies
  in capitalization.
- Why do this only if # added topics == 1? Couldn't it accept a list of topics
  to rebalance on instead? I do see your note on Sriram's comments, but I don't
  see them in this jira. Can you include those comments?
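
On the filter point, a quick Scala illustration: filter on an immutable Map
returns a new Map, so calling it without using the result changes nothing.

    val assignment = Map("topicA" -> 1, "topicB" -> 2)  // immutable by default
    assignment.filter { case (topic, _) => topic == "topicA" }               // result discarded: no effect
    val pruned = assignment.filter { case (topic, _) => topic == "topicA" }  // keep and use the returned map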



> Avoid a full rebalance in cases when a new topic is discovered but 
> container/broker set stay the same
> -
>
> Key: KAFKA-984
> URL: https://issues.apache.org/jira/browse/KAFKA-984
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
> Fix For: 0.8
>
> Attachments: KAFKA-984.v1.patch, KAFKA-984.v2.patch, 
> KAFKA-984.v2.patch
>
>
> Currently a full rebalance will be triggered on high-level consumers even 
> when just a new topic is added to ZK. It would be better to avoid this and 
> only rebalance on the newly added topic.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


Re: Client improvement discussion

2013-08-02 Thread Jay Kreps
Great comments, answers inline!

On Fri, Aug 2, 2013 at 12:28 PM, Chris Hogue  wrote:

> These sound like great steps. A couple of votes and questions:
>
> 1.  Moving serialization out and basing it all off of byte[] for key and
> payload makes sense. Echoing a response below, we've ended up doing that in
> some cases anyway, and the others do a trivial transform to bytes with an
> Encoder.
>

Cool.


> 2. On the single producer thread, we're actually suffering a bit from this
> in 0.8, but it's mostly because compression and the blocking send happen on
> this thread. In 0.7 since there was a thread-per-broker, a nice side-effect
> was that compression and the blocking could "go wide", at least to the
> number of brokers. If compression is moved out and the sends are now
> non-blocking then this sounds like a nice improvement.
>

I think even in 0.7 there was only one thread, right?


> 3. The wiki talks about static partition assignment for consumers. Just
> adding a vote for that as we're currently working through how to do that
> ourselves with the 0.8 consumer.
>

Cool, yeah, currently you must use the simple consumer to get that, which is
a pain.


> 4. I'm curious how compression would interact with the new ByteBuffer
> buffering you've described. If I'm following correctly you've said that
> rather than queueing objects you'd end up doing in-place writes to the
> pre-allocated ByteBuffer. Presumably this means the compression has already
> happened on the user thread. But if there's no batching/buffering except in
> the ByteBuffer, is there somewhere that multiple messages will be
> compressed together (since it should result in better compression)? Maybe
> there's still batching before this and I read too much into it?
>

I'm not 100% sure, but I believe the compression can still be done inline.
The compression algorithm will buffer a bit, of course. What we currently
do though is write out the full data uncompressed and then compress it.
This is pretty inefficient. Basically we are using Java's OutputStream APIs
for compression but we need to be using the lower-level array-oriented APIs
like Deflater. I haven't tried this but my assumption is that
we can compress the messages as they arrive into the destination buffer
instead of the current approach.
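
As an illustration only (not the actual patch), compressing records straight
into a pre-allocated, array-backed ByteBuffer with Deflater could look roughly
like this, assuming the destination buffer has enough remaining space:

    import java.nio.ByteBuffer
    import java.util.zip.Deflater

    object InlineCompressionSketch {
      def compressInto(records: Seq[Array[Byte]], dest: ByteBuffer): Unit = {
        val deflater = new Deflater(Deflater.BEST_SPEED)
        try {
          records.foreach { record =>
            deflater.setInput(record)
            drain(deflater, dest)  // write whatever compressed output is ready so far
          }
          deflater.finish()
          drain(deflater, dest)    // flush the remaining compressed bytes
        } finally {
          deflater.end()
        }
      }

      // Deflate directly into the destination buffer's backing array, avoiding the
      // intermediate streams and byte-array copies of the OutputStream path.
      private def drain(deflater: Deflater, dest: ByteBuffer): Unit = {
        var written = 0
        do {
          written = deflater.deflate(dest.array(),
            dest.arrayOffset() + dest.position(), dest.remaining())
          dest.position(dest.position() + written)
        } while (written > 0)
      }
    }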


> 5. I don't know if this is quite the right place to discuss it, but since
> the producer has some involvement I'll throw it out there. The un-compress,
> assign offsets, re-compress that happens on the broker with the built-in
> compression API is a significant bottleneck that we're really trying to
> avoid. As noted in another thread, we saw a throughput increase on the
> order of 3x when we pre-batched and compressed the payloads before sending
> it to the producer with 0.8.
>

Yes, it is a bummer. We think ultimately this does make sense though, for
two reasons beyond offsets:
1. You have to validate the integrity of the data the client has sent to
you or else one bad or buggy client can screw up all consumers.
2. The compression of the log should not be tied to the compression used by
individual producers. We haven't made this change yet, but it is an easy
one. The problem today is that if your producers send a variety of
compression types your consumers need to handle the union of all types and
you have no guarantee over what types producers may send in the future.
Instead we think these should be decoupled. The topic should have a
compression type property and that should be totally decoupled from the
compression type the producer uses. In many cases there is no real need for
the producer to use compression at all, since the real thing you want to
optimize is later inter-datacenter transfers, not the network send to the
local broker, so the producer can just send uncompressed and have the broker
control the compression type.

The performance really has two causes though:
1. GZIP is super slow, especially java's implementation. But snappy, for
example, is actually quite fast. We should be able to do snappy at network
speeds according to the perf data I have seen, but...
2. ...our current compression code is kind of inefficient due to all the
copying and traversal, for the reasons cited above.

So in other words I think we can make this a bit better but it probably
won't go away. How do you feel about snappy?


> I've not looked very closely at the wire-protocol, but if there was a way
> for it to support in-place offset assignment even for compressed messages
> it would be a huge win. Short of that we're fine taking the batch/compress
> responsibility into user code, but it would be nice to have a way to do
> that while retaining the built-in partition selection (i.e. semantic
> partitioning) and other functionality of the producer. The new design may
> already be an improvement in this area since it would move some
> responsibility to the user thread.
>

We can't really do this because we are multi-writer so any offset we give
the client 

[jira] [Updated] (KAFKA-989) Race condition shutting down high-level consumer results in spinning background thread

2013-08-02 Thread Phil Hargett (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Phil Hargett updated KAFKA-989:
---

Status: Open  (was: Patch Available)

Not good enough.  Deadlocks because ShutdownableThread.shutdown grabs another 
lock.

> Race condition shutting down high-level consumer results in spinning 
> background thread
> --
>
> Key: KAFKA-989
> URL: https://issues.apache.org/jira/browse/KAFKA-989
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
> Environment: Ubuntu Linux x64
>Reporter: Phil Hargett
> Attachments: KAFKA-989-failed-to-find-leader.patch
>
>
> Running an application that uses the Kafka client under load can often hit 
> this issue within a few hours.
> High-level consumers come and go over this application's lifecycle, but there 
> are a variety of defenses that ensure each high-level consumer lasts several 
> seconds before being shutdown.  Nevertheless, some race is causing this 
> background thread to continue long after the ZKClient it is using has been 
> disconnected.  Since the thread was spawned by a consumer that has already 
> been shutdown, the application has no way to find this thread and stop it.
> Reported on the users-kafka mailing list 6/25 as "0.8 throwing exception 
> 'Failed to find leader' and high-level consumer fails to make progress". 
> The only remedy is to shutdown the application and restart it.  Externally 
> detecting that this state has occurred is not pleasant: need to grep log for 
> repeated occurrences of the same exception.
> Stack trace:
> Failed to find leader for Set([topic6,0]): java.lang.NullPointerException
>   at org.I0Itec.zkclient.ZkClient$2.call(ZkClient.java:416)
>   at org.I0Itec.zkclient.ZkClient$2.call(ZkClient.java:413)
>   at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
>   at org.I0Itec.zkclient.ZkClient.getChildren(ZkClient.java:413)
>   at org.I0Itec.zkclient.ZkClient.getChildren(ZkClient.java:409)
>   at kafka.utils.ZkUtils$.getChildrenParentMayNotExist(ZkUtils.scala:438)
>   at kafka.utils.ZkUtils$.getAllBrokersInCluster(ZkUtils.scala:75)
>   at 
> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:63)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


Re: Client improvement discussion

2013-08-02 Thread Jay Kreps
I believe there are some open source C++ producer implementations. At
linkedin we have a C++ implementation. We would like to open source this if
there is interest. We would like to eventually include a C++ consumer as
well.

-Jay


On Mon, Jul 29, 2013 at 6:03 AM, Sybrandy, Casey <
casey.sybra...@six3systems.com> wrote:

> In the past there was some discussion about having a C client for non-JVM
> languages.  Is this still planned as well?  Being able to work with Kafka
> from other languages would be a great thing.  Where I work, we interact
> with Kafka via Java and Ruby (producer), so having an official C library
> that can be used from within Ruby would make it easier to have the same
> version of the client in Java and Ruby.
>
> -Original Message-
> From: Jay Kreps [mailto:jay.kr...@gmail.com]
> Sent: Friday, July 26, 2013 3:00 PM
> To: dev@kafka.apache.org; us...@kafka.apache.org
> Subject: Client improvement discussion
>
> I sent around a wiki a few weeks back proposing a set of client
> improvements that essentially amount to a rewrite of the producer and
> consumer java clients.
>
> https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
>
> The below discussion assumes you have read this wiki.
>
> I started to do a little prototyping for the producer and wanted to share
> some of the ideas that came up to get early feedback.
>
> First, a few simple but perhaps controversial things to discuss.
>
> Rollout
> Phase 1: We add the new clients. No change on the server. Old clients
> still exist. The new clients will be entirely in a new package so there
> will be no possibility of name collision.
> Phase 2: We swap out all shared code on the server to use the new client
> stuff. At this point the old clients still exist but are essentially
> deprecated.
> Phase 3: We remove the old client code.
>
> Java
> I think we should do the clients in java. Making our users deal with
> scala's non-compatibility issues and crazy stack traces causes people a lot
> of pain. Furthermore we end up having to wrap everything now to get a
> usable java api anyway for non-scala people. This does mean maintaining a
> substantial chunk of java code, which is maybe less fun than scala. But
> basically I think we should optimize for the end user and produce a
> standalone pure-java jar with no dependencies.
>
> Jars
> We definitely want to separate out the client jar. There is also a fair
> amount of code shared between both (exceptions, protocol definition, utils,
> and the message set implementation). Two approaches.
> Two jar approach: split kafka.jar into kafka-clients.jar and
> kafka-server.jar with the server depending on the clients. The advantage of
> this is that it is simple. The disadvantage is that things like utils and
> protocol definition will be in the client jar though technically they belong
> equally to the server.
> Many jar approach: split kafka.jar into kafka-common.jar,
> kafka-producer.jar, kafka-consumer.jar, kafka-admin.jar, and
> kafka-server.jar. The disadvantage of this is that the user needs two jars
> (common + something) which is for sure going to confuse people. I also
> think this will tend to spawn more jars over time.
>
> Background threads
> I am thinking of moving both serialization and compression out of the
> background send thread. I will explain a little about this idea below.
>
> Serialization
> I am not sure if we should handle serialization in the client at all.
> Basically I wonder if our own API wouldn't just be a lot simpler if we
> took a byte[] key and byte[] value and let people serialize stuff
> themselves.
> Injecting a class name for us to create the serializer is more roundabout
> and has a lot of problems if the serializer itself requires a lot of
> configuration or other objects to be instantiated.
>
> Partitioning
> The real question with serialization is whether the partitioning should
> happen on the java object or on the byte array key. The argument for doing
> it on the java object is that it is easier to do something like a range
> partition on the object. The problem with doing it on the object is that
> the consumer may not be in java and so may not be able to reproduce the
> partitioning. For example we currently use Object.hashCode which is a
> little sketchy. We would be better off doing a standardized hash function
> on the key bytes. If we want to give the partitioner access to the original
> java object then obviously we need to handle serialization behind our api.
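
A small Scala sketch of the "standardized hash on the key bytes" idea in the
quoted paragraph above (FNV-1a is used here purely as an example of a fixed,
well-specified hash, not a proposal for the actual client):

    object KeyBytesPartitionerSketch {
      // Any client, JVM or not, can reproduce this assignment, unlike Object.hashCode.
      def partition(keyBytes: Array[Byte], numPartitions: Int): Int = {
        var hash = 2166136261L  // FNV-1a 32-bit offset basis
        keyBytes.foreach { b =>
          hash = ((hash ^ (b & 0xff)) * 16777619L) & 0xffffffffL  // FNV-1a prime, keep 32 bits
        }
        (hash % numPartitions).toInt
      }
    }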
>
> Names
> I think good names are important. I would like to rename the following
> classes in the new client:
>   Message=>Record: Now that the message has both a message and a key it is
> more of a KeyedMessage. Another name for a KeyedMessage is a Record.
>   MessageSet=>Records: This isn't too important but nit pickers complain
> that it is not technically a Set but rather a List or Sequence but
> MessageList sounds funny to me.
>
> The actual clients will not interact with 

Re: Client improvement discussion

2013-08-02 Thread Chris Hogue
These sound like great steps. A couple of votes and questions:

1.  Moving serialization out and basing it all off of byte[] for key and
payload makes sense. Echoing a response below, we've ended up doing that in
some cases anyway, and the others do a trivial transform to bytes with an
Encoder.

2. On the single producer thread, we're actually suffering a bit from this
in 0.8, but it's mostly because compression and the blocking send happen on
this thread. In 0.7 since there was a thread-per-broker, a nice side-effect
was that compression and the blocking could "go wide", at least to the
number of brokers. If compression is moved out and the sends are now
non-blocking then this sounds like a nice improvement.

3. The wiki talks about static partition assignment for consumers. Just
adding a vote for that as we're currently working through how to do that
ourselves with the 0.8 consumer.

4. I'm curious how compression would interact with the new ByteBuffer
buffering you've described. If I'm following correctly you've said that
rather than queueing objects you'd end up doing in-place writes to the
pre-allocated ByteBuffer. Presumably this means the compression has already
happened on the user thread. But if there's no batching/buffering except in
the ByteBuffer, is there somewhere that multiple messages will be
compressed together (since it should result in better compression)? Maybe
there's still batching before this and I read too much into it?

5. I don't know if this is quite the right place to discuss it, but since
the producer has some involvement I'll throw it out there. The un-compress,
assign offsets, re-compress that happens on the broker with the built-in
compression API is a significant bottleneck that we're really trying to
avoid. As noted in another thread, we saw a throughput increase on the
order of 3x when we pre-batched and compressed the payloads before sending
it to the producer with 0.8.

I've not looked very closely at the wire-protocol, but if there was a way
for it to support in-place offset assignment even for compressed messages
it would be a huge win. Short of that we're fine taking the batch/compress
responsibility into user code, but it would be nice to have a way to do
that while retaining the built-in partition selection (i.e. semantic
partitioning) and other functionality of the producer. The new design may
already be an improvement in this area since it would move some
responsibility to the user thread.

Not sure if that's clear, but as the interfaces take shape it may be easier
to see how that will work.

-Chris






On Fri, Jul 26, 2013 at 1:00 PM, Jay Kreps  wrote:

> I sent around a wiki a few weeks back proposing a set of client
> improvements that essentially amount to a rewrite of the producer and
> consumer java clients.
>
> https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
>
> The below discussion assumes you have read this wiki.
>
> I started to do a little prototyping for the producer and wanted to share
> some of the ideas that came up to get early feedback.
>
> First, a few simple but perhaps controversial things to discuss.
>
> Rollout
> Phase 1: We add the new clients. No change on the server. Old clients still
> exist. The new clients will be entirely in a new package so there will be
> no possibility of name collision.
> Phase 2: We swap out all shared code on the server to use the new client
> stuff. At this point the old clients still exist but are essentially
> deprecated.
> Phase 3: We remove the old client code.
>
> Java
> I think we should do the clients in java. Making our users deal with
> scala's non-compatibility issues and crazy stack traces causes people a lot
> of pain. Furthermore we end up having to wrap everything now to get a
> usable java api anyway for non-scala people. This does mean maintaining a
> substantial chunk of java code, which is maybe less fun than scala. But
> basically I think we should optimize for the end user and produce a
> standalone pure-java jar with no dependencies.
>
> Jars
> We definitely want to separate out the client jar. There is also a fair
> amount of code shared between both (exceptions, protocol definition, utils,
> and the message set implementation). Two approaches.
> Two jar approach: split kafka.jar into kafka-clients.jar and
> kafka-server.jar with the server depending on the clients. The advantage of
> this is that it is simple. The disadvantage is that things like utils and
> protocol definition will be in the client jar though technically they belong
> equally to the server.
> Many jar approach: split kafka.jar into kafka-common.jar,
> kafka-producer.jar, kafka-consumer.jar, kafka-admin.jar, and
> kafka-server.jar. The disadvantage of this is that the user needs two jars
> (common + something) which is for sure going to confuse people. I also
> think this will tend to spawn more jars over time.
>
> Background threads
> I am thinking of moving both serialization and compression out of the

[jira] [Commented] (KAFKA-955) After a leader change, messages sent with ack=0 are lost

2013-08-02 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13727910#comment-13727910
 ] 

Guozhang Wang commented on KAFKA-955:
-

Sorry for the misleading name; I did not shut down the broker but instead sent 
a large message to it to trigger the MessageSizeTooLargeException. The name of 
the test should be testSendTooLargeMessageWithAckZero.

I will use SyncProducer instead of Producer in this test, and send a normal 
message to the broker after this message, expecting it to fail due to a socket 
I/O exception.

> After a leader change, messages sent with ack=0 are lost
> 
>
> Key: KAFKA-955
> URL: https://issues.apache.org/jira/browse/KAFKA-955
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Rosenberg
>Assignee: Guozhang Wang
> Attachments: KAFKA-955.v1.patch, KAFKA-955.v1.patch, 
> KAFKA-955.v2.patch
>
>
> If the leader changes for a partition, and a producer is sending messages 
> with ack=0, then messages will be lost, since the producer has no active way 
> of knowing that the leader has changed, until its next metadata refresh 
> update.
> The broker receiving the message, which is no longer the leader, logs a 
> message like this:
> Produce request with correlation id 7136261 from client  on partition 
> [mytopic,0] failed due to Leader not local for partition [mytopic,0] on 
> broker 508818741
> This is exacerbated by the controlled shutdown mechanism, which forces an 
> immediate leader change.
> A possible solution to this would be for a broker which receives a message 
> for a topic that it is no longer the leader for (and if the ack level is 0) 
> to just silently forward the message over to the current leader.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Comment Edited] (KAFKA-989) Race condition shutting down high-level consumer results in spinning background thread

2013-08-02 Thread Phil Hargett (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13727844#comment-13727844
 ] 

Phil Hargett edited comment on KAFKA-989 at 8/2/13 5:33 PM:


Here's the patch: KAFKA-989-failed-to-find-leader.patch.

  was (Author: phargett):
Here's the patch :)
  
> Race condition shutting down high-level consumer results in spinning 
> background thread
> --
>
> Key: KAFKA-989
> URL: https://issues.apache.org/jira/browse/KAFKA-989
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
> Environment: Ubuntu Linux x64
>Reporter: Phil Hargett
> Attachments: KAFKA-989-failed-to-find-leader.patch
>
>
> Running an application that uses the Kafka client under load can often hit 
> this issue within a few hours.
> High-level consumers come and go over this application's lifecycle, but there 
> are a variety of defenses that ensure each high-level consumer lasts several 
> seconds before being shutdown.  Nevertheless, some race is causing this 
> background thread to continue long after the ZKClient it is using has been 
> disconnected.  Since the thread was spawned by a consumer that has already 
> been shutdown, the application has no way to find this thread and stop it.
> Reported on the users-kafka mailing list 6/25 as "0.8 throwing exception 
> 'Failed to find leader' and high-level consumer fails to make progress". 
> The only remedy is to shutdown the application and restart it.  Externally 
> detecting that this state has occurred is not pleasant: need to grep log for 
> repeated occurrences of the same exception.
> Stack trace:
> Failed to find leader for Set([topic6,0]): java.lang.NullPointerException
>   at org.I0Itec.zkclient.ZkClient$2.call(ZkClient.java:416)
>   at org.I0Itec.zkclient.ZkClient$2.call(ZkClient.java:413)
>   at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
>   at org.I0Itec.zkclient.ZkClient.getChildren(ZkClient.java:413)
>   at org.I0Itec.zkclient.ZkClient.getChildren(ZkClient.java:409)
>   at kafka.utils.ZkUtils$.getChildrenParentMayNotExist(ZkUtils.scala:438)
>   at kafka.utils.ZkUtils$.getAllBrokersInCluster(ZkUtils.scala:75)
>   at 
> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:63)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-649) Cleanup log4j logging

2013-08-02 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13727846#comment-13727846
 ] 

Jun Rao commented on KAFKA-649:
---

Thanks for patch v5.

50. ConsoleConsumer: There is no need to change the error to fatal since it 
indicates a command line error.

51. KafkaApis: Not sure if we need to make this change. If there is an error in 
getting metadata, the client will get an error code. Logging this in the broker 
may not help and can pollute the log.

52. SimpleConsumer: Just logging the message of the exception is enough. It 
should include the type of the exception in the message.

> Cleanup log4j logging
> -
>
> Key: KAFKA-649
> URL: https://issues.apache.org/jira/browse/KAFKA-649
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.8
>Reporter: Jay Kreps
>Assignee: Jun Rao
>Priority: Blocker
> Attachments: kafka-649_extra.patch, kafka-649.patch, 
> KAFKA-649.v3.patch, KAFKA-649.v4.patch, KAFKA-649.v5.patch
>
>
> Review the logs and do the following:
> 1. Fix confusing or duplicative messages
> 2. Assess that messages are at the right level (TRACE/DEBUG/INFO/WARN/ERROR)
> It would also be nice to add a log4j logger for the request logging (i.e. the 
> access log) and another for the controller state change log, since these 
> really have their own use cases.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-989) Race condition shutting down high-level consumer results in spinning background thread

2013-08-02 Thread Phil Hargett (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Phil Hargett updated KAFKA-989:
---

Attachment: KAFKA-989-failed-to-find-leader.patch

Here's the patch :)

> Race condition shutting down high-level consumer results in spinning 
> background thread
> --
>
> Key: KAFKA-989
> URL: https://issues.apache.org/jira/browse/KAFKA-989
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
> Environment: Ubuntu Linux x64
>Reporter: Phil Hargett
> Attachments: KAFKA-989-failed-to-find-leader.patch
>
>
> Running an application that uses the Kafka client under load can often hit 
> this issue within a few hours.
> High-level consumers come and go over this application's lifecycle, but there 
> are a variety of defenses that ensure each high-level consumer lasts several 
> seconds before being shutdown.  Nevertheless, some race is causing this 
> background thread to continue long after the ZKClient it is using has been 
> disconnected.  Since the thread was spawned by a consumer that has already 
> been shutdown, the application has no way to find this thread and stop it.
> Reported on the users-kafka mailing list 6/25 as "0.8 throwing exception 
> 'Failed to find leader' and high-level consumer fails to make progress". 
> The only remedy is to shutdown the application and restart it.  Externally 
> detecting that this state has occurred is not pleasant: need to grep log for 
> repeated occurrences of the same exception.
> Stack trace:
> Failed to find leader for Set([topic6,0]): java.lang.NullPointerException
>   at org.I0Itec.zkclient.ZkClient$2.call(ZkClient.java:416)
>   at org.I0Itec.zkclient.ZkClient$2.call(ZkClient.java:413)
>   at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
>   at org.I0Itec.zkclient.ZkClient.getChildren(ZkClient.java:413)
>   at org.I0Itec.zkclient.ZkClient.getChildren(ZkClient.java:409)
>   at kafka.utils.ZkUtils$.getChildrenParentMayNotExist(ZkUtils.scala:438)
>   at kafka.utils.ZkUtils$.getAllBrokersInCluster(ZkUtils.scala:75)
>   at 
> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:63)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-989) Race condition shutting down high-level consumer results in spinning background thread

2013-08-02 Thread Phil Hargett (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Phil Hargett updated KAFKA-989:
---

Status: Patch Available  (was: Open)

This patch may minimize the issue, as there does seem to be a race between 
startConnections / stopConnections in ConsumerFetcherManager *and* the doWork 
method of the inner LeaderFinderThread class.

It seems that a thread could be started (from startConnections) but shutdown 
could happen (from stopConnections) even before the leader thread has actually 
started to do work.
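
A generic Scala illustration of closing this kind of start/stop race (this is
not the attached patch, just the general shape of the problem): a single state
variable makes start() a no-op once shutdown() has run, so a thread can never
be left running after shutdown.

    import java.util.concurrent.atomic.AtomicReference

    class GuardedWorker(body: () => Unit) {
      private object State extends Enumeration { val NotStarted, Running, ShutDown = Value }
      private val state = new AtomicReference(State.NotStarted)
      private val thread = new Thread(new Runnable {
        def run(): Unit = while (state.get == State.Running) body()
      })

      def start(): Unit = {
        // If shutdown() already happened, the CAS fails and the thread never starts.
        if (state.compareAndSet(State.NotStarted, State.Running)) thread.start()
      }

      def shutdown(): Unit = {
        state.set(State.ShutDown)  // wins whether or not start() has run yet
        if (thread.isAlive) thread.join()
      }
    }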

> Race condition shutting down high-level consumer results in spinning 
> background thread
> --
>
> Key: KAFKA-989
> URL: https://issues.apache.org/jira/browse/KAFKA-989
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
> Environment: Ubuntu Linux x64
>Reporter: Phil Hargett
>
> Running an application that uses the Kafka client under load can often hit 
> this issue within a few hours.
> High-level consumers come and go over this application's lifecycle, but there 
> are a variety of defenses that ensure each high-level consumer lasts several 
> seconds before being shutdown.  Nevertheless, some race is causing this 
> background thread to continue long after the ZKClient it is using has been 
> disconnected.  Since the thread was spawned by a consumer that has already 
> been shutdown, the application has no way to find this thread and stop it.
> Reported on the users-kafka mailing list 6/25 as "0.8 throwing exception 
> 'Failed to find leader' and high-level consumer fails to make progress". 
> The only remedy is to shutdown the application and restart it.  Externally 
> detecting that this state has occurred is not pleasant: need to grep log for 
> repeated occurrences of the same exception.
> Stack trace:
> Failed to find leader for Set([topic6,0]): java.lang.NullPointerException
>   at org.I0Itec.zkclient.ZkClient$2.call(ZkClient.java:416)
>   at org.I0Itec.zkclient.ZkClient$2.call(ZkClient.java:413)
>   at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
>   at org.I0Itec.zkclient.ZkClient.getChildren(ZkClient.java:413)
>   at org.I0Itec.zkclient.ZkClient.getChildren(ZkClient.java:409)
>   at kafka.utils.ZkUtils$.getChildrenParentMayNotExist(ZkUtils.scala:438)
>   at kafka.utils.ZkUtils$.getAllBrokersInCluster(ZkUtils.scala:75)
>   at 
> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:63)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-955) After a leader change, messages sent with ack=0 are lost

2013-08-02 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13727824#comment-13727824
 ] 

Jun Rao commented on KAFKA-955:
---

Thanks for patch v2. Some more comments.

20. testSendWithAckZeroDeadBroker(): I am not sure if the unit test does what 
you want. First of all, setup() will always start brokers for each test unless 
you explicitly shut them down. So, in this test, the brokers are not dead. 
Second, the test doesn't really test that the socket is closed after error. I 
suggest that we add a new test in SyncProducerTest. We send a request with 
ack=0 with a large message. After that, we can try to send a new request again 
and we should hit a socket I/O exception. We may have to wait for some time 
between the two requests.

> After a leader change, messages sent with ack=0 are lost
> 
>
> Key: KAFKA-955
> URL: https://issues.apache.org/jira/browse/KAFKA-955
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Rosenberg
>Assignee: Guozhang Wang
> Attachments: KAFKA-955.v1.patch, KAFKA-955.v1.patch, 
> KAFKA-955.v2.patch
>
>
> If the leader changes for a partition, and a producer is sending messages 
> with ack=0, then messages will be lost, since the producer has no active way 
> of knowing that the leader has changed, until its next metadata refresh 
> update.
> The broker receiving the message, which is no longer the leader, logs a 
> message like this:
> Produce request with correlation id 7136261 from client  on partition 
> [mytopic,0] failed due to Leader not local for partition [mytopic,0] on 
> broker 508818741
> This is exacerbated by the controlled shutdown mechanism, which forces an 
> immediate leader change.
> A possible solution to this would be for a broker which receives a message 
> for a topic that it is no longer the leader for (and if the ack level is 0) 
> to just silently forward the message over to the current leader.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-997) Provide a strict verification mode when reading configuration properties

2013-08-02 Thread Sam Meder (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Meder updated KAFKA-997:


Attachment: strict-verification-2.patch

> Provide a strict verification mode when reading configuration properties
> 
>
> Key: KAFKA-997
> URL: https://issues.apache.org/jira/browse/KAFKA-997
> Project: Kafka
>  Issue Type: Improvement
>  Components: config
>Affects Versions: 0.8
>Reporter: Sam Meder
>Priority: Minor
> Fix For: 0.8
>
> Attachments: strict-verification-2.patch
>
>
> This ticket is based on the discussion in KAFKA-943. It introduces a new 
> property that makes the config system throw an exception when it encounters 
> unrecognized properties (instead of a simple warn-level log statement). This 
> new property defaults to false.
> Hopefully this will result in fewer instances of out-of-date configuration. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-997) Provide a strict verification mode when reading configuration properties

2013-08-02 Thread Sam Meder (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Meder updated KAFKA-997:


Attachment: (was: strict-verification.patch)

> Provide a strict verification mode when reading configuration properties
> 
>
> Key: KAFKA-997
> URL: https://issues.apache.org/jira/browse/KAFKA-997
> Project: Kafka
>  Issue Type: Improvement
>  Components: config
>Affects Versions: 0.8
>Reporter: Sam Meder
>Priority: Minor
> Fix For: 0.8
>
> Attachments: strict-verification-2.patch
>
>
> This ticket is based on the discussion in KAFKA-943. It introduces a new 
> property that makes the config system throw an exception when it encounters 
> unrecognized properties (instead of a simple warn-level log statement). This 
> new property defaults to false.
> Hopefully this will result in fewer instances of out-of-date configuration. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-992) Double Check on Broker Registration to Avoid False NodeExist Exception

2013-08-02 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-992?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-992:


Description: 
The current behavior of zookeeper for ephemeral nodes is that session 
expiration and ephemeral node deletion is not an atomic operation. 

The side-effect of the above zookeeper behavior in Kafka, for certain corner 
cases, is that ephemeral nodes can be lost even if the session is not expired. 
The sequence of events that can lead to lossy ephemeral nodes is as follows -

1. The session expires on the client, it assumes the ephemeral nodes are 
deleted, so it establishes a new session with zookeeper and tries to re-create 
the ephemeral nodes. 
2. However, when it tries to re-create the ephemeral node, zookeeper throws back 
a NodeExists error code. Now this is legitimate during a session disconnect 
event (since zkclient automatically retries the
operation and raises a NodeExists error). Also by design, Kafka server doesn't 
have multiple zookeeper clients create the same ephemeral node, so Kafka server 
assumes the NodeExists is normal. 
3. However, after a few seconds zookeeper deletes that ephemeral node. So from 
the client's perspective, even though the client has a new valid session, its 
ephemeral node is gone.

This behavior is triggered due to very long fsync operations on the zookeeper 
leader. When the leader wakes up from such a long fsync operation, it has 
several sessions to expire. And the time between the session expiration and the 
ephemeral node deletion is magnified. Between these 2 operations, a zookeeper 
client can issue an ephemeral node creation operation that could've appeared to 
have succeeded, but the leader later deletes the ephemeral node leading to 
permanent ephemeral node loss from the client's perspective. 

Thread from zookeeper mailing list: 
http://zookeeper.markmail.org/search/?q=Zookeeper+3.3.4#query:Zookeeper%203.3.4%20date%3A201307%20+page:1+mid:zma242a2qgp6gxvx+state:results

  was:
There is a potential bug in Zookeeper that when the ZK leader processes a lot 
of session expiration events (this could be due to a long GC or a fsync 
operation, etc), it marks the session as expired but does not delete the 
corresponding ephemeral znode at the same time. 

Meanwhile, a new session event will be fired on the kafka server and the server 
will request the same ephemeral node to be created on handling the new session. 
When it enters the zookeeper processing queue, this operation receives a 
NodeExists error since zookeeper leader has not finished deleting that 
ephemeral znode and still thinks the previous session holds it. Kafka assumes 
that the NodeExists error on ephemeral node creation is ok since that is a 
legitimate condition that happens during session disconnects on zookeeper. 
However, a NodeExists error is only valid if the owner session id also matches 
Kafka server's current zookeeper session id. The bug is that before sending a 
NodeExists error, Zookeeper should check if the ephemeral node in question is 
held by a session that has marked as expired.

   Reporter: Neha Narkhede  (was: Guozhang Wang)

> Double Check on Broker Registration to Avoid False NodeExist Exception
> --
>
> Key: KAFKA-992
> URL: https://issues.apache.org/jira/browse/KAFKA-992
> Project: Kafka
>  Issue Type: Bug
>Reporter: Neha Narkhede
>Assignee: Guozhang Wang
> Attachments: KAFKA-992.v1.patch, KAFKA-992.v2.patch
>
>
> The current behavior of zookeeper for ephemeral nodes is that session 
> expiration and ephemeral node deletion is not an atomic operation. 
> The side-effect of the above zookeeper behavior in Kafka, for certain corner 
> cases, is that ephemeral nodes can be lost even if the session is not 
> expired. The sequence of events that can lead to lossy ephemeral nodes is as 
> follows -
> 1. The session expires on the client, it assumes the ephemeral nodes are 
> deleted, so it establishes a new session with zookeeper and tries to 
> re-create the ephemeral nodes. 
> 2. However, when it tries to re-create the ephemeral node, zookeeper throws 
> back a NodeExists error code. Now this is legitimate during a session 
> disconnect event (since zkclient automatically retries the
> operation and raises a NodeExists error). Also by design, Kafka server 
> doesn't have multiple zookeeper clients create the same ephemeral node, so 
> Kafka server assumes the NodeExists is normal. 
> 3. However, after a few seconds zookeeper deletes that ephemeral node. So 
> from the client's perspective, even though the client has a new valid 
> session, its ephemeral node is gone.
> This behavior is triggered due to very long fsync operations on the zookeeper 
> leader. When the leader wakes up from such a long fsync operation,

[jira] [Updated] (KAFKA-997) Provide a strict verification mode when reading configuration properties

2013-08-02 Thread Sam Meder (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Meder updated KAFKA-997:


Status: Patch Available  (was: Open)

> Provide a strict verification mode when reading configuration properties
> 
>
> Key: KAFKA-997
> URL: https://issues.apache.org/jira/browse/KAFKA-997
> Project: Kafka
>  Issue Type: Improvement
>  Components: config
>Affects Versions: 0.8
>Reporter: Sam Meder
>Priority: Minor
> Fix For: 0.8
>
> Attachments: strict-verification.patch
>
>
> This ticket is based on the discussion in KAFKA-943. It introduces a new 
> property that makes the config system throw an exception when it encounters 
> unrecognized properties (instead of a simple warn-level log statement). This 
> new property defaults to false.
> Hopefully this will result in fewer instances of out-of-date configuration. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-997) Provide a strict verification mode when reading configuration properties

2013-08-02 Thread Sam Meder (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Meder updated KAFKA-997:


Attachment: strict-verification.patch

Add a strict verification property and use it in tests and similar situations 
where the config is constrained. Also fixed a couple of problems with 
out-of-date config in tests.

> Provide a strict verification mode when reading configuration properties
> 
>
> Key: KAFKA-997
> URL: https://issues.apache.org/jira/browse/KAFKA-997
> Project: Kafka
>  Issue Type: Improvement
>  Components: config
>Affects Versions: 0.8
>Reporter: Sam Meder
>Priority: Minor
> Fix For: 0.8
>
> Attachments: strict-verification.patch
>
>
> This ticket is based on the discussion in KAFKA-943. It introduces a new 
> property that makes the config system throw an exception when it encounters 
> unrecognized properties (instead of a simple warn-level log statement). This 
> new property defaults to false.
> Hopefully this will result in fewer instances of out-of-date configuration. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Created] (KAFKA-997) Provide a strict verification mode when reading configuration properties

2013-08-02 Thread Sam Meder (JIRA)
Sam Meder created KAFKA-997:
---

 Summary: Provide a strict verification mode when reading 
configuration properties
 Key: KAFKA-997
 URL: https://issues.apache.org/jira/browse/KAFKA-997
 Project: Kafka
  Issue Type: Improvement
  Components: config
Affects Versions: 0.8
Reporter: Sam Meder
Priority: Minor
 Fix For: 0.8


This ticket is based on the discussion in KAFKA-943. It introduces a new 
property that makes the config system throw an exception when it encounters 
unrecognized properties (instead of a simple warn-level log statement). This 
new property defaults to false.

Hopefully this will result in fewer instances of out-of-date configuration. 
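
A minimal Scala sketch of the behavior described here (the class shape and
names are illustrative, not Kafka's actual config code):

    import java.util.Properties
    import scala.collection.mutable
    import scala.jdk.CollectionConverters._

    class TrackedProperties(props: Properties, strict: Boolean) {
      private val accessed = mutable.Set[String]()

      def getString(name: String, default: String): String = {
        accessed += name
        Option(props.getProperty(name)).getOrElse(default)
      }

      // Call once all known properties have been read: unknown keys are a warning
      // by default, or a hard failure when strict verification is enabled.
      def verify(): Unit = {
        val unknown = props.stringPropertyNames().asScala.filterNot(accessed.contains)
        if (unknown.nonEmpty) {
          val msg = "Unrecognized properties: " + unknown.mkString(", ")
          if (strict) throw new IllegalArgumentException(msg) else println("WARN " + msg)
        }
      }
    }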

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-943) Move all configuration key string to constants

2013-08-02 Thread Sam Meder (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Meder updated KAFKA-943:


Resolution: Won't Fix
Status: Resolved  (was: Patch Available)

Resolving this issue since the patch wasn't accepted. I'll open a new issue for 
the strict validation patch.

> Move all configuration key string to constants
> --
>
> Key: KAFKA-943
> URL: https://issues.apache.org/jira/browse/KAFKA-943
> Project: Kafka
>  Issue Type: Improvement
>  Components: config
>Affects Versions: 0.8
>Reporter: Sam Meder
> Attachments: configConstants.patch
>
>
> The current code base has configuration key strings duplicated all over the 
> place. They show up in the actual *Config classes, a lot of tests, command 
> line utilities and other examples. This makes changes hard and error prone. 
> DRY...
> The attached patch moves these configuration keys to constants and replaces 
> their usage with a reference to the constant. It also cleans up a few old 
> properties and a few misconfigured tests. I've admittedly not written a whole 
> lot of Scala, so there may be some improvements that can be made, in 
> particular I am not sure I chose the best strategy for keys needed by the 
> SyncProducerConfigShared trait (or traits in general).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Comment Edited] (KAFKA-992) Double Check on Broker Registration to Avoid False NodeExist Exception

2013-08-02 Thread Swapnil Ghike (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13727438#comment-13727438
 ] 

Swapnil Ghike edited comment on KAFKA-992 at 8/2/13 7:11 AM:
-

- I am not completely clear on why the timestamp is required to be stored in 
zookeeper along with the other broker info. If I am not wrong, the znode stores 
the session that created it in the ephemeralOwner field. There should be a way 
to get its value when we read the broker znode info.
- Perhaps we should have a fixed number of retries. If zookeeper cannot delete 
the znode within a sufficient amount of time after session expiration, we would 
probably like to know that we are dealing with a buggy zookeeper setup.

Then this should suffice:

catch ZkNodeExistsException =>
for (numRetries) {
 if (broker.host == host && broker.port == port && sessionId == lastSessionId) {
   Thread.sleep(..)
 } else {
   throw new RuntimeException(...)
 }
}

  was (Author: swapnilghike):
- I think I am not completely clear why timestamp is required to be stored 
in zookeeper along with other broker info. If I am not wrong, ephemeralOwner = 
0x13ff5a4758c4a05 is the session Id. Is there a way to get it from zookeeper 
when we read the broker znode info? 
- Perhaps we should have fixed number of retries. If zookeeper cannot delete 
the znode after session expiration after sufficient amount of time, we would 
probably like to know that we are dealing with a buggy zookeeper setup.

Then this should suffice:

catch ZkNodeExistsException =>
for (numRetries) {
 if (broker.host == host && broker.port == port && sessionId == lastSessionId) {
   Thread.sleep(..)
 } else {
   throw new RuntimeException(...)
 }
}
  
> Double Check on Broker Registration to Avoid False NodeExist Exception
> --
>
> Key: KAFKA-992
> URL: https://issues.apache.org/jira/browse/KAFKA-992
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
> Attachments: KAFKA-992.v1.patch, KAFKA-992.v2.patch
>
>
> There is a potential bug in Zookeeper that when the ZK leader processes a lot 
> of session expiration events (this could be due to a long GC or a fsync 
> operation, etc), it marks the session as expired but does not delete the 
> corresponding ephemeral znode at the same time. 
> Meanwhile, a new session event will be fired on the kafka server and the 
> server will request the same ephemeral node to be created on handling the new 
> session. When it enters the zookeeper processing queue, this operation 
> receives a NodeExists error since zookeeper leader has not finished deleting 
> that ephemeral znode and still thinks the previous session holds it. Kafka 
> assumes that the NodeExists error on ephemeral node creation is ok since that 
> is a legitimate condition that happens during session disconnects on 
> zookeeper. However, a NodeExists error is only valid if the owner session id 
> also matches Kafka server's current zookeeper session id. The bug is that 
> before sending a NodeExists error, Zookeeper should check if the ephemeral 
> node in question is held by a session that has been marked as expired.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-992) Double Check on Broker Registration to Avoid False NodeExist Exception

2013-08-02 Thread Swapnil Ghike (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13727438#comment-13727438
 ] 

Swapnil Ghike commented on KAFKA-992:
-

- I think I am not completely clear why timestamp is required to be stored in 
zookeeper along with other broker info. If I am not wrong, ephemeralOwner = 
0x13ff5a4758c4a05 is the session Id. Is there a way to get it from zookeeper 
when we read the broker znode info? 
- Perhaps we should have fixed number of retries. If zookeeper cannot delete 
the znode after session expiration after sufficient amount of time, we would 
probably like to know that we are dealing with a buggy zookeeper setup.

Then this should suffice:

catch ZkNodeExistsException =>
for (numRetries) {
 if (broker.host == host && broker.port == port && sessionId == lastSessionId) {
   Thread.sleep(..)
 } else {
   throw new RuntimeException(...)
 }
}

> Double Check on Broker Registration to Avoid False NodeExist Exception
> --
>
> Key: KAFKA-992
> URL: https://issues.apache.org/jira/browse/KAFKA-992
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
> Attachments: KAFKA-992.v1.patch, KAFKA-992.v2.patch
>
>
> There is a potential bug in Zookeeper that when the ZK leader processes a lot 
> of session expiration events (this could be due to a long GC or a fsync 
> operation, etc), it marks the session as expired but does not delete the 
> corresponding ephemeral znode at the same time. 
> Meanwhile, a new session event will be fired on the kafka server and the 
> server will request the same ephemeral node to be created on handling the new 
> session. When it enters the zookeeper processing queue, this operation 
> receives a NodeExists error since zookeeper leader has not finished deleting 
> that ephemeral znode and still thinks the previous session holds it. Kafka 
> assumes that the NodeExists error on ephemeral node creation is ok since that 
> is a legitimate condition that happens during session disconnects on 
> zookeeper. However, a NodeExists error is only valid if the owner session id 
> also matches Kafka server's current zookeeper session id. The bug is that 
> before sending a NodeExists error, Zookeeper should check if the ephemeral 
> node in question is held by a session that has been marked as expired.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira