test of producer's delay and consumer's delay

2016-06-17 Thread Kafka
Hello, I have done a series of tests on Kafka 0.9.0, and one of the results
confused me.

test environment:
 kafka cluster: 3 brokers, 8-core CPU / 8 GB mem / 1 Gb NIC
 client: 4-core CPU / 4 GB mem
 topic: 6 partitions, 2 replicas

 total messages: 1
 single message size: 1024 bytes
 fetch.min.bytes: 1
 fetch.wait.max.ms: 100ms

All send tests were run using the Scala sync interface.

when I set ack to 0, the producer's delay is 0.3 ms and the consumer's delay is 7.7 ms
when I set ack to 1, the producer's delay is 1.6 ms and the consumer's delay is 3.7 ms
when I set ack to -1, the producer's delay is 3.5 ms and the consumer's delay is 4.2 ms

But why does the consumer's delay decrease when I set ack from 0 to 1? That
confuses me.
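For reference, a minimal sketch of this kind of sync-send latency measurement using the Java producer client (usable from Scala; the broker address and topic name are placeholders, and the acks value is varied per run):

{code}
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object AcksLatencyTest extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "broker1:9092") // placeholder
  props.put("acks", "1")                         // compare "0", "1", "-1"
  props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")

  val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)
  val payload = new Array[Byte](1024) // 1024-byte message, as in the test above

  val start = System.nanoTime()
  // Blocking on the returned future makes the send synchronous.
  producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("test-topic", payload)).get()
  printf("sync send took %.2f ms%n", (System.nanoTime() - start) / 1e6)
  producer.close()
}
{code}

With acks=0 the broker sends no acknowledgement, so the producer-side number measures little more than the local write; with acks=1 and acks=-1 the future completes only after the leader (or the full ISR) has the message.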




Re: FYI - Apache ZooKeeper Backup, a Treatise

2016-06-17 Thread Andrew Purtell
See HBASE-14379. The points on configuration, state management, and security 
apply. 


> On Jun 17, 2016, at 7:25 PM, Martin Serrano  wrote:
> 
> Why are you seeking to undo it?
> 
>> On 06/17/2016 09:34 PM, Andrew Purtell wrote:
>> HBase stores replication peering configuration in ZK. We're working on
>> undoing that, but for now that information exists nowhere else.

Re: FYI - Apache ZooKeeper Backup, a Treatise

2016-06-17 Thread Andrew Purtell
HBase stores replication peering configuration in ZK. We're working on
undoing that, but for now that information exists nowhere else.


On Thu, Jun 16, 2016 at 2:47 PM, Ismael Juma  wrote:

> Hi Jordan,
>
> Kafka stores ACLs as well as client and topic configs in ZooKeeper so that
> lends credence to your argument, I think.
>
> Ismael
>
> On Thu, Jun 16, 2016 at 11:41 PM, Jordan Zimmerman <
> jor...@jordanzimmerman.com> wrote:
>
> > Contrary to recommendations everywhere, my experience is that almost
> > everyone is storing source of truth data in ZooKeeper. It’s just too
> > tempting. You have a distributed file system just sitting there and it’s
> > too easy to use. You get a lot of great features like watches, etc. People
> > are using it to store configuration data, sequence numbers, etc. They are
> > storing these things without a good means of reproducing them in case of a
> > catastrophic outage. Further, I’ve heard of several orgs who just back up
> > the transaction logs and think they can restore them for DR. Anyway, that’s
> > the genesis of my blog post.
> >
> > -Jordan
> >
> > > On Jun 16, 2016, at 2:39 PM, Chris Nauroth wrote:
> > >
> > > Yes, thank you to Jordan for the article!
> > >
> > > Like Flavio, I personally have never come across the requirement for
> > > ZooKeeper backups.  I've generally followed the pattern that data stored
> > > in ZooKeeper is truly transient, and applications are built either to
> > > tolerate loss of that data or reconstruct it from first principles if it
> > > goes missing.  Adding observers in a second data center would give a
> > > rudimentary approximation of off-site backup in the case of a data center
> > > disaster, with the usual caveats around propagation delays.
> > >
> > > Jordan, I'd be curious if you can share more specific details about the
> > > kind of data that you have that necessitates a backup/restore.  (If you're
> > > not at liberty to share this, then I can understand that.)  It might
> > > inform if we have a motivating use case for backup/restore features within
> > > ZooKeeper, such as some of the transaction log filtering that the article
> > > mentions.
> > >
> > > --Chris Nauroth
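For reference, the observer setup Chris mentions is plain ZooKeeper configuration; a minimal sketch with placeholder hostnames:

{code}
# zoo.cfg on every server: the remote-DC node carries the :observer suffix
server.1=zk1.dc1.example.com:2888:3888
server.2=zk2.dc1.example.com:2888:3888
server.3=zk3.dc1.example.com:2888:3888
server.4=zk1.dc2.example.com:2888:3888:observer

# and additionally in the zoo.cfg of the observer node itself:
peerType=observer
{code}

Observers receive the committed transaction stream without voting, which is what makes them a rudimentary off-site copy subject to propagation delay.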
> > >
> > >
> > >
> > >
> > > On 6/16/16, 1:03 AM, "Flavio Junqueira" wrote:
> > >
> > >> Great write-up, Jordan, thanks!
> > >>
> > >> Whether to backup zk data or not is possibly an open topic for this
> > >> community, even though we have discussed it at times. My sense has been
> > >> that precisely because of the issues you mention in your post, it is
> > >> typically best to have a way to recreate its data upon a disaster rather
> > >> than backup the data. I think there could be three general scenarios in
> > >> which folks would prefer to backup data, but you correct me if these
> > >> aren't accurate:
> > >>
> > >> - The data in zk isn't elsewhere, so it can't be recreated: zk isn't a
> > >> regular database, so I'd think it is best not to store data and focus on
> > >> cluster data or metadata.
> > >> - There is just a lot of data and I'd rather have a shorter time to
> > >> recover: zk in general shouldn't have that much data in db, but let's go
> > >> with the assumption that for the requirements of the application it is a
> > >> lot. For such a case, it probably depends on whether your application can
> > >> efficiently and effectively recover from a backup. Basically, as pointed
> > >> out in the post, the data could be inconsistent and cause trouble if you
> > >> don't think about the corner cases.
> > >> - The code to recreate the zk metadata for my application is super
> > >> complex: if you decide to code against zk, it is good to think whether
> > >> reconstructing in the case of a disaster is doable and, if it is, design
> > >> and implement to reconstruct the state upon a disaster.
> > >>
> > >> Also, we typically provision enough replicas, often replicating across
> > >> data centers, to make sure that the data isn't all gone. Having more
> > >> replicas does not rule out completely the possibility of a disaster, but
> > >> in such rare cases we resort to the expensive path.
> > >>
> > >> I personally have never worked with an application that was taking
> > >> backups of zk data in prod, so I'm really interested in what others
> > >> think.
> > >>
> > >> -Flavio
> > >>
> > >>
> > >>> On 16 Jun 2016, at 00:43, Jordan Zimmerman <jor...@jordanzimmerman.com>
> > >>> wrote:
> > >>>
> > >>> FYI - I wrote a blog about backing up ZooKeeper:
> > >>>
> > >>> https://www.elastic.co/blog/zookeeper-backup-a-treatise
> > >>>
> > >>> -Jordan
> > >>
> > >
> >
> >
>



-- 
Best regards,

   - Andy

Problems worthy of attack prove their worth by hitting back. - Piet Hein
(via Tom White)


[jira] [Updated] (KAFKA-1429) Yet another deadlock in controller shutdown

2016-06-17 Thread Pengwei (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1429?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pengwei updated KAFKA-1429:
---
Attachment: kafka_0.9.0.0_controller_dead_lock.patch

The uploaded file is the patch to fix this bug. Can somebody review it?

> Yet another deadlock in controller shutdown
> ---
>
> Key: KAFKA-1429
> URL: https://issues.apache.org/jira/browse/KAFKA-1429
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.1
>Reporter: Dmitry Bugaychenko
>Assignee: Neha Narkhede
> Attachments: kafka_0.9.0.0_controller_dead_lock.patch
>
>
> Found one more case of deadlock in controller during shutdown:
> {code}
> ZkClient-EventThread-57-192.168.41.148:2181,192.168.36.250:2181,192.168.41.207:2181
>  id=57 state=TIMED_WAITING
> - waiting on <0x288a66ec> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> - locked <0x288a66ec> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at sun.misc.Unsafe.park(Native Method)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
> at 
> java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1468)
> at kafka.utils.KafkaScheduler.shutdown(KafkaScheduler.scala:88)
> at 
> kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply$mcV$sp(KafkaController.scala:339)
> at 
> kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply(KafkaController.scala:337)
> at 
> kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply(KafkaController.scala:337)
> at kafka.utils.Utils$.inLock(Utils.scala:538)
> at 
> kafka.controller.KafkaController.onControllerResignation(KafkaController.scala:337)
> at 
> kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply$mcZ$sp(KafkaController.scala:1068)
> at 
> kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1067)
> at 
> kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1067)
> at kafka.utils.Utils$.inLock(Utils.scala:538)
> at 
> kafka.controller.KafkaController$SessionExpirationListener.handleNewSession(KafkaController.scala:1067)
> at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> Locked synchronizers: count = 1
>   - java.util.concurrent.locks.ReentrantLock$NonfairSync@22b9b31a
> kafka-scheduler-0 id=172 state=WAITING
> - waiting on <0x22b9b31a> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
> - locked <0x22b9b31a> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
>  owned by 
> ZkClient-EventThread-57-192.168.41.148:2181,192.168.36.250:2181,192.168.41.207:2181
>  id=57
> at sun.misc.Unsafe.park(Native Method)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:867)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1197)
> at 
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:214)
> at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:290)
> at kafka.utils.Utils$.inLock(Utils.scala:536)
> at 
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17.apply(KafkaController.scala:1110)
> at 
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17.apply(KafkaController.scala:1108)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
> at 
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1108)
> at 
> 
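The cycle in the trace reduces to a classic pattern: one thread takes a lock and then blocks waiting for an executor to terminate, while a queued task in that executor is waiting for the same lock. A minimal standalone illustration of the pattern (names are illustrative, not the Kafka code):

{code}
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.locks.ReentrantLock

object DeadlockPattern extends App {
  val lock = new ReentrantLock()                      // plays the controller lock
  val scheduler = Executors.newSingleThreadExecutor() // plays the KafkaScheduler

  lock.lock() // like the session-expiration path entering inLock(...)
  try {
    // A queued task that needs the same lock, like the rebalance check.
    scheduler.submit(new Runnable {
      def run(): Unit = { lock.lock(); try () finally lock.unlock() }
    })
    scheduler.shutdown()
    // We hold the lock the queued task is waiting for, while waiting for that
    // task to finish: a deadlock that only the timeout below breaks.
    val finished = scheduler.awaitTermination(5, TimeUnit.SECONDS)
    println(s"scheduler terminated cleanly: $finished") // prints false
  } finally lock.unlock()
}
{code}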

[GitHub] kafka pull request #1521: MINOR: Check null keys in KTableSource

2016-06-17 Thread guozhangwang
GitHub user guozhangwang opened a pull request:

https://github.com/apache/kafka/pull/1521

MINOR: Check null keys in KTableSource



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/guozhangwang/kafka Kminor-check-nullkey-ktable-source

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1521.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1521


commit 41eedf2e70a1873ffd87c22bb0bcf58217ea66ff
Author: Guozhang Wang 
Date:   2016-06-17T23:16:11Z

check null keys in KTableSource




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #1520: HOTFIX: Check hasNext in KStreamWindowReduce

2016-06-17 Thread guozhangwang
GitHub user guozhangwang opened a pull request:

https://github.com/apache/kafka/pull/1520

HOTFIX: Check hasNext in KStreamWindowReduce



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/guozhangwang/kafka KHotfix-iter-hasNext-window-value-getter

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1520.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1520


commit 2a974544c6808a43347a069b1cda20a8536f453a
Author: Guozhang Wang 
Date:   2016-06-17T23:31:00Z

check hasNext in KStreamWindowReduce as well






[jira] [Commented] (KAFKA-3862) Kafka streams documentation: partition<->thread<->task assignment unclear

2016-06-17 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3862?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15337202#comment-15337202
 ] 

Guozhang Wang commented on KAFKA-3862:
--

Good to know [~dminkovsky]! Please do let us know if you find other
documentation issues, as we are actively improving on that end for a better
user experience. We are improving both the CP and OS docs as well.

> Kafka streams documentation: partition<->thread<->task assignment unclear
> -
>
> Key: KAFKA-3862
> URL: https://issues.apache.org/jira/browse/KAFKA-3862
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
> Environment: N/A
>Reporter: Dmitry Minkovsky
>Assignee: Guozhang Wang
>Priority: Minor
> Fix For: 0.10.0.0
>
>
> Hi. Not sure if this is the right place to post about documentation (this 
> isn't actually a software bug).
> I believe the following span of documentation is self-contradictory:
> !http://i.imgur.com/WM8ada2.png!
> In the paragraph text before and after the image, it says the partitions are 
> *evenly distributed* among the threads. However, in the image, Thread 1 gets 
> p1 and p3 (4 partitions, total) while Thread 2 gets p2 (2 partitions, total). 
> That's "even" in the sense that you can't do any better, but still not even, 
> and in conflict with the text that follows the image:
> !http://i.imgur.com/xq3SOre.png!
> This doesn't make sense, because why would p2 from topicA and topicB go to 
> different threads? If this were the case, you couldn't join the partitions, 
> right?
> Either way, it seems like the two things are in conflict.
> Finally, the wording:
> bq. ...assume that each thread executes a single stream task...
> What does "assume" mean here? I've not seen any indication elsewhere in the 
> documentation that shows you can configure this behavior. Sorry if I missed 
> this.
> Thank you!
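For context, the task count is fixed by the input partition count, while the thread count is a configuration knob; a minimal sketch with a placeholder application id and broker address:

{code}
import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app")     // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092") // placeholder
// E.g., 6 input partitions produce 6 tasks; with 2 threads, the tasks are
// distributed 3/3 across the threads. Tasks cannot be configured directly.
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "2")
{code}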



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-3839) Handling null keys in KTable.groupBy

2016-06-17 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15337172#comment-15337172
 ] 

Guozhang Wang edited comment on KAFKA-3839 at 6/17/16 11:00 PM:


Hi [~jeyhunkarimov], It seems I created this ticket as a duplicate of 
KAFKA-3817 and have actually fixed the {{groupBy}} issue myself. Sorry for the 
confusion.

I'm closing this ticket now; feel free to work on 
https://issues.apache.org/jira/browse/KAFKA-3836. 


was (Author: guozhang):
Hi [~jeyhunkarimov], It seems I created this ticket as a duplicate of 
KAFKA-3817 and have actually fixed the {{groupBy}} issue myself. Sorry for the 
confusion.

I will create this ticket for now, feel free to work on 
https://issues.apache.org/jira/browse/KAFKA-3836. 

> Handling null keys in KTable.groupBy
> 
>
> Key: KAFKA-3839
> URL: https://issues.apache.org/jira/browse/KAFKA-3839
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Jeyhun Karimov
>  Labels: architecture, newbie
>
> This is first reported by [~jeff.klu...@gmail.com].
> Similarly to how we handle null keys in KStream join and aggregations in 
> KAFKA-3561 (https://github.com/apache/kafka/pull/1472), for KTable.groupBy 
> operators we should also gracefully handle null keys by filtering them out, 
> since they will not participate in the aggregation.
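The same guard is available at the DSL level; an illustrative sketch using the public Predicate interface (placeholder types, not the internal fix):

{code}
import org.apache.kafka.streams.kstream.{KStream, Predicate}

// Drop null-keyed records before they reach an aggregation, mirroring what
// the internal fix does for KTable.groupBy.
def dropNullKeys[V](stream: KStream[String, V]): KStream[String, V] =
  stream.filter(new Predicate[String, V] {
    override def test(key: String, value: V): Boolean = key != null
  })
{code}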





[jira] [Updated] (KAFKA-3836) KStreamReduce and KTableReduce should not pass nulls to Deserializers

2016-06-17 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-3836:
-
Labels: architecture  (was: )

> KStreamReduce and KTableReduce should not pass nulls to Deserializers
> -
>
> Key: KAFKA-3836
> URL: https://issues.apache.org/jira/browse/KAFKA-3836
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Avi Flax
>Assignee: Jeyhun Karimov
>Priority: Trivial
>  Labels: architecture
>
> As per [this 
> discussion|http://mail-archives.apache.org/mod_mbox/kafka-users/201606.mbox/%3ccahwhrru29jw4jgvhsijwbvlzb3bc6qz6pbh9tqcfbcorjk4...@mail.gmail.com%3e]
>  these classes currently pass null values along to Deserializers, so those 
> Deserializers need to handle null inputs and pass them through without 
> throwing. It would be better for these classes to simply not call the 
> Deserializers in this case; this would reduce the burden on implementers of 
> {{Deserializer}}.
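A minimal sketch of the suggested behavior (an illustrative helper, not the actual KStreamReduce/KTableReduce code):

{code}
import org.apache.kafka.common.serialization.Deserializer

// Skip the Deserializer entirely for null byte arrays, instead of requiring
// every Deserializer implementation to be null-safe.
def deserializeOrNull[T](deserializer: Deserializer[T], topic: String, bytes: Array[Byte]): T =
  if (bytes == null) null.asInstanceOf[T]
  else deserializer.deserialize(topic, bytes)
{code}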





[jira] [Resolved] (KAFKA-3839) Handling null keys in KTable.groupBy

2016-06-17 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3839?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-3839.
--
Resolution: Duplicate

> Handling null keys in KTable.groupBy
> 
>
> Key: KAFKA-3839
> URL: https://issues.apache.org/jira/browse/KAFKA-3839
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Jeyhun Karimov
>  Labels: architecture, newbie
>





[jira] [Commented] (KAFKA-3839) Handling null keys in KTable.groupBy

2016-06-17 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15337172#comment-15337172
 ] 

Guozhang Wang commented on KAFKA-3839:
--

Hi [~jeyhunkarimov], It seems I created this ticket as a duplicate of 
KAFKA-3817 and have actually fixed the {{groupBy}} issue myself. Sorry for the 
confusion.

I will close this ticket for now; feel free to work on 
https://issues.apache.org/jira/browse/KAFKA-3836. 

> Handling null keys in KTable.groupBy
> 
>
> Key: KAFKA-3839
> URL: https://issues.apache.org/jira/browse/KAFKA-3839
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Jeyhun Karimov
>  Labels: architecture, newbie
>





[jira] [Commented] (KAFKA-3861) Shrunk ISR before leader crash makes the partition unavailable

2016-06-17 Thread James Cheng (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15337080#comment-15337080
 ] 

James Cheng commented on KAFKA-3861:


Semi-related to https://issues.apache.org/jira/browse/KAFKA-3410

> Shrunk ISR before leader crash makes the partition unavailable
> --
>
> Key: KAFKA-3861
> URL: https://issues.apache.org/jira/browse/KAFKA-3861
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.0
>Reporter: Maysam Yabandeh
>
> We observed a case where the leader experienced a crash and lost its in-memory 
> data and latest HW offsets. Normally Kafka should be safe and be able to make 
> progress with a single node failure. However, a few seconds before the crash 
> the leader shrunk its ISR to itself, which is safe since min-in-sync-replicas 
> is 2 and the replication factor is 3, so the troubled leader cannot accept new 
> produce messages. After the crash, however, the controller could not name any 
> of the followers as the new leader, since as far as the controller knows they 
> are not in the ISR and could potentially be behind the last leader. Note that 
> unclean-leader-election is disabled in this cluster, since the cluster requires 
> a very high degree of durability and cannot tolerate data loss.
> The impact could get worse if the admin brings up the crashed broker in an 
> attempt to make such partitions available again; this would take down even 
> more brokers, as the followers panic when they find their offset larger than 
> the HW offset in the leader:
> {code}
> if (leaderEndOffset < replica.logEndOffset.messageOffset) {
>   // Prior to truncating the follower's log, ensure that doing so is not
>   // disallowed by the configuration for unclean leader election.
>   // This situation could only happen if the unclean election configuration
>   // for a topic changes while a replica is down. Otherwise, we should never
>   // encounter this situation since a non-ISR leader cannot be elected if
>   // disallowed by the broker configuration.
>   if (!LogConfig.fromProps(brokerConfig.originals,
>       AdminUtils.fetchEntityConfig(replicaMgr.zkUtils, ConfigType.Topic,
>         topicAndPartition.topic)).uncleanLeaderElectionEnable) {
>     // Log a fatal error and shutdown the broker to ensure that data loss
>     // does not unexpectedly occur.
>     fatal("Halting because log truncation is not allowed for topic %s,".format(topicAndPartition.topic) +
>       " Current leader %d's latest offset %d is less than replica %d's latest offset %d"
>         .format(sourceBroker.id, leaderEndOffset, brokerConfig.brokerId,
>           replica.logEndOffset.messageOffset))
>     Runtime.getRuntime.halt(1)
>   }
> {code}
> One hackish solution would be that the admin investigates the logs, determines 
> that unclean-leader-election in this particular case would be safe, temporarily 
> enables it (while the crashed node is down) until new leaders are selected for 
> the affected partitions, waits until the topics' LEO advances far enough, and 
> then brings up the crashed node again. This manual process is, however, slow 
> and error-prone, and the cluster will suffer partial unavailability in 
> the meanwhile.
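For reference, the temporary toggle described above maps onto the per-topic override for unclean.leader.election.enable; a sketch using the stock tooling (ZooKeeper address and topic name are placeholders):

{code}
# Enable unclean leader election for the affected topic only, while the
# crashed broker is still down:
bin/kafka-configs.sh --zookeeper zk1:2181 --alter --entity-type topics \
  --entity-name affected-topic --add-config unclean.leader.election.enable=true

# After new leaders are elected and the LEO has advanced far enough, revert:
bin/kafka-configs.sh --zookeeper zk1:2181 --alter --entity-type topics \
  --entity-name affected-topic --delete-config unclean.leader.election.enable
{code}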
> We are thinking of having the controller make an exception for this case: if 
> the ISR size is less than min-in-sync-replicas and the new leader would be -1, 
> then the controller does an RPC to all the replicas to inquire about their 
> latest offset, and if all the replicas respond, it chooses the one with the 
> largest offset as the leader as well as the new ISR. Note that the controller 
> cannot do that if any of the non-leader replicas does not respond, since there 
> might be a case where the responding replicas were not in the last ISR and are 
> hence potentially behind the others (and the controller could not know that, 
> since it does not keep track of previous ISRs).
> The pros would be that Kafka will be safely available when such cases occur and 
> would not require any admin intervention. The cons, however, are that the 
> controller talking to brokers inside the leader election function would break 
> the existing pattern in the source code, as currently the leader is elected 
> locally without requiring any additional RPC.
> Thoughts?





[jira] [Commented] (KAFKA-3863) Add system test for connector failure/restart

2016-06-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15337034#comment-15337034
 ] 

ASF GitHub Bot commented on KAFKA-3863:
---

GitHub user hachikuji opened a pull request:

https://github.com/apache/kafka/pull/1519

KAFKA-3863: System tests covering connector/task failure and restart



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hachikuji/kafka KAFKA-3863

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1519.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1519


commit 5d615cf54319e703e841486844d7b690a84ab1fe
Author: Jason Gustafson 
Date:   2016-06-16T23:48:01Z

KAFKA-3863: System tests covering connector/task failure and restart




> Add system test for connector failure/restart
> -
>
> Key: KAFKA-3863
> URL: https://issues.apache.org/jira/browse/KAFKA-3863
> Project: Kafka
>  Issue Type: Test
>  Components: KafkaConnect, system tests
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>
> We should have system tests covering connector/task failure and the ability 
> to restart through the REST API.





[GitHub] kafka pull request #1519: KAFKA-3863: System tests covering connector/task f...

2016-06-17 Thread hachikuji
GitHub user hachikuji opened a pull request:

https://github.com/apache/kafka/pull/1519

KAFKA-3863: System tests covering connector/task failure and restart



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hachikuji/kafka KAFKA-3863

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1519.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1519


commit 5d615cf54319e703e841486844d7b690a84ab1fe
Author: Jason Gustafson 
Date:   2016-06-16T23:48:01Z

KAFKA-3863: System tests covering connector/task failure and restart






[jira] [Created] (KAFKA-3863) Add system test for connector failure/restart

2016-06-17 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-3863:
--

 Summary: Add system test for connector failure/restart
 Key: KAFKA-3863
 URL: https://issues.apache.org/jira/browse/KAFKA-3863
 Project: Kafka
  Issue Type: Test
  Components: KafkaConnect, system tests
Reporter: Jason Gustafson
Assignee: Jason Gustafson


We should have system tests covering connector/task failure and the ability to 
restart through the REST API.





Re: [VOTE] KIP-62: Allow consumer to send heartbeats from a background thread

2016-06-17 Thread Jun Rao
Jason,

Thanks for the KIP. +1

Just one clarification: the KIP adds a rebalance timeout to the protocol
but doesn't say what value will be used. I guess we will use
max.poll.interval.ms?

Jun

On Thu, Jun 16, 2016 at 11:44 AM, Jason Gustafson 
wrote:

> Hi All,
>
> I'd like to open the vote for KIP-62. This proposal attempts to address one
> of the recurring usability problems that users of the new consumer have
> faced with as little impact as possible. You can read the full details
> here:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread
> .
>
> After some discussion on this list, I think we were in agreement that this
> change addresses a major part of the problem and we've left the door open
> for further improvements, such as adding a heartbeat() API or a separately
> configured rebalance timeout. Thanks in advance to everyone who helped
> review the proposal.
>
> -Jason
>
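For context, a sketch of the consumer configuration as it would look under KIP-62 (max.poll.interval.ms is the new setting the KIP proposes, so it does not exist in released clients at this point; values and addresses are illustrative):

{code}
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer

val props = new Properties()
props.put("bootstrap.servers", "broker1:9092") // placeholder
props.put("group.id", "example-group")         // placeholder
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("session.timeout.ms", "10000")    // failure detection via background heartbeats
props.put("max.poll.interval.ms", "300000") // proposed: max allowed time between poll() calls,
                                            // which would also bound a rebalance, per Jun's question

val consumer = new KafkaConsumer[String, String](props)
{code}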


Re: [DISCUSS] KIP-4 Create Topic Schema

2016-06-17 Thread Jun Rao
Grant,

I think Dana has a valid point. Currently, we throw an
InvalidRequestException and close the connection only when the broker can't
deserialize the bytes into a request. In this case, the deserialization is
fine. It's just that there are some additional constraints that can't be
specified at the protocol level. We can potentially just remember the
topics that violated those constraints in the request and handle them
accordingly with the right error code w/o disconnecting.

Thanks,

Jun

On Fri, Jun 17, 2016 at 8:40 AM, Dana Powers  wrote:

> I'm unconvinced (crazy, right?). Comments below:
>
> On Fri, Jun 17, 2016 at 7:27 AM, Grant Henke  wrote:
> > Hi Dana,
> >
> > You mentioned one of the reasons I error and disconnect: because I can't
> > return an error for every request, the cardinality between request and
> > response would be different. Beyond that, though, I am handling this
> > protocol rule/parsing error the same way all other messages do.
>
> But you can return an error for every topic, and isn't that the level
> of error required here?
>
> > CreateTopic Response (Version: 0) => [topic_error_codes]
> >   topic_error_codes => topic error_code
> > topic => STRING
> > error_code => INT16
>
> If I submit duplicate requests for a topic, it's an error isolated to
> that topic. If I mess up the partition / replication / etc semantics
> for a topic, that's an error isolated to that topic. Is there a
> cardinality problem at this level?
>
>
> >
> > Parsing is handled in the RequestChannel and any exception that occurs
> > during this phase is caught, converted into an InvalidRequestException and
> > results in a disconnect:
> >
> > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/RequestChannel.scala#L92-L95
> >
> > Since this is an error that could only occur (and would always occur) due
> > to incorrect client implementations, and not because of any cluster state
> > or unusual situation, I felt this behavior was okay and made sense. For
> > client developers the broker logging should make it obvious what the issue
> > is. My patch also clearly documents the protocol rules in the Protocol
> > definition.
>
> Documentation is great and definitely a must. But requiring client
> developers to dig through server logs is not ideal. Client developers
> don't always have direct access to those logs. And even if they do,
> the brokers may have other traffic, which makes it difficult to track
> down the exact point in the logs where the error occurred.
>
> As discussed above, I don't think you need to or should model this as
> a request-level parsing error. It may be easier for the current broker
> implementation to do that and just crash the connection, but I don't
> think it makes that much sense from a raw api perspective.
>
> > In the future having a response header with an error code (and optimally
> > error message) for every response would help solve this problem (for all
> > message types).
>
> That will definitely help solve the more general invalid request error
> problem. But I think given the current state of error handling /
> feedback from brokers on request-level errors, you should treat
> connection crash as a last resort. I think there is a good opportunity
> to avoid it in this case, and I think the api would be better if done
> that way.
>
> -Dana
>
> > On Fri, Jun 17, 2016 at 12:04 AM, Dana Powers wrote:
> >
> >> Why disconnect the client on a InvalidRequestException? The 2 errors
> >> you are catching are both topic-level: (1) multiple requests for the
> >> same topic, and (2) ReplicaAssignment and num_partitions /
> >> replication_factor both set. Wouldn't it be better to just error the
> >> offending create_topic_request, not the entire connection? The
> >> CreateTopicsResponse returns a map of topics to error codes. You could
> >> just return the topic that caused the error and an
> >> InvalidRequestException error code.
> >>
> >> -Dana
> >>
> >> On Thu, Jun 16, 2016 at 8:37 AM, Grant Henke wrote:
> >> > I have updated the wiki and pull request based on the feedback. If there
> >> > are no objections I will start a vote at the end of the day.
> >> >
> >> > Details for this implementation can be read here:
> >> >
> >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations#KIP-4-Commandlineandcentralizedadministrativeoperations-CreateTopicRequest
> >> >
> >> > The updated pull request can be found here (feel free to review):
> >> > https://github.com/apache/kafka/pull/1489
> >> >
> >> > Below is the exact content for clarity:
> >> >
> >> >> Create Topics Request (KAFKA-2945)
> >> >>
> >> >>
> >> >>
> >> >> CreateTopics Request (Version: 0) => [create_topic_requests] timeout
> >> >>   create_topic_requests => topic num_partitions 

[jira] [Commented] (KAFKA-3632) ConsumerLag metrics persist after partition migration

2016-06-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15336852#comment-15336852
 ] 

ASF GitHub Bot commented on KAFKA-3632:
---

GitHub user hachikuji opened a pull request:

https://github.com/apache/kafka/pull/1518

KAFKA-3632; remove fetcher metrics on shutdown and leader migration



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hachikuji/kafka port-kafka-3632-to-0.9

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1518.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1518


commit a8edbed8220941e2df06c4f333262fe40d49bb9b
Author: Jason Gustafson 
Date:   2016-05-04T19:10:41Z

KAFKA-3632; remove fetcher metrics on shutdown and leader migration

Author: Jason Gustafson 

Reviewers: Ismael Juma 

Closes #1312 from hachikuji/KAFKA-3632




> ConsumerLag metrics persist after partition migration
> -
>
> Key: KAFKA-3632
> URL: https://issues.apache.org/jira/browse/KAFKA-3632
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.2.2, 0.9.0.1
> Environment: JDK 1.8, Linux
>Reporter: Brian Lueck
>Assignee: Jason Gustafson
>Priority: Minor
> Fix For: 0.10.0.0
>
>
> When a partition is migrated away from a broker, the ConsumerLag metric for 
> the topic/partition gets 'stuck' at the current value. The only way to remove 
> the metric is to restart the broker.
> This appears to be because in AbstractFetcherThread.scala there is no way of 
> removing a metric. See...
> {code}
> class FetcherLagStats(metricId: ClientIdAndBroker) {
>   private val valueFactory = (k: ClientIdTopicPartition) => new FetcherLagMetrics(k)
>   val stats = new Pool[ClientIdTopicPartition, FetcherLagMetrics](Some(valueFactory))
>
>   def getFetcherLagStats(topic: String, partitionId: Int): FetcherLagMetrics = {
>     stats.getAndMaybePut(new ClientIdTopicPartition(metricId.clientId, topic, partitionId))
>   }
> }
> {code}
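A sketch of the kind of cleanup hook the fix needs (the names below are assumptions for illustration, not the actual patch):

{code}
// Hypothetical addition to FetcherLagStats: drop the per-partition entry when
// the fetcher shuts down or the partition migrates away, instead of keeping a
// stale gauge alive until the broker restarts.
def unregister(topic: String, partitionId: Int): Unit = {
  val key = new ClientIdTopicPartition(metricId.clientId, topic, partitionId)
  val lagMetrics = stats.remove(key) // kafka.utils.Pool#remove
  if (lagMetrics != null)
    lagMetrics.unregister()          // hypothetical: removes the registered gauge
}
{code}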





[GitHub] kafka pull request #1518: KAFKA-3632; remove fetcher metrics on shutdown and...

2016-06-17 Thread hachikuji
GitHub user hachikuji opened a pull request:

https://github.com/apache/kafka/pull/1518

KAFKA-3632; remove fetcher metrics on shutdown and leader migration



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hachikuji/kafka port-kafka-3632-to-0.9

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1518.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1518


commit a8edbed8220941e2df06c4f333262fe40d49bb9b
Author: Jason Gustafson 
Date:   2016-05-04T19:10:41Z

KAFKA-3632; remove fetcher metrics on shutdown and leader migration

Author: Jason Gustafson 

Reviewers: Ismael Juma 

Closes #1312 from hachikuji/KAFKA-3632




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-3862) Kafka streams documentation: partition<->thread<->task assignment unclear

2016-06-17 Thread Dmitry Minkovsky (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3862?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15336800#comment-15336800
 ] 

Dmitry Minkovsky commented on KAFKA-3862:
-

Oh hey, I somehow missed 0.10 and Confluent 3.0. The current version of the docs 
is much clearer and doesn't have this issue. 

> Kafka streams documentation: partition<->thread<->task assignment unclear
> -
>
> Key: KAFKA-3862
> URL: https://issues.apache.org/jira/browse/KAFKA-3862
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
> Environment: N/A
>Reporter: Dmitry Minkovsky
>Assignee: Guozhang Wang
>Priority: Minor
> Fix For: 0.10.0.0
>
>





[jira] [Resolved] (KAFKA-3862) Kafka streams documentation: partition<->thread<->task assignment unclear

2016-06-17 Thread Dmitry Minkovsky (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dmitry Minkovsky resolved KAFKA-3862.
-
   Resolution: Fixed
Fix Version/s: 0.10.0.0

> Kafka streams documentation: partition<->thread<->task assignment unclear
> -
>
> Key: KAFKA-3862
> URL: https://issues.apache.org/jira/browse/KAFKA-3862
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
> Environment: N/A
>Reporter: Dmitry Minkovsky
>Assignee: Guozhang Wang
>Priority: Minor
> Fix For: 0.10.0.0
>
>





[jira] [Commented] (KAFKA-3837) Report the name of the blocking thread when throwing ConcurrentModificationException

2016-06-17 Thread Bharat Viswanadham (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15336761#comment-15336761
 ] 

Bharat Viswanadham commented on KAFKA-3837:
---

Hi Simon,
I have updated the code.
You can have a look into it and provide your comments.

> Report the name of the blocking thread when throwing 
> ConcurrentModificationException
> 
>
> Key: KAFKA-3837
> URL: https://issues.apache.org/jira/browse/KAFKA-3837
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 0.9.0.1
>Reporter: Simon Cooper
>Assignee: Bharat Viswanadham
>Priority: Minor
>
> {{KafkaConsumer.acquire}} throws {{ConcurrentModificationException}} if the 
> current thread does not match the {{currentThread}} field. It would be 
> useful if the name of the other thread was included in the exception message 
> to help debug the problem when this exception occurs.
> As it stands, it can be really difficult to work out what's going wrong when 
> there are several threads all accessing the consumer at the same time and your 
> existing exclusive access logic doesn't seem to be working as it should.
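A sketch of the suggested improvement (an approximation of the consumer's guard, not the actual KafkaConsumer code; the real class tracks a thread id, whereas tracking the Thread itself, as below, is one way to surface the blocking thread's name):

{code}
import java.util.ConcurrentModificationException
import java.util.concurrent.atomic.AtomicReference

class GuardedConsumer {
  private val owner = new AtomicReference[Thread](null)

  // Called on entry to every public method: only one thread may use the
  // consumer at a time, and the error now names the thread that holds it.
  def acquire(): Unit = {
    val me = Thread.currentThread()
    if ((owner.get ne me) && !owner.compareAndSet(null, me)) {
      val holder = Option(owner.get).map(_.getName).getOrElse("unknown")
      throw new ConcurrentModificationException(
        s"KafkaConsumer is not safe for multi-threaded access; currently held by thread '$holder'")
    }
  }

  def release(): Unit = owner.set(null)
}
{code}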





[jira] [Assigned] (KAFKA-3836) KStreamReduce and KTableReduce should not pass nulls to Deserializers

2016-06-17 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov reassigned KAFKA-3836:
-

Assignee: Jeyhun Karimov

> KStreamReduce and KTableReduce should not pass nulls to Deserializers
> -
>
> Key: KAFKA-3836
> URL: https://issues.apache.org/jira/browse/KAFKA-3836
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Avi Flax
>Assignee: Jeyhun Karimov
>Priority: Trivial
>





[jira] [Resolved] (KAFKA-3838) Bump zkclient and Zookeeper versions

2016-06-17 Thread FIlipe Azevedo (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

FIlipe Azevedo resolved KAFKA-3838.
---
Resolution: Fixed

> Bump zkclient and Zookeeper versions
> 
>
> Key: KAFKA-3838
> URL: https://issues.apache.org/jira/browse/KAFKA-3838
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Affects Versions: 0.10.0.1
>Reporter: FIlipe Azevedo
>
> ZooKeeper 3.4.8 has some improvements; specifically, it handles DNS 
> re-resolution when a connection to ZooKeeper fails. This potentially allows 
> round-robin DNS without the need to hardcode the IP addresses in the config. 
> http://zookeeper.apache.org/doc/r3.4.8/releasenotes.html
> ZkClient has a new 0.9 release, which uses ZooKeeper 3.4.8, which is already 
> marked as stable.
> Tests are passing.
> Here is the PR: https://github.com/apache/kafka/pull/1504





Re: [VOTE] KIP-55: Secure quotas for authenticated users

2016-06-17 Thread Rajini Sivaram
Thank you, Jun. I have removed user_principal from the KIP.


Re: [VOTE] KIP-55: Secure quotas for authenticated users

2016-06-17 Thread Jun Rao
Rajini,

10. Yes, then we can probably leave out the user_principal field and keep
the version to be 1.

Other than that, the KIP looks good to me.

Thanks,

Jun

On Fri, Jun 17, 2016 at 3:29 AM, Rajini Sivaram <
rajinisiva...@googlemail.com> wrote:

> Jun,
>
> 10. Since entity_type "users" is new, shouldn't the JSON for these entities
> have version 1? I have moved "user_principal" out of the config in the
> samples and added to the  entries as well. But actually, do
> we need to store the non-encoded principal at all? The node name is
> URL-encoded user principal, so it is fairly readable if you are looking in
> ZK and *kafka_configs.sh* will show the non-encoded principal by decoding
> the name from the path (since it needs to do encoding anyway because the
> names specified on the command line will be non-encoded principals, it can
> do decoding too). Perhaps that is sufficient?
>
> 11. I liked the second approach since it looks neat and future-proof. Have
> updated the KIP.
>
> 12. Yes, that is correct.
>
> Many thanks,
>
> Rajini
>
>
> On Thu, Jun 16, 2016 at 11:36 PM, Jun Rao  wrote:
>
> > Rajini,
> >
> > Thanks for the update. A few more questions/comments.
> >
> > 10. For the quota value stored in ZK, since we are adding an optional
> > user_principal field in the json, we should bump the version from 1 to 2.
> > Also, user_principal is not really part of the config values. So, perhaps
> > we should represent it as the following?
> > {
> > "version":2,
> > "config": {
> > "producer_byte_rate":"1024",
> > "consumer_byte_rate":"2048"
> > },
> > "user_principal" : "user1"
> > }
> >
> >  Also, we should store user_principal in the following json too, right?
> > // Zookeeper persistence path /users/<user1>/clients/clientA
> > {
> > "version":1,
> > "config": {
> > "producer_byte_rate":"10",
> > "consumer_byte_rate":"30"
> > }
> > }
> >
> > 11. For the change notification path, would it be better to change it to
> > something like the following and bump up version to 2?
> > // Change notification for quota of <user2, clientA>
> > {
> > "version":2,
> > [
> >   { "entity_type": "users",
> > "entity_name": "user2"
> >   },
> >   { "entity_type": "clients",
> > "entity_name": "clientA"
> >   }
> > ]
> >  }
> >
> > Alternatively, we could change it to
> > // Change notification for quota of <user2>
> > {
> > "version":2,
> > "entity_path": "users/user2"
> > }
> >
> > {
> > "version":2,
> > "entity_path": "users/user2/clients/clientA"
> > }
> >
> > 12. Just to clarify on the meaning of remainder quota. If you have quotas
> > like the following,
> >   <user1, client1> = 5
> >   <user1, client2> = 10
> >   <user1> = 12
> > it means that all connections with user1 whose client-id is neither client1
> > nor client2 will be sharing a quota of 12, right? In other words, the quota
> > of <user1> doesn't include the quota for <user1, client1> and <user1, client2>.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Thu, Jun 16, 2016 at 5:03 AM, Rajini Sivaram <
> > rajinisiva...@googlemail.com> wrote:
> >
> > > Jun,
> > >
> > > Actually, with quotas stored in different nodes in ZK, it is better to
> > > store remainder quota rather than total quota under /users/<user> so that
> > > quota calculations are not dependent on the order of notifications. I
> > have
> > > updated the KIP to reflect that. So the quotas in ZK now always reflect
> > the
> > > quota applied to a group of client connections and use the same format
> as
> > > client-id quotas. But it is not hierarchical, making the configuration
> > > simpler.
> > >
> > > On Thu, Jun 16, 2016 at 11:49 AM, Rajini Sivaram <
> > > rajinisiva...@googlemail.com> wrote:
> > >
> > > > Jun,
> > > >
> > > > Thank you for the review. I have updated the KIP:
> > > >
> > > >
> > > >1. Added an overview section. Slightly reworded since it is better
> > to
> > > >treat user and client-id as different levels rather than the same
> > > level.
> > > >2. Yes, it is neater to store quota for each entity in a different
> > > >path in Zookeeper. I put clients under users rather than the other
> > way
> > > >round since that reflects the hierarchy and also keeps a user's
> > quotas
> > > >together under a single sub-tree. I had initially used a single
> node
> > > to
> > > >keep quotas and sub-quotas of a user together so that updates are
> > > atomic
> > > >since changes to sub-quotas also affect remainder quotas for other
> > > clients.
> > > >But I imagine, updates to configs are rare and it is not a big
> > issue.
> > > >3. I haven't modified the JSON for configuration change
> > notifications.
> > > >The entity_name can now be a subpath that has both user and
> client.
> > > Have
> > > >added an example to the KIP. The downside of keeping clients under
> > > users in
> > > >ZK in 2) is that the 
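
To make the remainder semantics discussed in point 12 concrete, here is a
minimal, hypothetical sketch (not KIP-55 code; the class, method names, and ZK
path strings are illustrative) of how a broker might resolve the effective
quota for a connection:

import java.util.HashMap;
import java.util.Map;

public class QuotaResolver {
    private final Map<String, Double> quotas = new HashMap<>();

    public void setQuota(String path, double bytesPerSec) {
        quotas.put(path, bytesPerSec);
    }

    // A <user, client-id> sub-quota applies to exactly that pair; otherwise the
    // connection shares the user's remainder quota with every other client of
    // that user that has no sub-quota of its own.
    public double effectiveQuota(String user, String clientId, double defaultQuota) {
        Double subQuota = quotas.get("users/" + user + "/clients/" + clientId);
        if (subQuota != null)
            return subQuota;
        Double remainder = quotas.get("users/" + user);
        return remainder != null ? remainder : defaultQuota;
    }

    public static void main(String[] args) {
        QuotaResolver r = new QuotaResolver();
        r.setQuota("users/user1/clients/client1", 5);
        r.setQuota("users/user1/clients/client2", 10);
        r.setQuota("users/user1", 12); // remainder, not total
        System.out.println(r.effectiveQuota("user1", "client1", 20)); // 5.0
        System.out.println(r.effectiveQuota("user1", "client3", 20)); // 12.0 (shared)
    }
}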

[jira] [Updated] (KAFKA-3862) Kafka streams documentation: partition<->thread<->task assignment unclear

2016-06-17 Thread Dmitry Minkovsky (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dmitry Minkovsky updated KAFKA-3862:

Description: 
Hi. Not sure if this is the right place to post about documentation (this isn't 
actually a software bug).

I believe the following span of documentation is self-contradictory:

!http://i.imgur.com/WM8ada2.png!

In the paragraph text before and after the image, it says the partitions are 
*evenly distributed* among the threads. However, in the image, Thread 1 gets p1 
and p3 (4 partitions, total) while Thread 2 gets p2 (2 partitions, total). 
That's "even" in the sense that you can't do any better, but still not even, 
and in conflict with the text that follows the image:

!http://i.imgur.com/xq3SOre.png!

This doesn't make sense because why would p2 from topicA and topicB go to 
different threads? If this were the case, you couldn't join the partitions, 
right?

Either way, it seems like the two things are in conflict.

Finally, the wording:

bq. ...assume that each thread executes a single stream task...

What does "assume" mean here? I've not seen any indication elsewhere in the 
documentation that show you can configure this behavior. Sorry if I missed this.

Thank you!


  was:
Hi. Not sure if this is the right place to post about documentation (this isn't 
actually a software bug).

I believe the following span of documentation is self-contradictory:

!http://i.imgur.com/WM8ada2.png!

In the paragraph text before and after the image, it says the partitions are 
*evenly distributed* among the threads. However, in the image, Thread 1 gets p1 
and p3 (4 partitions, total) while Thread 2 gets p2 (2 partitions, total). 
That's "even" in the sense that you can't do any better, but still not even, 
and in conflict with the text that follows the image:

!http://i.imgur.com/xq3SOre.png!

This doesn't make sense because why would p2 from topicA and topicB go to 
different threads? If this were the case, you couldn't join the partitions, 
right?

Either way, it seems like the two things are in conflict.

Finally, the wording:

bq. ...assume that each thread executes a single stream task...

What does "assume" mean here. I've not seen any indication elsewhere in the 
documentation that show you can configure this behavior. Sorry if I missed this.

Thank you!



> Kafka streams documentation: partition<->thread<->task assignment unclear
> -
>
> Key: KAFKA-3862
> URL: https://issues.apache.org/jira/browse/KAFKA-3862
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
> Environment: N/A
>Reporter: Dmitry Minkovsky
>Assignee: Guozhang Wang
>Priority: Minor
>
> Hi. Not sure if this is the right place to post about documentation (this 
> isn't actually a software bug).
> I believe the following span of documentation is self-contradictory:
> !http://i.imgur.com/WM8ada2.png!
> In the paragraph text before and after the image, it says the partitions are 
> *evenly distributed* among the threads. However, in the image, Thread 1 gets 
> p1 and p3 (4 partitions, total) while Thread 2 gets p2 (2 partitions, total). 
> That's "even" in the sense that you can't do any better, but still not even, 
> and in conflict with the text that follows the image:
> !http://i.imgur.com/xq3SOre.png!
> This doesn't make sense because why would p2 from topicA and topicB go to 
> different threads? If this were the case, you couldn't join the partitions, 
> right?
> Either way, it seems like the two things are in conflict.
> Finally, the wording:
> bq. ...assume that each thread executes a single stream task...
> What does "assume" mean here? I've not seen any indication elsewhere in the 
> documentation that show you can configure this behavior. Sorry if I missed 
> this.
> Thank you!



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3862) Kafka streams documentation: partition<->thread<->task assignment unclear

2016-06-17 Thread Dmitry Minkovsky (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dmitry Minkovsky updated KAFKA-3862:

Description: 
Hi. Not sure if this is the right place to post about documentation (this isn't 
actually a software bug).

I believe the following span of documentation is self-contradictory:

!http://i.imgur.com/WM8ada2.png!

In the paragraph text before and after the image, it says the partitions are 
*evenly distributed* among the threads. However, in the image, Thread 1 gets p1 
and p3 (4 partitions, total) while Thread 2 gets p2 (2 partitions, total). 
That's "even" in the sense that you can't do any better, but still not even, 
and in conflict with the text that follows the image:

!http://i.imgur.com/xq3SOre.png!

This doesn't make sense because why would p2 from topicA and topicB go to 
different threads? If this were the case, you couldn't join the partitions, 
right?

Either way, it seems like the two things are in conflict.

Finally, the wording:

bq. ...assume that each thread executes a single stream task...

What does "assume" mean here? I've not seen any indication elsewhere in the 
documentation that shows you can configure this behavior. Sorry if I missed 
this.

Thank you!


  was:
Hi. Not sure if this is the right place to post about documentation (this isn't 
actually a software bug).

I believe the following span of documentation is self-contradictory:

!http://i.imgur.com/WM8ada2.png!

In the paragraph text before and after the image, it says the partitions are 
*evenly distributed* among the threads. However, in the image, Thread 1 gets p1 
and p3 (4 partitions, total) while Thread 2 gets p2 (2 partitions, total). 
That's "even" in the sense that you can't do any better, but still not even, 
and in conflict with the text that follows the image:

!http://i.imgur.com/xq3SOre.png!

This doesn't make sense because why would p2 from topicA and topicB go to 
different threads? If this were the case, you couldn't join the partitions, 
right?

Either way, it seems like the two things are in conflict.

Finally, the wording:

bq. ...assume that each thread executes a single stream task...

What does "assume" mean here? I've not seen any indication elsewhere in the 
documentation that show you can configure this behavior. Sorry if I missed this.

Thank you!



> Kafka streams documentation: partition<->thread<->task assignment unclear
> -
>
> Key: KAFKA-3862
> URL: https://issues.apache.org/jira/browse/KAFKA-3862
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
> Environment: N/A
>Reporter: Dmitry Minkovsky
>Assignee: Guozhang Wang
>Priority: Minor
>
> Hi. Not sure if this is the right place to post about documentation (this 
> isn't actually a software bug).
> I believe the following span of documentation is self-contradictory:
> !http://i.imgur.com/WM8ada2.png!
> In the paragraph text before and after the image, it says the partitions are 
> *evenly distributed* among the threads. However, in the image, Thread 1 gets 
> p1 and p3 (4 partitions, total) while Thread 2 gets p2 (2 partitions, total). 
> That's "even" in the sense that you can't do any better, but still not even, 
> and in conflict with the text that follows the image:
> !http://i.imgur.com/xq3SOre.png!
> This doesn't make sense because why would p2 from topicA and topicB go to 
> different threads? If this were the case, you couldn't join the partitions, 
> right?
> Either way, it seems like the two things are in conflict.
> Finally, the wording:
> bq. ...assume that each thread executes a single stream task...
> What does "assume" mean here? I've not seen any indication elsewhere in the 
> documentation that shows you can configure this behavior. Sorry if I missed 
> this.
> Thank you!



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
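
As an aside for readers of this ticket: the grouping the documentation seems to
intend can be illustrated with a small, purely hypothetical sketch (not Kafka
Streams source; the topic names, counts, and round-robin distribution below are
made up for illustration). Partition i of each co-partitioned input topic forms
task i, so same-numbered partitions can be joined within one task, and tasks
rather than raw partitions are spread over the threads, which is why 3 tasks on
2 threads can never split perfectly evenly.

{code}
import java.util.ArrayList;
import java.util.List;

public class TaskAssignmentSketch {
    public static void main(String[] args) {
        String[] topics = {"topicA", "topicB"};
        int partitionsPerTopic = 3;
        int numThreads = 2;

        // One task per partition number, covering that partition of every topic.
        List<String> tasks = new ArrayList<>();
        for (int p = 0; p < partitionsPerTopic; p++) {
            StringBuilder task = new StringBuilder("task" + p + " = {");
            for (String t : topics)
                task.append(" ").append(t).append("-p").append(p);
            tasks.add(task.append(" }").toString());
        }

        // Tasks, not partitions, are distributed over the threads; with 3 tasks
        // on 2 threads the split is necessarily uneven (2 vs 1).
        for (int i = 0; i < tasks.size(); i++)
            System.out.println("thread" + (i % numThreads + 1) + " <- " + tasks.get(i));
    }
}
{code}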


[jira] [Comment Edited] (KAFKA-2359) New consumer - partitions auto assigned only on poll

2016-06-17 Thread Mirza Gaush Beg (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15336331#comment-15336331
 ] 

Mirza Gaush Beg edited comment on KAFKA-2359 at 6/17/16 4:41 PM:
-

Another use case: 
I want to retrieve data based on an offset range (starting from the last 
committed offset up to the last message present at the topic partition) using 
'KafkaUtils.createRDD' for the same consumer group.id. I am using the high-level 
consumer API from 0.9.0.1 and following the steps below:
1. call 'subscribe'
2. call 'partitionsFor'
3. call 'committed' - gives the last committed offset
4. call 'seekToEnd' - to retrieve the last offset; this fails with "Caused by: 
java.lang.IllegalStateException: No current assignment for partition 
test-topic-0". The exception is seen with the other two methods, 'seek' and 
'seekToBeginning', as well.



was (Author: gauss2402):
Another use case: 
i want to retrieve data based on an offset range (starting from last committed 
offset to the last message present at topic partition) using  
'KafkaUtils.createRDD' for the same consumer group.id.  I am using  high level 
consumer API from 0.9.0.1, then following below steps,
 2. Call 'subscribe'
 3. call 'partitionsFor'
4. call 'committed' - gives the last committed offset
5. call 'seekToEnd' - to retrieve the last offset  and this fails with "Caused 
by: java.lang.IllegalStateException: No current assignment for partition 
test-topic-0". exception is seen with other two methods 'seek' and 
'seekToBeginning'


> New consumer - partitions auto assigned only on poll
> 
>
> Key: KAFKA-2359
> URL: https://issues.apache.org/jira/browse/KAFKA-2359
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0
>Reporter: Stevo Slavic
>Priority: Minor
>
> In the new consumer I encountered unexpected behavior. After constructing 
> {{KafkaConsumer}} instance with configured consumer rebalance callback 
> handler, and subscribing to a topic with "consumer.subscribe(topic)", 
> retrieving subscriptions would return empty set and callback handler would 
> not get called (no partitions ever assigned or revoked), no matter how long 
> instance was up.
> Then I found by inspecting {{KafkaConsumer}} code that partition assignment 
> will only be triggered on first {{poll}}, since {{pollOnce}} has:
> {noformat}
> // ensure we have partitions assigned if we expect to
> if (subscriptions.partitionsAutoAssigned())
> coordinator.ensurePartitionAssignment();
> {noformat}
> I'm proposing to fix this by including same {{ensurePartitionAssignment}} 
> fragment in {{KafkaConsumer.subscriptions}} accessor as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
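
A minimal sketch of the workaround implied by this ticket, written against the
0.9/0.10.0 consumer API (the topic name and surrounding class are illustrative):
poll once after subscribing so the coordinator assigns partitions before any
seek/position calls.

{code}
import java.util.Collections;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class OffsetRangeExample {
    // Derive the [last-committed, log-end) offset range per assigned partition.
    public static void run(KafkaConsumer<String, String> consumer) {
        consumer.subscribe(Collections.singletonList("test-topic"));
        consumer.poll(0); // assignment happens here; seeking before this poll
                          // fails with "No current assignment for partition ..."
        for (TopicPartition tp : consumer.assignment()) {
            OffsetAndMetadata committed = consumer.committed(tp); // may be null
            long start = committed != null ? committed.offset() : 0L;
            consumer.seekToEnd(tp);           // varargs signature in 0.9/0.10.0
            long end = consumer.position(tp); // offset of the next message
            System.out.println(tp + ": range [" + start + ", " + end + ")");
        }
    }
}
{code}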


[jira] [Updated] (KAFKA-3861) Shrunk ISR before leader crash makes the partition unavailable

2016-06-17 Thread Maysam Yabandeh (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maysam Yabandeh updated KAFKA-3861:
---
Description: 
We observed a case where the leader experienced a crash and lost its in-memory 
data and latest HW offsets. Normally Kafka should be safe and able to make 
progress with a single node failure. However, a few seconds before the crash the 
leader shrunk its ISR to itself, which is safe since min-in-sync-replicas is 2 
and the replication factor is 3, thus the troubled leader cannot accept new 
produce messages. After the crash, however, the controller could not name any of 
the followers as the new leader since, as far as the controller knows, they are 
not in the ISR and could potentially be behind the last leader. Note that 
unclean-leader-election is disabled in this cluster since the cluster requires a 
very high degree of durability and cannot tolerate data loss.

The impact could get worse if the admin brings up the crashed broker in an 
attempt to make such partitions available again; this would take down even more 
brokers, as the followers panic when they find their offset larger than the HW 
offset in the leader:
{code}
if (leaderEndOffset < replica.logEndOffset.messageOffset) {
  // Prior to truncating the follower's log, ensure that doing so is not
  // disallowed by the configuration for unclean leader election.
  // This situation could only happen if the unclean election configuration for
  // a topic changes while a replica is down. Otherwise, we should never
  // encounter this situation since a non-ISR leader cannot be elected if
  // disallowed by the broker configuration.
  if (!LogConfig.fromProps(brokerConfig.originals,
      AdminUtils.fetchEntityConfig(replicaMgr.zkUtils, ConfigType.Topic,
        topicAndPartition.topic)).uncleanLeaderElectionEnable) {
    // Log a fatal error and shutdown the broker to ensure that data loss does
    // not unexpectedly occur.
    fatal("Halting because log truncation is not allowed for topic %s,"
        .format(topicAndPartition.topic) +
      " Current leader %d's latest offset %d is less than replica %d's latest offset %d"
        .format(sourceBroker.id, leaderEndOffset, brokerConfig.brokerId,
          replica.logEndOffset.messageOffset))
    Runtime.getRuntime.halt(1)
  }
{code}

One hackish solution would be for the admin to investigate the logs, determine 
that unclean-leader-election in this particular case would be safe, temporarily 
enable it (while the crashed node is down) until new leaders are selected for 
the affected partitions, wait for the topics' LEO to advance far enough, and 
then bring up the crashed node again. This manual process is, however, slow and 
error-prone, and the cluster will suffer partial unavailability in the meantime.

We are thinking of having the controller make an exception for this case: if 
the ISR size is less than min-in-sync-replicas and the new leader would be -1, 
then the controller does an RPC to all the replicas to inquire about their 
latest offsets, and if all the replicas respond, it chooses the one with the 
largest offset as the leader as well as the new ISR. Note that the controller 
cannot do that if any of the non-leader replicas does not respond, since there 
might be a case where the responding replicas were not involved in the last ISR 
and hence are potentially behind the others (and the controller could not know 
that since it does not keep track of previous ISRs).

Pros would be that Kafka will remain safely available when such cases occur and 
would not require any admin intervention. The con, however, is that the 
controller talking to brokers inside the leader election function would break 
the existing pattern in the source code, as currently the leader is elected 
locally without requiring any additional RPC.

Thoughts?

  was:
We observed a case where the leader experienced a crash and lost its in-memory 
data and latest HW offsets. Normally Kafka should be safe and able to make 
progress with a single node failure. However, a few seconds before the crash the 
leader shrunk its ISR to itself, which is safe since min-in-sync-replicas is 2 
and the replication factor is 3, thus the troubled leader cannot accept new 
produce messages. After the crash, however, the controller could not name any of 
the followers as the new leader since, as far as the controller knows, they are 
not in the ISR and could potentially be behind the last leader. Note that 
unclean-leader-election is disabled in this cluster since the cluster requires a 
very high degree of durability and cannot tolerate data loss.

The impact could get worse if the admin brings up the crashed broker in an 
attempt to make such partitions available again; this would take down even more 
brokers, as the followers panic when they find their offset larger than the HW 
offset in the leader:
{code}
12:31:51,526 FATAL server.ReplicaFetcherThread: 

[jira] [Updated] (KAFKA-3861) Shrunk ISR before leader crash makes the partition unavailable

2016-06-17 Thread Maysam Yabandeh (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maysam Yabandeh updated KAFKA-3861:
---
Description: 
We observed a case where the leader experienced a crash and lost its in-memory 
data and latest HW offsets. Normally Kafka should be safe and able to make 
progress with a single node failure. However, a few seconds before the crash the 
leader shrunk its ISR to itself, which is safe since min-in-sync-replicas is 2 
and the replication factor is 3, thus the troubled leader cannot accept new 
produce messages. After the crash, however, the controller could not name any of 
the followers as the new leader since, as far as the controller knows, they are 
not in the ISR and could potentially be behind the last leader. Note that 
unclean-leader-election is disabled in this cluster since the cluster requires a 
very high degree of durability and cannot tolerate data loss.

The impact could get worse if the admin brings up the crashed broker in an 
attempt to make such partitions available again; this would take down even more 
brokers, as the followers panic when they find their offset larger than the HW 
offset in the leader:
{code}
12:31:51,526 FATAL server.ReplicaFetcherThread: [ReplicaFetcherThread-3-8], 
Halting because log truncation is not allowed for topic topic-xyz, Current 
leader 8's latest offset 112599873 is less than replica 9's latest offset 
11269
{code}

One hackish solution would be for the admin to investigate the logs, determine 
that unclean-leader-election in this particular case would be safe, temporarily 
enable it (while the crashed node is down) until new leaders are selected for 
the affected partitions, wait for the topics' LEO to advance far enough, and 
then bring up the crashed node again. This manual process is, however, slow and 
error-prone, and the cluster will suffer partial unavailability in the meantime.

We are thinking of having the controller make an exception for this case: if 
the ISR size is less than min-in-sync-replicas and the new leader would be -1, 
then the controller does an RPC to all the replicas to inquire about their 
latest offsets, and if all the replicas respond, it chooses the one with the 
largest offset as the leader as well as the new ISR. Note that the controller 
cannot do that if any of the non-leader replicas does not respond, since there 
might be a case where the responding replicas were not involved in the last ISR 
and hence are potentially behind the others (and the controller could not know 
that since it does not keep track of previous ISRs).

Pros would be that Kafka will remain safely available when such cases occur and 
would not require any admin intervention. The con, however, is that the 
controller talking to brokers inside the leader election function would break 
the existing pattern in the source code, as currently the leader is elected 
locally without requiring any additional RPC.

Thoughts?

  was:
We observed a case where the leader experienced a crash and lost its in-memory 
data and latest HW offsets. Normally Kafka should be safe and able to make 
progress with a single node failure. However, a few seconds before the crash the 
leader shrunk its ISR to itself, which is safe since min-in-sync-replicas is 2 
and the replication factor is 3, thus the troubled leader cannot accept new 
produce messages. After the crash, however, the controller could not name any of 
the followers as the new leader since, as far as the controller knows, they are 
not in the ISR and could potentially be behind the last leader. Note that 
unclean-leader-election is disabled in this cluster since the cluster requires a 
very high degree of durability and cannot tolerate data loss.

One hackish solution would be for the admin to investigate the logs, determine 
that unclean-leader-election in this particular case would be safe, and 
temporarily enable it until new leaders are selected for the affected 
partitions. This manual process is, however, slow and error-prone, and the 
cluster will suffer partial unavailability in the meantime.

We are thinking of having the controller make an exception for this case: if 
the ISR size is less than min-in-sync-replicas and the new leader would be -1, 
then the controller does an RPC to all the replicas to inquire about their 
latest offsets, and if all the replicas respond, it chooses the one with the 
largest offset as the leader as well as the new ISR. Note that the controller 
cannot do that if any of the non-leader replicas does not respond, since there 
might be a case where the responding replicas were not involved in the last ISR 
and hence are potentially behind the others (and the controller could not know 
that since it does not keep track of previous ISRs).

Pros would be that Kafka will remain safely available when such cases occur and 
would not require any admin intervention. The con, however, is that the 
controller talking to brokers inside the leader election function would break 
the existing pattern in the source code, as currently the leader is elected 
locally without requiring any additional RPC.

Thoughts?

[jira] [Created] (KAFKA-3861) Shrunk ISR before leader crash makes the partition unavailable

2016-06-17 Thread Maysam Yabandeh (JIRA)
Maysam Yabandeh created KAFKA-3861:
--

 Summary: Shrunk ISR before leader crash makes the partition 
unavailable
 Key: KAFKA-3861
 URL: https://issues.apache.org/jira/browse/KAFKA-3861
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.10.0.0
Reporter: Maysam Yabandeh


We observed a case where the leader experienced a crash and lost its in-memory 
data and latest HW offsets. Normally Kafka should be safe and able to make 
progress with a single node failure. However, a few seconds before the crash the 
leader shrunk its ISR to itself, which is safe since min-in-sync-replicas is 2 
and the replication factor is 3, thus the troubled leader cannot accept new 
produce messages. After the crash, however, the controller could not name any of 
the followers as the new leader since, as far as the controller knows, they are 
not in the ISR and could potentially be behind the last leader. Note that 
unclean-leader-election is disabled in this cluster since the cluster requires a 
very high degree of durability and cannot tolerate data loss.

One hackish solution would be for the admin to investigate the logs, determine 
that unclean-leader-election in this particular case would be safe, and 
temporarily enable it until new leaders are selected for the affected 
partitions. This manual process is, however, slow and error-prone, and the 
cluster will suffer partial unavailability in the meantime.

We are thinking of having the controller make an exception for this case: if 
the ISR size is less than min-in-sync-replicas and the new leader would be -1, 
then the controller does an RPC to all the replicas to inquire about their 
latest offsets, and if all the replicas respond, it chooses the one with the 
largest offset as the leader as well as the new ISR. Note that the controller 
cannot do that if any of the non-leader replicas does not respond, since there 
might be a case where the responding replicas were not involved in the last ISR 
and hence are potentially behind the others (and the controller could not know 
that since it does not keep track of previous ISRs).

Pros would be that Kafka will remain safely available when such cases occur and 
would not require any admin intervention. The con, however, is that the 
controller talking to brokers inside the leader election function would break 
the existing pattern in the source code, as currently the leader is elected 
locally without requiring any additional RPC.

Thoughts?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
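
For concreteness, a purely illustrative sketch (not controller code; the names
and types are hypothetical) of the fallback election proposed above: elect the
replica with the largest reported log end offset, but only if every replica
responded to the inquiry.

{code}
import java.util.Map;

public class LargestOffsetElection {
    // reportedLeo: replicaId -> log end offset gathered via the proposed RPC.
    // Returns the elected replica id, or null if any replica did not respond
    // (in which case a silent replica may have been in the last ISR and ahead).
    public static Integer tryElect(int[] allReplicas, Map<Integer, Long> reportedLeo) {
        int leader = -1;
        long maxLeo = -1L;
        for (int replica : allReplicas) {
            Long leo = reportedLeo.get(replica);
            if (leo == null)
                return null;
            if (leo > maxLeo) {
                maxLeo = leo;
                leader = replica;
            }
        }
        return leader; // becomes both the new leader and the seed of the new ISR
    }
}
{code}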


Re: [DISCUSS] KIP-4 Create Topic Schema

2016-06-17 Thread Dana Powers
I'm unconvinced (crazy, right?). Comments below:

On Fri, Jun 17, 2016 at 7:27 AM, Grant Henke  wrote:
> Hi Dana,
>
> You mentioned one of the reasons I error and disconnect: because I can't
> return an error for every request, the cardinality between request and
> response would be different. Beyond that though, I am handling this
> protocol rule/parsing error the same way all other messages do.

But you can return an error for every topic, and isn't that the level
of error required here?

> CreateTopic Response (Version: 0) => [topic_error_codes]
>   topic_error_codes => topic error_code
> topic => STRING
> error_code => INT16

If I submit duplicate requests for a topic, it's an error isolated to
that topic. If I mess up the partition / replication / etc semantics
for a topic, that's an error isolated to that topic. Is there a
cardinality problem at this level?


>
> Parsing is handled in the RequestChannel and any exception that occurs
> during this phase is caught, converted into an InvalidRequestException and
> results in a disconnect:
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/RequestChannel.scala#L92-L95
>
> Since this is an error that could only occur (and would always occur) due
> to incorrect client implementations, and not because of any cluster state
> or unusual situation, I felt this behavior was okay and made sense. For
> client developers the broker logging should make it obvious what the issue
> is. My patch also clearly documents the protocol rules in the Protocol
> definition.

Documentation is great and definitely a must. But requiring client
developers to dig through server logs is not ideal. Client developers
don't always have direct access to those logs. And even if they do,
the brokers may have other traffic, which makes it difficult to track
down the exact point in the logs where the error occurred.

As discussed above, I don't think you need to or should model this as
a request-level parsing error. It may be easier for the current broker
implementation to do that and just crash the connection, but I don't
think it makes that much sense from a raw api perspective.

> In the future having a response header with an error code (and optimally
> error message) for every response would help solve this problem (for all
> message types).

That will definitely help solve the more general invalid request error
problem. But I think given the current state of error handling /
feedback from brokers on request-level errors, you should treat
connection crash as a last resort. I think there is a good opportunity
to avoid it in this case, and I think the api would be better if done
that way.

-Dana

> On Fri, Jun 17, 2016 at 12:04 AM, Dana Powers  wrote:
>
>> Why disconnect the client on a InvalidRequestException? The 2 errors
>> you are catching are both topic-level: (1) multiple requests for the
>> same topic, and (2) ReplicaAssignment and num_partitions /
>> replication_factor both set. Wouldn't it be better to just error the
>> offending create_topic_request, not the entire connection? The
>> CreateTopicsResponse returns a map of topics to error codes. You could
>> just return the topic that caused the error and an
>> InvalidRequestException error code.
>>
>> -Dana
>>
>> On Thu, Jun 16, 2016 at 8:37 AM, Grant Henke  wrote:
>> > I have updated the wiki and pull request based on the feedback. If there
>> > are no objections I will start a vote at the end of the day.
>> >
>> > Details for this implementation can be read here:
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations#KIP-4-Commandlineandcentralizedadministrativeoperations-CreateTopicRequest
>> >
>> > The updated pull request can be found here (feel free to review):
>> > https://github.com/apache/kafka/pull/1489
>> >
>> > Below is the exact content for clarity:
>> >
>> >> Create Topics Request (KAFKA-2945
>> >> )
>> >>
>> >>
>> >>
>> >> CreateTopics Request (Version: 0) => [create_topic_requests] timeout
>> >>   create_topic_requests => topic num_partitions replication_factor
>> [replica_assignment] [configs]
>> >> topic => STRING
>> >> num_partitions => INT32
>> >> replication_factor => INT16
>> >> replica_assignment => partition_id [replicas]
>> >>   partition_id => INT32
>> >>   replicas => INT32
>> >> configs => config_key config_value
>> >>   config_key => STRING
>> >>   config_value => STRING
>> >>   timeout => INT32
>> >>
>> >> CreateTopicsRequest is a batch request to initiate topic creation with
>> >> either predefined or automatic replica assignment and optionally topic
>> >> configuration.
>> >>
>> >> Request semantics:
>> >>
>> >>1. Must be sent to the controller broker
>> >>2. If there are multiple instructions for the same topic in one
>> >>request an InvalidRequestException will be logged on the broker and the
>> >>client will be disconnected.
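
A rough sketch of the topic-level validation being argued for here (hypothetical
types, not the KIP-4 patch; the numeric error code is a placeholder): invalid
entries get an error code in the per-topic response map instead of the whole
connection being dropped.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class CreateTopicsValidation {
    static final short NONE = 0;
    static final short INVALID_REQUEST = 42; // placeholder error code

    public static Map<String, Short> validate(Iterable<TopicRequest> requests) {
        Map<String, Short> errors = new HashMap<>();
        Set<String> seen = new HashSet<>();
        for (TopicRequest r : requests) {
            // (1) duplicate topic in one batch; (2) both replica_assignment and
            // num_partitions/replication_factor supplied
            boolean duplicate = !seen.add(r.topic);
            boolean ambiguous = r.hasReplicaAssignment
                    && (r.numPartitions > 0 || r.replicationFactor > 0);
            errors.put(r.topic, (duplicate || ambiguous) ? INVALID_REQUEST : NONE);
        }
        return errors;
    }

    static class TopicRequest {
        String topic;
        int numPartitions;
        short replicationFactor;
        boolean hasReplicaAssignment;
    }
}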

[jira] [Commented] (KAFKA-3839) Handling null keys in KTable.groupBy

2016-06-17 Thread Jeyhun Karimov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15336310#comment-15336310
 ] 

Jeyhun Karimov commented on KAFKA-3839:
---

It seems that the groupBy operator is gone from KTable. Nevertheless, I checked 
its behaviour with null keys:

  KTable<Long, String> source = builder.table(longSerde, stringSerde, "topic1");
  KTable<String, Long> counts = source
      .groupBy(new KeyValueMapper<Long, String, KeyValue<String, String>>() {
          @Override
          public KeyValue<String, String> apply(Long key, String value) {
              return KeyValue.pair(null, value);
          }
      }, Serdes.String(), Serdes.String()).count("count");
  counts.to(stringSerde, longSerde, "topic2");

If I run this code, no exception/error occurs and nothing is output to the topic 
(as expected). I checked the class KTableRepartitionMap.KTableMapProcessor and I 
think the required null checks are there in the process() method.





> Handling null keys in KTable.groupBy
> 
>
> Key: KAFKA-3839
> URL: https://issues.apache.org/jira/browse/KAFKA-3839
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Jeyhun Karimov
>  Labels: architecture, newbie
>
> This is first reported by [~jeff.klu...@gmail.com].
> Similarly as we handle null keys in KStream join and aggregations in 
> KAFKA-3561 (https://github.com/apache/kafka/pull/1472), for KTable.groupBy 
> operators we should also gracefully handle null keys by filter them out since 
> they will not be participating in the aggregation.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] Java 8 as a minimum requirement

2016-06-17 Thread Marcus Gründler
-1
Hi Ismael,

Although I really like the Java 8 features and understand the advantages you
mentioned about Java 8 migration, I would suggest staying with Java 7 as
a minimum requirement for a while.

I think there are two aspects to consider - Kafka Server and Kafka clients. On 
the server part it would make sense to switch to Java 8 because you can run
the broker independently from any enclosing runtime (no JEE server etc.)

But if you change the requirement for Kafka clients, you would cut Kafka
support for quite a lot of real world deployments that run for example on
an IBM WebSphere JEE Server (*sigh*), since up to today there is no 
WebSphere version that supports Java 8.

And I think a split of Kafka server with Java 8 and Kafka client JARs in Java 7 
would be too complicated to maintain.

So my conclusion is - stay with Java 7 for a while.

Regards, Marcus


> Am 16.06.2016 um 22:45 schrieb Ismael Juma :
> 
> Hi all,
> 
> I would like to start a discussion on making Java 8 a minimum requirement
> for Kafka's next feature release (let's say Kafka 0.10.1.0 for now). This
> is the first discussion on the topic so the idea is to understand how
> people feel about it. If people feel it's too soon, then we can pick up the
> conversation again after Kafka 0.10.1.0. If the feedback is mostly
> positive, I will start a vote thread.
> 
> Let's start with some dates. Java 7 hasn't received public updates since
> April 2015[1], Java 8 was released in March 2014[2] and Java 9 is scheduled
> to be released in March 2017[3].
> 
> The first argument for dropping support for Java 7 is that the last public
> release by Oracle contains a large number of known security
> vulnerabilities. The effectiveness of Kafka's security features is reduced
> if the underlying runtime is not itself secure.
> 
> The second argument for moving to Java 8 is that it adds a number of
> compelling features:
> 
> * Lambda expressions and method references (particularly useful for the
> Kafka Streams DSL)
> * Default methods (very useful for maintaining compatibility when adding
> methods to interfaces)
> * java.util.stream (helpful for making collection transformations more
> concise)
> * Lots of improvements to java.util.concurrent (CompletableFuture,
> DoubleAdder, DoubleAccumulator, StampedLock, LongAdder, LongAccumulator)
> * Other nice things: SplittableRandom, Optional (and many others I have not
> mentioned)
> 
> The third argument is that it will simplify our testing matrix, we won't
> have to test with Java 7 any longer (this is particularly useful for system
> tests that take hours to run). It will also make it easier to support Scala
> 2.12, which requires Java 8.
> 
> The fourth argument is that many other open-source projects have taken the
> leap already. Examples are Cassandra[4], Lucene[5], Akka[6], Hadoop 3[7],
> Jetty[8], Eclipse[9], IntelliJ[10] and many others[11]. Even Android will
> support Java 8 in the next version (although it will take a while before
> most phones will use that version sadly). This reduces (but does not
> eliminate) the chance that we would be the first project that would cause a
> user to consider a Java upgrade.
> 
> The main argument for not making the change is that a reasonable number of
> users may still be using Java 7 by the time Kafka 0.10.1.0 is released.
> More specifically, we care about the subset who would be able to upgrade to
> Kafka 0.10.1.0, but would not be able to upgrade the Java version. It would
> be great if we could quantify this in some way.
> 
> What do you think?
> 
> Ismael
> 
> [1] https://java.com/en/download/faq/java_7.xml
> [2] https://blogs.oracle.com/thejavatutorials/entry/jdk_8_is_released
> [3] http://openjdk.java.net/projects/jdk9/
> [4] https://github.com/apache/cassandra/blob/trunk/README.asc
> [5] https://lucene.apache.org/#highlights-of-this-lucene-release-include
> [6] http://akka.io/news/2015/09/30/akka-2.4.0-released.html
> [7] https://issues.apache.org/jira/browse/HADOOP-11858
> [8] https://webtide.com/jetty-9-3-features/
> [9] http://markmail.org/message/l7s276y3xkga2eqf
> [10]
> https://intellij-support.jetbrains.com/hc/en-us/articles/206544879-Selecting-the-JDK-version-the-IDE-will-run-under
> [11] http://markmail.org/message/l7s276y3xkga2eqf

-- 

aixigo AG - einfach. besser. beraten
Karl-Friedrich-Straße 68, 52072 Aachen, Germany
fon: +49 (0)241 559709-43, fax: +49 (0)241 559709-99
eMail: marcus.gruend...@aixigo.de, web: http://www.aixigo.de

Amtsgericht Aachen - HRB 8057
Vorstand: Erich Borsch, Christian Friedrich, Tobias Haustein
Vors. des Aufsichtsrates: Prof. Dr. Rüdiger von Nitzsch



[jira] [Commented] (KAFKA-3487) Support per-connector/per-task classloaders in Connect

2016-06-17 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15336254#comment-15336254
 ] 

Randall Hauch commented on KAFKA-3487:
--

I hope this can move toward an approach that can eventually support the Jigsaw 
module system that appears to be coming in JDK9. Jigsaw relies upon the 
standard Java service loader mechanism (added in JDK6) to find all 
implementations of an interface, and Jigsaw will then properly handle all 
dependencies and class loading.

A pre-JDK9 approach is to use a configuration parameter for the workers (e.g., 
a connector module path) that simply lists the directories in which Kafka 
Connect can find each connector "module". The connector module's directory 
would contain all of the JARs required by the connector. Upon startup, Kafka 
Connect could iterate through this list of paths, and for each: create a URL 
classloader (that inherits the parent classpath), pass the URL classloader to 
the service loader method to load the connector implementation class (without 
having to know its name), create a {{ConnectorModule}} object with the URL 
classloader and reference to the connector class, and then load the 
{{ConnectorModule}} into an internal registry keyed by name. Then, the rest of 
Kafka Connect would simply use the registry.

Really, this same registry could also be used to find all implementations 
available on the current classloader, meaning it would work with connectors 
that don't define a 
{{META-INF/services/org.apache.kafka.connect.connector.Connector}} file for the 
service loader.

And, to add support for JDK9, upon startup Kafka Connect would simply use the 
service loader to locate all {{Connector}} implementation classes, and populate 
the same registry, using a different {{ConnectorModule}} implementation that 
simply instantiates the class and relying upon JDK9 modules to properly handle 
all of the classloading and isolation. The rest of the Kafka Connect remains 
unchanged.

The only change to connector implementations is to add support for the service 
loader, and since that's going to be required by JDK9 it might be worth doing 
now. And, doing this now would greatly simplify the implementation.


> Support per-connector/per-task classloaders in Connect
> --
>
> Key: KAFKA-3487
> URL: https://issues.apache.org/jira/browse/KAFKA-3487
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Affects Versions: 0.10.0.0
>Reporter: Ewen Cheslack-Postava
>Assignee: Liquan Pei
>Priority: Critical
>  Labels: needs-kip
>
> Currently we just use the default ClassLoader in Connect. However, this 
> limits how we can compatibly load conflicting connector plugins. Ideally we 
> would use a separate class loader per connector/task that is instantiated to 
> avoid potential conflicts.
> Note that this also opens up options for other ways to provide jars to 
> instantiate connectors. For example, Spark uses this to dynamically publish 
> classes defined in the REPL and load them via URL: 
> https://ardoris.wordpress.com/2014/03/30/how-spark-does-class-loading/ But 
> much simpler examples (include URL in the connector class instead of just 
> class name) are also possible and could be a nice way to more support dynamic 
> sets of connectors, multiple versions of the same connector, etc.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
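
To make the comment above concrete, a hypothetical sketch (not Connect code) of
the pre-JDK9 approach: each directory on an assumed connector module path gets
its own URLClassLoader, and the service loader finds the Connector
implementations it contains without Connect having to know their class names.
"connectorBase" stands in for org.apache.kafka.connect.connector.Connector.

{code}
import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;

public class ConnectorModuleRegistry {
    private final Map<String, Class<?>> registry = new HashMap<>();

    public <T> void scan(List<File> modulePaths, Class<T> connectorBase) throws Exception {
        for (File dir : modulePaths) {
            File[] jars = dir.listFiles((d, name) -> name.endsWith(".jar"));
            if (jars == null)
                continue;
            List<URL> urls = new ArrayList<>();
            for (File jar : jars)
                urls.add(jar.toURI().toURL());
            // Inherits the parent classpath, as described in the comment above.
            ClassLoader loader = new URLClassLoader(urls.toArray(new URL[0]),
                    getClass().getClassLoader());
            // Requires a META-INF/services entry in each connector JAR.
            for (T connector : ServiceLoader.load(connectorBase, loader))
                registry.put(connector.getClass().getName(), connector.getClass());
        }
    }

    public Class<?> connectorClass(String name) {
        return registry.get(name);
    }
}
{code}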


[jira] [Commented] (KAFKA-3693) Race condition between highwatermark-checkpoint thread and handleLeaderAndIsrRequest at broker start-up

2016-06-17 Thread Maysam Yabandeh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15336241#comment-15336241
 ] 

Maysam Yabandeh commented on KAFKA-3693:


[~junrao] I am thinking of a panic reaction to the first, non-full 
LeaderAndIsrRequest, i.e., the broker shows a fatal error and halts. This would be 
a kind of guard against the unforeseen corner cases that would result in the 
broker deleting its replica of some topics. Correct me if I am wrong but my 
understanding is that the cases of non-full UpdateMetadataRequest (or them not 
being received before LeaderAndIsrRequests) are much less harmful: the worst 
case is that the broker cannot connect to some other brokers; this would not 
jeopardize safety and would only affect availability, which is expected to 
trigger an admin investigation to resolve the issue.

> Race condition between highwatermark-checkpoint thread and 
> handleLeaderAndIsrRequest at broker start-up
> ---
>
> Key: KAFKA-3693
> URL: https://issues.apache.org/jira/browse/KAFKA-3693
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.1
>Reporter: Maysam Yabandeh
>
> Upon broker start-up, a race between highwatermark-checkpoint thread to write 
> replication-offset-checkpoint file and handleLeaderAndIsrRequest thread 
> reading from it causes the highwatermark for some partitions to be reset to 
> 0. In the good case, this results in the replica truncating its entire log to 
> 0 and hence initiating the fetching of terabytes of data from the lead broker, 
> which sometimes leads to hours of downtime. We observed bad cases where the 
> reset offset can propagate to the recovery-point-offset-checkpoint file, making 
> a lead broker truncate the file. This seems to have the potential to lead to 
> data loss if the truncation happens at both follower and leader brokers.
> This is the particular faulty scenario manifested in our tests:
> # The broker restarts and receive LeaderAndIsr from the controller
> # LeaderAndIsr message however does not contain all the partitions (probably 
> because other brokers were churning at the same time)
> # becomeLeaderOrFollower calls getOrCreatePartition and updates the 
> allPartitions with the partitions included in the LeaderAndIsr message {code}
>   def getOrCreatePartition(topic: String, partitionId: Int): Partition = {
> var partition = allPartitions.get((topic, partitionId))
> if (partition == null) {
>   allPartitions.putIfNotExists((topic, partitionId), new Partition(topic, 
> partitionId, time, this))
> {code}
> # replication-offset-checkpoint jumps in taking a snapshot of (the partial) 
> allReplicas' high watermark into replication-offset-checkpoint file {code}  
> def checkpointHighWatermarks() {
> val replicas = 
> allPartitions.values.map(_.getReplica(config.brokerId)).collect{case 
> Some(replica) => replica}{code} hence rewriting the previous highwatermarks.
> # Later becomeLeaderOrFollower calls makeLeaders and makeFollowers which read 
> the (now partial) file through Partition::getOrCreateReplica {code}
>   val checkpoint = 
> replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
>   val offsetMap = checkpoint.read
>   if (!offsetMap.contains(TopicAndPartition(topic, partitionId)))
> info("No checkpointed highwatermark is found for partition 
> [%s,%d]".format(topic, partitionId))
> {code}
> We are not entirely sure whether the initial LeaderAndIsr message including a 
> subset of partitions is critical in making this race condition manifest or 
> not. But it is an important detail since it clarifies that a solution based 
> on not letting the highwatermark-checkpoint thread jumping in the middle of 
> processing a LeaderAndIsr message would not suffice.
> The solution we are thinking of is to force initializing allPartitions by the 
> partitions listed in the replication-offset-checkpoint (and perhaps 
> recovery-point-offset-checkpoint file too) when a server starts.
> Thoughts?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
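
A sketch of the first step of the proposed fix (illustrative only, not broker
code): read replication-offset-checkpoint at startup so allPartitions can be
pre-populated before any LeaderAndIsrRequest is processed. This assumes the
plain-text layout written by kafka.server.OffsetCheckpoint (a version line, an
entry count, then one "topic partition offset" triple per line).

{code}
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class CheckpointReader {
    public static Map<String, Long> read(String path) throws IOException {
        Map<String, Long> highWatermarks = new HashMap<>();
        try (BufferedReader reader = new BufferedReader(new FileReader(path))) {
            reader.readLine();                                 // version line
            int entries = Integer.parseInt(reader.readLine().trim());
            for (int i = 0; i < entries; i++) {
                String[] parts = reader.readLine().trim().split("\\s+");
                // key "topic-partition" -> checkpointed high watermark
                highWatermarks.put(parts[0] + "-" + parts[1], Long.parseLong(parts[2]));
            }
        }
        return highWatermarks;
    }
}
{code}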


Re: [DISCUSS] KIP-4 Create Topic Schema

2016-06-17 Thread Grant Henke
Hi Dana,

You mentioned one of the reasons I error and disconnect: because I can't
return an error for every request, the cardinality between request and
response would be different. Beyond that though, I am handling this
protocol rule/parsing error the same way all other messages do.

Parsing is handled in the RequestChannel and any exception that occurs
during this phase is caught, converted into an InvalidRequestException and
results in a disconnect:
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/RequestChannel.scala#L92-L95

Since this is an error that could only occur (and would always occur) due
to incorrect client implementations, and not because of any cluster state
or unusual situation, I felt this behavior was okay and made sense. For
client developers the broker logging should make it obvious what the issue
is. My patch also clearly documents the protocol rules in the Protocol
definition.

In the future having a response header with an error code (and optimally
error message) for every response would help solve this problem (for all
message types).

Thanks,
Grant


On Fri, Jun 17, 2016 at 12:04 AM, Dana Powers  wrote:

> Why disconnect the client on a InvalidRequestException? The 2 errors
> you are catching are both topic-level: (1) multiple requests for the
> same topic, and (2) ReplicaAssignment and num_partitions /
> replication_factor both set. Wouldn't it be better to just error the
> offending create_topic_request, not the entire connection? The
> CreateTopicsResponse returns a map of topics to error codes. You could
> just return the topic that caused the error and an
> InvalidRequestException error code.
>
> -Dana
>
> On Thu, Jun 16, 2016 at 8:37 AM, Grant Henke  wrote:
> > I have updated the wiki and pull request based on the feedback. If there
> > are no objections I will start a vote at the end of the day.
> >
> > Details for this implementation can be read here:
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations#KIP-4-Commandlineandcentralizedadministrativeoperations-CreateTopicRequest
> >
> > The updated pull request can be found here (feel free to review):
> > https://github.com/apache/kafka/pull/1489
> >
> > Below is the exact content for clarity:
> >
> >> Create Topics Request (KAFKA-2945
> >> )
> >>
> >>
> >>
> >> CreateTopics Request (Version: 0) => [create_topic_requests] timeout
> >>   create_topic_requests => topic num_partitions replication_factor
> [replica_assignment] [configs]
> >> topic => STRING
> >> num_partitions => INT32
> >> replication_factor => INT16
> >> replica_assignment => partition_id [replicas]
> >>   partition_id => INT32
> >>   replicas => INT32
> >> configs => config_key config_value
> >>   config_key => STRING
> >>   config_value => STRING
> >>   timeout => INT32
> >>
> >> CreateTopicsRequest is a batch request to initiate topic creation with
> >> either predefined or automatic replica assignment and optionally topic
> >> configuration.
> >>
> >> Request semantics:
> >>
> >>1. Must be sent to the controller broker
> >>2. If there are multiple instructions for the same topic in one
> >>request an InvalidRequestException will be logged on the broker and the
> >>client will be disconnected.
> >>   - This is because the list of topics is modeled server side as a
> >>   map with TopicName as the key
> >>3. The principal must be authorized to the "Create" Operation on the
> >>"Cluster" resource to create topics.
> >>   - Unauthorized requests will receive a
> ClusterAuthorizationException
> >>4.
> >>
> >>Only one from ReplicaAssignment or (num_partitions +
> replication_factor
> >>), can be defined in one instruction.
> >>- If both parameters are specified an InvalidRequestException will be
> >>   logged on the broker and the client will be disconnected.
> >>   - In the case ReplicaAssignment is defined number of partitions
> and
> >>   replicas will be calculated from the supplied replica_assignment.
> >>   - In the case of defined (num_partitions + replication_factor)
> >>   replica assignment will be automatically generated by the server.
> >>   - One or the other must be defined. The existing broker side auto
> >>   create defaults will not be used
> >>   (default.replication.factor, num.partitions). The client
> implementation can
> >>   have defaults for these options when generating the messages.
> >>   - The first replica in [replicas] is assumed to be the preferred
> >>   leader. This matches current behavior elsewhere.
> >>5. Setting a timeout > 0 will allow the request to block until the
> >>topic metadata is "complete" on the controller node.
> >>   - Complete means the local topic metadata cache has been completely

Re: Embedding zookeeper and kafka in java process.

2016-06-17 Thread Achanta Vamsi Subhash
If you are using it for tests, the following works with Kafka 0.10 (tune the
broker configs as per your requirements):

import java.io.IOException;
import java.util.Properties;

import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.common.protocol.SecurityProtocol;

import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.MockTime;
import kafka.utils.TestUtils;
import kafka.utils.Time;
import kafka.utils.ZKStringSerializer$;
import scala.Option;

public class TestKafkaCluster {
    final KafkaServer kafkaServer;
    final ZkClient zkClient;
    private final String zkConnectionString;

    public TestKafkaCluster(String zkConnectionString, int kafkaPort)
            throws Exception {
        this.zkConnectionString = zkConnectionString;
        // 30s session/connection timeouts; the serializer matches the broker's
        zkClient = new ZkClient(zkConnectionString,
                30000, 30000, ZKStringSerializer$.MODULE$);
        final KafkaConfig config = getKafkaConfig(zkConnectionString, kafkaPort);
        final Time mock = new MockTime();
        kafkaServer = TestUtils.createServer(config, mock);
    }

    private static KafkaConfig getKafkaConfig(final String zkConnectString, int port) {
        // (nodeId, zkConnect, enableControlledShutdown, enableDeleteTopic, port,
        //  interBrokerSecurityProtocol, trustStoreFile, saslProperties,
        //  enablePlaintext, enableSaslPlaintext, saslPlaintextPort, enableSsl,
        //  sslPort, enableSaslSsl, saslSslPort, rack)
        final Properties brokerConfig = TestUtils.createBrokerConfig(1,
                zkConnectString,
                false,
                false,
                port,
                Option.apply(SecurityProtocol.PLAINTEXT),
                Option.empty(),
                Option.empty(),
                false,
                false,
                0,
                false,
                0,
                false,
                0,
                Option.empty());
        brokerConfig.put("default.replication.factor", String.valueOf(1));
        return new KafkaConfig(brokerConfig);
    }

    public KafkaServer getKafkaServer() {
        return kafkaServer;
    }

    public void stop() throws IOException {
        kafkaServer.shutdown();
        zkClient.close();
    }

    public String getZkConnectionString() {
        return zkConnectionString;
    }
}
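
For reference, a minimal usage sketch (assuming a ZooKeeper instance is already
listening on the given connect string, e.g. one started via Curator's
TestingServer; the port numbers are illustrative):

public class TestKafkaClusterExample {
    public static void main(String[] args) throws Exception {
        // Broker registers itself in ZooKeeper at localhost:2181, listens on 9092.
        TestKafkaCluster cluster = new TestKafkaCluster("localhost:2181", 9092);
        try {
            // run producers/consumers against localhost:9092 here
        } finally {
            cluster.stop();
        }
    }
}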


On Fri, Jun 17, 2016 at 2:18 AM, Ismael Juma  wrote:

> Try using kafka.server.KafkaServerStartable instead. It should do the right
> thing.
>
> Ismael
>
> On Thu, Jun 16, 2016 at 7:18 PM, Subhash Agrawal 
> wrote:
>
> > Thanks Ismael.
> > I am instantiating kafkaserver instance like this.
> > new KafkaServer(kafkaConfig,null,null);
> >
> > I tried to use
> > new KafkaServer(kafkaConfig); but it does not compile with kafka 0.10.0.
> >
> > All the example I see uses
> > new KafkaServer(kafkaConfig);
> >
> > Do we support new KafkaServer(kafkaConfig);   with kafka 0.10.0? if not,
> > how can I pass
> > these parameters? It used to work with kafka 0.7.1.
> >
> > Thanks
> > Subhash Agrawal
> >
> >
> > -Original Message-
> > From: isma...@gmail.com [mailto:isma...@gmail.com] On Behalf Of Ismael
> > Juma
> > Sent: Thursday, June 16, 2016 1:38 AM
> > To: dev@kafka.apache.org
> > Subject: Re: Embedding zookeeper and kafka in java process.
> >
> > Hi Subhash,
> >
> > This would happen if `null` is passed as the `threadNamePrefix` argument
> > when instantiating `KafkaServer`:
> >
> > class KafkaServer(val config: KafkaConfig, time: Time = SystemTime,
> > threadNamePrefix: Option[String] = None) extends Logging with
> > KafkaMetricsGroup
> >
> > How are you starting Kafka in your Java process?
> >
> > Ismael
> >
> > On Thu, Jun 16, 2016 at 1:26 AM, Subhash Agrawal 
> > wrote:
> >
> > > Thanks for the quick response.
> > > I started zookeeper via zookeeper-server-start.bat and started kafka via
> > > my java process and I saw the same error.
> > > But if I start zookeeper via the java process and start kafka via
> > > kafka-server-start.bat, it works fine.
> > > It means it is not caused by both getting started in the same process. It
> > > must be some kafka-specific issue.
> > >
> > > Subhash Agrawal
> > >
> > > -Original Message-
> > > From: Guozhang Wang [mailto:wangg...@gmail.com]
> > > Sent: Wednesday, June 15, 2016 3:42 PM
> > > To: dev@kafka.apache.org
> > > Subject: Re: Embedding zookeeper and kafka in java process.
> > >
> > > It seems "scala.MatchError: null" is not related to the setting that ZK
> > > and Kafka are embedded in the same process, and the only related case that
> > > I can think of is this:
> > > https://issues.apache.org/jira/browse/KAFKA-940.
> > >
> > > Could you clarify: if you start these two services in two processes, does
> > > the issue go away?
> > >
> > > Guozhang
> > >
> > > On Wed, Jun 15, 2016 at 1:56 PM, Subhash Agrawal <
> agraw...@opentext.com>
> > > wrote:
> > >
> > > > Hi All,
> > > > I am embedding Kafka 0.10.0 and corresponding zookeeper in java process.
> > > > In this process, I start zookeeper first and then wait for 10 seconds and
> > > > then start kafka. These are all running in the same process. Toward the
> > > > end of kafka startup, I see the following exception. It seems zookeeper
> > > > is not able to add the newly created kafka instance. Have you seen this
> > > > error earlier? I have only a single-node kafka.
> > > >
> > > > Let me know if you have any suggestions. I will really appreciate any
> > > > help on this.
> > > >
> > > > Thanks
> > > > Subhash Agrawal.
> > > >

Re: [DISCUSS] Java 8 as a minimum requirement

2016-06-17 Thread Ismael Juma
Hi Marcus,

Thanks for your feedback.

With regards to IBM WebSphere, the latest stable release (8.5.5) supports
Java 8 according to the documentation:

http://www-01.ibm.com/support/docview.wss?uid=swg27005002

Having said that, it is fair to discuss servers and clients separately. In
Kafka, you can't use newer clients with older brokers, but you can use
older clients with newer brokers. As such, the scenario we're talking about
is that of users who can upgrade their brokers and clients to the latest
Kafka version, but are stuck with an older version of WebSphere, right? Are
you aware of such users?

Ismael
On Fri, Jun 17, 2016 at 10:34 AM, Marcus Gründler <
marcus.gruend...@aixigo.de> wrote:

> -1
> Hi Ismael,
>
> Although I really like the Java 8 features and understand the advantages
> you
> mentioned about Java 8 migration, I would suggest staying with Java 7 as
> a minimum requirement for a while.
>
> I think there are two aspects to consider - Kafka Server and Kafka
> clients. On
> the server part it would make sense to switch to Java 8 because you can run
> the broker independently from any enclosing runtime (no JEE server etc.)
>
> But if you change the requirement for Kafka clients, you would cut Kafka
> support for quite a lot of real world deployments that run for example on
> an IBM WebSphere JEE Server (*sigh*), since up to today there is no
> WebSphere version that supports Java 8.
>
> And I think a split of Kafka server with Java 8 and Kafka client JARs in
> Java 7
> would be too complicated to maintain.
>
> So my conclusion is - stay with Java 7 for a while.
>
> Regards, Marcus
>
>
> > Am 16.06.2016 um 22:45 schrieb Ismael Juma :
> >
> > Hi all,
> >
> > I would like to start a discussion on making Java 8 a minimum requirement
> > for Kafka's next feature release (let's say Kafka 0.10.1.0 for now). This
> > is the first discussion on the topic so the idea is to understand how
> > people feel about it. If people feel it's too soon, then we can pick up
> the
> > conversation again after Kafka 0.10.1.0. If the feedback is mostly
> > positive, I will start a vote thread.
> >
> > Let's start with some dates. Java 7 hasn't received public updates since
> > April 2015[1], Java 8 was released in March 2014[2] and Java 9 is
> scheduled
> > to be released in March 2017[3].
> >
> > The first argument for dropping support for Java 7 is that the last
> public
> > release by Oracle contains a large number of known security
> > vulnerabilities. The effectiveness of Kafka's security features is
> reduced
> > if the underlying runtime is not itself secure.
> >
> > The second argument for moving to Java 8 is that it adds a number of
> > compelling features:
> >
> > * Lambda expressions and method references (particularly useful for the
> > Kafka Streams DSL)
> > * Default methods (very useful for maintaining compatibility when adding
> > methods to interfaces)
> > * java.util.stream (helpful for making collection transformations more
> > concise)
> > * Lots of improvements to java.util.concurrent (CompletableFuture,
> > DoubleAdder, DoubleAccumulator, StampedLock, LongAdder, LongAccumulator)
> > * Other nice things: SplittableRandom, Optional (and many others I have
> not
> > mentioned)
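> >
> > As a quick sketch of the lambda point (assuming the Streams DSL's
> > KStream#filter and Predicate from org.apache.kafka.streams.kstream):
> >
> > // Java 7:
> > stream.filter(new Predicate<String, String>() {
> >     @Override
> >     public boolean test(String key, String value) {
> >         return value != null;
> >     }
> > });
> >
> > // Java 8:
> > stream.filter((key, value) -> value != null);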
> >
> > The third argument is that it will simplify our testing matrix, we won't
> > have to test with Java 7 any longer (this is particularly useful for
> system
> > tests that take hours to run). It will also make it easier to support
> Scala
> > 2.12, which requires Java 8.
> >
> > The fourth argument is that many other open-source projects have taken
> the
> > leap already. Examples are Cassandra[4], Lucene[5], Akka[6], Hadoop 3[7],
> > Jetty[8], Eclipse[9], IntelliJ[10] and many others[11]. Even Android will
> > support Java 8 in the next version (although it will take a while before
> > most phones will use that version sadly). This reduces (but does not
> > eliminate) the chance that we would be the first project that would
> cause a
> > user to consider a Java upgrade.
> >
> > The main argument for not making the change is that a reasonable number
> of
> > users may still be using Java 7 by the time Kafka 0.10.1.0 is released.
> > More specifically, we care about the subset who would be able to upgrade
> to
> > Kafka 0.10.1.0, but would not be able to upgrade the Java version. It
> would
> > be great if we could quantify this in some way.
> >
> > What do you think?
> >
> > Ismael
> >
> > [1] https://java.com/en/download/faq/java_7.xml
> > [2] https://blogs.oracle.com/thejavatutorials/entry/jdk_8_is_released
> > [3] http://openjdk.java.net/projects/jdk9/
> > [4] https://github.com/apache/cassandra/blob/trunk/README.asc
> > [5] https://lucene.apache.org/#highlights-of-this-lucene-release-include
> > [6] http://akka.io/news/2015/09/30/akka-2.4.0-released.html
> > [7] https://issues.apache.org/jira/browse/HADOOP-11858
> > [8] https://webtide.com/jetty-9-3-features/
> > [9] 

Re: [DISCUSS] Java 8 as a minimum requirement

2016-06-17 Thread Jeff Klukas
On Thu, Jun 16, 2016 at 5:20 PM, Ismael Juma  wrote:

> On Thu, Jun 16, 2016 at 11:13 PM, Stephen Boesch 
> wrote:
>
> > @Jeff Klukas What is the concern about scala 2.11 vs 2.12?   2.11 runs on
> > both java7 and java8
> >
>
> Scala 2.10.5 and 2.10.6 also support Java 8 for what it's worth.
>

I was under the impression that Scala 2.12 would be the first version
compatible with Java 8 bytecode, but it looks like that was a misunderstanding
on my part.

+1


[jira] [Assigned] (KAFKA-3185) Allow users to cleanup internal data

2016-06-17 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-3185:
--

Assignee: Matthias J. Sax

> Allow users to cleanup internal data
> 
>
> Key: KAFKA-3185
> URL: https://issues.apache.org/jira/browse/KAFKA-3185
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Matthias J. Sax
>Priority: Blocker
>  Labels: user-experience
> Fix For: 0.10.1.0
>
>
> Currently the internal data is managed completely by the Kafka Streams 
> framework and users cannot clean it up actively. This results in a bad 
> out-of-the-box user experience, especially when running demo programs, since 
> it leaves behind internal data (changelog topics, RocksDB files, etc.) that 
> needs to be cleaned up manually. It would be better to add a
> {code}
> KafkaStreams.cleanup()
> {code}
> function call to clean up these internal data programmatically.
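>
> A minimal sketch of how this could look in a demo (hypothetical API; the
> method name and semantics are illustrative only):
> {code}
> // given a topology `builder` and config `props` for the demo application
> KafkaStreams streams = new KafkaStreams(builder, props);
> streams.cleanup(); // hypothetical call: remove this application's internal data
> streams.start();
> {code}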



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] KIP-62: Allow consumer to send heartbeats from a background thread

2016-06-17 Thread Manikumar Reddy
+1 (non-binding)

On Fri, Jun 17, 2016 at 3:37 PM, Rajini Sivaram <
rajinisiva...@googlemail.com> wrote:

> +1 (non-binding)
>
> On Fri, Jun 17, 2016 at 4:45 AM, Grant Henke  wrote:
>
> > +1
> >
> > On Thu, Jun 16, 2016 at 8:50 PM, tao xiao  wrote:
> >
> > > +1
> > >
> > > On Fri, 17 Jun 2016 at 09:03 Harsha  wrote:
> > >
> > > > +1 (binding)
> > > > Thanks,
> > > > Harsha
> > > >
> > > > On Thu, Jun 16, 2016, at 05:46 PM, Henry Cai wrote:
> > > > > +1
> > > > >
> > > > > On Thu, Jun 16, 2016 at 3:46 PM, Ismael Juma 
> > > wrote:
> > > > >
> > > > > > +1 (binding)
> > > > > >
> > > > > > On Fri, Jun 17, 2016 at 12:44 AM, Guozhang Wang <
> > wangg...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > +1.
> > > > > > >
> > > > > > > On Thu, Jun 16, 2016 at 11:44 AM, Jason Gustafson <
> > > > ja...@confluent.io>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi All,
> > > > > > > >
> > > > > > > > I'd like to open the vote for KIP-62. This proposal attempts
> to
> > > > address
> > > > > > > one
> > > > > > > > of the recurring usability problems that users of the new
> > > consumer
> > > > have
> > > > > > > > faced with as little impact as possible. You can read the
> full
> > > > details
> > > > > > > > here:
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread
> > > > > > > > .
> > > > > > > >
> > > > > > > > After some discussion on this list, I think we were in
> > agreement
> > > > that
> > > > > > > this
> > > > > > > > change addresses a major part of the problem and we've left
> the
> > > > door
> > > > > > open
> > > > > > > > for further improvements, such as adding a heartbeat() API
> or a
> > > > > > > separately
> > > > > > > > configured rebalance timeout. Thanks in advance to everyone
> who
> > > > helped
> > > > > > > > review the proposal.
> > > > > > > >
> > > > > > > > -Jason
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > -- Guozhang
> > > > > > >
> > > > > >
> > > >
> > >
> >
> >
> >
> > --
> > Grant Henke
> > Software Engineer | Cloudera
> > gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
> >
>
>
>
> --
> Regards,
>
> Rajini
>


Re: [VOTE] KIP-55: Secure quotas for authenticated users

2016-06-17 Thread Rajini Sivaram
Jun,

10. Since entity_type "users" is new, shouldn't the JSON for these entities
have version 1? I have moved "user_principal" out of the config in the
samples and added it to the <user, client> entries as well. But actually, do
we need to store the non-encoded principal at all? The node name is the
URL-encoded user principal, so it is fairly readable if you are looking in
ZK, and *kafka-configs.sh* will show the non-encoded principal by decoding
the name from the path (since it needs to do encoding anyway, because the
names specified on the command line will be non-encoded principals, it can
do decoding too). Perhaps that is sufficient?
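
(For illustration, the round trip is plain URL encoding - a minimal sketch,
with a hypothetical principal and an illustrative ZK path:)

import java.net.URLDecoder;
import java.net.URLEncoder;

public class PrincipalPath {
    public static void main(String[] args) throws Exception {
        String principal = "CN=quota-user,O=Example"; // hypothetical principal
        String node = URLEncoder.encode(principal, "UTF-8");
        System.out.println("/users/" + node); // /users/CN%3Dquota-user%2CO%3DExample
        // Decoding recovers the readable principal for display:
        System.out.println(URLDecoder.decode(node, "UTF-8"));
    }
}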

11. I liked the second approach since it looks neat and future-proof. Have
updated the KIP.

12. Yes, that is correct.

Many thanks,

Rajini


On Thu, Jun 16, 2016 at 11:36 PM, Jun Rao  wrote:

> Rajini,
>
> Thanks for the update. A few more questions/comments.
>
> 10. For the quota value stored in ZK, since we are adding an optional
> user_principal field in the json, we should bump the version from 1 to 2.
> Also, user_principal is not really part of the config values. So, perhaps
> we should represent it as the following?
> {
> "version":2,
> "config": {
> "producer_byte_rate":"1024",
> "consumer_byte_rate":"2048"
> },
> "user_principal" : "user1"
> }
>
>  Also, we should store user_principal in the following json too, right?
> // Zookeeper persistence path /users/<user1>/clients/clientA
> {
> "version":1,
> "config": {
> "producer_byte_rate":"10",
> "consumer_byte_rate":"30"
> }
> }
>
> 11. For the change notification path, would it be better to change it to
> something like the following and bump up version to 2?
> // Change notification for quota of <user2, clientA>
> {
> "version":2,
> [
>   { "entity_type": "users",
> "entity_name": "user2"
>   },
>   { "entity_type": "clients",
> "entity_name": "clientA"
>   }
> ]
>  }
>
> Alternatively, we could change it to
> // Change notifications for quotas of <user2> and <user2, clientA>
> {
> "version":2,
> "entity_path": "users/user2"
> }
>
> {
> "version":2,
> "entity_path": "users/user2/clients/clientA"
> }
>
> 12. Just to clarify on the meaning of remainder quota. If you have quotas
> like the following,
>   <user1, client1> = 5
>   <user1, client2> = 10
>   <user1> = 12
> it means that all connections with user1 whose client-id is neither client1
> nor client2 will be sharing a quota of 12, right? In other words, the quota
> of <user1> doesn't include the quota for <user1, client1> and <user1,
> client2>.
>
> Thanks,
>
> Jun
>
>
> On Thu, Jun 16, 2016 at 5:03 AM, Rajini Sivaram <
> rajinisiva...@googlemail.com> wrote:
>
> > Jun,
> >
> > Actually, with quotas stored in different nodes in ZK, it is better to
> > store remainder quota rather than total quota under /users/<user> so that
> > quota calculations are not dependent on the order of notifications. I
> have
> > updated the KIP to reflect that. So the quotas in ZK now always reflect
> the
> > quota applied to a group of client connections and use the same format as
> > client-id quotas. But it is not hierarchical, making the configuration
> > simpler.
> >
> > On Thu, Jun 16, 2016 at 11:49 AM, Rajini Sivaram <
> > rajinisiva...@googlemail.com> wrote:
> >
> > > Jun,
> > >
> > > Thank you for the review. I have updated the KIP:
> > >
> > >
> > >1. Added an overview section. Slightly reworded since it is better
> to
> > >treat user and client-id as different levels rather than the same
> > level.
> > >2. Yes, it is neater to store quota for each entity in a different
> > >path in Zookeeper. I put clients under users rather than the other
> way
> > >round since that reflects the hierarchy and also keeps a user's
> quotas
> > >together under a single sub-tree. I had initially used a single node
> > to
> > >keep quotas and sub-quotas of a user together so that updates are
> > atomic
> > >since changes to sub-quotas also affect remainder quotas for other
> > clients.
> > >But I imagine, updates to configs are rare and it is not a big
> issue.
> > >3. I haven't modified the JSON for configuration change
> notifications.
> > >The entity_name can now be a subpath that has both user and client.
> > Have
> > >added an example to the KIP. The downside of keeping clients under
> > users in
> > >ZK in 2) is that the change notification for sub-quota has
> entity_type
> > >"users". I could extend the JSON to include client separately, but
> > since
> > >changes to a client sub-quota does impact other clients of the user
> > as well
> > >due to change in remainder quota, it may be ok as it is. Do let me
> > know if
> > >it looks confusing in the example.
> > >4. Agree, updated.
> > >
> > >
> > > On Wed, Jun 15, 2016 at 10:27 PM, Jun Rao  wrote:
> > >
> > >> Hi, Rajini,
> > >>
> > >> Thanks for the updated 

Re: [VOTE] KIP-62: Allow consumer to send heartbeats from a background thread

2016-06-17 Thread Rajini Sivaram
+1 (non-binding)

On Fri, Jun 17, 2016 at 4:45 AM, Grant Henke  wrote:

> +1
>
> On Thu, Jun 16, 2016 at 8:50 PM, tao xiao  wrote:
>
> > +1
> >
> > On Fri, 17 Jun 2016 at 09:03 Harsha  wrote:
> >
> > > +1 (binding)
> > > Thanks,
> > > Harsha
> > >
> > > On Thu, Jun 16, 2016, at 05:46 PM, Henry Cai wrote:
> > > > +1
> > > >
> > > > On Thu, Jun 16, 2016 at 3:46 PM, Ismael Juma 
> > wrote:
> > > >
> > > > > +1 (binding)
> > > > >
> > > > > On Fri, Jun 17, 2016 at 12:44 AM, Guozhang Wang <
> wangg...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > +1.
> > > > > >
> > > > > > On Thu, Jun 16, 2016 at 11:44 AM, Jason Gustafson <
> > > ja...@confluent.io>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi All,
> > > > > > >
> > > > > > > I'd like to open the vote for KIP-62. This proposal attempts to
> > > address
> > > > > > one
> > > > > > > of the recurring usability problems that users of the new
> > consumer
> > > have
> > > > > > > faced with as little impact as possible. You can read the full
> > > details
> > > > > > > here:
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread
> > > > > > > .
> > > > > > >
> > > > > > > After some discussion on this list, I think we were in
> agreement
> > > that
> > > > > > this
> > > > > > > change addresses a major part of the problem and we've left the
> > > door
> > > > > open
> > > > > > > for further improvements, such as adding a heartbeat() API or a
> > > > > > separately
> > > > > > > configured rebalance timeout. Thanks in advance to everyone who
> > > helped
> > > > > > > review the proposal.
> > > > > > >
> > > > > > > -Jason
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > >
> >
>
>
>
> --
> Grant Henke
> Software Engineer | Cloudera
> gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
>



-- 
Regards,

Rajini


Build failed in Jenkins: kafka-trunk-jdk8 #705

2016-06-17 Thread Apache Jenkins Server
See 

Changes:

[ismael] KAFKA-3838; Update zkClient to 0.9 and Zookeeper to 3.4.8

--
[...truncated 13570 lines...]
org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfoTest > 
testEncodeDecode STARTED

org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfoTest > 
testEncodeDecode PASSED

org.apache.kafka.streams.processor.internals.assignment.TaskAssignorTest > 
testStickiness STARTED

org.apache.kafka.streams.processor.internals.assignment.TaskAssignorTest > 
testStickiness PASSED

org.apache.kafka.streams.processor.internals.assignment.TaskAssignorTest > 
testAssignWithStandby STARTED

org.apache.kafka.streams.processor.internals.assignment.TaskAssignorTest > 
testAssignWithStandby PASSED

org.apache.kafka.streams.processor.internals.assignment.TaskAssignorTest > 
testAssignWithoutStandby STARTED

org.apache.kafka.streams.processor.internals.assignment.TaskAssignorTest > 
testAssignWithoutStandby PASSED

org.apache.kafka.streams.processor.internals.PunctuationQueueTest > 
testPunctuationInterval STARTED

org.apache.kafka.streams.processor.internals.PunctuationQueueTest > 
testPunctuationInterval PASSED

org.apache.kafka.streams.processor.internals.QuickUnionTest > testUnite STARTED

org.apache.kafka.streams.processor.internals.QuickUnionTest > testUnite PASSED

org.apache.kafka.streams.processor.internals.QuickUnionTest > testUniteMany 
STARTED

org.apache.kafka.streams.processor.internals.QuickUnionTest > testUniteMany 
PASSED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
testRegisterNonPersistentStore STARTED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
testRegisterNonPersistentStore PASSED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
testLockStateDirectory STARTED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
testLockStateDirectory PASSED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
testGetStore STARTED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
testGetStore PASSED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
testClose STARTED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
testClose PASSED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
testChangeLogOffsets STARTED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
testChangeLogOffsets PASSED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
testRegisterPersistentStore STARTED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
testRegisterPersistentStore PASSED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
testNoTopic STARTED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
testNoTopic PASSED

org.apache.kafka.streams.processor.internals.RecordQueueTest > testTimeTracking 
STARTED

org.apache.kafka.streams.processor.internals.RecordQueueTest > testTimeTracking 
PASSED

org.apache.kafka.streams.processor.internals.StreamTaskTest > testProcessOrder 
STARTED

org.apache.kafka.streams.processor.internals.StreamTaskTest > testProcessOrder 
PASSED

org.apache.kafka.streams.processor.internals.StreamTaskTest > testPauseResume 
STARTED

org.apache.kafka.streams.processor.internals.StreamTaskTest > testPauseResume 
PASSED

org.apache.kafka.streams.processor.internals.StreamTaskTest > 
testMaybePunctuate STARTED

org.apache.kafka.streams.processor.internals.StreamTaskTest > 
testMaybePunctuate PASSED

org.apache.kafka.streams.processor.internals.StreamThreadTest > 
testInjectClients STARTED

org.apache.kafka.streams.processor.internals.StreamThreadTest > 
testInjectClients PASSED

org.apache.kafka.streams.processor.internals.StreamThreadTest > testMaybeCommit 
STARTED

org.apache.kafka.streams.processor.internals.StreamThreadTest > testMaybeCommit 
PASSED

org.apache.kafka.streams.processor.internals.StreamThreadTest > 
testPartitionAssignmentChange STARTED

org.apache.kafka.streams.processor.internals.StreamThreadTest > 
testPartitionAssignmentChange PASSED

org.apache.kafka.streams.processor.internals.StreamThreadTest > testMaybeClean 
STARTED

org.apache.kafka.streams.processor.internals.StreamThreadTest > testMaybeClean 
PASSED

org.apache.kafka.streams.processor.internals.MinTimestampTrackerTest > 
testTracking STARTED

org.apache.kafka.streams.processor.internals.MinTimestampTrackerTest > 
testTracking PASSED

org.apache.kafka.streams.processor.internals.StreamPartitionAssignorTest > 
testAssignWithStandbyReplicas STARTED

org.apache.kafka.streams.processor.internals.StreamPartitionAssignorTest > 
testAssignWithStandbyReplicas PASSED

org.apache.kafka.streams.processor.internals.StreamPartitionAssignorTest > 
testAssignWithNewTasks 

[GitHub] kafka pull request #1517: [Kafka Streams] Clean-up script [WIP]

2016-06-17 Thread mjsax
GitHub user mjsax opened a pull request:

https://github.com/apache/kafka/pull/1517

[Kafka Streams] Clean-up script [WIP]

Any of: @guozhangwang @enothereska @dguy @miguno 

Two variants:
 1. python client + bash script (seems not to be the right way, due to 
external dependencies)
 2. java only (bash script to call the java program not included yet)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mjsax/kafka clean-up

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1517.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1517


commit 3cfe0c67217f936102aabcf2ccc4dc20b563dd5b
Author: Matthias J. Sax 
Date:   2016-06-11T12:12:42Z

Initial code drop for clean-up script

commit 6d5570e1590f4247c1678e307c0dc17f4d453a1b
Author: Matthias J. Sax 
Date:   2016-06-15T21:04:21Z

added Java CleanUp-Client




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-3838) Bump zkclient and Zookeeper versions

2016-06-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15335584#comment-15335584
 ] 

ASF GitHub Bot commented on KAFKA-3838:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1512


> Bump zkclient and Zookeeper versions
> 
>
> Key: KAFKA-3838
> URL: https://issues.apache.org/jira/browse/KAFKA-3838
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Affects Versions: 0.10.0.1
>Reporter: FIlipe Azevedo
>
> Zookeeper 3.4.8 has some improvements; specifically, it handles DNS 
> re-resolution when a connection to zookeeper fails. This potentially allows 
> round-robin DNS without the need to hardcode the IP addresses in the config. 
> http://zookeeper.apache.org/doc/r3.4.8/releasenotes.html
> ZkClient has a new 0.9 release which uses zookeeper 3.4.8 which is already 
> marked as stable.
> Tests are passing.
> Here is the PR: https://github.com/apache/kafka/pull/1504



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1512: KAFKA-3838 zkClient and Zookeeper version bump

2016-06-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1512


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Resolved] (KAFKA-3576) Unify KStream and KTable API

2016-06-17 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy resolved KAFKA-3576.
---
Resolution: Resolved

Resolved by https://issues.apache.org/jira/browse/KAFKA-3561

> Unify KStream and KTable API
> 
>
> Key: KAFKA-3576
> URL: https://issues.apache.org/jira/browse/KAFKA-3576
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Damian Guy
>  Labels: api
> Fix For: 0.10.1.0
>
>
> For KTable aggregations, the pattern is 
> {{table.groupBy(...).aggregate(...)}}, and the data is repartitioned in an 
> inner topic based on the key selected in {{groupBy(...)}}.
> For KStream aggregations, though, the pattern is 
> {{stream.selectKey(...).through(...).aggregateByKey(...)}}. In other words, 
> users need to manually use a topic to repartition data, and the syntax 
> differs from KTable as well.
> h2. Goal
> To have similar APIs for aggregations of KStream and KTable
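>
> For a concrete contrast (the unified method names below are illustrative,
> not decided):
> {code}
> // KTable today:
> table.groupBy(...).aggregate(...);
> // KStream today:
> stream.selectKey(...).through("repartition-topic").aggregateByKey(...);
> // Goal: align KStream with the KTable pattern, e.g.
> stream.groupBy(...).aggregate(...);
> {code}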



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3860) No broker partitions consumed by consumer thread

2016-06-17 Thread Kundan (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kundan updated KAFKA-3860:
--
Description: 
I'm using kafka-clients-0.8.2.1 to consume messages as a Java KafkaConsumer.

Problem: I have two consumers; one consumer is able to receive messages but 
the other consumer is unable to receive messages. A WARNING was visible in 
the log saying "No broker partitions consumed by consumer thread", and it was 
also observed that the offset was -1 [selected partitions : mytopic:0: 
fetched offset = -1: consumed offset = -1] while fetching offsets at the 
consumer end.

Another log line was "entering consume", which is in 
ZookeeperConsumerConnector.scala at line 220.

Other information

1) Environment: CentOS
2) No. of zookeepers: 5
3) Properties used to connect to zookeeper: 
 a) zookeeper.connect : zk1:2181,zk2:2181,zk2:2181,zk4:2181,zk5:2181,
 b) group.id : mygroupId
 c) fetch.message.max.bytes : 5242880 (also set on the producer side)
 d) auto.commit.enable : false
4) Single-threaded high-level consumer code used to consume data.
5) Consumer is running in a separate VM; Kafka/zookeeper in a separate VM.
 

  was:
I'm using kafka-clients-0.8.2.1 to consume messages as a Java KafkaConsumer.

Problem: I have two consumers; one consumer is able to receive messages but 
the other consumer is unable to receive messages. A WARNING was visible in 
the log saying "No broker partitions consumed by consumer thread", and it was 
also observed that the offset was -1 while fetching offsets at the consumer 
end.

Another log line was "entering consume", which is in 
ZookeeperConsumerConnector.scala at line 220.

Other information

1) Environment: CentOS
2) No. of zookeepers: 5
3) Properties used to connect to zookeeper: 
 a) zookeeper.connect : zk1:2181,zk2:2181,zk2:2181,zk4:2181,zk5:2181,
 b) group.id : mygroupId
 c) fetch.message.max.bytes : 5242880 (also set on the producer side)
 d) auto.commit.enable : false
4) Single-threaded high-level consumer code used to consume data.
5) Consumer is running in a separate VM; Kafka/zookeeper in a separate VM.
 


> No broker partitions consumed by consumer thread
> 
>
> Key: KAFKA-3860
> URL: https://issues.apache.org/jira/browse/KAFKA-3860
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8.2.1
> Environment: centOS 
>Reporter: Kundan
>Assignee: Neha Narkhede
>
> I'm using kafka-clients-0.8.2.1 to consume messages as a Java KafkaConsumer.
> Problem: I have two consumers; one consumer is able to receive messages but 
> the other consumer is unable to receive messages. A WARNING was visible in 
> the log saying "No broker partitions consumed by consumer thread", and it 
> was also observed that the offset was -1 [selected partitions : mytopic:0: 
> fetched offset = -1: consumed offset = -1] while fetching offsets at the 
> consumer end.
> Another log line was "entering consume", which is in 
> ZookeeperConsumerConnector.scala at line 220.
> Other information
> 1) Environment: CentOS
> 2) No. of zookeepers: 5
> 3) Properties used to connect to zookeeper: 
>  a) zookeeper.connect : zk1:2181,zk2:2181,zk2:2181,zk4:2181,zk5:2181,
>  b) group.id : mygroupId
>  c) fetch.message.max.bytes : 5242880 (also set on the producer side)
>  d) auto.commit.enable : false
> 4) Single-threaded high-level consumer code used to consume data.
> 5) Consumer is running in a separate VM; Kafka/zookeeper in a separate VM.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3860) No broker partitions consumed by consumer thread

2016-06-17 Thread Kundan (JIRA)
Kundan created KAFKA-3860:
-

 Summary: No broker partitions consumed by consumer thread
 Key: KAFKA-3860
 URL: https://issues.apache.org/jira/browse/KAFKA-3860
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.8.2.1
 Environment: centOS 
Reporter: Kundan
Assignee: Neha Narkhede


I'm using kafka-clients-0.8.2.1 to consume messages as a Java KafkaConsumer.

Problem: I have two consumers; one consumer is able to receive messages but 
the other consumer is unable to receive messages. A WARNING was visible in 
the log saying "No broker partitions consumed by consumer thread", and it was 
also observed that the offset was -1 while fetching offsets at the consumer 
end.

Another log line was "entering consume", which is in 
ZookeeperConsumerConnector.scala at line 220.

Other information

1) Environment: CentOS
2) No. of zookeepers: 5
3) Properties used to connect to zookeeper: 
 a) zookeeper.connect : zk1:2181,zk2:2181,zk2:2181,zk4:2181,zk5:2181,
 b) group.id : mygroupId
 c) fetch.message.max.bytes : 5242880 (also set on the producer side)
 d) auto.commit.enable : false
4) Single-threaded high-level consumer code used to consume data.
5) Consumer is running in a separate VM; Kafka/zookeeper in a separate VM.
 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)