Re: Review Request 33620: Patch for KAFKA-1690

2015-06-23 Thread Sriharsha Chintalapani


 On May 22, 2015, 12:33 a.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java, 
  line 231
  https://reviews.apache.org/r/33620/diff/8/?file=966794#file966794line231
 
  Do we need config.values? Could we just pass in config.originals() as 
  we do for the serializer?

config.originals() contains only the user-configured properties; it doesn't include 
the defaults defined in ProducerConfig. On a side note, shouldn't we use 
config.values() everywhere?
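
For context, a minimal self-contained sketch (plain Java, not Kafka's actual AbstractConfig code; the keys and default value are made up for illustration) of the originals()/values() distinction being discussed:

```
import java.util.HashMap;
import java.util.Map;

// Toy model of the behavior under discussion: originals() echoes only user-supplied
// entries, while values() merges user entries over the declared defaults.
class ConfigSketch {
    private final Map<String, Object> originals;
    private final Map<String, Object> values;

    ConfigSketch(Map<String, Object> userProps, Map<String, Object> defaults) {
        this.originals = new HashMap<>(userProps);
        this.values = new HashMap<>(defaults);
        this.values.putAll(userProps);                    // user settings override defaults
    }

    Map<String, Object> originals() { return new HashMap<>(originals); }
    Map<String, Object> values()    { return new HashMap<>(values); }

    public static void main(String[] args) {
        Map<String, Object> user = new HashMap<>();
        user.put("bootstrap.servers", "localhost:9092");
        Map<String, Object> defaults = new HashMap<>();
        defaults.put("send.buffer.bytes", 131072);

        ConfigSketch config = new ConfigSketch(user, defaults);
        System.out.println(config.originals().containsKey("send.buffer.bytes")); // false
        System.out.println(config.values().containsKey("send.buffer.bytes"));    // true
    }
}
```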


- Sriharsha


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33620/#review84232
---


On June 4, 2015, 1:52 a.m., Sriharsha Chintalapani wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33620/
 ---
 
 (Updated June 4, 2015, 1:52 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1690
 https://issues.apache.org/jira/browse/KAFKA-1690
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1690. new java producer needs ssl support as a client.
 
 
 KAFKA-1690. new java producer needs ssl support as a client.
 
 
 KAFKA-1690. new java producer needs ssl support as a client.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. SSLFactory tests.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Added 
 PrincipalBuilder.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Addressing 
 reviews.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Addressing 
 reviews.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Addressing 
 reviews.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Fixed minor 
 issues with the patch.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Fixed minor 
 issues with the patch.
 
 
 KAFKA-1690. new java producer needs ssl support as a client.
 
 
 KAFKA-1690. new java producer needs ssl support as a client.
 
 
 Diffs
 -
 
   build.gradle cd2aa838fd53e8124f308979b1d70efe0c5725a6 
   checkstyle/import-control.xml f2e6cec267e67ce8e261341e373718e14a8e8e03 
   clients/src/main/java/org/apache/kafka/clients/ClientUtils.java 
 0d68bf1e1e90fe9d5d4397ddf817b9a9af8d9f7a 
   clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
 cf32e4e7c40738fe6d8adc36ae0cfad459ac5b0b 
   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
 bdff518b732105823058e6182f445248b45dc388 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
 8e336a3aa96c73f52beaeb56b931baf4b026cf21 
   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
 187d0004c8c46b6664ddaffecc6166d4b47351e5 
   clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java 
 c4fa058692f50abb4f47bd344119d805c60123f5 
   clients/src/main/java/org/apache/kafka/common/config/SecurityConfigs.java 
 PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/network/Authenticator.java 
 PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 
 129ae827bccbd982ad93d56e46c6f5c46f147fe0 
   clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
 c8213e156ec9c9af49ee09f5238492318516aaa3 
   clients/src/main/java/org/apache/kafka/common/network/Channel.java 
 PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java 
 PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
 fc0d168324aaebb97065b0aafbd547a1994d76a7 
   
 clients/src/main/java/org/apache/kafka/common/network/PlainTextChannelBuilder.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/network/PlainTextTransportLayer.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/network/Receive.java 
 4e33078c1eec834bd74aabcb5fc69f18c9d6d52a 
   
 clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java 
 PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/network/SSLFactory.java 
 PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
 PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/network/Selectable.java 
 b5f8d83e89f9026dc0853e5f92c00b2d7f043e22 
   clients/src/main/java/org/apache/kafka/common/network/Selector.java 
 57de0585e5e9a53eb9dcd99cac1ab3eb2086a302 
   clients/src/main/java/org/apache/kafka/common/network/Send.java 
 5d321a09e470166a1c33639cf0cab26a3bce98ec 
   

[jira] [Commented] (KAFKA-2297) VerifiableProperties fails when constructed with Properties with numeric values

2015-06-23 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14597942#comment-14597942
 ] 

Gwen Shapira commented on KAFKA-2297:
-

Yeah, I think it's a known issue - VerifiableProperties only takes String values.

Considering that VerifiableProperties will be deprecated in the next release in 
favor of AbstractConfig and ConfigDef, I'm not sure it's worth fixing.
But if you already have a patch, feel free to submit :)

 VerifiableProperties fails when constructed with Properties with numeric 
 values
 ---

 Key: KAFKA-2297
 URL: https://issues.apache.org/jira/browse/KAFKA-2297
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 0.8.2.1
 Environment: Ubuntu 12.04, OSX 10.9, OpenJDK 7, Scala 2.10
Reporter: Grant Overby
Priority: Minor

 VerifiableProperties produces java.lang.NumberFormatException: null when a 
 get method for a numeric type is called and VerifiableProperties was 
 constructed with a Properties containing a numeric type.
 private static void works() {
     Properties props = new Properties();
     props.put("request.required.acks", "0");
     new VerifiableProperties(props).getShort("request.required.acks", (short) 0);
 }
 private static void doesntWork() {
     Properties props = new Properties();
     props.put("request.required.acks", (short) 0);
     new VerifiableProperties(props).getShort("request.required.acks", (short) 0);
 }
 Exception in thread "main" java.lang.NumberFormatException: null
   at java.lang.Integer.parseInt(Integer.java:454)
   at java.lang.Short.parseShort(Short.java:117)
   at java.lang.Short.parseShort(Short.java:143)
   at 
 scala.collection.immutable.StringLike$class.toShort(StringLike.scala:228)
   at scala.collection.immutable.StringOps.toShort(StringOps.scala:31)
   at 
 kafka.utils.VerifiableProperties.getShortInRange(VerifiableProperties.scala:85)
   at 
 kafka.utils.VerifiableProperties.getShort(VerifiableProperties.scala:61)
   at Main.doesntWork(Main.java:22)
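
Since the parsing path above only handles String values, one hedged workaround sketch (not from this thread; the class and helper names are made up) is to stringify every value before constructing VerifiableProperties:

```
import java.util.Properties;

// Convert every key/value to its String form so VerifiableProperties' String-based
// parsing (getProperty(...).toShort, etc.) sees parseable text instead of boxed numbers.
public class StringifyProps {
    static Properties stringified(Properties in) {
        Properties out = new Properties();
        in.forEach((k, v) -> out.setProperty(String.valueOf(k), String.valueOf(v)));
        return out;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("request.required.acks", (short) 0);   // numeric value, as in doesntWork() above
        Properties safe = stringified(props);
        System.out.println(safe.getProperty("request.required.acks"));  // "0", parseable as a short
    }
}
```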



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2298) Client Selector can drop connections on InvalidReceiveException without notifying NetworkClient

2015-06-23 Thread Dong Lin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-2298:

Labels:   (was: quotas)

 Client Selector can drop connections on InvalidReceiveException without 
 notifying NetworkClient
 ---

 Key: KAFKA-2298
 URL: https://issues.apache.org/jira/browse/KAFKA-2298
 Project: Kafka
  Issue Type: Bug
Reporter: Dong Lin
Assignee: Dong Lin
 Attachments: KAFKA-2298.patch


 I ran into the problem described in KAFKA-2266 when testing quotas. I was told 
 the bug was fixed in KAFKA-2266 after I figured out the problem.
 But the patch provided in KAFKA-2266 probably doesn't solve all related 
 problems. From reading the code, there is still one edge case where the client 
 selector can close a connection in poll() without notifying NetworkClient.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-2297) VerifiableProperties fails when constructed with Properties with numeric values

2015-06-23 Thread Grant Overby (JIRA)
Grant Overby created KAFKA-2297:
---

 Summary: VerifiableProperties fails when constructed with 
Properties with numeric values
 Key: KAFKA-2297
 URL: https://issues.apache.org/jira/browse/KAFKA-2297
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 0.8.2.1
 Environment: Ubuntu 12.04, OSX 10.9, OpenJDK 7, Scala 2.10
Reporter: Grant Overby
Priority: Minor


VerifiableProperties produces java.lang.NumberFormatException: null when a get 
method for a numeric type is called and VerifiableProperties was constructed 
with a Properties containing a numeric type.

private static void works() {
    Properties props = new Properties();
    props.put("request.required.acks", "0");
    new VerifiableProperties(props).getShort("request.required.acks", (short) 0);
}

private static void doesntWork() {
    Properties props = new Properties();
    props.put("request.required.acks", (short) 0);
    new VerifiableProperties(props).getShort("request.required.acks", (short) 0);
}


Exception in thread "main" java.lang.NumberFormatException: null
at java.lang.Integer.parseInt(Integer.java:454)
at java.lang.Short.parseShort(Short.java:117)
at java.lang.Short.parseShort(Short.java:143)
at 
scala.collection.immutable.StringLike$class.toShort(StringLike.scala:228)
at scala.collection.immutable.StringOps.toShort(StringOps.scala:31)
at 
kafka.utils.VerifiableProperties.getShortInRange(VerifiableProperties.scala:85)
at 
kafka.utils.VerifiableProperties.getShort(VerifiableProperties.scala:61)
at Main.doesntWork(Main.java:22)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2168) New consumer poll() can block other calls like position(), commit(), and close() indefinitely

2015-06-23 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-2168:
---
Attachment: KAFKA-2168_2015-06-23_09:39:07.patch

 New consumer poll() can block other calls like position(), commit(), and 
 close() indefinitely
 -

 Key: KAFKA-2168
 URL: https://issues.apache.org/jira/browse/KAFKA-2168
 Project: Kafka
  Issue Type: Sub-task
  Components: clients, consumer
Reporter: Ewen Cheslack-Postava
Assignee: Jason Gustafson
Priority: Critical
 Fix For: 0.8.3

 Attachments: KAFKA-2168.patch, KAFKA-2168_2015-06-01_16:03:38.patch, 
 KAFKA-2168_2015-06-02_17:09:37.patch, KAFKA-2168_2015-06-03_18:20:23.patch, 
 KAFKA-2168_2015-06-03_21:06:42.patch, KAFKA-2168_2015-06-04_14:36:04.patch, 
 KAFKA-2168_2015-06-05_12:01:28.patch, KAFKA-2168_2015-06-05_12:44:48.patch, 
 KAFKA-2168_2015-06-11_14:09:59.patch, KAFKA-2168_2015-06-18_14:39:36.patch, 
 KAFKA-2168_2015-06-19_09:19:02.patch, KAFKA-2168_2015-06-22_16:34:37.patch, 
 KAFKA-2168_2015-06-23_09:39:07.patch


 The new consumer is currently using very coarse-grained synchronization. For 
 most methods this isn't a problem since they finish quickly once the lock is 
 acquired, but poll() might run for a long time (and commonly will since 
 polling with long timeouts is a normal use case). This means any operations 
 invoked from another thread may block until the poll() call completes.
 Some example use cases where this can be a problem:
 * A shutdown hook is registered to trigger shutdown and invokes close(). It 
 gets invoked from another thread and blocks indefinitely.
 * Users want to manage offset commits themselves in a background thread. If 
 the commit policy is not purely time based, it's not currently possible to 
 make sure the call to commit() will be processed promptly.
 Two possible solutions to this:
 1. Make sure a lock is not held during the actual select call. Since we have 
 multiple layers (KafkaConsumer -> NetworkClient -> Selector -> nio Selector) 
 this is probably hard to make work cleanly since locking is currently only 
 performed at the KafkaConsumer level and we'd want it unlocked around a 
 single line of code in Selector.
 2. Wake up the selector before synchronizing for certain operations. This 
 would require some additional coordination to make sure the caller of 
 wakeup() is able to acquire the lock promptly (instead of, e.g., the poll() 
 thread being woken up and then promptly reacquiring the lock with a 
 subsequent long poll() call).
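
As a hedged, self-contained illustration of option 2 (plain java.nio with illustrative names, not the actual KafkaConsumer/Selector code): waking the selector before taking the lock lets the thread blocked in a long poll release the lock promptly.

```
import java.io.IOException;
import java.nio.channels.Selector;

// Sketch: the poll thread holds a lock while blocked in select(); other operations
// call wakeup() first so they can acquire that lock without waiting out the timeout.
public class WakeupBeforeLock {
    private final Object lock = new Object();
    private final Selector selector;

    WakeupBeforeLock() throws IOException {
        this.selector = Selector.open();
    }

    // "Poll" thread: holds the lock for the whole (possibly long) select.
    void poll(long timeoutMs) throws IOException {
        synchronized (lock) {
            selector.select(timeoutMs);          // returns early if wakeup() is called
        }
    }

    // Another thread (e.g. a commit or close): wake up first, then lock.
    void commit() {
        selector.wakeup();                       // breaks the in-flight select()
        synchronized (lock) {
            System.out.println("commit processed promptly");
        }
    }

    public static void main(String[] args) throws Exception {
        WakeupBeforeLock c = new WakeupBeforeLock();
        Thread poller = new Thread(() -> {
            try { c.poll(60_000); } catch (IOException e) { throw new RuntimeException(e); }
        });
        poller.start();
        Thread.sleep(100);   // let the poller take the lock and block in select()
        c.commit();          // without wakeup() this could wait up to 60 seconds
        poller.join();
    }
}
```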



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-2298) Client Selector can drop connections on InvalidReceiveException without notifying NetworkClient

2015-06-23 Thread Dong Lin (JIRA)
Dong Lin created KAFKA-2298:
---

 Summary: Client Selector can drop connections on 
InvalidReceiveException without notifying NetworkClient
 Key: KAFKA-2298
 URL: https://issues.apache.org/jira/browse/KAFKA-2298
 Project: Kafka
  Issue Type: Bug
Reporter: Dong Lin
Assignee: Dong Lin


I ran into the problem described in KAFKA-2266 when testing quotas. I was told 
the bug was fixed in KAFKA-2266 after I figured out the problem.

But the patch provided in KAFKA-2266 probably doesn't solve all related 
problems. From reading the code, there is still one edge case where the client 
selector can close a connection in poll() without notifying NetworkClient.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [GitHub] kafka pull request: Kafka 2276

2015-06-23 Thread Gwen Shapira
Awesome, thanks :)

On Tue, Jun 23, 2015 at 10:32 AM, Geoffrey Anderson ge...@confluent.io wrote:
 Hi Gwen,

 That is indeed the plan, and my understanding is that the merge script
 Ismael has been working on helps committers with this step.

 I'm trying out the Github flow roughly as outlined here:
 http://mail-archives.apache.org/mod_mbox/kafka-dev/201504.mbox/%3ccad5tkzab-hkey-zcr8x4wtxawybxpojx62k1vbv+ycknuxq...@mail.gmail.com%3E

 Ismael's script is here: https://issues.apache.org/jira/browse/KAFKA-2187


 Thanks,

 Geoff

 On Mon, Jun 22, 2015 at 9:59 PM, Gwen Shapira gshap...@cloudera.com wrote:

 Thanks, I indeed missed the original :)

 Is the plan to squash the commits and merge a pull request with single
 commit that matches the JIRA #?
 This will be more in line with how commits were organized until now
 and will make life much easier when cherry-picking.

 Gwen

 On Mon, Jun 22, 2015 at 1:58 PM, Geoffrey Anderson ge...@confluent.io
 wrote:
  Hi,
 
  I'm pinging the dev list regarding KAFKA-2276 (KIP-25 initial patch)
 again
  since it sounds like at least one person I spoke with did not see the
  initial pull request.
 
  Pull request: https://github.com/apache/kafka/pull/70/
  JIRA: https://issues.apache.org/jira/browse/KAFKA-2276
 
  Thanks!
  Geoff
 
 
  On Tue, Jun 16, 2015 at 2:50 PM, granders g...@git.apache.org wrote:
 
  GitHub user granders opened a pull request:
 
  https://github.com/apache/kafka/pull/70
 
  Kafka 2276
 
  Initial patch for KIP-25
 
   Note that to install ducktape, do *not* use pip.
   Instead:
 
  ```
  $ git clone g...@github.com:confluentinc/ducktape.git
  $ cd ducktape
  $ python setup.py install
  ```
 
 
  You can merge this pull request into a Git repository by running:
 
  $ git pull https://github.com/confluentinc/kafka KAFKA-2276
 
  Alternatively you can review and apply these changes as the patch at:
 
  https://github.com/apache/kafka/pull/70.patch
 
  To close this pull request, make a commit to your master/trunk branch
  with (at least) the following in the commit message:
 
  This closes #70
 
  
  commit 81e41562f3836e95e89e12f215c82b1b2d505381
  Author: Liquan Pei liquan...@gmail.com
  Date:   2015-04-24T01:32:54Z
 
  Bootstrap Kafka system tests
 
  commit f1914c3ba9b52d0f8db3989c8b031127b42ac59e
  Author: Liquan Pei liquan...@gmail.com
  Date:   2015-04-24T01:33:44Z
 
  Merge pull request #2 from confluentinc/system_tests
 
  Bootstrap Kafka system tests
 
  commit a2789885806f98dcd1fd58edc9a10a30e4bd314c
  Author: Geoff Anderson ge...@confluent.io
  Date:   2015-05-26T22:21:23Z
 
  fixed typos
 
  commit 07cd1c66a952ee29fc3c8e85464acb43a6981b8a
  Author: Geoff Anderson ge...@confluent.io
  Date:   2015-05-26T22:22:14Z
 
  Added simple producer which prints status of produced messages to
  stdout.
 
  commit da94b8cbe79e6634cc32fbe8f6deb25388923029
  Author: Geoff Anderson ge...@confluent.io
  Date:   2015-05-27T21:07:20Z
 
  Added number of messages option.
 
  commit 212b39a2d75027299fbb1b1008d463a82aab
  Author: Geoff Anderson ge...@confluent.io
  Date:   2015-05-27T22:35:06Z
 
  Added some metadata to producer output.
 
  commit 8b4b1f2aa9681632ef65aa92dfd3066cd7d62851
  Author: Geoff Anderson ge...@confluent.io
  Date:   2015-05-29T23:38:32Z
 
  Minor updates to VerboseProducer
 
  commit c0526fe44cea739519a0889ebe9ead01b406b365
  Author: Geoff Anderson ge...@confluent.io
  Date:   2015-06-01T02:27:15Z
 
  Updates per review comments.
 
  commit bc009f218e00241cbdd23931d01b52c442eef6b7
  Author: Geoff Anderson ge...@confluent.io
  Date:   2015-06-01T02:28:28Z
 
  Got rid of VerboseProducer in core (moved to clients)
 
  commit 475423bb642ac8f816e8080f891867a6362c17fa
  Author: Geoff Anderson ge...@confluent.io
  Date:   2015-06-01T04:05:09Z
 
  Convert class to string before adding to json object.
 
  commit 0a5de8e0590e3a8dce1a91769ad41497b5e07d17
  Author: Geoff Anderson ge...@confluent.io
  Date:   2015-06-02T22:46:52Z
 
  Fixed checkstyle errors. Changed name to VerifiableProducer. Added
  synchronization for thread safety on println statements.
 
  commit 9100417ce0717a71c822c5a279fe7858bfe7a7ee
  Author: Geoff Anderson ge...@confluent.io
  Date:   2015-06-03T19:50:11Z
 
  Updated command-line options for VerifiableProducer. Extracted
  throughput logic to make it reusable.
 
  commit 1228eefc4e52b58c214b3ad45feab36a475d5a66
  Author: Geoff Anderson ge...@confluent.io
  Date:   2015-06-04T01:09:14Z
 
  Renamed throttler
 
  commit 6842ed1ffad62a84df67a0f0b6a651a6df085d12
  Author: Geoff Anderson ge...@confluent.io
  Date:   2015-06-04T01:12:11Z
 
  left out a file from last commit
 
  commit d586fb0eb63409807c02f280fae786cec55fb348
  Author: Geoff Anderson ge...@confluent.io
  Date:   2015-06-04T01:22:34Z
 
  Updated comments to reflect that throttler is not message-specific
 
  commit 

[jira] [Updated] (KAFKA-2294) javadoc compile error due to illegal <p/>, build failing (jdk 8)

2015-06-23 Thread Jeff Maxwell (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Maxwell updated KAFKA-2294:

Attachment: KAFKA-2294-1.patch

Fixed close() javadoc

 javadoc compile error due to illegal <p/>, build failing (jdk 8)
 -

 Key: KAFKA-2294
 URL: https://issues.apache.org/jira/browse/KAFKA-2294
 Project: Kafka
  Issue Type: Bug
Reporter: Jeremy Fields
 Attachments: KAFKA-2294-1.patch


 Quick one,
 kafka/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:525:
  error: self-closing element not allowed
  * <p/>
 This is causing the build to fail under Java 8 due to strict HTML checking.
 Replace that <p/> with <p>
 Regards,



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2294) javadoc compile error due to illegal <p/>, build failing (jdk 8)

2015-06-23 Thread Jakob Homan (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14598057#comment-14598057
 ] 

Jakob Homan commented on KAFKA-2294:


+1.

 javadoc compile error due to illegal <p/>, build failing (jdk 8)
 -

 Key: KAFKA-2294
 URL: https://issues.apache.org/jira/browse/KAFKA-2294
 Project: Kafka
  Issue Type: Bug
Reporter: Jeremy Fields
 Attachments: KAFKA-2294-1.patch


 Quick one,
 kafka/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:525:
  error: self-closing element not allowed
  * <p/>
 This is causing the build to fail under Java 8 due to strict HTML checking.
 Replace that <p/> with <p>
 Regards,



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 34789: Patch for KAFKA-2168

2015-06-23 Thread Jason Gustafson


 On June 23, 2015, 4:20 a.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java,
   line 355
  https://reviews.apache.org/r/34789/diff/12/?file=990006#file990006line355
 
  What's the logic to initiate connection to coordinator if the 
  coordinator is not available during HB?

As it's currently written, we'd skip a heartbeat if we don't have an active 
connection to the coordinator. As long as the heartbeat frequency is 3 or more 
times per session timeout, this is probably ok, but we might want to handle it 
better if we end up exposing the heartbeat frequency in configuration 
(currently it's hard-coded). Perhaps we can fix this in a separate ticket?
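
A hedged back-of-envelope illustration of the "3 or more heartbeats per session timeout" rule of thumb above (the numbers are examples, not Kafka defaults):

```
// With a 30 s session timeout and at least 3 heartbeats per session, heartbeats go
// out every ~10 s, so skipping one while the coordinator connection is re-established
// still leaves two more chances before the session expires.
public class HeartbeatMath {
    public static void main(String[] args) {
        int sessionTimeoutMs = 30_000;             // example session timeout
        int heartbeatsPerSession = 3;              // minimum suggested above
        int heartbeatIntervalMs = sessionTimeoutMs / heartbeatsPerSession;
        System.out.println("heartbeat every " + heartbeatIntervalMs + " ms");  // 10000 ms
    }
}
```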


 On June 23, 2015, 4:20 a.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  lines 351-354
  https://reviews.apache.org/r/34789/diff/12/?file=990003#file990003line351
 
   These seem redundant given the code below.

I think it's still necessary to call wakeup to abort a long poll if you want to 
ensure timely shutdown. You could probably get away without the closed flag and 
just use the ConsumerWakeupException to close the consumer, but the explicit 
flag seems cleaner.


 On June 23, 2015, 4:20 a.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  line 436
  https://reviews.apache.org/r/34789/diff/12/?file=990003#file990003line436
 
  Should this be volatile so that different threads can see the latest 
  value of refcount?

I think you are right. Fixed in latest patch.
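
As a hedged illustration of the point (not the actual KafkaConsumer code; the class is made up): an AtomicInteger gives both cross-thread visibility and an atomic read-modify-write, which a plain (or even volatile) int increment does not.

```
import java.util.concurrent.atomic.AtomicInteger;

// Toy refcount: incrementAndGet/decrementAndGet are atomic and visible to all threads,
// unlike "refcount++" on a plain or volatile int, which is a non-atomic read-modify-write.
public class RefcountSketch {
    private final AtomicInteger refcount = new AtomicInteger(0);

    void acquire() {
        refcount.incrementAndGet();
    }

    void release() {
        if (refcount.decrementAndGet() == 0) {
            System.out.println("last reference released; safe to close");
        }
    }

    public static void main(String[] args) {
        RefcountSketch r = new RefcountSketch();
        r.acquire();
        r.acquire();
        r.release();
        r.release();   // prints the message above
    }
}
```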


- Jason


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review88914
---


On June 23, 2015, 4:39 p.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34789/
 ---
 
 (Updated June 23, 2015, 4:39 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2168
 https://issues.apache.org/jira/browse/KAFKA-2168
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2168; make refcount in KafkaConsumer an AtomicInteger
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 9be8fbc648369ad9db1a7eea94bc1b9edbfdbfd7 
 
 Diff: https://reviews.apache.org/r/34789/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jason Gustafson
 




[jira] [Commented] (KAFKA-2168) New consumer poll() can block other calls like position(), commit(), and close() indefinitely

2015-06-23 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14597914#comment-14597914
 ] 

Jason Gustafson commented on KAFKA-2168:


Updated reviewboard https://reviews.apache.org/r/34789/diff/
 against branch upstream/trunk

 New consumer poll() can block other calls like position(), commit(), and 
 close() indefinitely
 -

 Key: KAFKA-2168
 URL: https://issues.apache.org/jira/browse/KAFKA-2168
 Project: Kafka
  Issue Type: Sub-task
  Components: clients, consumer
Reporter: Ewen Cheslack-Postava
Assignee: Jason Gustafson
Priority: Critical
 Fix For: 0.8.3

 Attachments: KAFKA-2168.patch, KAFKA-2168_2015-06-01_16:03:38.patch, 
 KAFKA-2168_2015-06-02_17:09:37.patch, KAFKA-2168_2015-06-03_18:20:23.patch, 
 KAFKA-2168_2015-06-03_21:06:42.patch, KAFKA-2168_2015-06-04_14:36:04.patch, 
 KAFKA-2168_2015-06-05_12:01:28.patch, KAFKA-2168_2015-06-05_12:44:48.patch, 
 KAFKA-2168_2015-06-11_14:09:59.patch, KAFKA-2168_2015-06-18_14:39:36.patch, 
 KAFKA-2168_2015-06-19_09:19:02.patch, KAFKA-2168_2015-06-22_16:34:37.patch, 
 KAFKA-2168_2015-06-23_09:39:07.patch


 The new consumer is currently using very coarse-grained synchronization. For 
 most methods this isn't a problem since they finish quickly once the lock is 
 acquired, but poll() might run for a long time (and commonly will since 
 polling with long timeouts is a normal use case). This means any operations 
 invoked from another thread may block until the poll() call completes.
 Some example use cases where this can be a problem:
 * A shutdown hook is registered to trigger shutdown and invokes close(). It 
 gets invoked from another thread and blocks indefinitely.
 * Users want to manage offset commits themselves in a background thread. If 
 the commit policy is not purely time based, it's not currently possible to 
 make sure the call to commit() will be processed promptly.
 Two possible solutions to this:
 1. Make sure a lock is not held during the actual select call. Since we have 
 multiple layers (KafkaConsumer -> NetworkClient -> Selector -> nio Selector) 
 this is probably hard to make work cleanly since locking is currently only 
 performed at the KafkaConsumer level and we'd want it unlocked around a 
 single line of code in Selector.
 2. Wake up the selector before synchronizing for certain operations. This 
 would require some additional coordination to make sure the caller of 
 wakeup() is able to acquire the lock promptly (instead of, e.g., the poll() 
 thread being woken up and then promptly reacquiring the lock with a 
 subsequent long poll() call).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 34789: Patch for KAFKA-2168

2015-06-23 Thread Jason Gustafson

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/
---

(Updated June 23, 2015, 4:39 p.m.)


Review request for kafka.


Bugs: KAFKA-2168
https://issues.apache.org/jira/browse/KAFKA-2168


Repository: kafka


Description (updated)
---

KAFKA-2168; make refcount in KafkaConsumer an AtomicInteger


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
9be8fbc648369ad9db1a7eea94bc1b9edbfdbfd7 

Diff: https://reviews.apache.org/r/34789/diff/


Testing
---


Thanks,

Jason Gustafson



Re: [GitHub] kafka pull request: Kafka 2276

2015-06-23 Thread Geoffrey Anderson
Hi Gwen,

That is indeed the plan, and my understanding is that the merge script
Ismael has been working on helps committers with this step.

I'm trying out the Github flow roughly as outlined here:
http://mail-archives.apache.org/mod_mbox/kafka-dev/201504.mbox/%3ccad5tkzab-hkey-zcr8x4wtxawybxpojx62k1vbv+ycknuxq...@mail.gmail.com%3E

Ismael's script is here: https://issues.apache.org/jira/browse/KAFKA-2187


Thanks,

Geoff

On Mon, Jun 22, 2015 at 9:59 PM, Gwen Shapira gshap...@cloudera.com wrote:

 Thanks, I indeed missed the original :)

 Is the plan to squash the commits and merge a pull request with single
 commit that matches the JIRA #?
 This will be more in line with how commits were organized until now
 and will make life much easier when cherry-picking.

 Gwen

 On Mon, Jun 22, 2015 at 1:58 PM, Geoffrey Anderson ge...@confluent.io
 wrote:
  Hi,
 
  I'm pinging the dev list regarding KAFKA-2276 (KIP-25 initial patch)
 again
  since it sounds like at least one person I spoke with did not see the
  initial pull request.
 
  Pull request: https://github.com/apache/kafka/pull/70/
  JIRA: https://issues.apache.org/jira/browse/KAFKA-2276
 
  Thanks!
  Geoff
 
 
  On Tue, Jun 16, 2015 at 2:50 PM, granders g...@git.apache.org wrote:
 
  GitHub user granders opened a pull request:
 
  https://github.com/apache/kafka/pull/70
 
  Kafka 2276
 
  Initial patch for KIP-25
 
   Note that to install ducktape, do *not* use pip.
   Instead:
 
  ```
  $ git clone g...@github.com:confluentinc/ducktape.git
  $ cd ducktape
  $ python setup.py install
  ```
 
 
  You can merge this pull request into a Git repository by running:
 
  $ git pull https://github.com/confluentinc/kafka KAFKA-2276
 
  Alternatively you can review and apply these changes as the patch at:
 
  https://github.com/apache/kafka/pull/70.patch
 
  To close this pull request, make a commit to your master/trunk branch
  with (at least) the following in the commit message:
 
  This closes #70
 
  
  commit 81e41562f3836e95e89e12f215c82b1b2d505381
  Author: Liquan Pei liquan...@gmail.com
  Date:   2015-04-24T01:32:54Z
 
  Bootstrap Kafka system tests
 
  commit f1914c3ba9b52d0f8db3989c8b031127b42ac59e
  Author: Liquan Pei liquan...@gmail.com
  Date:   2015-04-24T01:33:44Z
 
  Merge pull request #2 from confluentinc/system_tests
 
  Bootstrap Kafka system tests
 
  commit a2789885806f98dcd1fd58edc9a10a30e4bd314c
  Author: Geoff Anderson ge...@confluent.io
  Date:   2015-05-26T22:21:23Z
 
  fixed typos
 
  commit 07cd1c66a952ee29fc3c8e85464acb43a6981b8a
  Author: Geoff Anderson ge...@confluent.io
  Date:   2015-05-26T22:22:14Z
 
  Added simple producer which prints status of produced messages to
  stdout.
 
  commit da94b8cbe79e6634cc32fbe8f6deb25388923029
  Author: Geoff Anderson ge...@confluent.io
  Date:   2015-05-27T21:07:20Z
 
  Added number of messages option.
 
  commit 212b39a2d75027299fbb1b1008d463a82aab
  Author: Geoff Anderson ge...@confluent.io
  Date:   2015-05-27T22:35:06Z
 
  Added some metadata to producer output.
 
  commit 8b4b1f2aa9681632ef65aa92dfd3066cd7d62851
  Author: Geoff Anderson ge...@confluent.io
  Date:   2015-05-29T23:38:32Z
 
  Minor updates to VerboseProducer
 
  commit c0526fe44cea739519a0889ebe9ead01b406b365
  Author: Geoff Anderson ge...@confluent.io
  Date:   2015-06-01T02:27:15Z
 
  Updates per review comments.
 
  commit bc009f218e00241cbdd23931d01b52c442eef6b7
  Author: Geoff Anderson ge...@confluent.io
  Date:   2015-06-01T02:28:28Z
 
  Got rid of VerboseProducer in core (moved to clients)
 
  commit 475423bb642ac8f816e8080f891867a6362c17fa
  Author: Geoff Anderson ge...@confluent.io
  Date:   2015-06-01T04:05:09Z
 
  Convert class to string before adding to json object.
 
  commit 0a5de8e0590e3a8dce1a91769ad41497b5e07d17
  Author: Geoff Anderson ge...@confluent.io
  Date:   2015-06-02T22:46:52Z
 
  Fixed checkstyle errors. Changed name to VerifiableProducer. Added
  synchronization for thread safety on println statements.
 
  commit 9100417ce0717a71c822c5a279fe7858bfe7a7ee
  Author: Geoff Anderson ge...@confluent.io
  Date:   2015-06-03T19:50:11Z
 
  Updated command-line options for VerifiableProducer. Extracted
  throughput logic to make it reusable.
 
  commit 1228eefc4e52b58c214b3ad45feab36a475d5a66
  Author: Geoff Anderson ge...@confluent.io
  Date:   2015-06-04T01:09:14Z
 
  Renamed throttler
 
  commit 6842ed1ffad62a84df67a0f0b6a651a6df085d12
  Author: Geoff Anderson ge...@confluent.io
  Date:   2015-06-04T01:12:11Z
 
  left out a file from last commit
 
  commit d586fb0eb63409807c02f280fae786cec55fb348
  Author: Geoff Anderson ge...@confluent.io
  Date:   2015-06-04T01:22:34Z
 
  Updated comments to reflect that throttler is not message-specific
 
  commit a80a4282ba9a288edba7cdf409d31f01ebf3d458
  Author: Geoff Anderson ge...@confluent.io
  Date:   2015-06-04T20:47:21Z
 
   

[jira] [Updated] (KAFKA-2298) Client Selector can drop connections on InvalidReceiveException without notifying NetworkClient

2015-06-23 Thread Dong Lin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-2298:

Attachment: KAFKA-2298.patch

 Client Selector can drop connections on InvalidReceiveException without 
 notifying NetworkClient
 ---

 Key: KAFKA-2298
 URL: https://issues.apache.org/jira/browse/KAFKA-2298
 Project: Kafka
  Issue Type: Bug
Reporter: Dong Lin
Assignee: Dong Lin
  Labels: quotas
 Attachments: KAFKA-2298.patch


 I ran into the problem described in KAFKA-2266 when testing quotas. I was told 
 the bug was fixed in KAFKA-2266 after I figured out the problem.
 But the patch provided in KAFKA-2266 probably doesn't solve all related 
 problems. From reading the code, there is still one edge case where the client 
 selector can close a connection in poll() without notifying NetworkClient.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2298) Client Selector can drop connections on InvalidReceiveException without notifying NetworkClient

2015-06-23 Thread Dong Lin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-2298:

Status: Patch Available  (was: Open)

 Client Selector can drop connections on InvalidReceiveException without 
 notifying NetworkClient
 ---

 Key: KAFKA-2298
 URL: https://issues.apache.org/jira/browse/KAFKA-2298
 Project: Kafka
  Issue Type: Bug
Reporter: Dong Lin
Assignee: Dong Lin
  Labels: quotas
 Attachments: KAFKA-2298.patch


 I ran into the problem described in KAFKA-2266 when testing quotas. I was told 
 the bug was fixed in KAFKA-2266 after I figured out the problem.
 But the patch provided in KAFKA-2266 probably doesn't solve all related 
 problems. From reading the code, there is still one edge case where the client 
 selector can close a connection in poll() without notifying NetworkClient.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2298) Client Selector can drop connections on InvalidReceiveException without notifying NetworkClient

2015-06-23 Thread Dong Lin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14598024#comment-14598024
 ] 

Dong Lin commented on KAFKA-2298:
-

Created reviewboard https://reviews.apache.org/r/35791/diff/
 against branch origin/trunk

 Client Selector can drop connections on InvalidReceiveException without 
 notifying NetworkClient
 ---

 Key: KAFKA-2298
 URL: https://issues.apache.org/jira/browse/KAFKA-2298
 Project: Kafka
  Issue Type: Bug
Reporter: Dong Lin
Assignee: Dong Lin
  Labels: quotas
 Attachments: KAFKA-2298.patch


 I ran into the problem described in KAFKA-2266 when testing quotas. I was told 
 the bug was fixed in KAFKA-2266 after I figured out the problem.
 But the patch provided in KAFKA-2266 probably doesn't solve all related 
 problems. From reading the code, there is still one edge case where the client 
 selector can close a connection in poll() without notifying NetworkClient.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Review Request 35791: Patch for KAFKA-2298

2015-06-23 Thread Dong Lin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35791/
---

Review request for kafka.


Bugs: KAFKA-2298
https://issues.apache.org/jira/browse/KAFKA-2298


Repository: kafka


Description
---

KAFKA-2290; Client Selector can drop connections on InvalidReceiveException 
without notifying NetworkClient


Diffs
-

  clients/src/main/java/org/apache/kafka/common/network/Selector.java 
4aee214b24fd990be003adc36d675f015bf22fe6 

Diff: https://reviews.apache.org/r/35791/diff/


Testing
---


Thanks,

Dong Lin



high level consumer memory footprint

2015-06-23 Thread Kris K
Hi,

I was just wondering if there is any difference in the memory footprint of
a high level consumer when:

1. the consumer is live and continuously consuming messages with no backlogs
2. the consumer has been down for quite some time and needs to be brought up
to clear the backlog.

My test case with kafka 0.8.2.1 using only one topic has:

Setup: 6 brokers and 3 zookeeper nodes
Message Size: 1 MB
Producer rate: 100 threads with 1000 messages per thread
No. of partitions in topic: 100
Consumer threads: 100 consumer threads in the same group

I initially started the producer and consumer in the same Java process with a
1 GB heap. The producer could send all the messages to the broker, but the
consumer started throwing OutOfMemory exceptions after consuming 26k
messages.

Upon restarting the process with a 5 GB heap, the consumer consumed around
4.8k messages before going OOM (while clearing a backlog of around 74k).
The rest of the messages got consumed when I bumped the heap up to 10 GB.

On the consumer, I have the default values for fetch.message.max.bytes and
queued.max.message.chunks.

If the calculation
(fetch.message.max.bytes) * (queued.max.message.chunks) * (no. of consumer
threads) holds for the consumer, then 1024*1024*10*100 (close to 1 GB) is
well below the 5 GB heap allocated. Did I leave something out of this
calculation?
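
Restating that back-of-envelope estimate as runnable arithmetic (a sketch using the values assumed in the question above, not verified broker/consumer defaults):

```
public class ConsumerMemoryEstimate {
    public static void main(String[] args) {
        long fetchMessageMaxBytes = 1024L * 1024;   // 1 MB, assumed default
        int queuedMaxMessageChunks = 10;            // value used in the estimate above
        int consumerThreads = 100;
        long bytes = fetchMessageMaxBytes * queuedMaxMessageChunks * consumerThreads;
        System.out.printf("estimate: %d MB%n", bytes / (1024 * 1024));  // ~1000 MB vs. 5 GB heap
    }
}
```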


Regards,
Kris


Re: [VOTE] KIP-23 - Add JSON/CSV output and looping options to ConsumerGroupCommand

2015-06-23 Thread Neha Narkhede
+1

On Tue, Jun 23, 2015 at 11:15 AM, Ashish Singh asi...@cloudera.com wrote:

 Hey Guys,

 We had some discussion over mail and in KIP hangout. I will update the RB
 with proposed changes.


 On Sun, Jun 14, 2015 at 10:07 AM, Ashish Singh asi...@cloudera.com
 wrote:

  Hi Neha,
 
  Answers inline.
 
  On Thu, Jun 11, 2015 at 7:20 PM, Neha Narkhede n...@confluent.io
 wrote:
 
  Thanks for submitting the KIP, Ashish! Few questions.
 
  1. Can you specify more details around how you expect csv output to be
  used. Same for json.
 
  CSV takes less storage space and is more convenient for shell operations.
  A simple diff between two csv outputs would tell you if something changed
  or not. It's also common in certain industries when dealing with legacy
  systems and workflows. Try importing JSON into MS Excel.
 
  JSON on the other hand has easy interpretation, compact notation and
  supports Hierarchical Data. If someone is planning to run the tool
  periodically and send the output to some server or even just persist it
  somewhere, JSON is probably the way to go.
 
  2. If we add these options, would you still need the old format. If
  csv/json offers more convenience, should we have a plan to phase out the
  old format?
 
  Probably not, but having it around will not hurt. Having three output
  formats is not that bad and I do not expect this list to grow in future.
 
 
  On Thu, Jun 11, 2015 at 6:05 PM, Ashish Singh asi...@cloudera.com
  wrote:
 
   Jun,
  
   Can we add this as part of next KIP's agenda?
  
   On Thu, Jun 11, 2015 at 3:00 PM, Gwen Shapira gshap...@cloudera.com
   wrote:
  
Maybe bring it up at the next KIP call, to make sure everyone is
  aware?
   
On Thu, Jun 11, 2015 at 2:17 PM, Ashish Singh asi...@cloudera.com
   wrote:
 Hi Guys,

 This has been lying around for quite some time. Should I start a
  voting
 thread on this?

 On Thu, May 7, 2015 at 12:20 PM, Ashish Singh 
 asi...@cloudera.com
wrote:

 Had to change the title of the page and that surprisingly changed
  the
link
 as well. KIP-23 is now available at here
 
   
  
 
 https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=56852556
  
 .

 On Thu, May 7, 2015 at 11:34 AM, Ashish Singh 
 asi...@cloudera.com
  
wrote:

 Hi Guys,

 I just added a KIP, KIP-23 - Add JSON/CSV output and looping
  options
   to
 ConsumerGroupCommand
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-23, for
KAFKA-313
 https://issues.apache.org/jira/browse/KAFKA-313. The changes
  made
   as
 part of the JIRA can be found here 
https://reviews.apache.org/r/28096/.

 Comments and suggestions are welcome!

 --

 Regards,
 Ashish




 --

 Regards,
 Ashish




 --

 Regards,
 Ashish
   
  
  
  
   --
  
   Regards,
   Ashish
  
 
 
 
  --
  Thanks,
  Neha
 
 
 
 
  --
 
  Regards,
  Ashish
 



 --

 Regards,
 Ashish




-- 
Thanks,
Neha


RE: [VOTE] KIP-23 - Add JSON/CSV output and looping options to ConsumerGroupCommand

2015-06-23 Thread Aditya Auradkar
Hey Ashish,

Last hangout, I think we discussed adding a num-iterations parameter to the 
script. Do you plan to support that?

Aditya


From: Neha Narkhede [n...@confluent.io]
Sent: Tuesday, June 23, 2015 11:24 AM
To: dev@kafka.apache.org
Cc: Jun Rao
Subject: Re: [VOTE] KIP-23 - Add JSON/CSV output and looping options to 
ConsumerGroupCommand

+1

On Tue, Jun 23, 2015 at 11:15 AM, Ashish Singh asi...@cloudera.com wrote:

 Hey Guys,

 We had some discussion over mail and in KIP hangout. I will update the RB
 with proposed changes.


 On Sun, Jun 14, 2015 at 10:07 AM, Ashish Singh asi...@cloudera.com
 wrote:

  Hi Neha,
 
  Answers inline.
 
  On Thu, Jun 11, 2015 at 7:20 PM, Neha Narkhede n...@confluent.io
 wrote:
 
  Thanks for submitting the KIP, Ashish! Few questions.
 
  1. Can you specify more details around how you expect csv output to be
  used. Same for json.
 
  CSV takes less storage space and is more convenient for shell operations.
  A simple diff between two csv outputs would tell you if something changed
  or not. It's also common in certain industries when dealing with legacy
  systems and workflows. Try importing JSON into MS Excel.
 
  JSON on the other hand has easy interpretation, compact notation and
  supports Hierarchical Data. If someone is planning to run the tool
  periodically and send the output to some server or even just persist it
  somewhere, JSON is probably the way to go.
 
  2. If we add these options, would you still need the old format. If
  csv/json offers more convenience, should we have a plan to phase out the
  old format?
 
  Probably not, but having it around will not hurt. Having three output
  formats is not that bad and I do not expect this list to grow in future.
 
 
  On Thu, Jun 11, 2015 at 6:05 PM, Ashish Singh asi...@cloudera.com
  wrote:
 
   Jun,
  
   Can we add this as part of next KIP's agenda?
  
   On Thu, Jun 11, 2015 at 3:00 PM, Gwen Shapira gshap...@cloudera.com
   wrote:
  
Maybe bring it up at the next KIP call, to make sure everyone is
  aware?
   
On Thu, Jun 11, 2015 at 2:17 PM, Ashish Singh asi...@cloudera.com
   wrote:
 Hi Guys,

 This has been lying around for quite some time. Should I start a
  voting
 thread on this?

 On Thu, May 7, 2015 at 12:20 PM, Ashish Singh 
 asi...@cloudera.com
wrote:

 Had to change the title of the page and that surprisingly changed
  the
link
 as well. KIP-23 is now available at here
 
   
  
 
 https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=56852556
  
 .

 On Thu, May 7, 2015 at 11:34 AM, Ashish Singh 
 asi...@cloudera.com
  
wrote:

 Hi Guys,

 I just added a KIP, KIP-23 - Add JSON/CSV output and looping
  options
   to
 ConsumerGroupCommand
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-23, for
KAFKA-313
 https://issues.apache.org/jira/browse/KAFKA-313. The changes
  made
   as
 part of the JIRA can be found here 
https://reviews.apache.org/r/28096/.

 Comments and suggestions are welcome!

 --

 Regards,
 Ashish




 --

 Regards,
 Ashish




 --

 Regards,
 Ashish
   
  
  
  
   --
  
   Regards,
   Ashish
  
 
 
 
  --
  Thanks,
  Neha
 
 
 
 
  --
 
  Regards,
  Ashish
 



 --

 Regards,
 Ashish




--
Thanks,
Neha


Re: Review Request 35791: Patch for KAFKA-2298

2015-06-23 Thread Dong Lin


 On June 23, 2015, 5:59 p.m., Jason Gustafson wrote:
  clients/src/main/java/org/apache/kafka/common/network/Selector.java, line 
  282
  https://reviews.apache.org/r/35791/diff/1/?file=990592#file990592line282
 
  Do you think we should just move the disconnected.add() into the close 
  method?

I have thought about it as well, but probably not. In Selector.send() the failed 
destinationId is put in failedSends rather than disconnected. The reason we use 
failedSends is that send() and poll() in Selector may be called asynchronously by 
different threads.


- Dong


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35791/#review89010
---


On June 23, 2015, 5:41 p.m., Dong Lin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/35791/
 ---
 
 (Updated June 23, 2015, 5:41 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2298
 https://issues.apache.org/jira/browse/KAFKA-2298
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2290; Client Selector can drop connections on InvalidReceiveException 
 without notifying NetworkClient
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/common/network/Selector.java 
 4aee214b24fd990be003adc36d675f015bf22fe6 
 
 Diff: https://reviews.apache.org/r/35791/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Dong Lin
 




[VOTE] KIP-23 - Add JSON/CSV output and looping options to ConsumerGroupCommand

2015-06-23 Thread Ashish Singh
Hey Guys,

We had some discussion over mail and in KIP hangout. I will update the RB
with proposed changes.


On Sun, Jun 14, 2015 at 10:07 AM, Ashish Singh asi...@cloudera.com wrote:

 Hi Neha,

 Answers inline.

 On Thu, Jun 11, 2015 at 7:20 PM, Neha Narkhede n...@confluent.io wrote:

 Thanks for submitting the KIP, Ashish! Few questions.

 1. Can you specify more details around how you expect csv output to be
 used. Same for json.

 CSV takes less storage space and is more convenient for shell operations.
 A simple diff between two csv outputs would tell you if something changed
 or not. It's also common in certain industries when dealing with legacy
 systems and workflows. Try importing JSON into MS Excel.

 JSON on the other hand has easy interpretation, compact notation and
 supports Hierarchical Data. If someone is planning to run the tool
 periodically and send the output to some server or even just persist it
 somewhere, JSON is probably the way to go.

 2. If we add these options, would you still need the old format. If
 csv/json offers more convenience, should we have a plan to phase out the
 old format?

 Probably not, but having it around will not hurt. Having three output
 formats is not that bad and I do not expect this list to grow in future.


 On Thu, Jun 11, 2015 at 6:05 PM, Ashish Singh asi...@cloudera.com
 wrote:

  Jun,
 
  Can we add this as part of next KIP's agenda?
 
  On Thu, Jun 11, 2015 at 3:00 PM, Gwen Shapira gshap...@cloudera.com
  wrote:
 
   Maybe bring it up at the next KIP call, to make sure everyone is
 aware?
  
   On Thu, Jun 11, 2015 at 2:17 PM, Ashish Singh asi...@cloudera.com
  wrote:
Hi Guys,
   
This has been lying around for quite some time. Should I start a
 voting
thread on this?
   
On Thu, May 7, 2015 at 12:20 PM, Ashish Singh asi...@cloudera.com
   wrote:
   
Had to change the title of the page and that surprisingly changed
 the
   link
as well. KIP-23 is now available at here

  
 
 https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=56852556
 
.
   
On Thu, May 7, 2015 at 11:34 AM, Ashish Singh asi...@cloudera.com
 
   wrote:
   
Hi Guys,
   
I just added a KIP, KIP-23 - Add JSON/CSV output and looping
 options
  to
ConsumerGroupCommand
https://cwiki.apache.org/confluence/display/KAFKA/KIP-23, for
   KAFKA-313
https://issues.apache.org/jira/browse/KAFKA-313. The changes
 made
  as
part of the JIRA can be found here 
   https://reviews.apache.org/r/28096/.
   
Comments and suggestions are welcome!
   
--
   
Regards,
Ashish
   
   
   
   
--
   
Regards,
Ashish
   
   
   
   
--
   
Regards,
Ashish
  
 
 
 
  --
 
  Regards,
  Ashish
 



 --
 Thanks,
 Neha




 --

 Regards,
 Ashish




-- 

Regards,
Ashish


Re: Review Request 35791: Patch for KAFKA-2298

2015-06-23 Thread Jason Gustafson

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35791/#review89010
---



clients/src/main/java/org/apache/kafka/common/network/Selector.java (line 282)
https://reviews.apache.org/r/35791/#comment141609

Do you think we should just move the disconnected.add() into the close 
method?


- Jason Gustafson


On June 23, 2015, 5:41 p.m., Dong Lin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/35791/
 ---
 
 (Updated June 23, 2015, 5:41 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2298
 https://issues.apache.org/jira/browse/KAFKA-2298
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2290; Client Selector can drop connections on InvalidReceiveException 
 without notifying NetworkClient
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/common/network/Selector.java 
 4aee214b24fd990be003adc36d675f015bf22fe6 
 
 Diff: https://reviews.apache.org/r/35791/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Dong Lin
 




[jira] [Resolved] (KAFKA-2294) javadoc compile error due to illegal <p/>, build failing (jdk 8)

2015-06-23 Thread Jakob Homan (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jakob Homan resolved KAFKA-2294.

   Resolution: Fixed
Fix Version/s: 0.9.0
 Reviewer: Jakob Homan
 Assignee: Jeff Maxwell

I've committed this to trunk.  Resolving.  Thanks for the patch, Jeff!

 javadoc compile error due to illegal <p/>, build failing (jdk 8)
 -

 Key: KAFKA-2294
 URL: https://issues.apache.org/jira/browse/KAFKA-2294
 Project: Kafka
  Issue Type: Bug
Reporter: Jeremy Fields
Assignee: Jeff Maxwell
 Fix For: 0.9.0

 Attachments: KAFKA-2294-1.patch


 Quick one,
 kafka/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:525:
  error: self-closing element not allowed
  * <p/>
 This is causing the build to fail under Java 8 due to strict HTML checking.
 Replace that <p/> with <p>
 Regards,



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 35791: Patch for KAFKA-2298

2015-06-23 Thread Jason Gustafson


 On June 23, 2015, 5:59 p.m., Jason Gustafson wrote:
  clients/src/main/java/org/apache/kafka/common/network/Selector.java, line 
  282
  https://reviews.apache.org/r/35791/diff/1/?file=990592#file990592line282
 
  Do you think we should just move the disconnected.add() into the close 
  method?
 
 Dong Lin wrote:
  I have thought about it as well, but probably not. In Selector.send() the 
  failed destinationId is put in failedSends rather than disconnected. The reason 
  we use failedSends is that send() and poll() in Selector may be called 
  asynchronously by different threads.

Yeah, that makes sense. I wonder if we should be using a Set internally instead 
of a List? Then we wouldn't need to worry about adding to disconnected multiple 
times. Guess there might be some performance impact, but we would have an 
easier time ensuring correctness.
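
A hedged sketch of the Set idea (illustrative names, not the actual Selector code): with a Set, reporting the same node id from both the failed-send path and the poll()-close path is harmless.

```
import java.util.LinkedHashSet;
import java.util.Set;

// A LinkedHashSet de-duplicates repeated notifications while keeping insertion order.
public class DisconnectedTracking {
    private final Set<String> disconnected = new LinkedHashSet<>();

    void markDisconnected(String nodeId) {
        disconnected.add(nodeId);   // adding the same id twice has no effect
    }

    public static void main(String[] args) {
        DisconnectedTracking t = new DisconnectedTracking();
        t.markDisconnected("node-1");
        t.markDisconnected("node-1");           // e.g. failed send, then close in poll()
        System.out.println(t.disconnected);     // [node-1] -- reported only once
    }
}
```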


- Jason


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35791/#review89010
---


On June 23, 2015, 5:41 p.m., Dong Lin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/35791/
 ---
 
 (Updated June 23, 2015, 5:41 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2298
 https://issues.apache.org/jira/browse/KAFKA-2298
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2290; Client Selector can drop connections on InvalidReceiveException 
 without notifying NetworkClient
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/common/network/Selector.java 
 4aee214b24fd990be003adc36d675f015bf22fe6 
 
 Diff: https://reviews.apache.org/r/35791/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Dong Lin
 




[jira] [Commented] (KAFKA-1690) new java producer needs ssl support as a client

2015-06-23 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14598297#comment-14598297
 ] 

Sriharsha Chintalapani commented on KAFKA-1690:
---

Updated reviewboard https://reviews.apache.org/r/33620/diff/
 against branch origin/trunk

 new java producer needs ssl support as a client
 ---

 Key: KAFKA-1690
 URL: https://issues.apache.org/jira/browse/KAFKA-1690
 Project: Kafka
  Issue Type: Sub-task
Reporter: Joe Stein
Assignee: Sriharsha Chintalapani
 Fix For: 0.8.3

 Attachments: KAFKA-1690.patch, KAFKA-1690.patch, 
 KAFKA-1690_2015-05-10_23:20:30.patch, KAFKA-1690_2015-05-10_23:31:42.patch, 
 KAFKA-1690_2015-05-11_16:09:36.patch, KAFKA-1690_2015-05-12_16:20:08.patch, 
 KAFKA-1690_2015-05-15_07:18:21.patch, KAFKA-1690_2015-05-20_14:54:35.patch, 
 KAFKA-1690_2015-05-21_10:37:08.patch, KAFKA-1690_2015-06-03_18:52:29.patch, 
 KAFKA-1690_2015-06-23_13:18:20.patch






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1690) new java producer needs ssl support as a client

2015-06-23 Thread Sriharsha Chintalapani (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sriharsha Chintalapani updated KAFKA-1690:
--
Attachment: KAFKA-1690_2015-06-23_13:18:20.patch

 new java producer needs ssl support as a client
 ---

 Key: KAFKA-1690
 URL: https://issues.apache.org/jira/browse/KAFKA-1690
 Project: Kafka
  Issue Type: Sub-task
Reporter: Joe Stein
Assignee: Sriharsha Chintalapani
 Fix For: 0.8.3

 Attachments: KAFKA-1690.patch, KAFKA-1690.patch, 
 KAFKA-1690_2015-05-10_23:20:30.patch, KAFKA-1690_2015-05-10_23:31:42.patch, 
 KAFKA-1690_2015-05-11_16:09:36.patch, KAFKA-1690_2015-05-12_16:20:08.patch, 
 KAFKA-1690_2015-05-15_07:18:21.patch, KAFKA-1690_2015-05-20_14:54:35.patch, 
 KAFKA-1690_2015-05-21_10:37:08.patch, KAFKA-1690_2015-06-03_18:52:29.patch, 
 KAFKA-1690_2015-06-23_13:18:20.patch






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 33620: Patch for KAFKA-1690

2015-06-23 Thread Sriharsha Chintalapani

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33620/
---

(Updated June 23, 2015, 8:18 p.m.)


Review request for kafka.


Bugs: KAFKA-1690
https://issues.apache.org/jira/browse/KAFKA-1690


Repository: kafka


Description (updated)
---

KAFKA-1690. new java producer needs ssl support as a client.


KAFKA-1690. new java producer needs ssl support as a client.


KAFKA-1690. new java producer needs ssl support as a client.


KAFKA-1690. new java producer needs ssl support as a client. SSLFactory tests.


KAFKA-1690. new java producer needs ssl support as a client. Added 
PrincipalBuilder.


KAFKA-1690. new java producer needs ssl support as a client. Addressing reviews.


KAFKA-1690. new java producer needs ssl support as a client. Addressing reviews.


KAFKA-1690. new java producer needs ssl support as a client. Addressing reviews.


KAFKA-1690. new java producer needs ssl support as a client. Fixed minor issues 
with the patch.


KAFKA-1690. new java producer needs ssl support as a client. Fixed minor issues 
with the patch.


KAFKA-1690. new java producer needs ssl support as a client.


KAFKA-1690. new java producer needs ssl support as a client.


Merge remote-tracking branch 'refs/remotes/origin/trunk' into KAFKA-1690-V1


KAFKA-1690. Broker side ssl changes.


KAFKA-1684. SSL for socketServer.


KAFKA-1690. Added SSLProducerSendTest and fixes to get right port for SSL.


Merge branch 'trunk' into KAFKA-1690-V1


KAFKA-1690. Post merge fixes.


KAFKA-1690. Added SSLProducerSendTest.


KAFKA-1690. Minor fixes based on patch review comments.


Diffs (updated)
-

  build.gradle 30d1cf2f1ff9ed3f86a060da8099bb0774b4cf91 
  checkstyle/import-control.xml f2e6cec267e67ce8e261341e373718e14a8e8e03 
  clients/src/main/java/org/apache/kafka/clients/ClientUtils.java 
0d68bf1e1e90fe9d5d4397ddf817b9a9af8d9f7a 
  clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
2c421f42ed3fc5d61cf9c87a7eaa7bb23e26f63b 
  clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
48fe7961e2215372d8033ece4af739ea06c6457b 
  clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
daff34db5bf2144e9dc274b23dc56b88f4efafdc 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
951c34c92710fc4b38d656e99d2a41255c60aeb7 
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
5a37580ec69af08b97cf5b43b241790ba8c129dd 
  clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
aa264202f2724907924985a5ecbe74afc4c6c04b 
  clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java 
bae528d31516679bed88ee61b408f209f185a8cc 
  clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/network/Authenticator.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
df0e6d5105ca97b7e1cb4d334ffb7b443506bd0b 
  clients/src/main/java/org/apache/kafka/common/network/Channel.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java 
PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
3ca0098b8ec8cfdf81158465b2d40afc47eb6f80 
  
clients/src/main/java/org/apache/kafka/common/network/PlainTextChannelBuilder.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/common/network/PlainTextTransportLayer.java
 PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/network/SSLFactory.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/network/Selectable.java 
618a0fa53848ae6befea7eba39c2f3285b734494 
  clients/src/main/java/org/apache/kafka/common/network/Selector.java 
4aee214b24fd990be003adc36d675f015bf22fe6 
  clients/src/main/java/org/apache/kafka/common/network/Send.java 
8f6daadf6b67c3414911cda77765512131e56fd3 
  clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java 
dab1a94dd29563688b6ecf4eeb0e180b06049d3f 
  
clients/src/main/java/org/apache/kafka/common/security/auth/DefaultPrincipalBuilder.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java 
PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/common/security/auth/PrincipalBuilder.java
 PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
f73eedb030987f018d8446bb1dcd98d19fa97331 
  

Re: [DISCUSS] KIP-26 - Add Copycat, a connector framework for data import/export

2015-06-23 Thread Ewen Cheslack-Postava
There was some discussion on the KIP call today. I'll give my summary of
what I heard here to make sure this thread has the complete context for
ongoing discussion.

* Where the project should live, and if in Kafka, where should connectors
live? If some are in Kafka and some not, how many and which ones? - There
was little disagreement on the tradeoffs (coding and packaging together can
make things easier for end users, especially for a few very popular
connectors; maintaining connectors internally can lead to a messier code base
with more dependencies that's harder to work with; etc.). Seems to be more focus on
the location of connectors than on the framework right now; we'll probably only make
progress on this issue with some concrete proposals.
* Organizational issues within Kafka - subproject? - Jay mentioned desire
for consistency, which can be a problem even across subprojects.
* Will streaming data be supported? - Yes, Streaming and batch section of
design goals should cover this; this is a very important use case.
* Additional transformations in copycat - Updated wiki to leave this a bit
more open. Original motivation for leaving it out was to keep the scope of
this KIP and the Copycat framework very clear since there is a danger in
overgeneralizing and ending up with a stream processing framework; however,
it's clear there are some very useful, very common examples like scrubbing
data during import.
* Schemas and how the data model works - this requires a more in depth
answer when we get to a complete proposal, but the prototype we've been
playing with internally uses something that can work with data roughly like
Avro or JSON, and supports schemas. The goal is for this data model to only
be used at runtime and for the serialization that is used for storing data
in Kafka to be pluggable. Each type of serialization plugin might handle
things like schemas in different ways. The reason we are proposing the
inclusion of schemas is that it lets you cleanly carry important info
across multiples stages, e.g. the schema for data pulled from a database is
defined by the table the data is read from, intermediate processing steps
might maintain schemas as well, and then an export to, e.g., a parquet file
in HDFS would also use the schema. There will definitely need to be
discussion about the details of this data model, what needs to be included
to make it work across multiple serialization formats, etc. (a rough
illustration follows after this list).
* Could mirror maker be implemented in Copycat? Same for Camus? - Yes, both
would make sense in Copycat. One of the motivations is to have fewer tools
required for a lot of these common tasks. Mirror maker is a case where we
could easily maintain the connector as part of Kafka, and we could probably
bootstrap one very quickly using lessons learned from mirror maker. The
experience with mirror maker is also an argument for making sure Kafka devs
are closely involved in Copycat development -- it's actually tricky to get
it right even when you know Kafka, and Copycat has to get everything right
for more general cases.
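
To make the schema/data-model point above a bit more concrete, here is a purely
illustrative sketch (not a proposed API; all names are hypothetical) of a runtime
record that carries its schema independently of how it is later serialized:

```java
import java.util.Map;

// Purely illustrative, not a proposed API: a runtime record that carries its
// schema so the same data can be handed to Avro, JSON, Parquet, etc. writers.
class RecordSchema {
    final String name;                  // e.g. the source table name
    final Map<String, String> fields;   // field name -> logical type

    RecordSchema(String name, Map<String, String> fields) {
        this.name = name;
        this.fields = fields;
    }
}

class CopycatRecord {
    final RecordSchema schema;
    final Map<String, Object> value;    // field name -> value, matching the schema

    CopycatRecord(RecordSchema schema, Map<String, Object> value) {
        this.schema = schema;
        this.value = value;
    }
}
```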

I made minor updates to the wiki to reflect some of these notes. Anyone
else have any specific updates they think should be made to any of the
sections, especially considerations I may have omitted from the rejected
alternatives (or any rejected alternatives that they think still need to
be under consideration)?

Let me know what you think needs to be addressed to get this to a vote -- I
don't want to rush people, but I also don't want to just leave this
lingering unless there are specific issues that can be addressed.

-Ewen


On Mon, Jun 22, 2015 at 8:32 PM, Roshan Naik ros...@hortonworks.com wrote:

 Thanks Jay and Ewen for the response.


 @Jay
 
  3. This has a built in notion of parallelism throughout.



 It was not obvious how it will look or differ from existing systems, since
 all of the existing ones do parallelize data movement.


 @Ewen,

 Import: Flume is just one of many similar systems designed around log
 collection. See notes below, but one major point is that they generally
 don't provide any sort of guaranteed delivery semantics.


 I think most of them do provide guarantees of some sort (e.g. Flume and
 FluentD).


 YARN: My point isn't that YARN is bad, it's that tying to any particular
 cluster manager severely limits the applicability of the tool. The goal is
 to make Copycat agnostic to the cluster manager so it can run under Mesos,
 YARN, etc.

 Ok, got it. Sounds like there is a plan to do some work here to ensure that,
 out of the box, it works with more than one scheduler (as @Jay listed out).
 In that case, IMO it would be better to rephrase the KIP to say
 that it will support more than one scheduler.


 Exactly once: You accomplish this in any system by managing offsets in the
 destination system atomically with the data or through some kind of
 deduplication. Jiangjie actually just gave a great talk about this issue
 at
 a recent Kafka meetup, perhaps he can share some slides about it. When you

Re: Review Request 33620: Patch for KAFKA-1690

2015-06-23 Thread Gwen Shapira


 On May 22, 2015, 12:33 a.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java, 
  line 231
  https://reviews.apache.org/r/33620/diff/8/?file=966794#file966794line231
 
  Do we need config.values? Could we just pass in config.originals() as 
  we do for the serializer?
 
 Sriharsha Chintalapani wrote:
 config.originals() contains only the user-configured properties; it 
 doesn't have the defaults defined in ProducerConfig. On a side note, shouldn't 
 we use config.values() everywhere?

Since I recently worked on KafkaConfig a bit, I can share my insights:

Basically, the idea behind passing properties along to pluggable tools is that we 
can define properties in our configuration files that Kafka doesn't know about 
(i.e. that are not declared with ConfigDef). Those will be defined by users, so 
originals() will include them.

If a default exists, it means that Kafka knows about this configuration so you 
can pass the specific configuration you need along (in this case, 
SecurityProtocol).

Note that this is all for pluggables - serializers, reporters, authorizers - 
which will not depend on Kafka and therefore can't reuse the AbstractConfig 
classes. Channels will be implemented as part of Kafka, so you can 
probably just pass an AbstractConfig and use that.
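
To make the difference concrete, here is a rough, purely illustrative sketch
(plain maps, not Kafka's actual classes; the property names are placeholders)
of what the two views contain when the user supplies one declared key and one
key Kafka doesn't know about:

```java
import java.util.HashMap;
import java.util.Map;

public class OriginalsVsValues {
    public static void main(String[] args) {
        // What the user actually supplied (may contain keys Kafka knows nothing about).
        Map<String, Object> originals = new HashMap<>();
        originals.put("bootstrap.servers", "localhost:9092");
        originals.put("my.plugin.custom.setting", "42");   // not declared via ConfigDef

        // What a ConfigDef-style parse would yield: only declared keys, defaults applied.
        Map<String, Object> values = new HashMap<>();
        values.put("bootstrap.servers", "localhost:9092");
        values.put("retries", 0);   // default filled in, never supplied by the user
        // note: "my.plugin.custom.setting" is absent here

        System.out.println("originals() ~ " + originals);
        System.out.println("values()    ~ " + values);
    }
}
```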

Hope this helps.


- Gwen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33620/#review84232
---


On June 23, 2015, 8:18 p.m., Sriharsha Chintalapani wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33620/
 ---
 
 (Updated June 23, 2015, 8:18 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1690
 https://issues.apache.org/jira/browse/KAFKA-1690
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1690. new java producer needs ssl support as a client.
 
 
 KAFKA-1690. new java producer needs ssl support as a client.
 
 
 KAFKA-1690. new java producer needs ssl support as a client.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. SSLFactory tests.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Added 
 PrincipalBuilder.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Addressing 
 reviews.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Addressing 
 reviews.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Addressing 
 reviews.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Fixed minor 
 issues with the patch.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Fixed minor 
 issues with the patch.
 
 
 KAFKA-1690. new java producer needs ssl support as a client.
 
 
 KAFKA-1690. new java producer needs ssl support as a client.
 
 
 Merge remote-tracking branch 'refs/remotes/origin/trunk' into KAFKA-1690-V1
 
 
 KAFKA-1690. Broker side ssl changes.
 
 
 KAFKA-1684. SSL for socketServer.
 
 
 KAFKA-1690. Added SSLProducerSendTest and fixes to get right port for SSL.
 
 
 Merge branch 'trunk' into KAFKA-1690-V1
 
 
 KAFKA-1690. Post merge fixes.
 
 
 KAFKA-1690. Added SSLProducerSendTest.
 
 
 KAFKA-1690. Minor fixes based on patch review comments.
 
 
 Diffs
 -
 
   build.gradle 30d1cf2f1ff9ed3f86a060da8099bb0774b4cf91 
   checkstyle/import-control.xml f2e6cec267e67ce8e261341e373718e14a8e8e03 
   clients/src/main/java/org/apache/kafka/clients/ClientUtils.java 
 0d68bf1e1e90fe9d5d4397ddf817b9a9af8d9f7a 
   clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
 2c421f42ed3fc5d61cf9c87a7eaa7bb23e26f63b 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 48fe7961e2215372d8033ece4af739ea06c6457b 
   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
 daff34db5bf2144e9dc274b23dc56b88f4efafdc 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 951c34c92710fc4b38d656e99d2a41255c60aeb7 
   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
 5a37580ec69af08b97cf5b43b241790ba8c129dd 
   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
 aa264202f2724907924985a5ecbe74afc4c6c04b 
   clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java 
 bae528d31516679bed88ee61b408f209f185a8cc 
   clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java 
 PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/network/Authenticator.java 
 PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
 df0e6d5105ca97b7e1cb4d334ffb7b443506bd0b 
   clients/src/main/java/org/apache/kafka/common/network/Channel.java 
 PRE-CREATION 
   

Re: Review Request 35791: Patch for KAFKA-2298

2015-06-23 Thread Dong Lin


 On June 23, 2015, 5:59 p.m., Jason Gustafson wrote:
  clients/src/main/java/org/apache/kafka/common/network/Selector.java, line 
  282
  https://reviews.apache.org/r/35791/diff/1/?file=990592#file990592line282
 
  Do you think we should just move the disconnected.add() into the close 
  method?
 
 Dong Lin wrote:
 I have thought about it as well, but probably not. In 
 Selector.send() a failed destinationId is put in failedSends rather than 
 disconnected. The reason we use failedSends is that send() and poll() in 
 Selector will be called asynchronously by different threads.
 
 Jason Gustafson wrote:
 Yeah, that makes sense. I wonder if we should be using a Set internally 
 instead of a List? Then we wouldn't need to worry about adding to 
 disconnected multiple times. Guess there might be some performance impact, 
 but we would have an easier time ensuring correctness.
 
 Dong Lin wrote:
 Yeah, that can be useful. I am not sure if currently some id may be added 
 to disconnected multiple times. Even if it does, this should not be a problem in 
 NetworkClient.handleDisconnections(). I personally prefer to keep the existing 
 code unless there is a good reason (i.e. performance or functionality) for the 
 change. Not sure what others think about this change.
 
 Jason Gustafson wrote:
 Seems unnecessary now that I think about it. In fact, it wouldn't hurt to 
 move the disconnected.add() into close() as long as we also add it to 
 failedSends, but that might make the code tougher to follow. Anyway, I think 
 the patch is fine as it is. I was just wondering if there was an easy way to 
 prevent this issue from popping up again in the future.

Yeah, I agree with you. I would add to disconnected in close() if I were writing 
this class/function from scratch. But since this is a small patch to an existing 
module, I will make the change that solves the problem with the least dispute, to 
keep peer review smooth.


- Dong


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35791/#review89010
---


On June 23, 2015, 5:41 p.m., Dong Lin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/35791/
 ---
 
 (Updated June 23, 2015, 5:41 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2298
 https://issues.apache.org/jira/browse/KAFKA-2298
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2290; Client Selector can drop connections on InvalidReceiveException 
 without notifying NetworkClient
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/common/network/Selector.java 
 4aee214b24fd990be003adc36d675f015bf22fe6 
 
 Diff: https://reviews.apache.org/r/35791/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Dong Lin
 




Re: Review Request 33620: Patch for KAFKA-1690

2015-06-23 Thread Sriharsha Chintalapani


 On May 22, 2015, 12:33 a.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java,
   line 294
  https://reviews.apache.org/r/33620/diff/8/?file=966806#file966806line294
 
  Not sure why we are looping back here. If the HandshakeStatus is 
  NEED_UNWRAP, we loop back w/o reading more data from the socket, will we 
  get into infinite loop?

answered below


- Sriharsha


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33620/#review84232
---


On June 23, 2015, 8:18 p.m., Sriharsha Chintalapani wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33620/
 ---
 
 (Updated June 23, 2015, 8:18 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1690
 https://issues.apache.org/jira/browse/KAFKA-1690
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1690. new java producer needs ssl support as a client.
 
 
 KAFKA-1690. new java producer needs ssl support as a client.
 
 
 KAFKA-1690. new java producer needs ssl support as a client.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. SSLFactory tests.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Added 
 PrincipalBuilder.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Addressing 
 reviews.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Addressing 
 reviews.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Addressing 
 reviews.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Fixed minor 
 issues with the patch.
 
 
 KAFKA-1690. new java producer needs ssl support as a client. Fixed minor 
 issues with the patch.
 
 
 KAFKA-1690. new java producer needs ssl support as a client.
 
 
 KAFKA-1690. new java producer needs ssl support as a client.
 
 
 Merge remote-tracking branch 'refs/remotes/origin/trunk' into KAFKA-1690-V1
 
 
 KAFKA-1690. Broker side ssl changes.
 
 
 KAFKA-1684. SSL for socketServer.
 
 
 KAFKA-1690. Added SSLProducerSendTest and fixes to get right port for SSL.
 
 
 Merge branch 'trunk' into KAFKA-1690-V1
 
 
 KAFKA-1690. Post merge fixes.
 
 
 KAFKA-1690. Added SSLProducerSendTest.
 
 
 KAFKA-1690. Minor fixes based on patch review comments.
 
 
 Diffs
 -
 
   build.gradle 30d1cf2f1ff9ed3f86a060da8099bb0774b4cf91 
   checkstyle/import-control.xml f2e6cec267e67ce8e261341e373718e14a8e8e03 
   clients/src/main/java/org/apache/kafka/clients/ClientUtils.java 
 0d68bf1e1e90fe9d5d4397ddf817b9a9af8d9f7a 
   clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
 2c421f42ed3fc5d61cf9c87a7eaa7bb23e26f63b 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 48fe7961e2215372d8033ece4af739ea06c6457b 
   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
 daff34db5bf2144e9dc274b23dc56b88f4efafdc 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 951c34c92710fc4b38d656e99d2a41255c60aeb7 
   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
 5a37580ec69af08b97cf5b43b241790ba8c129dd 
   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
 aa264202f2724907924985a5ecbe74afc4c6c04b 
   clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java 
 bae528d31516679bed88ee61b408f209f185a8cc 
   clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java 
 PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/network/Authenticator.java 
 PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
 df0e6d5105ca97b7e1cb4d334ffb7b443506bd0b 
   clients/src/main/java/org/apache/kafka/common/network/Channel.java 
 PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java 
 PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
 3ca0098b8ec8cfdf81158465b2d40afc47eb6f80 
   
 clients/src/main/java/org/apache/kafka/common/network/PlainTextChannelBuilder.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/network/PlainTextTransportLayer.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java 
 PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/network/SSLFactory.java 
 PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
 PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/network/Selectable.java 
 618a0fa53848ae6befea7eba39c2f3285b734494 
   

[jira] [Updated] (KAFKA-2298) Client Selector can drop connections on InvalidReceiveException without notifying NetworkClient

2015-06-23 Thread Dong Lin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-2298:

Labels: quotas  (was: )

 Client Selector can drop connections on InvalidReceiveException without 
 notifying NetworkClient
 ---

 Key: KAFKA-2298
 URL: https://issues.apache.org/jira/browse/KAFKA-2298
 Project: Kafka
  Issue Type: Bug
Reporter: Dong Lin
Assignee: Dong Lin
  Labels: quotas
 Attachments: KAFKA-2298.patch


 I ran into the problem described in KAFKA-2266 when testing quotas. I was told 
 the bug was fixed in KAFKA-2266 after I figured out the problem.
 But the patch provided in KAFKA-2266 probably doesn't solve all related 
 problems. From reading the code there is still one edge case where the client 
 selector can close a connection in poll() without notifying NetworkClient.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1901) Move Kafka version to be generated in code by build (instead of in manifest)

2015-06-23 Thread Manikumar Reddy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14597558#comment-14597558
 ] 

Manikumar Reddy commented on KAFKA-1901:


Probably the easiest thing to do would be to generate a properties file 
(kafkaversion.props) with  version, git-hash info in it and put that in kafka 
jar as a resource. 

[~jbrosenb...@gmail.com] Do you this approach work for you? 
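
For illustration, a minimal sketch of how such a build-generated resource could
be read at runtime (file and key names are hypothetical placeholders, not the
actual implementation):

{code}
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public final class KafkaVersionInfo {
    private KafkaVersionInfo() {}

    // Reads a properties file that the build drops into the jar, so the version
    // survives shading (unlike MANIFEST.MF attributes).
    public static String version() {
        Properties props = new Properties();
        try (InputStream in =
                 KafkaVersionInfo.class.getResourceAsStream("/kafka-version.properties")) {
            if (in != null)
                props.load(in);
        } catch (IOException e) {
            // fall through and report unknown
        }
        return props.getProperty("version", "unknown");
    }
}
{code}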

 Move Kafka version to be generated in code by build (instead of in manifest)
 

 Key: KAFKA-1901
 URL: https://issues.apache.org/jira/browse/KAFKA-1901
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2.0
Reporter: Jason Rosenberg

 With 0.8.2 (rc2), I've started seeing this warning in the logs of apps 
 deployed to our staging (both server and client):
 {code}
 2015-01-23 00:55:25,273  WARN [async-message-sender-0] common.AppInfo$ - 
 Can't read Kafka version from MANIFEST.MF. Possible cause: 
 java.lang.NullPointerException
 {code}
 The issue is that in our deployment, apps are deployed as single 'shaded' 
 jars (e.g. using the maven shade plugin).  This means the MANIFEST.MF file 
 won't have a kafka version.  Instead, suggest the kafka build generate the 
 proper version in code, as part of the build.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-1901) Move Kafka version to be generated in code by build (instead of in manifest)

2015-06-23 Thread Manikumar Reddy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14597558#comment-14597558
 ] 

Manikumar Reddy edited comment on KAFKA-1901 at 6/23/15 12:10 PM:
--

Probably the easiest thing to do would be to generate a properties file 
(kafkaversion.props) with  version, git-hash info in it and put that in kafka 
jar as a resource. 

[~jbrosenb...@gmail.com] Do you this approach works for you? 


was (Author: omkreddy):
Probably the easiest thing to do would be to generate a properties file 
(kafkaversion.props) with  version, git-hash info in it and put that in kafka 
jar as a resource. 

[~jbrosenb...@gmail.com] Do you this approach work for you? 

 Move Kafka version to be generated in code by build (instead of in manifest)
 

 Key: KAFKA-1901
 URL: https://issues.apache.org/jira/browse/KAFKA-1901
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2.0
Reporter: Jason Rosenberg

 With 0.8.2 (rc2), I've started seeing this warning in the logs of apps 
 deployed to our staging (both server and client):
 {code}
 2015-01-23 00:55:25,273  WARN [async-message-sender-0] common.AppInfo$ - 
 Can't read Kafka version from MANIFEST.MF. Possible cause: 
 java.lang.NullPointerException
 {code}
 The issue is that in our deployment, apps are deployed as single 'shaded' 
 jars (e.g. using the maven shade plugin).  This means the MANIFEST.MF file 
 won't have a kafka version.  Instead, suggest the kafka build generate the 
 proper version in code, as part of the build.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-1901) Move Kafka version to be generated in code by build (instead of in manifest)

2015-06-23 Thread Manikumar Reddy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14597558#comment-14597558
 ] 

Manikumar Reddy edited comment on KAFKA-1901 at 6/23/15 12:13 PM:
--

Probably the easiest thing to do would be to generate a properties file 
(kafkaversion.props) with  version, git-hash info in it and put that in kafka 
jar as a resource. 

[~jbrosenb...@gmail.com] Do you think this approach works for you? 


was (Author: omkreddy):
Probably the easiest thing to do would be to generate a properties file 
(kafkaversion.props) with  version, git-hash info in it and put that in kafka 
jar as a resource. 

[~jbrosenb...@gmail.com] Do you this approach works for you? 

 Move Kafka version to be generated in code by build (instead of in manifest)
 

 Key: KAFKA-1901
 URL: https://issues.apache.org/jira/browse/KAFKA-1901
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2.0
Reporter: Jason Rosenberg

 With 0.8.2 (rc2), I've started seeing this warning in the logs of apps 
 deployed to our staging (both server and client):
 {code}
 2015-01-23 00:55:25,273  WARN [async-message-sender-0] common.AppInfo$ - 
 Can't read Kafka version from MANIFEST.MF. Possible cause: 
 java.lang.NullPointerException
 {code}
 The issue is that in our deployment, apps are deployed as single 'shaded' 
 jars (e.g. using the maven shade plugin).  This means the MANIFEST.MF file 
 won't have a kafka version.  Instead, suggest the kafka build generate the 
 proper version in code, as part of the build.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [GitHub] kafka pull request: Kafka 2276

2015-06-23 Thread Gwen Shapira
I'm unclear on the directory structure and a few high-level things (and
I can't figure out how to comment on more than a single line in
GitHub):

tests/kafkatest - isn't it redundant? do we expect any non-kafka tests?

services/templates - those are basically all configuration files to be
used by the services, right? Do we expect a single set for the entire
system? or different templates for different tests? I'm checking
because systemtests had config files per test.

any thoughts on how multi-version tests will work? will we have
service per version?

kafkatest/tests - do we expect every test to be a single script? or
should we have subdirectories here, to start things in the right
direction? Maybe subdirectories for planned groups of tests?

kafka/vagrant - is this intentionally not under tests? Is this at all
stand-alone? If so, maybe a separate Jira? This has a bunch of stuff
that I'm not sure we want in the project at all... specific IPs,
Vagrant install from debian packages, etc.

Gwen









On Tue, Jun 23, 2015 at 10:45 AM, Gwen Shapira gshap...@cloudera.com wrote:
 Awesome, thanks :)

 On Tue, Jun 23, 2015 at 10:32 AM, Geoffrey Anderson ge...@confluent.io 
 wrote:
 Hi Gwen,

 That is indeed the plan, and my understanding is that the merge script
 Ismael has been working on helps committers with this step.

 I'm trying out the Github flow roughly as outlined here:
 http://mail-archives.apache.org/mod_mbox/kafka-dev/201504.mbox/%3ccad5tkzab-hkey-zcr8x4wtxawybxpojx62k1vbv+ycknuxq...@mail.gmail.com%3E

 Ismael's script is here: https://issues.apache.org/jira/browse/KAFKA-2187


 Thanks,

 Geoff

 On Mon, Jun 22, 2015 at 9:59 PM, Gwen Shapira gshap...@cloudera.com wrote:

 Thanks, I indeed missed the original :)

 Is the plan to squash the commits and merge a pull request with single
 commit that matches the JIRA #?
 This will be more in line with how commits were organized until now
 and will make life much easier when cherry-picking.

 Gwen

 On Mon, Jun 22, 2015 at 1:58 PM, Geoffrey Anderson ge...@confluent.io
 wrote:
  Hi,
 
  I'm pinging the dev list regarding KAFKA-2276 (KIP-25 initial patch)
 again
  since it sounds like at least one person I spoke with did not see the
  initial pull request.
 
  Pull request: https://github.com/apache/kafka/pull/70/
  JIRA: https://issues.apache.org/jira/browse/KAFKA-2276
 
  Thanks!
  Geoff
 
 
  On Tue, Jun 16, 2015 at 2:50 PM, granders g...@git.apache.org wrote:
 
  GitHub user granders opened a pull request:
 
  https://github.com/apache/kafka/pull/70
 
  Kafka 2276
 
  Initial patch for KIP-25
 
   Note: do *not* use pip to install ducktape. Instead:
 
  ```
  $ git clone g...@github.com:confluentinc/ducktape.git
  $ cd ducktape
  $ python setup.py install
  ```
 
 
  You can merge this pull request into a Git repository by running:
 
  $ git pull https://github.com/confluentinc/kafka KAFKA-2276
 
  Alternatively you can review and apply these changes as the patch at:
 
  https://github.com/apache/kafka/pull/70.patch
 
  To close this pull request, make a commit to your master/trunk branch
  with (at least) the following in the commit message:
 
  This closes #70
 
  
  commit 81e41562f3836e95e89e12f215c82b1b2d505381
  Author: Liquan Pei liquan...@gmail.com
  Date:   2015-04-24T01:32:54Z
 
  Bootstrap Kafka system tests
 
  commit f1914c3ba9b52d0f8db3989c8b031127b42ac59e
  Author: Liquan Pei liquan...@gmail.com
  Date:   2015-04-24T01:33:44Z
 
  Merge pull request #2 from confluentinc/system_tests
 
  Bootstrap Kafka system tests
 
  commit a2789885806f98dcd1fd58edc9a10a30e4bd314c
  Author: Geoff Anderson ge...@confluent.io
  Date:   2015-05-26T22:21:23Z
 
  fixed typos
 
  commit 07cd1c66a952ee29fc3c8e85464acb43a6981b8a
  Author: Geoff Anderson ge...@confluent.io
  Date:   2015-05-26T22:22:14Z
 
  Added simple producer which prints status of produced messages to
  stdout.
 
  commit da94b8cbe79e6634cc32fbe8f6deb25388923029
  Author: Geoff Anderson ge...@confluent.io
  Date:   2015-05-27T21:07:20Z
 
  Added number of messages option.
 
  commit 212b39a2d75027299fbb1b1008d463a82aab
  Author: Geoff Anderson ge...@confluent.io
  Date:   2015-05-27T22:35:06Z
 
  Added some metadata to producer output.
 
  commit 8b4b1f2aa9681632ef65aa92dfd3066cd7d62851
  Author: Geoff Anderson ge...@confluent.io
  Date:   2015-05-29T23:38:32Z
 
  Minor updates to VerboseProducer
 
  commit c0526fe44cea739519a0889ebe9ead01b406b365
  Author: Geoff Anderson ge...@confluent.io
  Date:   2015-06-01T02:27:15Z
 
  Updates per review comments.
 
  commit bc009f218e00241cbdd23931d01b52c442eef6b7
  Author: Geoff Anderson ge...@confluent.io
  Date:   2015-06-01T02:28:28Z
 
  Got rid of VerboseProducer in core (moved to clients)
 
  commit 475423bb642ac8f816e8080f891867a6362c17fa
  Author: Geoff Anderson ge...@confluent.io
  Date:   2015-06-01T04:05:09Z
 
  

Review Request 35820: Patch for KAFKA-1367

2015-06-23 Thread Ashish Singh

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35820/
---

Review request for kafka.


Bugs: KAFKA-1367
https://issues.apache.org/jira/browse/KAFKA-1367


Repository: kafka


Description
---

KAFKA-1367: Broker topic metadata not kept in sync with ZooKeeper


Diffs
-

  core/src/main/scala/kafka/common/TopicAndPartition.scala 
df3db912f5daef6a25b4b2dd2220d2cc3795bce6 
  core/src/main/scala/kafka/controller/KafkaController.scala 
36350579b16027359d237b64699003358704ac6f 
  core/src/main/scala/kafka/utils/ReplicationUtils.scala 
60687332b4c9bee4d4c0851314cfb4b02d5d3489 
  core/src/main/scala/kafka/utils/ZkUtils.scala 
78475e3d5ec477cef00caeaa34ff2d196466be96 
  core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala 
c96c0ffd958d63c09880d436b2e5ae96f51ead36 

Diff: https://reviews.apache.org/r/35820/diff/


Testing
---

Tested


Thanks,

Ashish Singh



[jira] [Updated] (KAFKA-1367) Broker topic metadata not kept in sync with ZooKeeper

2015-06-23 Thread Ashish K Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ashish K Singh updated KAFKA-1367:
--
Attachment: KAFKA-1367.patch

 Broker topic metadata not kept in sync with ZooKeeper
 -

 Key: KAFKA-1367
 URL: https://issues.apache.org/jira/browse/KAFKA-1367
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.0, 0.8.1
Reporter: Ryan Berdeen
Assignee: Ashish K Singh
  Labels: newbie++
 Fix For: 0.8.3

 Attachments: KAFKA-1367.patch, KAFKA-1367.txt


 When a broker is restarted, the topic metadata responses from the brokers 
 will be incorrect (different from ZooKeeper) until a preferred replica leader 
 election.
 In the metadata, it looks like leaders are correctly removed from the ISR 
 when a broker disappears, but followers are not. Then, when a broker 
 reappears, the ISR is never updated.
 I used a variation of the Vagrant setup created by Joe Stein to reproduce 
 this with latest from the 0.8.1 branch: 
 https://github.com/also/kafka/commit/dba36a503a5e22ea039df0f9852560b4fb1e067c



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 35820: Patch for KAFKA-1367

2015-06-23 Thread Ashish Singh

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35820/
---

(Updated June 24, 2015, 5:10 a.m.)


Review request for kafka.


Bugs: KAFKA-1367
https://issues.apache.org/jira/browse/KAFKA-1367


Repository: kafka


Description
---

KAFKA-1367: Broker topic metadata not kept in sync with ZooKeeper


Diffs
-

  core/src/main/scala/kafka/common/TopicAndPartition.scala 
df3db912f5daef6a25b4b2dd2220d2cc3795bce6 
  core/src/main/scala/kafka/controller/KafkaController.scala 
36350579b16027359d237b64699003358704ac6f 
  core/src/main/scala/kafka/utils/ReplicationUtils.scala 
60687332b4c9bee4d4c0851314cfb4b02d5d3489 
  core/src/main/scala/kafka/utils/ZkUtils.scala 
78475e3d5ec477cef00caeaa34ff2d196466be96 
  core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala 
c96c0ffd958d63c09880d436b2e5ae96f51ead36 

Diff: https://reviews.apache.org/r/35820/diff/


Testing (updated)
---

Tested on a test cluster with 3 Kafka brokers


Thanks,

Ashish Singh



[jira] [Commented] (KAFKA-1367) Broker topic metadata not kept in sync with ZooKeeper

2015-06-23 Thread Ashish K Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14598880#comment-14598880
 ] 

Ashish K Singh commented on KAFKA-1367:
---

[~junrao], [~jjkoshy], [~nehanarkhede], [~gwenshap] just uploaded a patch to 
fix this. Apart from the changes suggested above, I just had to update the 
controller's leader and ISR cache before sending the update metadata request. 
Tested it on a 3-node Kafka cluster and the patch resolves the issue.

 Broker topic metadata not kept in sync with ZooKeeper
 -

 Key: KAFKA-1367
 URL: https://issues.apache.org/jira/browse/KAFKA-1367
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.0, 0.8.1
Reporter: Ryan Berdeen
Assignee: Ashish K Singh
  Labels: newbie++
 Fix For: 0.8.3

 Attachments: KAFKA-1367.patch, KAFKA-1367.txt


 When a broker is restarted, the topic metadata responses from the brokers 
 will be incorrect (different from ZooKeeper) until a preferred replica leader 
 election.
 In the metadata, it looks like leaders are correctly removed from the ISR 
 when a broker disappears, but followers are not. Then, when a broker 
 reappears, the ISR is never updated.
 I used a variation of the Vagrant setup created by Joe Stein to reproduce 
 this with latest from the 0.8.1 branch: 
 https://github.com/also/kafka/commit/dba36a503a5e22ea039df0f9852560b4fb1e067c



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1367) Broker topic metadata not kept in sync with ZooKeeper

2015-06-23 Thread Ashish K Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ashish K Singh updated KAFKA-1367:
--
Status: Patch Available  (was: Open)

 Broker topic metadata not kept in sync with ZooKeeper
 -

 Key: KAFKA-1367
 URL: https://issues.apache.org/jira/browse/KAFKA-1367
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.0, 0.8.1
Reporter: Ryan Berdeen
Assignee: Ashish K Singh
  Labels: newbie++
 Fix For: 0.8.3

 Attachments: KAFKA-1367.patch, KAFKA-1367.txt


 When a broker is restarted, the topic metadata responses from the brokers 
 will be incorrect (different from ZooKeeper) until a preferred replica leader 
 election.
 In the metadata, it looks like leaders are correctly removed from the ISR 
 when a broker disappears, but followers are not. Then, when a broker 
 reappears, the ISR is never updated.
 I used a variation of the Vagrant setup created by Joe Stein to reproduce 
 this with latest from the 0.8.1 branch: 
 https://github.com/also/kafka/commit/dba36a503a5e22ea039df0f9852560b4fb1e067c



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1367) Broker topic metadata not kept in sync with ZooKeeper

2015-06-23 Thread Ashish K Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14598877#comment-14598877
 ] 

Ashish K Singh commented on KAFKA-1367:
---

Created reviewboard https://reviews.apache.org/r/35820/
 against branch trunk

 Broker topic metadata not kept in sync with ZooKeeper
 -

 Key: KAFKA-1367
 URL: https://issues.apache.org/jira/browse/KAFKA-1367
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.0, 0.8.1
Reporter: Ryan Berdeen
Assignee: Ashish K Singh
  Labels: newbie++
 Fix For: 0.8.3

 Attachments: KAFKA-1367.patch, KAFKA-1367.txt


 When a broker is restarted, the topic metadata responses from the brokers 
 will be incorrect (different from ZooKeeper) until a preferred replica leader 
 election.
 In the metadata, it looks like leaders are correctly removed from the ISR 
 when a broker disappears, but followers are not. Then, when a broker 
 reappears, the ISR is never updated.
 I used a variation of the Vagrant setup created by Joe Stein to reproduce 
 this with latest from the 0.8.1 branch: 
 https://github.com/also/kafka/commit/dba36a503a5e22ea039df0f9852560b4fb1e067c



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-2296) Not able to delete topic on latest kafka

2015-06-23 Thread Andrew M (JIRA)
Andrew M created KAFKA-2296:
---

 Summary: Not able to delete topic on latest kafka
 Key: KAFKA-2296
 URL: https://issues.apache.org/jira/browse/KAFKA-2296
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2.1
Reporter: Andrew M


Was able to reproduce [inability to delete 
topic|https://issues.apache.org/jira/browse/KAFKA-1397?focusedCommentId=14491442page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14491442]
 on a running cluster with Kafka 0.8.2.1.
The cluster consists of 2 c3.xlarge AWS instances with sufficient storage 
attached. All communication between nodes goes through an AWS VPC.

Some warnings from the logs:
{noformat}[Controller-1234-to-broker-4321-send-thread], Controller 1234 epoch 
20 fails to send request 
Name:UpdateMetadataRequest;Version:0;Controller:1234;ControllerEpoch:20;CorrelationId:24047;ClientId:id_1234-host_1.2.3.4-port_6667;AliveBrokers:id:1234,host:1.2.3.4,port:6667,id:4321,host:4.3.2.1,port:6667;PartitionState:[topic_name,45]
 - 
(LeaderAndIsrInfo:(Leader:-2,ISR:4321,1234,LeaderEpoch:0,ControllerEpoch:19),ReplicationFactor:2),AllReplicas:1234,4321),[topic_name,27]
 - 
(LeaderAndIsrInfo:(Leader:-2,ISR:4321,1234,LeaderEpoch:0,ControllerEpoch:19),ReplicationFactor:2),AllReplicas:1234,4321),[topic_name,17]
 - 
(LeaderAndIsrInfo:(Leader:-2,ISR:4321,1234,LeaderEpoch:0,ControllerEpoch:19),ReplicationFactor:2),AllReplicas:1234,4321),[topic_name,49]
 - 
(LeaderAndIsrInfo:(Leader:-2,ISR:4321,1234,LeaderEpoch:0,ControllerEpoch:19),ReplicationFactor:2),AllReplicas:1234,4321),[topic_name,7]
 - 
(LeaderAndIsrInfo:(Leader:-2,ISR:4321,1234,LeaderEpoch:0,ControllerEpoch:19),ReplicationFactor:2),AllReplicas:1234,4321),[topic_name,26]
 - 
(LeaderAndIsrInfo:(Leader:-2,ISR:1234,4321,LeaderEpoch:0,ControllerEpoch:20),ReplicationFactor:2),AllReplicas:4321,1234),[topic_name,62]
 - 
(LeaderAndIsrInfo:(Leader:-2,ISR:1234,4321,LeaderEpoch:0,ControllerEpoch:20),ReplicationFactor:2),AllReplicas:4321,1234),[topic_name,18]
 - 
(LeaderAndIsrInfo:(Leader:-2,ISR:1234,4321,LeaderEpoch:0,ControllerEpoch:20),ReplicationFactor:2),AllReplicas:4321,1234),[topic_name,36]
 - 
(LeaderAndIsrInfo:(Leader:-2,ISR:1234,4321,LeaderEpoch:0,ControllerEpoch:20),ReplicationFactor:2),AllReplicas:4321,1234),[topic_name,29]
 - 
(LeaderAndIsrInfo:(Leader:-2,ISR:4321,1234,LeaderEpoch:0,ControllerEpoch:19),ReplicationFactor:2),AllReplicas:1234,4321),[topic_name,53]
 - 
(LeaderAndIsrInfo:(Leader:-2,ISR:4321,1234,LeaderEpoch:0,ControllerEpoch:19),ReplicationFactor:2),AllReplicas:1234,4321),[topic_name,52]
 - 
(LeaderAndIsrInfo:(Leader:-2,ISR:1234,4321,LeaderEpoch:0,ControllerEpoch:20),ReplicationFactor:2),AllReplicas:4321,1234),[topic_name,2]
 - 
(LeaderAndIsrInfo:(Leader:-2,ISR:1234,4321,LeaderEpoch:0,ControllerEpoch:20),ReplicationFactor:2),AllReplicas:4321,1234),[topic_name,12]
 - 
(LeaderAndIsrInfo:(Leader:-2,ISR:1234,4321,LeaderEpoch:0,ControllerEpoch:20),ReplicationFactor:2),AllReplicas:4321,1234),[topic_name,33]
 - 
(LeaderAndIsrInfo:(Leader:-2,ISR:4321,1234,LeaderEpoch:0,ControllerEpoch:19),ReplicationFactor:2),AllReplicas:1234,4321),[topic_name,14]
 - 
(LeaderAndIsrInfo:(Leader:-2,ISR:1234,4321,LeaderEpoch:0,ControllerEpoch:20),ReplicationFactor:2),AllReplicas:4321,1234),[topic_name,63]
 - 
(LeaderAndIsrInfo:(Leader:-2,ISR:4321,1234,LeaderEpoch:0,ControllerEpoch:19),ReplicationFactor:2),AllReplicas:1234,4321),[topic_name,30]
 - 
(LeaderAndIsrInfo:(Leader:-2,ISR:1234,4321,LeaderEpoch:0,ControllerEpoch:20),ReplicationFactor:2),AllReplicas:4321,1234),[topic_name,6]
 - 
(LeaderAndIsrInfo:(Leader:-2,ISR:1234,4321,LeaderEpoch:0,ControllerEpoch:20),ReplicationFactor:2),AllReplicas:4321,1234),[topic_name,28]
 - 
(LeaderAndIsrInfo:(Leader:-2,ISR:1234,4321,LeaderEpoch:0,ControllerEpoch:20),ReplicationFactor:2),AllReplicas:4321,1234),[topic_name,38]
 - 
(LeaderAndIsrInfo:(Leader:-2,ISR:1234,4321,LeaderEpoch:0,ControllerEpoch:20),ReplicationFactor:2),AllReplicas:4321,1234),[topic_name,24]
 - 
(LeaderAndIsrInfo:(Leader:-2,ISR:1234,4321,LeaderEpoch:0,ControllerEpoch:20),ReplicationFactor:2),AllReplicas:4321,1234),[topic_name,31]
 - 
(LeaderAndIsrInfo:(Leader:-2,ISR:4321,1234,LeaderEpoch:0,ControllerEpoch:19),ReplicationFactor:2),AllReplicas:1234,4321),[topic_name,4]
 - 
(LeaderAndIsrInfo:(Leader:-2,ISR:1234,4321,LeaderEpoch:0,ControllerEpoch:20),ReplicationFactor:2),AllReplicas:4321,1234),[topic_name,20]
 - 
(LeaderAndIsrInfo:(Leader:-2,ISR:1234,4321,LeaderEpoch:0,ControllerEpoch:20),ReplicationFactor:2),AllReplicas:4321,1234),[topic_name,54]
 - 
(LeaderAndIsrInfo:(Leader:-2,ISR:1234,4321,LeaderEpoch:0,ControllerEpoch:20),ReplicationFactor:2),AllReplicas:4321,1234),[topic_name,11]
 - 
(LeaderAndIsrInfo:(Leader:-2,ISR:4321,1234,LeaderEpoch:0,ControllerEpoch:19),ReplicationFactor:2),AllReplicas:1234,4321),[topic_name,40]
 - 

[jira] [Created] (KAFKA-2295) Dynamically loaded classes (encoders, etc.) may not be found by Kafka Producer

2015-06-23 Thread Tathagata Das (JIRA)
Tathagata Das created KAFKA-2295:


 Summary: Dynamically loaded classes (encoders, etc.) may not be 
found by Kafka Producer 
 Key: KAFKA-2295
 URL: https://issues.apache.org/jira/browse/KAFKA-2295
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Reporter: Tathagata Das
Assignee: Jun Rao


Kafka Producer (via CoreUtils.createObject) effectively uses Class.forName to 
load encoder classes. Class.forName, by design, finds classes only in the 
defining classloader of the enclosing class (which is often the bootstrap class 
loader). It does not use the current thread context class loader. This can lead 
to problems in environments where classes are dynamically loaded and therefore 
may not be present in the bootstrap classloader.

This leads to ClassNotFound Exceptions in environments like Spark where classes 
are loaded dynamically using custom classloaders. Issues like this have been 
reported, e.g.: 
https://www.mail-archive.com/user@spark.apache.org/msg30951.html

Other references regarding this issue with Class.forName 
http://stackoverflow.com/questions/21749741/though-my-class-was-loaded-class-forname-throws-classnotfoundexception

This is a problem we have faced repeatedly in Apache Spark and we solved it by 
explicitly specifying the class loader to use. See 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/Utils.scala#L178
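
For reference, a minimal sketch of the kind of fix described above (falling back
to the thread context classloader, as the Spark Utils code does); the helper
name is illustrative, not Kafka's actual code:

{code}
public final class ClassLoading {
    private ClassLoading() {}

    // Prefer the thread context classloader (set by frameworks such as Spark
    // that load classes dynamically), falling back to the loader of this class.
    public static Class<?> loadClass(String className) throws ClassNotFoundException {
        ClassLoader context = Thread.currentThread().getContextClassLoader();
        ClassLoader loader = (context != null) ? context : ClassLoading.class.getClassLoader();
        return Class.forName(className, true, loader);
    }
}
{code}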






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2298) Client Selector can drop connections on InvalidReceiveException without notifying NetworkClient

2015-06-23 Thread Dong Lin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14598719#comment-14598719
 ] 

Dong Lin commented on KAFKA-2298:
-

Updated reviewboard https://reviews.apache.org/r/35791/diff/
 against branch origin/trunk

 Client Selector can drop connections on InvalidReceiveException without 
 notifying NetworkClient
 ---

 Key: KAFKA-2298
 URL: https://issues.apache.org/jira/browse/KAFKA-2298
 Project: Kafka
  Issue Type: Bug
Reporter: Dong Lin
Assignee: Dong Lin
  Labels: quotas
 Attachments: KAFKA-2298.patch, KAFKA-2298_2015-06-23_18:47:54.patch


 I ran into the problem described in KAFKA-2266 when testing quotas. I was told 
 the bug was fixed in KAFKA-2266 after I figured out the problem.
 But the patch provided in KAFKA-2266 probably doesn't solve all related 
 problems. From reading the code there is still one edge case where the client 
 selector can close a connection in poll() without notifying NetworkClient.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2298) Client Selector can drop connections on InvalidReceiveException without notifying NetworkClient

2015-06-23 Thread Dong Lin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-2298:

Attachment: KAFKA-2298_2015-06-23_18:47:54.patch

 Client Selector can drop connections on InvalidReceiveException without 
 notifying NetworkClient
 ---

 Key: KAFKA-2298
 URL: https://issues.apache.org/jira/browse/KAFKA-2298
 Project: Kafka
  Issue Type: Bug
Reporter: Dong Lin
Assignee: Dong Lin
  Labels: quotas
 Attachments: KAFKA-2298.patch, KAFKA-2298_2015-06-23_18:47:54.patch


 I ran into the problem described in KAFKA-2266 when testing quotas. I was told 
 the bug was fixed in KAFKA-2266 after I figured out the problem.
 But the patch provided in KAFKA-2266 probably doesn't solve all related 
 problems. From reading the code there is still one edge case where the client 
 selector can close a connection in poll() without notifying NetworkClient.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 35791: Patch for KAFKA-2298

2015-06-23 Thread Dong Lin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35791/
---

(Updated June 24, 2015, 1:48 a.m.)


Review request for kafka.


Bugs: KAFKA-2298
https://issues.apache.org/jira/browse/KAFKA-2298


Repository: kafka


Description (updated)
---

KAFKA-2298; Client Selector can drop connections on InvalidReceiveException 
without notifying NetworkClient


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/common/network/Selector.java 
4aee214b24fd990be003adc36d675f015bf22fe6 

Diff: https://reviews.apache.org/r/35791/diff/


Testing
---


Thanks,

Dong Lin



Re: Review Request 35791: Patch for KAFKA-2298

2015-06-23 Thread Dong Lin


 On June 23, 2015, 5:59 p.m., Jason Gustafson wrote:
  clients/src/main/java/org/apache/kafka/common/network/Selector.java, line 
  282
  https://reviews.apache.org/r/35791/diff/1/?file=990592#file990592line282
 
  Do you think we should just move the disconnected.add() into the close 
  method?
 
 Dong Lin wrote:
 I have thought about it as well, but probably not. In 
 Selector.send() a failed destinationId is put in failedSends rather than 
 disconnected. The reason we use failedSends is that send() and poll() in 
 Selector will be called asynchronously by different threads.
 
 Jason Gustafson wrote:
 Yeah, that makes sense. I wonder if we should be using a Set internally 
 instead of a List? Then we wouldn't need to worry about adding to 
 disconnected multiple times. Guess there might be some performance impact, 
 but we would have an easier time ensuring correctness.
 
 Dong Lin wrote:
 Yeah, that can be useful. I am not sure if currently some id may be added 
 to disconnected multiple times. Even if it does, this should not be a problem in 
 NetworkClient.handleDisconnections(). I personally prefer to keep the existing 
 code unless there is a good reason (i.e. performance or functionality) for the 
 change. Not sure what others think about this change.
 
 Jason Gustafson wrote:
 Seems unnecessary now that I think about it. In fact, it wouldn't hurt to 
 move the disconnected.add() into close() as long as we also add it to 
 failedSends, but that might make the code tougher to follow. Anyway, I think 
 the patch is fine as it is. I was just wondering if there was an easy way to 
 prevent this issue from popping up again in the future.
 
 Dong Lin wrote:
 Yeah, I agree with you. I would add to disconnected in close() if I were 
 writing this class/function from scratch. But since this is a small patch to an 
 existing module, I will make the change that solves the problem with the least 
 dispute, to keep peer review smooth.
 
 Joel Koshy wrote:
 Hi Dong - I'm not sure about your comment on multiple threads calling 
 send/poll. NetworkClient and Selector are both documented as being 
 `@nonthreadsafe` so that is probably a non-issue. The producer's sender, for 
 example, does send/poll in the same (sender) thread.

Hi Joel, Jason,

Yeah, you are right. Thanks for the correction. I have moved disconnected.add() 
into close() in the updated patch. Please have a look.
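
For readers following along, a simplified, hypothetical sketch (not the actual
Kafka Selector code) of the design choice settled on here: every path that drops
a connection funnels through close(), which records the id in disconnected, so
callers that poll for disconnections are always notified:

```java
import java.io.IOException;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MiniSelector {
    private final Map<String, SocketChannel> channels = new HashMap<>();
    private final List<String> disconnected = new ArrayList<>();

    // Every close path goes through here, so the disconnect is always recorded.
    public void close(String id) {
        SocketChannel channel = channels.remove(id);
        if (channel != null) {
            try {
                channel.close();
            } catch (IOException e) {
                // log and continue; the id is still reported as disconnected
            }
        }
        disconnected.add(id);
    }

    // Callers (a NetworkClient-style user) drain this after each poll().
    public List<String> disconnected() {
        return disconnected;
    }
}
```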


- Dong


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35791/#review89010
---


On June 24, 2015, 1:48 a.m., Dong Lin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/35791/
 ---
 
 (Updated June 24, 2015, 1:48 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2298
 https://issues.apache.org/jira/browse/KAFKA-2298
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2298; Client Selector can drop connections on InvalidReceiveException 
 without notifying NetworkClient
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/common/network/Selector.java 
 4aee214b24fd990be003adc36d675f015bf22fe6 
 
 Diff: https://reviews.apache.org/r/35791/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Dong Lin
 




trying to manage offset for ConsumerConnector

2015-06-23 Thread Shashank Singh
Hi Team

I am trying to build a Kafka consumer using a high-level API like
ConsumerConnector, but one that is able to manage offsets so it can track
which messages have been read/consumed and avoid duplicate message consumption.

Among a few other pages, I was going through
http://www.slideshare.net/jjkoshy/offset-management-in-kafka . It seems
that the API mentioned there for getting and setting offsets is only available
from the 0.8.3 release. Am I correct in my understanding? If yes, is there a
workaround to manage offsets not in ZooKeeper but in the Kafka topic where
such info is maintained?

I was trying to use the OffsetFetchRequest/Response and FetchRequest/Response
APIs, however it is not clear to me which methods need to be called for
fetching data and for getting and setting offsets.

Can you please guide me to the correct link and API.
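
Not an authoritative answer, but a minimal sketch of one workaround with the
0.8.2 high-level consumer: disable auto-commit, set offsets.storage=kafka so
offsets are stored in Kafka's internal offsets topic rather than ZooKeeper, and
call commitOffsets() only after a message is fully processed (the group, topic,
and connection strings below are placeholders):

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class ManualOffsetConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");   // placeholder
        props.put("group.id", "my-group");                   // placeholder
        props.put("auto.commit.enable", "false");            // commit manually instead
        props.put("offsets.storage", "kafka");               // offsets topic, not ZooKeeper
        props.put("dual.commit.enabled", "false");

        ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
            connector.createMessageStreams(Collections.singletonMap("my-topic", 1));
        ConsumerIterator<byte[], byte[]> it = streams.get("my-topic").get(0).iterator();

        while (it.hasNext()) {
            byte[] message = it.next().message();
            // process(message) ...
            connector.commitOffsets();   // commit only after the message is processed
        }
    }
}
```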


-- 

*Warm Regards,*

*Shashank  *

*Mobile: +91 9910478553 *

*Linkedin: in.linkedin.com/pub/shashank-singh/13/763/906/
http://in.linkedin.com/pub/shashank-singh/13/763/906/*


[jira] [Commented] (KAFKA-1215) Rack-Aware replica assignment option

2015-06-23 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14598643#comment-14598643
 ] 

Jay Kreps commented on KAFKA-1215:
--

This is great!

 Rack-Aware replica assignment option
 

 Key: KAFKA-1215
 URL: https://issues.apache.org/jira/browse/KAFKA-1215
 Project: Kafka
  Issue Type: Improvement
  Components: replication
Affects Versions: 0.8.0
Reporter: Joris Van Remoortere
Assignee: Jun Rao
 Fix For: 0.9.0

 Attachments: rack_aware_replica_assignment_v1.patch, 
 rack_aware_replica_assignment_v2.patch


 Adding a rack-id to kafka config. This rack-id can be used during replica 
 assignment by using the max-rack-replication argument in the admin scripts 
 (create topic, etc.). By default the original replication assignment 
 algorithm is used because max-rack-replication defaults to -1. 
 max-rack-replication > -1 is not honored if you are doing manual replica 
 assignment (preferred).
 If this looks good I can add some test cases specific to the rack-aware 
 assignment.
 I can also port this to trunk. We are currently running 0.8.0 in production 
 and need this, so I wrote the patch against that.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-26 - Add Copycat, a connector framework for data import/export

2015-06-23 Thread Ewen Cheslack-Postava
On Mon, Jun 22, 2015 at 8:32 PM, Roshan Naik ros...@hortonworks.com wrote:

 Thanks Jay and Ewen for the response.


 @Jay
 
  3. This has a built in notion of parallelism throughout.



  It was not obvious how it will look or differ from existing systems, since
  all of the existing ones do parallelize data movement.


I'm guessing some confusion here might also be because we want both
parallelization and distribution.

Roughly speaking, I think of Copycat making the consumer group abstraction
available for any import task, and the idea is to make this automatic and
transparent to the user. This isn't interesting for systems that literally
only have a single input stream, but Copycat source connectors have a
built-in notion of parallel input streams. The connector's job is to inform
the Copycat framework of what input streams there are, and Copycat
handles running tasks, balancing the streams across them, handling failures
by rebalancing as necessary, and providing offset commit and storage so tasks
can resume from the last known-good state, etc.
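
As a purely hypothetical illustration (the actual Copycat API is deliberately
not specified in this KIP yet), the division of labor described above might look
roughly like this, with the connector only naming streams and loading records
while the framework owns scheduling, rebalancing, and offset commits:

```java
import java.util.List;
import java.util.Map;

// Hypothetical shapes, sketched only to illustrate the division of labor;
// nothing here is the actual Copycat API.
interface SourceConnector {
    // Tell the framework which parallel input streams exist (e.g. one per table).
    List<String> inputStreams();
}

interface SourceTask {
    // The framework assigns a subset of streams plus the last committed offsets;
    // the task rewinds to those offsets and starts producing records.
    void start(List<String> assignedStreams, Map<String, Long> committedOffsets);

    // The framework polls records, writes them to Kafka, and commits offsets on
    // the task's behalf; rebalancing across tasks happens outside the connector.
    List<SourceRecord> poll();
}

class SourceRecord {
    final String stream;
    final long offset;
    final byte[] value;

    SourceRecord(String stream, long offset, byte[] value) {
        this.stream = stream;
        this.offset = offset;
        this.value = value;
    }
}
```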

On the sink side, the input is the Kafka consumer group, which obviously
already has this parallelism built in. Depending on the output, this may
manifest in different ways. For HDFS, the effect is just that your output
files are partitioned (one per topic-partition).

As for other systems, can you be more specific? Some of them obviously do
(e.g. Camus), but others require you to handle this manually. I don't want
to pick on Flume specifically, but as an example, it requires either
configuring multiple (or multiplexed) flows in a single agent or managing
multiple agents independently. This isn't really the same as what I've
described above where you hand Copycat one config and it automatically
spreads the work across multiple, fault-tolerant tasks. But flume is also
targeting a much different general problem, trying to build potentially
large, multi-stage data flows with all sorts of transformations, filtering,
etc.




 @Ewen,

 Import: Flume is just one of many similar systems designed around log
 collection. See notes below, but one major point is that they generally
 don't provide any sort of guaranteed delivery semantics.


 I think most of them do provide guarantees of some sort (e.g. Flume and
 FluentD).


This part of the discussion gets a little bit tricky, not least because it
seems people can't agree on exactly what these terms mean.

First, some systems that you didn't mention. Logstash definitely doesn't
have any guarantees as it uses a simple 20-event in-memory buffer between
stages. As far as I can tell, Heka doesn't provide these semantics either,
although I have not investigated it as deeply.

fluentd has an article discussing the options for it (
http://docs.fluentd.org/articles/high-availability), but I actually think
the article on writing plugins is more informative
http://docs.fluentd.org/articles/plugin-development The most important
point is that input plugins have no way to track or discover downstream
delivery (i.e. they cannot get acks, nor is there any sort of offset
tracked that they can look up to discover where to restart upon failure, nor
is it guaranteed that after router.emit() returns that the data will have
already been delivered downstream). So if I have a replicated input data
store, e.g. a replicated database, and I am just reading off its
changelog, does fluentd actually guarantee something like at least once
delivery to the sink? In fact, fluentd's own documentation (the high
availability doc) describes data loss scenarios that aren't inherent to
every system (e.g., if their log aggregator dies, which not every system is
susceptible to, vs. if an event is generated on a single host and that host
dies before reporting it anywhere, then of course the data is permanently
lost).

Flume actually does have a (somewhat confusingly named) transaction concept
to help control this. The reliability actually depends on what type of
channel implementation you use. Gwen and Jeff from Cloudera integrated
Kafka and Flume, including a Kafka channel (see
http://blog.cloudera.com/blog/2014/11/flafka-apache-flume-meets-apache-kafka-for-event-processing/).
This does allow for better control over delivery semantics, and I think if
you use something like Kafka for every channel in your pipeline, you can
get something like what Copycat can provide. I'd argue Flume's approach has
some other drawbacks though. In order to work correctly, every source and
sink has to handle the transaction semantics, which adds complexity
(although they do offer great skeleton examples in their docs!).

Copycat tries to avoid that complexity for connector developers by structuring
the framework around streams, offsets, and commits, and pushing the
complexity of dealing with any sort of errors/failures into the
framework. Ideally connector developers only need to a) check for offsets
at startup and rewind to the last known committed offset and b) load events
from the source 
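
A minimal Java sketch of those two responsibilities, again with hypothetical
names rather than the real API:

// Hypothetical sketch of a connector task's responsibilities: (a) look up the last
// committed offsets at startup and rewind, (b) load events from the source.
// Retries, rebalancing, and offset storage are left to the framework.
import java.util.List;
import java.util.Map;

interface SourceTask {
    // (a) The framework hands the task the last committed offset for each assigned
    // stream; the task seeks its source back to that position.
    void start(Map<String, Long> lastCommittedOffsets);

    // (b) Load the next batch of events; the framework writes them to Kafka and
    // periodically commits the offsets the task reports.
    List<byte[]> poll();
}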

Re: [DISCUSS] KIP-26 - Add Copycat, a connector framework for data import/export

2015-06-23 Thread Ewen Cheslack-Postava
And, one more piece of follow-up. Some folks were wondering about more
specific details of what we had in mind for the framework. Along with a
prototype, I had been writing up some documentation. This isn't meant in any
way to be finalized, and I just wrote it up using the same tools we use
internally rather than integrating it directly with the Kafka docs like we'd
eventually want to do, but I think the current version would help clarify
some of the details of what we think the framework should look like without
prematurely getting too far into the specifics of the API and
implementation.

You can find a draft of these docs here:
https://s3-us-west-2.amazonaws.com/confluent-files/copycat-docs-wip/intro.html

-Ewen

On Tue, Jun 23, 2015 at 6:11 PM, Ewen Cheslack-Postava e...@confluent.io
wrote:



 On Mon, Jun 22, 2015 at 8:32 PM, Roshan Naik ros...@hortonworks.com
 wrote:

 Thanks Jay and Ewen for the response.


 @Jay
 
  3. This has a built in notion of parallelism throughout.



 It was not obvious how it will look like or differ from existing systems...
 since all of existing ones do parallelize data movement.


 I'm guessing some confusion here might also be because we want both
 parallelization and distribution.

 Roughly speaking, I think of Copycat making the consumer group abstraction
 available for any import task, and the idea is to make this automatic and
 transparent to the user. This isn't interesting for systems that literally
 only have a single input stream, but Copycat source connectors have a
 built-in notion of parallel input streams. The connector's job is to inform
 the Copycat framework of what input streams there are and Copycat
 handles running tasks, balancing the streams across them, handles failures
 by rebalancing as necessary, provides offset commit and storage so tasks
 can resume from the last known-good state, etc.

 On the sink side, the input is the Kafka consumer group, which obviously
 already has this parallelism built in. Depending on the output, this may
 manifest in different ways. For HDFS, the effect is just that your output
 files are partitioned (one per topic-partition).

 As for other systems, can you be more specific? Some of them obviously do
 (e.g. Camus), but others require you to handle this manually. I don't want
 to pick on Flume specifically, but as an example, it requires either
 configuring multiple (or multiplexed) flows in a single agent or managing
 multiple agents independently. This isn't really the same as what I've
 described above where you hand Copycat one config and it automatically
 spreads the work across multiple, fault-tolerant tasks. But flume is also
 targeting a much different general problem, trying to build potentially
 large, multi-stage data flows with all sorts of transformations, filtering,
 etc.




 @Ewen,

 Import: Flume is just one of many similar systems designed around log
 collection. See notes below, but one major point is that they generally
 don't provide any sort of guaranteed delivery semantics.


 I think most of them do provide guarantees of some sort (Ex. Flume &
 FluentD).


 This part of the discussion gets a little bit tricky, not least because it
 seems people can't agree on exactly what these terms mean.

 First, some systems that you didn't mention. Logstash definitely doesn't
 have any guarantees as it uses a simple 20-event in-memory buffer between
 stages. As far as I can tell, Heka doesn't provide these semantics either,
 although I have not investigated it as deeply.

 fluentd has an article discussing the options for it (
 http://docs.fluentd.org/articles/high-availability), but I actually think
 the article on writing plugins is more informative:
 http://docs.fluentd.org/articles/plugin-development. The most important
 point is that input plugins have no way to track or discover downstream
 delivery (i.e. they cannot get acks, nor is there any sort of offset
 tracked that they can look up to discover where to restart upon failure, nor
 is it guaranteed that after router.emit() returns the data will have
 already been delivered downstream). So if I have a replicated input data
 store, e.g. a replicated database, and I am just reading off its
 changelog, does fluentd actually guarantee something like at least once
 delivery to the sink? In fact, fluentd's own documentation (the high
 availability doc) describes data loss scenarios that aren't inherent to
 every system (e.g., if their log aggregator dies, which not every system is
 susceptible to, vs. if an event is generated on a single host and that host
 dies before reporting it anywhere, then of course the data is permanently
 lost).

 Flume actually does have a (somewhat confusingly named) transaction
 concept to help control this. The reliability actually depends on what type
 of channel implementation you use. Gwen and Jeff from Cloudera integrated
 Kafka and Flume, including a Kafka channel (see
 

Re: [DISCUSS] KIP-26 - Add Copycat, a connector framework for data import/export

2015-06-23 Thread Sriram Subramanian
I am still not convinced why a stream processing framework closely tied to
Kafka will not help with this (since we are also referring to some basic
transformations). The devil is in the details of the design, and I would be
able to comment on it better after that. I would love to see a detailed
design doc on the internals!

On 6/23/15 2:59 PM, Ewen Cheslack-Postava e...@confluent.io wrote:

There was some discussion on the KIP call today. I'll give my summary of
what I heard here to make sure this thread has the complete context for
ongoing discussion.

* Where the project should live, and if in Kafka, where should connectors
live? If some are in Kafka and some not, how many and which ones? - There
was little disagreement on the tradeoffs (coding and packaging together can
make things easier for end users, especially for a few very popular
connectors; maintaining connectors internally can lead to a messier code base
with more dependencies that's harder to work with, etc.). Seems to be more focus on
location of connectors than framework right now; we'll probably only make
progress on this issue with some concrete proposals.
* Organizational issues within Kafka - subproject? - Jay mentioned desire
for consistency, which can be a problem even across subprojects.
* Will streaming data be supported? - Yes, the "Streaming and batch" section
of the design goals should cover this; this is a very important use case.
* Additional transformations in Copycat - Updated wiki to leave this a bit
more open. Original motivation for leaving it out was to keep the scope of
this KIP and the Copycat framework very clear since there is a danger in
overgeneralizing and ending up with a stream processing framework;
however,
it's clear there are some very useful, very common examples like scrubbing
data during import.
* Schemas and how the data model works - this requires a more in depth
answer when we get to a complete proposal, but the prototype we've been
playing with internally uses something that can work with data roughly
like
Avro or JSON, and supports schemas. The goal is for this data model to
only
be used at runtime and for the serialization that is used for storing data
in Kafka to be pluggable. Each type of serialization plugin might handle
things like schemas in different ways. The reason we are proposing the
inclusion of schemas is that it lets you cleanly carry important info
across multiple stages, e.g. the schema for data pulled from a database
is
defined by the table the data is read from, intermediate processing steps
might maintain schemas as well, and then an export to, e.g., a parquet
file
in HDFS would also use the schema. There will definitely need to be
discussion about the details of this data model, what needs to be included
to make it work across multiple serialization formats, etc. (a rough
illustrative sketch follows after this list).
* Could mirror maker be implemented in Copycat? Same for Camus? - Yes,
both
would make sense in Copycat. One of the motivations is to have fewer tools
required for a lot of these common tasks. Mirror maker is a case where we
could easily maintain the connector as part of Kafka, and we could
probably
bootstrap one very quickly using lessons learned from mirror maker. The
experience with mirror maker is also an argument for making sure Kafka
devs
are closely involved in Copycat development -- it's actually tricky to get
it right even when you know Kafka, and Copycat has to get everything right
for more general cases.
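
As a purely illustrative sketch of the schema-carrying, serialization-agnostic
record described in the schemas bullet above (none of these names come from the
actual proposal):

// Illustrative sketch only: a runtime record that carries its schema, with
// serialization left to a pluggable converter. Names are hypothetical.
import java.util.Map;

final class Field {
    final String name;
    final String type;   // e.g. "int64", "string", "bytes"
    Field(String name, String type) { this.name = name; this.type = type; }
}

final class Schema {
    final Field[] fields; // e.g. derived from the database table being read
    Schema(Field... fields) { this.fields = fields; }
}

final class Record {
    final Schema schema;  // carried across stages: source -> Kafka -> sink
    final Map<String, Object> value;
    Record(Schema schema, Map<String, Object> value) {
        this.schema = schema;
        this.value = value;
    }
}

// The serialization used to store data in Kafka is pluggable; each converter
// (Avro, JSON, ...) decides how to represent the schema on the wire.
interface Converter {
    byte[] fromRecord(Record record);
    Record toRecord(byte[] bytes);
}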

I made minor updates to the wiki to reflect some of these notes. Anyone
else have any specific updates they think should be made to any of the
sections, especially considerations I may have omitted from the rejected
alternatives (or any rejected alternatives that they think still need
to
be under consideration)?

Let me know what you think needs to be addressed to get this to a vote --
I
don't want to rush people, but I also don't want to just leave this
lingering unless there are specific issues that can be addressed.

-Ewen


On Mon, Jun 22, 2015 at 8:32 PM, Roshan Naik ros...@hortonworks.com
wrote:

 Thanks Jay and Ewen for the response.


 @Jay
 
  3. This has a built in notion of parallelism throughout.



 It was not obvious how it will look like or differ from existing
systems...
 since all of existing ones do parallelize data movement.


 @Ewen,

 Import: Flume is just one of many similar systems designed around log
 collection. See notes below, but one major point is that they generally
 don't provide any sort of guaranteed delivery semantics.


 I think most of them do provide guarantees of some sort (Ex. Flume &
 FluentD).


 YARN: My point isn't that YARN is bad, it's that tying to any
particular
 cluster manager severely limits the applicability of the tool. The
goal is
 to make Copycat agnostic to the cluster manager so it can run under
Mesos,
 YARN, etc.

 ok. Got it. Sounds like there is a plan to do some work here to ensure
 out-of-the-box it works with more than one scheduler (as @Jay listed
out).
 In that case, IMO it would 

Re: [DISCUSS] KIP-26 - Add Copycat, a connector framework for data import/export

2015-06-23 Thread Gwen Shapira
Re: Flume vs. CopyCat

I would love to have an automagically-parallelizing, schema-aware
version of Flume with great reliability guarantees. Flume has a good
core architecture and I'm sure that if the Flume community is
interested, it can be extended in that direction.

However, the Apache way is not to stop new innovation just because
some systems already exist. We develop the best systems we can, and
users choose the ones they prefer - that's how ecosystems thrive.
If we can have Flume and NiFi, Sentry and Argus, Flink and Storm,
Parquet and ORC, I'm sure we can also have CopyCat in the zoo :)

Gwen



On Tue, Jun 23, 2015 at 6:11 PM, Ewen Cheslack-Postava
e...@confluent.io wrote:
 On Mon, Jun 22, 2015 at 8:32 PM, Roshan Naik ros...@hortonworks.com wrote:

 Thanks Jay and Ewen for the response.


 @Jay
 
  3. This has a built in notion of parallelism throughout.



 It was not obvious how it will look like or differ from existing systems...
 since all of existing ones do parallelize data movement.


 I'm guessing some confusion here might also be because we want both
 parallelization and distribution.

 Roughly speaking, I think of Copycat making the consumer group abstraction
 available for any import task, and the idea is to make this automatic and
 transparent to the user. This isn't interesting for systems that literally
 only have a single input stream, but Copycat source connectors have a
 built-in notion of parallel input streams. The connector's job is to inform
 the Copycat framework of what input streams there are and Copycat
 handles running tasks, balancing the streams across them, handles failures
 by rebalancing as necessary, provides offset commit and storage so tasks
 can resume from the last known-good state, etc.

 On the sink side, the input is the Kafka consumer group, which obviously
 already has this parallelism built in. Depending on the output, this may
 manifest in different ways. For HDFS, the effect is just that your output
 files are partitioned (one per topic-partition).

 As for other systems, can you be more specific? Some of them obviously do
 (e.g. Camus), but others require you to handle this manually. I don't want
 to pick on Flume specifically, but as an example, it requires either
 configuring multiple (or multiplexed) flows in a single agent or managing
 multiple agents independently. This isn't really the same as what I've
 described above where you hand Copycat one config and it automatically
 spreads the work across multiple, fault-tolerant tasks. But flume is also
 targeting a much different general problem, trying to build potentially
 large, multi-stage data flows with all sorts of transformations, filtering,
 etc.




 @Ewen,

 Import: Flume is just one of many similar systems designed around log
 collection. See notes below, but one major point is that they generally
 don't provide any sort of guaranteed delivery semantics.


 I think most of them do provide guarantees of some sort (Ex. Flume &
 FluentD).


 This part of the discussion gets a little bit tricky, not least because it
 seems people can't agree on exactly what these terms mean.

 First, some systems that you didn't mention. Logstash definitely doesn't
 have any guarantees as it uses a simple 20-event in-memory buffer between
 stages. As far as I can tell, Heka doesn't provide these semantics either,
 although I have not investigated it as deeply.

 fluentd has an article discussing the options for it (
 http://docs.fluentd.org/articles/high-availability), but I actually think
 the article on writing plugins is more informative:
 http://docs.fluentd.org/articles/plugin-development. The most important
 point is that input plugins have no way to track or discover downstream
 delivery (i.e. they cannot get acks, nor is there any sort of offset
 tracked that they can look up to discover where to restart upon failure, nor
 is it guaranteed that after router.emit() returns the data will have
 already been delivered downstream). So if I have a replicated input data
 store, e.g. a replicated database, and I am just reading off its
 changelog, does fluentd actually guarantee something like at least once
 delivery to the sink? In fact, fluentd's own documentation (the high
 availability doc) describes data loss scenarios that aren't inherent to
 every system (e.g., if their log aggregator dies, which not every system is
 susceptible to, vs. if an event is generated on a single host and that host
 dies before reporting it anywhere, then of course the data is permanently
 lost).

 Flume actually does have a (somewhat confusingly named) transaction concept
 to help control this. The reliability actually depends on what type of
 channel implementation you use. Gwen and Jeff from Cloudera integrated
 Kafka and Flume, including a Kafka channel (see
 http://blog.cloudera.com/blog/2014/11/flafka-apache-flume-meets-apache-kafka-for-event-processing/).
 This does allow for better control over delivery semantics, and 

Re: [DISCUSS] KIP-26 - Add Copycat, a connector framework for data import/export

2015-06-23 Thread Gwen Shapira
One more reason to have CopyCat as a separate project is to sidestep
the entire "Why CopyCat and not X" discussion :)

On Tue, Jun 23, 2015 at 6:26 PM, Gwen Shapira gshap...@cloudera.com wrote:
 Re: Flume vs. CopyCat

 I would love to have an automagically-parallelizing, schema-aware
 version of Flume with great reliability guarantees. Flume has a good
 core architecture and I'm sure that if the Flume community is
 interested, it can be extended in that direction.

 However, the Apache way is not to stop new innovation just because
 some systems already exist. We develop the best systems we can, and
 users choose the ones they prefer - that's how ecosystems thrive.
 If we can have Flume and NiFi, Sentry and Argus, Flink and Storm,
 Parquet and ORC, I'm sure we can also have CopyCat in the zoo :)

 Gwen



 On Tue, Jun 23, 2015 at 6:11 PM, Ewen Cheslack-Postava
 e...@confluent.io wrote:
 On Mon, Jun 22, 2015 at 8:32 PM, Roshan Naik ros...@hortonworks.com wrote:

 Thanks Jay and Ewen for the response.


 @Jay
 
  3. This has a built in notion of parallelism throughout.



 It was not obvious how it will look like or differ from existing systems...
 since all of existing ones do parallelize data movement.


 I'm guessing some confusion here might also be because we want both
 parallelization and distribution.

 Roughly speaking, I think of Copycat making the consumer group abstraction
 available for any import task, and the idea is to make this automatic and
 transparent to the user. This isn't interesting for systems that literally
 only have a single input stream, but Copycat source connectors have a
 built-in notion of parallel input streams. The connector's job is to inform
 the Copycat framework of what input streams there are and Copycat
 handles running tasks, balancing the streams across them, handles failures
 by rebalancing as necessary, provides offset commit and storage so tasks
 can resume from the last known-good state, etc.

 On the sink side, the input is the Kafka consumer group, which obviously
 already has this parallelism built in. Depending on the output, this may
 manifest in different ways. For HDFS, the effect is just that your output
 files are partitioned (one per topic-partition).

 As for other systems, can you be more specific? Some of them obviously do
 (e.g. Camus), but others require you to handle this manually. I don't want
 to pick on Flume specifically, but as an example, it requires either
 configuring multiple (or multiplexed) flows in a single agent or managing
 multiple agents independently. This isn't really the same as what I've
 described above where you hand Copycat one config and it automatically
 spreads the work across multiple, fault-tolerant tasks. But flume is also
 targeting a much different general problem, trying to build potentially
 large, multi-stage data flows with all sorts of transformations, filtering,
 etc.




 @Ewen,

 Import: Flume is just one of many similar systems designed around log
 collection. See notes below, but one major point is that they generally
 don't provide any sort of guaranteed delivery semantics.


 I think most of them do provide guarantees of some sort (Ex. Flume &
 FluentD).


 This part of the discussion gets a little bit tricky, not least because it
 seems people can't agree on exactly what these terms mean.

 First, some systems that you didn't mention. Logstash definitely doesn't
 have any guarantees as it uses a simple 20-event in-memory buffer between
 stages. As far as I can tell, Heka doesn't provide these semantics either,
 although I have not investigated it as deeply.

 fluentd has an article discussing the options for it (
 http://docs.fluentd.org/articles/high-availability), but I actually think
 the article on writing plugins is more informative:
 http://docs.fluentd.org/articles/plugin-development. The most important
 point is that input plugins have no way to track or discover downstream
 delivery (i.e. they cannot get acks, nor is there any sort of offset
 tracked that they can look up to discover where to restart upon failure, nor
 is it guaranteed that after router.emit() returns the data will have
 already been delivered downstream). So if I have a replicated input data
 store, e.g. a replicated database, and I am just reading off its
 changelog, does fluentd actually guarantee something like at least once
 delivery to the sink? In fact, fluentd's own documentation (the high
 availability doc) describes data loss scenarios that aren't inherent to
 every system (e.g., if their log aggregator dies, which not every system is
 susceptible to, vs. if an event is generated on a single host and that host
 dies before reporting it anywhere, then of course the data is permanently
 lost).

 Flume actually does have a (somewhat confusingly named) transaction concept
 to help control this. The reliability actually depends on what type of
 channel implementation you use. Gwen and Jeff from Cloudera integrated
 Kafka and 

[jira] [Commented] (KAFKA-1215) Rack-Aware replica assignment option

2015-06-23 Thread Allen Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14598615#comment-14598615
 ] 

Allen Wang commented on KAFKA-1215:
---

We have a working solution now for rack-aware assignment. It is based on the
current patch for this JIRA but with some improvements. The key ideas of the
solution are:

- Rack ID is a String instead of an integer
- For replica assignment, add an extra parameter of type Map[Int, String] to the
assignReplicasToBrokers() method, which maps broker ID to rack ID
- Before doing the rack-aware assignment, sort the broker list such that the brokers
are interlaced according to rack. In other words, adjacent brokers should
not be in the same rack if possible. For example, assuming 6 brokers mapping
to 3 racks:

0 -> rack1, 1 -> rack1, 2 -> rack2, 3 -> rack2, 4 -> rack3, 5 -> rack3

The sorted broker list could be (0, 2, 4, 1, 3, 5)

- Apply the same assignment algorithm to assign replicas, with the addition of 
skipping a broker if its rack is already used for the same partition (similar
to what has been done in the current patch)

The benefit of this approach is that replica distribution is kept as even as
possible across all the racks and brokers.
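
A rough Java sketch of the interlacing step described above (illustrative only;
the actual patch is against the broker's Scala code and may differ):

// Rough sketch of the rack-interlacing sort described above; not the actual patch.
import java.util.*;

final class RackInterlacing {
    // Given broker -> rack (e.g. {0:rack1, 1:rack1, 2:rack2, 3:rack2, 4:rack3, 5:rack3}),
    // return a broker list where adjacent brokers are on different racks when possible,
    // e.g. (0, 2, 4, 1, 3, 5).
    static List<Integer> interlaceByRack(Map<Integer, String> brokerToRack) {
        // Group broker ids by rack, preserving broker id order within a rack.
        Map<String, Deque<Integer>> byRack = new LinkedHashMap<>();
        brokerToRack.entrySet().stream()
                .sorted(Map.Entry.comparingByKey())
                .forEach(e -> byRack.computeIfAbsent(e.getValue(), r -> new ArrayDeque<>())
                        .add(e.getKey()));

        // Round-robin across racks until all brokers are consumed.
        List<Integer> result = new ArrayList<>(brokerToRack.size());
        while (result.size() < brokerToRack.size()) {
            for (Deque<Integer> brokers : byRack.values()) {
                if (!brokers.isEmpty()) {
                    result.add(brokers.pollFirst());
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, String> mapping = new TreeMap<>();
        mapping.put(0, "rack1"); mapping.put(1, "rack1");
        mapping.put(2, "rack2"); mapping.put(3, "rack2");
        mapping.put(4, "rack3"); mapping.put(5, "rack3");
        System.out.println(interlaceByRack(mapping)); // [0, 2, 4, 1, 3, 5]
    }
}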

With regard to KAFKA-1792, an easy solution is to restrict replica movement 
within the same rack, which I think should work in most practical cases. It
will also have the added benefit that replicas usually move faster within a rack.
So basically we can apply the same algorithm described in KAFKA-1792 for each
rack. For example, if there are three racks, then apply the algorithm three
times, each time with the broker list and assignment for that specific rack. Again,
we assume the broker-to-rack mapping will be available in the method signature.

The open question is how to obtain the broker-to-rack mapping. The information can
be supplied when Kafka registers the broker with ZooKeeper, which means some
information has to be added to ZooKeeper. However, it could be that the rack
information is already available in a deployment-independent way. For example,
for some deployments, the rack information may be available in a database. What
we can do is abstract out the API required to obtain rack information into an
interface and allow the user to supply an implementation on the command line or at
broker startup (to handle auto topic creation).
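
For example, the pluggable lookup could be as small as a single-method interface;
this is just a hypothetical sketch, not part of the patch:

// Hypothetical sketch of a pluggable rack lookup; not part of the actual patch.
import java.util.Map;

interface RackLocator {
    // Return the rack ID for each given broker ID, whether the mapping comes from
    // ZooKeeper registration, a database, or a static config file.
    Map<Integer, String> racksFor(Iterable<Integer> brokerIds);
}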





 

 Rack-Aware replica assignment option
 

 Key: KAFKA-1215
 URL: https://issues.apache.org/jira/browse/KAFKA-1215
 Project: Kafka
  Issue Type: Improvement
  Components: replication
Affects Versions: 0.8.0
Reporter: Joris Van Remoortere
Assignee: Jun Rao
 Fix For: 0.9.0

 Attachments: rack_aware_replica_assignment_v1.patch, 
 rack_aware_replica_assignment_v2.patch


 Adding a rack-id to kafka config. This rack-id can be used during replica 
 assignment by using the max-rack-replication argument in the admin scripts 
 (create topic, etc.). By default the original replication assignment 
 algorithm is used because max-rack-replication defaults to -1. 
 max-rack-replication > -1 is not honored if you are doing manual replica 
 assignment (preferred).
 If this looks good I can add some test cases specific to the rack-aware 
 assignment.
 I can also port this to trunk. We are currently running 0.8.0 in production 
 and need this, so I wrote the patch against that.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 35791: Patch for KAFKA-2298

2015-06-23 Thread Joel Koshy


 On June 23, 2015, 5:59 p.m., Jason Gustafson wrote:
  clients/src/main/java/org/apache/kafka/common/network/Selector.java, line 
  282
  https://reviews.apache.org/r/35791/diff/1/?file=990592#file990592line282
 
  Do you think we should just move the disconnected.add() into the close 
  method?
 
 Dong Lin wrote:
 I have thought about it as well, but probably no. Because in 
 Selector.send() the failed destinationId is put in failedSends rather than 
 disconnected. The reason we use failedSends is because send() and poll() in 
 Selector will be called asynchronously by different threads.
 
 Jason Gustafson wrote:
 Yeah, that makes sense. I wonder if we should be using a Set internally 
 instead of a List? Then we wouldn't need to worry about adding to 
 disconnected multiple times. Guess there might be some performance impact, 
 but we would have an easier time ensuring correctness.
 
 Dong Lin wrote:
 Yeah that can be useful. I am not sure if currently some id may be added 
 to disconnected multiple times. Even if it does, this should not be a problem in 
 NetworkClient.handleDisconnections(). I personally prefer to keep the existing 
 code unless there is a good reason (i.e. performance or functionality) for 
 change. Not sure what others think about this change.
 
 Jason Gustafson wrote:
 Seems unnecessary now that I think about it. In fact, it wouldn't hurt to 
 move the disconnected.add() into close() as long as we also add it to 
 failedSends, but that might make the code tougher to follow. Anyway, I think 
 the patch is fine as it is. I was just wondering if there was an easy way to 
 prevent this issue from popping up again in the future.
 
 Dong Lin wrote:
 Yeah I agree with you. I would add to disconnected in close() if I were to 
 write this class/function from scratch. But since this is a small patch to an 
 existing module, I will make the change that solves the problem with the least 
 dispute, to make peer review smooth.

Hi Dong - I'm not sure about your comment on multiple threads calling 
send/poll. NetworkClient and Selector are both documented as being 
`@nonthreadsafe` so that is probably a non-issue. The producer's sender, for
example, does send/poll in the same (sender) thread.
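
To illustrate the bookkeeping trade-off being discussed (a simplified sketch,
not the actual Selector code):

// Simplified sketch of the disconnect bookkeeping being discussed; this is not
// the actual Selector code.
import java.util.ArrayList;
import java.util.List;

final class DisconnectTrackingSketch {
    private final List<String> failedSends = new ArrayList<>();
    private final List<String> disconnected = new ArrayList<>();

    // Send failures are recorded in failedSends and folded into disconnected
    // during poll(), so callers observe them on the next poll.
    void onSendFailed(String destinationId) {
        failedSends.add(destinationId);
    }

    void pollBookkeeping() {
        disconnected.addAll(failedSends);
        failedSends.clear();
    }

    // Alternative raised in the review: have close() itself record the disconnect,
    // so no close path (e.g. on InvalidReceiveException) can forget to notify the
    // client. Using a Set instead of a List would avoid duplicate entries.
    void close(String connectionId) {
        disconnected.add(connectionId);
        // ... release channel resources ...
    }
}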


- Joel


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35791/#review89010
---


On June 23, 2015, 5:41 p.m., Dong Lin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/35791/
 ---
 
 (Updated June 23, 2015, 5:41 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2298
 https://issues.apache.org/jira/browse/KAFKA-2298
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2290; Client Selector can drop connections on InvalidReceiveException 
 without notifying NetworkClient
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/common/network/Selector.java 
 4aee214b24fd990be003adc36d675f015bf22fe6 
 
 Diff: https://reviews.apache.org/r/35791/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Dong Lin