Re: Perf testing flush() - issues found

2015-04-28 Thread Roshan Naik

- Event size = 1 kB.
- Broker and client running on different machines (identical config: 32
cores, 256 GB RAM, 6x 1500 rpm disks, 10 GigE Ethernet).
- Don't readily have numbers for the old batched-sync API with the same params,
but can get them soon. However... does it matter?



-roshan






On 4/28/15 6:57 PM, "Joel Koshy"  wrote:

>- What is the record size?
>- Is this a local setup? i.e., producer/broker running local?
>- Any overrides apart from batch size? E.g., linger time.
>- Can you establish a baseline - with the old producer's sync-send?
>
>Thanks,
>
>Joel
>
>On Wed, Apr 29, 2015 at 12:58:43AM +, Roshan Naik wrote:
>> Based on recent suggestion by Joel, I am experimenting with using
>>flush() to simulate  batched-sync behavior.
>> The essence of my  single threaded producer code is :
>> 
>> for (int i = 0; i < numRecords;) {
>> // 1- Send a batch
>> for (int batchCounter = 0; batchCounter < batchSize; batchCounter++) {
>> Future<RecordMetadata> f = producer.send(record, null);
>> futureList.add(f);
>> i++;
>> }
>> // 2- Flush after sending batch
>> producer.flush();
>> 
>> // 3- Ensure all msgs were sent
>> for (Future<RecordMetadata> f : futureList) {
>> f.get();
>> }
>> }
>> 
>> There are actually two batch sizes in play here. One is the number of
>> messages between every flush() call made by the client. The other is the
>> batch.size setting, which controls the batching done internally by the
>> underlying async API.
>> 
>> Intuitively  .. we either want to
>>   A) Set both batch sizes to be Equal, OR
>>   B) Set the underlying batch.size to a sufficiently large number so as
>>to effectively disable internal batch management
>> 
>> 
>> Below numbers are in MB/s. The 'Batch' column indicates the number of
>> events between each explicit client flush().
>> Setup is a 1-node broker and acks=1.
>> 
>> 1 partition
>>                         Batch=4k   Batch=8k   Batch=16k
>> Equal batch sizes (a)      16         32         52
>> Large batch.size  (b)     140        123        124
>> 
>> 4 partitions
>>                         Batch=4k   Batch=8k   Batch=16k
>> Equal batch sizes (a)      35         61         82
>> Large batch.size  (b)       7          7          7
>> 
>> 8 partitions
>>                         Batch=4k   Batch=8k   Batch=16k
>> Equal batch sizes (a)      49         70         99
>> Large batch.size  (b)       7          8          7
>> 
>> There are two issues noticeable in these numbers:
>> 1 - Case A is much faster than case B for 4 and 8 partitions.
>> 2 - Single-partition mode outperforms all others, and here case B is
>> faster than case A.
>> 
>> 
>> 
>> 
>> Side Note: I used the  client APIs  from the trunk while the broker is
>>running 0.8.2 (I don't think it matters, but nevertheless wanted to
>>point out)
>> 
>



Re: Perf testing flush() - issues found

2015-04-28 Thread Joel Koshy
- What is the record size?
- Is this a local setup? i.e., producer/broker running local?
- Any overrides apart from batch size? E.g., linger time.
- Can you establish a baseline - with the old producer's sync-send?

Thanks,

Joel

On Wed, Apr 29, 2015 at 12:58:43AM +, Roshan Naik wrote:
> Based on recent suggestion by Joel, I am experimenting with using flush() to 
> simulate  batched-sync behavior.
> The essence of my  single threaded producer code is :
> 
> for (int i = 0; i < numRecords;) {
> // 1- Send a batch
> for (int batchCounter = 0; batchCounter < batchSize; batchCounter++) {
> Future<RecordMetadata> f = producer.send(record, null);
> futureList.add(f);
> i++;
> }
> // 2- Flush after sending batch
> producer.flush();
> 
> // 3- Ensure all msgs were sent
> for (Future<RecordMetadata> f : futureList) {
> f.get();
> }
> }
> 
> There are actually two batch sizes in play here. One is the number of messages
> between every flush() call made by the client. The other is the batch.size
> setting, which controls the batching done internally by the underlying async
> API.
> 
> Intuitively  .. we either want to
>   A) Set both batch sizes to be Equal, OR
>   B) Set the underlying batch.size to a sufficiently large number so as to 
> effectively disable internal batch management
> 
> 
> Below numbers are in MB/s. The 'Batch' column indicates the number of events
> between each explicit client flush().
> Setup is a 1-node broker and acks=1.
> 
> 1 partition
>                         Batch=4k   Batch=8k   Batch=16k
> Equal batch sizes (a)      16         32         52
> Large batch.size  (b)     140        123        124
> 
> 4 partitions
>                         Batch=4k   Batch=8k   Batch=16k
> Equal batch sizes (a)      35         61         82
> Large batch.size  (b)       7          7          7
> 
> 8 partitions
>                         Batch=4k   Batch=8k   Batch=16k
> Equal batch sizes (a)      49         70         99
> Large batch.size  (b)       7          8          7
> 
> There are two issues noticeable in these numbers:
> 1 - Case A is much faster than case B for 4 and 8 partitions.
> 2 - Single-partition mode outperforms all others, and here case B is faster
> than case A.
> 
> 
> 
> 
> Side Note: I used the  client APIs  from the trunk while the broker is 
> running 0.8.2 (I don't think it matters, but nevertheless wanted to point out)
> 



[jira] [Commented] (KAFKA-2121) prevent potential resource leak in KafkaProducer and KafkaConsumer

2015-04-28 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14518533#comment-14518533
 ] 

Steven Zhen Wu commented on KAFKA-2121:
---

Created reviewboard https://reviews.apache.org/r/33654/diff/
 against branch apache/trunk

> prevent potential resource leak in KafkaProducer and KafkaConsumer
> --
>
> Key: KAFKA-2121
> URL: https://issues.apache.org/jira/browse/KAFKA-2121
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.8.2.0
>Reporter: Steven Zhen Wu
>Assignee: Steven Zhen Wu
> Fix For: 0.8.3
>
> Attachments: KAFKA-2121.patch, KAFKA-2121.patch, 
> KAFKA-2121_2015-04-16_09:55:14.patch, KAFKA-2121_2015-04-16_10:43:55.patch, 
> KAFKA-2121_2015-04-18_20:09:20.patch, KAFKA-2121_2015-04-19_20:08:45.patch, 
> KAFKA-2121_2015-04-19_20:30:18.patch, KAFKA-2121_2015-04-20_09:06:09.patch, 
> KAFKA-2121_2015-04-20_09:51:51.patch, KAFKA-2121_2015-04-20_09:52:46.patch, 
> KAFKA-2121_2015-04-20_09:57:49.patch, KAFKA-2121_2015-04-20_22:48:31.patch
>
>
> On Mon, Apr 13, 2015 at 7:17 PM, Guozhang Wang  wrote:
> It is a valid problem and we should correct it as soon as possible, I'm
> with Ewen regarding the solution.
> On Mon, Apr 13, 2015 at 5:05 PM, Ewen Cheslack-Postava 
> wrote:
> > Steven,
> >
> > Looks like there is even more that could potentially be leaked -- since key
> > and value serializers are created and configured at the end, even the IO
> > thread allocated by the producer could leak. Given that, I think 1 isn't a
> > great option since, as you said, it doesn't really address the underlying
> > issue.
> >
> > 3 strikes me as bad from a user experience perspective. It's true we might
> > want to introduce additional constructors to make testing easier, but the
> > more components I need to allocate myself and inject into the producer's
> > constructor, the worse the default experience is. And since you would have
> > to inject the dependencies to get correct, non-leaking behavior, it will
> > always be more code than previously (and a backwards incompatible change).
> > Additionally, the code creating the producer would have to be more
> > complicated since it would have to deal with the cleanup carefully whereas
> > it previously just had to deal with the exception. Besides, for testing
> > specifically, you can avoid exposing more constructors just for testing by
> > using something like PowerMock that let you mock private methods. That
> > requires a bit of code reorganization, but doesn't affect the public
> > interface at all.
> >
> > So my take is that a variant of 2 is probably best. I'd probably do two
> > things. First, make close() safe to call even if some fields haven't been
> > initialized, which presumably just means checking for null fields. (You
> > might also want to figure out if all the methods close() calls are
> > idempotent and decide whether some fields should be marked non-final and
> > cleared to null when close() is called). Second, add the try/catch as you
> > suggested, but just use close().
> >
> > -Ewen
> >
> >
> > On Mon, Apr 13, 2015 at 3:53 PM, Steven Wu  wrote:
> >
> > > Here is the resource leak problem that we have encountered when 0.8.2
> > java
> > > KafkaProducer failed in constructor. here is the code snippet of
> > > KafkaProducer to illustrate the problem.
> > >
> > > ---
> > > public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer,
> > > Serializer<V> valueSerializer) {
> > >
> > > // create metrics reporter via reflection
> > > List<MetricsReporter> reporters =
> > > config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
> > > MetricsReporter.class);
> > >
> > > // validate bootstrap servers
> > > List<InetSocketAddress> addresses =
> > > ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
> > >
> > > }
> > > ---
> > >
> > > let's say MyMetricsReporter creates a thread in constructor. if hostname
> > > validation threw an exception, constructor won't call the close method of
> > > MyMetricsReporter to clean up the resource. as a result, we created
> > thread
> > > leak issue. this becomes worse when we try to auto recovery (i.e. keep
> > > creating KafkaProducer again -> failing again -> more thread leaks).
> > >
> > > there are multiple options of fixing this.
> > >
> > > 1) just move the hostname validation to the beginning. but this only fixes
> > > one symptom. it didn't fix the fundamental problem. what if some other
> > > lines throw an exception?
> > >
> > > 2) use try-catch. in the catch section, try to call close methods for any
> > > non-null objects constructed so far.
> > >
> > > 3) explicitly de
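
A self-contained toy sketch of the option 2) pattern described above -- construction
inside try/catch plus a null-safe, idempotent close() -- follows. The class and field
names are invented for illustration; this is not Kafka source.

import java.io.Closeable;
import java.io.IOException;

// Toy illustration of option 2: run the risky construction steps inside try/catch and
// release whatever was already allocated via a close() that tolerates null fields.
public class LeakSafeClient implements Closeable {
    private Closeable metricsReporter;  // stands in for MyMetricsReporter
    private Closeable networkClient;    // stands in for the producer's I/O machinery

    public LeakSafeClient(boolean failHalfway) {
        try {
            metricsReporter = () -> System.out.println("reporter closed");
            if (failHalfway) {
                throw new IllegalArgumentException("bootstrap servers invalid");
            }
            networkClient = () -> System.out.println("client closed");
        } catch (Throwable t) {
            close();  // safe even though networkClient may still be null
            throw new RuntimeException("construction failed", t);
        }
    }

    @Override
    public void close() {
        closeQuietly(metricsReporter);  // null-safe, idempotent cleanup
        closeQuietly(networkClient);
    }

    private static void closeQuietly(Closeable c) {
        if (c == null) return;
        try {
            c.close();
        } catch (IOException e) {
            // swallow so cleanup itself never throws
        }
    }
}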

[jira] [Updated] (KAFKA-2121) prevent potential resource leak in KafkaProducer and KafkaConsumer

2015-04-28 Thread Steven Zhen Wu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated KAFKA-2121:
--
Status: Patch Available  (was: Reopened)

> prevent potential resource leak in KafkaProducer and KafkaConsumer
> --
>
> Key: KAFKA-2121
> URL: https://issues.apache.org/jira/browse/KAFKA-2121
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.8.2.0
>Reporter: Steven Zhen Wu
>Assignee: Steven Zhen Wu
> Fix For: 0.8.3
>
> Attachments: KAFKA-2121.patch, KAFKA-2121.patch, 
> KAFKA-2121_2015-04-16_09:55:14.patch, KAFKA-2121_2015-04-16_10:43:55.patch, 
> KAFKA-2121_2015-04-18_20:09:20.patch, KAFKA-2121_2015-04-19_20:08:45.patch, 
> KAFKA-2121_2015-04-19_20:30:18.patch, KAFKA-2121_2015-04-20_09:06:09.patch, 
> KAFKA-2121_2015-04-20_09:51:51.patch, KAFKA-2121_2015-04-20_09:52:46.patch, 
> KAFKA-2121_2015-04-20_09:57:49.patch, KAFKA-2121_2015-04-20_22:48:31.patch
>
>
> On Mon, Apr 13, 2015 at 7:17 PM, Guozhang Wang  wrote:
> It is a valid problem and we should correct it as soon as possible, I'm
> with Ewen regarding the solution.
> On Mon, Apr 13, 2015 at 5:05 PM, Ewen Cheslack-Postava 
> wrote:
> > Steven,
> >
> > Looks like there is even more that could potentially be leaked -- since key
> > and value serializers are created and configured at the end, even the IO
> > thread allocated by the producer could leak. Given that, I think 1 isn't a
> > great option since, as you said, it doesn't really address the underlying
> > issue.
> >
> > 3 strikes me as bad from a user experience perspective. It's true we might
> > want to introduce additional constructors to make testing easier, but the
> > more components I need to allocate myself and inject into the producer's
> > constructor, the worse the default experience is. And since you would have
> > to inject the dependencies to get correct, non-leaking behavior, it will
> > always be more code than previously (and a backwards incompatible change).
> > Additionally, the code creating the producer would have to be more
> > complicated since it would have to deal with the cleanup carefully whereas
> > it previously just had to deal with the exception. Besides, for testing
> > specifically, you can avoid exposing more constructors just for testing by
> > using something like PowerMock that let you mock private methods. That
> > requires a bit of code reorganization, but doesn't affect the public
> > interface at all.
> >
> > So my take is that a variant of 2 is probably best. I'd probably do two
> > things. First, make close() safe to call even if some fields haven't been
> > initialized, which presumably just means checking for null fields. (You
> > might also want to figure out if all the methods close() calls are
> > idempotent and decide whether some fields should be marked non-final and
> > cleared to null when close() is called). Second, add the try/catch as you
> > suggested, but just use close().
> >
> > -Ewen
> >
> >
> > On Mon, Apr 13, 2015 at 3:53 PM, Steven Wu  wrote:
> >
> > > Here is the resource leak problem that we have encountered when 0.8.2
> > java
> > > KafkaProducer failed in constructor. here is the code snippet of
> > > KafkaProducer to illustrate the problem.
> > >
> > > ---
> > > public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer,
> > > Serializer<V> valueSerializer) {
> > >
> > > // create metrics reporter via reflection
> > > List<MetricsReporter> reporters =
> > > config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
> > > MetricsReporter.class);
> > >
> > > // validate bootstrap servers
> > > List<InetSocketAddress> addresses =
> > > ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
> > >
> > > }
> > > ---
> > >
> > > let's say MyMetricsReporter creates a thread in constructor. if hostname
> > > validation threw an exception, constructor won't call the close method of
> > > MyMetricsReporter to clean up the resource. as a result, we created
> > thread
> > > leak issue. this becomes worse when we try to auto recovery (i.e. keep
> > > creating KafkaProducer again -> failing again -> more thread leaks).
> > >
> > > there are multiple options of fixing this.
> > >
> > > 1) just move the hostname validation to the beginning. but this only fixes
> > > one symptom. it didn't fix the fundamental problem. what if some other
> > > lines throw an exception?
> > >
> > > 2) use try-catch. in the catch section, try to call close methods for any
> > > non-null objects constructed so far.
> > >
> > > 3) explicitly declare the dependency in the constructor. this way, when
> > > KafkaProducer threw an exception, I can c

[jira] [Updated] (KAFKA-2121) prevent potential resource leak in KafkaProducer and KafkaConsumer

2015-04-28 Thread Steven Zhen Wu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated KAFKA-2121:
--
Attachment: KAFKA-2121.patch

> prevent potential resource leak in KafkaProducer and KafkaConsumer
> --
>
> Key: KAFKA-2121
> URL: https://issues.apache.org/jira/browse/KAFKA-2121
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.8.2.0
>Reporter: Steven Zhen Wu
>Assignee: Steven Zhen Wu
> Fix For: 0.8.3
>
> Attachments: KAFKA-2121.patch, KAFKA-2121.patch, 
> KAFKA-2121_2015-04-16_09:55:14.patch, KAFKA-2121_2015-04-16_10:43:55.patch, 
> KAFKA-2121_2015-04-18_20:09:20.patch, KAFKA-2121_2015-04-19_20:08:45.patch, 
> KAFKA-2121_2015-04-19_20:30:18.patch, KAFKA-2121_2015-04-20_09:06:09.patch, 
> KAFKA-2121_2015-04-20_09:51:51.patch, KAFKA-2121_2015-04-20_09:52:46.patch, 
> KAFKA-2121_2015-04-20_09:57:49.patch, KAFKA-2121_2015-04-20_22:48:31.patch
>
>
> On Mon, Apr 13, 2015 at 7:17 PM, Guozhang Wang  wrote:
> It is a valid problem and we should correct it as soon as possible, I'm
> with Ewen regarding the solution.
> On Mon, Apr 13, 2015 at 5:05 PM, Ewen Cheslack-Postava 
> wrote:
> > Steven,
> >
> > Looks like there is even more that could potentially be leaked -- since key
> > and value serializers are created and configured at the end, even the IO
> > thread allocated by the producer could leak. Given that, I think 1 isn't a
> > great option since, as you said, it doesn't really address the underlying
> > issue.
> >
> > 3 strikes me as bad from a user experience perspective. It's true we might
> > want to introduce additional constructors to make testing easier, but the
> > more components I need to allocate myself and inject into the producer's
> > constructor, the worse the default experience is. And since you would have
> > to inject the dependencies to get correct, non-leaking behavior, it will
> > always be more code than previously (and a backwards incompatible change).
> > Additionally, the code creating the producer would have to be more
> > complicated since it would have to deal with the cleanup carefully whereas
> > it previously just had to deal with the exception. Besides, for testing
> > specifically, you can avoid exposing more constructors just for testing by
> > using something like PowerMock that let you mock private methods. That
> > requires a bit of code reorganization, but doesn't affect the public
> > interface at all.
> >
> > So my take is that a variant of 2 is probably best. I'd probably do two
> > things. First, make close() safe to call even if some fields haven't been
> > initialized, which presumably just means checking for null fields. (You
> > might also want to figure out if all the methods close() calls are
> > idempotent and decide whether some fields should be marked non-final and
> > cleared to null when close() is called). Second, add the try/catch as you
> > suggested, but just use close().
> >
> > -Ewen
> >
> >
> > On Mon, Apr 13, 2015 at 3:53 PM, Steven Wu  wrote:
> >
> > > Here is the resource leak problem that we have encountered when 0.8.2
> > java
> > > KafkaProducer failed in constructor. here is the code snippet of
> > > KafkaProducer to illustrate the problem.
> > >
> > > ---
> > > public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer,
> > > Serializer<V> valueSerializer) {
> > >
> > > // create metrics reporter via reflection
> > > List<MetricsReporter> reporters =
> > > config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
> > > MetricsReporter.class);
> > >
> > > // validate bootstrap servers
> > > List<InetSocketAddress> addresses =
> > > ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
> > >
> > > }
> > > ---
> > >
> > > let's say MyMetricsReporter creates a thread in constructor. if hostname
> > > validation threw an exception, constructor won't call the close method of
> > > MyMetricsReporter to clean up the resource. as a result, we created
> > thread
> > > leak issue. this becomes worse when we try to auto recovery (i.e. keep
> > > creating KafkaProducer again -> failing again -> more thread leaks).
> > >
> > > there are multiple options of fixing this.
> > >
> > > 1) just move the hostname validation to the beginning. but this only fixes
> > > one symptom. it didn't fix the fundamental problem. what if some other
> > > lines throw an exception?
> > >
> > > 2) use try-catch. in the catch section, try to call close methods for any
> > > non-null objects constructed so far.
> > >
> > > 3) explicitly declare the dependency in the constructor. this way, when
> > > KafkaProducer threw an exception, I can call close me

Review Request 33654: Patch for KAFKA-2121

2015-04-28 Thread Steven Wu

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33654/
---

Review request for kafka.


Bugs: KAFKA-2121
https://issues.apache.org/jira/browse/KAFKA-2121


Repository: kafka


Description
---

Override the java.io.Closeable#close method in the Serializer and Deserializer
interfaces without throwing a checked IOException. This is to avoid breaking
source compatibility.
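
A minimal sketch of the narrowing override the description refers to (the Deserializer
change is analogous; the type parameter name is illustrative):

import java.io.Closeable;
import java.util.Map;

// Extend Closeable but re-declare close() without the checked IOException, so existing
// implementations that do not throw remain source-compatible.
public interface Serializer<T> extends Closeable {

    void configure(Map<String, ?> configs, boolean isKey);

    byte[] serialize(String topic, T data);

    @Override
    void close();  // narrowed: no "throws IOException"
}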


Diffs
-

  clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java 
9a57579f87cb19cb6affe6d157ff8446c23e3551 
  clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java 
c44054038066f0d0829d05f082b2ee42b34cded7 
  
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java 
eea2c28450736d1668c68828f77a49470a82c3d0 
  
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java 
49f1427bcbe43c773920a25aa69a71d0329296b7 
  clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java 
6f948f240c906029a0f972bf770f288f390ea714 

Diff: https://reviews.apache.org/r/33654/diff/


Testing
---


Thanks,

Steven Wu



Re: [DISCUSS] KIP-12 - Kafka Sasl/Kerberos implementation

2015-04-28 Thread Haohui Mai
There are actually two advantages of using SSL/TLS compared to SASL for securing
the communication channel:


* Security: SSL/TLS offers Perfect Forward Secrecy when using EDH to establish 
encryption keys. The confidentiality of the past sessions still holds even if 
the long term private key of the server is compromised. This is not specified 
in SASL.


* Performance in JVM-based implementation. The SASL APIs in Java look like the 
following:


// encrypt data
public byte[] wrap(byte[] inBuf, int offset, int len) throws SaslException;
// decrypt data
public byte[] unwrap(byte[] inBuf, int offset, int len) throws SaslException;

Note that there are significant overheads on copying bytes and GC. Take 
encrypting the data for example:

(1) Users pass in the payload as a byte array.
(2) The SASL implementation (e.g., the one backed by GSSAPI) first allocates a 
new byte[] array, then pads the payload with the sequence id / checksum, etc. 
and finally encrypts the data with AES.
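
As a concrete illustration of that copy-per-call pattern (standard javax.security.sasl
API; the SaslClient is assumed to have been negotiated elsewhere):

import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;

// Each wrap()/unwrap() call returns a freshly allocated byte[] holding the padded,
// encrypted (or decrypted) payload -- the per-message allocation that drives the GC
// pressure described above.
public final class SaslWrapExample {
    public static byte[] encrypt(SaslClient negotiatedClient, byte[] payload) throws SaslException {
        return negotiatedClient.wrap(payload, 0, payload.length);
    }

    public static byte[] decrypt(SaslClient negotiatedClient, byte[] wire) throws SaslException {
        return negotiatedClient.unwrap(wire, 0, wire.length);
    }
}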

Our experience in HDFS is that for large amounts of data the AES encryption itself
is fairly cheap compared to the byte copying and the GC activity. We saw long GC
pauses due to frequent allocation of large heap buffers. In the end we worked
around this problem by performing SASL authentication first and then encrypting
the traffic directly with an AES codec we set up ourselves.

Hope it helps.

Regards,
Haohui



From: Sriharsha Chintalapani 
Sent: Tuesday, April 28, 2015 4:31 PM
To: dev@kafka.apache.org
Cc: Haohui Mai
Subject: Re: [DISCUSS] KIP-12 - Kafka Sasl/Kerberos implementation

Updated KIP-12 is here 
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=51809888 .  My 
new proposal talks about the Channel having two layers: one is the TransportLayer and
the other is the Authentication Layer.
  If users want to do authentication without encryption they can use
PlainTextTransportLayer with SASL auth.
  If users want to do Kerberos auth + encryption they can do so by using
SSLTransportLayer and SASL auth.
  If they choose to do just SSL they can also do that without enabling any SASL
authentication; in this case the Session will get the SSL-authenticated peer principal.
Gwen raised a concern about users just having a PLAINTEXT channel alongside a
SASL auth channel; I’ll address that in the KIP.

  There were questions raised about why not to use SASL encryption instead of
SSL for encryption. After we spoke to the HDFS team, they advised against this
approach, as it turned out to have performance issues. Cc’ed Haohui, who is
an HDFS committer.

Thanks,
Harsha



On April 24, 2015 at 9:37:31 AM, Sriharsha Chintalapani 
(harsh...@fastmail.fm) wrote:

I yet to update the KIP with my latest proposal. So give me few days to update 
it.
I am looking at supporting KERBEROS for the first release and going to use JAAS 
Login Modules to provide authentication.
And will we provide a default SASL PLAIN mechanism on the server side

Yes . I’ll update the KIP and send out an email for further discussion as it 
will make it easier.

Thanks,
Harsha



On April 24, 2015 at 9:30:04 AM, Gari Singh 
(gari.r.si...@gmail.com) wrote:

Great.  Sounds good.  I'll re-read the KIP ASAP.

Is there another KIP around authentication providers or is that being tracked
here as well?  For example, the SASL PLAIN mechanism carries a username and
password but currently I don't know where that would be authenticated?  I
notice that AuthUtils has the ability to read a JAAS config, but the KIP only has
entries relevant to Kerberos.  Is the idea to use JAAS LoginModules to provide 
pluggable authentication  - so we could use some of the JDK provided 
LoginModules or create our own (e.g. use a local password file, LDAP, etc)?  
And will we provide a default SASL PLAIN mechanism on the server side or would 
we implement custom SASL provider modules?

Also - I am happy to take a look / test any code as you move along.  Also happy 
to help with SASL providers and/or authentication/login modules

Thanks,

Gari

On Fri, Apr 24, 2015 at 12:05 PM, Sriharsha Chintalapani
<harsh...@fastmail.fm> wrote:
Hi Gari,
   I apologize for not being clear in the KIP and for not starting the discussion thread earlier.
My initial proposal (currently in the KIP) was to provide PLAINTEXT, SSL and KERBEROS
as individual protocol implementations.
As you mentioned at the end
“In terms of message level integrity and confidentiality (not to be confused
with transport level security like TLS), SASL also provides for this
(assuming the mechanism supports it). The SASL library supports this via
the "props" parameter in the "createSaslClient/Server" methods. So it is
easily possible to support Kerberos with integrity (MIC) or confidentiality
(encryption) over TCP and without either over TLS. “

My intention was to use SASL to do auth and also provide encryption over a plain
text channel. But after speaking to many who implemented SASL this way for HDFS,
HBase, and other projects as well, their suggestion 

[jira] [Updated] (KAFKA-2151) make MockMetricsReporter a little more generic

2015-04-28 Thread Steven Zhen Wu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated KAFKA-2151:
--
Resolution: Duplicate
Status: Resolved  (was: Patch Available)

Close this one as a duplicate. Will submit a follow-up patch in the original
KAFKA-2121 jira.

> make MockMetricsReporter a little more generic
> --
>
> Key: KAFKA-2151
> URL: https://issues.apache.org/jira/browse/KAFKA-2151
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Steven Zhen Wu
>Assignee: Steven Zhen Wu
> Attachments: KAFKA-2151.patch
>
>
> this is a follow-up improvement on the test code related to KAFKA-2121. since we
> moved MockMetricsReporter into a generic/public location, it's better to make
> it a little more generic. updated KafkaProducerTest and KafkaConsumerTest
> accordingly.
> [~ewencp] since you are familiar with the KAFKA-2121. will ask you as 
> reviewer. should be an easy one :)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Perf testing flush() - issues found

2015-04-28 Thread Roshan Naik
Based on recent suggestion by Joel, I am experimenting with using flush() to 
simulate  batched-sync behavior.
The essence of my  single threaded producer code is :

for (int i = 0; i < numRecords;) {
    // 1- Send a batch
    for (int batchCounter = 0; batchCounter < batchSize; batchCounter++) {
        Future<RecordMetadata> f = producer.send(record, null);
        futureList.add(f);
        i++;
    }
    // 2- Flush after sending batch
    producer.flush();

    // 3- Ensure all msgs were sent
    for (Future<RecordMetadata> f : futureList) {
        f.get();
    }
}

There are actually two batch sizes in play here. One is the number of messages
between every flush() call made by the client. The other is the batch.size
setting, which controls the batching done internally by the underlying async API.

Intuitively  .. we either want to
  A) Set both batch sizes to be Equal, OR
  B) Set the underlying batch.size to a sufficiently large number so as to 
effectively disable internal batch management
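
For concreteness, here is one hedged way the two setups could be expressed with the new
producer's configs. The values are illustrative only: batch.size is in bytes while the
flush batch above counts records, so "equal" is taken as roughly equal in bytes at ~1 kB
per event.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class FlushBatchConfig {
    public static KafkaProducer<byte[], byte[]> buildProducer(boolean caseA) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-host:9092");  // placeholder
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        if (caseA) {
            // Case A: internal batch roughly matches the client-side flush batch,
            // e.g. 4096 records * ~1 kB/event ~= 4 MB
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4 * 1024 * 1024);
        } else {
            // Case B: internal batch far larger than any flush batch, so size-based
            // batching effectively never triggers and flush() alone drives the send
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, 64 * 1024 * 1024);
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 256 * 1024 * 1024);  // keep buffer >= batch.size
        }
        return new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer());
    }
}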


Below numbers are in MB/s. The 'Batch' column indicates the number of events
between each explicit client flush().
Setup is a 1-node broker and acks=1.

1 partition
                        Batch=4k   Batch=8k   Batch=16k
Equal batch sizes (a)      16         32         52
Large batch.size  (b)     140        123        124

4 partitions
                        Batch=4k   Batch=8k   Batch=16k
Equal batch sizes (a)      35         61         82
Large batch.size  (b)       7          7          7

8 partitions
                        Batch=4k   Batch=8k   Batch=16k
Equal batch sizes (a)      49         70         99
Large batch.size  (b)       7          8          7


There are two issues noticeable in these numbers:
1 - Case A is much faster than case B for 4 and 8 partitions.
2 - Single-partition mode outperforms all others, and here case B is faster than
case A.




Side Note: I used the  client APIs  from the trunk while the broker is running 
0.8.2 (I don't think it matters, but nevertheless wanted to point out)



[jira] [Commented] (KAFKA-1660) Ability to call close() with a timeout on the Java Kafka Producer.

2015-04-28 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14518484#comment-14518484
 ] 

Jiangjie Qin commented on KAFKA-1660:
-

Thanks for the reference, Jay. Totally agreed. We did not see performance 
degrade because the write lock is only grabbed once in the entire life cycle.

> Ability to call close() with a timeout on the Java Kafka Producer. 
> ---
>
> Key: KAFKA-1660
> URL: https://issues.apache.org/jira/browse/KAFKA-1660
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, producer 
>Affects Versions: 0.8.2.0
>Reporter: Andrew Stein
>Assignee: Jiangjie Qin
> Fix For: 0.8.3
>
> Attachments: KAFKA-1660.patch, KAFKA-1660.patch, 
> KAFKA-1660_2015-02-17_16:41:19.patch, KAFKA-1660_2015-03-02_10:41:49.patch, 
> KAFKA-1660_2015-03-08_21:14:50.patch, KAFKA-1660_2015-03-09_12:56:39.patch, 
> KAFKA-1660_2015-03-25_10:55:42.patch, KAFKA-1660_2015-03-27_16:35:42.patch, 
> KAFKA-1660_2015-04-07_18:18:40.patch, KAFKA-1660_2015-04-08_14:01:12.patch, 
> KAFKA-1660_2015-04-10_15:08:54.patch, KAFKA-1660_2015-04-16_11:35:37.patch, 
> KAFKA-1660_2015-04-20_17:38:22.patch
>
>
> I would like the ability to call {{close}} with a timeout on the Java 
> Client's KafkaProducer.
> h6. Workaround
> Currently, it is possible to ensure that {{close}} will return quickly by 
> first doing a {{future.get(timeout)}} on the last future produced on each 
> partition, but this means that the user has to define the partitions up front 
> at the time of {{send}} and track the returned {{future}}'s
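
A hedged sketch of that workaround (the per-partition future map is something the
application has to maintain itself; the names below are invented):

import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.RecordMetadata;

// Wait briefly on the last future recorded for each partition, then call close();
// by that point the accumulator should be drained, so close() returns quickly.
public final class BoundedClose {
    public static void closeWithTimeout(Producer<byte[], byte[]> producer,
                                        Map<Integer, Future<RecordMetadata>> lastFuturePerPartition,
                                        long timeoutMs) {
        for (Future<RecordMetadata> last : lastFuturePerPartition.values()) {
            try {
                last.get(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (TimeoutException | ExecutionException e) {
                // give up on this partition so the overall shutdown stays bounded
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        producer.close();
    }
}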



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-12 - Kafka Sasl/Kerberos implementation

2015-04-28 Thread Sriharsha Chintalapani
Updated KIP-12 is here 
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=51809888 .  My 
new proposal talks about the Channel having two layers: one is the TransportLayer and
the other is the Authentication Layer.
  If users want to do authentication without encryption they can use
PlainTextTransportLayer with SASL auth.
  If users want to do Kerberos auth + encryption they can do so by using
SSLTransportLayer and SASL auth.
  If they choose to do just SSL they can also do that without enabling any SASL
authentication; in this case the Session will get the SSL-authenticated peer principal.
Gwen raised a concern about users just having a PLAINTEXT channel alongside a
SASL auth channel; I’ll address that in the KIP.

  There were questions raised about why not to use SASL encryption instead of
SSL for encryption. After we spoke to the HDFS team, they advised against this
approach, as it turned out to have performance issues. Cc’ed Haohui, who is
an HDFS committer.

Thanks,
Harsha


On April 24, 2015 at 9:37:31 AM, Sriharsha Chintalapani (harsh...@fastmail.fm) 
wrote:

I yet to update the KIP with my latest proposal. So give me few days to update 
it. 
I am looking at supporting KERBEROS for the first release and going to use JAAS 
Login Modules to provide authentication.
And will we provide a default SASL PLAIN mechanism on the server side 
Yes . I’ll update the KIP and send out an email for further discussion as it 
will make it easier.

Thanks,
Harsha


On April 24, 2015 at 9:30:04 AM, Gari Singh (gari.r.si...@gmail.com) wrote:

Great.  Sounds good.  I'll re-read the KIP ASAP.

Is there another KIP around authentication providers or is that being tracked
here as well?  For example, the SASL PLAIN mechanism carries a username and
password but currently I don't know where that would be authenticated?  I
notice that AuthUtils has the ability to read a JAAS config, but the KIP only has
entries relevant to Kerberos.  Is the idea to use JAAS LoginModules to provide 
pluggable authentication  - so we could use some of the JDK provided 
LoginModules or create our own (e.g. use a local password file, LDAP, etc)?  
And will we provide a default SASL PLAIN mechanism on the server side or would 
we implement custom SASL provider modules?

Also - I am happy to take a look / test any code as you move along.  Also happy 
to help with SASL providers and/or authentication/login modules

Thanks,

Gari

On Fri, Apr 24, 2015 at 12:05 PM, Sriharsha Chintalapani  
wrote:
Hi Gari,
       I apologize for not being clear in the KIP and for not starting the discussion thread earlier.
My initial proposal (currently in the KIP) was to provide PLAINTEXT, SSL and KERBEROS
as individual protocol implementations.
As you mentioned at the end
“In terms of message level integrity and confidentiality (not to be confused 
with transport level security like TLS), SASL also provides for this 
(assuming the mechanism supports it). The SASL library supports this via 
the "props" parameter in the "createSaslClient/Server" methods. So it is 
easily possible to support Kerberos with integrity (MIC) or confidentiality 
(encryption) over TCP and without either over TLS. “
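
For reference, a sketch of the "props" hook mentioned in that quote -- requesting
integrity or confidentiality as the SASL quality of protection when creating the
client. The service and host names below are placeholders.

import java.util.HashMap;
import java.util.Map;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;

public final class SaslQopExample {
    // QOP preference order: confidentiality (encryption), then integrity (MIC), then auth only.
    public static SaslClient createGssapiClient(String serverHost) throws SaslException {
        Map<String, String> props = new HashMap<String, String>();
        props.put(Sasl.QOP, "auth-conf,auth-int,auth");
        return Sasl.createSaslClient(
                new String[] {"GSSAPI"},   // Kerberos via GSSAPI
                null,                      // authorization id
                "kafka",                   // service/protocol name (placeholder)
                serverHost,
                props,
                null);                     // no callback handler needed for a GSSAPI client
    }
}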

My intention was to use SASL to do auth and also provide encryption over a plain
text channel. But after speaking to many who implemented SASL this way for HDFS,
HBase, and other projects as well, their suggestion was not to use
context.wrap and context.unwrap, since the encryption they do for SASL causes
performance degradation.

Currently I am working on SASL authentication as an option over TCP or TLS. 
I’ll update the KIP soon once I’ve got interfaces in place. Sorry about the 
confusion on this as I am testing out multiple options and trying to decide 
right one.

Thanks,
Harsha


On April 24, 2015 at 8:37:09 AM, Gari Singh (gari.r.si...@gmail.com) wrote:

Sorry for jumping in late, but I have been trying to follow this chain as
well as the updates to the KIP. I don't mean to seem critical and I may be
misunderstanding the proposed implementation, but there seems to be some
confusion around terminology (at least from my perspective) and I am not
sure I actually understand what is going to be implemented and where the
plugin point(s) will be.

The KIP does not really mention SASL interfaces in any detail. The way I
read the KIP it seems as if if is more about providing a Kerberos mechanism
via GSSAPI than it is about providing pluggable SASL support. Perhaps it
is the naming convention ("GSS" is used where I would have though SASL
would have been used).

Maybe I am missing something?

SASL leverages GSSAPI for the Kerberos mechanism, but SASL and the GSSAPI
are not the same thing. Also, SSL/TLS is independent of both SASL and
GSSAPI although you can use either SASL or GSSAPI over TLS.

I would expect something more along the lines of having a SASLChannel and
SASL providers (along with pluggable Authentication providers which
enumerate which SASL mechanisms they support).

I have only ever attempted to real

[jira] [Commented] (KAFKA-2154) MetadataResponse is Empty on a Fresh Cluster

2015-04-28 Thread Emmett Butler (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14518343#comment-14518343
 ] 

Emmett Butler commented on KAFKA-2154:
--

I've been able to replicate this issue on Debian 7.8 running Kafka 0.8.2.1. 
Currently I'm working around it by manually creating a topic whenever I spin up 
a fresh cluster. However, needing to do this in production is suboptimal.

> MetadataResponse is Empty on a Fresh Cluster
> 
>
> Key: KAFKA-2154
> URL: https://issues.apache.org/jira/browse/KAFKA-2154
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.1.1
>Reporter: Keith Bourgoin
>
> When I start a fresh cluster using {{bin/kafka-server-start.sh}} and issue a 
> MetadataRequest to it, the results are blank.  It's correct that there are no 
> topics, but there are also no brokers returned.  I'm writing a driver for 
> Kafka, so this makes the initial connection to the cluster difficult.
> To reproduce:
>   * Start Zookeeper with {{bin/zookeeper-server-start.sh 
> config/zookeeper.properties}} and a broker with {{bin/kafka-server-start.sh 
> config/server.properties}}.  Be sure there's nothing in {{/tmp}} from a 
> previous run.
>   * Run this {{echo -e 
> "\x00\x00\x00\x15\x00\x03\x00\x01\x00\x00\x00\x00\x00\x07pykafka\x00\x00\x00\x00"
>  | nc localhost 9092 | hd}} and observe the output:
> {noformat}
>   00 00 00 0c 00 00 00 00  00 00 00 00 00 00 00 00  ||
> 0010
> {noformat}
>   * Create a topic using {{bin/kafka-topics.sh --zookeeper localhost:2181 
> --create --topic test --partitions 2 --replication-factor 1}}
>   * Re-run the same command and now observe the output:
> {noformat}
> kfb@parsely-dev:~/src/ct/pykafka$ echo -e 
> "\x00\x00\x00\x15\x00\x03\x00\x01\x00\x00\x00\x00\x00\x07pykafka\x00\x00\x00\x00"
>  | nc localhost 9092 | hd
>   00 00 00 61 00 00 00 00  00 00 00 01 00 00 00 00  |...a|
> 0010  00 0b 70 61 72 73 65 6c  79 2d 64 65 76 00 00 23  |..parsely-dev..#|
> 0020  84 00 00 00 01 00 00 00  04 74 65 73 74 00 00 00  |.test...|
> 0030  02 00 00 00 00 00 01 00  00 00 00 00 00 00 01 00  ||
> 0040  00 00 00 00 00 00 01 00  00 00 00 00 00 00 00 00  ||
> 0050  00 00 00 00 00 00 00 00  01 00 00 00 00 00 00 00  ||
> 0060  01 00 00 00 00|.|
> 0065
> {noformat}
> In this case, "parsely-dev" is the name of my work VM and the "#" following 
> it is the port number.  I've verified it's a correctly formatted 
> MetadataResponse.  It's the first null result that we're having a hard time
> dealing with.
> As for the bytestring, that's a correctly formatted MetadataRequest with no 
> topics specified.  Presumably if I specified a topic name it would 
> auto-create the topic and then start returning broker information.  It 
> doesn't really change the fact that the initial state is fairly broken.
> Finally, it's worth noting that if I delete the "test" topic (after turning 
> on {{delete.topic.enable}}) then the responses still include broker 
> information. It's just the initial state which is causing problems.
> {noformat}
> kfb@parsely-dev:~/src/kafka$ bin/kafka-topics.sh --zookeeper localhost:2181 
> --delete --topic test
> Topic test is marked for deletion.
> Note: This will have no impact if delete.topic.enable is not set to true.
> kfb@parsely-dev:~/src/ct/pykafka$ echo -e 
> "\x00\x00\x00\x15\x00\x03\x00\x01\x00\x00\x00\x00\x00\x07pykafka\x00\x00\x00\x00"
>  | nc localhost 9092 | hd
>   00 00 00 21 00 00 00 00  00 00 00 01 00 00 00 00  |...!|
> 0010  00 0b 70 61 72 73 65 6c  79 2d 64 65 76 00 00 23  |..parsely-dev..#|
> 0020  84 00 00 00 00|.|
> 0025
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2153) kafka-patch-review tool uploads a patch even if it is empty

2015-04-28 Thread Ashish K Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14518320#comment-14518320
 ] 

Ashish K Singh commented on KAFKA-2153:
---

Created reviewboard https://reviews.apache.org/r/33645/
 against branch trunk

> kafka-patch-review tool uploads a patch even if it is empty
> ---
>
> Key: KAFKA-2153
> URL: https://issues.apache.org/jira/browse/KAFKA-2153
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ashish K Singh
>Assignee: Ashish K Singh
> Attachments: KAFKA-2153.patch
>
>
> kafka-patch-review tool is great and a big help. However, sometimes one
> forgets to commit the changes made and runs this tool. The tool ends up
> uploading an empty patch. It would be nice to catch this and notify the user.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2153) kafka-patch-review tool uploads a patch even if it is empty

2015-04-28 Thread Ashish K Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2153?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ashish K Singh updated KAFKA-2153:
--
Attachment: KAFKA-2153.patch

> kafka-patch-review tool uploads a patch even if it is empty
> ---
>
> Key: KAFKA-2153
> URL: https://issues.apache.org/jira/browse/KAFKA-2153
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ashish K Singh
>Assignee: Ashish K Singh
> Attachments: KAFKA-2153.patch
>
>
> kafka-patch-review tool is great and a big help. However, sometimes one
> forgets to commit the changes made and runs this tool. The tool ends up
> uploading an empty patch. It would be nice to catch this and notify the user.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2153) kafka-patch-review tool uploads a patch even if it is empty

2015-04-28 Thread Ashish K Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2153?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ashish K Singh updated KAFKA-2153:
--
Status: Patch Available  (was: Open)

> kafka-patch-review tool uploads a patch even if it is empty
> ---
>
> Key: KAFKA-2153
> URL: https://issues.apache.org/jira/browse/KAFKA-2153
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ashish K Singh
>Assignee: Ashish K Singh
> Attachments: KAFKA-2153.patch
>
>
> kafka-patch-review tool is great and a big help. However, sometimes one
> forgets to commit the changes made and runs this tool. The tool ends up
> uploading an empty patch. It would be nice to catch this and notify the user.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Review Request 33645: Patch for KAFKA-2153

2015-04-28 Thread Ashish Singh

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33645/
---

Review request for kafka.


Bugs: KAFKA-2153
https://issues.apache.org/jira/browse/KAFKA-2153


Repository: kafka


Description
---

KAFKA-2153: kafka-patch-review tool uploads a patch even if it is empty


Diffs
-

  dev-utils/test-patch.py 9d88a6e3926de8a4a62bbb03accfc3928c075b08 
  kafka-patch-review.py 959268069cd0cba3c70e10a2cf4829ca241b13a5 

Diff: https://reviews.apache.org/r/33645/diff/


Testing
---


Thanks,

Ashish Singh



[jira] [Commented] (KAFKA-1660) Ability to call close() with a timeout on the Java Kafka Producer.

2015-04-28 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14518314#comment-14518314
 ] 

Jay Kreps commented on KAFKA-1660:
--

Here was the source of my paranoia about r/w locks compared to counters:
http://blog.takipi.com/java-8-stampedlocks-vs-readwritelocks-and-synchronized/
http://blog.takipi.com/java-8-longadders-the-fastest-way-to-add-numbers-concurrently/
Now they have added some new StampedLock that is meant to be a faster 
replacement but is only in jdk 8.

This may not be applicable in the case where writes are really really rare 
which may be why we don't see it in our perf testing.

My concern about RecordAccumulator complexity remains (basically nested locks 
like that always seem to bite us). I'm not sure I have a better solution there 
though. Let me think a bit and see if I can come up with anything.

> Ability to call close() with a timeout on the Java Kafka Producer. 
> ---
>
> Key: KAFKA-1660
> URL: https://issues.apache.org/jira/browse/KAFKA-1660
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, producer 
>Affects Versions: 0.8.2.0
>Reporter: Andrew Stein
>Assignee: Jiangjie Qin
> Fix For: 0.8.3
>
> Attachments: KAFKA-1660.patch, KAFKA-1660.patch, 
> KAFKA-1660_2015-02-17_16:41:19.patch, KAFKA-1660_2015-03-02_10:41:49.patch, 
> KAFKA-1660_2015-03-08_21:14:50.patch, KAFKA-1660_2015-03-09_12:56:39.patch, 
> KAFKA-1660_2015-03-25_10:55:42.patch, KAFKA-1660_2015-03-27_16:35:42.patch, 
> KAFKA-1660_2015-04-07_18:18:40.patch, KAFKA-1660_2015-04-08_14:01:12.patch, 
> KAFKA-1660_2015-04-10_15:08:54.patch, KAFKA-1660_2015-04-16_11:35:37.patch, 
> KAFKA-1660_2015-04-20_17:38:22.patch
>
>
> I would like the ability to call {{close}} with a timeout on the Java 
> Client's KafkaProducer.
> h6. Workaround
> Currently, it is possible to ensure that {{close}} will return quickly by 
> first doing a {{future.get(timeout)}} on the last future produced on each 
> partition, but this means that the user has to define the partitions up front 
> at the time of {{send}} and track the returned {{future}}'s



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[DISCUSS] KIP-21 Configuration Management

2015-04-28 Thread Aditya Auradkar
Hey everyone,

Wrote up a KIP to update topic, client and broker configs dynamically via 
Zookeeper.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-21+-+Dynamic+Configuration

Please read and provide feedback.

Thanks,
Aditya

PS: I've intentionally kept this discussion separate from KIP-5 since I'm not 
sure if that is actively being worked on and I wanted to start with a clean 
slate.


Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations (Thread 2)

2015-04-28 Thread Andrii Biletskyi
Guys,

A quick summary of our today's meeting.

There were no additional issues/questions. The only item about which
we are not 100% sure is the "multiple instructions for one topic in one
request" case.
Jun proposed explaining the reasons for not allowing users to do that
again here on the mailing list, and, in case we implement it in the final
version, documenting it well so API clients understand exactly what is not
allowed and why.

In the meantime I will update the KIP. After that I will start the voting
thread.

Thanks,
Andrii Biletskyi

On Tue, Apr 28, 2015 at 10:33 PM, Andrii Biletskyi <
andrii.bilets...@stealth.ly> wrote:

> Guys,
>
> It seems that there are no open questions left so prior to our weekly call
> let me summarize what I'm going to implement as part of phase one for
> KIP-4.
>
> 1. Add 3 new Wire Protocol requests - Create-, Alter- and
> DeleteTopicRequest
>
> 2. Topic requests are batch requests, errors are returned per topic as part
> of batch response.
>
> 3. Topic requests are asynchronous - respective commands are only
> started and server is not blocked until command is finished.
>
> 4. It will not be allowed to specify multiple mutations for the same topic
> within one batch request - a special error will be returned for such a
> topic.
>
> 5. There will be no dedicated request for reassign-partitions - it is
> simulated
> with AlterTopicRequest.ReplicaAssignment field.
>
> 6. Preferred-replica-leader-election is not supported since there is no
> need to have
> a public API to trigger such operation.
>
> 7. TopicMetadataRequest will be evolved to version 1 - topic-level
> configuration
> per topic will be included and the ISR field will be removed. Automatic
> topic-creation
> logic will be removed (we will use CreateTopicRequest for that).
>
> Thanks,
> Andrii Biletskyi
>
>
> On Tue, Apr 28, 2015 at 12:23 AM, Jun Rao  wrote:
>
>> Yes, to verify if a partition reassignment completes or not, we just need
>> to make sure AR == RAR. So, we don't need ISR for this. It's probably
>> still
>> useful to know ISR for monitoring in general though.
>>
>> Thanks,
>>
>> Jun
>>
>> On Mon, Apr 27, 2015 at 4:15 AM, Andrii Biletskyi <
>> andrii.bilets...@stealth.ly> wrote:
>>
>> > Okay, I had some doubts in terms of reassign-partitions case. I was
>> > not sure whether we need ISR to check post condition of partition
>> > reassignment. But I think we can rely on assigned replicas - the
>> workflow
>> > in reassignPartitions is the following:
>> > 1. Update AR in ZK with OAR + RAR.
>> > ...
>> > 10. Update AR in ZK with RAR.
>> > 11. Update the /admin/reassign_partitions path in ZK to remove this
>> > partition.
>> > 12. After electing leader, the replicas and isr information changes. So
>> > resend the update metadata request to every broker.
>> >
>> > In other words AR becomes RAR right before removing partitions from the
>> > admin path. I think we can consider (with a little approximation)
>> > reassignment
>> > completed if AR == RAR.
>> >
>> > If it's okay, I will remove ISR and add topic config in one change as
>> > discussed
>> > earlier.
>> >
>> > Thanks,
>> > Andrii Biletskyi
>> >
>> >
>> > On Mon, Apr 27, 2015 at 1:50 AM, Jun Rao  wrote:
>> >
>> > > Andrii,
>> > >
>> > > Another thing. We decided not to add the lag info in TMR. To be
>> > consistent,
>> > > we probably also want to remove ISR from TMR since only the leader
>> knows
>> > > it. We can punt on adding any new request from getting ISR. ISR is
>> mostly
>> > > useful for monitoring. Currently, one can determine if a replica is in
>> > ISR
>> > > from the lag metrics (a replica is in ISR if its lag is <=0).
>> > >
>> > > Thanks,
>> > >
>> > > Jun
>> > >
>> > > On Sun, Apr 26, 2015 at 4:31 PM, Andrii Biletskyi <
>> > > andrii.bilets...@stealth.ly> wrote:
>> > >
>> > > > Jun,
>> > > >
>> > > > I like your approach to AlterTopicRequest semantics! Sounds like
>> > > > we linearize all request fields to ReplicaAssignment - I will
>> > definitely
>> > > > try this out to ensure there are no other pitfalls.
>> > > >
>> > > > With regards to multiple instructions in one batch per topic. For me
>> > > > this sounds reasonable too. We discussed last time that it's pretty
>> > > > strange we give users schema that supports batching and at the
>> > > > same time introduce restrictions to the way batching can be used
>> > > > (in this case - only one instruction per topic). But now, when we
>> give
>> > > > users everything they need to avoid such misleading use cases (if
>> > > > we implement the previous item - user will be able to specify/change
>> > > > all fields in one instruction) - it might be a good justification to
>> > > > prohibit
>> > > > serving such requests.
>> > > >
>> > > > Any objections?
>> > > >
>> > > > Thanks,
>> > > > Andrii BIletskyi
>> > > >
>> > > >
>> > > >
>> > > > On Sun, Apr 26, 2015 at 11:00 PM, Jun Rao  wrote:
>> > > >
>> > > > > Andrii,
>> > > > >
>> > > > > Thanks for the update.
>> > > > >
>> > > > > For your secon

[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit

2015-04-28 Thread David Hay (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14518184#comment-14518184
 ] 

David Hay commented on KAFKA-1835:
--

[~smiklosovic] I've worked around the issue by creating my own thread pool and 
wrapping the producer send requests with my own Callable, as follows.  This 
also uses Guava's ListenableFuture class:
{code:language=java}
// Use a listening executor so Futures.dereference() can unwrap the nested future.
ListeningExecutorService executorService =
    MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5));

ListenableFuture<RecordMetadata> future = Futures.dereference(
    executorService.submit(new Callable<ListenableFuture<RecordMetadata>>() {
        public ListenableFuture<RecordMetadata> call() throws Exception {
            final SettableFuture<RecordMetadata> responseFuture = SettableFuture.create();
            try {
                producer.send(new ProducerRecord("topic", "key", "message"), new Callback() {
                    public void onCompletion(RecordMetadata metadata, Exception ex) {
                        if (ex == null) { responseFuture.set(metadata); }
                        else { responseFuture.setException(ex); }
                    }
                });
            }
            catch (Exception ex) { responseFuture.setException(ex); }
            return responseFuture;
        }
    }));
{code}


> Kafka new producer needs options to make blocking behavior explicit
> ---
>
> Key: KAFKA-1835
> URL: https://issues.apache.org/jira/browse/KAFKA-1835
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 0.8.2.0, 0.8.3, 0.9.0
>Reporter: Paul Pearcy
> Fix For: 0.8.3
>
> Attachments: KAFKA-1835-New-producer--blocking_v0.patch, 
> KAFKA-1835.patch
>
>   Original Estimate: 504h
>  Remaining Estimate: 504h
>
> The new (0.8.2 standalone) producer will block the first time it attempts to 
> retrieve metadata for a topic. This is not the desired behavior in some use 
> cases where async non-blocking guarantees are required and message loss is 
> acceptable in known cases. Also, most developers will assume an API that 
> returns a future is safe to call in a critical request path. 
> Discussing on the mailing list, the most viable option is to have the 
> following settings:
>  pre.initialize.topics=x,y,z
>  pre.initialize.timeout=x
>  
> This moves potential blocking to the init of the producer and outside of some 
> random request. The potential will still exist for blocking in a corner case 
> where connectivity with Kafka is lost and a topic not included in pre-init 
> has a message sent for the first time. 
> There is the question of what to do when initialization fails. There are a 
> couple of options that I'd like available:
> - Fail creation of the client 
> - Fail all sends until the meta is available 
> Open to input on how the above option should be expressed. 
> It is also worth noting more nuanced solutions exist that could work without 
> the extra settings, they just end up having extra complications and at the 
> end of the day not adding much value. For instance, the producer could accept 
> and queue messages(note: more complicated than I am making it sound due to 
> storing all accepted messages in pre-partitioned compact binary form), but 
> you're still going to be forced to choose to either start blocking or 
> dropping messages at some point. 
> I have some test cases I am going to port over to the Kafka producer 
> integration ones and start from there. My current impl is in scala, but 
> porting to Java shouldn't be a big deal (was using a promise to track init 
> status, but will likely need to make that an atomic bool). 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 33614: Patch for KAFKA-2132

2015-04-28 Thread Ashish Singh

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33614/
---

(Updated April 28, 2015, 9:38 p.m.)


Review request for kafka.


Bugs: KAFKA-2132
https://issues.apache.org/jira/browse/KAFKA-2132


Repository: kafka


Description
---

KAFKA-2132: Move Log4J appender to clients module


Diffs (updated)
-

  build.gradle 006ced8d0c539d11e126aca62c147b916331 
  checkstyle/import-control.xml f2e6cec267e67ce8e261341e373718e14a8e8e03 
  core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala 
5d36a019e3dbfb93737a9cd23404dcd1c5d836d1 
  core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala 
41366a14590d318fced0e83d6921d8035fa882da 
  log4j/src/main/java/org/apache/kafka/log4j/KafkaLog4jAppender.java 
PRE-CREATION 
  log4j/src/test/java/org/apache/kafka/log4j/KafkaLog4jAppenderTest.java 
PRE-CREATION 
  log4j/src/test/java/org/apache/kafka/log4j/MockKafkaLog4jAppender.java 
PRE-CREATION 
  settings.gradle 83f764e6a4a15a5fdba232dce74a369870f26b45 

Diff: https://reviews.apache.org/r/33614/diff/


Testing
---


Thanks,

Ashish Singh



ZooKeeper Session Timeout During Startup Caused By Long ParNewGC Pauses

2015-04-28 Thread James Lent

I have filed a bug report with Oracle on this issue today.  I have not yet been
assigned a bug number.  I thought the issue might be of general interest to the
Kafka community, and I have a suggestion for a configuration change that works
around the issue in case other people run into it.

First, the issue I am about to describe will be hard to believe, but I do
provide a short Java program that demonstrates the JVM GC issue that triggers
the long ParNewGC pauses under certain conditions.  Before you dismiss the
analysis out of hand, I suggest you try the test program below.  I have tested 
it in multiple environments: linux dev box, linux laptop, and a linux EC2 
instance.  Other engineers have also done so in their own environments.  I 
would be interested to know what you see.

First the conditions:

1) More than a 1G heap (we ran into it when we increased the heap from 1G to 
2G).
2) Requires at least some topics defined.  Not sure the exact amount.  We don't 
see this issue in all our environments.
3) Log compaction enabled.
4) Using JVM other than Oracle 6.  I have reproduced the underlying issue (with 
the test program below) using OpenJDK 6, Oracle 7, OpenJDK 7, and Oracle 8 and 
OpenJDK 8.

Under these conditions we can see 9 second or longer pauses due to individual 
ParNew GC events (which is greater than the default 6 second ZooKeeper session 
timeout).  It tends to be very reproducible in a given environment (e.g. either 
always happens or it never happens).  We are using Kafka 0.8.1.1.

The long ParNewGC delays are triggered by the allocation of a large static object during
startup, specifically the SkimpyOffsetMap byte buffer.  There is no reason why this should
be the case, and that is why I have opened a JVM bug with Oracle.  The even more
unbelievable part is that this bug only seems to trigger a problem for static objects in the
480M to 512M range.  The default size of this byte buffer is 500M.  Our current
workaround is to configure this buffer to use 400M instead.  We have verified that this
makes the issue go away in multiple "real" Kafka environments.  Using the G1 GC
is another option we are considering (and seems to work).

Here is the test program (note that the size of the static object matches the 
default size of the SkimpyOffsetMap byte buffer):

import java.util.ArrayList;

public class LongParNewPause {

    // Large static allocation; its size is the first command-line argument
    // (default 524288000 bytes = 500M, matching the SkimpyOffsetMap default).
    static byte[] bigStaticObject;

    public static void main(String[] args) throws Exception {
        int bigObjSize    = args.length > 0 ? Integer.parseInt(args[0]) : 524288000;
        int littleObjSize = args.length > 1 ? Integer.parseInt(args[1]) : 100;
        int saveFraction  = args.length > 2 ? Integer.parseInt(args[2]) : 15;

        bigStaticObject = new byte[bigObjSize];
        ArrayList<byte[]> holder = new ArrayList<byte[]>();

        // Allocate small objects forever, keeping a reference to every
        // saveFraction-th one so some of them survive young collections.
        int i = 0;
        while (true) {
            byte[] local = new byte[littleObjSize];
            if (i++ % saveFraction == 0) {
                holder.add(local);
            }
        }
    }
}


You should run it with the following JVM options:

-verbose:gc
-XX:+PrintGCTimeStamps
-XX:+PrintGCDetails
-XX:+UseParNewGC
-XX:+UseConcMarkSweepGC
-Xmx2G
-Xms2G

Here is what I see using Oracle 8:

0.355: [GC (Allocation Failure) 0.355: [ParNew: 272640K->18193K(306688K), 
0.0265380 secs] 784640K->530193K(4160256K), 0.0266064 secs] [Times: user=0.07 
sys=0.00, real=0.03 secs]
0.420: [GC (Allocation Failure) 0.420: [ParNew: 290833K->23124K(306688K), 
11.4522887 secs] 802833K->552573K(4160256K), 11.4523296 secs] [Times: user=42.51 
sys=0.18, real=11.45 secs]
11.902: [GC (Allocation Failure) 11.902: [ParNew: 295764K->33361K(306688K), 
25.6937427 secs] 825213K->580830K(4160256K), 25.6937906 secs] [Times: user=100.63 
sys=0.11, real=25.70 secs]
37.627: [GC (Allocation Failure) 37.627: [ParNew: 306001K->25620K(306688K), 
0.0219097 secs] 853470K->591124K(4160256K), 0.0219448 secs] [Times: user=0.08 
sys=0.00, real=0.02 secs]
...

If I pass in 500,000,000 for the size:

0.344: [GC (Allocation Failure) 0.344: [ParNew: 272640K->18181K(306688K), 
0.0253515 secs] 760921K->506462K(4160256K), 0.0253957 secs] [Times: user=0.04 
sys=0.01, real=0.02 secs]
0.408: [GC (Allocation Failure) 0.408: [ParNew: 290821K->31804K(306688K), 
0.0499212 secs] 779102K->537534K(4160256K), 0.0499556 secs] [Times: user=0.08 
sys=0.01, real=0.05 secs]
0.491: [GC (Allocation Failure) 0.491: [ParNew: 30K->27084K(306688K), 
0.0286135 secs] 810174K->550834K(4160256K), 0.0286590 secs] [Times: user=0.08 
sys=0.00, real=0.03 secs]
0.553: [GC (Allocation Failure) 0.554: [ParNew: 299724K->27617K(306688K), 
0.0387482 secs] 823474K->569401K(4160256K), 0.0387790 secs] [Times: user=0.07 
sys=0.01, real=0.04 secs]
0.625: [GC (Allocation Failure) 0.625: [ParNew: 300257K->21936K(306688K), 
0.0368052 secs] 842041K->584850K(4160256K), 0.0368367 secs] [Times: user=0.08 
sys=0.00, real=0.03 secs]
...

Playing with the other two parameters can vary the results, but they are all bad if the 
size of the static

Re: [DISCUSSION] java.io.Closeable in KAFKA-2121

2015-04-28 Thread Steven Wu
Sorry for the previous empty msg.

Jay's idea should work. Basically, we override the close() method in the
Serializer interface:

public interface Serializer<T> extends Closeable {
    @Override
    public void close();
}
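
For reference, here is a minimal sketch of a custom serializer under that scheme.
It assumes the trunk Serializer interface (configure / serialize / close); the
class name and the UTF-8 encoding are just examples, not anything prescribed by
the API.

import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;

// Minimal sketch: because close() is overridden without a throws clause,
// callers can close the serializer without catching IOException.
public class Utf8StringSerializer implements Serializer<String> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure in this sketch
    }

    @Override
    public byte[] serialize(String topic, String data) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public void close() {
        // nothing to release; note: no checked exception declared
    }
}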

On Tue, Apr 28, 2015 at 1:10 PM, Steven Wu  wrote:

>
>
> On Tue, Apr 28, 2015 at 1:03 PM, Ewen Cheslack-Postava 
> wrote:
>
>> Good point Jay. More specifically we were already implementing without the
>> checked exception, we'd need to override close() in the Serializer and
>> Deserializer interfaces and omit the throws clause. That definitely makes
>> them source compatible. Not sure about binary compatibility, I couldn't
>> find a quick answer but I think it's probably still compatible.
>>
>> -Ewen
>>
>> On Tue, Apr 28, 2015 at 12:30 PM, Jay Kreps  wrote:
>>
>> > Hey guys,
>> >
>> > You can implement Closable without the checked exception. Having close()
>> > methods throw checked exceptions isn't very useful unless there is a way
>> > for the caller to recover. In this case there really isn't, right?
>> >
>> > -Jay
>> >
>> > On Mon, Apr 27, 2015 at 5:51 PM, Guozhang Wang 
>> wrote:
>> >
>> > > Folks,
>> > >
>> > > In a recent commit I made regarding KAFKA-2121, there is an omitted
>> API
>> > > change which makes Serializer / Deserializer extending from Closeable,
>> > > whose close() call could throw IOException by declaration. Hence now
>> some
>> > > scenario like:
>> > >
>> > > -
>> > >
>> > > Serializer keySerializer = ...
>> > > Serializer valueSerializer = ...
>> > > KafkaProducer producer = new KafkaProducer(config, keySerializer,
>> > > valueSerializer)
>> > > // ...
>> > > keySerializer.close()
>> > > valueSerializer.close()
>> > >
>> > > -
>> > >
>> > > will need to capture IOException now.
>> > >
>> > > Want to bring this up for people's attention, and you opinion on
>> whether
>> > we
>> > > should revert this change?
>> > >
>> > > -- Guozhang
>> > >
>> >
>>
>>
>>
>> --
>> Thanks,
>> Ewen
>>
>
>


Re: [DISCUSSION] java.io.Closeable in KAFKA-2121

2015-04-28 Thread Steven Wu
On Tue, Apr 28, 2015 at 1:03 PM, Ewen Cheslack-Postava 
wrote:

> Good point Jay. More specifically we were already implementing without the
> checked exception, we'd need to override close() in the Serializer and
> Deserializer interfaces and omit the throws clause. That definitely makes
> them source compatible. Not sure about binary compatibility, I couldn't
> find a quick answer but I think it's probably still compatible.
>
> -Ewen
>
> On Tue, Apr 28, 2015 at 12:30 PM, Jay Kreps  wrote:
>
> > Hey guys,
> >
> > You can implement Closable without the checked exception. Having close()
> > methods throw checked exceptions isn't very useful unless there is a way
> > for the caller to recover. In this case there really isn't, right?
> >
> > -Jay
> >
> > On Mon, Apr 27, 2015 at 5:51 PM, Guozhang Wang 
> wrote:
> >
> > > Folks,
> > >
> > > In a recent commit I made regarding KAFKA-2121, there is an omitted API
> > > change which makes Serializer / Deserializer extending from Closeable,
> > > whose close() call could throw IOException by declaration. Hence now
> some
> > > scenario like:
> > >
> > > -
> > >
> > > Serializer keySerializer = ...
> > > Serializer valueSerializer = ...
> > > KafkaProducer producer = new KafkaProducer(config, keySerializer,
> > > valueSerializer)
> > > // ...
> > > keySerializer.close()
> > > valueSerializer.close()
> > >
> > > -
> > >
> > > will need to capture IOException now.
> > >
> > > Want to bring this up for people's attention, and you opinion on
> whether
> > we
> > > should revert this change?
> > >
> > > -- Guozhang
> > >
> >
>
>
>
> --
> Thanks,
> Ewen
>


Re: [DISCUSSION] java.io.Closeable in KAFKA-2121

2015-04-28 Thread Ewen Cheslack-Postava
Good point Jay. More specifically we were already implementing without the
checked exception, we'd need to override close() in the Serializer and
Deserializer interfaces and omit the throws clause. That definitely makes
them source compatible. Not sure about binary compatibility, I couldn't
find a quick answer but I think it's probably still compatible.

-Ewen

On Tue, Apr 28, 2015 at 12:30 PM, Jay Kreps  wrote:

> Hey guys,
>
> You can implement Closable without the checked exception. Having close()
> methods throw checked exceptions isn't very useful unless there is a way
> for the caller to recover. In this case there really isn't, right?
>
> -Jay
>
> On Mon, Apr 27, 2015 at 5:51 PM, Guozhang Wang  wrote:
>
> > Folks,
> >
> > In a recent commit I made regarding KAFKA-2121, there is an omitted API
> > change which makes Serializer / Deserializer extending from Closeable,
> > whose close() call could throw IOException by declaration. Hence now some
> > scenario like:
> >
> > -
> >
> > Serializer keySerializer = ...
> > Serializer valueSerializer = ...
> > KafkaProducer producer = new KafkaProducer(config, keySerializer,
> > valueSerializer)
> > // ...
> > keySerializer.close()
> > valueSerializer.close()
> >
> > -
> >
> > will need to capture IOException now.
> >
> > Want to bring this up for people's attention, and you opinion on whether
> we
> > should revert this change?
> >
> > -- Guozhang
> >
>



-- 
Thanks,
Ewen


Re: Review Request 33634: Patch for KAFKA-2129

2015-04-28 Thread Ewen Cheslack-Postava

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33634/#review81869
---

Ship it!


Your analysis sounds right, looks like the missing synchronized was just an 
oversight.

- Ewen Cheslack-Postava


On April 28, 2015, 6:12 p.m., Tim Brooks wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/33634/
> ---
> 
> (Updated April 28, 2015, 6:12 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2129
> https://issues.apache.org/jira/browse/KAFKA-2129
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Synchronize method
> 
> 
> Diffs
> -
> 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
> d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
> 
> Diff: https://reviews.apache.org/r/33634/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Tim Brooks
> 
>



Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations (Thread 2)

2015-04-28 Thread Andrii Biletskyi
Guys,

It seems that there are no open questions left so prior to our weekly call
let me summarize what I'm going to implement as part of phase one for KIP-4.

1. Add 3 new Wire Protocol requests - Create-, Alter- and DeleteTopicRequest

2. Topic requests are batch requests; errors are returned per topic as part
of the batch response (see the sketch after this list).

3. Topic requests are asynchronous - the respective commands are only
started, and the server does not block until the command is finished.

4. It will not be allowed to specify multiple mutations for the same topic
within one batch request - a special error will be returned for such a
topic.

5. There will be no dedicated request for reassign-partitions - it is
simulated
with AlterTopicRequest.ReplicaAssignment field.

6. Preferred-replica-leader-election is not supported since there is no
need to have
a public API to trigger such operation.

7. TopicMetadataRequest will be evolved to version 1 - topic-level
configuration per topic will be included and the ISR field will be removed.
The automatic topic-creation logic will be removed (we will use
CreateTopicRequest for that).
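
Purely as an illustration of points 2 and 3 (these Java types are hypothetical
and are NOT the actual KIP-4 wire format), the client-side result of a batch
create could look roughly like this:

import java.util.List;

// Hypothetical shape of a batch response: one error entry per requested topic.
// A non-zero code for one topic does not fail the whole batch, and a zero code
// only means the command was accepted/started, not that it has completed.
public class CreateTopicsBatchSketch {

    static class TopicError {
        final String topic;
        final short errorCode;   // 0 = accepted, otherwise a per-topic error
        TopicError(String topic, short errorCode) {
            this.topic = topic;
            this.errorCode = errorCode;
        }
    }

    static class CreateTopicsResponseSketch {
        final List<TopicError> perTopicErrors;
        CreateTopicsResponseSketch(List<TopicError> perTopicErrors) {
            this.perTopicErrors = perTopicErrors;
        }
    }
}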

Thanks,
Andrii Biletskyi


On Tue, Apr 28, 2015 at 12:23 AM, Jun Rao  wrote:

> Yes, to verify if a partition reassignment completes or not, we just need
> to make sure AR == RAR. So, we don't need ISR for this. It's probably still
> useful to know ISR for monitoring in general though.
>
> Thanks,
>
> Jun
>
> On Mon, Apr 27, 2015 at 4:15 AM, Andrii Biletskyi <
> andrii.bilets...@stealth.ly> wrote:
>
> > Okay, I had some doubts in terms of reassign-partitions case. I was
> > not sure whether we need ISR to check post condition of partition
> > reassignment. But I think we can rely on assigned replicas - the workflow
> > in reassignPartitions is the following:
> > 1. Update AR in ZK with OAR + RAR.
> > ...
> > 10. Update AR in ZK with RAR.
> > 11. Update the /admin/reassign_partitions path in ZK to remove this
> > partition.
> > 12. After electing leader, the replicas and isr information changes. So
> > resend the update metadata request to every broker.
> >
> > In other words AR becomes RAR right before removing partitions from the
> > admin path. I think we can consider (with a little approximation)
> > reassignment
> > completed if AR == RAR.
> >
> > If it's okay, I will remove ISR and add topic config in one change as
> > discussed
> > earlier.
> >
> > Thanks,
> > Andrii Biletskyi
> >
> >
> > On Mon, Apr 27, 2015 at 1:50 AM, Jun Rao  wrote:
> >
> > > Andrii,
> > >
> > > Another thing. We decided not to add the lag info in TMR. To be
> > consistent,
> > > we probably also want to remove ISR from TMR since only the leader
> knows
> > > it. We can punt on adding any new request from getting ISR. ISR is
> mostly
> > > useful for monitoring. Currently, one can determine if a replica is in
> > ISR
> > > from the lag metrics (a replica is in ISR if its lag is <=0).
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Sun, Apr 26, 2015 at 4:31 PM, Andrii Biletskyi <
> > > andrii.bilets...@stealth.ly> wrote:
> > >
> > > > Jun,
> > > >
> > > > I like your approach to AlterTopicRequest semantics! Sounds like
> > > > we linearize all request fields to ReplicaAssignment - I will
> > definitely
> > > > try this out to ensure there are no other pitfalls.
> > > >
> > > > With regards to multiple instructions in one batch per topic. For me
> > > > this sounds reasonable too. We discussed last time that it's pretty
> > > > strange we give users schema that supports batching and at the
> > > > same time introduce restrictions to the way batching can be used
> > > > (in this case - only one instruction per topic). But now, when we
> give
> > > > users everything they need to avoid such misleading use cases (if
> > > > we implement the previous item - user will be able to specify/change
> > > > all fields in one instruction) - it might be a good justification to
> > > > prohibit
> > > > serving such requests.
> > > >
> > > > Any objections?
> > > >
> > > > Thanks,
> > > > Andrii Biletskyi
> > > >
> > > >
> > > >
> > > > On Sun, Apr 26, 2015 at 11:00 PM, Jun Rao  wrote:
> > > >
> > > > > Andrii,
> > > > >
> > > > > Thanks for the update.
> > > > >
> > > > > For your second point, I agree that if a single AlterTopicRequest
> can
> > > > make
> > > > > multiple changes, there is no need to support the same topic
> included
> > > > more
> > > > > than once in the request.
> > > > >
> > > > > Now about the semantics in your first question. I was thinking that
> > we
> > > > can
> > > > > do the following.
> > > > > a. If ReplicaAssignment is specified, we expect that this will
> > specify
> > > > the
> > > > > replica assignment for all partitions in the topic. For now, we can
> > > have
> > > > > the constraint that there could be more partitions than existing
> > ones,
> > > > but
> > > > > can't be less. In this case, both partitions and replicas are
> > ignored.
> > > > Then
> > > > > for each partition, we do one of the following.
> > > > > a1.

Re: [DISCUSSION] java.io.Closeable in KAFKA-2121

2015-04-28 Thread Jay Kreps
Hey guys,

You can implement Closable without the checked exception. Having close()
methods throw checked exceptions isn't very useful unless there is a way
for the caller to recover. In this case there really isn't, right?

-Jay

On Mon, Apr 27, 2015 at 5:51 PM, Guozhang Wang  wrote:

> Folks,
>
> In a recent commit I made regarding KAFKA-2121, there is an omitted API
> change which makes Serializer / Deserializer extending from Closeable,
> whose close() call could throw IOException by declaration. Hence now some
> scenario like:
>
> -
>
> Serializer keySerializer = ...
> Serializer valueSerializer = ...
> KafkaProducer producer = new KafkaProducer(config, keySerializer,
> valueSerializer)
> // ...
> keySerializer.close()
> valueSerializer.close()
>
> -
>
> will need to capture IOException now.
>
> Want to bring this up for people's attention, and you opinion on whether we
> should revert this change?
>
> -- Guozhang
>


[jira] [Commented] (KAFKA-1843) Metadata fetch/refresh in new producer should handle all node connection states gracefully

2015-04-28 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14517659#comment-14517659
 ] 

Ewen Cheslack-Postava commented on KAFKA-1843:
--

A closely related and easily reproduced problem was also reported by 
[~omkreddy] on the mailing list:

We are testing the new producer on a 2-node cluster.
Under some node failure scenarios, the producer is not able
to update metadata.

{quote}
Steps to reproduce
1. Form a 2-node cluster (K1, K2)
2. Create a topic with a single partition, replication factor = 2
3. Start producing data (producer metadata: K1, K2)
4. Kill the leader node (say K1)
5. K2 becomes the leader (producer metadata: K2)
6. Bring back K1 and kill K2 before metadata.max.age.ms
7. K1 becomes the leader (producer metadata still contains: K2)

After this point, the producer is not able to update the metadata.
The producer continuously tries to connect to the dead node (K2).
{quote}

> Metadata fetch/refresh in new producer should handle all node connection 
> states gracefully
> --
>
> Key: KAFKA-1843
> URL: https://issues.apache.org/jira/browse/KAFKA-1843
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 0.8.2.0
>Reporter: Ewen Cheslack-Postava
>
> KAFKA-1642 resolved some issues with the handling of broker connection states 
> to avoid high CPU usage, but made the minimal fix rather than the ideal one. 
> The code for handling the metadata fetch is difficult to get right because it 
> has to handle a lot of possible connectivity states and failure modes across 
> all the known nodes. It also needs to correctly integrate with the 
> surrounding event loop, providing correct poll() timeouts to both avoid busy 
> looping and make sure it wakes up and tries new nodes in the face of both 
> connection and request failures.
> A patch here should address a few issues:
> 1. Make sure connection timeouts, as implemented in KAFKA-1842, are cleanly 
> integrated. This mostly means that when a connecting node is selected to 
> fetch metadata from, that the code notices that and sets the next timeout 
> based on the connection timeout rather than some other backoff.
> 2. Rethink the logic and naming of NetworkClient.leastLoadedNode. That method 
> actually takes into account a) the current connectivity of each node, b) 
> whether the node had a recent connection failure, c) the "load" in terms of 
> in flight requests. It also needs to ensure that different clients don't use 
> the same ordering across multiple calls (which is already addressed in the 
> current code by nodeIndexOffset) and that we always eventually try all nodes 
> in the face of connection failures (which isn't currently handled by 
> leastLoadedNode and probably cannot be without tracking additional state). 
> This method also has to work for new consumer use cases even though it is 
> currently only used by the new producer's metadata fetch. Finally it has to 
> properly handle when other code calls initiateConnect() since the normal path 
> for sending messages also initiates connections.
> We can already say that there is an order of preference given a single call 
> (as follows), but making this work across multiple calls when some initial 
> choices fail to connect or return metadata *and* connection states may be 
> changing is much more difficult.
>  * Connected, zero in flight requests - the request can be sent immediately
>  * Connecting node - it will hopefully be connected very soon and by 
> definition has no in flight requests
>  * Disconnected - same reasoning as for a connecting node
>  * Connected, > 0 in flight requests - we consider any # of in flight 
> requests as a big enough backlog to delay the request a lot.
> We could use an approach that better accounts for # of in flight requests 
> rather than just turning it into a boolean variable, but that probably 
> introduces much more complexity than it is worth.
> 3. The most difficult case to handle so far has been when leastLoadedNode 
> returns a disconnected node to maybeUpdateMetadata as its best option. 
> Properly handling the two resulting cases (initiateConnect fails immediately 
> vs. taking some time to possibly establish the connection) is tricky.
> 4. Consider optimizing for the failure cases. The most common cases are when 
> you already have an active connection and can immediately get the metadata or 
> you need to establish a connection, but the connection and metadata 
> request/response happen very quickly. These common cases are infrequent 
> enough (default every 5 min) that establishing an extra connection isn't a 
> big deal as long as it's eventually cleaned up. The edge cases, like network 
> partitions where some subset of nodes become unrea
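
For illustration only (this is not the NetworkClient implementation), the preference
ordering described above could be sketched as a simple scoring function; all types
here are hypothetical:

import java.util.List;

// Lower score = more preferred node for the metadata fetch.
public class NodePreferenceSketch {

    static class NodeState {            // hypothetical view of a node's state
        boolean connected;
        boolean connecting;
        int inFlightRequests;
    }

    static int score(NodeState n) {
        if (n.connected && n.inFlightRequests == 0) return 0; // ready: send immediately
        if (n.connecting) return 1;                           // hopefully connected soon
        if (!n.connected) return 2;                           // disconnected: try to connect
        return 3;                                             // connected but backlogged
    }

    static NodeState pick(List<NodeState> nodes) {
        NodeState best = null;
        for (NodeState n : nodes) {
            if (best == null || score(n) < score(best)) {
                best = n;
            }
        }
        return best;
    }
}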

[jira] [Commented] (KAFKA-2121) prevent potential resource leak in KafkaProducer and KafkaConsumer

2015-04-28 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14517647#comment-14517647
 ] 

Ewen Cheslack-Postava commented on KAFKA-2121:
--

I'd give people some more time to comment, it's tough to keep up with the 
volume of that list and it hasn't been a full day yet.

If we revert it, I'd personally revert both just for consistency, but if it 
generates any more discussion on the mailing list you should probably just ask 
people there what they think.

On where to submit the patch, I don't know that there is a common practice for 
this. I'd just submit it here. That way it will get tagged with this JIRA in 
the commit log message which makes it easy for people to track down the origin 
of the changes.

> prevent potential resource leak in KafkaProducer and KafkaConsumer
> --
>
> Key: KAFKA-2121
> URL: https://issues.apache.org/jira/browse/KAFKA-2121
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.8.2.0
>Reporter: Steven Zhen Wu
>Assignee: Steven Zhen Wu
> Fix For: 0.8.3
>
> Attachments: KAFKA-2121.patch, KAFKA-2121_2015-04-16_09:55:14.patch, 
> KAFKA-2121_2015-04-16_10:43:55.patch, KAFKA-2121_2015-04-18_20:09:20.patch, 
> KAFKA-2121_2015-04-19_20:08:45.patch, KAFKA-2121_2015-04-19_20:30:18.patch, 
> KAFKA-2121_2015-04-20_09:06:09.patch, KAFKA-2121_2015-04-20_09:51:51.patch, 
> KAFKA-2121_2015-04-20_09:52:46.patch, KAFKA-2121_2015-04-20_09:57:49.patch, 
> KAFKA-2121_2015-04-20_22:48:31.patch
>
>
> On Mon, Apr 13, 2015 at 7:17 PM, Guozhang Wang  wrote:
> It is a valid problem and we should correct it as soon as possible, I'm
> with Ewen regarding the solution.
> On Mon, Apr 13, 2015 at 5:05 PM, Ewen Cheslack-Postava 
> wrote:
> > Steven,
> >
> > Looks like there is even more that could potentially be leaked -- since key
> > and value serializers are created and configured at the end, even the IO
> > thread allocated by the producer could leak. Given that, I think 1 isn't a
> > great option since, as you said, it doesn't really address the underlying
> > issue.
> >
> > 3 strikes me as bad from a user experience perspective. It's true we might
> > want to introduce additional constructors to make testing easier, but the
> > more components I need to allocate myself and inject into the producer's
> > constructor, the worse the default experience is. And since you would have
> > to inject the dependencies to get correct, non-leaking behavior, it will
> > always be more code than previously (and a backwards incompatible change).
> > Additionally, the code creating the producer would have to be more
> > complicated since it would have to deal with the cleanup carefully, whereas
> > it previously just had to deal with the exception. Besides, for testing
> > specifically, you can avoid exposing more constructors just for testing by
> > using something like PowerMock that let you mock private methods. That
> > requires a bit of code reorganization, but doesn't affect the public
> > interface at all.
> >
> > So my take is that a variant of 2 is probably best. I'd probably do two
> > things. First, make close() safe to call even if some fields haven't been
> > initialized, which presumably just means checking for null fields. (You
> > might also want to figure out if all the methods close() calls are
> > idempotent and decide whether some fields should be marked non-final and
> > cleared to null when close() is called). Second, add the try/catch as you
> > suggested, but just use close().
> >
> > -Ewen
> >
> >
> > On Mon, Apr 13, 2015 at 3:53 PM, Steven Wu  wrote:
> >
> > > Here is the resource leak problem that we have encountered when the 0.8.2
> > > Java KafkaProducer failed in its constructor. Here is the code snippet of
> > > KafkaProducer to illustrate the problem.
> > >
> > > ---
> > > public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer,
> > > Serializer<V> valueSerializer) {
> > >
> > > // create metrics reporter via reflection
> > > List<MetricsReporter> reporters =
> > > config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
> > > MetricsReporter.class);
> > >
> > > // validate bootstrap servers
> > > List<InetSocketAddress> addresses =
> > > ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
> > >
> > > }
> > > ---
> > >
> > > Let's say MyMetricsReporter creates a thread in its constructor. If hostname
> > > validation throws an exception, the constructor won't call the close() method of
> > > MyMetricsReporter to clean up the resource. As a result, we created a
> > > thread leak issue. This becomes worse when we try to auto-recover (i.e. keep
> > > creating KafkaProducer again -> failing again -> more thread leaks).
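
For illustration, here is a minimal sketch of the pattern Ewen describes above
(a null-safe close() plus a try/catch in the constructor). All names are
hypothetical; this is not the actual KafkaProducer code.

import java.util.ArrayList;
import java.util.List;

public class LeakSafeClient implements AutoCloseable {

    interface Reporter { void close(); }      // stand-in for MetricsReporter

    private List<Reporter> reporters;         // non-final so a failed constructor leaves it null
    private Thread ioThread;

    public LeakSafeClient(List<Reporter> reporters, String bootstrapServers) {
        try {
            this.reporters = new ArrayList<Reporter>(reporters);  // resources allocated first
            if (bootstrapServers == null || bootstrapServers.isEmpty()) {
                throw new IllegalArgumentException("bootstrap servers missing"); // validation may fail
            }
            this.ioThread = new Thread("io-thread") {
                @Override
                public void run() { /* io loop would run here */ }
            };
            this.ioThread.start();
        } catch (Throwable t) {
            close();   // release whatever was already allocated
            throw new RuntimeException("construction failed", t);
        }
    }

    @Override
    public void close() {
        // Safe to call even when construction failed part-way: every field is null-checked.
        if (ioThread != null) {
            ioThread.interrupt();
        }
        if (reporters != null) {
            for (Reporter r : reporters) {
                r.close();
            }
        }
    }
}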

[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit

2015-04-28 Thread Stefan Miklosovic (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14517627#comment-14517627
 ] 

Stefan Miklosovic commented on KAFKA-1835:
--

Is there actually any workaround? I am using the "new" producer in a vert.x verticle 
and it blocks and times out after 60 seconds because it cannot fetch the 
metadata. What should I do?

> Kafka new producer needs options to make blocking behavior explicit
> ---
>
> Key: KAFKA-1835
> URL: https://issues.apache.org/jira/browse/KAFKA-1835
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 0.8.2.0, 0.8.3, 0.9.0
>Reporter: Paul Pearcy
> Fix For: 0.8.3
>
> Attachments: KAFKA-1835-New-producer--blocking_v0.patch, 
> KAFKA-1835.patch
>
>   Original Estimate: 504h
>  Remaining Estimate: 504h
>
> The new (0.8.2 standalone) producer will block the first time it attempts to 
> retrieve metadata for a topic. This is not the desired behavior in some use 
> cases where async non-blocking guarantees are required and message loss is 
> acceptable in known cases. Also, most developers will assume an API that 
> returns a future is safe to call in a critical request path. 
> Discussing on the mailing list, the most viable option is to have the 
> following settings:
>  pre.initialize.topics=x,y,z
>  pre.initialize.timeout=x
>  
> This moves potential blocking to the init of the producer and outside of some 
> random request. The potential will still exist for blocking in a corner case 
> where connectivity with Kafka is lost and a topic not included in pre-init 
> has a message sent for the first time. 
> There is the question of what to do when initialization fails. There are a 
> couple of options that I'd like available:
> - Fail creation of the client 
> - Fail all sends until the meta is available 
> Open to input on how the above option should be expressed. 
> It is also worth noting more nuanced solutions exist that could work without 
> the extra settings, they just end up having extra complications and at the 
> end of the day not adding much value. For instance, the producer could accept 
> and queue messages(note: more complicated than I am making it sound due to 
> storing all accepted messages in pre-partitioned compact binary form), but 
> you're still going to be forced to choose to either start blocking or 
> dropping messages at some point. 
> I have some test cases I am going to port over to the Kafka producer 
> integration ones and start from there. My current impl is in scala, but 
> porting to Java shouldn't be a big deal (was using a promise to track init 
> status, but will likely need to make that an atomic bool). 
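
Regarding Stefan's question above about a workaround: one unofficial option is to
warm up the producer's metadata for the topics you know you will use at application
startup, so the blocking happens during initialization rather than on the first
send(). A minimal sketch follows (topic names and settings are examples; the corner
case of new topics appearing after a connectivity loss still applies):

import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.PartitionInfo;

public class MetadataWarmup {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
        for (String topic : new String[] {"my-topic-a", "my-topic-b"}) {
            // partitionsFor() blocks until metadata is available (or the fetch times out),
            // so later send() calls for these topics should not block on metadata.
            List<PartitionInfo> partitions = producer.partitionsFor(topic);
            System.out.println(topic + " has " + partitions.size() + " partitions");
        }
        // The producer is now warmed up for those topics; hand it to the request path.
        producer.close();
    }
}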



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-2145) An option to add topic owners.

2015-04-28 Thread Neelesh Srinivas Salian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neelesh Srinivas Salian reassigned KAFKA-2145:
--

Assignee: Neelesh Srinivas Salian

> An option to add topic owners. 
> ---
>
> Key: KAFKA-2145
> URL: https://issues.apache.org/jira/browse/KAFKA-2145
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Parth Brahmbhatt
>Assignee: Neelesh Srinivas Salian
>
> We need to expose a way so users can identify users/groups that share 
> ownership of a topic. We discussed adding this as part of 
> https://issues.apache.org/jira/browse/KAFKA-2035 and agreed that it will be 
> simpler to add owner as a logconfig. 
> The owner field can be used for auditing and also by authorization layer to 
> grant access without having to explicitly configure acls. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2145) An option to add topic owners.

2015-04-28 Thread Neelesh Srinivas Salian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14517613#comment-14517613
 ] 

Neelesh Srinivas Salian commented on KAFKA-2145:


Had some clarification questions:

1) The objective is to have a Topic Owner field associated with the topic. 
Shall this be associated with the topic at all times? 

2) Is there a datatype that would be specific for the owner name in TopicConfig 
and LogConfig? 





> An option to add topic owners. 
> ---
>
> Key: KAFKA-2145
> URL: https://issues.apache.org/jira/browse/KAFKA-2145
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Parth Brahmbhatt
>Assignee: Neelesh Srinivas Salian
>
> We need to expose a way so users can identify users/groups that share 
> ownership of a topic. We discussed adding this as part of 
> https://issues.apache.org/jira/browse/KAFKA-2035 and agreed that it will be 
> simpler to add owner as a logconfig. 
> The owner field can be used for auditing and also by authorization layer to 
> grant access without having to explicitly configure acls. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2121) prevent potential resource leak in KafkaProducer and KafkaConsumer

2015-04-28 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14517596#comment-14517596
 ] 

Steven Zhen Wu commented on KAFKA-2121:
---

[~ewencp] I am OK with reverting the change on Serializer.

What about Deserializer? I assume nobody is using it because the consumer is not 
feature-complete yet, so we can keep that change.

Should we wait for more feedback on the DISCUSS email thread that [~guozhang] 
started?

Should I include the revert as part of KAFKA-2151, or open a new JIRA for reverting? 
What's the common practice here?

> prevent potential resource leak in KafkaProducer and KafkaConsumer
> --
>
> Key: KAFKA-2121
> URL: https://issues.apache.org/jira/browse/KAFKA-2121
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.8.2.0
>Reporter: Steven Zhen Wu
>Assignee: Steven Zhen Wu
> Fix For: 0.8.3
>
> Attachments: KAFKA-2121.patch, KAFKA-2121_2015-04-16_09:55:14.patch, 
> KAFKA-2121_2015-04-16_10:43:55.patch, KAFKA-2121_2015-04-18_20:09:20.patch, 
> KAFKA-2121_2015-04-19_20:08:45.patch, KAFKA-2121_2015-04-19_20:30:18.patch, 
> KAFKA-2121_2015-04-20_09:06:09.patch, KAFKA-2121_2015-04-20_09:51:51.patch, 
> KAFKA-2121_2015-04-20_09:52:46.patch, KAFKA-2121_2015-04-20_09:57:49.patch, 
> KAFKA-2121_2015-04-20_22:48:31.patch
>
>
> On Mon, Apr 13, 2015 at 7:17 PM, Guozhang Wang  wrote:
> It is a valid problem and we should correct it as soon as possible, I'm
> with Ewen regarding the solution.
> On Mon, Apr 13, 2015 at 5:05 PM, Ewen Cheslack-Postava 
> wrote:
> > Steven,
> >
> > Looks like there is even more that could potentially be leaked -- since key
> > and value serializers are created and configured at the end, even the IO
> > thread allocated by the producer could leak. Given that, I think 1 isn't a
> > great option since, as you said, it doesn't really address the underlying
> > issue.
> >
> > 3 strikes me as bad from a user experience perspective. It's true we might
> > want to introduce additional constructors to make testing easier, but the
> > more components I need to allocate myself and inject into the producer's
> > constructor, the worse the default experience is. And since you would have
> > to inject the dependencies to get correct, non-leaking behavior, it will
> > always be more code than previously (and a backwards incompatible change).
> > Additionally, the code creating the producer would have to be more
> > complicated since it would have to deal with the cleanup carefully, whereas
> > it previously just had to deal with the exception. Besides, for testing
> > specifically, you can avoid exposing more constructors just for testing by
> > using something like PowerMock that let you mock private methods. That
> > requires a bit of code reorganization, but doesn't affect the public
> > interface at all.
> >
> > So my take is that a variant of 2 is probably best. I'd probably do two
> > things. First, make close() safe to call even if some fields haven't been
> > initialized, which presumably just means checking for null fields. (You
> > might also want to figure out if all the methods close() calls are
> > idempotent and decide whether some fields should be marked non-final and
> > cleared to null when close() is called). Second, add the try/catch as you
> > suggested, but just use close().
> >
> > -Ewen
> >
> >
> > On Mon, Apr 13, 2015 at 3:53 PM, Steven Wu  wrote:
> >
> > > Here is the resource leak problem that we have encountered when the 0.8.2
> > > Java KafkaProducer failed in its constructor. Here is the code snippet of
> > > KafkaProducer to illustrate the problem.
> > >
> > > ---
> > > public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer,
> > > Serializer<V> valueSerializer) {
> > >
> > > // create metrics reporter via reflection
> > > List<MetricsReporter> reporters =
> > > config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
> > > MetricsReporter.class);
> > >
> > > // validate bootstrap servers
> > > List<InetSocketAddress> addresses =
> > > ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
> > >
> > > }
> > > ---
> > >
> > > Let's say MyMetricsReporter creates a thread in its constructor. If hostname
> > > validation throws an exception, the constructor won't call the close() method of
> > > MyMetricsReporter to clean up the resource. As a result, we created a
> > > thread leak issue. This becomes worse when we try to auto-recover (i.e. keep
> > > creating KafkaProducer again -> failing again -> more thread leaks).
> > >
> > > There are multiple options for fixing this.
> > >
> > > 1) Just move the hostname validation to the beginning. But this is only

[jira] [Updated] (KAFKA-2154) MetadataResponse is Empty on a Fresh Cluster

2015-04-28 Thread Keith Bourgoin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Keith Bourgoin updated KAFKA-2154:
--
Description: 
When I start a fresh cluster using {{bin/kafka-server-start.sh}} and issue a 
MetadataRequest to it, the results are blank.  It's correct that there are no 
topics, but there are also no brokers returned.  I'm writing a driver for 
Kafka, so this makes the initial connection to the cluster difficult.

To reproduce:

  * Start Zookeeper with {{bin/zookeeper-server-start.sh 
config/zookeeper.properties}} and a broker with {{bin/kafka-server-start.sh 
config/server.properties}}.  Be sure there's nothing in {{/tmp}} from a 
previous run.
  * Run this {{echo -e 
"\x00\x00\x00\x15\x00\x03\x00\x01\x00\x00\x00\x00\x00\x07pykafka\x00\x00\x00\x00"
 | nc localhost 9092 | hd}} and observe the output:
{noformat}
  00 00 00 0c 00 00 00 00  00 00 00 00 00 00 00 00  ||
0010
{noformat}
  * Create a topic using {{bin/kafka-topics.sh --zookeeper localhost:2181 
--create --topic test --partitions 2 --replication-factor 1}}
  * Re-run the same command and now observe the output:
{noformat}
kfb@parsely-dev:~/src/ct/pykafka$ echo -e 
"\x00\x00\x00\x15\x00\x03\x00\x01\x00\x00\x00\x00\x00\x07pykafka\x00\x00\x00\x00"
 | nc localhost 9092 | hd
  00 00 00 61 00 00 00 00  00 00 00 01 00 00 00 00  |...a|
0010  00 0b 70 61 72 73 65 6c  79 2d 64 65 76 00 00 23  |..parsely-dev..#|
0020  84 00 00 00 01 00 00 00  04 74 65 73 74 00 00 00  |.test...|
0030  02 00 00 00 00 00 01 00  00 00 00 00 00 00 01 00  ||
0040  00 00 00 00 00 00 01 00  00 00 00 00 00 00 00 00  ||
0050  00 00 00 00 00 00 00 00  01 00 00 00 00 00 00 00  ||
0060  01 00 00 00 00|.|
0065
{noformat}

In this case, "parsely-dev" is the name of my work VM and the "#" following it 
is the port number.  I've verified it's a correctly formatted MetadataResponse. 
 It's the first null result that we're having a hard time dealing with.

As for the bytestring, that's a correctly formatted MetadataRequest with no 
topics specified.  Presumably if I specified a topic name it would auto-create 
the topic and then start returning broker information.  It doesn't really 
change the fact that the initial state is fairly broken.

Finally, it's worth noting that if I delete the "test" topic (after turning on 
{{delete.topic.enable}}) then the responses still include broker information. 
It's just the initial state which is causing problems.

{noformat}
kfb@parsely-dev:~/src/kafka$ bin/kafka-topics.sh --zookeeper localhost:2181 
--delete --topic test
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
kfb@parsely-dev:~/src/ct/pykafka$ echo -e 
"\x00\x00\x00\x15\x00\x03\x00\x01\x00\x00\x00\x00\x00\x07pykafka\x00\x00\x00\x00"
 | nc localhost 9092 | hd
  00 00 00 21 00 00 00 00  00 00 00 01 00 00 00 00  |...!|
0010  00 0b 70 61 72 73 65 6c  79 2d 64 65 76 00 00 23  |..parsely-dev..#|
0020  84 00 00 00 00|.|
0025
{noformat}

  was:
When I start a fresh cluster using "{{bin/kafka-server-start.sh}}" and issue a 
MetadataRequest to it, the results are blank.  It's correct that there are no 
topics, but there are also no brokers returned.  I'm writing a driver for 
Kafka, so this makes the initial connection to the cluster difficult.

To reproduce:

  * Start Zookeeper with "{{bin/zookeeper-server-start.sh 
config/zookeeper.properties}}" and a broker with "{{bin/kafka-server-start.sh 
config/server.properties}}".  Be sure there's nothing in {{/tmp}} from a 
previous run.
  * Run this "{{echo -e 
"\x00\x00\x00\x15\x00\x03\x00\x01\x00\x00\x00\x00\x00\x07pykafka\x00\x00\x00\x00"
 | nc localhost 9092 | hd}}" and observe the output:
{noformat}
  00 00 00 0c 00 00 00 00  00 00 00 00 00 00 00 00  ||
0010
{noformat}
  * Create a topic using "{{bin/kafka-topics.sh --zookeeper localhost:2181 
--create --topic test --partitions 2 --replication-factor 1}}"
  * Re-run the same command and now observe the output:
{noformat}
kfb@parsely-dev:~/src/ct/pykafka$ echo -e 
"\x00\x00\x00\x15\x00\x03\x00\x01\x00\x00\x00\x00\x00\x07pykafka\x00\x00\x00\x00"
 | nc localhost 9092 | hd
  00 00 00 61 00 00 00 00  00 00 00 01 00 00 00 00  |...a|
0010  00 0b 70 61 72 73 65 6c  79 2d 64 65 76 00 00 23  |..parsely-dev..#|
0020  84 00 00 00 01 00 00 00  04 74 65 73 74 00 00 00  |.test...|
0030  02 00 00 00 00 00 01 00  00 00 00 00 00 00 01 00  ||
0040  00 00 00 00 00 00 01 00  00 00 00 00 00 00 00 00  ||
0050  00 00 00 00 00 00 00 00  01 00 00 00 00 00 00 00  ||
0060  01 00 00 

[jira] [Created] (KAFKA-2154) MetadataResponse is Empty on a Fresh Cluster

2015-04-28 Thread Keith Bourgoin (JIRA)
Keith Bourgoin created KAFKA-2154:
-

 Summary: MetadataResponse is Empty on a Fresh Cluster
 Key: KAFKA-2154
 URL: https://issues.apache.org/jira/browse/KAFKA-2154
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.1.1
Reporter: Keith Bourgoin


When I start a fresh cluster using "{{bin/kafka-server-start.sh}}" and issue a 
MetadataRequest to it, the results are blank.  It's correct that there are no 
topics, but there are also no brokers returned.  I'm writing a driver for 
Kafka, so this makes the initial connection to the cluster difficult.

To reproduce:

  * Start Zookeeper with "{{bin/zookeeper-server-start.sh 
config/zookeeper.properties}}" and a broker with "{{bin/kafka-server-start.sh 
config/server.properties}}".  Be sure there's nothing in {{/tmp}} from a 
previous run.
  * Run this "{{echo -e 
"\x00\x00\x00\x15\x00\x03\x00\x01\x00\x00\x00\x00\x00\x07pykafka\x00\x00\x00\x00"
 | nc localhost 9092 | hd}}" and observe the output:
{noformat}
  00 00 00 0c 00 00 00 00  00 00 00 00 00 00 00 00  ||
0010
{noformat}
  * Create a topic using "{{bin/kafka-topics.sh --zookeeper localhost:2181 
--create --topic test --partitions 2 --replication-factor 1}}"
  * Re-run the same command and now observe the output:
{noformat}
kfb@parsely-dev:~/src/ct/pykafka$ echo -e 
"\x00\x00\x00\x15\x00\x03\x00\x01\x00\x00\x00\x00\x00\x07pykafka\x00\x00\x00\x00"
 | nc localhost 9092 | hd
  00 00 00 61 00 00 00 00  00 00 00 01 00 00 00 00  |...a|
0010  00 0b 70 61 72 73 65 6c  79 2d 64 65 76 00 00 23  |..parsely-dev..#|
0020  84 00 00 00 01 00 00 00  04 74 65 73 74 00 00 00  |.test...|
0030  02 00 00 00 00 00 01 00  00 00 00 00 00 00 01 00  ||
0040  00 00 00 00 00 00 01 00  00 00 00 00 00 00 00 00  ||
0050  00 00 00 00 00 00 00 00  01 00 00 00 00 00 00 00  ||
0060  01 00 00 00 00|.|
0065
{noformat}

In this case, "parsely-dev" is the name of my work VM and the "#" following it 
is the port number.  I've verified it's a correctly formatted MetadataResponse. 
 It's the first null result that we're having a hard time dealing with.

As for the bytestring, that's a correctly formatted MetadataRequest with no 
topics specified.  Presumably if I specified a topic name it would auto-create 
the topic and then start returning broker information.  It doesn't really 
change the fact that the initial state is fairly broken.

Finally, it's worth noting that if I delete the "test" topic (after turning on 
{{delete.topic.enable}}) then the responses still include broker information. 
It's just the initial state which is causing problems.

{noformat}
kfb@parsely-dev:~/src/kafka$ bin/kafka-topics.sh --zookeeper localhost:2181 
--delete --topic test
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
kfb@parsely-dev:~/src/ct/pykafka$ echo -e 
"\x00\x00\x00\x15\x00\x03\x00\x01\x00\x00\x00\x00\x00\x07pykafka\x00\x00\x00\x00"
 | nc localhost 9092 | hd
  00 00 00 21 00 00 00 00  00 00 00 01 00 00 00 00  |...!|
0010  00 0b 70 61 72 73 65 6c  79 2d 64 65 76 00 00 23  |..parsely-dev..#|
0020  84 00 00 00 00|.|
0025
{noformat}
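
For anyone who prefers to reproduce this from Java rather than nc, here is a rough
equivalent of the probe above, reusing the exact request bytes from this report
(illustrative only):

import java.io.DataInputStream;
import java.io.OutputStream;
import java.net.Socket;

public class MetadataProbe {
    public static void main(String[] args) throws Exception {
        // Same bytes as the `echo -e ... | nc localhost 9092 | hd` command above.
        byte[] request = {
            0x00, 0x00, 0x00, 0x15,                          // request size = 21
            0x00, 0x03, 0x00, 0x01,                          // api key / version as in the report
            0x00, 0x00, 0x00, 0x00,                          // correlation id
            0x00, 0x07, 'p', 'y', 'k', 'a', 'f', 'k', 'a',   // client id "pykafka"
            0x00, 0x00, 0x00, 0x00                           // empty topic list
        };
        Socket socket = new Socket("localhost", 9092);
        try {
            OutputStream out = socket.getOutputStream();
            out.write(request);
            out.flush();
            DataInputStream in = new DataInputStream(socket.getInputStream());
            int size = in.readInt();                          // response size prefix
            byte[] response = new byte[size];
            in.readFully(response);
            for (byte b : response) {
                System.out.printf("%02x ", b & 0xff);         // hex dump of the response body
            }
            System.out.println();
        } finally {
            socket.close();
        }
    }
}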



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2129) Consumer could make multiple concurrent metadata requests

2015-04-28 Thread Tim Brooks (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tim Brooks updated KAFKA-2129:
--
Assignee: Tim Brooks
  Status: Patch Available  (was: Open)

> Consumer could make multiple concurrent metadata requests
> -
>
> Key: KAFKA-2129
> URL: https://issues.apache.org/jira/browse/KAFKA-2129
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Tim Brooks
>Assignee: Tim Brooks
>Priority: Minor
> Attachments: KAFKA-2129.patch
>
>
> The NetworkClient's metadataFetchInProgress is neither volatile nor atomic. 
> This protects against multiple metadata requests being made and is read on 
> poll() on the NetworkClient. It is written to when a request is initiated.
> This is fine for the producer, which seems to have one thread writing. The 
> KafkaConsumer's poll() method is synchronized, so there will not be more 
> than one writer entering from there. However, the NetworkClient's poll() 
> method is also accessed from the Consumer's partitionsFor() method, which could 
> be called by a separate thread.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2129) Consumer could make multiple concurrent metadata requests

2015-04-28 Thread Tim Brooks (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tim Brooks updated KAFKA-2129:
--
Attachment: KAFKA-2129.patch

> Consumer could make multiple concurrent metadata requests
> -
>
> Key: KAFKA-2129
> URL: https://issues.apache.org/jira/browse/KAFKA-2129
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Tim Brooks
>Priority: Minor
> Attachments: KAFKA-2129.patch
>
>
> The NetworkClient's metadataFetchInProgress is neither volatile nor atomic. 
> This protects against multiple metadata requests being made and is read on 
> poll() on the NetworkClient. It is written to when a request is initiated.
> This is fine for the producer, which seems to have one thread writing. The 
> KafkaConsumer's poll() method is synchronized, so there will not be more 
> than one writer entering from there. However, the NetworkClient's poll() 
> method is also accessed from the Consumer's partitionsFor() method, which could 
> be called by a separate thread.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2129) Consumer could make multiple concurrent metadata requests

2015-04-28 Thread Tim Brooks (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14517557#comment-14517557
 ] 

Tim Brooks commented on KAFKA-2129:
---

Created reviewboard https://reviews.apache.org/r/33634/diff/
 against branch origin/trunk

> Consumer could make multiple concurrent metadata requests
> -
>
> Key: KAFKA-2129
> URL: https://issues.apache.org/jira/browse/KAFKA-2129
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Tim Brooks
>Priority: Minor
> Attachments: KAFKA-2129.patch
>
>
> The NetworkClient's metadataFetchInProgress is neither volatile nor atomic. 
> This protects against multiple metadata requests being made and is read on 
> poll() on the NetworkClient. It is written to when a request is initiated.
> This is fine for the producer, which seems to have one thread writing. The 
> KafkaConsumer's poll() method is synchronized, so there will not be more 
> than one writer entering from there. However, the NetworkClient's poll() 
> method is also accessed from the Consumer's partitionsFor() method, which could 
> be called by a separate thread.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2129) Consumer could make multiple concurrent metadata requests

2015-04-28 Thread Tim Brooks (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tim Brooks updated KAFKA-2129:
--
Priority: Major  (was: Minor)

> Consumer could make multiple concurrent metadata requests
> -
>
> Key: KAFKA-2129
> URL: https://issues.apache.org/jira/browse/KAFKA-2129
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Tim Brooks
>Assignee: Tim Brooks
> Attachments: KAFKA-2129.patch
>
>
> The NetworkClient's metadataFetchInProgress is neither volatile nor atomic. 
> This protects against multiple metadata requests being made and is read on 
> poll() on the NetworkClient. It is written to when a request is initiated.
> This is fine for the producer, which seems to have one thread writing. The 
> KafkaConsumer's poll() method is synchronized, so there will not be more 
> than one writer entering from there. However, the NetworkClient's poll() 
> method is also accessed from the Consumer's partitionsFor() method, which could 
> be called by a separate thread.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Review Request 33634: Patch for KAFKA-2129

2015-04-28 Thread Tim Brooks

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33634/
---

Review request for kafka.


Bugs: KAFKA-2129
https://issues.apache.org/jira/browse/KAFKA-2129


Repository: kafka


Description
---

Synchronize method


Diffs
-

  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
d301be4709f7b112e1f3a39f3c04cfa65f00fa60 

Diff: https://reviews.apache.org/r/33634/diff/


Testing
---


Thanks,

Tim Brooks



[jira] [Commented] (KAFKA-2129) Consumer could make multiple concurrent metadata requests

2015-04-28 Thread Tim Brooks (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14517553#comment-14517553
 ] 

Tim Brooks commented on KAFKA-2129:
---

It kind of looks like this is a larger issue than I realized when I first 
opened this ticket. Let's say that thread 1 is calling poll() on the 
KafkaClient, and then thread 2 calls partitionsFor() for a topic that is not 
locally known on the KafkaClient.

Both threads will make it into the poll() method on the Selector since 
partitionsFor() is not synchronized. If Thread 1 is in the middle of a poll(), 
tons of intermediate state will be lost when Thread 2 calls the clear() method 
on the selector:

this.completedSends.clear();
this.completedReceives.clear();
this.connected.clear();
this.disconnected.clear();
this.disconnected.addAll(this.failedSends);
this.failedSends.clear();

I can generate a number of failing integration tests by adding:

scala.concurrent.future {
  Thread.sleep(30)
  consumer.partitionsFor("weird-topic")
}

in consumeRecords() in the ConsumerTest right before the call is made to poll().

If I add the synchronized keyword to the partitionsFor() method, these errors go 
away. Is this the correct approach for this ticket? Obviously those errors are 
an issue, since the KafkaConsumer documentation indicates that the class is 
threadsafe.

But adding synchronized to the method means that calling partitionsFor() will be 
blocked on a poll() that is in progress. And hopefully, the majority of the 
time partitionsFor() will not require a network call.

Anyway, I added a patch to synchronize that method. But if we are 
interested in a non-synchronized method to get locally-known partitions for 
that topic, we will need a different change.
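
For illustration, the synchronization approach described above boils down to having
partitionsFor() share the same lock as poll(). A minimal sketch with a hypothetical
consumer class (not the actual KafkaConsumer code):

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SynchronizedConsumerSketch {

    // Both methods are synchronized on the same instance, so partitionsFor()
    // can no longer interleave with an in-progress poll() and clobber the
    // selector's intermediate state.
    public synchronized List<String> poll(long timeoutMs) {
        // ... networkClient.poll(timeoutMs) would run here ...
        return Collections.<String>emptyList();
    }

    public synchronized List<Integer> partitionsFor(String topic) {
        // If metadata for the topic is cached, return it without a network call;
        // otherwise this call blocks behind any in-progress poll(), which is the
        // trade-off noted above.
        return Arrays.asList(0, 1);
    }
}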

> Consumer could make multiple concurrent metadata requests
> -
>
> Key: KAFKA-2129
> URL: https://issues.apache.org/jira/browse/KAFKA-2129
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Tim Brooks
>Priority: Minor
>
> The NetworkClient's metadataFetchInProgress is neither volatile nor atomic. 
> This protects against multiple metadata requests being made and is read on 
> poll() on the NetworkClient. It is written to when a request is initiated.
> This is fine for the producer, which seems to have one thread writing. The 
> KafkaConsumer's poll() method is synchronized, so there will not be more 
> than one writer entering from there. However, the NetworkClient's poll() 
> method is also accessed from the Consumer's partitionsFor() method, which could 
> be called by a separate thread.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Adding multi-tenancy capabilities to Kafka

2015-04-28 Thread Gwen Shapira
I think recent discussion showed some need for topic namespaces - for
example, Jun's use case for reserving topic names for specific users
discussed under authorization.

I think some discussion should happen on namespaces vs more full-fledged
topic-hierarchy.
I like the simplicity of namespaces, but there may be other requirements
(such as inheriting configuration).

Gwen

On Tue, Apr 28, 2015 at 10:42 AM, Adrian Preston 
wrote:

> Hi all,
>
> I've been looking at how a Kafka cluster could be used be deployed so that
> it can be used by multiple tenants.  Specifically: a scheme where the
> clients belonging to each tenant receive the impression they are operating
> against their own cluster.  The ongoing security and quota work looks like
> it might provide a lot of the isolation requirements, but each tenant would
> currently share the same namespace for topics and consumer groups.  So the
> illusion of "it is my own cluster" is broken as soon as two tenants try
> independent to use the same topic name.
>
> I wondered if other people on this list are interested in being able to
> support multiple tenants in this way?  And / or if the ability to avoid
> clashes in the topic namespace would be useful?  I am considering
> submitting a KIP in this area - but first wanted to get a feeling for
> whether these kinds of capabilities are of interest to others.
>
> Thanks in advance,
> - Adrian
>
> Unless stated otherwise above:
> IBM United Kingdom Limited - Registered in England and Wales with number
> 741598.
> Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU
>
>


Adding multi-tenancy capabilities to Kafka

2015-04-28 Thread Adrian Preston
Hi all,

I've been looking at how a Kafka cluster could be deployed so that it 
can be used by multiple tenants.  Specifically: a scheme where the clients 
belonging to each tenant receive the impression they are operating against 
their own cluster.  The ongoing security and quota work looks like it might 
provide a lot of the isolation requirements, but each tenant would currently 
share the same namespace for topics and consumer groups.  So the illusion of 
"it is my own cluster" is broken as soon as two tenants try independent to use 
the same topic name.

I wondered if other people on this list are interested in being able to support 
multiple tenants in this way?  And / or if the ability to avoid clashes in the 
topic namespace would be useful?  I am considering submitting a KIP in this 
area - but first wanted to get a feeling for whether these kinds of 
capabilities are of interest to others.

Thanks in advance,
- Adrian

Unless stated otherwise above:
IBM United Kingdom Limited - Registered in England and Wales with number 
741598. 
Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU



[jira] [Commented] (KAFKA-1886) SimpleConsumer swallowing ClosedByInterruptException

2015-04-28 Thread Aditya Auradkar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14517489#comment-14517489
 ] 

Aditya Auradkar commented on KAFKA-1886:


[~nehanarkhede] I've updated the patch. This is a pretty trivial but relatively 
useful bug fix and for that reason I think we should merge it. Thoughts?

> SimpleConsumer swallowing ClosedByInterruptException
> 
>
> Key: KAFKA-1886
> URL: https://issues.apache.org/jira/browse/KAFKA-1886
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Reporter: Aditya A Auradkar
>Assignee: Aditya Auradkar
> Attachments: KAFKA-1886.patch, KAFKA-1886.patch, 
> KAFKA-1886_2015-02-02_13:57:23.patch, KAFKA-1886_2015-04-28_10:27:39.patch
>
>
> This issue was originally reported by a Samza developer. I've included an 
> exchange of mine with Chris Riccomini. I'm trying to reproduce the problem on 
> my dev setup.
> From: criccomi
> Hey all,
> Samza's BrokerProxy [1] threads appear to be wedging randomly when we try to 
> interrupt its fetcher thread. I noticed that SimpleConsumer.scala catches 
> Throwable in its sendRequest method [2]. I'm wondering: if 
> blockingChannel.send/receive throws a ClosedByInterruptException
> when the thread is interrupted, what happens? It looks like sendRequest will 
> catch the exception (which I
> think clears the thread's interrupted flag), and then retries the send. If 
> the send succeeds on the retry, I think that the ClosedByInterruptException 
> exception is effectively swallowed, and the BrokerProxy will continue
> fetching messages as though its thread was never interrupted.
> Am I misunderstanding how things work?
> Cheers,
> Chris
> [1] 
> https://github.com/apache/incubator-samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala#L126
> [2] 
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala#L75
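
For illustration only (not SimpleConsumer's actual code), the failure mode described
above is a blanket catch-and-retry around an interruptible channel call. A sketch of
one way to preserve the interruption, with a hypothetical Channel interface:

import java.nio.channels.ClosedByInterruptException;

public class InterruptAwareSender {

    interface Channel { void send(byte[] request) throws Exception; }   // hypothetical

    private final Channel channel;

    InterruptAwareSender(Channel channel) {
        this.channel = channel;
    }

    void sendRequest(byte[] request) throws Exception {
        try {
            channel.send(request);
        } catch (ClosedByInterruptException e) {
            // Make sure the interrupt flag is set so callers (e.g. a fetcher thread
            // that was asked to shut down) can observe it, and do NOT retry the send.
            Thread.currentThread().interrupt();
            throw e;
        } catch (Throwable t) {
            // Other failures can be retried once, which is roughly what the
            // existing code does for all Throwables, including interruptions.
            channel.send(request);
        }
    }
}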



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
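
For illustration, a standalone sketch of the failure mode Chris describes and 
one way to avoid it: if a catch-all around the blocking send swallows 
ClosedByInterruptException and simply retries, the interrupt is lost. This is 
not the SimpleConsumer code; the class and method names below are made up.

import java.nio.channels.ClosedByInterruptException;

// Hypothetical sketch, not SimpleConsumer itself: shows why a
// catch-everything-and-retry block loses the interrupt, and how restoring the
// interrupt flag lets the owning thread (e.g. a fetcher) see the shutdown request.
public class InterruptAwareSender {

    // blockingSend stands in for the blocking channel send/receive call.
    public void sendWithRetry(Runnable blockingSend) {
        try {
            blockingSend.run();
        } catch (Throwable t) {
            if (t instanceof ClosedByInterruptException
                    || Thread.currentThread().isInterrupted()) {
                // Restore the interrupt flag and give up instead of retrying.
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted during send", t);
            }
            // For other failures a single reconnect-and-retry is reasonable.
            blockingSend.run();
        }
    }
}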


[jira] [Updated] (KAFKA-1886) SimpleConsumer swallowing ClosedByInterruptException

2015-04-28 Thread Aditya A Auradkar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aditya A Auradkar updated KAFKA-1886:
-
Attachment: KAFKA-1886_2015-04-28_10:27:39.patch

> SimpleConsumer swallowing ClosedByInterruptException
> 
>
> Key: KAFKA-1886
> URL: https://issues.apache.org/jira/browse/KAFKA-1886
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Reporter: Aditya A Auradkar
>Assignee: Aditya Auradkar
> Attachments: KAFKA-1886.patch, KAFKA-1886.patch, 
> KAFKA-1886_2015-02-02_13:57:23.patch, KAFKA-1886_2015-04-28_10:27:39.patch
>
>
> This issue was originally reported by a Samza developer. I've included an 
> exchange of mine with Chris Riccomini. I'm trying to reproduce the problem on 
> my dev setup.
> From: criccomi
> Hey all,
> Samza's BrokerProxy [1] threads appear to be wedging randomly when we try to 
> interrupt its fetcher thread. I noticed that SimpleConsumer.scala catches 
> Throwable in its sendRequest method [2]. I'm wondering: if 
> blockingChannel.send/receive throws a ClosedByInterruptException
> when the thread is interrupted, what happens? It looks like sendRequest will 
> catch the exception (which I
> think clears the thread's interrupted flag), and then retries the send. If 
> the send succeeds on the retry, I think that the ClosedByInterruptException 
> exception is effectively swallowed, and the BrokerProxy will continue
> fetching messages as though its thread was never interrupted.
> Am I misunderstanding how things work?
> Cheers,
> Chris
> [1] 
> https://github.com/apache/incubator-samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala#L126
> [2] 
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala#L75



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 30196: Patch for KAFKA-1886

2015-04-28 Thread Aditya Auradkar

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30196/
---

(Updated April 28, 2015, 5:28 p.m.)


Review request for kafka and Joel Koshy.


Bugs: KAFKA-1886
https://issues.apache.org/jira/browse/KAFKA-1886


Repository: kafka


Description
---

Fixing KAFKA-1886


Diffs
-

  core/src/main/scala/kafka/consumer/SimpleConsumer.scala 
cbef84ac76e62768981f74e71d451f2bda995275 

Diff: https://reviews.apache.org/r/30196/diff/


Testing (updated)
---


Thanks,

Aditya Auradkar



[jira] [Commented] (KAFKA-1886) SimpleConsumer swallowing ClosedByInterruptException

2015-04-28 Thread Aditya A Auradkar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14517479#comment-14517479
 ] 

Aditya A Auradkar commented on KAFKA-1886:
--

Updated reviewboard https://reviews.apache.org/r/30196/diff/
 against branch origin/trunk

> SimpleConsumer swallowing ClosedByInterruptException
> 
>
> Key: KAFKA-1886
> URL: https://issues.apache.org/jira/browse/KAFKA-1886
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Reporter: Aditya A Auradkar
>Assignee: Aditya Auradkar
> Attachments: KAFKA-1886.patch, KAFKA-1886.patch, 
> KAFKA-1886_2015-02-02_13:57:23.patch, KAFKA-1886_2015-04-28_10:27:39.patch
>
>
> This issue was originally reported by a Samza developer. I've included an 
> exchange of mine with Chris Riccomini. I'm trying to reproduce the problem on 
> my dev setup.
> From: criccomi
> Hey all,
> Samza's BrokerProxy [1] threads appear to be wedging randomly when we try to 
> interrupt its fetcher thread. I noticed that SimpleConsumer.scala catches 
> Throwable in its sendRequest method [2]. I'm wondering: if 
> blockingChannel.send/receive throws a ClosedByInterruptException
> when the thread is interrupted, what happens? It looks like sendRequest will 
> catch the exception (which I
> think clears the thread's interrupted flag), and then retries the send. If 
> the send succeeds on the retry, I think that the ClosedByInterruptException 
> exception is effectively swallowed, and the BrokerProxy will continue
> fetching messages as though its thread was never interrupted.
> Am I misunderstanding how things work?
> Cheers,
> Chris
> [1] 
> https://github.com/apache/incubator-samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala#L126
> [2] 
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala#L75



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 30196: Patch for KAFKA-1886

2015-04-28 Thread Aditya Auradkar


> On Feb. 7, 2015, 4:22 p.m., Neha Narkhede wrote:
> > core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala, line 295
> > 
> >
> > Why do you need the sleep here? We try to avoid blindly sleeping in 
> > Kafka tests since it almost always leads to transient test failures. 
> > Consider using TestUtils.waitUntilTrue().
> 
> Aditya Auradkar wrote:
> Thanks Neha. I missed this review comment.
> 
> I agree sleeping isn't ideal here but I don't think there is a condition 
> I can wait on to trigger this specific exception. The client has to be 
> waiting on a response from the server. I'm not even sure that this testcase 
> needs to exist in PrimitiveApiTest since it isn't testing an API. Can you 
> suggest a better place to put it, if it makes sense to keep it at all?
> 
> Neha Narkhede wrote:
> Yeah, if we can't make a foolproof unit test, let's remove it. We are 
> really trying to reduce the number of randomly failing unit tests.

Hey Neha,
I've removed the test.


- Aditya
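
For illustration, a hedged standalone sketch of the wait-on-a-condition pattern 
Neha suggests in place of a blind sleep. The real helper is 
TestUtils.waitUntilTrue() in the Kafka test utilities; the class below is just 
a made-up, self-contained version of the same idea.

import java.util.function.BooleanSupplier;

public class WaitUntilTrueSketch {

    // Poll a condition until it holds or the timeout expires; fail loudly on timeout.
    static void waitUntilTrue(BooleanSupplier condition, String failureMessage, long timeoutMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() > deadline) {
                throw new AssertionError(failureMessage);
            }
            Thread.sleep(100);   // short poll interval, not a fixed blind sleep
        }
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        // Example: wait until at least 500 ms have elapsed, with a 5 s timeout.
        waitUntilTrue(() -> System.currentTimeMillis() - start > 500,
                "condition never became true", 5000);
        System.out.println("condition satisfied");
    }
}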


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30196/#review71556
---


On April 28, 2015, 5:28 p.m., Aditya Auradkar wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30196/
> ---
> 
> (Updated April 28, 2015, 5:28 p.m.)
> 
> 
> Review request for kafka and Joel Koshy.
> 
> 
> Bugs: KAFKA-1886
> https://issues.apache.org/jira/browse/KAFKA-1886
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Fixing KAFKA-1886
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/consumer/SimpleConsumer.scala 
> cbef84ac76e62768981f74e71d451f2bda995275 
> 
> Diff: https://reviews.apache.org/r/30196/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Aditya Auradkar
> 
>



Re: Review Request 30196: Patch for KAFKA-1886

2015-04-28 Thread Aditya Auradkar

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30196/
---

(Updated April 28, 2015, 5:27 p.m.)


Review request for kafka and Joel Koshy.


Bugs: KAFKA-1886
https://issues.apache.org/jira/browse/KAFKA-1886


Repository: kafka


Description (updated)
---

Fixing KAFKA-1886


Diffs (updated)
-

  core/src/main/scala/kafka/consumer/SimpleConsumer.scala 
cbef84ac76e62768981f74e71d451f2bda995275 

Diff: https://reviews.apache.org/r/30196/diff/


Testing
---

Added an integration test to PrimitiveAPITest.scala.


Thanks,

Aditya Auradkar



Build failed in Jenkins: Kafka-trunk #481

2015-04-28 Thread Apache Jenkins Server
See 

Changes:

[nehanarkhede] KAFKA-1621 : Standardize --messages option. Closes #58

--
[...truncated 1267 lines...]
kafka.network.SocketServerTest > testMaxConnectionsPerIPOverrides PASSED

kafka.log.OffsetIndexTest > truncate PASSED

kafka.log.OffsetIndexTest > randomLookupTest PASSED

kafka.log.OffsetIndexTest > lookupExtremeCases PASSED

kafka.log.OffsetIndexTest > appendTooMany PASSED

kafka.log.OffsetIndexTest > appendOutOfOrder PASSED

kafka.log.OffsetIndexTest > testReopen PASSED

kafka.log.OffsetMapTest > testBasicValidation PASSED

kafka.log.OffsetMapTest > testClear PASSED

kafka.log.CleanerTest > testCleanSegments PASSED

kafka.log.CleanerTest > testCleaningWithDeletes PASSED

kafka.log.CleanerTest > testCleaningWithUnkeyedMessages PASSED

kafka.log.CleanerTest > testCleanSegmentsWithAbort PASSED

kafka.log.CleanerTest > testSegmentGrouping PASSED

kafka.log.CleanerTest > testSegmentGroupingWithSparseOffsets PASSED

kafka.log.CleanerTest > testBuildOffsetMap PASSED

kafka.log.CleanerTest > testRecoveryAfterCrash PASSED

kafka.log.FileMessageSetTest > testWrittenEqualsRead PASSED

kafka.log.FileMessageSetTest > testIteratorIsConsistent PASSED

kafka.log.FileMessageSetTest > testSizeInBytes PASSED

kafka.log.FileMessageSetTest > testWriteTo PASSED

kafka.log.FileMessageSetTest > testFileSize PASSED

kafka.log.FileMessageSetTest > testIterationOverPartialAndTruncation PASSED

kafka.log.FileMessageSetTest > testIterationDoesntChangePosition PASSED

kafka.log.FileMessageSetTest > testRead PASSED

kafka.log.FileMessageSetTest > testSearch PASSED

kafka.log.FileMessageSetTest > testIteratorWithLimits PASSED

kafka.log.FileMessageSetTest > testTruncate PASSED

kafka.log.LogTest > testTimeBasedLogRoll PASSED

kafka.log.LogTest > testTimeBasedLogRollJitter PASSED

kafka.log.LogTest > testSizeBasedLogRoll PASSED

kafka.log.LogTest > testLoadEmptyLog PASSED

kafka.log.LogTest > testAppendAndReadWithSequentialOffsets PASSED

kafka.log.LogTest > testAppendAndReadWithNonSequentialOffsets PASSED

kafka.log.LogTest > testReadAtLogGap PASSED

kafka.log.LogTest > testReadOutOfRange PASSED

kafka.log.LogTest > testLogRolls PASSED

kafka.log.LogTest > testCompressedMessages PASSED

kafka.log.LogTest > testThatGarbageCollectingSegmentsDoesntChangeOffset PASSED

kafka.log.LogTest > testMessageSetSizeCheck PASSED

kafka.log.LogTest > testCompactedTopicConstraints PASSED

kafka.log.LogTest > testMessageSizeCheck PASSED

kafka.log.LogTest > testLogRecoversToCorrectOffset PASSED

kafka.log.LogTest > testIndexRebuild PASSED

kafka.log.LogTest > testTruncateTo PASSED

kafka.log.LogTest > testIndexResizingAtTruncation PASSED

kafka.log.LogTest > testBogusIndexSegmentsAreRemoved PASSED

kafka.log.LogTest > testReopenThenTruncate PASSED

kafka.log.LogTest > testAsyncDelete PASSED

kafka.log.LogTest > testOpenDeletesObsoleteFiles PASSED

kafka.log.LogTest > testAppendMessageWithNullPayload PASSED

kafka.log.LogTest > testCorruptLog PASSED

kafka.log.LogTest > testCleanShutdownFile PASSED

kafka.log.LogTest > testParseTopicPartitionName PASSED

kafka.log.LogTest > testParseTopicPartitionNameForEmptyName PASSED

kafka.log.LogTest > testParseTopicPartitionNameForNull PASSED

kafka.log.LogTest > testParseTopicPartitionNameForMissingSeparator PASSED

kafka.log.LogTest > testParseTopicPartitionNameForMissingTopic PASSED

kafka.log.LogTest > testParseTopicPartitionNameForMissingPartition PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[0] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[1] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[2] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[3] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[4] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[5] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[6] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[7] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[8] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[9] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[10] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[11] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[12] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[13] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[14] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[15] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[16] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[17] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[18] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[19] PASSED

kafka.log.LogSegmentTest > testTruncate P

Build failed in Jenkins: KafkaPreCommit #93

2015-04-28 Thread Apache Jenkins Server
See 

Changes:

[nehanarkhede] KAFKA-1621 : Standardize --messages option. Closes #58

--
[...truncated 1275 lines...]
kafka.network.SocketServerTest > testMaxConnectionsPerIPOverrides PASSED

kafka.log.OffsetIndexTest > truncate PASSED

kafka.log.OffsetIndexTest > randomLookupTest PASSED

kafka.log.OffsetIndexTest > lookupExtremeCases PASSED

kafka.log.OffsetIndexTest > appendTooMany PASSED

kafka.log.OffsetIndexTest > appendOutOfOrder PASSED

kafka.log.OffsetIndexTest > testReopen PASSED

kafka.log.OffsetMapTest > testBasicValidation PASSED

kafka.log.OffsetMapTest > testClear PASSED

kafka.log.CleanerTest > testCleanSegments PASSED

kafka.log.CleanerTest > testCleaningWithDeletes PASSED

kafka.log.CleanerTest > testCleaningWithUnkeyedMessages PASSED

kafka.log.CleanerTest > testCleanSegmentsWithAbort PASSED

kafka.log.CleanerTest > testSegmentGrouping PASSED

kafka.log.CleanerTest > testSegmentGroupingWithSparseOffsets PASSED

kafka.log.CleanerTest > testBuildOffsetMap PASSED

kafka.log.CleanerTest > testRecoveryAfterCrash PASSED

kafka.log.FileMessageSetTest > testWrittenEqualsRead PASSED

kafka.log.FileMessageSetTest > testIteratorIsConsistent PASSED

kafka.log.FileMessageSetTest > testSizeInBytes PASSED

kafka.log.FileMessageSetTest > testWriteTo PASSED

kafka.log.FileMessageSetTest > testFileSize PASSED

kafka.log.FileMessageSetTest > testIterationOverPartialAndTruncation PASSED

kafka.log.FileMessageSetTest > testIterationDoesntChangePosition PASSED

kafka.log.FileMessageSetTest > testRead PASSED

kafka.log.FileMessageSetTest > testSearch PASSED

kafka.log.FileMessageSetTest > testIteratorWithLimits PASSED

kafka.log.FileMessageSetTest > testTruncate PASSED

kafka.log.LogTest > testTimeBasedLogRoll PASSED

kafka.log.LogTest > testTimeBasedLogRollJitter PASSED

kafka.log.LogTest > testSizeBasedLogRoll PASSED

kafka.log.LogTest > testLoadEmptyLog PASSED

kafka.log.LogTest > testAppendAndReadWithSequentialOffsets PASSED

kafka.log.LogTest > testAppendAndReadWithNonSequentialOffsets PASSED

kafka.log.LogTest > testReadAtLogGap PASSED

kafka.log.LogTest > testReadOutOfRange PASSED

kafka.log.LogTest > testLogRolls PASSED

kafka.log.LogTest > testCompressedMessages PASSED

kafka.log.LogTest > testThatGarbageCollectingSegmentsDoesntChangeOffset PASSED

kafka.log.LogTest > testMessageSetSizeCheck PASSED

kafka.log.LogTest > testCompactedTopicConstraints PASSED

kafka.log.LogTest > testMessageSizeCheck PASSED

kafka.log.LogTest > testLogRecoversToCorrectOffset PASSED

kafka.log.LogTest > testIndexRebuild PASSED

kafka.log.LogTest > testTruncateTo PASSED

kafka.log.LogTest > testIndexResizingAtTruncation PASSED

kafka.log.LogTest > testBogusIndexSegmentsAreRemoved PASSED

kafka.log.LogTest > testReopenThenTruncate PASSED

kafka.log.LogTest > testAsyncDelete PASSED

kafka.log.LogTest > testOpenDeletesObsoleteFiles PASSED

kafka.log.LogTest > testAppendMessageWithNullPayload PASSED

kafka.log.LogTest > testCorruptLog PASSED

kafka.log.LogTest > testCleanShutdownFile PASSED

kafka.log.LogTest > testParseTopicPartitionName PASSED

kafka.log.LogTest > testParseTopicPartitionNameForEmptyName PASSED

kafka.log.LogTest > testParseTopicPartitionNameForNull PASSED

kafka.log.LogTest > testParseTopicPartitionNameForMissingSeparator PASSED

kafka.log.LogTest > testParseTopicPartitionNameForMissingTopic PASSED

kafka.log.LogTest > testParseTopicPartitionNameForMissingPartition PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[0] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[1] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[2] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[3] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[4] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[5] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[6] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[7] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[8] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[9] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[10] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[11] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[12] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[13] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[14] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[15] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[16] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[17] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[18] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[19] PASSED

kafka.log.LogSegmentTest > testTruncate

[jira] [Updated] (KAFKA-1621) Standardize --messages option in perf scripts

2015-04-28 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1621:
-
Resolution: Fixed
  Assignee: Rekha Joshi
Status: Resolved  (was: Patch Available)

Merged PR #58

> Standardize --messages option in perf scripts
> -
>
> Key: KAFKA-1621
> URL: https://issues.apache.org/jira/browse/KAFKA-1621
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.1.1
>Reporter: Jay Kreps
>Assignee: Rekha Joshi
>  Labels: newbie
>
> This option is specified in PerfConfig and is used by the producer, consumer 
> and simple consumer perf commands. The docstring on the argument does not 
> list it as required but the producer performance test requires it--others 
> don't.
> We should standardize this so that either all the commands require the option 
> and it is marked as required in the docstring or none of them list it as 
> required.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
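
For context, a hedged sketch of the inconsistency described above: whether 
--messages is effectively required is (presumably) decided by an explicit check 
in each tool rather than by its docstring. The class name and option 
description below are illustrative; only the joptsimple calls are real API.

import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;

public class MessagesOptionSketch {
    public static void main(String[] args) {
        OptionParser parser = new OptionParser();
        OptionSpec<Long> messagesOpt = parser
                .accepts("messages", "REQUIRED: The number of messages to send or consume")
                .withRequiredArg()
                .ofType(Long.class);
        OptionSet options = parser.parse(args);

        // Standardized behaviour: fail fast if an option documented as REQUIRED
        // was not actually supplied, instead of each tool deciding differently.
        if (!options.has(messagesOpt)) {
            System.err.println("Missing required argument: --messages");
            System.exit(1);
        }
        System.out.println("messages = " + options.valueOf(messagesOpt));
    }
}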


RE: [VOTE] KIP-11- Authorization design for kafka security

2015-04-28 Thread Sun, Dapeng
Thank you for your reply, Gwen.

>1. Complex rule systems can be difficult to reason about and therefore end up 
>being less secure. The rule "Deny always wins" is very easy to grasp.
Yes, I agree with your point: we should not make the rules complex.

>2. We currently don't have any mechanism for specifying IP ranges (or host
>ranges) at all. I think it's a pretty significant deficiency, but it does mean 
>that we don't need to worry about the issue of blocking a large range while 
>unblocking a few servers in the range.
Supporting ranges sounds reasonable. If this feature is on the development 
plan, I also don't think we need to consider "the best matching acl" and 
"support IP ranges" together.

>We have a call tomorrow (Tuesday, April 28) at 3pm PST - to discuss this and 
>other outstanding design issues (not all related to security). If you are 
>interested in joining - let me know and I'll forward you the invite.
Thank you, Gwen. I have the invite and I should be at home at that time, but 
due to network issues I may not be able to join the meeting smoothly.

Regards
Dapeng
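
For illustration, a hypothetical sketch (not the KIP-11 interface) contrasting 
the two precedence policies discussed in this thread for the example of 
acl_1 = deny READ to bob from all hosts and acl_2 = allow READ to bob from 
host1. All class and method names below are made up.

import java.util.Arrays;
import java.util.List;

public class AclPrecedenceSketch {
    static class Acl {
        final String host;      // "*" means all hosts
        final boolean allow;
        Acl(String host, boolean allow) { this.host = host; this.allow = allow; }
        boolean matches(String requestHost) { return host.equals("*") || host.equals(requestHost); }
    }

    // Policy 1: "deny always wins" - any matching deny blocks the request.
    static boolean denyWins(List<Acl> acls, String host) {
        boolean allowed = false;
        for (Acl acl : acls) {
            if (!acl.matches(host)) continue;
            if (!acl.allow) return false;    // a matching deny is final
            allowed = true;
        }
        return allowed;
    }

    // Policy 2: "best match wins" - an exact-host acl overrides a wildcard acl.
    static boolean bestMatchWins(List<Acl> acls, String host) {
        Boolean exact = null, wildcard = null;
        for (Acl acl : acls) {
            if (acl.host.equals(host)) exact = acl.allow;
            else if (acl.host.equals("*")) wildcard = acl.allow;
        }
        if (exact != null) return exact;
        return wildcard != null && wildcard;
    }

    public static void main(String[] args) {
        List<Acl> acls = Arrays.asList(new Acl("*", false), new Acl("host1", true));
        System.out.println(denyWins(acls, "host1"));        // false: the deny takes precedence
        System.out.println(bestMatchWins(acls, "host1"));   // true: the host1 rule overrides "*"
    }
}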

-Original Message-
From: Gwen Shapira [mailto:gshap...@cloudera.com] 
Sent: Tuesday, April 28, 2015 1:31 PM
To: dev@kafka.apache.org
Subject: Re: [VOTE] KIP-11- Authorization design for kafka security

While I see the advantage of being able to say something like "deny user X 
from hosts h1...h200" and also "allow user X from host h189", there are two 
issues here:

1. Complex rule systems can be difficult to reason about and therefore end up 
being less secure. The rule "Deny always wins" is very easy to grasp.

2. We currently don't have any mechanism for specifying IP ranges (or host
ranges) at all. I think it's a pretty significant deficiency, but it does mean 
that we don't need to worry about the issue of blocking a large range while 
unblocking a few servers in the range.

Gwen

P.S
We have a call tomorrow (Tuesday, April 28) at 3pm PST - to discuss this and 
other outstanding design issues (not all related to security). If you are 
interested in joining - let me know and I'll forward you the invite.

Gwen

On Mon, Apr 27, 2015 at 10:15 PM, Sun, Dapeng  wrote:

> Attach the image.
>
> https://raw.githubusercontent.com/sundapeng/attachment/master/kafka-ac
> l1.png
>
> Regards
> Dapeng
>
> From: Sun, Dapeng [mailto:dapeng@intel.com]
> Sent: Tuesday, April 28, 2015 11:44 AM
> To: dev@kafka.apache.org
> Subject: RE: [VOTE] KIP-11- Authorization design for kafka security
>
>
> Thank you for your rapid reply, Parth.
>
>
>
> >* I think the wiki already describes the precedence order as Deny 
> >taking
> precedence over allow when conflicting acls are found 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorizati
> on+In
>
> >terface#KIP-11-AuthorizationInterface-PermissionType
>
> Got it, thank you.
>
>
>
> >* In the first version that I am currently writing there is no group
> support. Even when we add it I don't see the need to add a precedence 
> for evaluation. it does not matter which principal matches as long as
>
> > we have a match.
>
>
>
> About this part, I think we should choose the best matching acl for 
> authorization, regardless of whether we support groups or not.
>
>
>
> For the case
>
>
> https://raw.githubusercontent.com/sundapeng/attachment/master/kafka-ac
> l1.png
>
>
>
> if 2 acls are defined, one that denies an operation from all hosts and 
> one that allows the operation from host1, will the operation from host1 
> be denied or allowed?
>
> According to the wiki, "Deny will take precedence over Allow in competing 
> acls.", so it seems acl_1 will win the competition, but the customer's 
> intention may be "allow".
>
> I think "deny always takes precedence over Allow" is okay, but a precedence 
> of "host1 -> user1" > "host1" > "default" may also make sense.
>
>
>
>
>
> >* Acl storage is indexed by resource right now because that is the
> primary lookup id for all authorize operations. Given acls are cached 
> I don't see the need to optimize the storage layer any further for lookup.
>
> >* The reason why we have acls with multi everything is to reduce
> redundancy in acl storage. I am not sure how we will be able to reduce 
> redundancy if we divide it by using one principal, one host, one operation.
>
>
>
> Yes, I also agree that "Acl storage should be indexed by resource".
> Under the resource index, it may be better to add indexes such as hosts and 
> principals. One option may be one principal, one host, one operation. I'm 
> just giving you these scenarios to consider.
>
>
>
> For the case defined in wiki:
>
> Acl_1 -> {"user:bob", "user:*"} is allowed to READ from all hosts.
>
> Acl_2 -> {"user:bob"} is denied to READ from host1
>
> Acl_3 -> {"user:alice", "group:kafka-devs"} is allowed to READ and 
> WRITE from {host1, host2}.
>
>
>
> For acl_3, if we want to remove alice's WRITE from {host1,host2} and 
> remove alice's READ from host1, a user may have the following ways to achieve this:
>
>
>
> 1.Remove the parts o

[jira] [Updated] (KAFKA-1690) new java producer needs ssl support as a client

2015-04-28 Thread Sriharsha Chintalapani (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sriharsha Chintalapani updated KAFKA-1690:
--
Status: Patch Available  (was: In Progress)

> new java producer needs ssl support as a client
> ---
>
> Key: KAFKA-1690
> URL: https://issues.apache.org/jira/browse/KAFKA-1690
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Joe Stein
>Assignee: Sriharsha Chintalapani
> Fix For: 0.8.3
>
> Attachments: KAFKA-1690.patch, KAFKA-1690.patch
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1690) new java producer needs ssl support as a client

2015-04-28 Thread Sriharsha Chintalapani (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sriharsha Chintalapani updated KAFKA-1690:
--
Attachment: KAFKA-1690.patch

> new java producer needs ssl support as a client
> ---
>
> Key: KAFKA-1690
> URL: https://issues.apache.org/jira/browse/KAFKA-1690
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Joe Stein
>Assignee: Sriharsha Chintalapani
> Fix For: 0.8.3
>
> Attachments: KAFKA-1690.patch, KAFKA-1690.patch
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1690) new java producer needs ssl support as a client

2015-04-28 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14516587#comment-14516587
 ] 

Sriharsha Chintalapani commented on KAFKA-1690:
---

Created reviewboard https://reviews.apache.org/r/33620/diff/
 against branch origin/trunk

> new java producer needs ssl support as a client
> ---
>
> Key: KAFKA-1690
> URL: https://issues.apache.org/jira/browse/KAFKA-1690
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Joe Stein
>Assignee: Sriharsha Chintalapani
> Fix For: 0.8.3
>
> Attachments: KAFKA-1690.patch, KAFKA-1690.patch
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Review Request 33620: Patch for KAFKA-1690

2015-04-28 Thread Sriharsha Chintalapani

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33620/
---

Review request for kafka.


Bugs: KAFKA-1690
https://issues.apache.org/jira/browse/KAFKA-1690


Repository: kafka


Description
---

KAFKA-1690. new java producer needs ssl support as a client.


Diffs
-

  checkstyle/checkstyle.xml a215ff36e9252879f1e0be5a86fef9a875bb8f38 
  checkstyle/import-control.xml f2e6cec267e67ce8e261341e373718e14a8e8e03 
  clients/src/main/java/org/apache/kafka/clients/ClientUtils.java 
0d68bf1e1e90fe9d5d4397ddf817b9a9af8d9f7a 
  clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
cf32e4e7c40738fe6d8adc36ae0cfad459ac5b0b 
  clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
42c72198a0325e234cf1d428b687663099de884e 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
09ecb427c9f4482dd064428815128b1c426dc921 
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
42b12928781463b56fc4a45d96bb4da2745b6d95 
  clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
5a575553d30c1c0bda9ffef9e9b9eafae28deba5 
  clients/src/main/java/org/apache/kafka/common/config/SecurityConfig.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/network/Authenticator.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/network/Channel.java 
PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java 
PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/common/network/PlainTextTransportLayer.java
 PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/network/SSLFactory.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/network/Selectable.java 
b5f8d83e89f9026dc0853e5f92c00b2d7f043e22 
  clients/src/main/java/org/apache/kafka/common/network/Selector.java 
57de0585e5e9a53eb9dcd99cac1ab3eb2086a302 
  clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java 
d3394ee669e1c2254403e95393511d0a17e6f250 
  clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
f73eedb030987f018d8446bb1dcd98d19fa97331 
  clients/src/test/java/org/apache/kafka/common/network/EchoServer.java 
PRE-CREATION 
  clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java 
PRE-CREATION 
  clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java 
d5b306b026e788b4e5479f3419805aa49ae889f3 
  clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java 
2ebe3c21f611dc133a2dbb8c7dfb0845f8c21498 
  clients/src/test/java/org/apache/kafka/test/TestSSLUtils.java PRE-CREATION 

Diff: https://reviews.apache.org/r/33620/diff/


Testing
---


Thanks,

Sriharsha Chintalapani



[jira] [Commented] (KAFKA-1928) Move kafka.network over to using the network classes in org.apache.kafka.common.network

2015-04-28 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14516577#comment-14516577
 ] 

Gwen Shapira commented on KAFKA-1928:
-

Moved SocketServer to use Selector. 
All tests are passing at this point, but it's still kinda messy.

[~harsha_ch] - you can take a look and see how much it changes things for the 
security patches.
This is far from final, but should give you a good idea of what changed.

Known issues:
* None of Jun's comments in RB are addressed
* I may have messed up metrics completely. I'm not sure what I'm doing there 
and not sure how we test those.
* Known broken metric: we can't measure SentBytes for a generic Send (which is 
why the old metrics just counted the number of messages). There's a big TODO in 
Selector for this.
* Request.requestKey should be renamed and changed to String
* I removed testNullResponse from SocketServerTest. Now that selectionKeys are 
encapsulated by Selectors that are encapsulated by Processors that are 
encapsulated by SocketServer - I'm not sure there's a good way to write this 
test as part of SocketServerTest.

> Move kafka.network over to using the network classes in 
> org.apache.kafka.common.network
> ---
>
> Key: KAFKA-1928
> URL: https://issues.apache.org/jira/browse/KAFKA-1928
> Project: Kafka
>  Issue Type: Sub-task
>  Components: security
>Reporter: Jay Kreps
>Assignee: Gwen Shapira
> Attachments: KAFKA-1928.patch, KAFKA-1928_2015-04-28_00:09:40.patch
>
>
> As part of the common package we introduced a bunch of network related code 
> and abstractions.
> We should look into replacing a lot of what is in kafka.network with this 
> code. Duplicate classes include things like Receive, Send, etc. It is likely 
> possible to also refactor the SocketServer to make use of Selector which 
> should significantly simplify its code.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSSION] java.io.Closeable in KAFKA-2121

2015-04-28 Thread Ewen Cheslack-Postava
I suggested the change to Closeable mainly to avoid some verbosity and
redundancy in the code: we have to call close() on multiple components, and
without a common interface the error handling code is repeated. Using
Closeable does make the code cleaner and more maintainable, but the issue
was identified pretty quickly, which suggests there might be more people
calling close() on the serializer.

I think we should probably revert the use of Closeable and just use the
more verbose version. I'm definitely in favor of being pretty strict about
compatibility, and the readability/maintainability cost here isn't awful.

Given some other thoughts about system and protocol compatibility tests
like Gwen suggests in KAFKA-2003, it'd probably also be worth having a few
sample client applications that we can compile against version X and run
against version Y to verify API compatibility. Even just a few different
examples from different developers could give us reasonable API coverage.


On Mon, Apr 27, 2015 at 5:51 PM, Guozhang Wang  wrote:

> Folks,
>
> In a recent commit I made regarding KAFKA-2121, there is an omitted API
> change which makes Serializer / Deserializer extend Closeable,
> whose close() call is declared to throw IOException. Hence now some
> scenario like:
>
> -
>
> Serializer keySerializer = ...
> Serializer valueSerializer = ...
> KafkaProducer producer = new KafkaProducer(config, keySerializer,
> valueSerializer)
> // ...
> keySerializer.close()
> valueSerializer.close()
>
> -
>
> will now need to catch IOException.
>
> I want to bring this up for people's attention, and get your opinion on whether
> we should revert this change.
>
> -- Guozhang
>



-- 
Thanks,
Ewen
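
For illustration, a hypothetical sketch of the source-compatibility issue being 
discussed: once close() is inherited from java.io.Closeable, it is declared to 
throw IOException, so caller code that previously compiled without a handler 
now needs one. The interfaces below are stand-ins, not the real Serializer API.

import java.io.Closeable;
import java.io.IOException;

interface OldStyleSerializer<T> {
    byte[] serialize(String topic, T data);
    void close();                          // no checked exception declared
}

interface CloseableSerializer<T> extends Closeable {
    byte[] serialize(String topic, T data);
    // close() is inherited from Closeable as: void close() throws IOException
}

public class CloseableCompatSketch {
    public static void main(String[] args) {
        CloseableSerializer<String> serializer = new CloseableSerializer<String>() {
            public byte[] serialize(String topic, String data) { return data.getBytes(); }
            public void close() { /* nothing to release */ }
        };
        try {
            serializer.close();            // the caller must now handle IOException
        } catch (IOException e) {
            // this handler was not required before the interface change
        }
    }
}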


[jira] [Commented] (KAFKA-1928) Move kafka.network over to using the network classes in org.apache.kafka.common.network

2015-04-28 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14516563#comment-14516563
 ] 

Gwen Shapira commented on KAFKA-1928:
-

Updated reviewboard https://reviews.apache.org/r/33065/diff/
 against branch trunk

> Move kafka.network over to using the network classes in 
> org.apache.kafka.common.network
> ---
>
> Key: KAFKA-1928
> URL: https://issues.apache.org/jira/browse/KAFKA-1928
> Project: Kafka
>  Issue Type: Sub-task
>  Components: security
>Reporter: Jay Kreps
>Assignee: Gwen Shapira
> Attachments: KAFKA-1928.patch, KAFKA-1928_2015-04-28_00:09:40.patch
>
>
> As part of the common package we introduced a bunch of network related code 
> and abstractions.
> We should look into replacing a lot of what is in kafka.network with this 
> code. Duplicate classes include things like Receive, Send, etc. It is likely 
> possible to also refactor the SocketServer to make use of Selector which 
> should significantly simplify its code.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1928) Move kafka.network over to using the network classes in org.apache.kafka.common.network

2015-04-28 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-1928:

Status: Patch Available  (was: In Progress)

> Move kafka.network over to using the network classes in 
> org.apache.kafka.common.network
> ---
>
> Key: KAFKA-1928
> URL: https://issues.apache.org/jira/browse/KAFKA-1928
> Project: Kafka
>  Issue Type: Sub-task
>  Components: security
>Reporter: Jay Kreps
>Assignee: Gwen Shapira
> Attachments: KAFKA-1928.patch, KAFKA-1928_2015-04-28_00:09:40.patch
>
>
> As part of the common package we introduced a bunch of network related code 
> and abstractions.
> We should look into replacing a lot of what is in kafka.network with this 
> code. Duplicate classes include things like Receive, Send, etc. It is likely 
> possible to also refactor the SocketServer to make use of Selector which 
> should significantly simplify its code.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1928) Move kafka.network over to using the network classes in org.apache.kafka.common.network

2015-04-28 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-1928:

Attachment: KAFKA-1928_2015-04-28_00:09:40.patch

> Move kafka.network over to using the network classes in 
> org.apache.kafka.common.network
> ---
>
> Key: KAFKA-1928
> URL: https://issues.apache.org/jira/browse/KAFKA-1928
> Project: Kafka
>  Issue Type: Sub-task
>  Components: security
>Reporter: Jay Kreps
>Assignee: Gwen Shapira
> Attachments: KAFKA-1928.patch, KAFKA-1928_2015-04-28_00:09:40.patch
>
>
> As part of the common package we introduced a bunch of network related code 
> and abstractions.
> We should look into replacing a lot of what is in kafka.network with this 
> code. Duplicate classes include things like Receive, Send, etc. It is likely 
> possible to also refactor the SocketServer to make use of Selector which 
> should significantly simplify its code.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 33065: Patch for KAFKA-1928

2015-04-28 Thread Gwen Shapira

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/
---

(Updated April 28, 2015, 7:09 a.m.)


Review request for kafka.


Bugs: KAFKA-1928
https://issues.apache.org/jira/browse/KAFKA-1928


Repository: kafka


Description (updated)
---

first pass on replacing Send


implement maxSize and improved docs


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2

Conflicts:
core/src/main/scala/kafka/network/RequestChannel.scala

moved selector out of abstract thread


mid-way through putting selector in SocketServer


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2

Also, SocketServer is now using Selector. Still a bit messy - but all tests pass.

Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
  clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
936487b16e7ac566f8bdcd39a7240ceb619fd30e 
  clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
1311f85847b022efec8cb05c450bb18231db6979 
  clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
b7ae595f2cc46e5dfe728bc3ce6082e9cd0b6d36 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 e55ab11df4db0b0084f841a74cbcf819caf780d5 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
ef9dd5238fbc771496029866ece1d85db6d7b7a5 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
b2db91ca14bbd17fef5ce85839679144fff3f689 
  clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 
129ae827bccbd982ad93d56e46c6f5c46f147fe0 
  clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
c8213e156ec9c9af49ee09f5238492318516aaa3 
  clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
fc0d168324aaebb97065b0aafbd547a1994d76a7 
  clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 
68327cd3a734fd429966d3e2016a2488dbbb19e5 
  clients/src/main/java/org/apache/kafka/common/network/Receive.java 
4e33078c1eec834bd74aabcb5fc69f18c9d6d52a 
  clients/src/main/java/org/apache/kafka/common/network/Selectable.java 
b5f8d83e89f9026dc0853e5f92c00b2d7f043e22 
  clients/src/main/java/org/apache/kafka/common/network/Selector.java 
57de0585e5e9a53eb9dcd99cac1ab3eb2086a302 
  clients/src/main/java/org/apache/kafka/common/network/Send.java 
5d321a09e470166a1c33639cf0cab26a3bce98ec 
  clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java 
27cbf390c7f148ffa8c5abc154c72cbf0829715c 
  clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java 
PRE-CREATION 
  clients/src/test/java/org/apache/kafka/clients/MockClient.java 
5e3fab13e3c02eb351558ec973b949b3d1196085 
  clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
8b278892883e63899b53e15efb9d8c926131e858 
  clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java 
d5b306b026e788b4e5479f3419805aa49ae889f3 
  clients/src/test/java/org/apache/kafka/test/MockSelector.java 
ea89b06a4c9e5bb351201299cd3037f5226f0e6c 
  core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 
1c3b3802ac221d570e7610458e50518b4499e7ed 
  core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala 
a3b1b78adb760eaeb029466b54f335a29caf3b0f 
  core/src/main/scala/kafka/api/ControlledShutdownRequest.scala 
fe81635c864cec03ca1d4681c9c47c3fc4f975ee 
  core/src/main/scala/kafka/api/FetchRequest.scala 
b038c15186c0cbcc65b59479324052498361b717 
  core/src/main/scala/kafka/api/FetchResponse.scala 
75aaf57fb76ec01660d93701a57ae953d877d81c 
  core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala 
431190ab94afc4acfc14348a1fc720e17c071cea 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
cf8e6acc426aef6eb19d862bf6a108a5fc37907a 
  core/src/main/scala/kafka/api/OffsetFetchRequest.scala 
67811a752a470bf9bdbc8c5419e8d6e20a006169 
  core/src/main/scala/kafka/api/OffsetRequest.scala 
3d483bc7518ad76f9548772522751afb4d046b78 
  core/src/main/scala/kafka/api/ProducerRequest.scala 
570b2da1d865086f9830aa919a49063abbbe574d 
  core/src/main/scala/kafka/api/StopReplicaRequest.scala 
5e14987c990fe561c01dac2909f5ed21a506e038 
  core/src/main/scala/kafka/api/TopicMetadataRequest.scala 
363bae01752318f3849242b97a6619747697c1d9 
  core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 
69f0397b187a737b4ddf50e390d3c2f418ce6b5d 
  core/src/main/scala/kafka/client/ClientUtils.scala 
62394c0d3813f19a443cf862c8bc6c5808be9f88 
  core/src/main/scala/kafka/consumer/SimpleConsumer.scala 
cbef84ac76e62768981f