Re: [DISCUSS] Using GitHub Pull Requests for contributions and code review

2015-05-05 Thread Ismael Juma
Hi Jun,

On Sat, May 2, 2015 at 2:50 PM, Jun Rao j...@confluent.io wrote:

 We will also need to figure out if we need CONTRIBUTING.md like the
 following to take care of the Apache licensing stuff.

 https://github.com/apache/spark/blob/master/CONTRIBUTING.md


Yes indeed. That is in step 3 of the missing pieces in the original email.

As for merging changes, I think squashing the commits will be ideal during
 merge.


The script does indeed squash commits during merge. It also retains a bunch of
useful information in the commit message. I think one loses some of the
benefits of git with a one-squashed-commit-per-PR approach, but that is a
separate issue and I am not suggesting a change in this regard at this point.


 Also, it would be great if we can always put the latest changes on
 top during the merge.


Just to make sure I understand correctly, do you mean updating the working
copy with the latest from trunk before merging? Or did I misunderstand?

Best,
Ismael


Re: Kafka KIP hangout May 5

2015-05-05 Thread Tom Graves
Is there information on how to get into the hangout?  Or is it by invitation?  
I'm very interested in the authorization changes.
Thanks, Tom


 On Monday, May 4, 2015 9:20 AM, Jun Rao j...@confluent.io wrote:
   

 Hi, Everyone,

We will have a KIP hangout at 11 PST on May 5. The following is the agenda.
If you want to attend and are not on the invite, please let me know.

Agenda:
KIP-4 (admin commands): any remaining issues
KIP-11 (authorization): any remaining issues
KIP-21 (configuration management)

Thanks,

Jun


  

Re: [KIP-DISCUSSION] KIP-22 Expose a Partitioner interface in the new producer

2015-05-05 Thread Sriharsha Chintalapani
Thanks for the comments everyone.
Hi Jay,
     I do have a question regarding the Configurable interface and how to pass a 
Map<String, ?> of properties. I couldn't find any other classes using it. The JMX 
reporter overrides it but doesn't implement it. So with a configurable 
partitioner, how can a user pass in partitioner configuration since it's 
instantiated within the producer?

Thanks,
Harsha
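
As a rough illustration of what is being asked (a sketch only: the partitioner
class and the "partitioner.buckets" key below are made up, and the partition()
signature is a placeholder since KIP-22 had not settled it; only Configurable is
an existing client interface):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.Configurable;

// Hypothetical partitioner for illustration; only Configurable is a real interface here.
public class BucketingPartitioner implements Configurable {

    private int buckets = 1;

    // The producer hands its full config map to configure(), so partitioner-specific
    // settings can ride along as ordinary string properties.
    @Override
    public void configure(Map<String, ?> configs) {
        Object value = configs.get("partitioner.buckets"); // hypothetical key
        if (value != null)
            buckets = Integer.parseInt(value.toString());
    }

    // Placeholder partitioning logic; the real KIP-22 signature was still under discussion.
    public int partition(byte[] keyBytes, int numPartitions) {
        int hash = keyBytes == null ? 0 : (java.util.Arrays.hashCode(keyBytes) & 0x7fffffff);
        return (hash % buckets) % numPartitions;
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("partitioner.buckets", "8"); // extra property picked up via configure()
        BucketingPartitioner p = new BucketingPartitioner();
        p.configure(props);
        System.out.println(p.partition("key-42".getBytes(), 6));
    }
}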


On May 4, 2015 at 10:36:45 AM, Jay Kreps (jay.kr...@gmail.com) wrote:

Hey Harsha,  

That proposal sounds good. One minor thing--I don't think we need to have  
the partitioner.metadata property. Our reason for using string properties  
is exactly to make config extensible at runtime. So a given partitioner can  
add whatever properties make sense using the configure() api it defines.  

-Jay  

On Sun, May 3, 2015 at 5:57 PM, Harsha ka...@harsha.io wrote:  

 Thanks Jay & Gianmarco for the comments. I picked option A: if the user  
 sends a partition id then it will be applied, and the partitioner.class method  
 will only be called if the partition id is null.  
 Please take a look at the updated KIP here  
  
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-+22+-+Expose+a+Partitioner+interface+in+the+new+producer
   
 . Let me know if you see anything missing.  
  
 Thanks,  
 Harsha  
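
A rough sketch of the option A dispatch described above, with made-up names;
the only point is that an explicit partition id always wins and the pluggable
partitioner is consulted only when the id is null:

// Sketch of "option A": an explicit partition id on the record is honored as-is,
// and the configured partitioner is consulted only when no partition id was set.
public final class PartitionDispatch {

    interface Partitioner {                       // stand-in for the KIP-22 interface
        int partition(byte[] keyBytes, int numPartitions);
    }

    static int choosePartition(Integer recordPartition, byte[] keyBytes,
                               Partitioner partitioner, int numPartitions) {
        if (recordPartition != null) {
            if (recordPartition < 0 || recordPartition >= numPartitions)
                throw new IllegalArgumentException("Invalid partition: " + recordPartition);
            return recordPartition;               // user-specified partition always wins
        }
        return partitioner.partition(keyBytes, numPartitions);
    }

    public static void main(String[] args) {
        Partitioner byHash = (key, n) -> (java.util.Arrays.hashCode(key) & 0x7fffffff) % n;
        System.out.println(choosePartition(2, "k".getBytes(), byHash, 6));    // -> 2
        System.out.println(choosePartition(null, "k".getBytes(), byHash, 6)); // partitioner decides
    }
}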
  
 On Fri, Apr 24, 2015, at 02:15 AM, Gianmarco De Francisci Morales wrote:  
  Hi,  
   
   
  Here are the questions I think we should consider:  
   1. Do we need this at all given that we have the partition argument in  
   ProducerRecord which gives full control? I think we do need it because  
 this  
   is a way to plug in a different partitioning strategy at run time and  
 do it  
   in a fairly transparent way.  

   
  Yes, we need it if we want to support different partitioning strategies  
  inside Kafka rather than requiring the user to code them externally.  
   
   
   3. Do we need to add the value? I suspect people will have uses for  
   computing something off a few fields in the value to choose the  
 partition.  
   This would be useful in cases where the key was being used for log  
   compaction purposes and did not contain the full information for  
 computing  
   the partition.  

   
  I am not entirely sure about this. I guess that most partitioners should  
  not use it.  
  I think it makes it easier to reason about the system if the partitioner  
  only works on the key.  
  However, if the value (and its serialization) are already available, there  
  is not much harm in passing them along.  
   
   
   4. This interface doesn't include either an init() or close() method.  
 It  
   should implement Closeable and Configurable, right?  

   
  Right now the only application I can think of to have an init() and  
  close()  
  is to read some state information (e.g., load information) that is  
  published on some external distributed storage (e.g., zookeeper) by the  
  brokers.  
  It might be useful also for reconfiguration and state migration.  
   
  I think it's not a very common use case right now, but if the added  
  complexity is not too much it might be worth having support for these  
  methods.  
   
   
   
   5. What happens if the user both sets the partition id in the  
   ProducerRecord and sets a partitioner? Does the partition id just get  
   passed in to the partitioner (as sort of implied in this interface?).  
 This  
   is a bit weird since if you pass in the partition id you kind of  
 expect it  
   to get used, right? Or is it the case that if you specify a partition  
 the  
   partitioner isn't used at all (in which case no point in including  
   partition in the Partitioner api).  


  The user should be able to override the partitioner on a per-record basis  
  by explicitly setting the partition id.  
  I don't think it makes sense for the partitioners to take hints on the  
  partition.  
   
  I would even go the extra step, and have a default logic that accepts  
  both  
  key and partition id (current interface) and calls partition() only if  
  the  
  partition id is not set. The partition() method does *not* take the  
  partition ID as input (only key-value).  
   
   
  Cheers,  
  --  
  Gianmarco  
   
   
   
   Cheers,  

   -Jay  

   On Thu, Apr 23, 2015 at 6:55 AM, Sriharsha Chintalapani   
 ka...@harsha.io  
   wrote:  

Hi,  
Here is the KIP for adding a partitioner interface for  
 producer.  
 
 

 https://cwiki.apache.org/confluence/display/KAFKA/KIP-+22+-+Expose+a+Partitioner+interface+in+the+new+producer
   
 There is one open question about what the interface should look like.  
 Please  
take a look and let me know if you prefer one way or the other.  
 
Thanks,  
Harsha  
 
 

  


Re: [DISCUSS] Using GitHub Pull Requests for contributions and code review

2015-05-05 Thread Ismael Juma
Hi Ewen,

Comments inline.

On Fri, May 1, 2015 at 8:38 AM, Ewen Cheslack-Postava e...@confluent.io
wrote:

 Also +1. There are some drawbacks to using Github for reviews, e.g. lots of
 emails for each review because they don't let you publish your entire
 review in one go like RB does, but it drastically lowers the barrier to
 contributing for most developers. Also, if you haven't tried it, hub
 https://hub.github.com/ makes it really easy to checkout and test PRs.


Good points.

One thing I noticed is that when you try to generate a PR it defaults to
 the 0.8.2 branch. Can we fix that up to be trunk by default? That's the
 most common use case; version branches are really only useful when a
 release is being prepared. Do changes to the Github repo require tickets to
 the Apache Infra team or is this something committers have control over?


It can be changed to trunk, but it does require a ticket to the Infra
team[1]. Sadly, PRs can only be closed via commits, by the PR creator, or via a
ticket to Infra. This is quite clunky for abandoned PRs (a label can be used to
exclude such PRs from view, if needed).

On a related note, which perhaps should be discussed on another thread:

 The CI setup is a related issue that we might want to rethink.


I did think about Travis as well, but I thought that should be considered
as a subsequent and separate discussion indeed.

Best,
Ismael

[1] An example of such a request:
https://issues.apache.org/jira/browse/INFRA-9208


Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient

2015-05-05 Thread Mayuresh Gharat
Just a quick question: can we handle a request timeout as a disconnection,
do a fresh MetadataRequest and retry, instead of failing the request?


Thanks,

Mayuresh


On Mon, May 4, 2015 at 10:32 AM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 I incorporated Ewen and Guozhang’s comments in the KIP page. I want to speed
 up on this KIP because currently mirror-maker very likely hangs when a broker
 is down.

 I also took a shot at solving KAFKA-1788 in KAFKA-2142. I used the metadata
 timeout to expire the batches which are sitting in the accumulator without
 leader info. I did that because the situation there is essentially missing
 metadata.

 As a summary of what I am thinking about the timeout in new Producer:

 1. Metadata timeout:
   - used in send(), blocking
   - used in accumulator to expire batches with timeout exception.
 2. Linger.ms
   - Used in accumulator to ready the batch for drain
 3. Request timeout
   - Used in NetworkClient to expire a batch and retry if no response is
 received for a request before timeout.

 So in this KIP, we only address (3). The only public interface change is a
 new configuration of request timeout (and maybe change the configuration
 name of TIMEOUT_CONFIG to REPLICATION_TIMEOUT_CONFIG).

 I would like to see what people think of the above approach.

 Jiangjie (Becket) Qin
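
To make the three timeouts concrete, a hedged configuration sketch: linger.ms
is an existing producer config, metadata.fetch.timeout.ms reflects the
then-current metadata timeout as I understand it, and request.timeout.ms
stands in for the new config proposed in this KIP (its final name was still
open at the time):

import java.util.Properties;

// Illustrative only: "request.timeout.ms" is the proposed config from this KIP,
// and renaming "timeout.ms" to a replication timeout was also still under discussion.
public class ProducerTimeoutConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        // 1. Metadata timeout: bounds how long send() may block waiting for metadata,
        //    and (per KAFKA-2142) how long a batch may sit in the accumulator without a leader.
        props.put("metadata.fetch.timeout.ms", "60000");

        // 2. Linger: how long the accumulator waits before a batch is ready to drain.
        props.put("linger.ms", "5");

        // 3. Request timeout (this KIP): how long NetworkClient waits for a response
        //    before expiring the in-flight request and retrying.
        props.put("request.timeout.ms", "30000");

        System.out.println(props);
    }
}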

 On 4/20/15, 6:02 PM, Jiangjie Qin j...@linkedin.com wrote:

 Jun,
 
 I thought a little bit differently on this.
 Intuitively, I am thinking that if a partition is offline, the metadata
 for that partition should be considered not ready because we don’t know
 which broker we should send the message to. So those sends need to be
 blocked on metadata timeout.
 Another thing I’m wondering is in which scenario an offline partition will
 become online again in a short period of time and how likely it will
 occur. My understanding is that the batch timeout for batches sitting in
 accumulator should be larger than linger.ms but should not be too long
 (e.g. less than 60 seconds). Otherwise it will exhaust the shared buffer
 with batches to be aborted.
 
 That said, I do agree it is reasonable to buffer the message for some time
 so messages to other partitions can still get sent. But adding another
 expiration in addition to linger.ms - which is essentially a timeout -
 sounds a little bit confusing. Maybe we can do this, let the batch sit in
 accumulator up to linger.ms, then fail it if necessary.
 
 What do you think?
 
 Thanks,
 
 Jiangjie (Becket) Qin
 
 On 4/20/15, 1:11 PM, Jun Rao j...@confluent.io wrote:
 
 Jiangjie,
 
 Allowing messages to be accumulated in an offline partition could be
 useful
 since the partition may become available before the request timeout or
 linger time is reached. Now that we are planning to add a new timeout, it
 would be useful to think through whether/how that applies to messages in
 the accumulator too.
 
 Thanks,
 
 Jun
 
 
 On Thu, Apr 16, 2015 at 1:02 PM, Jiangjie Qin j...@linkedin.com.invalid
 
 wrote:
 
  Hi Harsha,
 
  Took a quick look at the patch. I think it is still a little bit
  different. KAFKA-1788 only handles the case where a batch sitting in
  accumulator for too long. The KIP is trying to solve the issue where a
  batch has already been drained from accumulator and sent to broker.
  We might be able to apply timeout on batch level to merge those two
 cases
  as Ewen suggested. But I’m not sure if it is a good idea to allow
 messages
  whose target partition is offline to sit in accumulator in the first
 place.
 
  Jiangjie (Becket) Qin
 
  On 4/16/15, 10:19 AM, Sriharsha Chintalapani ka...@harsha.io
 wrote:
 
  Guozhang and Jiangjie,
   Isn’t this work being covered in
   https://issues.apache.org/jira/browse/KAFKA-1788 ? Can you please
   review the patch there?
  Thanks,
  Harsha
  
  
  On April 15, 2015 at 10:39:40 PM, Guozhang Wang (wangg...@gmail.com)
  wrote:
  
  Thanks for the update Jiangjie,
  
  I think it is actually NOT expected that hardware disconnection will
 be
  detected by the selector, but rather will only be revealed upon TCP
  timeout, which could be hours.
  
  A couple of comments on the wiki:
  
   1. For KafkaProducer.close() and KafkaProducer.flush() we need the request
   timeout as implicit timeout. I am not very clear on what this means.
  
   2. Currently the producer already has a TIMEOUT_CONFIG which should
   really be REPLICATION_TIMEOUT_CONFIG. So if we decide to add
   REQUEST_TIMEOUT_CONFIG, I suggest we also make this renaming: admittedly
   it will change the config names but will reduce confusion moving
   forward.
  
  
  Guozhang
  
  
  On Wed, Apr 15, 2015 at 6:48 PM, Jiangjie Qin
 j...@linkedin.com.invalid
  
  wrote:
  
   Checked the code again. It seems that the disconnected channel is
 not
   detected by selector as expected.
  
   Currently we are depending on the
   o.a.k.common.network.Selector.disconnected set to see if we need to
 do
   something for a disconnected 

Re: [DISCUSSION] Reuse o.a.k.clients.NetworkClient in controller.

2015-05-05 Thread Jun Rao
Hi, Jiangjie,

Thanks for taking on this.

I was thinking that one way to decouple the dependency on Metadata in
NetworkClient is the following.
1. Make Metadata an interface.
2. Rename the current Metadata class to something like KafkaMetadata that
implements the Metadata interface.
3. Have a new NoOpMetadata class that implements the Metadata interface.
This class
3.1 does nothing for any write method
3.2 returns max long for any method that asks for a timestamp
3.3 returns an empty Cluster for fetch().

Then we can leave NetworkClient unchanged and just pass in a NoOpMetadata
when using NetworkClient in the controller. The consumer/producer client
will be using KafkaMetadata.
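
A sketch of what the NoOpMetadata idea could look like, assuming a
trimmed-down Metadata interface; the real o.a.k.clients.Metadata class has
more methods, so the signatures below are illustrative only:

import org.apache.kafka.common.Cluster;

// Hypothetical reduced interface for illustration.
interface Metadata {
    Cluster fetch();
    long timeToNextUpdate(long nowMs);
    void requestUpdate();
    void update(Cluster cluster, long nowMs);
}

// NoOpMetadata: write methods do nothing, timestamp queries say "never",
// and fetch() returns an empty Cluster. Passing this into NetworkClient would
// effectively disable its metadata management, which is what the controller wants.
class NoOpMetadata implements Metadata {
    @Override public Cluster fetch() { return Cluster.empty(); }
    @Override public long timeToNextUpdate(long nowMs) { return Long.MAX_VALUE; }
    @Override public void requestUpdate() { /* no-op */ }
    @Override public void update(Cluster cluster, long nowMs) { /* no-op */ }
}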

As for replica fetchers, it may be possible to use KafkaConsumer. However,
we don't need the metadata and the offset management. So, perhaps it's
easier to just use NetworkClient. Also, currently, there is one replica
fetcher thread per source broker. By using NetworkClient, we can change
that to using a single thread for all source brokers. This is probably a
bigger change. So, maybe we can do it later.

Jun


I think we probably need to replace replica fetcher with NetworkClient as
well. Replica fetcher gets leader from the controller and therefore doesn't

On Tue, May 5, 2015 at 1:37 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 I am trying to see if we can reuse the NetworkClient class for
 controller-to-broker communication. (Also, we can probably use
 KafkaConsumer, which is already using NetworkClient, in replica fetchers.)
 Currently NetworkClient does the following things in addition to sending
 requests.

   1.  Connection state management.
   2.  Flow control (inflight requests)
   3.  Metadata refresh

 In controller we need (1) and (2) but not (3). NetworkClient is tightly
 coupled with metadata now and this is the major blocker of reusing
 NetworkClient in controller.  For controller, we don’t need NetworkClient
 to manage any metadata because the controller has listeners to monitor the
 cluster state and has all the information about topic metadata.
 I am thinking we can add a disable metadata refresh flag to NetworkClient
 or set metadata refresh interval to be Long.MAX_VALUE, so the metadata will
 be managed outside NetworkClient.
 This needs minimal change to allow NetworkClient to be reused, but the
 ugly part is NetworkClient still has the entire Metadata while it actually
 only needs a NodeList.

 I want to see what people think about this.

 Thanks.

 Jiangjie (Becket) Qin




Re: Review Request 33049: Patch for KAFKA-2084

2015-05-05 Thread Aditya Auradkar

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33049/
---

(Updated May 6, 2015, 12:52 a.m.)


Review request for kafka, Joel Koshy and Jun Rao.


Bugs: KAFKA-2084
https://issues.apache.org/jira/browse/KAFKA-2084


Repository: kafka


Description (updated)
---

This is currently not being used anywhere in the code because I haven't yet 
figured out how to enforce delays i.e. purgatory vs delay queue. I'll have a 
better idea once I look at the new purgatory implementation. Hopefully, this 
smaller patch is easier to review.

Added more testcases


Some locking changes for reading/creating the sensors


WIP patch


Sample usage in ReplicaManager


Updated patch for quotas. This patch does the following: 1. Add per-client 
metrics for both producer and consumers 2. Add configuration for quotas 3. 
Compute delay times in the metrics package and return the delay times in 
QuotaViolationException 4. Add a DelayQueue in KafkaApi's that can be used to 
throttle any type of request. Implemented request throttling for produce and 
fetch requests. 5. Added unit and integration test cases. I've not yet added 
integration testcases testing the consumer delays.. will update the patch once 
those are ready
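
For readers unfamiliar with the DelayQueue approach mentioned in point 4
above, here is a generic sketch of the idea (not code from this patch):
quota-violating responses are parked in a DelayQueue and sent only after the
computed delay elapses.

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Generic sketch of throttling responses via a DelayQueue (not the actual KAFKA-2084 code).
public class ThrottleSketch {

    static class ThrottledRequest implements Delayed {
        final String clientId;
        final long dueAtMs;
        ThrottledRequest(String clientId, long delayMs) {
            this.clientId = clientId;
            this.dueAtMs = System.currentTimeMillis() + delayMs;
        }
        @Override public long getDelay(TimeUnit unit) {
            return unit.convert(dueAtMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
        @Override public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<ThrottledRequest> queue = new DelayQueue<>();
        // A quota violation computed these delays for each client's response.
        queue.add(new ThrottledRequest("client-a", 300));
        queue.add(new ThrottledRequest("client-b", 100));

        // A single expiration thread would normally drain this queue; take() blocks
        // until the head's delay has elapsed, then the response is finally sent.
        while (!queue.isEmpty()) {
            ThrottledRequest ready = queue.take();
            System.out.println("sending delayed response for " + ready.clientId);
        }
    }
}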


Incorporated Jun's comments


Adding javadoc


KAFKA-2084 - Moved the callbacks to ClientQuotaMetrics


Adding more configs


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java 
dfa1b0a11042ad9d127226f0e0cec8b1d42b8441 
  clients/src/main/java/org/apache/kafka/common/metrics/Quota.java 
d82bb0c055e631425bc1ebbc7d387baac76aeeaa 
  
clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java
 a451e5385c9eca76b38b425e8ac856b2715fcffe 
  clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java 
ca823fd4639523018311b814fde69b6177e73b97 
  clients/src/test/java/org/apache/kafka/common/utils/MockTime.java  
  core/src/main/scala/kafka/server/ClientQuotaMetrics.scala PRE-CREATION 
  core/src/main/scala/kafka/server/KafkaApis.scala 
417960dd1ab407ebebad8fdb0e97415db3e91a2f 
  core/src/main/scala/kafka/server/KafkaConfig.scala 
9efa15ca5567b295ab412ee9eea7c03eb4cdc18b 
  core/src/main/scala/kafka/server/KafkaServer.scala 
b7d2a2842e17411a823b93bdedc84657cbd62be1 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
59c9bc3ac3a8afc07a6f8c88c5871304db588d17 
  core/src/main/scala/kafka/server/ThrottledRequest.scala PRE-CREATION 
  core/src/main/scala/kafka/utils/ShutdownableThread.scala 
fc226c863095b7761290292cd8755cd7ad0f155c 
  core/src/test/scala/integration/kafka/api/QuotasTest.scala PRE-CREATION 
  core/src/test/scala/unit/kafka/server/ClientQuotaMetricsTest.scala 
PRE-CREATION 
  core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 
8014a5a6c362785539f24eb03d77278434614fe6 
  core/src/test/scala/unit/kafka/server/ThrottledRequestExpirationTest.scala 
PRE-CREATION 

Diff: https://reviews.apache.org/r/33049/diff/


Testing
---


Thanks,

Aditya Auradkar



[jira] [Commented] (KAFKA-2084) byte rate metrics per client ID (producer and consumer)

2015-05-05 Thread Aditya A Auradkar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14529672#comment-14529672
 ] 

Aditya A Auradkar commented on KAFKA-2084:
--

Updated reviewboard https://reviews.apache.org/r/33049/diff/
 against branch origin/trunk

 byte rate metrics per client ID (producer and consumer)
 ---

 Key: KAFKA-2084
 URL: https://issues.apache.org/jira/browse/KAFKA-2084
 Project: Kafka
  Issue Type: Sub-task
Reporter: Aditya Auradkar
Assignee: Aditya Auradkar
 Attachments: KAFKA-2084.patch, KAFKA-2084_2015-04-09_18:10:56.patch, 
 KAFKA-2084_2015-04-10_17:24:34.patch, KAFKA-2084_2015-04-21_12:21:18.patch, 
 KAFKA-2084_2015-04-21_12:28:05.patch, KAFKA-2084_2015-05-05_15:27:35.patch, 
 KAFKA-2084_2015-05-05_17:52:02.patch


 We need to be able to track the bytes-in/bytes-out rate on a per-client ID 
 basis. This is necessary for quotas.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2084) byte rate metrics per client ID (producer and consumer)

2015-05-05 Thread Aditya A Auradkar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aditya A Auradkar updated KAFKA-2084:
-
Attachment: KAFKA-2084_2015-05-05_17:52:02.patch

 byte rate metrics per client ID (producer and consumer)
 ---

 Key: KAFKA-2084
 URL: https://issues.apache.org/jira/browse/KAFKA-2084
 Project: Kafka
  Issue Type: Sub-task
Reporter: Aditya Auradkar
Assignee: Aditya Auradkar
 Attachments: KAFKA-2084.patch, KAFKA-2084_2015-04-09_18:10:56.patch, 
 KAFKA-2084_2015-04-10_17:24:34.patch, KAFKA-2084_2015-04-21_12:21:18.patch, 
 KAFKA-2084_2015-04-21_12:28:05.patch, KAFKA-2084_2015-05-05_15:27:35.patch, 
 KAFKA-2084_2015-05-05_17:52:02.patch


 We need to be able to track the bytes-in/bytes-out rate on a per-client ID 
 basis. This is necessary for quotas.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1646) Improve consumer read performance for Windows

2015-05-05 Thread Honghai Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14529694#comment-14529694
 ] 

Honghai Chen commented on KAFKA-1646:
-

When trying to add a test case for Log, I got test failures for existing test
cases on Windows.

 Improve consumer read performance for Windows
 -

 Key: KAFKA-1646
 URL: https://issues.apache.org/jira/browse/KAFKA-1646
 Project: Kafka
  Issue Type: Improvement
  Components: log
Affects Versions: 0.8.1.1
 Environment: Windows
Reporter: xueqiang wang
Assignee: xueqiang wang
  Labels: newbie, patch
 Attachments: Improve consumer read performance for Windows.patch, 
 KAFKA-1646-truncate-off-trailing-zeros-on-broker-restart-if-bro.patch, 
 KAFKA-1646.patch, KAFKA-1646_20141216_163008.patch, 
 KAFKA-1646_20150306_005526.patch, KAFKA-1646_20150312_200352.patch, 
 KAFKA-1646_20150414_035415.patch, KAFKA-1646_20150414_184503.patch, 
 KAFKA-1646_20150422.patch


 This patch is for the Windows platform only. On Windows, if there is 
 more than one replica writing to disk, the segment log files will not be 
 consistent on disk and consumer read performance will drop 
 greatly. This fix allocates more disk space when rolling a new segment, and 
 thereby improves consumer read performance on the NTFS file system.
 This patch doesn't affect file allocation on other filesystems, for it only 
 adds statements like 'if(Os.iswindow)' or adds methods used on Windows.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-1646) Improve consumer read performance for Windows

2015-05-05 Thread Honghai Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14529694#comment-14529694
 ] 

Honghai Chen edited comment on KAFKA-1646 at 5/6/15 1:13 AM:
-

When trying to add a test case for Log, I got test failures for existing test
cases on Windows.
https://issues.apache.org/jira/browse/KAFKA-2170


was (Author: waldenchen):
When trying add test case for Log, got test failures for existing test cases in 
windows.

 Improve consumer read performance for Windows
 -

 Key: KAFKA-1646
 URL: https://issues.apache.org/jira/browse/KAFKA-1646
 Project: Kafka
  Issue Type: Improvement
  Components: log
Affects Versions: 0.8.1.1
 Environment: Windows
Reporter: xueqiang wang
Assignee: xueqiang wang
  Labels: newbie, patch
 Attachments: Improve consumer read performance for Windows.patch, 
 KAFKA-1646-truncate-off-trailing-zeros-on-broker-restart-if-bro.patch, 
 KAFKA-1646.patch, KAFKA-1646_20141216_163008.patch, 
 KAFKA-1646_20150306_005526.patch, KAFKA-1646_20150312_200352.patch, 
 KAFKA-1646_20150414_035415.patch, KAFKA-1646_20150414_184503.patch, 
 KAFKA-1646_20150422.patch


 This patch is for the Windows platform only. On Windows, if there is 
 more than one replica writing to disk, the segment log files will not be 
 consistent on disk and consumer read performance will drop 
 greatly. This fix allocates more disk space when rolling a new segment, and 
 thereby improves consumer read performance on the NTFS file system.
 This patch doesn't affect file allocation on other filesystems, for it only 
 adds statements like 'if(Os.iswindow)' or adds methods used on Windows.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Kafka KIP hangout May 5

2015-05-05 Thread Ashish Singh
Below are my notes from today’s KIP hangout. Please feel free to add/
correct the content.
Kafka KIP Discussion (May 5)

KIP-4 (admin commands): any remaining issues

   - The only issue left is how to handle multiple instructions for one topic.
   The suggestion here was to silently ignore duplicate instructions: use a map
   with topic as key and execute the last one.
   - Andrii will update the KIP

KIP-11 (authorization): any remaining issues

   - Parth added the diagrams requested by Joe.
   - Does the current KIP prevent us from adding SAML? SAML can be added as
   another port for authentication. Getting roles and groups for SAML will
   require a SAML-specific authentication implementation. The Session object
   can be used to capture the required information.
   - Is the current interface, Authorizer, bound to ACLs? Based on the
   discussion, it does not look like it. If some implementation decides not to
   use ACLs at all, it can ignore all other methods and implement just the
   authorize method (see the sketch after this list). ACLs are in the interface
   because an ACL CLI is a requirement for the Kafka community.
   - The Authorizer interface does not have to change for different
   implementations; the session implementation, using the session builder,
   should take care of what goes into the session.
   - Harsha is right now picking up the authorized id, a string, based on the
   client’s principal name in the SASL implementation, and getting the peer
   principal in the SSL session for authentication; the subject can be added as
   part of the session. However, one can use the session builder to add
   whatever information to the session and use it for authentication. Just
   getting the principal in the SSL session is not enough for LinkedIn. Harsha
   will add a comment to the code to make this clear.
   - Joel had some concerns about deny taking precedence in authorization.
   - If no authorizer is configured, the security code path won’t be
   exercised at all. If an authorizer is configured, by default topics without
   ACLs will be disallowed for every user except superusers. The admin will
   have to set a config parameter to change the default behaviour and allow
   anyone to access topics that do not have any ACLs on them.
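
The sketch referenced above: a non-ACL authorizer that only really implements
authorize(). The Session/Operation/Resource/Acl types here are simplified
stand-ins for illustration, not the exact KIP-11 signatures.

import java.util.Collections;
import java.util.Map;
import java.util.Set;

// Hedged sketch of a non-ACL implementation: only authorize() carries real logic,
// and the ACL management methods are no-ops.
public class GroupBasedAuthorizer {

    static final class Session { final String principal; Session(String p) { principal = p; } }
    enum Operation { READ, WRITE, DESCRIBE }
    static final class Resource { final String topic; Resource(String t) { topic = t; } }
    static final class Acl { }

    private final Map<String, Set<String>> writersByTopic;

    GroupBasedAuthorizer(Map<String, Set<String>> writersByTopic) {
        this.writersByTopic = writersByTopic;
    }

    // The only method this implementation really cares about.
    boolean authorize(Session session, Operation op, Resource resource) {
        if (op != Operation.WRITE)
            return true; // toy policy: reads and describes are open to everyone
        return writersByTopic.getOrDefault(resource.topic, Collections.<String>emptySet())
                             .contains(session.principal);
    }

    // ACL management methods exist for the CLI, but a non-ACL implementation
    // can leave them as no-ops.
    void addAcls(Set<Acl> acls, Resource resource) { /* no-op */ }
    void removeAcls(Set<Acl> acls, Resource resource) { /* no-op */ }
    Set<Acl> getAcls(Resource resource) { return Collections.emptySet(); }
}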

KIP-21 (configuration management)

   - For the dynamic configuration mechanism to pick up changes, do we
   need multiple backends, like configs in ZK, a config file, configs in a
   topic, etc.? This might lead to multiple sources of truth, which is usually
   not a good thing to have.
   - If multiple backends are required, do they need to be active at the
   same time?
   - People in general were concerned that existing tools like Puppet, etc.
   might not work if we move to a ZK-based backend for configs.
   - One suggestion was to have users/admins update the current config file
   and support a reloadConfig command that would make the broker re-read the
   config file.
   - Plan is to have more discussions around this on the discuss mail chain.


On Mon, May 4, 2015 at 7:19 AM, Jun Rao j...@confluent.io wrote:

 Hi, Everyone,

 We will have a KIP hangout at 11 PST on May 5. The following is the agenda.
  If you want to attend and are not on the invite, please let me know.

 Agenda:
 KIP-4 (admin commands): any remaining issues
 KIP-11 (authorization): any remaining issues
 KIP-21 (configuration management)

 Thanks,

 Jun




-- 

Regards,
Ashish


Re: [DISCUSS] KIP-21 Configuration Management

2015-05-05 Thread Gwen Shapira
Sorry I missed the call today :)

I think an additional requirement would be:
Make sure that traditional deployment tools (Puppet, Chef, etc) are still
capable of managing Kafka configuration.

For this reason, I'd like the configuration refresh to be pretty close to
what most Linux services do to force a reload of configuration.
AFAIK, this involves handling the HUP signal in the main thread to reload
configuration. Then packaging scripts can add something nice like 'service
kafka reload'.

(See Apache web server:
https://github.com/apache/httpd/blob/trunk/build/rpm/httpd.init#L101)

Gwen
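
A bare-bones sketch of the reload-on-HUP pattern described above, assuming a
hypothetical config path and reload() hook; it uses the JDK-internal
sun.misc.Signal API, so treat it as an illustration of the mechanism rather
than a portable implementation.

import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;

import sun.misc.Signal;

// Sketch: re-read a properties file whenever the process receives SIGHUP,
// so "kill -HUP <pid>" (or a "service kafka reload" wrapper) picks up config changes.
public class HupReloadSketch {

    private static final Path CONFIG = Paths.get("/etc/kafka/server.properties"); // hypothetical path
    private static volatile Properties current = new Properties();

    static void reload() {
        Properties fresh = new Properties();
        try (InputStream in = Files.newInputStream(CONFIG)) {
            fresh.load(in);
            current = fresh;
            System.out.println("Reloaded " + fresh.size() + " config entries");
        } catch (Exception e) {
            System.err.println("Reload failed, keeping previous config: " + e);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Signal.handle(new Signal("HUP"), sig -> reload());
        Thread.sleep(Long.MAX_VALUE); // stand-in for the broker's main loop
    }
}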


On Tue, May 5, 2015 at 8:54 AM, Joel Koshy jjkosh...@gmail.com wrote:

 Good discussion. Since we will be talking about this at 11am, I wanted
 to organize these comments into requirements to see if we are all on
 the same page.

 REQUIREMENT 1: Needs to accept dynamic config changes. This needs to
 be general enough to work for all configs that we envision may need to
 accept changes at runtime, e.g., log (topic), broker, client (quotas),
 etc. Possible options include:
 - ZooKeeper watcher
 - Kafka topic
 - Direct RPC to controller (or config coordinator)

 The current KIP is really focused on REQUIREMENT 1 and I think that is
 reasonable as long as we don't come up with something that requires
 significant re-engineering to support the other requirements.

 REQUIREMENT 2: Provide consistency of configs across brokers (modulo
 per-broker overrides) or at least be able to verify consistency.  What
 this effectively means is that config changes must be seen by all
 brokers eventually and we should be able to easily compare the full
 config of each broker.

 REQUIREMENT 3: Central config store. Needs to work with plain
 file-based configs and other systems (e.g., puppet). Ideally, should
 not bring in other dependencies (e.g., a DB). Possible options:
 - ZooKeeper
 - Kafka topic
 - other? E.g. making it pluggable?

 Any other requirements?

 Thanks,

 Joel

 On Tue, May 05, 2015 at 01:38:09AM +, Aditya Auradkar wrote:
  Hey Neha,
 
  Thanks for the feedback.
  1. In my earlier exchange with Jay, I mentioned the broker writing all
 its configs to ZK (while respecting the overrides). Then ZK can be used to
 view all configs.
 
  2. Need to think about this a bit more. Perhaps we can discuss this
 during the hangout tomorrow?
 
  3 & 4) I viewed these config changes as mainly administrative
 operations. In that case, it may be reasonable to assume that the ZK port is
 available for communication from the machine these commands are run on. Having
 a ConfigChangeRequest (or similar) is nice to have, but having a new API and
 sending requests to the controller also changes how we do topic-based
 configuration currently. I was hoping to keep this KIP as minimal as
 possible and provide a means to represent and modify client and broker
 based configs in a central place. Are there any concerns if we tackle these
 things in a later KIP?
 
  Thanks,
  Aditya
  
  From: Neha Narkhede [n...@confluent.io]
  Sent: Sunday, May 03, 2015 9:48 AM
  To: dev@kafka.apache.org
  Subject: Re: [DISCUSS] KIP-21 Configuration Management
 
  Thanks for starting this discussion, Aditya. Few questions/comments
 
  1. If you change the default values like it's mentioned in the KIP, do
 you
  also overwrite the local config file as part of updating the default
 value?
  If not, where does the admin look to find the default values, ZK or local
  Kafka config file? What if a config value is different in both places?
 
  2. I share Gwen's concern around making sure that popular config
 management
  tools continue to work with this change. Would love to see how each of
  those would work with the proposal in the KIP. I don't know enough about
  each of the tools but seems like in some of the tools, you have to define
  some sort of class with parameter names as config names. How will such
  tools find out about the config values? In Puppet, if this means that
 each
  Puppet agent has to read it from ZK, this means the ZK port has to be
 open
  to pretty much every machine in the DC. This is a bummer and a very
  confusing requirement. Not sure if this is really a problem or not (each
 of
  those tools might behave differently), though pointing out that this is
  something worth paying attention to.
 
  3. The wrapper tools that let users read/change config tools should not
  depend on ZK for the reason mentioned above. It's a pain to assume that
 the
  ZK port is open from any machine that needs to run this tool. Ideally
 what
  users want is a REST API to the brokers to change or read the config (ala
  Elasticsearch), but in the absence of the REST API, we should think if we
  can write the tool such that it just requires talking to the Kafka broker
  port. This will require a config RPC.
 
  4. Not sure if KIP is the right place to discuss the design of
 propagating
  the config changes to the brokers, but have you 

Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations (Thread 2)

2015-05-05 Thread Andrii Biletskyi
Guys,

I've updated the wiki to reflect all previously discussed items
(regarding the schema only - this is included in phase 1).
https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations

I think we can have the final discussion today (for phase 1) and,
if there are no new remarks, I will start the voting thread.

With regards to AlterTopicRequest semantics: I agree with Jun,
and I think it's my bad that I focused on multiple topics in one request.
The same situation is possible in ProduceRequest, Fetch, TopicMetadata
and we handle it naturally and in the most transparent way - we
put all separate instructions into a map and thus silently ignore
duplicates.
This also makes the Response part simple - it's just a map of Topic -> ErrorCode.
I think we need to follow the same approach for the Alter (and Create, Delete)
requests. With this we add nothing new in terms of batch request
semantics.

Thanks,
Andrii Biletskyi
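
To make the map-based de-duplication concrete, a small sketch with made-up
types and error codes: duplicate topics simply overwrite earlier entries and
the response is a map from topic to error code.

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of the batching semantics described above, with made-up types:
// duplicate topics in one request collapse silently, last instruction wins,
// and the response is simply topic -> error code.
public class AlterTopicBatchSketch {

    static class TopicInstruction {
        final String topic;
        final int partitions;
        TopicInstruction(String topic, int partitions) { this.topic = topic; this.partitions = partitions; }
    }

    static Map<String, Short> handle(List<TopicInstruction> request) {
        Map<String, TopicInstruction> byTopic = new LinkedHashMap<>();
        for (TopicInstruction i : request)
            byTopic.put(i.topic, i);                  // same key overwrites earlier instruction

        Map<String, Short> errors = new LinkedHashMap<>();
        for (TopicInstruction i : byTopic.values())
            errors.put(i.topic, i.partitions > 0 ? (short) 0 : (short) 42); // codes are illustrative
        return errors;
    }

    public static void main(String[] args) {
        System.out.println(handle(Arrays.asList(
                new TopicInstruction("clicks", 6),
                new TopicInstruction("clicks", 12),   // duplicate: only this one is applied
                new TopicInstruction("logs", -1))));  // invalid: gets an error code
    }
}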

On Thu, Apr 30, 2015 at 4:31 PM, Jun Rao j...@confluent.io wrote:

 The following is a description of some of my concerns on allowing the same
 topic multiple times in AlterTopicRequest.

 ATR has an array of entries, each corresponding to a topic. We allow
 multiple changes to a topic in a single entry. Those changes may fail to
 apply independently (e.g., the config change may succeed, but the replica
 assignment change may fail). If there is an issue applying one of the
 changes, we will set an error code for that entry in the response.
 If we allow the same topic to be specified multiple times in ATR, it can
 happen that the first entry succeeds, but the second entry fails partially.
 Now, from the admin's perspective, it's a bit hard to do the verification.
 Ideally, you want to wait for the changes in the first entry to be applied.
 However, the second entry may have part of the changes applied
 successfully.

 About putting restrictions on the requests. Currently, we effectively
 expect a topic-partition to be only specified once in the FetchRequest.
 Allowing the same topic-partition to be specified multiple times in
 FetchRequest will be confusing and complicates the implementation (e.g.,
 putting the request in purgatory). A few other requests probably have
 similar implicit assumptions on topic or topic-partition being unique in
 each request.

 Thanks,

 Jun


 On Tue, Apr 28, 2015 at 5:26 PM, Andrii Biletskyi 
 andrii.bilets...@stealth.ly wrote:

  Guys,
 
   A quick summary of today's meeting.
  
   There were no additional issues/questions. The only item about which
   we are not 100% sure is the case of multiple instructions for one topic in
   one request.
   It was proposed by Jun to explain the reasons behind not allowing users to
   do that again here on the mailing list, and in case we implement it in the
   final version, to document it well so API clients understand what exactly is
   not allowed and why.
 
  At the meantime I will update the KIP. After that I will start voting
  thread.
 
  Thanks,
  Andrii Biletskyi
 
  On Tue, Apr 28, 2015 at 10:33 PM, Andrii Biletskyi 
  andrii.bilets...@stealth.ly wrote:
 
   Guys,
  
    It seems that there are no open questions left, so prior to our weekly
    call let me summarize what I'm going to implement as part of phase one for
    KIP-4.
  
   1. Add 3 new Wire Protocol requests - Create-, Alter- and
   DeleteTopicRequest
  
    2. Topic requests are batch requests; errors are returned per topic as
    part of the batch response.
   
    3. Topic requests are asynchronous - respective commands are only
    started and the server is not blocked until the command is finished.
   
    4. It will not be allowed to specify multiple mutations for the same
    topic within one batch request - a special error will be returned for
    such a topic.
   
    5. There will be no dedicated request for reassign-partitions - it is
    simulated with the AlterTopicRequest.ReplicaAssignment field.
   
    6. Preferred-replica-leader-election is not supported since there is no
    need to have a public API to trigger such an operation.
   
    7. TopicMetadataRequest will be evolved to version 1 - topic-level
    configuration per topic will be included and the ISR field will be removed.
    Automatic topic-creation logic will be removed (we will use
    CreateTopicRequest for that).
  
   Thanks,
   Andrii Biletskyi
  
  
   On Tue, Apr 28, 2015 at 12:23 AM, Jun Rao j...@confluent.io wrote:
  
   Yes, to verify if a partition reassignment completes or not, we just
  need
   to make sure AR == RAR. So, we don't need ISR for this. It's probably
   still
   useful to know ISR for monitoring in general though.
  
   Thanks,
  
   Jun
  
   On Mon, Apr 27, 2015 at 4:15 AM, Andrii Biletskyi 
   andrii.bilets...@stealth.ly wrote:
  
Okay, I had some doubts in terms of reassign-partitions case. I was
not sure whether we need ISR to check post condition of partition
reassignment. But I think we can rely on assigned replicas - the
   workflow
in 

[jira] [Commented] (KAFKA-2084) byte rate metrics per client ID (producer and consumer)

2015-05-05 Thread Aditya A Auradkar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14529422#comment-14529422
 ] 

Aditya A Auradkar commented on KAFKA-2084:
--

Updated reviewboard https://reviews.apache.org/r/33049/diff/
 against branch origin/trunk

 byte rate metrics per client ID (producer and consumer)
 ---

 Key: KAFKA-2084
 URL: https://issues.apache.org/jira/browse/KAFKA-2084
 Project: Kafka
  Issue Type: Sub-task
Reporter: Aditya Auradkar
Assignee: Aditya Auradkar
 Attachments: KAFKA-2084.patch, KAFKA-2084_2015-04-09_18:10:56.patch, 
 KAFKA-2084_2015-04-10_17:24:34.patch, KAFKA-2084_2015-04-21_12:21:18.patch, 
 KAFKA-2084_2015-04-21_12:28:05.patch, KAFKA-2084_2015-05-05_15:27:35.patch


 We need to be able to track the bytes-in/bytes-out rate on a per-client ID 
 basis. This is necessary for quotas.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 33049: Patch for KAFKA-2084

2015-05-05 Thread Aditya Auradkar

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33049/
---

(Updated May 5, 2015, 10:27 p.m.)


Review request for kafka, Joel Koshy and Jun Rao.


Bugs: KAFKA-2084
https://issues.apache.org/jira/browse/KAFKA-2084


Repository: kafka


Description (updated)
---

This is currently not being used anywhere in the code because I haven't yet 
figured out how to enforce delays i.e. purgatory vs delay queue. I'll have a 
better idea once I look at the new purgatory implementation. Hopefully, this 
smaller patch is easier to review.

Added more testcases


Some locking changes for reading/creating the sensors


WIP patch


Sample usage in ReplicaManager


Updated patch for quotas. This patch does the following: 1. Add per-client 
metrics for both producer and consumers 2. Add configuration for quotas 3. 
Compute delay times in the metrics package and return the delay times in 
QuotaViolationException 4. Add a DelayQueue in KafkaApi's that can be used to 
throttle any type of request. Implemented request throttling for produce and 
fetch requests. 5. Added unit and integration test cases. I've not yet added 
integration testcases testing the consumer delays.. will update the patch once 
those are ready


Incorporated Jun's comments


Adding javadoc


KAFKA-2084 - Moved the callbacks to ClientQuotaMetrics


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java 
dfa1b0a11042ad9d127226f0e0cec8b1d42b8441 
  clients/src/main/java/org/apache/kafka/common/metrics/Quota.java 
d82bb0c055e631425bc1ebbc7d387baac76aeeaa 
  
clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java
 a451e5385c9eca76b38b425e8ac856b2715fcffe 
  clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java 
ca823fd4639523018311b814fde69b6177e73b97 
  clients/src/test/java/org/apache/kafka/common/utils/MockTime.java  
  core/src/main/scala/kafka/server/ClientQuotaMetrics.scala PRE-CREATION 
  core/src/main/scala/kafka/server/KafkaApis.scala 
417960dd1ab407ebebad8fdb0e97415db3e91a2f 
  core/src/main/scala/kafka/server/KafkaConfig.scala 
9efa15ca5567b295ab412ee9eea7c03eb4cdc18b 
  core/src/main/scala/kafka/server/KafkaServer.scala 
b7d2a2842e17411a823b93bdedc84657cbd62be1 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
59c9bc3ac3a8afc07a6f8c88c5871304db588d17 
  core/src/main/scala/kafka/server/ThrottledRequest.scala PRE-CREATION 
  core/src/main/scala/kafka/utils/ShutdownableThread.scala 
fc226c863095b7761290292cd8755cd7ad0f155c 
  core/src/test/scala/integration/kafka/api/QuotasTest.scala PRE-CREATION 
  core/src/test/scala/unit/kafka/server/ClientQuotaMetricsTest.scala 
PRE-CREATION 
  core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 
8014a5a6c362785539f24eb03d77278434614fe6 
  core/src/test/scala/unit/kafka/server/ThrottledRequestExpirationTest.scala 
PRE-CREATION 

Diff: https://reviews.apache.org/r/33049/diff/


Testing
---


Thanks,

Aditya Auradkar



Re: Review Request 33049: Patch for KAFKA-2084

2015-05-05 Thread Aditya Auradkar

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33049/
---

(Updated May 5, 2015, 10:29 p.m.)


Review request for kafka, Joel Koshy and Jun Rao.


Bugs: KAFKA-2084
https://issues.apache.org/jira/browse/KAFKA-2084


Repository: kafka


Description (updated)
---

Updated patch for quotas. This patch does the following:
1. Add per-client metrics for both producer and consumers 
2. Add configuration for quotas 
3. Compute delay times in the metrics package and return the delay times in 
QuotaViolationException
4. Add a DelayQueue in KafkaApi's that can be used to throttle any type of 
request. Implemented request throttling for produce and fetch requests. 
5. Added unit and integration test cases. I've not yet added integration 
testcases testing the consumer delays.. will update the patch once those are 
ready

Incorporated Jun's comments
KAFKA-2084 - Moved the callbacks to ClientQuotaMetrics


Diffs
-

  clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java 
dfa1b0a11042ad9d127226f0e0cec8b1d42b8441 
  clients/src/main/java/org/apache/kafka/common/metrics/Quota.java 
d82bb0c055e631425bc1ebbc7d387baac76aeeaa 
  
clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java
 a451e5385c9eca76b38b425e8ac856b2715fcffe 
  clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java 
ca823fd4639523018311b814fde69b6177e73b97 
  clients/src/test/java/org/apache/kafka/common/utils/MockTime.java  
  core/src/main/scala/kafka/server/ClientQuotaMetrics.scala PRE-CREATION 
  core/src/main/scala/kafka/server/KafkaApis.scala 
417960dd1ab407ebebad8fdb0e97415db3e91a2f 
  core/src/main/scala/kafka/server/KafkaConfig.scala 
9efa15ca5567b295ab412ee9eea7c03eb4cdc18b 
  core/src/main/scala/kafka/server/KafkaServer.scala 
b7d2a2842e17411a823b93bdedc84657cbd62be1 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
59c9bc3ac3a8afc07a6f8c88c5871304db588d17 
  core/src/main/scala/kafka/server/ThrottledRequest.scala PRE-CREATION 
  core/src/main/scala/kafka/utils/ShutdownableThread.scala 
fc226c863095b7761290292cd8755cd7ad0f155c 
  core/src/test/scala/integration/kafka/api/QuotasTest.scala PRE-CREATION 
  core/src/test/scala/unit/kafka/server/ClientQuotaMetricsTest.scala 
PRE-CREATION 
  core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 
8014a5a6c362785539f24eb03d77278434614fe6 
  core/src/test/scala/unit/kafka/server/ThrottledRequestExpirationTest.scala 
PRE-CREATION 

Diff: https://reviews.apache.org/r/33049/diff/


Testing
---


Thanks,

Aditya Auradkar



[jira] [Updated] (KAFKA-2084) byte rate metrics per client ID (producer and consumer)

2015-05-05 Thread Aditya A Auradkar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aditya A Auradkar updated KAFKA-2084:
-
Attachment: KAFKA-2084_2015-05-05_15:27:35.patch

 byte rate metrics per client ID (producer and consumer)
 ---

 Key: KAFKA-2084
 URL: https://issues.apache.org/jira/browse/KAFKA-2084
 Project: Kafka
  Issue Type: Sub-task
Reporter: Aditya Auradkar
Assignee: Aditya Auradkar
 Attachments: KAFKA-2084.patch, KAFKA-2084_2015-04-09_18:10:56.patch, 
 KAFKA-2084_2015-04-10_17:24:34.patch, KAFKA-2084_2015-04-21_12:21:18.patch, 
 KAFKA-2084_2015-04-21_12:28:05.patch, KAFKA-2084_2015-05-05_15:27:35.patch


 We need to be able to track the bytes-in/bytes-out rate on a per-client ID 
 basis. This is necessary for quotas.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-2172) Round-robin partition assignment strategy too restrictive

2015-05-05 Thread Jason Rosenberg (JIRA)
Jason Rosenberg created KAFKA-2172:
--

 Summary: Round-robin partition assignment strategy too restrictive
 Key: KAFKA-2172
 URL: https://issues.apache.org/jira/browse/KAFKA-2172
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Rosenberg


The round-robin partition assignment strategy was introduced for the 
high-level consumer, starting with 0.8.2.1.  This appears to be a very 
attractive feature, but it has an unfortunate restriction, which prevents it 
from being easily utilized.  That is that it requires that all consumers in the 
consumer group have identical topic regex selectors, and that they have the 
same number of consumer threads.

It turns out this is not always the case for our deployments.  It's not unusual 
to run multiple consumers within a single process (with different topic 
selectors), or we might have multiple processes dedicated for different topic 
subsets.  Agreed, we could change these to have separate group ids for each sub 
topic selector (but unfortunately, that's easier said than done).  In several 
cases, we do at least have separate client.ids set for each sub-consumer, so it 
would be incrementally better if we could at least loosen the requirement such 
that each set of topics selected by a groupid/clientid pair are the same.

But, if we want to do a rolling restart for a new version of a consumer config, 
the cluster will likely be in a state where it's not possible to have a single 
config until the full rolling restart completes across all nodes.  This results 
in a consumer outage while the rolling restart is happening.

Finally, it's especially problematic if we want to canary a new version for a 
period before rolling to the whole cluster.

I'm not sure why this restriction should exist (as it obviously does not exist 
for the 'range' assignment strategy).  It seems it could be made to work 
reasonably well with heterogeneous topic selection and heterogeneous thread 
counts.  The documentation states that "The round-robin partition assignor lays 
out all the available partitions and all the available consumer threads. It 
then proceeds to do a round-robin assignment from partition to consumer thread."

If the assignor can lay out all the available partitions and all the available 
consumer threads, it should be able to uniformly assign partitions to the 
available threads.  In each case, if a thread belongs to a consumer that 
doesn't have that partition selected, just move to the next available thread 
that does have the selection, etc.
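
A toy sketch of the relaxed assignment described above (illustrative only,
unrelated to the actual assignor implementation): partitions are handed out
round-robin, skipping any thread whose consumer has not subscribed to the
partition's topic.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy round-robin assignment that tolerates heterogeneous subscriptions:
// a partition is only given to threads whose consumer actually selected its topic.
public class RelaxedRoundRobinSketch {

    static Map<String, List<String>> assign(List<String> partitions,            // "topic-0", ...
                                            Map<String, Set<String>> threads) { // threadId -> topics
        Map<String, List<String>> assignment = new HashMap<>();
        List<String> threadIds = new ArrayList<>(threads.keySet());
        int cursor = 0;
        for (String partition : partitions) {
            String topic = partition.substring(0, partition.lastIndexOf('-'));
            // Walk the thread ring from the cursor; skip threads not subscribed to this topic.
            for (int i = 0; i < threadIds.size(); i++) {
                String thread = threadIds.get((cursor + i) % threadIds.size());
                if (threads.get(thread).contains(topic)) {
                    assignment.computeIfAbsent(thread, t -> new ArrayList<>()).add(partition);
                    cursor = (cursor + i + 1) % threadIds.size();
                    break;
                }
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> threads = new LinkedHashMap<>();
        threads.put("c1-t0", new HashSet<>(Arrays.asList("clicks", "logs")));
        threads.put("c2-t0", new HashSet<>(Arrays.asList("clicks")));
        System.out.println(assign(
                Arrays.asList("clicks-0", "clicks-1", "clicks-2", "logs-0"), threads));
    }
}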



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2172) Round-robin partition assignment strategy too restrictive

2015-05-05 Thread Jason Rosenberg (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14529436#comment-14529436
 ] 

Jason Rosenberg commented on KAFKA-2172:


as a side note, the documentation here: 
http://kafka.apache.org/documentation.html#consumerconfigs currently has 
duplicate entries for 'partition.assignment.strategy', one of which is far more 
in depth than the other!

 Round-robin partition assignment strategy too restrictive
 -

 Key: KAFKA-2172
 URL: https://issues.apache.org/jira/browse/KAFKA-2172
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Rosenberg

 The round-robin partition assignment strategy was introduced for the 
 high-level consumer, starting with 0.8.2.1.  This appears to be a very 
 attractive feature, but it has an unfortunate restriction, which prevents it 
 from being easily utilized.  That is that it requires that all consumers in the 
 consumer group have identical topic regex selectors, and that they have the 
 same number of consumer threads.
 It turns out this is not always the case for our deployments.  It's not 
 unusual to run multiple consumers within a single process (with different 
 topic selectors), or we might have multiple processes dedicated for different 
 topic subsets.  Agreed, we could change these to have separate group ids for 
 each sub topic selector (but unfortunately, that's easier said than done).  
 In several cases, we do at least have separate client.ids set for each 
 sub-consumer, so it would be incrementally better if we could at least loosen 
 the requirement such that each set of topics selected by a groupid/clientid 
 pair are the same.
 But, if we want to do a rolling restart for a new version of a consumer 
 config, the cluster will likely be in a state where it's not possible to have 
 a single config until the full rolling restart completes across all nodes.  
 This results in a consumer outage while the rolling restart is happening.
 Finally, it's especially problematic if we want to canary a new version for a 
 period before rolling to the whole cluster.
 I'm not sure why this restriction should exist (as it obviously does not 
 exist for the 'range' assignment strategy).  It seems it could be made to 
 work reasonably well with heterogeneous topic selection and heterogeneous 
 thread counts.  The documentation states that "The round-robin partition 
 assignor lays out all the available partitions and all the available consumer 
 threads. It then proceeds to do a round-robin assignment from partition to 
 consumer thread."
 If the assignor can lay out all the available partitions and all the 
 available consumer threads, it should be able to uniformly assign partitions 
 to the available threads.  In each case, if a thread belongs to a consumer 
 that doesn't have that partition selected, just move to the next available 
 thread that does have the selection, etc.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[VOTE] KIP-4 Admin Commands / Phase-1

2015-05-05 Thread Andrii Biletskyi
Hi all,

This is a voting thread for KIP-4 Phase-1. It will include wire protocol
changes and server-side handling code.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations

Thanks,
Andrii Biletskyi


Re: Review Request 33088: add heartbeat to coordinator

2015-05-05 Thread Onur Karaman


 On April 28, 2015, 12:13 a.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, lines 
  281-355
  https://reviews.apache.org/r/33088/diff/9/?file=941787#file941787line281
 
  If we add the CoordinatorMetadata class, these functions can all be 
  moved to that class.

I made a CoordinatorMetadata that only manages groups and keeps track of which 
topics to listen to changes for. It delegates as much group and consumer logic 
as possible to the ConsumerCoordinator (stuff like group synchronization, 
adding/removing/updating consumers and the corresponding topic bind/unbind 
operations), as I wanted to keep all the group logic in one place 
(ConsumerCoordinator).


 On April 28, 2015, 12:13 a.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, line 463
  https://reviews.apache.org/r/33088/diff/9/?file=941787#file941787line463
 
  Let's call finishCurrentHeartbeat upon receiving the join group 
  request (line 155 above), and only expectNextHeartbeat here. Because 
  otherwise if the rebalance is waiting long then this consumer will be 
  unnecessarily marked as dead right?

Just to make sure I understand your concern, is your concern that:
1. we scheduleHeartbeatExpiration for a consumer c in group g
2. g for some reason transitions to PreparingRebalance
3. c rejoins and is waiting for the rebalance to complete.
4. PreparingRebalance is taking a long time. The DelayedHeartbeat expires. c is 
marked dead.
5. g rebalances and stabilizes without c?

If this is your concern, it won't happen. Expired DelayedHeartbeats won't mark 
rejoined consumers that are awaiting rebalance as dead due to the following 
concept from the 
[wiki](https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design#Kafka0.9ConsumerRewriteDesign-Heartbeatsduringrebalance):
 The co-ordinator pauses failure detection for a consumer that has sent a 
JoinGroupRequest until a JoinGroupResponse is sent out. It restarts the 
heartbeat timer once the JoinGroupResponse is sent out and marks a consumer dead 
if it does not receive a HeartbeatRequest from that time for another 
session.timeout.ms milliseconds.

I implemented this pause by making sure an expired DelayedHeartbeat on c won't 
mark c as dead if c is awaitingRebalance (line 353).
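
Purely schematic illustration of that pause (not the coordinator code): an
expired delayed heartbeat marks a consumer dead only if it is not awaiting a
rebalance and no newer heartbeat has arrived.

// Schematic only: failure detection is effectively paused for consumers that
// have rejoined and are awaiting rebalance.
public class HeartbeatExpirationSketch {

    enum ConsumerState { STABLE, AWAITING_REBALANCE, DEAD }

    static ConsumerState onHeartbeatExpiration(ConsumerState state, boolean heartbeatReceivedSince) {
        if (heartbeatReceivedSince)
            return state;                       // a newer heartbeat arrived; nothing to do
        if (state == ConsumerState.AWAITING_REBALANCE)
            return state;                       // failure detection is paused during rebalance
        return ConsumerState.DEAD;              // otherwise the session has truly expired
    }

    public static void main(String[] args) {
        System.out.println(onHeartbeatExpiration(ConsumerState.STABLE, false));             // DEAD
        System.out.println(onHeartbeatExpiration(ConsumerState.AWAITING_REBALANCE, false)); // unchanged
    }
}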


 On April 28, 2015, 12:13 a.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, line 499
  https://reviews.apache.org/r/33088/diff/9/?file=941787#file941787line499
 
  I think once we split the scheduleHeartbeat Expiration function, we do 
  not need to check here as after the join-group is received but before 
  rebalance is finished, we will not expect any heartbeat anymore.

It's true that failure detection will pause during rebalance. But as stated 
above, some sort of check is still needed in onExpirationHeartbeat to prevent 
DelayedHeartbeats that were scheduled before rebalance and expired during 
rebalance from marking rejoined consumers as dead regardless of how that 
scheduleHeartbeatExpiration method is split up.


On April 28, 2015, 12:13 a.m., Onur Karaman wrote:
  Some general comments:
  
  1. Could you add some step-by-step and if-else comments in coordinator's 
  functions to make it more self-explanary?
  
  2. Note that onCompleteXXX functions will be called from purgatory reaper 
  threads, which will in turn call other internal functions like rebalance(), 
  while at other times these functions will be called by the request handler 
  threads. Are there any possible deadlock issues? I ask for another 
  think-through just because we have been bitten before by this.

For 2: I see two types of resources here: group locks and the metadataLock. We 
either only acquire a group lock, only acquire a metadataLock, or nest the 
metadataLock inside a group lock. We never nest group locks, so we don't get 
circular waits between group locks. We also never nest a group lock inside the 
metadataLock, so we don't get circular waits between group locks and the 
metadataLock.

ZkClient doesn't cause any circular waits because internally, it just puts all 
ZkEvents into a queue and has a dedicated thread processing each event's 
corresponding TopicPartitionChangeListener handleDataChange or 
handleDataDeleted callbacks one by one, and these callbacks adhere to the 
locking patterns above. Since there doesn't seem to be any circular waits, it 
looks like there are no deadlocks.


- Onur


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33088/#review81743
---


On May 5, 2015, 5:50 p.m., Onur Karaman wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33088/
 

[jira] [Updated] (KAFKA-2160) DelayedOperationPurgatory should remove the pair in watchersForKey with empty watchers list

2015-05-05 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-2160:
---
Reviewer: Jun Rao

 DelayedOperationPurgatory should remove the pair in watchersForKey with empty 
 watchers list
 ---

 Key: KAFKA-2160
 URL: https://issues.apache.org/jira/browse/KAFKA-2160
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
Assignee: Guozhang Wang
 Attachments: KAFKA-2160.patch, KAFKA-2160_2015-04-30_15:20:14.patch


 With purgatory usage in consumer coordinator, it will be common that watcher 
 lists are very short and live only for a short time. So we'd better clean 
 them from the watchersForKey Pool once the list becomes empty in 
 checkAndComplete() calls. 
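 A minimal sketch of the intended cleanup, assuming a ConcurrentHashMap-backed pool (illustrative only, not the actual DelayedOperationPurgatory code):
{code}
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Sketch: once checkAndComplete() drains a key's watcher list, drop the key -> list entry itself.
public class WatcherCleanupSketch {
    private final ConcurrentHashMap<String, List<Runnable>> watchersForKey = new ConcurrentHashMap<>();

    void watch(String key, Runnable completion) {
        watchersForKey.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(completion);
    }

    void checkAndComplete(String key) {
        List<Runnable> watchers = watchersForKey.get(key);
        if (watchers == null)
            return;
        for (Runnable completion : watchers)
            completion.run();            // in the real purgatory only completable operations are removed
        watchers.clear();
        if (watchers.isEmpty())
            // remove(key, value) only removes the exact list we drained, so a
            // concurrently re-created list for the same key is left alone
            watchersForKey.remove(key, watchers);
    }
}
{code}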



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1387) Kafka getting stuck creating ephemeral node it has already created when two zookeeper sessions are established in a very short period of time

2015-05-05 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1387?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-1387:
---
Reviewer: Jun Rao

 Kafka getting stuck creating ephemeral node it has already created when two 
 zookeeper sessions are established in a very short period of time
 -

 Key: KAFKA-1387
 URL: https://issues.apache.org/jira/browse/KAFKA-1387
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1.1
Reporter: Fedor Korotkiy
Priority: Blocker
  Labels: newbie, patch, zkclient-problems
 Attachments: kafka-1387.patch


 Kafka broker re-registers itself in zookeeper every time handleNewSession() 
 callback is invoked.
 https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
  
 Now imagine the following sequence of events.
 1) Zookeeper session reestablishes. handleNewSession() callback is queued by 
 the zkClient, but not invoked yet.
 2) Zookeeper session reestablishes again, queueing callback second time.
 3) First callback is invoked, creating /broker/[id] ephemeral path.
 4) Second callback is invoked and it tries to create /broker/[id] path using 
 createEphemeralPathExpectConflictHandleZKBug() function. But the path 
 already exists, so createEphemeralPathExpectConflictHandleZKBug() gets 
 stuck in an infinite loop.
 The controller election code seems to have the same issue.
 I'm able to reproduce this issue on the 0.8.1 branch from github using the 
 following configs.
 # zookeeper
 tickTime=10
 dataDir=/tmp/zk/
 clientPort=2101
 maxClientCnxns=0
 # kafka
 broker.id=1
 log.dir=/tmp/kafka
 zookeeper.connect=localhost:2101
 zookeeper.connection.timeout.ms=100
 zookeeper.sessiontimeout.ms=100
 Just start kafka and zookeeper and then pause zookeeper several times using 
 Ctrl-Z.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2165) ReplicaFetcherThread: data loss on unknown exception

2015-05-05 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-2165:
---
Reviewer: Jun Rao

 ReplicaFetcherThread: data loss on unknown exception
 

 Key: KAFKA-2165
 URL: https://issues.apache.org/jira/browse/KAFKA-2165
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2.1
Reporter: Alexey Ozeritskiy
 Attachments: KAFKA-2165.patch


 Sometimes in our cluster some replica gets out of the isr. Then broker 
 redownloads the partition from the beginning. We got the following messages 
 in logs:
 {code}
 # The leader:
 [2015-03-25 11:11:07,796] ERROR [Replica Manager on Broker 21]: Error when 
 processing fetch request for partition [topic,11] offset 54369274 from 
 follower with correlation id 2634499. Possible cause: Request for offset 
 54369274 but we only have log segments in the range 49322124 to 54369273. 
 (kafka.server.ReplicaManager)
 {code}
 {code}
 # The follower:
 [2015-03-25 11:11:08,816] WARN [ReplicaFetcherThread-0-21], Replica 31 for 
 partition [topic,11] reset its fetch offset from 49322124 to current leader 
 21's start offset 49322124 (kafka.server.ReplicaFetcherThread)
 [2015-03-25 11:11:08,816] ERROR [ReplicaFetcherThread-0-21], Current offset 
 54369274 for partition [topic,11] out of range; reset offset to 49322124 
 (kafka.server.ReplicaFetcherThread)
 {code}
 This occurs because we update fetchOffset 
 [here|https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L124]
  and then try to process the messages. 
 If any exception except OffsetOutOfRangeCode occurs, we get unsynchronized 
 fetchOffset and replica.logEndOffset.
 On the next fetch iteration we can get 
 fetchOffset > replica.logEndOffset == leaderEndOffset and OffsetOutOfRangeCode.
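 For illustration only, a Java sketch of the ordering implied by this report (the real fetcher is Scala): advance fetchOffset only after the fetched data has been processed successfully, so an unexpected exception cannot leave it ahead of the replica's log end offset.
{code}
// Illustrative sketch of the ordering issue, not the actual AbstractFetcherThread code.
public class FetcherOffsetSketch {
    private long fetchOffset;      // what we will ask the leader for next
    private long logEndOffset;     // what we have actually appended locally

    void onFetchResponse(long offsetAfterAppend, Runnable processMessages) {
        try {
            processMessages.run();          // append to the local log; may throw
            logEndOffset = offsetAfterAppend;
            fetchOffset = offsetAfterAppend; // advance only after processing succeeded
        } catch (RuntimeException e) {
            // fetchOffset and logEndOffset stay in sync; we simply retry the same offset
        }
    }
}
{code}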



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1690) new java producer needs ssl support as a client

2015-05-05 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-1690:
---
Reviewer: Jun Rao

 new java producer needs ssl support as a client
 ---

 Key: KAFKA-1690
 URL: https://issues.apache.org/jira/browse/KAFKA-1690
 Project: Kafka
  Issue Type: Sub-task
Reporter: Joe Stein
Assignee: Sriharsha Chintalapani
 Fix For: 0.8.3

 Attachments: KAFKA-1690.patch, KAFKA-1690.patch






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1928) Move kafka.network over to using the network classes in org.apache.kafka.common.network

2015-05-05 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-1928:
---
Reviewer: Jay Kreps

 Move kafka.network over to using the network classes in 
 org.apache.kafka.common.network
 ---

 Key: KAFKA-1928
 URL: https://issues.apache.org/jira/browse/KAFKA-1928
 Project: Kafka
  Issue Type: Sub-task
  Components: security
Reporter: Jay Kreps
Assignee: Gwen Shapira
 Attachments: KAFKA-1928.patch, KAFKA-1928_2015-04-28_00:09:40.patch, 
 KAFKA-1928_2015-04-30_17:48:33.patch, KAFKA-1928_2015-05-01_15:45:24.patch


 As part of the common package we introduced a bunch of network related code 
 and abstractions.
 We should look into replacing a lot of what is in kafka.network with this 
 code. Duplicate classes include things like Receive, Send, etc. It is likely 
 possible to also refactor the SocketServer to make use of Selector which 
  should significantly simplify its code.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2161) Fix a few copyrights

2015-05-05 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2161?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-2161:
---
Reviewer: Joe Stein

 Fix a few copyrights
 

 Key: KAFKA-2161
 URL: https://issues.apache.org/jira/browse/KAFKA-2161
 Project: Kafka
  Issue Type: Bug
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava
Priority: Trivial
 Attachments: KAFKA-2161.patch


 I noticed that I accidentally let some incorrect copyright headers slip in 
  with the KAFKA-1501 patch.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 33760: Patch for KAFKA-2121

2015-05-05 Thread Jun Rao


 On May 4, 2015, 5:31 p.m., Neha Narkhede wrote:
  clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java,
   line 25
  https://reviews.apache.org/r/33760/diff/2/?file=947511#file947511line25
 
  This and also the Deserializer should extend Configurable too right?

We need to pass in the info of whether the serializer/deserializer is for key 
or value at configuration time. That's why we chose not to use the general 
Configurable interface.
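
For reference, a minimal serializer sketch showing the isKey flag passed at configure() time (the encoding property names below are just for illustration, not an official Kafka config):

    import java.nio.charset.Charset;
    import java.nio.charset.StandardCharsets;
    import java.util.Map;

    import org.apache.kafka.common.serialization.Serializer;

    // Minimal example of a serializer that uses the isKey flag it receives at configure() time.
    public class EncodingAwareStringSerializer implements Serializer<String> {
        private Charset charset = StandardCharsets.UTF_8;

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            // The same class can be configured differently depending on whether it
            // serializes keys or values; this is why configure() takes isKey.
            Object value = configs.get(isKey ? "key.serializer.encoding" : "value.serializer.encoding");
            if (value instanceof String)
                charset = Charset.forName((String) value);
        }

        @Override
        public byte[] serialize(String topic, String data) {
            return data == null ? null : data.getBytes(charset);
        }

        @Override
        public void close() {
            // nothing to release
        }
    }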


- Jun


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33760/#review82399
---


On May 1, 2015, 10:42 p.m., Steven Wu wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33760/
 ---
 
 (Updated May 1, 2015, 10:42 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2121
 https://issues.apache.org/jira/browse/KAFKA-2121
 
 
 Repository: kafka
 
 
 Description
 ---
 
 override the java.io.Closeable#close method in the Serializer and Deserializer 
 interfaces without throwing the checked IOException. This is to avoid breaking 
 source compatibility.
 
 
 add a test for checking Serializer is closed during KafkaProducer#close
 
 
 missing copyright header in previous checkin
 
 
 removed throws Exception from test methods
 
 
 Diffs
 -
 
   
 clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java 
 9a57579f87cb19cb6affe6d157ff8446c23e3551 
   clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java 
 c44054038066f0d0829d05f082b2ee42b34cded7 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
  eea2c28450736d1668c68828f77a49470a82c3d0 
   
 clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
  49f1427bcbe43c773920a25aa69a71d0329296b7 
   clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java 
 6f948f240c906029a0f972bf770f288f390ea714 
   clients/src/test/java/org/apache/kafka/test/MockSerializer.java 
 PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/33760/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Steven Wu
 




[jira] [Updated] (KAFKA-2129) Consumer could make multiple concurrent metadata requests

2015-05-05 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-2129:
---
Reviewer: Guozhang Wang

 Consumer could make multiple concurrent metadata requests
 -

 Key: KAFKA-2129
 URL: https://issues.apache.org/jira/browse/KAFKA-2129
 Project: Kafka
  Issue Type: Bug
  Components: clients
Reporter: Tim Brooks
Assignee: Tim Brooks
 Attachments: KAFKA-2129.patch


 The NetworkClient's metadataFetchInProgress is neither volatile nor atomic. 
 This protects against multiple metadata requests being made and is read on 
 poll() on the NetworkClient. It is written to when a request is initiated.
 This is fine for the producer, which seems to have one thread writing. The 
 KafkaConsumer's poll() method is synchronized, so there will not be more 
 than one writer entering from there. However, the NetworkClient's poll() 
 method is also accessed from the consumer's partitionsFor() method, which could 
 be accessed by a separate thread.
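 A minimal sketch of one possible fix using an AtomicBoolean (illustrative, not the actual NetworkClient patch):
{code}
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch: guard the "metadata fetch in progress" flag with compare-and-set
// so two threads cannot both initiate a metadata request.
public class MetadataFetchGuardSketch {
    private final AtomicBoolean metadataFetchInProgress = new AtomicBoolean(false);

    void maybeSendMetadataRequest() {
        if (metadataFetchInProgress.compareAndSet(false, true)) {
            // ... build and send the metadata request ...
        }
    }

    void onMetadataResponse() {
        metadataFetchInProgress.set(false);
    }
}
{code}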



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-21 Configuration Management

2015-05-05 Thread Joel Koshy
Good discussion. Since we will be talking about this at 11am, I wanted
to organize these comments into requirements to see if we are all on
the same page.

REQUIREMENT 1: Needs to accept dynamic config changes. This needs to
be general enough to work for all configs that we envision may need to
accept changes at runtime. e.g., log (topic), broker, client (quotas),
etc. Possible options include:
- ZooKeeper watcher (a minimal sketch follows this list)
- Kafka topic
- Direct RPC to controller (or config coordinator)
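
As a concrete illustration of the ZooKeeper-watcher option, here is a minimal sketch using the ZkClient API Kafka already depends on; the znode path and payload handling are assumptions, not part of the KIP:

    import org.I0Itec.zkclient.IZkDataListener;
    import org.I0Itec.zkclient.ZkClient;

    // Illustrative sketch: a broker-side watcher that picks up dynamic config changes from ZooKeeper.
    public class DynamicConfigWatcherSketch {
        public static void watch(ZkClient zkClient, final String brokerId) {
            String path = "/config/changes"; // assumed notification path
            zkClient.subscribeDataChanges(path, new IZkDataListener() {
                public void handleDataChange(String dataPath, Object data) {
                    // parse the change notification and apply the new value(s) on this broker
                    System.out.println("Broker " + brokerId + " applying config change: " + data);
                }
                public void handleDataDeleted(String dataPath) {
                    // revert to defaults if the override is removed
                }
            });
        }
    }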

The current KIP is really focused on REQUIREMENT 1 and I think that is
reasonable as long as we don't come up with something that requires
significant re-engineering to support the other requirements.

REQUIREMENT 2: Provide consistency of configs across brokers (modulo
per-broker overrides) or at least be able to verify consistency.  What
this effectively means is that config changes must be seen by all
brokers eventually and we should be able to easily compare the full
config of each broker.

REQUIREMENT 3: Central config store. Needs to work with plain
file-based configs and other systems (e.g., puppet). Ideally, should
not bring in other dependencies (e.g., a DB). Possible options:
- ZooKeeper
- Kafka topic
- other? E.g. making it pluggable?

Any other requirements?

Thanks,

Joel

On Tue, May 05, 2015 at 01:38:09AM +, Aditya Auradkar wrote:
 Hey Neha,
 
 Thanks for the feedback.
 1. In my earlier exchange with Jay, I mentioned the broker writing all its 
 configs to ZK (while respecting the overrides). Then ZK can be used to view 
 all configs.
 
 2. Need to think about this a bit more. Perhaps we can discuss this during 
 the hangout tomorrow?
 
 3 & 4) I viewed these config changes as mainly administrative operations. In 
 that case, it may be reasonable to assume that the ZK port is available for 
 communication from the machine these commands are run. Having a 
 ConfigChangeRequest (or similar) is nice to have but having a new API and 
 sending requests to controller also change how we do topic based 
 configuration currently. I was hoping to keep this KIP as minimal as possible 
 and provide a means to represent and modify client and broker based configs 
 in a central place. Are there any concerns if we tackle these things in a 
 later KIP?
 
 Thanks,
 Aditya
 
 From: Neha Narkhede [n...@confluent.io]
 Sent: Sunday, May 03, 2015 9:48 AM
 To: dev@kafka.apache.org
 Subject: Re: [DISCUSS] KIP-21 Configuration Management
 
 Thanks for starting this discussion, Aditya. Few questions/comments
 
 1. If you change the default values like it's mentioned in the KIP, do you
 also overwrite the local config file as part of updating the default value?
 If not, where does the admin look to find the default values, ZK or local
 Kafka config file? What if a config value is different in both places?
 
 2. I share Gwen's concern around making sure that popular config management
 tools continue to work with this change. Would love to see how each of
 those would work with the proposal in the KIP. I don't know enough about
 each of the tools but seems like in some of the tools, you have to define
 some sort of class with parameter names as config names. How will such
 tools find out about the config values? In Puppet, if this means that each
 Puppet agent has to read it from ZK, this means the ZK port has to be open
 to pretty much every machine in the DC. This is a bummer and a very
 confusing requirement. Not sure if this is really a problem or not (each of
 those tools might behave differently), though pointing out that this is
 something worth paying attention to.
 
 3. The wrapper tools that let users read/change config tools should not
 depend on ZK for the reason mentioned above. It's a pain to assume that the
 ZK port is open from any machine that needs to run this tool. Ideally what
 users want is a REST API to the brokers to change or read the config (ala
 Elasticsearch), but in the absence of the REST API, we should think if we
 can write the tool such that it just requires talking to the Kafka broker
 port. This will require a config RPC.
 
 4. Not sure if KIP is the right place to discuss the design of propagating
 the config changes to the brokers, but have you thought about just letting
 the controller oversee the config changes and propagate via RPC to the
 brokers? That way, there is an easier way to express config changes that
 require all brokers to change it for it to be called complete. Maybe this
 is not required, but it is hard to say if we don't discuss the full set of
 configs that need to be dynamic.
 
 Thanks,
 Neha
 
 On Fri, May 1, 2015 at 12:53 PM, Jay Kreps jay.kr...@gmail.com wrote:
 
  Hey Aditya,
 
  This is a great! A couple of comments:
 
  1. Leaving the file config in place is definitely the least disturbance.
  But let's really think about getting rid of the files and just have one
  config mechanism. There is always a tendency to make everything pluggable
  

[jira] [Updated] (KAFKA-2123) Make new consumer offset commit API use callback + future

2015-05-05 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-2123:
---
Reviewer: Jay Kreps

 Make new consumer offset commit API use callback + future
 -

 Key: KAFKA-2123
 URL: https://issues.apache.org/jira/browse/KAFKA-2123
 Project: Kafka
  Issue Type: Improvement
  Components: clients, consumer
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava
 Fix For: 0.8.3

 Attachments: KAFKA-2123.patch, KAFKA-2123_2015-04-30_11:23:05.patch, 
 KAFKA-2123_2015-05-01_19:33:19.patch, KAFKA-2123_2015-05-04_09:39:50.patch, 
 KAFKA-2123_2015-05-04_22:51:48.patch


 The current version of the offset commit API in the new consumer is
 void commit(offsets, commit type)
 where the commit type is either sync or async. This means you need to use 
 sync if you ever want confirmation that the commit succeeded. Some 
 applications will want to use asynchronous offset commit, but be able to tell 
 when the commit completes.
 This is basically the same problem that had to be fixed going from old 
 consumer -> new consumer and I'd suggest the same fix using a callback + 
 future combination. The new API would be
 Future<Void> commit(Map<TopicPartition, Long> offsets, ConsumerCommitCallback 
 callback);
 where ConsumerCommitCallback contains a single method:
 public void onCompletion(Exception exception);
 We can provide shorthand variants of commit() for eliding the different 
 arguments.
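 A sketch of how the proposed API might be used (the consumer interface below is a stand-in for the proposal above, not an existing Kafka class):
{code}
import java.util.Map;
import java.util.concurrent.Future;

import org.apache.kafka.common.TopicPartition;

public class AsyncCommitUsageSketch {

    // Callback shape taken from the proposal above.
    interface ConsumerCommitCallback {
        void onCompletion(Exception exception);
    }

    // Stand-in for a consumer exposing the proposed method; not an existing Kafka class.
    interface ProposedConsumer {
        Future<Void> commit(Map<TopicPartition, Long> offsets, ConsumerCommitCallback callback);
    }

    static Future<Void> commitWithLogging(ProposedConsumer consumer, Map<TopicPartition, Long> offsets) {
        // Fire the commit asynchronously; the callback reports failures,
        // and callers that need confirmation can block on the returned Future.
        return consumer.commit(offsets, new ConsumerCommitCallback() {
            public void onCompletion(Exception exception) {
                if (exception != null)
                    System.err.println("offset commit failed: " + exception);
            }
        });
    }
}
{code}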



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2121) prevent potential resource leak in KafkaProducer and KafkaConsumer

2015-05-05 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-2121:
---
Reviewer: Guozhang Wang

 prevent potential resource leak in KafkaProducer and KafkaConsumer
 --

 Key: KAFKA-2121
 URL: https://issues.apache.org/jira/browse/KAFKA-2121
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.8.2.0
Reporter: Steven Zhen Wu
Assignee: Steven Zhen Wu
 Fix For: 0.8.3

 Attachments: KAFKA-2121.patch, KAFKA-2121.patch, KAFKA-2121.patch, 
 KAFKA-2121_2015-04-16_09:55:14.patch, KAFKA-2121_2015-04-16_10:43:55.patch, 
 KAFKA-2121_2015-04-18_20:09:20.patch, KAFKA-2121_2015-04-19_20:08:45.patch, 
 KAFKA-2121_2015-04-19_20:30:18.patch, KAFKA-2121_2015-04-20_09:06:09.patch, 
 KAFKA-2121_2015-04-20_09:51:51.patch, KAFKA-2121_2015-04-20_09:52:46.patch, 
 KAFKA-2121_2015-04-20_09:57:49.patch, KAFKA-2121_2015-04-20_22:48:31.patch, 
 KAFKA-2121_2015-05-01_15:42:30.patch


 On Mon, Apr 13, 2015 at 7:17 PM, Guozhang Wang wangg...@gmail.com wrote:
 It is a valid problem and we should correct it as soon as possible, I'm
 with Ewen regarding the solution.
 On Mon, Apr 13, 2015 at 5:05 PM, Ewen Cheslack-Postava e...@confluent.io
 wrote:
  Steven,
 
  Looks like there is even more that could potentially be leaked -- since key
  and value serializers are created and configured at the end, even the IO
  thread allocated by the producer could leak. Given that, I think 1 isn't a
  great option since, as you said, it doesn't really address the underlying
  issue.
 
  3 strikes me as bad from a user experience perspective. It's true we might
  want to introduce additional constructors to make testing easier, but the
  more components I need to allocate myself and inject into the producer's
  constructor, the worse the default experience is. And since you would have
  to inject the dependencies to get correct, non-leaking behavior, it will
  always be more code than previously (and a backwards incompatible change).
   Additionally, the code creating the producer would have to be more
  complicated since it would have to deal with the cleanup carefully whereas
  it previously just had to deal with the exception. Besides, for testing
  specifically, you can avoid exposing more constructors just for testing by
   using something like PowerMock that lets you mock private methods. That
  requires a bit of code reorganization, but doesn't affect the public
  interface at all.
 
  So my take is that a variant of 2 is probably best. I'd probably do two
  things. First, make close() safe to call even if some fields haven't been
  initialized, which presumably just means checking for null fields. (You
  might also want to figure out if all the methods close() calls are
  idempotent and decide whether some fields should be marked non-final and
  cleared to null when close() is called). Second, add the try/catch as you
  suggested, but just use close().
 
  -Ewen
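
  A minimal sketch of the constructor try/catch-and-close pattern described above (illustrative only, not the actual KafkaProducer patch):

    import java.io.Closeable;
    import java.util.ArrayList;
    import java.util.List;

    // Sketch: build resources inside try/catch and release whatever
    // already exists if construction fails part-way.
    public class LeakSafeConstructorSketch implements Closeable {
        private final List<Closeable> resources = new ArrayList<Closeable>();

        public LeakSafeConstructorSketch() {
            try {
                // ... create metrics reporters, serializers, the IO thread, etc.,
                // adding each one to `resources` as soon as it is constructed ...
                validateConfiguration(); // may throw, e.g. bad bootstrap servers
            } catch (RuntimeException e) {
                close(); // must tolerate partially-initialized (null/empty) state
                throw e;
            }
        }

        private void validateConfiguration() { /* placeholder for config checks */ }

        @Override
        public void close() {
            for (Closeable r : resources) {
                try {
                    r.close();
                } catch (Exception ignored) {
                    // keep closing the rest
                }
            }
        }
    }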
 
 
  On Mon, Apr 13, 2015 at 3:53 PM, Steven Wu stevenz...@gmail.com wrote:
 
    Here is the resource leak problem that we have encountered when the 0.8.2
   java
    KafkaProducer failed in its constructor. Here is the code snippet of
    KafkaProducer to illustrate the problem.
  
   ---
    public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer,
                         Serializer<V> valueSerializer) {

        // create metrics reporters via reflection
        List<MetricsReporter> reporters =
            config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                                          MetricsReporter.class);

        // validate bootstrap servers
        List<InetSocketAddress> addresses =
            ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));

    }
   ---
  
    Let's say MyMetricsReporter creates a thread in its constructor. If hostname
    validation throws an exception, the constructor won't call the close method of
    MyMetricsReporter to clean up the resource. As a result, we created a
   thread
    leak issue. This becomes worse when we try to auto-recover (i.e. keep
    creating KafkaProducer again -> failing again -> more thread leaks).
  
   there are multiple options of fixing this.
  
    1) just move the hostname validation to the beginning. But this only
   fixes
    one symptom. It doesn't fix the fundamental problem: what if some other
   lines
    throw an exception?
  
   2) use try-catch. in the catch section, try to call close methods for any
   non-null objects constructed so far.
  
    3) explicitly declare the dependency in the constructor. This way, when
    KafkaProducer throws an exception, I can call the close method of the metrics
    reporters to release resources.
   KafkaProducer(..., 

Re: Adding multi-tenancy capabilities to Kafka

2015-05-05 Thread Ashish Singh
Adrian,

Trying to follow up on the discussion here. Is my understanding correct that
if we have topic hierarchies then we can do without namespaces? To me a
namespace is an abstraction; it can be implemented with topic hierarchies
as well, would you agree? If so, I guess topic hierarchies are the way to go.

One more thing I would like to put forward as an advantage of having a topic
hierarchy is that we can support ACL/permission inheritance in topic
hierarchies. This will avoid bootstrapping ACLs for each new topic.
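
As a rough illustration of what ACL inheritance over a topic hierarchy could look like (entirely hypothetical; the path separator and ACL representation are assumptions, not based on any existing Kafka API):

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical sketch: resolve the effective ACL for a hierarchical topic name
    // by walking up the path until an explicit ACL is found.
    public class HierarchicalAclSketch {
        private final Map<String, String> aclsByPath = new HashMap<String, String>(); // path -> ACL description

        void setAcl(String path, String acl) {
            aclsByPath.put(path, acl);
        }

        String effectiveAcl(String topicPath) {
            String path = topicPath;
            while (true) {
                String acl = aclsByPath.get(path);
                if (acl != null) return acl;
                int slash = path.lastIndexOf('/');
                if (slash < 0) return "default";     // root default
                path = path.substring(0, slash);     // inherit from the parent level
            }
        }

        public static void main(String[] args) {
            HierarchicalAclSketch acls = new HierarchicalAclSketch();
            acls.setAcl("tenant1", "tenant1-admins:read,write");
            System.out.println(acls.effectiveAcl("tenant1/orders/created")); // inherited from tenant1
        }
    }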

On Wed, Apr 29, 2015 at 7:42 AM, Adrian Preston prest...@uk.ibm.com wrote:

 Thanks for your response.

 I agree.  I think it would be useful to get consensus on how
 namespaces and topic-hierarchies relate to one another.  To seed the
 discussion - here's my viewpoint, which I hope others will challenge:

 I see namespaces as something the broker provides to ensure that
 two tenants can never interact with one another - even if,
 for example, they both choose to use a topic called 'TOPIC1'.  I
 imagine that this would be achieved by having the broker silently
 add a per tenant prefix to the topic name in each request, and
 strip it off in the response. So, for example, if 'TENANT1' sends
 a message to 'TOPIC1', this would be re-written so that the send is
 to 'TENANT1-TOPIC1'.

 If topic-hierarchies were available, then I think the prefix,
 added/removed to implement namespaces, would be the first level of
 qualification in the hierarchy.  So, for example, if 'TENANT1' sends
 a message to 'TOPIC1', this would be re-written so that the send is to
 'TENANT1/TOPIC1'.

 Extrapolating from what's currently in KIP-21 (dynamic configuration)
 I guess that topic-hierarchies might result in the possibility for
 even finer grain topic configuration - e.g. a ZNode structure of:
 '/config/topics/topic_level1/topic_level2/...'.  This would work
 best with an implementation of namespaces that was based on-top of
 topic-hierarchies, allowing configuration to be applied at the scope
 of: all tenants, one tenant, or one tenant's topics.

 So in summary: I think that namespaces can be usefully implemented
 independently of topic-hierarchies, and when topic-hierarchies are
 implemented would be easily integrated.

 Regards
 - Adrian

 -Gwen Shapira gshap...@cloudera.com wrote: -
 To: dev@kafka.apache.org dev@kafka.apache.org
 From: Gwen Shapira gshap...@cloudera.com
 Date: 04/28/2015 06:54PM
 Subject: Re: Adding multi-tenancy capabilities to Kafka

 I think recent discussion showed some need for topic namespaces - for
 example, Jun's use case for reserving topic names for specific users
 discussed under authorization.

 I think some discussion should happen on namespaces vs more full-fledged
 topic-hierarchy.
 I like the simplicity of namespaces, but there may be other requirements
 (such as inheriting configuration).

 Gwen

 On Tue, Apr 28, 2015 at 10:42 AM, Adrian Preston prest...@uk.ibm.com
 wrote:

  Hi all,
 
  I've been looking at how a Kafka cluster could be deployed so
 that
  it can be used by multiple tenants.  Specifically: a scheme where the
  clients belonging to each tenant receive the impression they are
 operating
  against their own cluster.  The ongoing security and quota work looks
 like
  it might provide a lot of the isolation requirements, but each tenant
 would
  currently share the same namespace for topics and consumer groups.  So
 the
  illusion of 'it is my own cluster' is broken as soon as two tenants try
  independently to use the same topic name.
 
  I wondered if other people on this list are interested in being able to
  support multiple tenants in this way?  And / or if the ability to avoid
  clashes in the topic namespace would be useful?  I am considering
  submitting a KIP in this area - but first wanted to get a feeling for
  whether these kinds of capabilities are of interest to others.
 
  Thanks in advance,
  - Adrian
 
  Unless stated otherwise above:
  IBM United Kingdom Limited - Registered in England and Wales with number
  741598.
  Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6
 3AU
 
 


 Unless stated otherwise above:
 IBM United Kingdom Limited - Registered in England and Wales with number
 741598.
 Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU




-- 

Regards,
Ashish


Re: Review Request 33088: add heartbeat to coordinator

2015-05-05 Thread Onur Karaman


 On April 28, 2015, 12:13 a.m., Guozhang Wang wrote:
  clients/src/main/java/org/apache/kafka/common/protocol/Errors.java, lines 
  73-76
  https://reviews.apache.org/r/33088/diff/9/?file=941786#file941786line73
 
  Maybe we can merge them into one error code, 
  UNKNOWN_OR_INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY?

I have mixed feelings on this. The growing number of error codes bothers me, 
but it also seems like the distinct error codes will be helpful in consumer 
logs.

This issue comes up again with roundrobin where a group is doing roundrobin and 
a new consumer joining the group subscribes to a different set of topics from 
the others in the group. It's not really 
INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY, at least not with the current 
naming and exception message.


 On April 28, 2015, 12:13 a.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, lines 60-66
  https://reviews.apache.org/r/33088/diff/9/?file=941787#file941787line60
 
  Is removing the comments intentional?

Yeah it was intentional. The comments were almost exactly repeating the same 
information as the variable types.


 On April 28, 2015, 12:13 a.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, line 200
  https://reviews.apache.org/r/33088/diff/9/?file=941787#file941787line200
 
  Some coding style suggestion:
  
  if {
// comments
...
  } else {
// comments
...
  }

Will do.

Somewhat of a tangent: It looks like we started using a checkstyle on 
kafka-clients. It would be nice if kafka came with eclipse and intellij style 
schemes for java and scala to reduce the style related comments and have the 
IDEs more proactively do the fixes for us.


 On April 28, 2015, 12:13 a.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, lines 
  374-382
  https://reviews.apache.org/r/33088/diff/9/?file=941787#file941787line374
 
  It seems the transition from Rebalancing to Stable is trivial now. In 
  this case shall we just remove the Rebalancing state, or it is actually 
  meaningful to keep it?

I prefer having each state representing one action for that group. If we reduce 
it down to something like Dead, Stable, and Rebalancing as I think you're 
suggesting, then the Rebalancing state would mean that we're either waiting for 
consumers to rejoin OR we're recomputing the partition assignments, but we 
don't know which. With Dead, Stable, PreparingRebalance, and Rebalancing, we 
reduce the amount of guessing for what the group is doing within a given state.


 On April 28, 2015, 12:13 a.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, lines 
  400-401
  https://reviews.apache.org/r/33088/diff/9/?file=941787#file941787line400
 
  I think we need to add a broker config for maximum heartbeat failure 
  here; right now it is just 1.

Paraphrased offline discussion I had with Guozhang:

The only use case I can imagine for the maximum failures concept is where we 
already have consumers out in the wild across many different services and we've 
found that the default session timeout is too quick, so we end up getting a lot 
of group membership churn. It may be hard to get all the services to change 
their consumer configs, so we'd do a rolling bounce of the brokers with a higher 
maxAllowedExpiredHeartbeats.

Pros of having the maximum failures concept:
- It's a sort of hotfix means of giving the brokers some coarse-grained control 
over reducing group membership churn.

Cons of having the maximum failures concept:
- It's a sort of hotfix means of giving the brokers some coarse-grained control 
over reducing group membership churn.
- Consumers don't know the broker's maxAllowedExpiredHeartbeats, so it seems 
odd from the consumer's perspective that they'd specify the session timeout for 
session expiration, but it may end up actually being some multiple of that 
value (and this entirely depends on which broker they happen to communicate 
with).
- If we accept session timeouts in the range [low, high], then really the 
consumer failures would happen in a time range of [low * 
maxAllowedExpiredHeartbeats, high * maxAllowedExpiredHeartbeats]. For example, 
with session timeouts allowed between 6s and 30s and maxAllowedExpiredHeartbeats 
= 3, failure detection could take anywhere from 18s to 90s. This gap can 
end up uncomfortably large.

Guozhang and I agreed to remove this concept for now. If we see later on that 
we need to make the coordinator more conservative in marking consumers as dead, 
we can add similar logic in later.

P.S.: If we want to eventually use this maximum failures concept, there's a bug 
in onConsumerHeartbeatExpired. If we have NOT exceeded the 
maxAllowedExpiredHeartbeats, then we should schedule another DelayedHeartbeat 
and not update consumer.latestHeartbeat.


 On April 28, 2015, 12:13 a.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, lines 
  451-453
  

Re: Adding multi-tenancy capabilities to Kafka

2015-05-05 Thread Ashish Singh
Jay,

I agree with you. This will kind of depend on how KIP-11 and KIP-12 shapes
up, but we can definitely start putting together the ideas. Would you
suggest starting a kip or just a generic shared document that can be later
translated into a KIP?

On Tuesday, May 5, 2015, Jay Kreps jay.kr...@gmail.com wrote:

 Hey guys,

 It would be nice to get a design around this. Though there are currently so
 many big things in flight I do wonder if we should start another parallel
 thing...? But working out a design can't hurt.

 Personally I think since one of the goals of Kafka is data integration we
 really want to support a central cluster with many users. So I think we
 really want to have full hierarchies with all users being part of the same
 tree of streams. This way granting permissions on a new stream just means
 changing permissions on what topic paths a client has access to rather
 than needing a new client instance that connects to that separate namespace
 (a la zookeeper). I think this will be a nice way to share config and acl
 defaults too.

 -Jay

  On Tue, May 5, 2015 at 10:36 AM, Ashish Singh asi...@cloudera.com wrote:

  Adrian,
 
  Trying to follow up the discussion here. Is my understanding correct that
  if we have topic hierarchies then we can do without namespaces. To me
  namespace is an abstraction, it can be implemented with topic hierarchies
  as well, would you agree? If so I guess topic hierarchies is the way to
 go.
 
  One more thing I would like to put forward as an advantage of having
 topic
  hierarchy is that we can support acls/ permissions inheritance in topic
  hierarchies. This will avoid bootstrapping acls for each new topic.
 
   On Wed, Apr 29, 2015 at 7:42 AM, Adrian Preston prest...@uk.ibm.com wrote:
 
   Thanks for your response.
  
   I agree.  I think it would be useful to get consensus on how
   namespaces and topic-hierarchies relate to one another.  To seed the
   discussion - here's my viewpoint, which I hope others will challenge:
  
   I see namespaces as something the broker provides to ensure that
   two tenants can never interact with one another - even if,
   for example, they both choose to use a topic called 'TOPIC1'.  I
   imagine that this would be achieved by having the broker silently
   add a per tenant prefix to the topic name in each request, and
   strip it off in the response. So, for example, if 'TENANT1' sends
   a message to 'TOPIC1', this would be re-written so that the send is
   to 'TENANT1-TOPIC1'.
  
   If topic-hierarchies were available, then I think the prefix,
   added/removed to implement namespaces, would be the first level of
   qualification in the hierarchy.  So, for example, if 'TENANT1' sends
   a message to 'TOPIC1', this would be re-written so that the send is to
   'TENANT1/TOPIC1'.
  
   Extrapolating from what's currently in KIP-21 (dynamic configuration)
   I guess that topic-hierarchies might result in the possibility for
   even finer grain topic configuration - e.g. a ZNode structure of:
   '/config/topics/topic_level1/topic_level2/...'.  This would work
   best with an implementation of namespaces that was based on-top of
   topic-hierarchies, allowing configuration to be applied at the scope
   of: all tenants, one tenant, or one tenant's topics.
  
   So in summary: I think that namespaces can be usefully implemented
   independently of topic-hierarchies, and when topic-hierarchies are
   implemented would be easily integrated.
  
   Regards
   - Adrian
  
    -Gwen Shapira gshap...@cloudera.com wrote: -
    To: dev@kafka.apache.org dev@kafka.apache.org
    From: Gwen Shapira gshap...@cloudera.com
   Date: 04/28/2015 06:54PM
   Subject: Re: Adding multi-tenancy capabilities to Kafka
  
   I think recent discussion showed some need for topic namespaces - for
   example, Jun's use case for reserving topic names for specific users
   discussed under authorization.
  
   I think some discussion should happen on namespaces vs more
 full-fledged
   topic-hierarchy.
   I like the simplicity of namespaces, but there may be other
 requirements
   (such as inheriting configuration).
  
   Gwen
  
    On Tue, Apr 28, 2015 at 10:42 AM, Adrian Preston prest...@uk.ibm.com wrote:
  
Hi all,
   
I've been looking at how a Kafka cluster could be used be deployed so
   that
it can be used by multiple tenants.  Specifically: a scheme where the
clients belonging to each tenant receive the impression they are
   operating
against their own cluster.  The ongoing security and quota work looks
   like
it might provide a lot of the isolation requirements, but each tenant
   would
currently share the same namespace for topics and consumer groups.
 So
   the
illusion of it is my own cluster is broken as soon as two tenants
 try
independent to use the same topic name.
   
I wondered if other people on 

Re: Adding multi-tenancy capabilities to Kafka

2015-05-05 Thread Gari Singh
I agree.  If we can really do pluggable authorization (and even pluggable
authentication), it would not actually be hard to effectively implement a
multi-tenant solution.  I am hoping to attempt to implement something like
this once there is code / patches for KIP-11 and KIP-12.  So I wonder if we
would actually need a KIP for this versus requirements for an authorization
module (and possibly an authentication module) and then document a best
practice for setting up a multi-tenant deployment.

I am a bit worried that KIPs 11/12 might not give us everything we need,
but I think we can work through that as the implementation of both
progresses

On Tue, May 5, 2015 at 3:23 PM, Ashish Singh asi...@cloudera.com wrote:

 Jay,

 I agree with you. This will kind of depend on how KIP-11 and KIP-12 shapes
 up, but we can definitely start putting together the ideas. Would you
 suggest starting a kip or just a generic shared document that can be later
 translated into a KIP?

 On Tuesday, May 5, 2015, Jay Kreps jay.kr...@gmail.com wrote:

  Hey guys,
 
  It would be nice to get a design around this. Though there are currently
 so
  many big things in flight I do wonder if we should start another parallel
  thing...? But working out a design can't hurt.
 
  Personally I think since one of the goals of Kafka is data integration we
  really want to support a central cluster with many users. So I think we
  really want to have full hierarchies with all users being part of the
 same
  tree of streams. This way granting permissions on a new stream just
 means
  changing permissions on what topic paths a client has access to rather
  than needing a new client instance that connects to that separate
 namespace
  (a la zookeeper). I think this will be a nice way to share config and acl
  defaults too.
 
  -Jay
 
   On Tue, May 5, 2015 at 10:36 AM, Ashish Singh asi...@cloudera.com wrote:
 
   Adrian,
  
   Trying to follow up the discussion here. Is my understanding correct
 that
   if we have topic hierarchies then we can do without namespaces. To me
   namespace is an abstraction, it can be implemented with topic
 hierarchies
   as well, would you agree? If so I guess topic hierarchies is the way to
  go.
  
   One more thing I would like to put forward as an advantage of having
  topic
   hierarchy is that we can support acls/ permissions inheritance in topic
   hierarchies. This will avoid bootstrapping acls for each new topic.
  
    On Wed, Apr 29, 2015 at 7:42 AM, Adrian Preston prest...@uk.ibm.com wrote:
  
Thanks for your response.
   
I agree.  I think it would be useful to get consensus on how
namespaces and topic-hierarchies relate to one another.  To seed the
discussion - here's my viewpoint, which I hope others will challenge:
   
I see namespaces as something the broker provides to ensure that
two tenants can never interact with one another - even if,
for example, they both choose to use a topic called 'TOPIC1'.  I
imagine that this would be achieved by having the broker silently
add a per tenant prefix to the topic name in each request, and
strip it off in the response. So, for example, if 'TENANT1' sends
a message to 'TOPIC1', this would be re-written so that the send is
to 'TENANT1-TOPIC1'.
   
If topic-hierarchies were available, then I think the prefix,
added/removed to implement namespaces, would be the first level of
qualification in the hierarchy.  So, for example, if 'TENANT1' sends
a message to 'TOPIC1', this would be re-written so that the send is
 to
'TENANT1/TOPIC1'.
   
Extrapolating from what's currently in KIP-21 (dynamic configuration)
I guess that topic-hierarchies might result in the possibility for
even finer grain topic configuration - e.g. a ZNode structure of:
'/config/topics/topic_level1/topic_level2/...'.  This would work
best with an implementation of namespaces that was based on-top of
topic-hierarchies, allowing configuration to be applied at the scope
of: all tenants, one tenant, or one tenant's topics.
   
So in summary: I think that namespaces can be usefully implemented
independently of topic-hierarchies, and when topic-hierarchies are
implemented would be easily integrated.
   
Regards
- Adrian
   
 -Gwen Shapira gshap...@cloudera.com wrote: -
 To: dev@kafka.apache.org dev@kafka.apache.org
 From: Gwen Shapira gshap...@cloudera.com
Date: 04/28/2015 06:54PM
Subject: Re: Adding multi-tenancy capabilities to Kafka
   
I think recent discussion showed some need for topic namespaces - for
example, Jun's use case for reserving topic names for specific users
discussed under authorization.
   
I think some discussion should happen on namespaces vs more
  full-fledged
topic-hierarchy.
I like the simplicity of namespaces, but there may be other

RE: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system

2015-05-05 Thread Honghai Chen
Using config.segmentSize should be ok. I previously added that to make sure 
the file does not exceed config.segmentSize, but the function maybeRoll already 
makes sure of that.
While trying to add a test case for recovery I was blocked by the rename-related 
issue and opened a JIRA at https://issues.apache.org/jira/browse/KAFKA-2170 . Any 
recommendation for fixing that issue? 

Thanks, Honghai Chen

-Original Message-
From: Jun Rao [mailto:j...@confluent.io] 
Sent: Tuesday, May 5, 2015 12:51 PM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume 
performance under windows and some old Linux file system

This seems similar to what's in
https://issues.apache.org/jira/browse/KAFKA-1065.

Also, could you explain why the preallocated size is set to config.segmentSize
- 2 * config.maxMessageSize, instead of just config.segmentSize?

Thanks,

Jun

On Mon, May 4, 2015 at 8:12 PM, Honghai Chen honghai.c...@microsoft.com
wrote:

   Hi guys,
  I'm trying to add test cases, but the case below crashes at line 
  segReopen.recover(64*1024) --> index.trimToValidSize(). Any idea why?
  Appreciate your help.
  The case assumes Kafka suddenly crashed and needs to recover the 
  last segment.

  kafka.log.LogSegmentTest > testCreateWithInitFileSizeCrash FAILED
  java.io.IOException: The requested operation cannot be performed 
  on a file with a user-mapped section open
 at java.io.RandomAccessFile.setLength(Native Method)
 at
 kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:292)
 at
 kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:283)
 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:288)
 at kafka.log.OffsetIndex.resize(OffsetIndex.scala:283)
 at
 kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(OffsetI
 ndex.scala:272)
 at
 kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.sc
 ala:272)
 at
 kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.sc
 ala:272)
 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:288)
 at kafka.log.OffsetIndex.trimToValidSize(OffsetIndex.scala:271)
 at kafka.log.LogSegment.recover(LogSegment.scala:199)
 at
 kafka.log.LogSegmentTest.testCreateWithInitFileSizeCrash(LogSegmentTe
 st.scala:306)

   def recover(maxMessageSize: Int): Int = {
 index.truncate()
 index.resize(index.maxIndexSize)
 var validBytes = 0
 var lastIndexEntry = 0
 val iter = log.iterator(maxMessageSize)
 try {
   while(iter.hasNext) {
 val entry = iter.next
 entry.message.ensureValid()
  if(validBytes - lastIndexEntry > indexIntervalBytes) {
    // we need to decompress the message, if required, to get 
  the offset of the first uncompressed message
    val startOffset =
  entry.message.compressionCodec match {
    case NoCompressionCodec =>
  entry.offset
    case _ =>

  ByteBufferMessageSet.deepIterator(entry.message).next().offset
   }
   index.append(startOffset, validBytes)
   lastIndexEntry = validBytes
 }
 validBytes += MessageSet.entrySize(entry.message)
   }
 } catch {
    case e: InvalidMessageException =>
  logger.warn("Found invalid messages in log segment %s at byte 
  offset %d: %s.".format(log.file.getAbsolutePath, validBytes, e.getMessage))
 }
 val truncated = log.sizeInBytes - validBytes
 log.truncateTo(validBytes)
 index.trimToValidSize()
 truncated
   }

  /* create a segment with pre-allocate and crash */
   @Test
   def testCreateWithInitFileSizeCrash() {
 val tempDir = TestUtils.tempDir()
 val seg = new LogSegment(tempDir, 40, 1, 1000, 0, SystemTime, 
 false, 512*1024*1024, true)

 val ms = messages(50, hello, there)
 seg.append(50, ms)
 val ms2 = messages(60, alpha, beta)
 seg.append(60, ms2)
 val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
 assertEquals(ms2.toList, read.messageSet.toList)
 val oldSize = seg.log.sizeInBytes()
 val oldPosition = seg.log.channel.position
 val oldFileSize = seg.log.file.length
 assertEquals(512*1024*1024, oldFileSize)
 seg.flush()
 seg.log.channel.close()
 seg.index.close()

 val segReopen = new LogSegment(tempDir, 40, 1, 1000, 0, 
 SystemTime,
 true)
 segReopen.recover(64*1024)
 val size = segReopen.log.sizeInBytes()
 val position = segReopen.log.channel.position
 val fileSize = segReopen.log.file.length
 assertEquals(oldPosition, position)
 assertEquals(oldSize, size)
 assertEquals(size, fileSize)
   }



 Thanks, Honghai Chen 

 -Original Message-
 From: Sriram Subramanian [mailto:srsubraman...@linkedin.com.INVALID]
 Sent: Friday, April 24, 2015 12:57 AM
 To: dev@kafka.apache.org
 Cc: Roshan Naik
 Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve 
 

[jira] [Created] (KAFKA-2170) 10 LogTest cases failed for file.renameTo failed under windows

2015-05-05 Thread Honghai Chen (JIRA)
Honghai Chen created KAFKA-2170:
---

 Summary: 10 LogTest cases failed for  file.renameTo failed under 
windows
 Key: KAFKA-2170
 URL: https://issues.apache.org/jira/browse/KAFKA-2170
 Project: Kafka
  Issue Type: Bug
  Components: log
Affects Versions: 0.9.0
 Environment: Windows
Reporter: Honghai Chen
Assignee: Jay Kreps


Get the latest code from trunk, then run the test:
gradlew  -i core:test --tests kafka.log.LogTest
Got 10 test cases failing for the same reason:

kafka.common.KafkaStorageException: Failed to change the log file suffix from  
to .deleted for log segment 0
at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:259)
at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:756)
at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:747)
at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:514)
at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:514)
at scala.collection.immutable.List.foreach(List.scala:318)
at kafka.log.Log.deleteOldSegments(Log.scala:514)
at kafka.log.LogTest.testAsyncDelete(LogTest.scala:633)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:31)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:44)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:180)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:41)
at org.junit.runners.ParentRunner$1.evaluate(ParentRunner.java:173)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:31)
at org.junit.runners.ParentRunner.run(ParentRunner.java:220)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:86)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:49)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:69)
at 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:48)
at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at 
org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
at 
org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at 
org.gradle.messaging.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
at 
org.gradle.messaging.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
at $Proxy2.processTestClass(Unknown Source)
at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:105)
at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at 
org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
at 
org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at 
org.gradle.messaging.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:355)
at 
org.gradle.internal.concurrent.DefaultExecutorFactory$StoppableExecutorImpl$1.run(DefaultExecutorFactory.java:64)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at 

Re: Adding multi-tenancy capabilities to Kafka

2015-05-05 Thread Adrian Preston

I don't agree.  At least not completely ;)

Pluggable authorization (or hierarchies with the ability to define ACLs 
at each level) would allow you to isolate different groups of users. 
However each user will still be aware of one another's presence in the 
topic namespace - and could also be a noisy neighbor.  For example: 
say I configure Kafka with an authorization plugin that only allows 
access to topics that start with the user's security principal (a tiny 
sketch of that check follows the list below).  That 
partitions up the topic namespace so that one user can't clash with 
another - but has some limitations:


1) Applications need to be aware of the scheme.  They can't just use 
'topic1' - they need to know/find out their principal and prefix their 
topics appropriately.
2) There's no quotas (or similar scheme) that stop one user from 
impacting on another's available bandwidth.
3) Consumer groups would still be a shared namespace that users could 
clash on.
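
As promised above, a tiny sketch of the principal-prefix check (hypothetical; this is not the KIP-11 Authorizer interface, just the check itself):

    // Hypothetical sketch of the "only topics starting with the principal" rule.
    public class PrincipalPrefixCheckSketch {
        static boolean isAuthorized(String principal, String topic) {
            return topic.startsWith(principal + "-");
        }

        public static void main(String[] args) {
            System.out.println(isAuthorized("TENANT1", "TENANT1-TOPIC1")); // true
            System.out.println(isAuthorized("TENANT2", "TENANT1-TOPIC1")); // false
        }
    }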


How to move this forward?  I agree with Gari that KIPs 11, 12 (and 13) 
are still in-flight - so it might be difficult to write another related 
KIP and keep it in step.  The shared document approach that Ashish 
suggested sounds like a good fit.  We can record our intent - and work 
out whether this needs to be written up as a KIP, or raised as 
improvement JIRAs, once the dust settles on 11, 12 and 13.


- Adrian.


On 05/05/15 20:33, Gari Singh wrote:

I agree.  If we can really do pluggable authorization (and even pluggable
authentication), it would not actually be hard to effectively implement a
multi-tenant solution.  I am hoping to attempt to implement something like
this once there is code / patches for KIP-11 and KIP-12.  So I wonder if we
would actually need a KIP for this versus requirements for an authorization
module (and possibly an authentication module) and then document a best
practice for setting up a multi-tenant deployment.

I am a bit worried that KIPs 11/12 might not give us everything we need,
but I think we can work through that as the implementation of both
progresses

On Tue, May 5, 2015 at 3:23 PM, Ashish Singh asi...@cloudera.com wrote:


Jay,

I agree with you. This will kind of depend on how KIP-11 and KIP-12 shapes
up, but we can definitely start putting together the ideas. Would you
suggest starting a kip or just a generic shared document that can be later
translated into a KIP?

On Tuesday, May 5, 2015, Jay Kreps jay.kr...@gmail.com wrote:


Hey guys,

It would be nice to get a design around this. Though there are currently

so

many big things in flight I do wonder if we should start another parallel
thing...? But working out a design can't hurt.

Personally I think since one of the goals of Kafka is data integration we
really want to support a central cluster with many users. So I think we
really want to have full hierarchies with all users being part of the

same

tree of streams. This way granting permissions on a new stream just

means

changing permissions on what topic paths a client has access to rather
than needing a new client instance that connects to that separate

namespace

(a la zookeeper). I think this will be a nice way to share config and acl
defaults too.

-Jay

On Tue, May 5, 2015 at 10:36 AM, Ashish Singh asi...@cloudera.com wrote:


Adrian,

Trying to follow up the discussion here. Is my understanding correct

that

if we have topic hierarchies then we can do without namespaces. To me
namespace is an abstraction, it can be implemented with topic

hierarchies

as well, would you agree? If so I guess topic hierarchies is the way to

go.


One more thing I would like to put forward as an advantage of having

topic

hierarchy is that we can support acls/ permissions inheritance in topic
hierarchies. This will avoid bootstrapping acls for each new topic.

On Wed, Apr 29, 2015 at 7:42 AM, Adrian Preston prest...@uk.ibm.com wrote:


Thanks for your response.

I agree.  I think it would be useful to get consensus on how
namespaces and topic-hierarchies relate to one another.  To seed the
discussion - here's my viewpoint, which I hope others will challenge:

I see namespaces as something the broker provides to ensure that
two tenants can never interact with one another - even if,
for example, they both choose to use a topic called 'TOPIC1'.  I
imagine that this would be achieved by having the broker silently
add a per tenant prefix to the topic name in each request, and
strip it off in the response. So, for example, if 'TENANT1' sends
a message to 'TOPIC1', this would be re-written so that the send is
to 'TENANT1-TOPIC1'.

If topic-hierarchies were available, then I think the prefix,
added/removed to implement namespaces, would be the first level of
qualification in the hierarchy.  So, for example, if 'TENANT1' sends
a message to 'TOPIC1', this would be re-written so that the send is

to

'TENANT1/TOPIC1'.

Extrapolating from what's currently in KIP-21 (dynamic configuration)
I guess that topic-hierarchies might 

Re: Adding multi-tenancy capabilities to Kafka

2015-05-05 Thread Adrian Preston

I'm don't agree.  At least not completely ;)

Pluggable authorization (or hierarchies with the ability to define ACLs 
at each level) would allow you to isolate different groups of users.  
However each user will still be aware of one another's presence in the 
topic namespace - and could also be a noisy neighbor.  For example: 
say I configure Kafka with an authorization plugin that only allows 
access to topics that start with the user's security principal.  That 
partitions up the topic namespace so that one user can't clash with 
another - but has some limitations:


1) Applications need to be aware of the scheme.  They can't just use 
'topic1' - they need to know/find out their principal and prefix their 
topics appropriately.
2) There are no quotas (or a similar scheme) to stop one user from 
impacting another's available bandwidth.
3) Consumer groups would still be a shared namespace that users could 
clash on.
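
A minimal sketch of the principal-prefix check described above, purely for 
illustration (the Session class and TopicAuthorizer trait are hypothetical 
stand-ins, not the interface being designed in KIP-11):

    case class Session(principal: String)

    trait TopicAuthorizer {
      def authorize(session: Session, topic: String): Boolean
    }

    // Allow access only to topics prefixed with the caller's principal, e.g.
    // principal "TENANT1" may use "TENANT1-orders" but not "TENANT2-orders".
    class PrincipalPrefixAuthorizer extends TopicAuthorizer {
      override def authorize(session: Session, topic: String): Boolean =
        topic.startsWith(session.principal + "-")
    }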


How to move this forward?  I agree with Gari that KIPs 11, 12 (and 13) 
are still in-flight - so it might be difficult to write another related 
KIP and keep it in step.  The shared document approach that Ashish 
suggested sounds like a good fit.  We can record our intent - and work 
out whether this needs to be written up as a KIP, or raised as 
improvement JIRAs, once the dust settles on 11, 12 and 13.


- Adrian.

On 05/05/15 20:33, Gari Singh wrote:

I agree.  If we can really do pluggable authorization (and even pluggable
authentication), it would not actually be hard to effectively implement a
multi-tenant solution.  I am hoping to attempt to implement something like
this once there is code / patches for KIP-11 and KIP-12.  So I wonder if we
would actually need a KIP for this versus requirements for an authorization
module (and possibly an authentication module) and then document a best
practice for setting up a multi-tenant deployment.

I am a bit worried that KIPs 11/12 might not give us everything we need,
but I think we can work through that as the implementation of both
progresses.

On Tue, May 5, 2015 at 3:23 PM, Ashish Singh asi...@cloudera.com wrote:


Jay,

I agree with you. This will kind of depend on how KIP-11 and KIP-12 shape
up, but we can definitely start putting together the ideas. Would you
suggest starting a KIP or just a generic shared document that can later be
translated into a KIP?

On Tuesday, May 5, 2015, Jay Kreps jay.kr...@gmail.com wrote:


Hey guys,

It would be nice to get a design around this. Though there are currently so
many big things in flight I do wonder if we should start another parallel
thing...? But working out a design can't hurt.

Personally I think since one of the goals of Kafka is data integration we
really want to support a central cluster with many users. So I think we
really want to have full hierarchies with all users being part of the same
tree of streams. This way granting permissions on a new stream just means
changing permissions on what topic paths a client has access to rather
than needing a new client instance that connects to that separate namespace
(a la zookeeper). I think this will be a nice way to share config and acl
defaults too.

-Jay

On Tue, May 5, 2015 at 10:36 AM, Ashish Singh asi...@cloudera.com wrote:


Adrian,

Trying to follow up the discussion here. Is my understanding correct that
if we have topic hierarchies then we can do without namespaces? To me
namespace is an abstraction; it can be implemented with topic hierarchies
as well, would you agree? If so I guess topic hierarchies is the way to go.

One more thing I would like to put forward as an advantage of having topic
hierarchy is that we can support acls/permissions inheritance in topic
hierarchies. This will avoid bootstrapping acls for each new topic.

On Wed, Apr 29, 2015 at 7:42 AM, Adrian Preston prest...@uk.ibm.com wrote:


Thanks for your response.

I agree.  I think it would be useful to get consensus on how
namespaces and topic-hierarchies relate to one another.  To seed the
discussion - here's my viewpoint, which I hope others will challenge:

I see namespaces as something the broker provides to ensure that
two tenants can never interact with one another - even if,
for example, they both choose to use a topic called 'TOPIC1'.  I
imagine that this would be achieved by having the broker silently
add a per tenant prefix to the topic name in each request, and
strip it off in the response. So, for example, if 'TENANT1' sends
a message to 'TOPIC1', this would be re-written so that the send is
to 'TENANT1-TOPIC1'.

If topic-hierarchies were available, then I think the prefix,
added/removed to implement namespaces, would be the first level of
qualification in the hierarchy.  So, for example, if 'TENANT1' sends
a message to 'TOPIC1', this would be re-written so that the send is to
'TENANT1/TOPIC1'.

Extrapolating from what's currently in KIP-21 (dynamic configuration)
I guess that topic-hierarchies might 

[DISCUSSION] Reuse o.a.k.clients.NetworkClient in controller.

2015-05-05 Thread Jiangjie Qin
I am trying to see if we can reuse the NetworkClient class for 
controller-to-broker communication. (Also, we can probably use KafkaConsumer, 
which is already using NetworkClient, in the replica fetchers.)
Currently NetworkClient does the following things in addition to sending 
requests:

  1.  Connection state management.
  2.  Flow control (inflight requests)
  3.  Metadata refresh

In the controller we need (1) and (2) but not (3). NetworkClient is tightly coupled 
with metadata now, and this is the major blocker to reusing NetworkClient in the 
controller. For the controller, we don't need NetworkClient to manage any metadata 
because the controller has listeners to monitor the cluster state and has all 
the information about topic metadata.
I am thinking we can add a disable-metadata-refresh flag to NetworkClient, or set 
the metadata refresh interval to Long.MAX_VALUE, so the metadata will be managed 
outside NetworkClient.
This needs minimal change to allow NetworkClient to be reused, but the ugly 
part is that NetworkClient still holds the entire Metadata object while it 
actually only needs a node list.
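
For what it's worth, a rough sketch of the "effectively disable refresh" idea, 
assuming the two-argument Metadata constructor, Metadata.update(cluster, now) and 
the Cluster(nodes, partitions) constructor currently in the clients module (treat 
the exact signatures as assumptions):

    import java.util.Collections
    import org.apache.kafka.clients.Metadata
    import org.apache.kafka.common.{Cluster, Node, PartitionInfo}
    import scala.collection.JavaConverters._

    object ControllerMetadataStub {
      // Build a Metadata instance that NetworkClient should never consider stale:
      // a huge expiry means no background refresh, and the controller's own
      // listeners can call update(...) whenever the broker list changes.
      def neverRefreshing(brokers: Seq[Node], nowMs: Long): Metadata = {
        val metadata = new Metadata(100L /* refresh backoff ms */, Long.MaxValue /* expiry ms */)
        val cluster = new Cluster(brokers.asJava, Collections.emptyList[PartitionInfo]())
        metadata.update(cluster, nowMs)
        metadata
      }
    }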

Want to see what people think about this.

Thanks.

Jiangjie (Becket) Qin



Re: [DISCUSS] KIP-21 Configuration Management

2015-05-05 Thread Neha Narkhede
Joel, thanks for summarizing the requirements. It makes sense for the KIP
to focus on Req #1, unless supporting any future dynamic configs warrants a
completely different design. My main concern is going with a design that
keeps only quotas in mind and then continuing to shoehorn other dynamic
configs into that model even if it doesn't work that well.

1. In my earlier exchange with Jay, I mentioned the broker writing all its
 configs to ZK (while respecting the overrides). Then ZK can be used to view
 all configs.


My concern with supporting both is that it will be confusing for anyone to
know where to look to find the final value of a config, or to be able to tell
if a particular broker hasn't picked up a config value. Maybe you have
thought about this; I am unclear about the fix you have in mind.

 2. Need to think about this a bit more. Perhaps we can discuss this during
 the hangout tomorrow?


This isn't relevant for LI but is important for a lot of users. So we
should definitely state how those tools would continue to work with this
change in the KIP.

3 & 4) I viewed these config changes as mainly administrative operations.
 In that case, it may be reasonable to assume that the ZK port is available
 for communication from the machine these commands are run on.


I'm not so sure about this assumption.


 Having a ConfigChangeRequest (or similar) is nice to have, but having a new
 API and sending requests to the controller also changes how we do topic-based
 configuration currently. I was hoping to keep this KIP as minimal as
 possible and provide a means to represent and modify client and broker
 based configs in a central place. Are there any concerns if we tackle these
 things in a later KIP?


I don't have concerns about reducing the scope of this KIP as long as we
are sure the approach we pick is the right direction for dynamic config and
further changes are just incremental, not a redesign.

On Tue, May 5, 2015 at 8:54 AM, Joel Koshy jjkosh...@gmail.com wrote:

 Good discussion. Since we will be talking about this at 11am, I wanted
 to organize these comments into requirements to see if we are all on
 the same page.

 REQUIREMENT 1: Needs to accept dynamic config changes. This needs to
 be general enough to work for all configs that we envision may need to
 accept changes at runtime, e.g., log (topic), broker, client (quotas),
 etc. Possible options include:
 - ZooKeeper watcher
 - Kafka topic
 - Direct RPC to controller (or config coordinator)
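
 As a concrete reference point for the first option, a minimal sketch of a
 ZooKeeper-watcher based listener, modelled loosely on how per-topic config
 overrides are already picked up; the znode layout and callback body here are
 illustrative assumptions, not the KIP-21 design:

    import org.I0Itec.zkclient.{IZkChildListener, ZkClient}

    class DynamicConfigWatcher(zkClient: ZkClient, changesPath: String, onChange: String => Unit) {
      def startWatching(): Unit = {
        zkClient.subscribeChildChanges(changesPath, new IZkChildListener {
          override def handleChildChange(parentPath: String, children: java.util.List[String]): Unit = {
            // Each child znode names an entity (topic, client id, broker, ...) whose
            // config changed; the broker re-reads and applies that entity's config.
            if (children != null) {
              val it = children.iterator()
              while (it.hasNext) onChange(it.next())
            }
          }
        })
      }
    }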

 The current KIP is really focused on REQUIREMENT 1 and I think that is
 reasonable as long as we don't come up with something that requires
 significant re-engineering to support the other requirements.

 REQUIREMENT 2: Provide consistency of configs across brokers (modulo
 per-broker overrides) or at least be able to verify consistency.  What
 this effectively means is that config changes must be seen by all
 brokers eventually and we should be able to easily compare the full
 config of each broker.

 REQUIREMENT 3: Central config store. Needs to work with plain
 file-based configs and other systems (e.g., puppet). Ideally, should
 not bring in other dependencies (e.g., a DB). Possible options:
 - ZooKeeper
 - Kafka topic
 - other? E.g. making it pluggable?

 Any other requirements?

 Thanks,

 Joel

 On Tue, May 05, 2015 at 01:38:09AM +, Aditya Auradkar wrote:
  Hey Neha,
 
  Thanks for the feedback.
  1. In my earlier exchange with Jay, I mentioned the broker writing all
 its configs to ZK (while respecting the overrides). Then ZK can be used to
 view all configs.
 
  2. Need to think about this a bit more. Perhaps we can discuss this
 during the hangout tomorrow?
 
  3 & 4) I viewed these config changes as mainly administrative
 operations. In that case, it may be reasonable to assume that the ZK port is
 available for communication from the machine these commands are run on. Having
 a ConfigChangeRequest (or similar) is nice to have, but having a new API and
 sending requests to the controller also changes how we do topic-based
 configuration currently. I was hoping to keep this KIP as minimal as
 possible and provide a means to represent and modify client and broker
 based configs in a central place. Are there any concerns if we tackle these
 things in a later KIP?
 
  Thanks,
  Aditya
  
  From: Neha Narkhede [n...@confluent.io]
  Sent: Sunday, May 03, 2015 9:48 AM
  To: dev@kafka.apache.org
  Subject: Re: [DISCUSS] KIP-21 Configuration Management
 
  Thanks for starting this discussion, Aditya. Few questions/comments
 
  1. If you change the default values like it's mentioned in the KIP, do
 you
  also overwrite the local config file as part of updating the default
 value?
  If not, where does the admin look to find the default values, ZK or local
  Kafka config file? What if a config value is different in both places?
 
  2. I share Gwen's concern around making sure that popular config
 management
  tools continue to work 

Re: Adding multi-tenancy capabilities to Kafka

2015-05-05 Thread Jay Kreps
Hey guys,

It would be nice to get a design around this. Though there are currently so
many big things in flight I do wonder if we should start another parallel
thing...? But working out a design can't hurt.

Personally I think since one of the goals of Kafka is data integration we
really want to support a central cluster with many users. So I think we
really want to have full hierarchies with all users being part of the same
tree of streams. This way granting permissions on a new stream just means
changing permissions on what topic paths a client has access to rather
than needing a new client instance that connects to that separate namespace
(a la zookeeper). I think this will be a nice way to share config and acl
defaults too.

-Jay
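
As an aside on the "permissions on topic paths" point above, a rough sketch of 
path-based ACLs with inheritance down the tree; the ACL model here is made up 
for illustration and is not the KIP-11 interface:

    object TopicPathAcls {
      // ACLs granted per path prefix, e.g. "tenant1" or "tenant1/orders".
      type AclStore = Map[String, Set[String]] // path -> principals allowed

      // A principal may access "tenant1/orders/eu" if it is allowed on that
      // exact path or on any ancestor ("tenant1/orders", "tenant1").
      def isAllowed(acls: AclStore, principal: String, topicPath: String): Boolean = {
        val parts = topicPath.split("/")
        val prefixes = (1 to parts.length).map(n => parts.take(n).mkString("/"))
        prefixes.exists(p => acls.getOrElse(p, Set.empty[String]).contains(principal))
      }
    }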

On Tue, May 5, 2015 at 10:36 AM, Ashish Singh asi...@cloudera.com wrote:

 Adrian,

 Trying to follow up the discussion here. Is my understanding correct that
 if we have topic hierarchies then we can do without namespaces? To me
 namespace is an abstraction; it can be implemented with topic hierarchies
 as well, would you agree? If so I guess topic hierarchies is the way to go.

 One more thing I would like to put forward as an advantage of having topic
 hierarchy is that we can support acls/permissions inheritance in topic
 hierarchies. This will avoid bootstrapping acls for each new topic.

 On Wed, Apr 29, 2015 at 7:42 AM, Adrian Preston prest...@uk.ibm.com
 wrote:

  Thanks for your response.
 
  I agree.  I think it would be useful to get consensus on how
  namespaces and topic-hierarchies relate to one another.  To seed the
  discussion - here's my viewpoint, which I hope others will challenge:
 
  I see namespaces as something the broker provides to ensure that
  two tenants can never interact with one another - even if,
  for example, they both choose to use a topic called 'TOPIC1'.  I
  imagine that this would be achieved by having the broker silently
  add a per tenant prefix to the topic name in each request, and
  strip it off in the response. So, for example, if 'TENANT1' sends
  a message to 'TOPIC1', this would be re-written so that the send is
  to 'TENANT1-TOPIC1'.
 
  If topic-hierarchies were available, then I think the prefix,
  added/removed to implement namespaces, would be the first level of
  qualification in the hierarchy.  So, for example, if 'TENANT1' sends
  a message to 'TOPIC1', this would be re-written so that the send is to
  'TENANT1/TOPIC1'.
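
  (Purely as an illustration of the two rewrites described above; the
  TenantNamespace helper below is hypothetical, not an existing broker component.)

    class TenantNamespace(separator: String) {
      // Applied by the broker on the way in: ("TENANT1", "TOPIC1") -> "TENANT1-TOPIC1",
      // or "TENANT1/TOPIC1" when the separator is "/".
      def qualify(tenant: String, topic: String): String = tenant + separator + topic

      // Applied on the way out, so a client only ever sees its own topic names.
      def unqualify(tenant: String, qualifiedTopic: String): String = {
        val prefix = tenant + separator
        if (qualifiedTopic.startsWith(prefix)) qualifiedTopic.substring(prefix.length)
        else qualifiedTopic
      }
    }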
 
  Extrapolating from what's currently in KIP-21 (dynamic configuration)
  I guess that topic-hierarchies might result in the possibility for
  even finer grain topic configuration - e.g. a ZNode structure of:
  '/config/topics/topic_level1/topic_level2/...'.  This would work
  best with an implementation of namespaces that was based on-top of
  topic-hierarchies, allowing configuration to be applied at the scope
  of: all tenants, one tenant, or one tenant's topics.
 
  So in summary: I think that namespaces can be usefully implemented
  independently of topic-hierarchies, and when topic-hierarchies are
  implemented would be easily integrated.
 
  Regards
  - Adrian
 
  -Gwen Shapira gshap...@cloudera.com wrote: -
  To: dev@kafka.apache.org dev@kafka.apache.org
  From: Gwen Shapira gshap...@cloudera.com
  Date: 04/28/2015 06:54PM
  Subject: Re: Adding multi-tenancy capabilities to Kafka
 
  I think recent discussion showed some need for topic namespaces - for
  example, Jun's use case for reserving topic names for specific users
  discussed under authorization.
 
  I think some discussion should happen on namespaces vs more full-fledged
  topic-hierarchy.
  I like the simplicity of namespaces, but there may be other requirements
  (such as inheriting configuration).
 
  Gwen
 
  On Tue, Apr 28, 2015 at 10:42 AM, Adrian Preston prest...@uk.ibm.com
  wrote:
 
   Hi all,
  
   I've been looking at how a Kafka cluster could be used be deployed so
  that
   it can be used by multiple tenants.  Specifically: a scheme where the
   clients belonging to each tenant receive the impression they are
  operating
   against their own cluster.  The ongoing security and quota work looks
  like
   it might provide a lot of the isolation requirements, but each tenant
  would
   currently share the same namespace for topics and consumer groups.  So
  the
   illusion of "it is my own cluster" is broken as soon as two tenants try
    independently to use the same topic name.
  
   I wondered if other people on this list are interested in being able to
   support multiple tenants in this way?  And / or if the ability to avoid
   clashes in the topic namespace would be useful?  I am considering
   submitting a KIP in this area - but first wanted to get a feeling for
   whether these kinds of capabilities are of interest to others.
  
   Thanks in advance,
   - Adrian
  
   Unless stated otherwise above:
   IBM United Kingdom Limited - Registered in England and Wales with
 number
   741598.
   Registered office: PO Box 

[jira] [Created] (KAFKA-2171) System Test for Quotas

2015-05-05 Thread Dong Lin (JIRA)
Dong Lin created KAFKA-2171:
---

 Summary: System Test for Quotas
 Key: KAFKA-2171
 URL: https://issues.apache.org/jira/browse/KAFKA-2171
 Project: Kafka
  Issue Type: Sub-task
Reporter: Dong Lin
Assignee: Dong Lin


Motivation and goal:
We want to make sure that the following features are working properly for both 
consumer and producer: default quota, client-specific quota, and quota sharing 
per clientId.

The tests and configuration described aim to cover most of the scenarios. More 
test cases with varying configurations (e.g. ackNum) can be added if there is 
a good reason to do so.

Initial setup and configuration:
In all scenarios, we first create kafka brokers and topic as follows:
- create two kafka broker processes (by default local)
- create a topic with replication factor = 2 and ackNum = -1
- let max_read = max_write = 5MB. The test machine is expected to provide read 
(write) throughput of at least max_read (max_write).
- we consider two rates approximately the same if they differ by at most 5%.

Scenario 1: Validate that max_read and max_write are provided by the test 
machine(s) using 1 producer and 1 consumer
1) produce data to the topic without rate limit for 30 seconds
2) record the rate of the producer
3) then, consume data from the topic without rate limit until it finishes
4) record the rate of the consumer
5) verify that the data consumed is identical to the data produced
6) verify that producer rate >= max_write and consumer rate >= max_read

Scenario 2: validate the effectiveness of default write and read quota using 1 
producer and 1 consumer
1) configure brokers to use max_write/2 as default write quota and max_read/2 
as default read quota
2) produce data to the topic for 30 seconds
3) record the rate of the producer
4) then, consume data from the topic until it finishes
5) record the rate of the consumer
6) verify that the data consumed is identical to the data produced
7) verify that recorded write (read) rate is within 5% of max_write/2 
(max_read/2).

Scenario 3: validate the effectiveness of client-specific write and read quota 
using 2 producers and 2 consumers
1) configure brokers to use max_write/2 as default write quota and max_read/2 
as default read quota; configure brokers to use max_write/4 for producer_2 and 
max_read/4 for consumer_2
2) both producers produce data to the topic for 30 seconds. They use different 
clientId.
3) record the rates of the producers
4) both consumers consume data from the topic until they finish. They use 
different clientId and groupId.
5) record the rates of the consumers
6) verify that the data consumed is identical to the data produced
7) verify that producer_1 and producer_2 rates are approximately max_write/2 
and max_write/4; verify that consumer_1 and consumer_2 rates are approximately 
max_read/2 and max_read/4.

Scenario 4: validate the effectiveness of write and read quota sharing among 
clients of same clientId using 2 producers and 2 consumers.
1) configure brokers to use max_write/2 as default write quota and max_read/2 
as default read quota
2) both producers produce data to the topic for 30 seconds. They use the same 
clientId.
3) record the rates of the producers
4) both consumers consume data from the topic until they finish. They use the same 
clientId but different groupId.
5) record the rates of the consumers
6) verify that the data consumed is identical to the data produced
7) verify that total rate of producer_1 and producer_2 is approximately 
max_write/2; verify that total rate of consumer_1 and consumer_2 is 
approximately max_read/2.
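
For reference, a small helper of the kind the "within 5%" checks above assume 
(illustrative only; the names are made up):

    object RateCheck {
      val Tolerance = 0.05

      // Two rates are considered approximately the same if they differ by at
      // most 5% of the expected rate.
      def approximatelyEqual(observed: Double, expected: Double): Boolean =
        math.abs(observed - expected) <= Tolerance * expected

      // Example: assert an observed producer rate against a quota of max_write/2.
      def assertWithinQuota(observedBytesPerSec: Double, quotaBytesPerSec: Double): Unit =
        require(approximatelyEqual(observedBytesPerSec, quotaBytesPerSec),
          s"observed rate $observedBytesPerSec is not within 5% of $quotaBytesPerSec")
    }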




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 33088: add heartbeat to coordinator

2015-05-05 Thread Onur Karaman

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33088/
---

(Updated May 5, 2015, 5:50 p.m.)


Review request for kafka.


Bugs: KAFKA-1334
https://issues.apache.org/jira/browse/KAFKA-1334


Repository: kafka


Description
---

add heartbeat to coordinator

todo:
- see how it performs under real load
- add error code handling on the consumer side


Diffs (updated)
-

  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 e55ab11df4db0b0084f841a74cbcf819caf780d5 
  clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
36aa412404ff1458c7bef0feecaaa8bc45bed9c7 
  core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 
456b602245e111880e1b8b361319cabff38ee0e9 
  core/src/main/scala/kafka/coordinator/ConsumerRegistry.scala 
2f5797064d4131ecfc9d2750d9345a9fa3972a9a 
  core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala PRE-CREATION 
  core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala 
6a6bc7bc4ceb648b67332e789c2c33de88e4cd86 
  core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala 
df60cbc35d09937b4e9c737c67229889c69d8698 
  core/src/main/scala/kafka/coordinator/DelayedRebalance.scala 
8defa2e41c92f1ebe255177679d275c70dae5b3e 
  core/src/main/scala/kafka/coordinator/Group.scala PRE-CREATION 
  core/src/main/scala/kafka/coordinator/GroupRegistry.scala 
94ef5829b3a616c90018af1db7627bfe42e259e5 
  core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala 
821e26e97eaa97b5f4520474fff0fedbf406c82a 
  core/src/main/scala/kafka/coordinator/PartitionAssignor.scala PRE-CREATION 
  core/src/main/scala/kafka/server/DelayedOperationKey.scala 
b673e43b0ba401b2e22f27aef550e3ab0ef4323c 
  core/src/main/scala/kafka/server/KafkaApis.scala 
b4004aa3a1456d337199aa1245fb0ae61f6add46 
  core/src/main/scala/kafka/server/KafkaServer.scala 
c63f4ba9d622817ea8636d4e6135fba917ce085a 
  core/src/main/scala/kafka/server/OffsetManager.scala 
18680ce100f10035175cc0263ba7787ab0f6a17a 
  core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala 
PRE-CREATION 
  core/src/test/scala/unit/kafka/coordinator/GroupTest.scala PRE-CREATION 
  core/src/test/scala/unit/kafka/coordinator/PartitionAssignorTest.scala 
PRE-CREATION 

Diff: https://reviews.apache.org/r/33088/diff/


Testing
---


Thanks,

Onur Karaman



[jira] [Commented] (KAFKA-1334) Add failure detection capability to the coordinator / consumer

2015-05-05 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14528896#comment-14528896
 ] 

Onur Karaman commented on KAFKA-1334:
-

Updated reviewboard https://reviews.apache.org/r/33088/diff/
 against branch origin/trunk

 Add failure detection capability to the coordinator / consumer
 --

 Key: KAFKA-1334
 URL: https://issues.apache.org/jira/browse/KAFKA-1334
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer
Affects Versions: 0.9.0
Reporter: Neha Narkhede
Assignee: Onur Karaman
 Attachments: KAFKA-1334.patch, KAFKA-1334_2015-04-11_22:47:27.patch, 
 KAFKA-1334_2015-04-13_11:55:06.patch, KAFKA-1334_2015-04-13_11:58:53.patch, 
 KAFKA-1334_2015-04-18_10:16:23.patch, KAFKA-1334_2015-04-18_12:16:39.patch, 
 KAFKA-1334_2015-04-24_22:46:15.patch, KAFKA-1334_2015-04-25_15:21:25.patch, 
 KAFKA-1334_2015-04-25_17:57:37.patch, KAFKA-1334_2015-05-05_10:50:00.patch


 1) Add coordinator discovery and failure detection to the consumer.
 2) Add failure detection capability to the coordinator when group management 
 is used.
 This will not include any rebalancing logic, just the logic to detect 
 consumer failures using session.timeout.ms. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2103) kafka.producer.AsyncProducerTest failure.

2015-05-05 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14529033#comment-14529033
 ] 

Ewen Cheslack-Postava commented on KAFKA-2103:
--

Agreed with [~becket_qin] that the test shouldn't be assuming things out of 
its control. I made some changes to this test in KAFKA-1501 and part of the 
point of the patch that ultimately resolved 1501 is that it removes as many 
cases of a getNextUnusedPort() as possible. I'd discourage trying to use it 
here (one does exist in integration.kafka.api.FixedPortTestUtils, but note that 
it's under the *integration* package for a reason).

Some notes:

* I left a note when refactoring that using fixed values here was ok because 
things were mocked. I didn't notice that if you work through a couple layers of 
that code, it ends up parsing the broker list for getting metadata -- not all 
of the code is properly mocked for the unit test.
* Turns out that TestUtils.createBrokerConfig was broken -- it was not properly 
setting the port in the config. This seems to be the real issue for this 
particular test since I chose a broker port that is incredibly unlikely to be 
bound whereas before it was using the default 9092 which is probably commonly 
bound on developers' machines. I've submitted that patch as a temporary fix, 
but it doesn't really resolve the underlying issue that this unit test is 
actually trying to communicate with the server rather than using a mocked out 
implementation.
* I tried an approach where we can allocate ports but don't close them, use 
those port numbers, and then during teardown clean them up. I think this is the 
only way to safely use real ports allocated dynamically. This works, but has to 
wait for the connections that it's trying to start to time out. We could use 
FixedPortTestUtils' existing method, but that can run into the same issues we 
were trying to resolve in KAFKA-1501.
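
For reference, the "allocate ports but hold them until teardown" idea would look 
roughly like this (a sketch, not the submitted patch):

    import java.net.ServerSocket
    import scala.collection.mutable.ArrayBuffer

    class HeldPorts {
      private val held = ArrayBuffer.empty[ServerSocket]

      // Bind an ephemeral port and keep the socket open so nothing else can
      // grab the port while the test uses its number.
      def reserve(): Int = {
        val socket = new ServerSocket(0)
        held += socket
        socket.getLocalPort
      }

      // Called from the test's teardown to release everything reserved.
      def releaseAll(): Unit = {
        held.foreach(_.close())
        held.clear()
      }
    }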

 kafka.producer.AsyncProducerTest failure.
 -

 Key: KAFKA-2103
 URL: https://issues.apache.org/jira/browse/KAFKA-2103
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jiangjie Qin
Assignee: Ewen Cheslack-Postava
 Attachments: KAFKA-2103.patch


 I saw this test consistently failing on trunk.
 The recent changes are KAFKA-2099, KAFKA-1926, KAFKA-1809.
 kafka.producer.AsyncProducerTest > testNoBroker FAILED
 org.scalatest.junit.JUnitTestFailedError: Should fail with 
 FailedToSendMessageException
 at 
 org.scalatest.junit.AssertionsForJUnit$class.newAssertionFailedException(AssertionsForJUnit.scala:101)
 at 
 org.scalatest.junit.JUnit3Suite.newAssertionFailedException(JUnit3Suite.scala:149)
 at org.scalatest.Assertions$class.fail(Assertions.scala:711)
 at org.scalatest.junit.JUnit3Suite.fail(JUnit3Suite.scala:149)
 at 
 kafka.producer.AsyncProducerTest.testNoBroker(AsyncProducerTest.scala:300)
 kafka.producer.AsyncProducerTest > testIncompatibleEncoder PASSED
 kafka.producer.AsyncProducerTest > testRandomPartitioner PASSED
 kafka.producer.AsyncProducerTest > testFailedSendRetryLogic FAILED
 kafka.common.FailedToSendMessageException: Failed to send messages after 
 3 tries.
 at 
 kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:91)
 at 
 kafka.producer.AsyncProducerTest.testFailedSendRetryLogic(AsyncProducerTest.scala:415)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Review Request 33857: Patch for KAFKA-2103

2015-05-05 Thread Ewen Cheslack-Postava

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33857/
---

Review request for kafka.


Bugs: KAFKA-2103
https://issues.apache.org/jira/browse/KAFKA-2103


Repository: kafka


Description
---

KAFKA-2103: Fix TestUtils.createBrokerConfig to include the port.


Diffs
-

  core/src/test/scala/unit/kafka/utils/TestUtils.scala 
faae0e907596a16c47e8d49a82b6a3c82797c96d 

Diff: https://reviews.apache.org/r/33857/diff/


Testing
---


Thanks,

Ewen Cheslack-Postava



[jira] [Updated] (KAFKA-2103) kafka.producer.AsyncProducerTest failure.

2015-05-05 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-2103:
-
Attachment: KAFKA-2103.patch

 kafka.producer.AsyncProducerTest failure.
 -

 Key: KAFKA-2103
 URL: https://issues.apache.org/jira/browse/KAFKA-2103
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jiangjie Qin
Assignee: Dong Lin
 Attachments: KAFKA-2103.patch


 I saw this test consistently failing on trunk.
 The recent changes are KAFKA-2099, KAFKA-1926, KAFKA-1809.
 kafka.producer.AsyncProducerTest > testNoBroker FAILED
 org.scalatest.junit.JUnitTestFailedError: Should fail with 
 FailedToSendMessageException
 at 
 org.scalatest.junit.AssertionsForJUnit$class.newAssertionFailedException(AssertionsForJUnit.scala:101)
 at 
 org.scalatest.junit.JUnit3Suite.newAssertionFailedException(JUnit3Suite.scala:149)
 at org.scalatest.Assertions$class.fail(Assertions.scala:711)
 at org.scalatest.junit.JUnit3Suite.fail(JUnit3Suite.scala:149)
 at 
 kafka.producer.AsyncProducerTest.testNoBroker(AsyncProducerTest.scala:300)
 kafka.producer.AsyncProducerTest > testIncompatibleEncoder PASSED
 kafka.producer.AsyncProducerTest > testRandomPartitioner PASSED
 kafka.producer.AsyncProducerTest > testFailedSendRetryLogic FAILED
 kafka.common.FailedToSendMessageException: Failed to send messages after 
 3 tries.
 at 
 kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:91)
 at 
 kafka.producer.AsyncProducerTest.testFailedSendRetryLogic(AsyncProducerTest.scala:415)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2103) kafka.producer.AsyncProducerTest failure.

2015-05-05 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-2103:
-
Assignee: Ewen Cheslack-Postava  (was: Dong Lin)
  Status: Patch Available  (was: Open)

 kafka.producer.AsyncProducerTest failure.
 -

 Key: KAFKA-2103
 URL: https://issues.apache.org/jira/browse/KAFKA-2103
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jiangjie Qin
Assignee: Ewen Cheslack-Postava
 Attachments: KAFKA-2103.patch


 I saw this test consistently failing on trunk.
 The recent changes are KAFKA-2099, KAFKA-1926, KAFKA-1809.
 kafka.producer.AsyncProducerTest > testNoBroker FAILED
 org.scalatest.junit.JUnitTestFailedError: Should fail with 
 FailedToSendMessageException
 at 
 org.scalatest.junit.AssertionsForJUnit$class.newAssertionFailedException(AssertionsForJUnit.scala:101)
 at 
 org.scalatest.junit.JUnit3Suite.newAssertionFailedException(JUnit3Suite.scala:149)
 at org.scalatest.Assertions$class.fail(Assertions.scala:711)
 at org.scalatest.junit.JUnit3Suite.fail(JUnit3Suite.scala:149)
 at 
 kafka.producer.AsyncProducerTest.testNoBroker(AsyncProducerTest.scala:300)
 kafka.producer.AsyncProducerTest > testIncompatibleEncoder PASSED
 kafka.producer.AsyncProducerTest > testRandomPartitioner PASSED
 kafka.producer.AsyncProducerTest > testFailedSendRetryLogic FAILED
 kafka.common.FailedToSendMessageException: Failed to send messages after 
 3 tries.
 at 
 kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:91)
 at 
 kafka.producer.AsyncProducerTest.testFailedSendRetryLogic(AsyncProducerTest.scala:415)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2103) kafka.producer.AsyncProducerTest failure.

2015-05-05 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14529032#comment-14529032
 ] 

Ewen Cheslack-Postava commented on KAFKA-2103:
--

Created reviewboard https://reviews.apache.org/r/33857/diff/
 against branch origin/trunk

 kafka.producer.AsyncProducerTest failure.
 -

 Key: KAFKA-2103
 URL: https://issues.apache.org/jira/browse/KAFKA-2103
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jiangjie Qin
Assignee: Dong Lin
 Attachments: KAFKA-2103.patch


 I saw this test consistently failing on trunk.
 The recent changes are KAFKA-2099, KAFKA-1926, KAFKA-1809.
 kafka.producer.AsyncProducerTest > testNoBroker FAILED
 org.scalatest.junit.JUnitTestFailedError: Should fail with 
 FailedToSendMessageException
 at 
 org.scalatest.junit.AssertionsForJUnit$class.newAssertionFailedException(AssertionsForJUnit.scala:101)
 at 
 org.scalatest.junit.JUnit3Suite.newAssertionFailedException(JUnit3Suite.scala:149)
 at org.scalatest.Assertions$class.fail(Assertions.scala:711)
 at org.scalatest.junit.JUnit3Suite.fail(JUnit3Suite.scala:149)
 at 
 kafka.producer.AsyncProducerTest.testNoBroker(AsyncProducerTest.scala:300)
 kafka.producer.AsyncProducerTest > testIncompatibleEncoder PASSED
 kafka.producer.AsyncProducerTest > testRandomPartitioner PASSED
 kafka.producer.AsyncProducerTest > testFailedSendRetryLogic FAILED
 kafka.common.FailedToSendMessageException: Failed to send messages after 
 3 tries.
 at 
 kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:91)
 at 
 kafka.producer.AsyncProducerTest.testFailedSendRetryLogic(AsyncProducerTest.scala:415)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1690) new java producer needs ssl support as a client

2015-05-05 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14529043#comment-14529043
 ] 

Sriharsha Chintalapani commented on KAFKA-1690:
---

[~junrao] [~jjkoshy] I got feedback from Joel. I'll update the patch and add more 
tests in the next 2 days. Thanks.

 new java producer needs ssl support as a client
 ---

 Key: KAFKA-1690
 URL: https://issues.apache.org/jira/browse/KAFKA-1690
 Project: Kafka
  Issue Type: Sub-task
Reporter: Joe Stein
Assignee: Sriharsha Chintalapani
 Fix For: 0.8.3

 Attachments: KAFKA-1690.patch, KAFKA-1690.patch






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2156) Possibility to plug in custom MetricRegistry

2015-05-05 Thread Andras Sereny (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14528203#comment-14528203
 ] 

Andras Sereny commented on KAFKA-2156:
--

Thanks Neha. This might already work with Kafka Metrics, as far as I can see: 
get the appropriate Sensors from the Metrics (this has to be accessible 
somehow) and then .add a custom MeasurableStat to the Sensor. Not too elegant 
(have to get all the necessary Sensors), but workable. Is there a timeline for 
converting to Kafka Metrics? Thanks, András 
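
For reference, the workaround described above might look roughly like this against 
the new org.apache.kafka.common.metrics API; the sensor and metric names are made 
up, and the exact constructor overloads should be treated as assumptions:

    import org.apache.kafka.common.MetricName
    import org.apache.kafka.common.metrics.Metrics
    import org.apache.kafka.common.metrics.stats.Avg

    object CustomStatExample {
      def attach(metrics: Metrics): Unit = {
        // Metrics.sensor(name) returns the existing sensor or creates one.
        val sensor = metrics.sensor("records-sent")   // hypothetical sensor name
        // Attach an extra stat; any MeasurableStat implementation would do here.
        sensor.add(new MetricName("records-sent-avg", "custom"), new Avg())
      }
    }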

 Possibility to plug in custom MetricRegistry
 

 Key: KAFKA-2156
 URL: https://issues.apache.org/jira/browse/KAFKA-2156
 Project: Kafka
  Issue Type: Improvement
  Components: producer 
Affects Versions: 0.8.1.2
Reporter: Andras Sereny
Assignee: Jun Rao

 The trait KafkaMetricsGroup refers to Metrics.defaultRegistry() throughout. 
 It would be nice to be able to inject any MetricsRegistry instead of the 
 default one. 
 (My usecase is that I'd like to channel Kafka metrics into our application's 
 metrics system, for which I'd need custom implementations of 
 com.yammer.metrics.core.Metric.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2170) 10 LogTest cases failed for file.renameTo failed under windows

2015-05-05 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14529337#comment-14529337
 ] 

Jun Rao commented on KAFKA-2170:


We do have the logic in OffsetIndex.resize() to unmap before setting the file 
length if the platform is Windows. Could you check if that logic is actually 
triggered?
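
For reference, the unmap-before-resize pattern under discussion looks roughly like 
this on pre-Java-9 Oracle/OpenJDK JVMs (a sketch, not the exact OffsetIndex code):

    import java.nio.MappedByteBuffer

    object MmapUtil {
      // On Windows a mapped region must be released before the underlying file can
      // be truncated, extended or renamed; this relies on the JVM-internal
      // DirectBuffer cleaner.
      def forceUnmap(buffer: MappedByteBuffer): Unit = buffer match {
        case db: sun.nio.ch.DirectBuffer if db.cleaner() != null => db.cleaner().clean()
        case _ => // nothing we can do portably
      }
    }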

 10 LogTest cases failed for  file.renameTo failed under windows
 ---

 Key: KAFKA-2170
 URL: https://issues.apache.org/jira/browse/KAFKA-2170
 Project: Kafka
  Issue Type: Bug
  Components: log
Affects Versions: 0.9.0
 Environment: Windows
Reporter: Honghai Chen
Assignee: Jay Kreps

 get latest code from trunk, then run test 
 gradlew  -i core:test --tests kafka.log.LogTest
 Got 10 cases failed for same reason:
 kafka.common.KafkaStorageException: Failed to change the log file suffix from 
  to .deleted for log segment 0
   at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:259)
   at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:756)
   at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:747)
   at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:514)
   at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:514)
   at scala.collection.immutable.List.foreach(List.scala:318)
   at kafka.log.Log.deleteOldSegments(Log.scala:514)
   at kafka.log.LogTest.testAsyncDelete(LogTest.scala:633)
   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at 
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
   at 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:601)
   at 
 org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
   at 
 org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
   at 
 org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
   at 
 org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
   at 
 org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
   at 
 org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:31)
   at 
 org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
   at 
 org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:44)
   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:180)
   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:41)
   at org.junit.runners.ParentRunner$1.evaluate(ParentRunner.java:173)
   at 
 org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
   at 
 org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:31)
   at org.junit.runners.ParentRunner.run(ParentRunner.java:220)
   at 
 org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:86)
   at 
 org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:49)
   at 
 org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:69)
   at 
 org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:48)
   at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
   at 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:601)
   at 
 org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
   at 
 org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
   at 
 org.gradle.messaging.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
   at 
 org.gradle.messaging.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
   at $Proxy2.processTestClass(Unknown Source)
   at 
 org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:105)
   at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
   at 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:601)
   at 
 org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
   at 
 org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
   at 
 

Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system

2015-05-05 Thread Jun Rao
Thanks. Could you update the wiki? Also, I commented on the jira.

Jun

On Tue, May 5, 2015 at 12:48 AM, Honghai Chen honghai.c...@microsoft.com
wrote:

 Using config.segmentSize should be ok.   Previously I added that one to make
 sure the file does not exceed config.segmentSize; actually the function
 maybeRoll already makes sure of that.
 When trying to add a test case for recover, I was blocked by the rename-related
 issue; I just opened a jira at https://issues.apache.org/jira/browse/KAFKA-2170 ,
 any recommendation for fixing that issue?

 Thanks, Honghai Chen

 -Original Message-
 From: Jun Rao [mailto:j...@confluent.io]
 Sent: Tuesday, May 5, 2015 12:51 PM
 To: dev@kafka.apache.org
 Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume
 performance under windows and some old Linux file system

 This seems similar to what's in
 https://issues.apache.org/jira/browse/KAFKA-1065.

 Also, could you explain why the preallocated size is set to
 config.segmentSize
 - 2 * config.maxMessageSize, instead of just config.segmentSize?

 Thanks,

 Jun

 On Mon, May 4, 2015 at 8:12 PM, Honghai Chen honghai.c...@microsoft.com
 wrote:

Hi guys,
  I'm trying to add test cases, but the case below crashed at the line
  segReopen.recover(64*1024) -> index.trimToValidSize(); any idea about
 it?
  Appreciate your help.
  The case assumes Kafka suddenly crashed, and we need to recover the
  last segment.
 
  kafka.log.LogSegmentTest  testCreateWithInitFileSizeCrash FAILED
  java.io.IOException: The requested operation cannot be performed
  on a file w ith a user-mapped section open
  at java.io.RandomAccessFile.setLength(Native Method)
  at
  kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:292)
  at
  kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:283)
  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:288)
  at kafka.log.OffsetIndex.resize(OffsetIndex.scala:283)
  at
  kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(OffsetI
  ndex.scala:272)
  at
  kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.sc
  ala:272)
  at
  kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.sc
  ala:272)
  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:288)
  at kafka.log.OffsetIndex.trimToValidSize(OffsetIndex.scala:271)
  at kafka.log.LogSegment.recover(LogSegment.scala:199)
  at
  kafka.log.LogSegmentTest.testCreateWithInitFileSizeCrash(LogSegmentTe
  st.scala:306)
 
    def recover(maxMessageSize: Int): Int = {
      index.truncate()
      index.resize(index.maxIndexSize)
      var validBytes = 0
      var lastIndexEntry = 0
      val iter = log.iterator(maxMessageSize)
      try {
        while(iter.hasNext) {
          val entry = iter.next
          entry.message.ensureValid()
          if(validBytes - lastIndexEntry > indexIntervalBytes) {
            // we need to decompress the message, if required, to get the offset of the first uncompressed message
            val startOffset = entry.message.compressionCodec match {
              case NoCompressionCodec =>
                entry.offset
              case _ =>
                ByteBufferMessageSet.deepIterator(entry.message).next().offset
            }
            index.append(startOffset, validBytes)
            lastIndexEntry = validBytes
          }
          validBytes += MessageSet.entrySize(entry.message)
        }
      } catch {
        case e: InvalidMessageException =>
          logger.warn("Found invalid messages in log segment %s at byte offset %d: %s.".format(log.file.getAbsolutePath, validBytes, e.getMessage))
      }
      val truncated = log.sizeInBytes - validBytes
      log.truncateTo(validBytes)
      index.trimToValidSize()
      truncated
    }
 
  /* create a segment with pre-allocate and crash */
@Test
def testCreateWithInitFileSizeCrash() {
  val tempDir = TestUtils.tempDir()
  val seg = new LogSegment(tempDir, 40, 1, 1000, 0, SystemTime,
  false, 512*1024*1024, true)
 
  val ms = messages(50, "hello", "there")
  seg.append(50, ms)
  val ms2 = messages(60, "alpha", "beta")
  seg.append(60, ms2)
  val read = seg.read(startOffset = 55, maxSize = 200, maxOffset =
 None)
  assertEquals(ms2.toList, read.messageSet.toList)
  val oldSize = seg.log.sizeInBytes()
  val oldPosition = seg.log.channel.position
  val oldFileSize = seg.log.file.length
  assertEquals(512*1024*1024, oldFileSize)
  seg.flush()
  seg.log.channel.close()
  seg.index.close()
 
  val segReopen = new LogSegment(tempDir, 40, 1, 1000, 0,
  SystemTime,
  true)
  segReopen.recover(64*1024)
  val size = segReopen.log.sizeInBytes()
  val position = segReopen.log.channel.position
  val fileSize = segReopen.log.file.length
  assertEquals(oldPosition, position)
  assertEquals(oldSize, size)
  assertEquals(size, fileSize)
}
 
 
 
  

[jira] [Comment Edited] (KAFKA-2156) Possibility to plug in custom MetricRegistry

2015-05-05 Thread Andras Sereny (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14528203#comment-14528203
 ] 

Andras Sereny edited comment on KAFKA-2156 at 5/5/15 10:58 AM:
---

Thanks Neha. This might already work with Kafka Metrics, as far as I can see: 
get the appropriate Sensors from the Metrics (this has to be accessible 
somehow) and then .add a custom MeasurableStat to the Sensor. Not too elegant 
(have to get all the necessary Sensors), but workable. Is there a timeline for 
migrating to Kafka Metrics? Thanks, András 


was (Author: sandris):
Thanks Neha. This might already work with Kafka Metrics, as far as I can see: 
get the appropriate Sensors from the Metrics (this has to be accessible 
somehow) and then .add a custom MeasurableStat to the Sensor. Not too elegant 
(have to get all the necessary Sensors), but workable. Is there a timeline for 
converting to Kafka Metrics? Thanks, András 

 Possibility to plug in custom MetricRegistry
 

 Key: KAFKA-2156
 URL: https://issues.apache.org/jira/browse/KAFKA-2156
 Project: Kafka
  Issue Type: Improvement
  Components: producer 
Affects Versions: 0.8.1.2
Reporter: Andras Sereny
Assignee: Jun Rao

 The trait KafkaMetricsGroup refers to Metrics.defaultRegistry() throughout. 
 It would be nice to be able to inject any MetricsRegistry instead of the 
 default one. 
 (My usecase is that I'd like to channel Kafka metrics into our application's 
 metrics system, for which I'd need custom implementations of 
 com.yammer.metrics.core.Metric.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)