Re: confirm subscribe to dev@kafka.apache.org

2014-11-24 Thread Ivan Dyachkov


On Mon, 24 Nov 2014, at 10:23, dev-h...@kafka.apache.org wrote:
> Hi! This is the ezmlm program. I'm managing the
> dev@kafka.apache.org mailing list.
> 
> I'm working for my owner, who can be reached
> at dev-ow...@kafka.apache.org.
> 
> To confirm that you would like
> 
>d...@dyachkov.org
> 
> added to the dev mailing list, please send
> a short reply to this address:
> 
>dev-sc.1416820990.ogbnnhpaalmbkhphgooh-dev=dyachkov@kafka.apache.org
> 
> Usually, this happens when you just hit the "reply" button.
> If this does not work, simply copy the address and paste it into
> the "To:" field of a new message.
> 
> or click here:
>   
> mailto:dev-sc.1416820990.ogbnnhpaalmbkhphgooh-dev=dyachkov@kafka.apache.org
> 
> This confirmation serves two purposes. First, it verifies that I am able
> to get mail through to you. Second, it protects you in case someone
> forges a subscription request in your name.
> 
> Please note that ALL Apache dev- and user- mailing lists are publicly
> archived.  Do familiarize yourself with Apache's public archive policy at
> 
> http://www.apache.org/foundation/public-archives.html
> 
> prior to subscribing and posting messages to dev@kafka.apache.org.
> If you're not sure whether or not the policy applies to this mailing list,
> assume it does unless the list name contains the word "private" in it.
> 
> Some mail programs are broken and cannot handle long addresses. If you
> cannot reply to this request, instead send a message to
>  and put the
> entire address listed above into the "Subject:" line.
> 
> 
> --- Administrative commands for the dev list ---
> 
> I can handle administrative requests automatically. Please
> do not send them to the list address! Instead, send
> your message to the correct command address:
> 
> To subscribe to the list, send a message to:
>
> 
> To remove your address from the list, send a message to:
>
> 
> Send mail to the following for info and FAQ for this list:
>
>
> 
> Similar addresses exist for the digest list:
>
>
> 
> To get messages 123 through 145 (a maximum of 100 per request), mail:
>
> 
> To get an index with subject and author for messages 123-456 , mail:
>
> 
> They are always returned as sets of 100, max 2000 per request,
> so you'll actually get 100-499.
> 
> To receive all messages with the same subject as message 12345,
> send a short message to:
>
> 
> The messages should contain one line or word of text to avoid being
> treated as sp@m, but I will ignore their content.
> Only the ADDRESS you send to is important.
> 
> You can start a subscription for an alternate address,
> for example "john@host.domain", just add a hyphen and your
> address (with '=' instead of '@') after the command word:
> 
> 
> To stop subscription for this address, mail:
> 
> 
> In both cases, I'll send a confirmation message to that address. When
> you receive it, simply reply to it to complete your subscription.
> 
> If despite following these instructions, you do not get the
> desired results, please contact my owner at
> dev-ow...@kafka.apache.org. Please be patient, my owner is a
> lot slower than I am ;-)
> 
> --- Enclosed is a copy of the request I received.
> 
> Return-Path: 
> Received: (qmail 92726 invoked by uid 99); 24 Nov 2014 09:23:10 -
> Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230)
> by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 24 Nov 2014 09:23:10 +
> X-ASF-Spam-Status: No, hits=-2.7 required=10.0
>   tests=ASF_LIST_OPS,RCVD_IN_DNSWL_LOW,SPF_PASS
> X-Spam-Check-By: apache.org
> Received-SPF: pass (nike.apache.org: local policy)
> Received: from [66.111.4.28] (HELO out4-smtp.messagingengine.com) 
> (66.111.4.28)
> by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 24 Nov 2014 09:22:45 +
> Received: from compute1.internal (compute1.nyi.internal [10.202.2.41])
>   by mailout.nyi.internal (Postfix) with ESMTP id 1566D20839
>   for ; Mon, 24 Nov 2014 04:22:44 -0500 
> (EST)
> Received: from web5 ([10.202.2.215])
>   by compute1.internal (MEProxy); Mon, 24 Nov 2014 04:22:44 -0500
> DKIM-Signature: v=1; a=rsa-sha1; c=relaxed/relaxed; d=dyachkov.org; h=
>   message-id:x-sasl-enc:from:to:mime-version
>   :content-transfer-encoding:content-type:subject:date; s=mesmtp;
>bh=TsVNbFUOZKeDeYnMQ3vFeAOWa1A=; b=GNrcpY2zYMsKWfMdU8xy3XXnbMpT
>   iEhy6ag1AkS5UzcaybhwL1RBcGk/bJFzmYPM6wDTUWzQ5kgqMHIZf7mRG1b3jPlw
>   qYjEKscQToDLL+oIh69UCPf190IPodIaXeST4XZdXq+guHcRfWHoWuQMKeThg9Ae
>   sO+mWdTXBJ7utyc=
> DKIM-Signature: v=1; a=rsa-sha1; c=relaxed/relaxed; d=
>   messagingengine.com; h=message-id:x-sasl-enc:from:to
>   :mime-version:content-transfer-encoding:content-type:subject
>   :date; s=smtpout; bh=TsVNbFUOZKeDeYnMQ3vFeAOWa1A=; b=kkwKwg4rRrg
>   ZrNb9Pu2SDdhkK0lU++hDKhlhDgb8oyJXqAyBXMtdLspNnfifRtxe0JxQxWwzMzz
>   fnHMvDADPxymHZSQvKZANIxj8hHj7QsFCAz/oQAFIUPiRuy6AoHdXl+negA+1nFn
>   2NNipAkjSoV6VHhr33AHe

Re: Review Request 25995: Patch for KAFKA-1650

2014-11-24 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/
---

(Updated Nov. 24, 2014, 4:15 p.m.)


Review request for kafka.


Bugs: KAFKA-1650
https://issues.apache.org/jira/browse/KAFKA-1650


Repository: kafka


Description (updated)
---

Addressed Guozhang's comments.


Addressed Guozhang's comments


commit before switch to trunk


commit before rebase


Rebased on trunk, Addressed Guozhang's comments.


Addressed Guozhang's comments on MaxInFlightRequests


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.


Added consumer rebalance listener to mirror maker, will test it later.


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign

Conflicts:
core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala

core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala

added custom config for consumer rebalance listener


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Add configurable consumer rebalance listener


Diffs (updated)
-

  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
3e1718bc7ca6c835a59ca7c6879f558ec06866ee 
  core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java 
PRE-CREATION 
  core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
9d5a47fb8e04d0055cce820afde7f73affc0a984 
  core/src/main/scala/kafka/tools/MirrorMaker.scala 
f399105087588946987bbc84e3759935d9498b6a 
  core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala 
6a85d7e494f6c88798133a17f6180b61029dff58 
  core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 
8c4687b2c96fddab7c8454a5a8011c3bab0897a0 

Diff: https://reviews.apache.org/r/25995/diff/


Testing
---


Thanks,

Jiangjie Qin



[jira] [Updated] (KAFKA-1650) Mirror Maker could lose data on unclean shutdown.

2014-11-24 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-1650:

Attachment: KAFKA-1650_2014-11-24_08:15:17.patch

> Mirror Maker could lose data on unclean shutdown.
> -
>
> Key: KAFKA-1650
> URL: https://issues.apache.org/jira/browse/KAFKA-1650
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Attachments: KAFKA-1650.patch, KAFKA-1650_2014-10-06_10:17:46.patch, 
> KAFKA-1650_2014-11-12_09:51:30.patch, KAFKA-1650_2014-11-17_18:44:37.patch, 
> KAFKA-1650_2014-11-20_12:00:16.patch, KAFKA-1650_2014-11-24_08:15:17.patch
>
>
> Currently, if the mirror maker is shut down uncleanly, the data in the data 
> channel and buffer could potentially be lost. With the new producer's 
> callback, this issue could be solved.
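
As an illustration of the callback approach the description alludes to -- a 
minimal sketch with hypothetical names, not the actual MirrorMaker patch -- the 
idea is to advance the committable offset for a source partition only after the 
new producer acknowledges the send:

{code}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Sketch: commit a source offset only once the producer has acked the send,
// so an unclean shutdown can at worst re-mirror messages, never lose them.
public class AckTrackingMirror {
    private final KafkaProducer<byte[], byte[]> producer;
    // highest acked source offset per source partition (hypothetical bookkeeping)
    private final Map<Integer, Long> committable = new ConcurrentHashMap<Integer, Long>();

    public AckTrackingMirror(KafkaProducer<byte[], byte[]> producer) {
        this.producer = producer;
    }

    public void mirror(String topic, byte[] key, byte[] value,
                       final int sourcePartition, final long sourceOffset) {
        producer.send(new ProducerRecord<byte[], byte[]>(topic, key, value), new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null)
                    committable.put(sourcePartition, sourceOffset); // safe to commit up to here
                // on failure the offset stays uncommitted, so the message is re-mirrored
            }
        });
    }
}
{code}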



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1650) Mirror Maker could lose data on unclean shutdown.

2014-11-24 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14223099#comment-14223099
 ] 

Jiangjie Qin commented on KAFKA-1650:
-

Updated reviewboard https://reviews.apache.org/r/25995/diff/
 against branch origin/trunk

> Mirror Maker could lose data on unclean shutdown.
> -
>
> Key: KAFKA-1650
> URL: https://issues.apache.org/jira/browse/KAFKA-1650
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Attachments: KAFKA-1650.patch, KAFKA-1650_2014-10-06_10:17:46.patch, 
> KAFKA-1650_2014-11-12_09:51:30.patch, KAFKA-1650_2014-11-17_18:44:37.patch, 
> KAFKA-1650_2014-11-20_12:00:16.patch, KAFKA-1650_2014-11-24_08:15:17.patch
>
>
> Currently, if the mirror maker is shut down uncleanly, the data in the data 
> channel and buffer could potentially be lost. With the new producer's 
> callback, this issue could be solved.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-11-24 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14223161#comment-14223161
 ] 

Bhavesh Mistry edited comment on KAFKA-1642 at 11/24/14 5:26 PM:
-

[~ewencp],

The way to reproduce this is to simulate network instability by turning the 
network service on and off (or unplugging/replugging the physical cable). Let 
it connect and see if it recovers, then disconnect and reconnect, etc.; you 
will see the behavior again and again.

The issue is also with connection state management:

{code}
private void initiateConnect(Node node, long now) {
    try {
        log.debug("Initiating connection to node {} at {}:{}.", node.id(),
                node.host(), node.port());
        // TODO FIX java.lang.IllegalStateException: No entry found for node -3
        // (we need to put before we remove it).
        // The next line is the problem: it loses the previous last-attempt
        // state, hits the exception above, and then tries to connect to that
        // node forever and ever with the exception.
        this.connectionStates.connecting(node.id(), now);
        selector.connect(node.id(), new InetSocketAddress(node.host(),
                node.port()), this.socketSendBuffer, this.socketReceiveBuffer);
    } catch (IOException e) {
        /* attempt failed, we'll try again after the backoff */
        connectionStates.disconnectedWhenConnectting(node.id());
        /* maybe the problem is our metadata, update it */
        metadata.requestUpdate();
        log.debug("Error connecting to node {} at {}:{}:", node.id(),
                node.host(), node.port(), e);
    }
}
{code}

In my opinion, regardless of the node's status, the run() method needs to be 
safeguarded against stealing CPU cycles when there is no state for the node. 
(Hence I have added an exponential sleep as a temporary solution to avoid 
stealing CPU cycles; I think we must protect it somehow and check the 
execution time...)

Please let me know if you need more info and I will be more than happy to 
reproduce the bug; we can have a conference call and I can show you the problem.

Thanks,
Bhavesh 




was (Author: bmis13):
[~ewencp],

The way to reproduce this is to simulate network instability by turning the 
network service on and off (or unplugging/replugging the physical cable). Let 
it connect and see if it recovers, then disconnect and reconnect, etc.; you 
will see the behavior again and again.

The issue is also with connection state management:

{code}
private void initiateConnect(Node node, long now) {
    try {
        log.debug("Initiating connection to node {} at {}:{}.", node.id(),
                node.host(), node.port());
        // TODO FIX java.lang.IllegalStateException: No entry found for node -3
        // (we need to put before we remove it).
        // The next line is the problem: it loses the previous last-attempt
        // state, hits the exception above, and then tries to connect to that
        // node forever and ever with the exception.
        this.connectionStates.connecting(node.id(), now);
        selector.connect(node.id(), new InetSocketAddress(node.host(),
                node.port()), this.socketSendBuffer, this.socketReceiveBuffer);
    } catch (IOException e) {
        /* attempt failed, we'll try again after the backoff */
        connectionStates.disconnectedWhenConnectting(node.id());
        /* maybe the problem is our metadata, update it */
        metadata.requestUpdate();
        log.debug("Error connecting to node {} at {}:{}:", node.id(),
                node.host(), node.port(), e);
    }
}
{code}

In my opinion, regardless of the node's status, the run() method needs to be 
safeguarded against stealing CPU cycles when there is no state for the node. 
(Hence I have added an exponential sleep as a temporary solution to avoid 
stealing CPU cycles; I think we must protect it somehow and check the 
execution time...)

Please let me know if you need more info and I will be more than happy to 
reproduce the bug; we can have a conference call and I can show you the problem.

Thanks,
Bhavesh 



> [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network 
> connection is lost
> ---
>
> Key: KAFKA-1642
> URL: https://issues.apache.org/jira/browse/KAFKA-1642
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.8.2
>Reporter: Bhavesh Mistry
>Assignee: Ewen Cheslack-Postava
> Fix For: 0.8.2
>
> Attachments: 
> 0001-Initial-CPU-Hish-Usage-by-Kafka-FIX-and-Also-fix-CLO.patch, 
> KAFKA-1642.patch, KAFKA-1642_2014-10-20_17:33:57.patch, 
> KAFKA-1642_2014-10-23_16:19:41.patch
>
>
> I see my CPU spike to 100% when the network connection is lost for a while. 
> It seems the network I/O threads are very busy logging the following error 
> message. Is this expected behavior?
> 2014-09-17 14:06:16.830 [kafka-produc

[jira] [Commented] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-11-24 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14223161#comment-14223161
 ] 

Bhavesh Mistry commented on KAFKA-1642:
---

[~ewencp],

The way to reproduce this is to simulate network instability by turning the 
network service on and off (or unplugging/replugging the physical cable). Let 
it connect and see if it recovers, then disconnect and reconnect, etc.; you 
will see the behavior again and again.

The issue is also with connection state management:

{code}
private void initiateConnect(Node node, long now) {
    try {
        log.debug("Initiating connection to node {} at {}:{}.", node.id(),
                node.host(), node.port());
        // TODO FIX java.lang.IllegalStateException: No entry found for node -3
        // (we need to put before we remove it).
        // The next line is the problem: it loses the previous last-attempt
        // state, hits the exception above, and then tries to connect to that
        // node forever and ever with the exception.
        this.connectionStates.connecting(node.id(), now);
        selector.connect(node.id(), new InetSocketAddress(node.host(),
                node.port()), this.socketSendBuffer, this.socketReceiveBuffer);
    } catch (IOException e) {
        /* attempt failed, we'll try again after the backoff */
        connectionStates.disconnectedWhenConnectting(node.id());
        /* maybe the problem is our metadata, update it */
        metadata.requestUpdate();
        log.debug("Error connecting to node {} at {}:{}:", node.id(),
                node.host(), node.port(), e);
    }
}
{code}

In my opinion, regardless of the node's status, the run() method needs to be 
safeguarded against stealing CPU cycles when there is no state for the node. 
(Hence I have added an exponential sleep as a temporary solution to avoid 
stealing CPU cycles; I think we must protect it somehow and check the 
execution time...)

Please let me know if you need more info and I will be more than happy to 
reproduce the bug; we can have a conference call and I can show you the problem.

Thanks,
Bhavesh 
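
To illustrate the exponential-sleep guard described above -- a minimal sketch 
under assumed names, not the attached patch -- the I/O loop would back off 
when an iteration returns immediately without doing any work:

{code}
/**
 * Sketch of the exponential back-off guard: if one pass of the network I/O
 * loop finishes almost instantly (i.e. there was nothing to do), sleep for an
 * exponentially growing interval instead of spinning at 100% CPU.
 */
public static void runGuarded(Runnable ioIteration) throws InterruptedException {
    long backoffMs = 1;              // assumed initial back-off
    final long maxBackoffMs = 1000;  // assumed cap
    while (!Thread.currentThread().isInterrupted()) {
        long start = System.currentTimeMillis();
        ioIteration.run();           // one pass of the I/O loop
        long elapsed = System.currentTimeMillis() - start;
        if (elapsed < 1) {           // returned immediately: nothing to do
            Thread.sleep(backoffMs);
            backoffMs = Math.min(backoffMs * 2, maxBackoffMs);
        } else {
            backoffMs = 1;           // reset once real work happened
        }
    }
}
{code}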



> [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network 
> connection is lost
> ---
>
> Key: KAFKA-1642
> URL: https://issues.apache.org/jira/browse/KAFKA-1642
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.8.2
>Reporter: Bhavesh Mistry
>Assignee: Ewen Cheslack-Postava
> Fix For: 0.8.2
>
> Attachments: 
> 0001-Initial-CPU-Hish-Usage-by-Kafka-FIX-and-Also-fix-CLO.patch, 
> KAFKA-1642.patch, KAFKA-1642_2014-10-20_17:33:57.patch, 
> KAFKA-1642_2014-10-23_16:19:41.patch
>
>
> I see my CPU spike to 100% when the network connection is lost for a while. 
> It seems the network I/O threads are very busy logging the following error 
> message. Is this expected behavior?
> 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR 
> org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka 
> producer I/O thread: 
> java.lang.IllegalStateException: No entry found for node -2
> at 
> org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110)
> at 
> org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99)
> at 
> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394)
> at 
> org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
> at java.lang.Thread.run(Thread.java:744)
> Thanks,
> Bhavesh



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-11-24 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14223161#comment-14223161
 ] 

Bhavesh Mistry edited comment on KAFKA-1642 at 11/24/14 5:27 PM:
-

[~ewencp],

The way to reproduce this is to simulate network instability by turning the 
network service on and off (or unplugging/replugging the physical cable). Let 
it connect and see if it recovers, then disconnect and reconnect, etc.; you 
will see the behavior again and again.

The issue is also with connection state management:

{code}
private void initiateConnect(Node node, long now) {
    try {
        log.debug("Initiating connection to node {} at {}:{}.", node.id(),
                node.host(), node.port());
        // TODO FIX java.lang.IllegalStateException: No entry found for node -3
        // (we need to put before we remove it).
        // The next line is the problem: it loses the previous last-attempt
        // state, hits the exception above, and then tries to connect to that
        // node forever and ever with the exception.
        this.connectionStates.connecting(node.id(), now);
        selector.connect(node.id(), new InetSocketAddress(node.host(),
                node.port()), this.socketSendBuffer, this.socketReceiveBuffer);
    } catch (IOException e) {
        /* attempt failed, we'll try again after the backoff */
        connectionStates.disconnectedWhenConnectting(node.id());
        /* maybe the problem is our metadata, update it */
        metadata.requestUpdate();
        log.debug("Error connecting to node {} at {}:{}:", node.id(),
                node.host(), node.port(), e);
    }
}
{code}

In my opinion, regardless of the node's status, the run() method needs to be 
safeguarded against stealing CPU cycles when there is no state for the node. 
(Hence I have added an exponential sleep as a temporary solution to avoid 
stealing CPU cycles; I think we must protect it somehow and check the 
execution time...)

Please let me know if you need more info and I will be more than happy to 
reproduce the bug; we can have a conference call and I can show you the problem.

Based on a code diff I have done between the 0.8.1.1 tag and this, I think the 
issue also occurs in 0.8.1.1.

Thanks,
Bhavesh 




was (Author: bmis13):
[~ewencp],

The way to reproduce this is to simulate network instability by turning the 
network service on and off (or unplugging/replugging the physical cable). Let 
it connect and see if it recovers, then disconnect and reconnect, etc.; you 
will see the behavior again and again.

The issue is also with connection state management:

{code}
private void initiateConnect(Node node, long now) {
    try {
        log.debug("Initiating connection to node {} at {}:{}.", node.id(),
                node.host(), node.port());
        // TODO FIX java.lang.IllegalStateException: No entry found for node -3
        // (we need to put before we remove it).
        // The next line is the problem: it loses the previous last-attempt
        // state, hits the exception above, and then tries to connect to that
        // node forever and ever with the exception.
        this.connectionStates.connecting(node.id(), now);
        selector.connect(node.id(), new InetSocketAddress(node.host(),
                node.port()), this.socketSendBuffer, this.socketReceiveBuffer);
    } catch (IOException e) {
        /* attempt failed, we'll try again after the backoff */
        connectionStates.disconnectedWhenConnectting(node.id());
        /* maybe the problem is our metadata, update it */
        metadata.requestUpdate();
        log.debug("Error connecting to node {} at {}:{}:", node.id(),
                node.host(), node.port(), e);
    }
}
{code}

In my opinion, regardless of the node's status, the run() method needs to be 
safeguarded against stealing CPU cycles when there is no state for the node. 
(Hence I have added an exponential sleep as a temporary solution to avoid 
stealing CPU cycles; I think we must protect it somehow and check the 
execution time...)

Please let me know if you need more info and I will be more than happy to 
reproduce the bug; we can have a conference call and I can show you the problem.

Thanks,
Bhavesh 



> [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network 
> connection is lost
> ---
>
> Key: KAFKA-1642
> URL: https://issues.apache.org/jira/browse/KAFKA-1642
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.8.2
>Reporter: Bhavesh Mistry
>Assignee: Ewen Cheslack-Postava
> Fix For: 0.8.2
>
> Attachments: 
> 0001-Initial-CPU-Hish-Usage-by-Kafka-FIX-and-Also-fix-CLO.patch, 
> KAFKA-1642.patch, KAFKA-1642_2014-10-20_17:33:57.patch, 
> KAFKA-1642_2014-10-23_16:19:41.patch
>
>
> I see my CPU spike to 100% when network connection is lost for while.  It 
> seems network  IO thread are ve

Re: [DISCUSSION] Message Metadata

2014-11-24 Thread Guozhang Wang
My bad, the link should be this:

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata

On Fri, Nov 21, 2014 at 5:29 PM, Timothy Chen  wrote:

> Hi Guozhang,
>
> I don't think that is publicly accessible, can you update it to the
> Kafka wiki?
>
> Tim
>
> On Fri, Nov 21, 2014 at 5:24 PM, Guozhang Wang  wrote:
> > Hi all,
> >
> > I have updated the wiki page (
> >
> https://iwww.corp.linkedin.com/wiki/cf/display/ENGS/Kafka+Enriched+Message+Metadata
> )
> > according to people's comments and discussions offline.
> >
> > Guozhang
> >
> > On Thu, Nov 13, 2014 at 9:43 AM, Guozhang Wang 
> wrote:
> >
> >> Hi Jun,
> >>
> >> Sorry for the delay on your comments in the wiki page as well as this
> >> thread; quite swamped now. I will get back to you as soon as I find some
> >> time.
> >>
> >> Guozhang
> >>
> >> On Tue, Nov 11, 2014 at 6:26 PM, Jun Rao  wrote:
> >>
> >>> Thinking about this a bit more. For adding the auditing support, I am
> not
> >>> sure if we need to change the message format by adding the application
> >>> tags. An alternative way to do that is to add it in the producer
> client.
> >>> For example, for each message payload (doesn't matter what the
> >>> serialization mechanism is) that a producer receives, the producer can
> >>> just
> >>> add a header before the original payload. The header will contain all
> >>> needed fields (e.g. timestamp, host, etc) for the purpose of auditing.
> >>> This
> >>> way, we don't need to change the message format and the auditing info
> can
> >>> be added independent of the serialization mechanism of the message. The
> >>> header can use a different serialization mechanism for better
> efficiency.
> >>> For example, if we use Avro to serialize the header, the encoded bytes
> >>> won't include the field names in the header. This is potentially more
> >>> efficient than representing those fields as application tags in the
> >>> message
> >>> where the tags have to be explicitly stored in every message.
> >>>
> >>> To make it easier for the client to add and make use of this kind of
> >>> auditing support, I was imagining that we can add a ProducerFactory in
> the
> >>> new java client. The ProducerFactory will create an instance of
> Producer
> >>> based on a config property. By default, the current KafkaProducer will
> be
> >>> returned. However, a user can plug in a different implementation of
> >>> Producer that does auditing. For example, an implementation of an
> >>> AuditProducer.send() can take the original ProducerRecord, add the
> header
> >>> to the value byte array and then forward the record to an underlying
> >>> KafkaProducer. We can add a similar ConsumerFactory to the new consumer
> >>> client. If a user plugs in an implementation of the AuditingConsumer,
> the
> >>> consumer will then be audited automatically.
> >>>
> >>> Thanks,
> >>>
> >>> Jun
> >>>
> >>> On Tue, Oct 21, 2014 at 4:06 PM, Guozhang Wang 
> >>> wrote:
> >>>
> >>> > Hi Jun,
> >>> >
> >>> > Regarding 4) in your comment, after thinking about it for a while I
> >>> > cannot come up with a way to do it along with log compaction without
> >>> > adding new fields into the current format of the message set. Do you
> >>> > have a better way that does not require protocol changes?
> >>> >
> >>> > Guozhang
> >>> >
> >>> > On Mon, Oct 20, 2014 at 9:53 AM, Guozhang Wang 
> >>> wrote:
> >>> >
> >>> > > I have updated the wiki page incorporating received comments. We
> can
> >>> > > discuss some more details on:
> >>> > >
> >>> > > 1. How do we want to do auditing? Whether we want to have built-in
> >>> > > auditing on brokers or even MMs, or use an audit consumer to fetch
> >>> > > all messages from just the brokers.
> >>> > >
> >>> > > 2. How we can avoid de-/re-compression on brokers and MMs with log
> >>> > > compaction turned on.
> >>> > >
> >>> > > 3. How we can resolve the data inconsistency resulting from unclean
> >>> > > leader election with control messages.
> >>> > >
> >>> > > Guozhang
> >>> > >
> >>> > > On Sun, Oct 19, 2014 at 11:41 PM, Guozhang Wang <
> wangg...@gmail.com>
> >>> > > wrote:
> >>> > >
> >>> > >> Thanks for the detailed comments Jun! Some replies inlined.
> >>> > >>
> >>> > >> On Sun, Oct 19, 2014 at 7:42 PM, Jun Rao 
> wrote:
> >>> > >>
> >>> > >>> Hi, Guozhang,
> >>> > >>>
> >>> > >>> Thanks for the writeup.
> >>> > >>>
> >>> > >>> A few high level comments.
> >>> > >>>
> >>> > >>> 1. Associating (versioned) schemas to a topic can be a good thing
> >>> > >>> overall.
> >>> > >>> Yes, this could add a bit more management overhead in Kafka.
> >>> However,
> >>> > it
> >>> > >>> makes sure that the data format contract between a producer and a
> >>> > >>> consumer
> >>> > >>> is kept and managed in a central place, instead of in the
> >>> application.
> >>> > >>> The
> >>> > >>> latter is probably easier to start with, but is likely to be
> >>> brittle in
> >>> > >>> the
> >>> > >>> long run.
> >>> > >>>
> >>> > >>
> >>> > >> I am actually
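
A minimal sketch of the header-prepending AuditProducer idea described earlier 
in this thread (class shape and field layout are illustrative only, and the 
ProducerFactory itself is only a proposal): prepend a small audit header to 
each value before delegating to the real producer.

{code}
import java.nio.ByteBuffer;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Sketch only: an auditing wrapper that prepends a fixed header (timestamp +
// host id) to every value before delegating to the underlying producer. The
// header layout here is illustrative, not a proposed wire format.
public class AuditProducer {
    private final KafkaProducer<byte[], byte[]> underlying;
    private final int hostId;

    public AuditProducer(KafkaProducer<byte[], byte[]> underlying, int hostId) {
        this.underlying = underlying;
        this.hostId = hostId;
    }

    public Future<RecordMetadata> send(String topic, byte[] key, byte[] value) {
        ByteBuffer buf = ByteBuffer.allocate(8 + 4 + value.length);
        buf.putLong(System.currentTimeMillis()); // audit timestamp
        buf.putInt(hostId);                      // producing host
        buf.put(value);                          // original payload, unchanged
        return underlying.send(new ProducerRecord<byte[], byte[]>(topic, key, buf.array()));
    }
}
{code}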

Re: Review Request 27391: Fix KAFKA-1634

2014-11-24 Thread Joel Koshy


> On Nov. 21, 2014, 2:41 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/common/OffsetMetadataAndError.scala, line 17
> > 
> >
> > I thought we would be going with separate format for on-disk storage?
> > 
> > E.g., one thing that is extremely useful (until we have timestamp as a 
> > first-class field of messages) is to have the receive time of the 
> > offsetcommit in the stored offset entries. This is very useful for 
> > debugging.
> 
> Guozhang Wang wrote:
> Yes they are separated: for on-disk storage the timestamp will always 
> be stored, and for the wire protocol only v0/1 will contain that value, but 
> for v2 this value will be computed via retention. So the on-disk format is 
> specified as OffsetAndMetadata, and when we deprecate v0/1 and add the 
> timestamp to the message header we will replace this with OffsetMetadata.

What I meant is that for the on-disk format it is useful to have the receive 
time (apart from the expiration time). Right now it seems only one timestamp 
(the expiration timestamp) is stored.


- Joel
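
A sketch of the on-disk entry shape being asked for here, carrying both 
timestamps (field names are hypothetical, not the actual OffsetAndMetadata 
definition):

{code}
// Hypothetical on-disk offset entry carrying both timestamps.
public class StoredOffsetEntry {
    public final long offset;
    public final String metadata;
    public final long receiveTimestamp; // when the broker received the OffsetCommit (debugging aid)
    public final long expireTimestamp;  // receive time + retention, used for cleanup

    public StoredOffsetEntry(long offset, String metadata,
                             long receiveTimestamp, long expireTimestamp) {
        this.offset = offset;
        this.metadata = metadata;
        this.receiveTimestamp = receiveTimestamp;
        this.expireTimestamp = expireTimestamp;
    }
}
{code}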


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review62553
---


On Nov. 21, 2014, 10 p.m., Guozhang Wang wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/27391/
> ---
> 
> (Updated Nov. 21, 2014, 10 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1634
> https://issues.apache.org/jira/browse/KAFKA-1634
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> The timestamp field of OffsetAndMetadata is preserved since we need to be 
> backward compatible with older versions
> 
> 
> Diffs
> -
> 
>   clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
> 7517b879866fc5dad5f8d8ad30636da8bbe7784a 
>   
> clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
>  3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
>   
> clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
>  df37fc6d8f0db0b8192a948426af603be3444da4 
>   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
> 050615c72efe7dbaa4634f53943bd73273d20ffb 
>   core/src/main/scala/kafka/api/OffsetFetchRequest.scala 
> c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
>   core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 
> 4cabffeacea09a49913505db19a96a55d58c0909 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
> 3e1718bc7ca6c835a59ca7c6879f558ec06866ee 
>   core/src/main/scala/kafka/server/KafkaApis.scala 
> 968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 
>   core/src/main/scala/kafka/server/KafkaServer.scala 
> 1bf7d10cef23a77e71eb16bf6d0e68bc4ebe 
>   core/src/main/scala/kafka/server/OffsetManager.scala 
> 2957bc435102bc4004d8f100dbcdd56287c8ffae 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
> cd16ced5465d098be7a60498326b2a98c248f343 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
> 8c5364fa97da1be09973c176d1baeb339455d319 
> 
> Diff: https://reviews.apache.org/r/27391/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>



[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-11-24 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14223161#comment-14223161
 ] 

Bhavesh Mistry edited comment on KAFKA-1642 at 11/24/14 6:57 PM:
-

[~ewencp],

The way to reproduce this is to simulate network instability by turning the 
network service on and off (or unplugging/replugging the physical cable). Let 
it connect and see if it recovers, then disconnect and reconnect, etc.; you 
will see the behavior again and again.

The issue is also with connection state management:

{code}
private void initiateConnect(Node node, long now) {
    try {
        log.debug("Initiating connection to node {} at {}:{}.", node.id(),
                node.host(), node.port());
        // TODO FIX java.lang.IllegalStateException: No entry found for node -3
        // (we need to put before we remove it).
        // The next line is the problem: it loses the previous last-attempt
        // state, hits the exception above, and then tries to connect to that
        // node forever and ever with the exception.
        this.connectionStates.connecting(node.id(), now);
        selector.connect(node.id(), new InetSocketAddress(node.host(),
                node.port()), this.socketSendBuffer, this.socketReceiveBuffer);
    } catch (IOException e) {
        /* attempt failed, we'll try again after the backoff */
        connectionStates.disconnectedWhenConnectting(node.id());
        /* maybe the problem is our metadata, update it */
        metadata.requestUpdate();
        log.debug("Error connecting to node {} at {}:{}:", node.id(),
                node.host(), node.port(), e);
    }
}
{code}

In my opinion, regardless of the node's status, the run() method needs to be 
safeguarded against stealing CPU cycles when there is no state for the node. 
(Hence I have added an exponential sleep as a temporary solution to avoid 
stealing CPU cycles; I think we must protect it somehow and must check the 
execution time...)

Please let me know if you need more info and I will be more than happy to 
reproduce the bug; we can have a conference call and I can show you the problem.

Based on a code diff I have done between the 0.8.1.1 tag and this, I think the 
issue also occurs in 0.8.1.1.

Thanks,
Bhavesh 




was (Author: bmis13):
[~ewencp],

The way to reproduce this is to simulate network instability by turning the 
network service on and off (or unplugging/replugging the physical cable). Let 
it connect and see if it recovers, then disconnect and reconnect, etc.; you 
will see the behavior again and again.

The issue is also with connection state management:

{code}
private void initiateConnect(Node node, long now) {
    try {
        log.debug("Initiating connection to node {} at {}:{}.", node.id(),
                node.host(), node.port());
        // TODO FIX java.lang.IllegalStateException: No entry found for node -3
        // (we need to put before we remove it).
        // The next line is the problem: it loses the previous last-attempt
        // state, hits the exception above, and then tries to connect to that
        // node forever and ever with the exception.
        this.connectionStates.connecting(node.id(), now);
        selector.connect(node.id(), new InetSocketAddress(node.host(),
                node.port()), this.socketSendBuffer, this.socketReceiveBuffer);
    } catch (IOException e) {
        /* attempt failed, we'll try again after the backoff */
        connectionStates.disconnectedWhenConnectting(node.id());
        /* maybe the problem is our metadata, update it */
        metadata.requestUpdate();
        log.debug("Error connecting to node {} at {}:{}:", node.id(),
                node.host(), node.port(), e);
    }
}
{code}

In my opinion, regardless of the node's status, the run() method needs to be 
safeguarded against stealing CPU cycles when there is no state for the node. 
(Hence I have added an exponential sleep as a temporary solution to avoid 
stealing CPU cycles; I think we must protect it somehow and check the 
execution time...)

Please let me know if you need more info and I will be more than happy to 
reproduce the bug; we can have a conference call and I can show you the problem.

Based on a code diff I have done between the 0.8.1.1 tag and this, I think the 
issue also occurs in 0.8.1.1.

Thanks,
Bhavesh 



> [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network 
> connection is lost
> ---
>
> Key: KAFKA-1642
> URL: https://issues.apache.org/jira/browse/KAFKA-1642
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.8.2
>Reporter: Bhavesh Mistry
>Assignee: Ewen Cheslack-Postava
> Fix For: 0.8.2
>
> Attachments: 
> 0001-Initial-CPU-Hish-Usage-by-Kafka-FIX-and-Also-fix-CLO.patch, 
> KAFKA-1642.patch, KAFKA-1642_2014-10-20_17:33:57.patch, 
> KAFKA-1642_2014-10-23_16:19:41.

Re: Review Request 28268: Bump up default scala version from 2.10.1 to 2.10.4

2014-11-24 Thread Guozhang Wang


> On Nov. 22, 2014, 2:24 a.m., Joel Koshy wrote:
> > Looks good, although the patch does not apply - can you rebase; check that 
> > it builds properly and then we can close this out.

This is a little weird... both you and Gwen said it does not apply, but when I 
apply the patch (patch -p1 --dry-run < patch.file) it works fine. Will verify 
that one last time and commit.


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/28268/#review62721
---


On Nov. 20, 2014, 12:23 a.m., Guozhang Wang wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/28268/
> ---
> 
> (Updated Nov. 20, 2014, 12:23 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1624
> https://issues.apache.org/jira/browse/KAFKA-1624
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Compiles with Java 1.6/7/8
> 
> 
> Diffs
> -
> 
>   bin/kafka-run-class.sh 36c742b67a7259fa35c3ed862f7ccc4673b69d9f 
>   bin/windows/kafka-run-class.bat 8e9780e2eb74a35a726787155c09b151d0ba 
>   build.gradle 11eb11355efddacf62d61690ad13b9c82a200230 
>   gradle.properties 5d3155fd4461438d8b2ec4faa9534cc2383d4951 
>   scala.gradle 6adf9af7dbbe71e68a07b387c3854d8c9ad339e0 
> 
> Diff: https://reviews.apache.org/r/28268/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>
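
For context, the version bump under review presumably comes down to a one-line 
change in gradle.properties (the property name is assumed from the diff listing 
above, not confirmed):

{code}
# gradle.properties (assumed): build against a Scala version that supports JDK 8
scalaVersion=2.10.4
{code}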



[jira] [Updated] (KAFKA-1624) building on JDK 8 fails

2014-11-24 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-1624:
-
Attachment: KAFKA-1624_2014-11-24_11:01:56.patch

> building on JDK 8 fails
> ---
>
> Key: KAFKA-1624
> URL: https://issues.apache.org/jira/browse/KAFKA-1624
> Project: Kafka
>  Issue Type: Bug
>Reporter: Joe Stein
>Assignee: Guozhang Wang
>  Labels: newbie
> Fix For: 0.8.2
>
> Attachments: KAFKA-1624.patch, KAFKA-1624_2014-11-24_11:01:56.patch
>
>
> {code}
> Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=512m; 
> support was removed in 8.0
> error: error while loading CharSequence, class file 
> '/usr/lib/jvm/java-8-oracle/jre/lib/rt.jar(java/lang/CharSequence.class)' is 
> broken
> (class java.lang.RuntimeException/bad constant pool tag 18 at byte 10)
> error: error while loading Comparator, class file 
> '/usr/lib/jvm/java-8-oracle/jre/lib/rt.jar(java/util/Comparator.class)' is 
> broken
> (class java.lang.RuntimeException/bad constant pool tag 18 at byte 20)
> error: error while loading AnnotatedElement, class file 
> '/usr/lib/jvm/java-8-oracle/jre/lib/rt.jar(java/lang/reflect/AnnotatedElement.class)'
>  is broken
> (class java.lang.RuntimeException/bad constant pool tag 18 at byte 76)
> error: error while loading Arrays, class file 
> '/usr/lib/jvm/java-8-oracle/jre/lib/rt.jar(java/util/Arrays.class)' is broken
> (class java.lang.RuntimeException/bad constant pool tag 18 at byte 765)
> /tmp/sbt_53783b12/xsbt/ExtractAPI.scala:395: error: java.util.Comparator does 
> not take type parameters
>   private[this] val sortClasses = new Comparator[Symbol] {
> ^
> 5 errors found
> :core:compileScala FAILED
> FAILURE: Build failed with an exception.
> * What went wrong:
> Execution failed for task ':core:compileScala'.
> > org.gradle.messaging.remote.internal.PlaceholderException (no error message)
> * Try:
> Run with --stacktrace option to get the stack trace. Run with --info or 
> --debug option to get more log output.
> BUILD FAILED
> Total time: 1 mins 48.298 secs
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 28268: Fix KAFKA-1624

2014-11-24 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/28268/
---

(Updated Nov. 24, 2014, 7:02 p.m.)


Review request for kafka.


Summary (updated)
-

Fix KAFKA-1624


Bugs: KAFKA-1624
https://issues.apache.org/jira/browse/KAFKA-1624


Repository: kafka


Description (updated)
---

KAFKA-1624.v1


Diffs (updated)
-

  bin/kafka-run-class.sh 36c742b67a7259fa35c3ed862f7ccc4673b69d9f 
  bin/windows/kafka-run-class.bat 8e9780e2eb74a35a726787155c09b151d0ba 
  build.gradle 030af63553afbde5756ed5fd7be6d41a6e400f43 
  gradle.properties 5d3155fd4461438d8b2ec4faa9534cc2383d4951 
  scala.gradle 6adf9af7dbbe71e68a07b387c3854d8c9ad339e0 

Diff: https://reviews.apache.org/r/28268/diff/


Testing
---


Thanks,

Guozhang Wang



[jira] [Commented] (KAFKA-1624) building on JDK 8 fails

2014-11-24 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14223304#comment-14223304
 ] 

Guozhang Wang commented on KAFKA-1624:
--

Updated reviewboard https://reviews.apache.org/r/28268/diff/
 against branch origin/trunk

> building on JDK 8 fails
> ---
>
> Key: KAFKA-1624
> URL: https://issues.apache.org/jira/browse/KAFKA-1624
> Project: Kafka
>  Issue Type: Bug
>Reporter: Joe Stein
>Assignee: Guozhang Wang
>  Labels: newbie
> Fix For: 0.8.2
>
> Attachments: KAFKA-1624.patch, KAFKA-1624_2014-11-24_11:01:56.patch
>
>
> {code}
> Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=512m; 
> support was removed in 8.0
> error: error while loading CharSequence, class file 
> '/usr/lib/jvm/java-8-oracle/jre/lib/rt.jar(java/lang/CharSequence.class)' is 
> broken
> (class java.lang.RuntimeException/bad constant pool tag 18 at byte 10)
> error: error while loading Comparator, class file 
> '/usr/lib/jvm/java-8-oracle/jre/lib/rt.jar(java/util/Comparator.class)' is 
> broken
> (class java.lang.RuntimeException/bad constant pool tag 18 at byte 20)
> error: error while loading AnnotatedElement, class file 
> '/usr/lib/jvm/java-8-oracle/jre/lib/rt.jar(java/lang/reflect/AnnotatedElement.class)'
>  is broken
> (class java.lang.RuntimeException/bad constant pool tag 18 at byte 76)
> error: error while loading Arrays, class file 
> '/usr/lib/jvm/java-8-oracle/jre/lib/rt.jar(java/util/Arrays.class)' is broken
> (class java.lang.RuntimeException/bad constant pool tag 18 at byte 765)
> /tmp/sbt_53783b12/xsbt/ExtractAPI.scala:395: error: java.util.Comparator does 
> not take type parameters
>   private[this] val sortClasses = new Comparator[Symbol] {
> ^
> 5 errors found
> :core:compileScala FAILED
> FAILURE: Build failed with an exception.
> * What went wrong:
> Execution failed for task ':core:compileScala'.
> > org.gradle.messaging.remote.internal.PlaceholderException (no error message)
> * Try:
> Run with --stacktrace option to get the stack trace. Run with --info or 
> --debug option to get more log output.
> BUILD FAILED
> Total time: 1 mins 48.298 secs
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 28268: Fix KAFKA-1624

2014-11-24 Thread Guozhang Wang


> On Nov. 22, 2014, 2:24 a.m., Joel Koshy wrote:
> > Looks good, although the patch does not apply - can you rebase; check that 
> > it builds properly and then we can close this out.
> 
> Guozhang Wang wrote:
> This is a little weird... both you and Gwen said it does not apply, but 
> when I apply the patch (patch -p1 --dry-run < patch.file) it works fine. Will 
> verify that one last time and commit.

OK, I found out the reason: the rb diff command does not handle the 
Windows-style "^M" line endings in kafka-run-class.bat well, and after changing 
to the Linux diff it works fine.


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/28268/#review62721
---


On Nov. 24, 2014, 7:02 p.m., Guozhang Wang wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/28268/
> ---
> 
> (Updated Nov. 24, 2014, 7:02 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1624
> https://issues.apache.org/jira/browse/KAFKA-1624
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1624.v1
> 
> 
> Diffs
> -
> 
>   bin/kafka-run-class.sh 36c742b67a7259fa35c3ed862f7ccc4673b69d9f 
>   bin/windows/kafka-run-class.bat 8e9780e2eb74a35a726787155c09b151d0ba 
>   build.gradle 030af63553afbde5756ed5fd7be6d41a6e400f43 
>   gradle.properties 5d3155fd4461438d8b2ec4faa9534cc2383d4951 
>   scala.gradle 6adf9af7dbbe71e68a07b387c3854d8c9ad339e0 
> 
> Diff: https://reviews.apache.org/r/28268/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>



[jira] [Updated] (KAFKA-1624) building on JDK 8 fails

2014-11-24 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-1624:
-
Resolution: Fixed
Status: Resolved  (was: Patch Available)

> building on JDK 8 fails
> ---
>
> Key: KAFKA-1624
> URL: https://issues.apache.org/jira/browse/KAFKA-1624
> Project: Kafka
>  Issue Type: Bug
>Reporter: Joe Stein
>Assignee: Guozhang Wang
>  Labels: newbie
> Fix For: 0.8.2
>
> Attachments: KAFKA-1624.patch, KAFKA-1624_2014-11-24_11:01:56.patch
>
>
> {code}
> Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=512m; 
> support was removed in 8.0
> error: error while loading CharSequence, class file 
> '/usr/lib/jvm/java-8-oracle/jre/lib/rt.jar(java/lang/CharSequence.class)' is 
> broken
> (class java.lang.RuntimeException/bad constant pool tag 18 at byte 10)
> error: error while loading Comparator, class file 
> '/usr/lib/jvm/java-8-oracle/jre/lib/rt.jar(java/util/Comparator.class)' is 
> broken
> (class java.lang.RuntimeException/bad constant pool tag 18 at byte 20)
> error: error while loading AnnotatedElement, class file 
> '/usr/lib/jvm/java-8-oracle/jre/lib/rt.jar(java/lang/reflect/AnnotatedElement.class)'
>  is broken
> (class java.lang.RuntimeException/bad constant pool tag 18 at byte 76)
> error: error while loading Arrays, class file 
> '/usr/lib/jvm/java-8-oracle/jre/lib/rt.jar(java/util/Arrays.class)' is broken
> (class java.lang.RuntimeException/bad constant pool tag 18 at byte 765)
> /tmp/sbt_53783b12/xsbt/ExtractAPI.scala:395: error: java.util.Comparator does 
> not take type parameters
>   private[this] val sortClasses = new Comparator[Symbol] {
> ^
> 5 errors found
> :core:compileScala FAILED
> FAILURE: Build failed with an exception.
> * What went wrong:
> Execution failed for task ':core:compileScala'.
> > org.gradle.messaging.remote.internal.PlaceholderException (no error message)
> * Try:
> Run with --stacktrace option to get the stack trace. Run with --info or 
> --debug option to get more log output.
> BUILD FAILED
> Total time: 1 mins 48.298 secs
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-11-24 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14223532#comment-14223532
 ] 

Bhavesh Mistry commented on KAFKA-1642:
---

Also, regarding KafkaProducer.close(): the method hangs forever because of the 
following loop:

{code}
Sender.java

// okay we stopped accepting requests but there may still be
// requests in the accumulator or waiting for acknowledgment,
// wait until these are completed.
while (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0) {
    try {
        run(time.milliseconds());
    } catch (Exception e) {
        log.error("Uncaught error in kafka producer I/O thread: ", e);
    }
}

KafkaProducer.java

/**
 * Close this producer. This method blocks until all in-flight requests complete.
 */
@Override
public void close() {
    log.trace("Closing the Kafka producer.");
    this.sender.initiateClose();
    try {
        this.ioThread.join();  // THIS IS BLOCKED since the ioThread never gives up
    } catch (InterruptedException e) {
        throw new KafkaException(e);
    }
    this.metrics.close();
    log.debug("The Kafka producer has closed.");
}
{code}

The issue described in KAFKA-1788 is likely related, but if you look at the 
close call stack, the calling thread that initiated close() will hang until 
the I/O thread dies (and it never dies when there is data to send and the 
network is down).

Thanks,

Bhavesh
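
A sketch of the kind of guard this implies -- a hypothetical bounded variant of 
the close() shown above, not an API that exists in this version -- would cap 
the join so the calling thread cannot hang forever:

{code}
// Hypothetical bounded close: wait at most timeoutMs for the I/O thread,
// then give up instead of blocking the calling thread indefinitely.
public void close(long timeoutMs) {
    this.sender.initiateClose();
    try {
        this.ioThread.join(timeoutMs);      // bounded wait
    } catch (InterruptedException e) {
        throw new KafkaException(e);
    }
    if (this.ioThread.isAlive())
        log.warn("Producer I/O thread did not shut down within {} ms", timeoutMs);
    this.metrics.close();
}
{code}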


> [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network 
> connection is lost
> ---
>
> Key: KAFKA-1642
> URL: https://issues.apache.org/jira/browse/KAFKA-1642
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.8.2
>Reporter: Bhavesh Mistry
>Assignee: Ewen Cheslack-Postava
> Fix For: 0.8.2
>
> Attachments: 
> 0001-Initial-CPU-Hish-Usage-by-Kafka-FIX-and-Also-fix-CLO.patch, 
> KAFKA-1642.patch, KAFKA-1642_2014-10-20_17:33:57.patch, 
> KAFKA-1642_2014-10-23_16:19:41.patch
>
>
> I see my CPU spike to 100% when the network connection is lost for a while. 
> It seems the network I/O threads are very busy logging the following error 
> message. Is this expected behavior?
> 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR 
> org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka 
> producer I/O thread: 
> java.lang.IllegalStateException: No entry found for node -2
> at 
> org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110)
> at 
> org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99)
> at 
> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394)
> at 
> org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
> at java.lang.Thread.run(Thread.java:744)
> Thanks,
> Bhavesh



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-11-24 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14223532#comment-14223532
 ] 

Bhavesh Mistry edited comment on KAFKA-1642 at 11/24/14 9:21 PM:
-

[~ewencp],

Also, regarding KafkaProducer.close(): the method hangs forever because of the 
following loop:

{code}
Sender.java

// okay we stopped accepting requests but there may still be
// requests in the accumulator or waiting for acknowledgment,
// wait until these are completed.
while (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0) {
    try {
        run(time.milliseconds());
    } catch (Exception e) {
        log.error("Uncaught error in kafka producer I/O thread: ", e);
    }
}

KafkaProducer.java

/**
 * Close this producer. This method blocks until all in-flight requests complete.
 */
@Override
public void close() {
    log.trace("Closing the Kafka producer.");
    this.sender.initiateClose();
    try {
        this.ioThread.join();  // THIS IS BLOCKED since the ioThread never gives up
    } catch (InterruptedException e) {
        throw new KafkaException(e);
    }
    this.metrics.close();
    log.debug("The Kafka producer has closed.");
}
{code}

The issue described in KAFKA-1788 is likely related, but if you look at the 
close call stack, the calling thread that initiated close() will hang until 
the I/O thread dies (and it never dies when there is data to send and the 
network is down).

Thanks,

Bhavesh



was (Author: bmis13):
Also, regarding KafkaProducer.close(): the method hangs forever because of the 
following loop:

{code}
Sender.java

// okay we stopped accepting requests but there may still be
// requests in the accumulator or waiting for acknowledgment,
// wait until these are completed.
while (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0) {
    try {
        run(time.milliseconds());
    } catch (Exception e) {
        log.error("Uncaught error in kafka producer I/O thread: ", e);
    }
}

KafkaProducer.java

/**
 * Close this producer. This method blocks until all in-flight requests complete.
 */
@Override
public void close() {
    log.trace("Closing the Kafka producer.");
    this.sender.initiateClose();
    try {
        this.ioThread.join();  // THIS IS BLOCKED since the ioThread never gives up
    } catch (InterruptedException e) {
        throw new KafkaException(e);
    }
    this.metrics.close();
    log.debug("The Kafka producer has closed.");
}
{code}

The issue described in KAFKA-1788 is likely related, but if you look at the 
close call stack, the calling thread that initiated close() will hang until 
the I/O thread dies (and it never dies when there is data to send and the 
network is down).

Thanks,

Bhavesh


> [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network 
> connection is lost
> ---
>
> Key: KAFKA-1642
> URL: https://issues.apache.org/jira/browse/KAFKA-1642
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.8.2
>Reporter: Bhavesh Mistry
>Assignee: Ewen Cheslack-Postava
> Fix For: 0.8.2
>
> Attachments: 
> 0001-Initial-CPU-Hish-Usage-by-Kafka-FIX-and-Also-fix-CLO.patch, 
> KAFKA-1642.patch, KAFKA-1642_2014-10-20_17:33:57.patch, 
> KAFKA-1642_2014-10-23_16:19:41.patch
>
>
> I see my CPU spike to 100% when the network connection is lost for a while. 
> It seems the network I/O threads are very busy logging the following error 
> message. Is this expected behavior?
> 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR 
> org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka 
> producer I/O thread: 
> java.lang.IllegalStateException: No entry found for node -2
> at 
> org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110)
> at 
> org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99)
> at 
> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394)
> at 
> org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
> at java.lang.Thread.run(Thread.java:744)
> Thanks,
> Bhavesh



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-11-24 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14223532#comment-14223532
 ] 

Bhavesh Mistry edited comment on KAFKA-1642 at 11/24/14 9:22 PM:
-

[~ewencp],

Also, regarding KafkaProducer.close(): the method hangs forever because of the 
following loop:

{code}
Sender.java

// okay we stopped accepting requests but there may still be
// requests in the accumulator or waiting for acknowledgment,
// wait until these are completed.
while (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0) {
    try {
        run(time.milliseconds());
    } catch (Exception e) {
        log.error("Uncaught error in kafka producer I/O thread: ", e);
    }
}

KafkaProducer.java

/**
 * Close this producer. This method blocks until all in-flight requests complete.
 */
@Override
public void close() {
    log.trace("Closing the Kafka producer.");
    this.sender.initiateClose();
    try {
        // THIS IS BLOCKED since the ioThread never gives up, so it is all
        // related in my opinion.
        this.ioThread.join();
    } catch (InterruptedException e) {
        throw new KafkaException(e);
    }
    this.metrics.close();
    log.debug("The Kafka producer has closed.");
}
{code}

The issue described in KAFKA-1788 is likely related, but if you look at the 
close call stack, the calling thread that initiated close() will hang until 
the I/O thread dies (and it never dies when there is data to send and the 
network is down).

Thanks,

Bhavesh



was (Author: bmis13):
[~ewencp],

Also, regarding KafkaProducer.close(): the method hangs forever because of the 
following loop:

{code}
Sender.java

// okay we stopped accepting requests but there may still be
// requests in the accumulator or waiting for acknowledgment,
// wait until these are completed.
while (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0) {
    try {
        run(time.milliseconds());
    } catch (Exception e) {
        log.error("Uncaught error in kafka producer I/O thread: ", e);
    }
}

KafkaProducer.java

/**
 * Close this producer. This method blocks until all in-flight requests complete.
 */
@Override
public void close() {
    log.trace("Closing the Kafka producer.");
    this.sender.initiateClose();
    try {
        this.ioThread.join();  // THIS IS BLOCKED since the ioThread never gives up
    } catch (InterruptedException e) {
        throw new KafkaException(e);
    }
    this.metrics.close();
    log.debug("The Kafka producer has closed.");
}
{code}

The issue described in KAFKA-1788 is likely related, but if you look at the 
close call stack, the calling thread that initiated close() will hang until 
the I/O thread dies (and it never dies when there is data to send and the 
network is down).

Thanks,

Bhavesh


> [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network 
> connection is lost
> ---
>
> Key: KAFKA-1642
> URL: https://issues.apache.org/jira/browse/KAFKA-1642
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.8.2
>Reporter: Bhavesh Mistry
>Assignee: Ewen Cheslack-Postava
> Fix For: 0.8.2
>
> Attachments: 
> 0001-Initial-CPU-Hish-Usage-by-Kafka-FIX-and-Also-fix-CLO.patch, 
> KAFKA-1642.patch, KAFKA-1642_2014-10-20_17:33:57.patch, 
> KAFKA-1642_2014-10-23_16:19:41.patch
>
>
> I see my CPU spike to 100% when network connection is lost for while.  It 
> seems network  IO thread are very busy logging following error message.  Is 
> this expected behavior ?
> 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR 
> org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka 
> producer I/O thread: 
> java.lang.IllegalStateException: No entry found for node -2
> at 
> org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110)
> at 
> org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99)
> at 
> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394)
> at 
> org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
> at java.lang.Thread.run(Thread.java:744)
> Thanks,
> Bhavesh



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-11-24 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14223571#comment-14223571
 ] 

Bhavesh Mistry commented on KAFKA-1642:
---

Here are the exact steps to reproduce the bug (you must have a daemon program 
continuously producing):

1) Start from a happy situation where all brokers are up and everything is 
running fine. Verify via top -pid JAVA_PID and YourKit that the Kafka network 
threads are taking less than 4% CPU.
2) Shut down the network (turn the interface off or pull the eth0 cable) and 
wait a while; you will see CPU spike to 325% under top (if you have 4 
producers), and YourKit will show about 25% CPU consumption for each Kafka 
I/O thread.
3) Reconnect the network (the spike will persist, but CPU comes down to 100% 
or so after a while) and remain connected for a while.
4) Simulate the network failure again (to simulate network instability): 
repeat steps 1 to 4, waiting 10 minutes or so in between, and you will see the 
trend of CPU spikes along with the above exception:
java.lang.IllegalStateException: No entry found for node -2

Also, I see that Kafka logs excessively when the network is down (YourKit 
shows it taking more CPU cycles compared to normal).
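
For reference, a minimal sketch of the kind of continuously running daemon 
producer assumed in step 1 (illustrative broker list and topic; uses the 
current byte[]-oriented trunk producer API):

{code}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerDaemon {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092"); // illustrative
        KafkaProducer producer = new KafkaProducer(props);
        byte[] payload = new byte[100];
        while (true) {                  // run forever; watch CPU via top and YourKit
            producer.send(new ProducerRecord("test-topic", payload));
            Thread.sleep(10);           // steady trickle of records
        }
    }
}
{code}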

Thanks,
Bhavesh 

> [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network 
> connection is lost
> ---
>
> Key: KAFKA-1642
> URL: https://issues.apache.org/jira/browse/KAFKA-1642
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.8.2
>Reporter: Bhavesh Mistry
>Assignee: Ewen Cheslack-Postava
> Fix For: 0.8.2
>
> Attachments: 
> 0001-Initial-CPU-Hish-Usage-by-Kafka-FIX-and-Also-fix-CLO.patch, 
> KAFKA-1642.patch, KAFKA-1642_2014-10-20_17:33:57.patch, 
> KAFKA-1642_2014-10-23_16:19:41.patch
>
>
> I see my CPU spike to 100% when network connection is lost for while.  It 
> seems network  IO thread are very busy logging following error message.  Is 
> this expected behavior ?
> 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR 
> org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka 
> producer I/O thread: 
> java.lang.IllegalStateException: No entry found for node -2
> at 
> org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110)
> at 
> org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99)
> at 
> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394)
> at 
> org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
> at java.lang.Thread.run(Thread.java:744)
> Thanks,
> Bhavesh



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-11-24 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14223626#comment-14223626
 ] 

Bhavesh Mistry commented on KAFKA-1642:
---

Also, there is an issue in my last patch: I did not update the 
lastConnectAttemptMs... in connecting().

{code}
/**
 * Enter the connecting state for the given node.
 * @param node The id of the node we are connecting to
 * @param now The current time.
 */
public void connecting(int node, long now) {
    NodeConnectionState nodeConn = nodeState.get(node);
    if (nodeConn == null) {
        nodeState.put(node, new NodeConnectionState(ConnectionState.CONNECTING, now));
    } else {
        nodeConn.state = ConnectionState.CONNECTING;
        nodeConn.lastConnectAttemptMs = now;  // capture and update the last connection attempt
    }
}
{code}

> [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network 
> connection is lost
> ---
>
> Key: KAFKA-1642
> URL: https://issues.apache.org/jira/browse/KAFKA-1642
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.8.2
>Reporter: Bhavesh Mistry
>Assignee: Ewen Cheslack-Postava
> Fix For: 0.8.2
>
> Attachments: 
> 0001-Initial-CPU-Hish-Usage-by-Kafka-FIX-and-Also-fix-CLO.patch, 
> KAFKA-1642.patch, KAFKA-1642_2014-10-20_17:33:57.patch, 
> KAFKA-1642_2014-10-23_16:19:41.patch
>
>
> I see my CPU spike to 100% when network connection is lost for while.  It 
> seems network  IO thread are very busy logging following error message.  Is 
> this expected behavior ?
> 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR 
> org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka 
> producer I/O thread: 
> java.lang.IllegalStateException: No entry found for node -2
> at 
> org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110)
> at 
> org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99)
> at 
> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394)
> at 
> org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
> at java.lang.Thread.run(Thread.java:744)
> Thanks,
> Bhavesh



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-11-24 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14223626#comment-14223626
 ] 

Bhavesh Mistry edited comment on KAFKA-1642 at 11/24/14 10:16 PM:
--

Also, there is an issue in my experimental patch: I did not update the 
lastConnectAttemptMs... in the connecting() state method, which is needed to 
resolve the issue with the IllegalStateException:
{code}
/**
 * Enter the connecting state for the given node.
 * @param node The id of the node we are connecting to
 * @param now The current time.
 */
public void connecting(int node, long now) {
    NodeConnectionState nodeConn = nodeState.get(node);
    if (nodeConn == null) {
        nodeState.put(node, new NodeConnectionState(ConnectionState.CONNECTING, now));
    } else {
        nodeConn.state = ConnectionState.CONNECTING;
        nodeConn.lastConnectAttemptMs = now;  // capture and update the last connection attempt
    }
}
{code}


was (Author: bmis13):
Also, there is an issue in my last patch: I did not update the 
lastConnectAttemptMs... in connecting().

{code}
/**
 * Enter the connecting state for the given node.
 * @param node The id of the node we are connecting to
 * @param now The current time.
 */
public void connecting(int node, long now) {
    NodeConnectionState nodeConn = nodeState.get(node);
    if (nodeConn == null) {
        nodeState.put(node, new NodeConnectionState(ConnectionState.CONNECTING, now));
    } else {
        nodeConn.state = ConnectionState.CONNECTING;
        nodeConn.lastConnectAttemptMs = now;  // capture and update the last connection attempt
    }
}
{code}

> [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network 
> connection is lost
> ---
>
> Key: KAFKA-1642
> URL: https://issues.apache.org/jira/browse/KAFKA-1642
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.8.2
>Reporter: Bhavesh Mistry
>Assignee: Ewen Cheslack-Postava
> Fix For: 0.8.2
>
> Attachments: 
> 0001-Initial-CPU-Hish-Usage-by-Kafka-FIX-and-Also-fix-CLO.patch, 
> KAFKA-1642.patch, KAFKA-1642_2014-10-20_17:33:57.patch, 
> KAFKA-1642_2014-10-23_16:19:41.patch
>
>
> I see my CPU spike to 100% when network connection is lost for while.  It 
> seems network  IO thread are very busy logging following error message.  Is 
> this expected behavior ?
> 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR 
> org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka 
> producer I/O thread: 
> java.lang.IllegalStateException: No entry found for node -2
> at 
> org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110)
> at 
> org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99)
> at 
> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394)
> at 
> org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
> at java.lang.Thread.run(Thread.java:744)
> Thanks,
> Bhavesh



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-1796) Sanity check partition command line tools

2014-11-24 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-1796:


 Summary: Sanity check partition command line tools
 Key: KAFKA-1796
 URL: https://issues.apache.org/jira/browse/KAFKA-1796
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
 Fix For: 0.9.0


We need to sanity-check that the input JSON has valid values (for example, 
that the replica list does not contain duplicate broker ids, etc.) before 
triggering the admin process.
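
A sketch of the kind of check intended, in illustrative Java (the actual admin 
tooling is Scala, and validateReplicas is a hypothetical helper):

{code}
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative sanity check: a replica list must not contain duplicate broker ids.
static void validateReplicas(String topic, int partition, List<Integer> replicas) {
    Set<Integer> seen = new HashSet<Integer>();
    for (int brokerId : replicas) {
        if (!seen.add(brokerId))
            throw new IllegalArgumentException("Duplicate broker id " + brokerId
                + " in replica list for " + topic + "-" + partition);
    }
}
{code}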



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1580) Reject producer requests to internal topics

2014-11-24 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-1580:
-
Resolution: Fixed
Status: Resolved  (was: Patch Available)

> Reject producer requests to internal topics
> ---
>
> Key: KAFKA-1580
> URL: https://issues.apache.org/jira/browse/KAFKA-1580
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Joel Koshy
>Assignee: Guozhang Wang
>Priority: Critical
> Fix For: 0.8.2
>
> Attachments: KAFKA-1580.patch, KAFKA-1580.patch, 
> KAFKA-1580_2014-08-14_16:50:40.patch, KAFKA-1580_2014-08-14_16:56:50.patch, 
> KAFKA-1580_2014-08-14_18:21:38.patch, KAFKA-1580_2014-08-15_15:05:29.patch
>
>
> Producer requests to internal topics (currently only __consumer_offset) can 
> be disastrous.
> E.g., if we allow a message to be appended to the offsets topic this could 
> lead to fatal exceptions when loading the offsets topic and when compacting 
> the log.
> Producer requests to these topics should be rejected.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-11-24 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14223755#comment-14223755
 ] 

Ewen Cheslack-Postava commented on KAFKA-1642:
--

Ok, so as I suspected, you need to wait a while before the issue shows up. It 
looks to me like this is due to a metadata refresh: it causes the 
metadataTimeout in Client.poll() to be 0, but then maybeUpdateMetadata is 
unable to make any progress since it can't connect to any nodes. The previous 
patch fixed issues that caused the timeout parameter to that method to be 0, 
so this is a similar issue. However, under normal testing it won't always show 
up immediately -- you need to wait until the next metadata refresh, which is 
currently every 5 minutes.

I need to think more about the details of the fix. That timeout shouldn't 
consistently be 0 if we're just trying to refresh metadata, but we need to make 
sure we select an appropriate timeout for each case. Looking through 
maybeUpdateMetadata there are a few different possibilities:

1. leastLoadedNode returns null, leaving no nodes available and we don't even 
try to refresh
2. The selected node is connected and we can send more data - we mark 
metadataFetchInProgress to avoid resending requests, but should probably also 
use that to determine the timeout on poll()
3. The selected node is connected but we can't send more data yet
4. The selected node is not connected, but we are allowed to try to initiate a 
connection based on the reconnection backoff.
4a. Trying to initiate the connection may return an immediate error
4b. Or we'll need to wait for the connection event.
5. The selected node is not connected and we aren't allowed to initiate a new 
connection yet.

Given that all these conditions are based on the code in maybeUpdateMetadata 
(and initiateConnect, which it calls), it probably makes sense to have that 
code return an appropriate timeout to be used in poll(). But we need to make 
sure the selected values are also combined correctly with the timeout passed 
into poll() and that any wakeups before that time also subsequently produce 
correct values.

The logic in Sender.run() and NetworkClient.poll() is complex and needs to 
handle a lot of different cases, but it should be possible to fix this problem 
just by adjusting that code, without adding retries/backoff further up the 
stack. The core of the problem is simply that that loop is selecting too small 
a timeout.
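
To illustrate the shape such a fix could take (all names and fields below are 
hypothetical, not the actual NetworkClient code):

{code}
// Hypothetical sketch: derive a non-zero poll timeout from the metadata-update
// state instead of letting it collapse to 0 and busy-spin.
private long metadataTimeout(long now) {
    if (metadataFetchInProgress)
        return requestTimeoutMs;       // request outstanding: wait for it (cases 2/3)
    Node node = leastLoadedNode(now);
    if (node == null)
        return reconnectBackoffMs;     // no candidate node at all (case 1)
    if (!connectionStates.canConnect(node.id(), now))
        return connectionStates.connectionDelay(node.id(), now); // honor backoff (case 5)
    return reconnectBackoffMs;         // connect attempt in flight (cases 4a/4b)
}

// poll() would then block for Math.min(callerTimeout, metadataTimeout(now)).
{code}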

> [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network 
> connection is lost
> ---
>
> Key: KAFKA-1642
> URL: https://issues.apache.org/jira/browse/KAFKA-1642
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.8.2
>Reporter: Bhavesh Mistry
>Assignee: Ewen Cheslack-Postava
> Fix For: 0.8.2
>
> Attachments: 
> 0001-Initial-CPU-Hish-Usage-by-Kafka-FIX-and-Also-fix-CLO.patch, 
> KAFKA-1642.patch, KAFKA-1642_2014-10-20_17:33:57.patch, 
> KAFKA-1642_2014-10-23_16:19:41.patch
>
>
> I see my CPU spike to 100% when network connection is lost for while.  It 
> seems network  IO thread are very busy logging following error message.  Is 
> this expected behavior ?
> 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR 
> org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka 
> producer I/O thread: 
> java.lang.IllegalStateException: No entry found for node -2
> at 
> org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110)
> at 
> org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99)
> at 
> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394)
> at 
> org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
> at java.lang.Thread.run(Thread.java:744)
> Thanks,
> Bhavesh



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-11-24 Thread Bhavesh Mistry (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bhavesh Mistry updated KAFKA-1642:
--
Affects Version/s: 0.8.1.1

> [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network 
> connection is lost
> ---
>
> Key: KAFKA-1642
> URL: https://issues.apache.org/jira/browse/KAFKA-1642
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.8.1.1, 0.8.2
>Reporter: Bhavesh Mistry
>Assignee: Ewen Cheslack-Postava
> Fix For: 0.8.2
>
> Attachments: 
> 0001-Initial-CPU-Hish-Usage-by-Kafka-FIX-and-Also-fix-CLO.patch, 
> KAFKA-1642.patch, KAFKA-1642_2014-10-20_17:33:57.patch, 
> KAFKA-1642_2014-10-23_16:19:41.patch
>
>
> I see my CPU spike to 100% when network connection is lost for while.  It 
> seems network  IO thread are very busy logging following error message.  Is 
> this expected behavior ?
> 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR 
> org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka 
> producer I/O thread: 
> java.lang.IllegalStateException: No entry found for node -2
> at 
> org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110)
> at 
> org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99)
> at 
> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394)
> at 
> org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
> at java.lang.Thread.run(Thread.java:744)
> Thanks,
> Bhavesh



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-11-24 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-1642:
-
Priority: Blocker  (was: Major)

> [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network 
> connection is lost
> ---
>
> Key: KAFKA-1642
> URL: https://issues.apache.org/jira/browse/KAFKA-1642
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.8.1.1, 0.8.2
>Reporter: Bhavesh Mistry
>Assignee: Ewen Cheslack-Postava
>Priority: Blocker
> Fix For: 0.8.2
>
> Attachments: 
> 0001-Initial-CPU-Hish-Usage-by-Kafka-FIX-and-Also-fix-CLO.patch, 
> KAFKA-1642.patch, KAFKA-1642_2014-10-20_17:33:57.patch, 
> KAFKA-1642_2014-10-23_16:19:41.patch
>
>
> I see my CPU spike to 100% when network connection is lost for while.  It 
> seems network  IO thread are very busy logging following error message.  Is 
> this expected behavior ?
> 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR 
> org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka 
> producer I/O thread: 
> java.lang.IllegalStateException: No entry found for node -2
> at 
> org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110)
> at 
> org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99)
> at 
> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394)
> at 
> org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
> at java.lang.Thread.run(Thread.java:744)
> Thanks,
> Bhavesh



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-11-24 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14223779#comment-14223779
 ] 

Bhavesh Mistry commented on KAFKA-1642:
---

[~ewencp],

Thanks for looking into this; I really appreciate your response.

Also, do you think the rapid connect/disconnect cycling is also due to 
incorrect node state management, i.e. in the connecting() and 
initiateConnect() methods as well?

Thanks,

Bhavesh 

> [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network 
> connection is lost
> ---
>
> Key: KAFKA-1642
> URL: https://issues.apache.org/jira/browse/KAFKA-1642
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.8.1.1, 0.8.2
>Reporter: Bhavesh Mistry
>Assignee: Ewen Cheslack-Postava
>Priority: Blocker
> Fix For: 0.8.2
>
> Attachments: 
> 0001-Initial-CPU-Hish-Usage-by-Kafka-FIX-and-Also-fix-CLO.patch, 
> KAFKA-1642.patch, KAFKA-1642_2014-10-20_17:33:57.patch, 
> KAFKA-1642_2014-10-23_16:19:41.patch
>
>
> I see my CPU spike to 100% when network connection is lost for while.  It 
> seems network  IO thread are very busy logging following error message.  Is 
> this expected behavior ?
> 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR 
> org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka 
> producer I/O thread: 
> java.lang.IllegalStateException: No entry found for node -2
> at 
> org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110)
> at 
> org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99)
> at 
> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394)
> at 
> org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
> at java.lang.Thread.run(Thread.java:744)
> Thanks,
> Bhavesh



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Does Kafka Producer service ?

2014-11-24 Thread Krishna Raj
Hello Amazing Kafka Creators & Users,

I have learned and use Kafka in our production system, so you can count my
understanding as intermediate.

With the statement that "Kafka has solved the Scalability and Availability
needs for a large scale message publish/subscribe system", I understand
that having a Producer Service which sits in between the Application and
the Producer defeats one major purpose of Kafka.

So, my question is: how do I loosely couple Kafka with my production
application?

The reason being, I wish to do all producer code and Kafka library
maintenance without affecting my large-scale production system. It's not
easy to get a change window for these types of changes on a large-scale
production application :)

Any advice on how this can be achieved (even moderately) would greatly help.
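
One common approach (a sketch under stated assumptions, not a prescription)
is to put a thin messaging interface that you own between the application
and the Kafka client, so producer code and Kafka library maintenance stay
behind one seam. All names below are illustrative, and the adapter assumes
the current byte[]-oriented producer API:

// MessageBus.java -- your application codes against this interface only.
public interface MessageBus {
    void publish(String stream, byte[] payload);
    void close();
}

// KafkaMessageBus.java -- the only class that touches the Kafka client.
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaMessageBus implements MessageBus {
    private final KafkaProducer producer;

    public KafkaMessageBus(Properties config) {
        this.producer = new KafkaProducer(config);
    }

    public void publish(String stream, byte[] payload) {
        producer.send(new ProducerRecord(stream, payload));
    }

    public void close() {
        producer.close();
    }
}

Applications then depend only on MessageBus, so upgrading the Kafka library
(or even swapping the transport) becomes an isolated change that can be
rolled out without touching application code.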

Thanks,
Krishna Raj


Re: Review Request 28121: Patch for KAFKA-1780

2014-11-24 Thread Neha Narkhede

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/28121/#review62909
---



core/src/main/scala/kafka/consumer/ConsumerIterator.scala


Since we're changing the behavior of ConsumerIterator, let's document it 
here. For example, hasNext is now unblocking.



core/src/main/scala/kafka/utils/IteratorTemplate.scala


The change is larger than I expected it to be. What is the reason we 
couldn't change the peek() implementation and add poll to the IteratorTemplate? 
Since IteratorTemplate is not used elsewhere, it may be an ok change. 

But I guess you must've thought about this and chose not to do it this way. 
Mind explaining the reasoning?

I also think this is the right way to do things. However, weighing that 
with the fact that we are making a somewhat large change to an API that is 
going away, how bad is implementing peek by passing in a consumer timeout of 0?


- Neha Narkhede


On Nov. 17, 2014, 5:41 p.m., Ewen Cheslack-Postava wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/28121/
> ---
> 
> (Updated Nov. 17, 2014, 5:41 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1780
> https://issues.apache.org/jira/browse/KAFKA-1780
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1780 Add peek and poll methods to ConsumerIterator.
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/consumer/ConsumerIterator.scala 
> 78fbf75651583e390258af2d9f09df6911a97b59 
>   core/src/main/scala/kafka/utils/IteratorTemplate.scala 
> fd952f3ec0f04a3ba639c02779634265489fd186 
>   core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala 
> c0355cc0135c6af2e346b4715659353a31723b86 
>   core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala 
> 46a4e899ef293c56a931bfa5bcf9a07d07ec5792 
> 
> Diff: https://reviews.apache.org/r/28121/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Ewen Cheslack-Postava
> 
>



[jira] [Commented] (KAFKA-1780) Add peek()/poll() for ConsumerIterator

2014-11-24 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14223830#comment-14223830
 ] 

Neha Narkhede commented on KAFKA-1780:
--

[~ewencp] Sorry for the late review. Posted comments on the rb.

> Add peek()/poll() for ConsumerIterator
> --
>
> Key: KAFKA-1780
> URL: https://issues.apache.org/jira/browse/KAFKA-1780
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8.1.1
>Reporter: Ewen Cheslack-Postava
>Assignee: Ewen Cheslack-Postava
> Attachments: KAFKA-1780.patch
>
>
> Currently, all consumer operations (next(), haveNext()) block. This is 
> problematic for a couple of use cases. Most obviously, a peek() method would 
> be nice so you can at least check whether any data is immediately available, 
> getting a null value back if it's not.
> A more difficult example is a proxy with a timeout, i.e. it consumes messages 
> for up to N ms or M messages, and returns whatever it has at the end of that 
> period. It's possible to approximate that with peek, but requires aggressive 
> polling to match the proxy's timeout. A poll(timeout) method would allow for 
> a correct implementation, where each call to poll gets a single message, but 
> also allows the user to specify a custom timeout.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1780) Add peek()/poll() for ConsumerIterator

2014-11-24 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-1780:
-
Attachment: KAFKA-1780_2014-11-24_17:24:54.patch

> Add peek()/poll() for ConsumerIterator
> --
>
> Key: KAFKA-1780
> URL: https://issues.apache.org/jira/browse/KAFKA-1780
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8.1.1
>Reporter: Ewen Cheslack-Postava
>Assignee: Ewen Cheslack-Postava
> Attachments: KAFKA-1780.patch, KAFKA-1780_2014-11-24_17:24:54.patch
>
>
> Currently, all consumer operations (next(), haveNext()) block. This is 
> problematic for a couple of use cases. Most obviously, a peek() method would 
> be nice so you can at least check whether any data is immediately available, 
> getting a null value back if it's not.
> A more difficult example is a proxy with a timeout, i.e. it consumes messages 
> for up to N ms or M messages, and returns whatever it has at the end of that 
> period. It's possible to approximate that with peek, but requires aggressive 
> polling to match the proxy's timeout. A poll(timeout) method would allow for 
> a correct implementation, where each call to poll gets a single message, but 
> also allows the user to specify a custom timeout.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 28121: Patch for KAFKA-1780

2014-11-24 Thread Ewen Cheslack-Postava

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/28121/
---

(Updated Nov. 25, 2014, 1:24 a.m.)


Review request for kafka.


Bugs: KAFKA-1780
https://issues.apache.org/jira/browse/KAFKA-1780


Repository: kafka


Description (updated)
---

Expand documentation on ConsumerIterator to reflect new non-blocking APIs.


Diffs (updated)
-

  core/src/main/scala/kafka/consumer/ConsumerIterator.scala 
78fbf75651583e390258af2d9f09df6911a97b59 
  core/src/main/scala/kafka/utils/IteratorTemplate.scala 
fd952f3ec0f04a3ba639c02779634265489fd186 
  core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala 
c0355cc0135c6af2e346b4715659353a31723b86 
  core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala 
46a4e899ef293c56a931bfa5bcf9a07d07ec5792 

Diff: https://reviews.apache.org/r/28121/diff/


Testing
---


Thanks,

Ewen Cheslack-Postava



[jira] [Commented] (KAFKA-1780) Add peek()/poll() for ConsumerIterator

2014-11-24 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14223905#comment-14223905
 ] 

Ewen Cheslack-Postava commented on KAFKA-1780:
--

Updated reviewboard https://reviews.apache.org/r/28121/diff/
 against branch origin/trunk

> Add peek()/poll() for ConsumerIterator
> --
>
> Key: KAFKA-1780
> URL: https://issues.apache.org/jira/browse/KAFKA-1780
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8.1.1
>Reporter: Ewen Cheslack-Postava
>Assignee: Ewen Cheslack-Postava
> Attachments: KAFKA-1780.patch, KAFKA-1780_2014-11-24_17:24:54.patch
>
>
> Currently, all consumer operations (next(), haveNext()) block. This is 
> problematic for a couple of use cases. Most obviously, a peek() method would 
> be nice so you can at least check whether any data is immediately available, 
> getting a null value back if it's not.
> A more difficult example is a proxy with a timeout, i.e. it consumes messages 
> for up to N ms or M messages, and returns whatever it has at the end of that 
> period. It's possible to approximate that with peek, but requires aggressive 
> polling to match the proxy's timeout. A poll(timeout) method would allow for 
> a correct implementation, where each call to poll gets a single message, but 
> also allows the user to specify a custom timeout.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 28121: Patch for KAFKA-1780

2014-11-24 Thread Ewen Cheslack-Postava


> On Nov. 25, 2014, 12:26 a.m., Neha Narkhede wrote:
> > core/src/main/scala/kafka/consumer/ConsumerIterator.scala, line 30
> > 
> >
> > Since we're changing the behavior of ConsumerIterator, let's document 
> > it here. For example, hasNext is now unblocking.

I'll update the docs since we now have some more methods available. But just to 
be clear, hasNext is still blocking -- it calls maybeComputeNext() (no params), 
which calls makeNext() (no params), which calls makeNext with a timeout of -1 
ms, which implies an indefinite timeout. This matches the behavior implied by 
the Iterator interface. I figured that even if we wanted to make it 
non-blocking, that'd wait for a major version upgrade since that's pretty 
fundamental behavior that people probably depend on (and certainly aren't 
catching whatever exception we would throw).


> On Nov. 25, 2014, 12:26 a.m., Neha Narkhede wrote:
> > core/src/main/scala/kafka/utils/IteratorTemplate.scala, line 46
> > 
> >
> > The change is larger than I expected it to be. What is the reason we 
> > couldn't change the peek() implementation and add poll to the 
> > IteratorTemplate? Since IteratorTemplate is not used elsewhere, it may be 
> > an ok change. 
> > 
> > But I guess you must've thought about this and chose not to do it this 
> > way. Mind explaining the reasoning?
> > 
> > I also think this is the right way to do things. However, weighing that 
> > with the fact that we are making a somewhat large change to an API that is 
> > going away, how bad is implementing peek by passing in a consumer timeout 
> > of 0?

IteratorTemplate actually is used in a bunch of other places via anonymous 
classes, although admittedly not publicly. I initially started down the path of 
only modifying IteratorTemplate, but it ends up being a bigger, awkward change 
since the ConsumerIterator was the only implementation that could support 
non-blocking operations. Things get even messier if you support both peek and 
poll, since instead of a boolean you now have to support a timeout. Poll is 
important to support if you want to accurately hit timeouts when you want to 
collect more than one message (e.g. a proxy). You'll notice the bulk of the new 
non-test code is in NonBlockingIteratorTemplate.poll() because it doesn't 
decompose nicely into the existing hasNext() and next() calls, which means the 
bulk of the patch would still be there anyway. One possibility to tighten it up 
would be to break the original steps into checkNext, getItem, and clearItem, 
then implement hasNext, next, peek, and poll using those even smaller pieces. 
But I think breaking it down too much obfuscates the code and isn't really 
worth the minor deduplication.
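
For illustration, here is the proxy-style pattern that motivates poll(timeout) 
-- collect up to maxMessages or until timeoutMs elapses, whichever comes first. 
This is only a sketch; it assumes a poll(long) that returns null on timeout, 
which may not match the final signature:

import java.util.ArrayList;
import java.util.List;
import kafka.consumer.ConsumerIterator;
import kafka.message.MessageAndMetadata;

List<byte[]> drain(ConsumerIterator<byte[], byte[]> it, int maxMessages, long timeoutMs) {
    List<byte[]> batch = new ArrayList<byte[]>();
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (batch.size() < maxMessages) {
        long remaining = deadline - System.currentTimeMillis();
        if (remaining <= 0)
            break;                          // overall timeout reached
        MessageAndMetadata<byte[], byte[]> msg = it.poll(remaining); // hypothetical
        if (msg == null)
            break;                          // poll timed out with nothing new
        batch.add(msg.message());
    }
    return batch;
}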


- Ewen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/28121/#review62909
---


On Nov. 25, 2014, 1:24 a.m., Ewen Cheslack-Postava wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/28121/
> ---
> 
> (Updated Nov. 25, 2014, 1:24 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1780
> https://issues.apache.org/jira/browse/KAFKA-1780
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Expand documentation on ConsumerIterator to reflect new non-blocking APIs.
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/consumer/ConsumerIterator.scala 
> 78fbf75651583e390258af2d9f09df6911a97b59 
>   core/src/main/scala/kafka/utils/IteratorTemplate.scala 
> fd952f3ec0f04a3ba639c02779634265489fd186 
>   core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala 
> c0355cc0135c6af2e346b4715659353a31723b86 
>   core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala 
> 46a4e899ef293c56a931bfa5bcf9a07d07ec5792 
> 
> Diff: https://reviews.apache.org/r/28121/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Ewen Cheslack-Postava
> 
>



[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-11-24 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14223779#comment-14223779
 ] 

Bhavesh Mistry edited comment on KAFKA-1642 at 11/25/14 1:31 AM:
-

[~ewencp],

Thanks for looking into this; I really appreciate your response.

Also, do you think the rapid connect/disconnect cycling is also due to 
incorrect node state management, i.e. in the connecting() and 
initiateConnect() methods as well?

Also, can we take a defensive-coding approach and add protection in this tight 
infinite loop to throttle CPU cycles whenever an iteration's start-to-end 
duration falls below some xx ms? This would actually prevent these issues. We 
hit this issue in production, so I just wanted to highlight the impact of 325% 
CPU and the excessive logging.
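
A minimal sketch of the kind of throttle being suggested, inside the Sender's 
loop (MIN_ITERATION_MS and the surrounding names are illustrative):

{code}
// Illustrative throttle: if one iteration of the I/O loop finishes suspiciously
// fast, sleep briefly so a tight error loop cannot pin a core.
long start = time.milliseconds();
run(start);
long elapsed = time.milliseconds() - start;
if (elapsed < MIN_ITERATION_MS) {      // e.g. a few ms; illustrative constant
    try {
        Thread.sleep(MIN_ITERATION_MS - elapsed);
    } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
    }
}
{code}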

Thanks,

Bhavesh 


was (Author: bmis13):
[~ewencp],

Thanks for looking into this; I really appreciate your response.

Also, do you think the rapid connect/disconnect cycling is also due to 
incorrect node state management, i.e. in the connecting() and 
initiateConnect() methods as well?

Thanks,

Bhavesh 

> [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network 
> connection is lost
> ---
>
> Key: KAFKA-1642
> URL: https://issues.apache.org/jira/browse/KAFKA-1642
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.8.1.1, 0.8.2
>Reporter: Bhavesh Mistry
>Assignee: Ewen Cheslack-Postava
>Priority: Blocker
> Fix For: 0.8.2
>
> Attachments: 
> 0001-Initial-CPU-Hish-Usage-by-Kafka-FIX-and-Also-fix-CLO.patch, 
> KAFKA-1642.patch, KAFKA-1642_2014-10-20_17:33:57.patch, 
> KAFKA-1642_2014-10-23_16:19:41.patch
>
>
> I see my CPU spike to 100% when network connection is lost for while.  It 
> seems network  IO thread are very busy logging following error message.  Is 
> this expected behavior ?
> 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR 
> org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka 
> producer I/O thread: 
> java.lang.IllegalStateException: No entry found for node -2
> at 
> org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110)
> at 
> org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99)
> at 
> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394)
> at 
> org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
> at java.lang.Thread.run(Thread.java:744)
> Thanks,
> Bhavesh



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 28121: Patch for KAFKA-1780

2014-11-24 Thread Neha Narkhede


> On Nov. 25, 2014, 12:26 a.m., Neha Narkhede wrote:
> > core/src/main/scala/kafka/consumer/ConsumerIterator.scala, line 30
> > 
> >
> > Since we're changing the behavior of ConsumerIterator, let's document 
> > it here. For example, hasNext is now unblocking.
> 
> Ewen Cheslack-Postava wrote:
> I'll update the docs since we now have some more methods available. But 
> just to be clear, hasNext is still blocking -- it calls maybeComputeNext() 
> (no params), which calls makeNext() (no params), which calls makeNext with 
> a timeout of -1 ms, which implies an indefinite timeout. This matches the 
> behavior implied by the Iterator interface. I figured that even if we wanted 
> to make it non-blocking, that'd wait for a major version upgrade since that's 
> pretty fundamental behavior that people probably depend on (and certainly 
> aren't catching whatever exception we would throw).

Ah ok. Got it.


> On Nov. 25, 2014, 12:26 a.m., Neha Narkhede wrote:
> > core/src/main/scala/kafka/utils/IteratorTemplate.scala, line 46
> > 
> >
> > The change is larger than I expected it to be. What is the reason we 
> > couldn't change the peek() implementation and add poll to the 
> > IteratorTemplate? Since IteratorTemplate is not used elsewhere, it may be 
> > an ok change. 
> > 
> > But I guess you must've thought about this and chose not to do it this 
> > way. Mind explaining the reasoning?
> > 
> > I also think this is the right way to do things. However, weighing that 
> > with the fact that we are making a somewhat large change to an API that is 
> > going away, how bad is implementing peek by passing in a consumer timeout 
> > of 0?
> 
> Ewen Cheslack-Postava wrote:
> IteratorTemplate actually is used in a bunch of other places via anonymous 
> classes, although admittedly not publicly. I initially started down the path 
> of only modifying IteratorTemplate, but it ends up being a bigger, awkward 
> change since the ConsumerIterator was the only implementation that could 
> support non-blocking operations. Things get even messier if you support both 
> peek and poll, since instead of a boolean you now have to support a timeout. 
> Poll is important to support if you want to accurately hit timeouts when you 
> want to collect more than one message (e.g. a proxy). You'll notice the bulk 
> of the new non-test code is in NonBlockingIteratorTemplate.poll() because it 
> doesn't decompose nicely into the existing hasNext() and next() calls, which 
> means the bulk of the patch would still be there anyway. One possibility to 
> tighten it up would be to break the original steps into checkNext, getItem, 
> and clearItem, then implement hasNext, next, peek, and poll using those even 
> smaller pieces. But I think breaking it down too much obfuscates the code and 
> isn't really worth the minor deduplication.

Makes sense. I'd imagine the checkNext refactoring won't buy us much. Curious 
what you think about the consumer timeout=0 hack?


- Neha


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/28121/#review62909
---


On Nov. 25, 2014, 1:24 a.m., Ewen Cheslack-Postava wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/28121/
> ---
> 
> (Updated Nov. 25, 2014, 1:24 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1780
> https://issues.apache.org/jira/browse/KAFKA-1780
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Expand documentation on ConsumerIterator to reflect new non-blocking APIs.
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/consumer/ConsumerIterator.scala 
> 78fbf75651583e390258af2d9f09df6911a97b59 
>   core/src/main/scala/kafka/utils/IteratorTemplate.scala 
> fd952f3ec0f04a3ba639c02779634265489fd186 
>   core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala 
> c0355cc0135c6af2e346b4715659353a31723b86 
>   core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala 
> 46a4e899ef293c56a931bfa5bcf9a07d07ec5792 
> 
> Diff: https://reviews.apache.org/r/28121/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Ewen Cheslack-Postava
> 
>



[jira] [Created] (KAFKA-1797) add the serializer/deserializer api to the new java client

2014-11-24 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-1797:
--

 Summary: add the serializer/deserializer api to the new java client
 Key: KAFKA-1797
 URL: https://issues.apache.org/jira/browse/KAFKA-1797
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.8.2
Reporter: Jun Rao
Assignee: Jun Rao


Currently, the new java clients take a byte array for both the key and the 
value. While this api is simple, it pushes the serialization/deserialization 
logic into the application. This makes it hard to reason about what type of 
data flows through Kafka and also makes it hard to share an implementation of 
the serializer/deserializer. For example, to support Avro, the serialization 
logic could be quite involved since it might need to register the Avro schema 
in some remote registry and maintain a schema cache locally, etc. Without a 
serialization api, it's impossible to share such an implementation so that 
people can easily reuse.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1797) add the serializer/deserializer api to the new java client

2014-11-24 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-1797:
---
Status: Patch Available  (was: Open)

> add the serializer/deserializer api to the new java client
> --
>
> Key: KAFKA-1797
> URL: https://issues.apache.org/jira/browse/KAFKA-1797
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.8.2
>Reporter: Jun Rao
>Assignee: Jun Rao
> Attachments: kafka-1797.patch
>
>
> Currently, the new java clients take a byte array for both the key and the 
> value. While this api is simple, it pushes the serialization/deserialization 
> logic into the application. This makes it hard to reason about what type of 
> data flows through Kafka and also makes it hard to share an implementation of 
> the serializer/deserializer. For example, to support Avro, the serialization 
> logic could be quite involved since it might need to register the Avro schema 
> in some remote registry and maintain a schema cache locally, etc. Without a 
> serialization api, it's impossible to share such an implementation so that 
> people can easily reuse.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1797) add the serializer/deserializer api to the new java client

2014-11-24 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-1797:
---
Attachment: kafka-1797.patch

> add the serializer/deserializer api to the new java client
> --
>
> Key: KAFKA-1797
> URL: https://issues.apache.org/jira/browse/KAFKA-1797
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.8.2
>Reporter: Jun Rao
>Assignee: Jun Rao
> Attachments: kafka-1797.patch
>
>
> Currently, the new java clients take a byte array for both the key and the 
> value. While this api is simple, it pushes the serialization/deserialization 
> logic into the application. This makes it hard to reason about what type of 
> data flows through Kafka and also makes it hard to share an implementation of 
> the serializer/deserializer. For example, to support Avro, the serialization 
> logic could be quite involved since it might need to register the Avro schema 
> in some remote registry and maintain a schema cache locally, etc. Without a 
> serialization api, it's impossible to share such an implementation so that 
> people can easily reuse.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1797) add the serializer/deserializer api to the new java client

2014-11-24 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14223927#comment-14223927
 ] 

Jun Rao commented on KAFKA-1797:


Created reviewboard https://reviews.apache.org/r/28423/diff/
 against branch origin/trunk

> add the serializer/deserializer api to the new java client
> --
>
> Key: KAFKA-1797
> URL: https://issues.apache.org/jira/browse/KAFKA-1797
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.8.2
>Reporter: Jun Rao
>Assignee: Jun Rao
> Attachments: kafka-1797.patch
>
>
> Currently, the new java clients take a byte array for both the key and the 
> value. While this api is simple, it pushes the serialization/deserialization 
> logic into the application. This makes it hard to reason about what type of 
> data flows through Kafka and also makes it hard to share an implementation of 
> the serializer/deserializer. For example, to support Avro, the serialization 
> logic could be quite involved since it might need to register the Avro schema 
> in some remote registry and maintain a schema cache locally, etc. Without a 
> serialization api, it's impossible to share such an implementation so that 
> people can easily reuse.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Review Request 28423: Patch for kafka-1797

2014-11-24 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/28423/
---

Review request for kafka.


Bugs: kafka-1797
https://issues.apache.org/jira/browse/kafka-1797


Repository: kafka


Description
---

add the serializer/deserializer to the new java client api


Diffs
-

  
clients/src/main/java/org/apache/kafka/clients/consumer/ByteArrayDeserializer.java
 PRE-CREATION 
  clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
227f5646ee708af1b861c15237eda2140cfd4900 
  clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
46efc0c8483acacf42b2984ac3f3b9e0a4566187 
  clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java 
436d8a479166eda29f2672b50fc99f288bbe3fa9 
  clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
2ecfc8aaea90a7353bd0dabc4c0ebcc6fd9535ec 
  clients/src/main/java/org/apache/kafka/clients/consumer/Deserializer.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
fe93afa24fc20b03830f1d190a276041d15bd3b9 
  clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
c3aad3b4d6b677f759583f309061193f2f109250 
  
clients/src/main/java/org/apache/kafka/clients/producer/ByteArraySerializer.java
 PRE-CREATION 
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
32f444ebbd27892275af7a0947b86a6b8317a374 
  clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 
36e8398416036cab84faad1f07159e5adefd8086 
  clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
72d3ddd0c29bf6c08f9e122c8232bc07612cd448 
  clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 
c3181b368b6cf15e7134b04e8ff5655a9321ee0b 
  clients/src/main/java/org/apache/kafka/clients/producer/Serializer.java 
PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
 40e8234f8771098b097bf757a86d5ac98604df5e 
  
clients/src/main/java/org/apache/kafka/common/errors/DeserializationException.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/common/errors/SerializationException.java
 PRE-CREATION 
  core/src/main/scala/kafka/producer/BaseProducer.scala 
b0207930dd0543f2c51f0b35002e13bf104340ff 
  core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala 
4b5b823b85477394cd50eb2a66877a3b8b35b57f 
  core/src/main/scala/kafka/tools/MirrorMaker.scala 
f399105087588946987bbc84e3759935d9498b6a 
  core/src/main/scala/kafka/tools/ReplayLogProducer.scala 
3393a3dd574ac45a27bf7eda646b737146c55038 
  core/src/main/scala/kafka/tools/TestEndToEndLatency.scala 
67196f30af1cfcd40ded20ca970082b78504f6af 
  core/src/main/scala/kafka/tools/TestLogCleaning.scala 
1d4ea93f2ba8d4d4d47a307cd47f54a15d3d30dd 
  core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala 
6379f2b60af797b084981c94fd84b3d7740aa8a5 
  core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
a913fe59ba6f7c86a48e264ff85158a345b4e9e4 
  core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 
d407af9144ef6930d737a6dcf23591c1f6342f87 
  core/src/test/scala/unit/kafka/utils/TestUtils.scala 
0da774d0ed015bdc0461b854e3540ee6e48d1838 

Diff: https://reviews.apache.org/r/28423/diff/


Testing
---


Thanks,

Jun Rao



[DISCUSSION] adding the serializer api back to the new java producer

2014-11-24 Thread Jun Rao
Hi, Everyone,

I'd like to start a discussion on whether it makes sense to add the
serializer api back to the new java producer. Currently, the new java
producer takes a byte array for both the key and the value. While this api
is simple, it pushes the serialization logic into the application. This
makes it hard to reason about what type of data is being sent to Kafka and
also makes it hard to share an implementation of the serializer. For
example, to support Avro, the serialization logic could be quite involved
since it might need to register the Avro schema in some remote registry and
maintain a schema cache locally, etc. Without a serialization api, it's
impossible to share such an implementation so that people can easily reuse.
We sort of overlooked this implication during the initial discussion of the
producer api.

So, I'd like to propose an api change to the new producer by adding back
the serializer api similar to what we had in the old producer. Specifically,
the proposed api changes are the following.

First, we change KafkaProducer to take generic types K and V for the key
and the value, respectively.

public class KafkaProducer<K,V> implements Producer<K,V> {

    public Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback);

    public Future<RecordMetadata> send(ProducerRecord<K,V> record);
}

Second, we add two new configs, one for the key serializer and another for
the value serializer. Both serializers will default to the byte array
implementation.

public class ProducerConfig extends AbstractConfig {

    .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS,
        "org.apache.kafka.clients.producer.ByteArraySerializer", Importance.HIGH,
        KEY_SERIALIZER_CLASS_DOC)
    .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS,
        "org.apache.kafka.clients.producer.ByteArraySerializer", Importance.HIGH,
        VALUE_SERIALIZER_CLASS_DOC);
}

Both serializers will implement the following interface.

public interface Serializer<T> extends Configurable {
    public byte[] serialize(String topic, T data, boolean isKey);

    public void close();
}

This is more or less the same as what's in the old producer. The slight
differences are (1) the serializer now only requires a parameter-less
constructor; (2) the serializer has a configure() and a close() method for
initialization and cleanup, respectively; (3) the serialize() method
additionally takes the topic and an isKey indicator, both of which are
useful for things like schema registration.
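
For example, a trivial implementation of the proposed interface might look
like this (a sketch; only the interface itself is part of the proposal):

import java.util.Map;

public class StringSerializer implements Serializer<String> {

    public void configure(Map<String, ?> configs) {}  // from Configurable; nothing to do

    public byte[] serialize(String topic, String data, boolean isKey) {
        return data == null ? null : data.getBytes();
    }

    public void close() {}                             // no resources to release
}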

The detailed changes are included in KAFKA-1797. For completeness, I also
made the corresponding changes for the new java consumer api as well.

Note that the proposed api changes are incompatible with what's in the
0.8.2 branch. However, if those api changes are beneficial, it's probably
better to include them now in the 0.8.2 release, rather than later.

I'd like to discuss mainly two things in this thread.
1. Do people feel that the proposed api changes are reasonable?
2. Are there any concerns of including the api changes in the 0.8.2 final
release?

Thanks,

Jun


Re: Review Request 27990: Patch for KAFKA-1751

2014-11-24 Thread Neha Narkhede

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27990/#review62922
---


Ensuring that the reassignment command doesn't reassign non-existent topics to 
non-existent brokers is necessary, but it may not be sufficient. We need to see 
how the controller handles these cases: there can be a window in which, after 
the command checks for non-existent topics/brokers, another admin command 
sneaks in and changes the state.


core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala


Looks like this is meant to check for non-existent topics. However, I'm not 
sure that this quite works. For example, if non-existent topics are specified, 
getReplicaAssignmentForTopics throws an exception instead of dropping the topic 
out of the list. 

I think we should just use the topic check API from AdminUtils, create a list 
of topics that don't exist, and fail the operation with an appropriate error 
message that lists all non-existent topics.



core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala


Both these APIs could be moved to AdminUtils



core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala


It is worth checking for the topic explicitly (using topicExists) and giving 
an appropriate error message.



core/src/test/scala/unit/kafka/admin/AdminTest.scala


We should add 2 test cases that explicitly try to reassign topics that 
don't exist and brokers that don't exist.



core/src/test/scala/unit/kafka/admin/AdminTest.scala


Can you change this to use intercept[AdminCommandFailedException]


- Neha Narkhede


On Nov. 19, 2014, 9:57 a.m., Dmitry Pekar wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/27990/
> ---
> 
> (Updated Nov. 19, 2014, 9:57 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1751
> https://issues.apache.org/jira/browse/KAFKA-1751
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1751 / CR fixes
> 
> 
> KAFKA-1751 / CR fixes #2
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala 
> 979992b68af3723cd229845faff81c641123bb88 
>   core/src/test/scala/unit/kafka/admin/AdminTest.scala 
> e28979827110dfbbb92fe5b152e7f1cc973de400 
>   core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala 
> 29cc01bcef9cacd8dec1f5d662644fc6fe4994bc 
> 
> Diff: https://reviews.apache.org/r/27990/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Dmitry Pekar
> 
>



Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-11-24 Thread Jay Kreps
This is admittedly late in the release cycle to make a change. To add to
Jun's description the motivation was that we felt it would be better to
change that interface now rather than after the release if it needed to
change.

The motivation for wanting to make a change was the ability to really be
able to develop support for Avro and other serialization formats. The
current status is pretty scattered--there is a schema repository on an Avro
JIRA and another fork of that on github, and a bunch of people we have
talked to have done similar things for other serialization systems. It
would be nice if these things could be packaged in such a way that it was
possible to just change a few configs in the producer and get rich metadata
support for messages.

As we were thinking this through we realized that the new api we were about
to introduce was kind of not very compatible with this since it was just
byte[] oriented.

You can always do this by adding some kind of wrapper api that wraps the
producer. But this puts us back in the position of trying to document and
support multiple interfaces.

This also opens up the possibility of adding a MessageValidator or
MessageInterceptor plug-in transparently so that you can do other custom
validation on the messages you are sending which obviously requires access
to the original object not the byte array.
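
For illustration only, such a plug-in might look roughly like the following;
this interface is purely hypothetical and not part of the proposal:

  public interface MessageInterceptor<K, V> {
      // Called with the original key/value objects before serialization; an
      // implementation could validate the record and throw to reject it.
      void onSend(String topic, K key, V value);
  }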

This api doesn't prevent using byte[]: by configuring the
ByteArraySerializer it works as it currently does.

-Jay

On Mon, Nov 24, 2014 at 5:58 PM, Jun Rao  wrote:

> Hi, Everyone,
>
> I'd like to start a discussion on whether it makes sense to add the
> serializer api back to the new java producer. Currently, the new java
> producer takes a byte array for both the key and the value. While this api
> is simple, it pushes the serialization logic into the application. This
> makes it hard to reason about what type of data is being sent to Kafka and
> also makes it hard to share an implementation of the serializer. For
> example, to support Avro, the serialization logic could be quite involved
> since it might need to register the Avro schema in some remote registry and
> maintain a schema cache locally, etc. Without a serialization api, it's
> impossible to share such an implementation so that people can easily reuse.
> We sort of overlooked this implication during the initial discussion of the
> producer api.
>
> So, I'd like to propose an api change to the new producer by adding back
> the serializer api similar to what we had in the old producer. Specifically,
> the proposed api changes are the following.
>
> First, we change KafkaProducer to take generic types K and V for the key
> and the value, respectively.
>
> public class KafkaProducer<K,V> implements Producer<K,V> {
>
> public Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback
> callback);
>
> public Future<RecordMetadata> send(ProducerRecord<K,V> record);
> }
>
> Second, we add two new configs, one for the key serializer and another for
> the value serializer. Both serializers will default to the byte array
> implementation.
>
> public class ProducerConfig extends AbstractConfig {
>
> .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS,
> "org.apache.kafka.clients.producer.ByteArraySerializer", Importance.HIGH,
> KEY_SERIALIZER_CLASS_DOC)
> .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS,
> "org.apache.kafka.clients.producer.ByteArraySerializer", Importance.HIGH,
> VALUE_SERIALIZER_CLASS_DOC);
> }
>
> Both serializers will implement the following interface.
>
> public interface Serializer<T> extends Configurable {
> public byte[] serialize(String topic, T data, boolean isKey);
>
> public void close();
> }
>
> This is more or less the same as what's in the old producer. The slight
> differences are (1) the serializer now only requires a parameter-less
> constructor; (2) the serializer has a configure() and a close() method for
> initialization and cleanup, respectively; (3) the serialize() method
> additionally takes the topic and an isKey indicator, both of which are
> useful for things like schema registration.
>
> The detailed changes are included in KAFKA-1797. For completeness, I also
> made the corresponding changes for the new java consumer api as well.
>
> Note that the proposed api changes are incompatible with what's in the
> 0.8.2 branch. However, if those api changes are beneficial, it's probably
> better to include them now in the 0.8.2 release, rather than later.
>
> I'd like to discuss mainly two things in this thread.
> 1. Do people feel that the proposed api changes are reasonable?
> 2. Are there any concerns of including the api changes in the 0.8.2 final
> release?
>
> Thanks,
>
> Jun
>


Re: Review Request 28121: Patch for KAFKA-1780

2014-11-24 Thread Neha Narkhede

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/28121/#review62926
---

Ship it!


- Neha Narkhede


On Nov. 25, 2014, 1:24 a.m., Ewen Cheslack-Postava wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/28121/
> ---
> 
> (Updated Nov. 25, 2014, 1:24 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1780
> https://issues.apache.org/jira/browse/KAFKA-1780
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Expand documentation on ConsumerIterator to reflect new non-blocking APIs.
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/consumer/ConsumerIterator.scala 
> 78fbf75651583e390258af2d9f09df6911a97b59 
>   core/src/main/scala/kafka/utils/IteratorTemplate.scala 
> fd952f3ec0f04a3ba639c02779634265489fd186 
>   core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala 
> c0355cc0135c6af2e346b4715659353a31723b86 
>   core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala 
> 46a4e899ef293c56a931bfa5bcf9a07d07ec5792 
> 
> Diff: https://reviews.apache.org/r/28121/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Ewen Cheslack-Postava
> 
>



[jira] [Commented] (KAFKA-1780) Add peek()/poll() for ConsumerIterator

2014-11-24 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14223949#comment-14223949
 ] 

Neha Narkhede commented on KAFKA-1780:
--

This patch touches the consumer iterator. LGTM but I want to see if other 
committers want to take a look, given the nature of the change. cc [~junrao], 
[~jjkoshy]

> Add peek()/poll() for ConsumerIterator
> --
>
> Key: KAFKA-1780
> URL: https://issues.apache.org/jira/browse/KAFKA-1780
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8.1.1
>Reporter: Ewen Cheslack-Postava
>Assignee: Ewen Cheslack-Postava
> Attachments: KAFKA-1780.patch, KAFKA-1780_2014-11-24_17:24:54.patch
>
>
> Currently, all consumer operations (next(), hasNext()) block. This is 
> problematic for a couple of use cases. Most obviously, a peek() method would 
> be nice so you can at least check whether any data is immediately available, 
> getting a null value back if it's not.
> A more difficult example is a proxy with a timeout, i.e. it consumes messages 
> for up to N ms or M messages, and returns whatever it has at the end of that 
> period. It's possible to approximate that with peek, but requires aggressive 
> polling to match the proxy's timeout. A poll(timeout) method would allow for 
> a correct implementation, where each call to poll gets a single message, but 
> also allows the user to specify a custom timeout.
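
As an illustration of the proxy use case described above, a sketch only: it
assumes the proposed poll(timeout) returns null on timeout, matching the
description, and the surrounding consumer setup is elided:

  // "stream" would be a KafkaStream<byte[], byte[]> from a consumer connector.
  ConsumerIterator<byte[], byte[]> it = stream.iterator();
  int maxMessages = 100;     // M: message cap for one proxy response
  long timeoutMs = 1000;     // N: time budget for one proxy response
  List<MessageAndMetadata<byte[], byte[]>> batch =
      new ArrayList<MessageAndMetadata<byte[], byte[]>>();
  long deadline = System.currentTimeMillis() + timeoutMs;
  while (batch.size() < maxMessages) {
      long remaining = deadline - System.currentTimeMillis();
      if (remaining <= 0)
          break;                                   // time budget exhausted
      MessageAndMetadata<byte[], byte[]> msg = it.poll(remaining);
      if (msg == null)
          break;                                   // timed out: no more data now
      batch.add(msg);
  }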



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-11-24 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14224039#comment-14224039
 ] 

Bhavesh Mistry commented on KAFKA-1642:
---

Here are some more cases that reproduce this by simulating a network connection 
issue with only one of the brokers; the problem still persists:

Case 1: a broker's connection is down (note that according to ZK the partition 
leader is still b1).
Given three brokers b1, b2, b3:
1) Start your daemon program, keep sending data to all the brokers, and continue 
sending some data.
2) Verify the connections with netstat -a | grep -E "b1|b2|b3" (keep pumping 
data for 5 minutes and observe normal behavior using top -pid or top -p 
java_pid).
3) Simulate a problem establishing new TCP connections while the java program 
continues to pump data aggressively (please note the existing TCP connection to 
b1 is still active and connected):
a) sudo vi /etc/hosts and add the entry "127.0.0.1 b1"
b) /etc/init.d/network restart. After a while (5 to 7 minutes) you will see the 
issue, but keep pumping data; repeating this for b2 causes even more CPU 
consumption.
4) While dumping data heavily, the producer will try to establish a new TCP 
connection to b1 and will get connection refused. Note that CPU spikes up again 
and remains in that state, just because the connection could not be established.

Case 2: simulate a firewall rule such that only 4 TCP connections to each 
broker are allowed.

Do steps 1, 2 and 3 above, then:
4) Use an iptables rule to reject traffic. To start an "enforcing" firewall:
iptables -A OUTPUT -p tcp -m tcp -d b1 --dport 9092 -j REJECT
5) Keep pumping data while iptables rejects it (you will see CPU spike to 200% 
or more, depending on the number of producers).
To recover:
iptables -D OUTPUT -p tcp -m tcp -d b1 --dport 9092 -j REJECT


> [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network 
> connection is lost
> ---
>
> Key: KAFKA-1642
> URL: https://issues.apache.org/jira/browse/KAFKA-1642
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.8.1.1, 0.8.2
>Reporter: Bhavesh Mistry
>Assignee: Ewen Cheslack-Postava
>Priority: Blocker
> Fix For: 0.8.2
>
> Attachments: 
> 0001-Initial-CPU-Hish-Usage-by-Kafka-FIX-and-Also-fix-CLO.patch, 
> KAFKA-1642.patch, KAFKA-1642_2014-10-20_17:33:57.patch, 
> KAFKA-1642_2014-10-23_16:19:41.patch
>
>
> I see my CPU spike to 100% when network connection is lost for while.  It 
> seems network  IO thread are very busy logging following error message.  Is 
> this expected behavior ?
> 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR 
> org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka 
> producer I/O thread: 
> java.lang.IllegalStateException: No entry found for node -2
> at 
> org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110)
> at 
> org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99)
> at 
> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394)
> at 
> org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
> at java.lang.Thread.run(Thread.java:744)
> Thanks,
> Bhavesh





[jira] [Commented] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-11-24 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14224041#comment-14224041
 ] 

Bhavesh Mistry commented on KAFKA-1642:
---

[~ewencp],

I hope the steps above give you a comprehensive way to reproduce the problems 
with the run() method. It would be really great if we can make the client more 
resilient and robust, so that network and broker instability does not cause CPU 
spikes and degrade application performance. Hence, I would strongly suggest we 
at least measure the time run() is taking and, based on some configuration, do 
CPU throttling just to be more defensive, or at least detect that the io thread 
is consuming CPU cycles.

By the way, the experimental patch still works for the steps described above.

Thanks,

Bhavesh
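
A rough sketch of the throttling idea above; this is entirely hypothetical
(minLoopMillis is a made-up config and the loop body is elided):

  long minLoopMillis = 10;   // hypothetical config: minimum time per io loop pass
  boolean running = true;
  while (running) {
      long start = System.currentTimeMillis();
      // ... one pass of the Sender/NetworkClient poll loop would go here ...
      long elapsed = System.currentTimeMillis() - start;
      if (elapsed < minLoopMillis) {
          // The loop returned suspiciously fast; sleep off the remainder so a
          // broken connection can't spin the io thread at 100% CPU.
          try {
              Thread.sleep(minLoopMillis - elapsed);
          } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
              running = false;
          }
      }
  }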



Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-11-24 Thread Gwen Shapira
As one of the people who spent too much time building Avro repositories, +1
on bringing serializer API back.

I think it will make the new producer easier to work with.

Gwen

On Mon, Nov 24, 2014 at 6:13 PM, Jay Kreps  wrote:

> [quote of Jay's reply and Jun's original proposal elided; both appear in full
> above]

[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-11-24 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14224041#comment-14224041
 ] 

Bhavesh Mistry edited comment on KAFKA-1642 at 11/25/14 4:37 AM:
-

[~ewencp],

I hope the steps above give you a comprehensive way to reproduce the problems 
with the run() method. It would be really great if we can make the client more 
resilient and robust, so that network and broker instability does not cause CPU 
spikes and degrade application performance. Hence, I would strongly suggest we 
at least measure the time run() is taking and, based on some configuration, do 
CPU throttling just to be more defensive, or at least detect that the io thread 
is consuming CPU cycles.

By the way, the experimental patch still works for the steps described above as 
well, due to the hard coded back-off. 

Any time you have a patch or anything else, please let me know and I will test 
it out. Thanks again for your detailed analysis. 

Please look into ClusterConnectionStates and how it manages the state of a node 
when it disconnects immediately. 

Please also look at connecting(int node, long now) and the snippet below (I feel 
the connecting() call needs to come before the connect(), not after):

selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()),
                 this.socketSendBuffer, this.socketReceiveBuffer);
this.connectionStates.connecting(node.id(), now);

Thanks,

Bhavesh
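
The reordering suggested above, spelled out as a sketch based on the snippet
just quoted (whether it fully fixes the "No entry found for node" path would
need verification):

  this.connectionStates.connecting(node.id(), now);   // record state first...
  selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()),
                   this.socketSendBuffer, this.socketReceiveBuffer);
  // ...so that if connect() fails immediately, the disconnected() transition
  // finds an existing entry instead of throwing IllegalStateException.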





[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-11-24 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14224041#comment-14224041
 ] 

Bhavesh Mistry edited comment on KAFKA-1642 at 11/25/14 4:39 AM:
-

[~ewencp],

I hope the steps above give you a comprehensive way to reproduce the problems 
with the run() method. It would be really great if we can make the client more 
resilient and robust, so that network and broker instability does not cause CPU 
spikes and degrade application performance. Hence, I would strongly suggest we 
at least measure the time run() is taking, gather some stats, and, based on some 
configuration, do CPU throttling (if needed) just to be more defensive, or at 
least detect that the io thread is consuming CPU cycles.

By the way, the experimental patch still works for the steps described above as 
well, due to the hard coded back-off. 

Any time you have a patch or anything else, please let me know and I will test 
it out (you have my email id). Once again, thanks for your detailed analysis and 
for looking at this at short notice. 

Please look into ClusterConnectionStates and how it manages the state of a node 
when it disconnects immediately. 

Please also look at connecting(int node, long now) and the snippet below (I feel 
the connecting() call needs to come before the connect(), not after):

selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()),
                 this.socketSendBuffer, this.socketReceiveBuffer);
this.connectionStates.connecting(node.id(), now);

Thanks,

Bhavesh





[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-11-24 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14224041#comment-14224041
 ] 

Bhavesh Mistry edited comment on KAFKA-1642 at 11/25/14 4:40 AM:
-

[~ewencp],

I hope the steps above give you a comprehensive way to reproduce the problems 
with the run() method. It would be really great if we can make the client more 
resilient and robust, so that network and broker instability does not cause CPU 
spikes and degrade application performance. Hence, I would strongly suggest we 
at least measure the time run() is taking, gather some stats, and, based on some 
configuration, do CPU throttling (if needed) just to be more defensive, or at 
least detect that the io thread is consuming CPU cycles.

By the way, the experimental patch still works for the steps described above as 
well, due to the hard coded back-off. 

Any time you have a patch or anything else, please let me know and I will test 
it out (you have my email id). Once again, thanks for your detailed analysis and 
for looking at this at short notice. 

Please look into ClusterConnectionStates and how it manages the state of a node 
when it disconnects immediately. 

Please also look at connecting(int node, long now) and the snippet below (I feel 
the connecting() call needs to come before the connect(), not after):

selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()),
                 this.socketSendBuffer, this.socketReceiveBuffer);
this.connectionStates.connecting(node.id(), now);

Also, I still feel that producer.close() needs to be looked at as well (the 
join() method should take some configurable timeout).

Thanks,

Bhavesh





[jira] [Commented] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-11-24 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14224046#comment-14224046
 ] 

Bhavesh Mistry commented on KAFKA-1642:
---

Also, are you going to port the patch back to the 0.8.1.1 version as well? 
Please let me know.

Thanks,
Bhavesh 



[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-11-24 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14224046#comment-14224046
 ] 

Bhavesh Mistry edited comment on KAFKA-1642 at 11/25/14 4:43 AM:
-

Also, are you going to port the patch back to the 0.8.1.1 version as well? 
Please let me know.

Thanks,
Bhavesh 





Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-11-24 Thread Sriram Subramanian
Looked at the patch. +1 from me.

On 11/24/14 8:29 PM, "Gwen Shapira"  wrote:

> [quote of Gwen's reply, Jay's reply, and Jun's original proposal elided; all
> appear in full above]

[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-11-24 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14224041#comment-14224041
 ] 

Bhavesh Mistry edited comment on KAFKA-1642 at 11/25/14 5:37 AM:
-

[~ewencp],

I hope the steps above give you a comprehensive way to reproduce the problems 
with the run() method. It would be really great if we can make the client more 
resilient and robust, so that network and broker instability does not cause CPU 
spikes and degrade application performance. Hence, I would strongly suggest we 
at least measure the time run() is taking, gather some stats, and, based on some 
configuration, do CPU throttling (if needed) just to be more defensive, or at 
least detect that the io thread is consuming CPU cycles.

By the way, the experimental patch still works for the steps described above as 
well, due to the hard coded back-off. 

Any time you have a patch or anything else, please let me know and I will test 
it out (you have my email id). Once again, thanks for your detailed analysis and 
for looking at this at short notice. 

Please look into ClusterConnectionStates and how it manages the state of a node 
when it disconnects immediately. 

Please also look at connecting(int node, long now) and the snippet below (I feel 
the connecting() call needs to come before the connect(), not after):

selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()),
                 this.socketSendBuffer, this.socketReceiveBuffer);
this.connectionStates.connecting(node.id(), now);

Also, I still feel that producer.close() needs to be looked at as well (the 
join() method should take some configurable timeout so the thread does not 
hang).

Thanks,

Bhavesh


