Re: [DISCUSS] Using GitHub Pull Requests for contributions and code review
Hi Jun, On Sat, May 2, 2015 at 2:50 PM, Jun Rao j...@confluent.io wrote: We will also need to figure out if we need a CONTRIBUTING.md like the following to take care of the Apache licensing stuff. https://github.com/apache/spark/blob/master/CONTRIBUTING.md Yes indeed. That is in step 3 of the missing pieces in the original email. As for merging changes, I think squashing the commits will be ideal during merge. The script does indeed squash commits during merge. It also retains a bunch of useful information in the commit message. I think one loses some of the benefits of git with a one-squashed-commit-per-PR approach, but that is a separate issue and I am not suggesting a change in this regard at this point. Also, it would be great if we can always put the latest changes on top during the merge. Just to make sure I understand correctly, do you mean updating the working copy with the latest from trunk before merging? Or did I misunderstand? Best, Ismael
Re: Kafka KIP hangout May 5
Is there information on how to get into the hangout? Or is it by invitation? I'm very interested in the authorization changes. Thanks, Tom On Monday, May 4, 2015 9:20 AM, Jun Rao j...@confluent.io wrote: Hi, Everyone, We will have a KIP hangout at 11 PST on May 5. The following is the agenda. If you want to attend and are not on the invite, please let me know. Agenda: KIP-4 (admin commands): any remaining issues KIP-11 (authorization): any remaining issues KIP-21 (configuration management) Thanks, Jun
Re: [KIP-DISCUSSION] KIP-22 Expose a Partitioner interface in the new producer
Thanks for the comments everyone. Hi Jay, I do have a question regarding the Configurable interface and how to pass in Map<String, ?> properties. I couldn’t find any other classes using it. JMX reporter overrides it but doesn’t implement it. So with a configurable partitioner, how can a user pass in partitioner configuration, since it’s getting instantiated within the producer? Thanks, Harsha On May 4, 2015 at 10:36:45 AM, Jay Kreps (jay.kr...@gmail.com) wrote: Hey Harsha, That proposal sounds good. One minor thing--I don't think we need to have the partitioner.metadata property. Our reason for using string properties is exactly to make config extensible at runtime. So a given partitioner can add whatever properties make sense using the configure() api it defines. -Jay On Sun, May 3, 2015 at 5:57 PM, Harsha ka...@harsha.io wrote: Thanks Jay & Gianmarco for the comments. I picked option A: if the user sends a partition id then it will be applied, and the partitioner.class method will only be called if the partition id is null. Please take a look at the updated KIP here https://cwiki.apache.org/confluence/display/KAFKA/KIP-+22+-+Expose+a+Partitioner+interface+in+the+new+producer . Let me know if you see anything missing. Thanks, Harsha On Fri, Apr 24, 2015, at 02:15 AM, Gianmarco De Francisci Morales wrote: Hi, Here are the questions I think we should consider: 1. Do we need this at all given that we have the partition argument in ProducerRecord which gives full control? I think we do need it because this is a way to plug in a different partitioning strategy at run time and do it in a fairly transparent way. Yes, we need it if we want to support different partitioning strategies inside Kafka rather than requiring the user to code them externally. 3. Do we need to add the value? I suspect people will have uses for computing something off a few fields in the value to choose the partition. This would be useful in cases where the key was being used for log compaction purposes and did not contain the full information for computing the partition. I am not entirely sure about this. I guess that most partitioners should not use it. I think it makes it easier to reason about the system if the partitioner only works on the key. However, if the value (and its serialization) are already available, there is not much harm in passing them along. 4. This interface doesn't include either an init() or close() method. It should implement Closeable and Configurable, right? Right now the only application I can think of to have an init() and close() is to read some state information (e.g., load information) that is published on some external distributed storage (e.g., zookeeper) by the brokers. It might be useful also for reconfiguration and state migration. I think it's not a very common use case right now, but if the added complexity is not too much it might be worth having support for these methods. 5. What happens if the user both sets the partition id in the ProducerRecord and sets a partitioner? Does the partition id just get passed in to the partitioner (as sort of implied in this interface?). This is a bit weird since if you pass in the partition id you kind of expect it to get used, right? Or is it the case that if you specify a partition the partitioner isn't used at all (in which case no point in including partition in the Partitioner api). The user should be able to override the partitioner on a per-record basis by explicitly setting the partition id.
I don't think it makes sense for the partitioners to take hints on the partition. I would even go the extra step and have default logic that accepts both key and partition id (current interface) and calls partition() only if the partition id is not set. The partition() method does *not* take the partition ID as input (only key-value). Cheers, -- Gianmarco Cheers, -Jay On Thu, Apr 23, 2015 at 6:55 AM, Sriharsha Chintalapani ka...@harsha.io wrote: Hi, Here is the KIP for adding a partitioner interface for the producer. https://cwiki.apache.org/confluence/display/KAFKA/KIP-+22+-+Expose+a+Partitioner+interface+in+the+new+producer There is one open question about how the interface should look. Please take a look and let me know if you prefer one way or the other. Thanks, Harsha
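[Editor's note] For illustration, here is a minimal sketch of the kind of pluggable partitioner discussed in this thread. The shape (key and value bytes, configure() driven by string properties, per Jay's point) follows the discussion, but the exact names are illustrative rather than the final KIP-22 API, and my.partitioner.seed is a hypothetical property:
{code}
import java.util.Arrays;
import java.util.Map;

// Illustrative interface shape only, not the final KIP-22 API.
interface Partitioner {
    void configure(Map<String, ?> configs);  // string-property config, extensible at runtime
    int partition(String topic, byte[] keyBytes, byte[] valueBytes, int numPartitions);
    void close();
}

// Example implementation that reads its own config key at configure() time.
class SeededHashPartitioner implements Partitioner {
    private int seed = 0;

    public void configure(Map<String, ?> configs) {
        Object s = configs.get("my.partitioner.seed");  // hypothetical property
        if (s != null) seed = Integer.parseInt(s.toString());
    }

    public int partition(String topic, byte[] keyBytes, byte[] valueBytes, int numPartitions) {
        // Falls back to the value when no key is present, per point 3 above.
        int hash = (keyBytes != null) ? Arrays.hashCode(keyBytes)
                 : (valueBytes != null) ? Arrays.hashCode(valueBytes) : 0;
        return ((hash ^ seed) & 0x7fffffff) % numPartitions;
    }

    public void close() {}
}
{code}
Per option A above, the producer would invoke partition() only when the ProducerRecord carries no explicit partition id.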
Re: [DISCUSS] Using GitHub Pull Requests for contributions and code review
Hi Ewen, Comments inline. On Fri, May 1, 2015 at 8:38 AM, Ewen Cheslack-Postava e...@confluent.io wrote: Also +1. There are some drawbacks to using Github for reviews, e.g. lots of emails for each review because they don't let you publish your entire review in one go like RB does, but it drastically lowers the barrier to contributing for most developers. Also, if you haven't tried it, hub https://hub.github.com/ makes it really easy to checkout and test PRs. Good points. One thing I noticed is that when you try to generate a PR it defaults to the 0.8.2 branch. Can we fix that up to be trunk by default? That's the most common use case; version branches are really only useful when a release is being prepared. Do changes to the Github repo require tickets to the Apache Infra team or is this something committers have control over? It can be changed to trunk, but it does require a ticket to the Infra team[1]. Sadly, PRs can only be closed via commits, by the PR creator, or via a ticket to Infra. This is quite clunky for abandoned PRs (a label can be used to exclude such PRs from view, if needed). On a related note, which perhaps should be discussed on another thread: The CI setup is a related issue that we might want to rethink. I did think about Travis as well, but I thought that should indeed be considered as a subsequent and separate discussion. Best, Ismael [1] An example of such a request: https://issues.apache.org/jira/browse/INFRA-9208
Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient
Just a quick question, can we handle REQUEST TIMEOUT as disconnections and do a fresh MetadataRequest and retry instead of failing the request? Thanks, Mayuresh On Mon, May 4, 2015 at 10:32 AM, Jiangjie Qin j...@linkedin.com.invalid wrote: I incorporated Ewen and Guozhang’s comments in the KIP page. I want to speed up this KIP because currently mirror maker very likely hangs when a broker is down. I also took a shot at solving KAFKA-1788 in KAFKA-2142. I used the metadata timeout to expire the batches which are sitting in the accumulator without leader info. I did that because the situation there is essentially missing metadata. As a summary of what I am thinking about the timeout in the new Producer: 1. Metadata timeout: - used in send(), blocking - used in accumulator to expire batches with timeout exception. 2. Linger.ms - used in accumulator to ready the batch for drain 3. Request timeout - used in NetworkClient to expire a batch and retry if no response is received for a request before timeout. So in this KIP, we only address (3). The only public interface change is a new configuration of request timeout (and maybe a change of the configuration name from TIMEOUT_CONFIG to REPLICATION_TIMEOUT_CONFIG). I would like to see what people think of the above approach. Jiangjie (Becket) Qin On 4/20/15, 6:02 PM, Jiangjie Qin j...@linkedin.com wrote: Jun, I thought a little bit differently on this. Intuitively, I am thinking that if a partition is offline, the metadata for that partition should be considered not ready because we don’t know which broker we should send the message to. So those sends need to be blocked on metadata timeout. Another thing I’m wondering is in which scenario an offline partition will become online again in a short period of time and how likely it will occur. My understanding is that the batch timeout for batches sitting in the accumulator should be larger than linger.ms but should not be too long (e.g. less than 60 seconds). Otherwise it will exhaust the shared buffer with batches to be aborted. That said, I do agree it is reasonable to buffer the message for some time so messages to other partitions can still get sent. But adding another expiration in addition to linger.ms - which is essentially a timeout - sounds a little bit confusing. Maybe we can do this: let the batch sit in the accumulator up to linger.ms, then fail it if necessary. What do you think? Thanks, Jiangjie (Becket) Qin On 4/20/15, 1:11 PM, Jun Rao j...@confluent.io wrote: Jiangjie, Allowing messages to be accumulated in an offline partition could be useful since the partition may become available before the request timeout or linger time is reached. Now that we are planning to add a new timeout, it would be useful to think through whether/how that applies to messages in the accumulator too. Thanks, Jun On Thu, Apr 16, 2015 at 1:02 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hi Harsha, Took a quick look at the patch. I think it is still a little bit different. KAFKA-1788 only handles the case where a batch is sitting in the accumulator for too long. The KIP is trying to solve the issue where a batch has already been drained from the accumulator and sent to the broker. We might be able to apply the timeout at the batch level to merge those two cases as Ewen suggested. But I’m not sure if it is a good idea to allow messages whose target partition is offline to sit in the accumulator in the first place.
Jiangjie (Becket) Qin On 4/16/15, 10:19 AM, Sriharsha Chintalapani ka...@harsha.io wrote: Guozhang and Jiangjie, Isn’t this work being covered in https://issues.apache.org/jira/browse/KAFKA-1788 ? Can you please review the patch there. Thanks, Harsha On April 15, 2015 at 10:39:40 PM, Guozhang Wang (wangg...@gmail.com) wrote: Thanks for the update Jiangjie, I think it is actually NOT expected that hardware disconnection will be detected by the selector, but rather will only be revealed upon TCP timeout, which could be hours. A couple of comments on the wiki: 1. For KafkaProducer.close() and KafkaProducer.flush() we need the request timeout as implicit timeout. I am not very clear on what this means. 2. Currently the producer already has a TIMEOUT_CONFIG which should really be REPLICATION_TIMEOUT_CONFIG. So if we decide to add REQUEST_TIMEOUT_CONFIG, I suggest we also make this renaming: admittedly it will change the config names but will reduce confusion moving forward. Guozhang On Wed, Apr 15, 2015 at 6:48 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Checked the code again. It seems that the disconnected channel is not detected by the selector as expected. Currently we are depending on the o.a.k.common.network.Selector.disconnected set to see if we need to do something for a disconnected
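[Editor's note] To make Jiangjie's three-timeout summary concrete, a hedged sketch of the producer configs involved; metadata.fetch.timeout.ms and linger.ms exist in the new producer of this era, while the request-timeout key below is a placeholder name, since this KIP had not yet settled on one:
{code}
import java.util.Properties;

class ProducerTimeoutConfigSketch {
    static Properties timeoutConfigs() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");
        // 1. Metadata timeout: bounds blocking in send() and would also be
        //    used to expire accumulator batches that have no leader info.
        props.put("metadata.fetch.timeout.ms", "60000");
        // 2. linger.ms: how long the accumulator holds a batch before it is
        //    ready to drain.
        props.put("linger.ms", "5");
        // 3. Request timeout: NetworkClient expires/retries a request that
        //    gets no response within this bound (placeholder key name).
        props.put("request.timeout.ms", "30000");
        return props;
    }
}
{code}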
Re: [DISCUSSION] Reuse o.a.k.clients.NetworkClient in controller.
Hi, Jiangjie, Thanks for taking this on. I was thinking that one way to decouple the dependency on Metadata in NetworkClient is the following. 1. Make Metadata an interface. 2. Rename the current Metadata class to something like KafkaMetadata that implements the Metadata interface. 3. Have a new NoOpMetadata class that implements the Metadata interface. This class (3.1) does nothing for any write method, (3.2) returns max long for any method that asks for a timestamp, and (3.3) returns an empty Cluster for fetch(). Then we can leave NetworkClient unchanged and just pass in a NoOpMetadata when using NetworkClient in the controller. The consumer/producer client will be using KafkaMetadata. As for replica fetchers, it may be possible to use KafkaConsumer. However, we don't need the metadata and the offset management. So, perhaps it's easier to just use NetworkClient. Also, currently, there is one replica fetcher thread per source broker. By using NetworkClient, we can change that to using a single thread for all source brokers. This is probably a bigger change. So, maybe we can do it later. Jun I think we probably need to replace replica fetcher with NetworkClient as well. Replica fetcher gets leader from the controller and therefore doesn't On Tue, May 5, 2015 at 1:37 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: I am trying to see if we can reuse the NetworkClient class for controller-to-broker communication. (Also, we can probably use KafkaConsumer, which is already using NetworkClient, in replica fetchers). Currently NetworkClient does the following things in addition to sending requests. 1. Connection state management. 2. Flow control (inflight requests) 3. Metadata refresh In the controller we need (1) and (2) but not (3). NetworkClient is tightly coupled with metadata now and this is the major blocker to reusing NetworkClient in the controller. For the controller, we don’t need NetworkClient to manage any metadata because the controller has listeners to monitor the cluster state and has all the information about topic metadata. I am thinking we can add a disable-metadata-refresh flag to NetworkClient or set the metadata refresh interval to Long.MAX_VALUE, so the metadata will be managed outside NetworkClient. This needs minimal change to allow NetworkClient to be reused, but the ugly part is that NetworkClient still has the entire Metadata while it actually only needs a NodeList. I want to see what people think about this. Thanks. Jiangjie (Becket) Qin
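[Editor's note] A minimal sketch of Jun's proposal, assuming a Metadata interface extracted from the current client Metadata class; the method set shown is illustrative, not the full class:
{code}
import org.apache.kafka.common.Cluster;

// Illustrative subset of the methods NetworkClient would call through the interface.
interface Metadata {
    long timeToNextUpdate(long nowMs);        // when the next refresh is due
    void requestUpdate();                     // mark metadata as stale
    void update(Cluster cluster, long nowMs); // record a refreshed cluster view
    Cluster fetch();                          // current cluster view
}

// NoOpMetadata: never asks for a refresh, ignores writes, returns an empty
// Cluster - so the controller can reuse NetworkClient unchanged while the
// consumer/producer keep using the real KafkaMetadata implementation.
class NoOpMetadata implements Metadata {
    public long timeToNextUpdate(long nowMs) { return Long.MAX_VALUE; }
    public void requestUpdate() {}
    public void update(Cluster cluster, long nowMs) {}
    public Cluster fetch() { return Cluster.empty(); }
}
{code}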
Re: Review Request 33049: Patch for KAFKA-2084
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33049/ --- (Updated May 6, 2015, 12:52 a.m.) Review request for kafka, Joel Koshy and Jun Rao. Bugs: KAFKA-2084 https://issues.apache.org/jira/browse/KAFKA-2084 Repository: kafka Description (updated) --- This is currently not being used anywhere in the code because I haven't yet figured out how to enforce delays i.e. purgatory vs delay queue. I'll have a better idea once I look at the new purgatory implementation. Hopefully, this smaller patch is easier to review. Added more test cases Some locking changes for reading/creating the sensors WIP patch Sample usage in ReplicaManager Updated patch for quotas. This patch does the following: 1. Add per-client metrics for both producers and consumers 2. Add configuration for quotas 3. Compute delay times in the metrics package and return the delay times in QuotaViolationException 4. Add a DelayQueue in KafkaApis that can be used to throttle any type of request. Implemented request throttling for produce and fetch requests. 5. Added unit and integration test cases. I've not yet added integration test cases testing the consumer delays... will update the patch once those are ready Incorporated Jun's comments Adding javadoc KAFKA-2084 - Moved the callbacks to ClientQuotaMetrics Adding more configs Diffs (updated) - clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java dfa1b0a11042ad9d127226f0e0cec8b1d42b8441 clients/src/main/java/org/apache/kafka/common/metrics/Quota.java d82bb0c055e631425bc1ebbc7d387baac76aeeaa clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java a451e5385c9eca76b38b425e8ac856b2715fcffe clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java ca823fd4639523018311b814fde69b6177e73b97 clients/src/test/java/org/apache/kafka/common/utils/MockTime.java core/src/main/scala/kafka/server/ClientQuotaMetrics.scala PRE-CREATION core/src/main/scala/kafka/server/KafkaApis.scala 417960dd1ab407ebebad8fdb0e97415db3e91a2f core/src/main/scala/kafka/server/KafkaConfig.scala 9efa15ca5567b295ab412ee9eea7c03eb4cdc18b core/src/main/scala/kafka/server/KafkaServer.scala b7d2a2842e17411a823b93bdedc84657cbd62be1 core/src/main/scala/kafka/server/ReplicaManager.scala 59c9bc3ac3a8afc07a6f8c88c5871304db588d17 core/src/main/scala/kafka/server/ThrottledRequest.scala PRE-CREATION core/src/main/scala/kafka/utils/ShutdownableThread.scala fc226c863095b7761290292cd8755cd7ad0f155c core/src/test/scala/integration/kafka/api/QuotasTest.scala PRE-CREATION core/src/test/scala/unit/kafka/server/ClientQuotaMetricsTest.scala PRE-CREATION core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 8014a5a6c362785539f24eb03d77278434614fe6 core/src/test/scala/unit/kafka/server/ThrottledRequestExpirationTest.scala PRE-CREATION Diff: https://reviews.apache.org/r/33049/diff/ Testing --- Thanks, Aditya Auradkar
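[Editor's note] The DelayQueue-based throttling in item 4 of the patch description can be sketched with plain java.util.concurrent types. ThrottledRequest mirrors a file name in the diff above, but the body here is a guess at the pattern, not the patch itself:
{code}
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// A request held back until its quota-violation delay elapses.
class ThrottledRequest implements Delayed {
    private final long expireMs;
    private final Runnable sendResponse;

    ThrottledRequest(long delayMs, Runnable sendResponse) {
        this.expireMs = System.currentTimeMillis() + delayMs;
        this.sendResponse = sendResponse;
    }

    public long getDelay(TimeUnit unit) {
        return unit.convert(expireMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
    }

    void complete() { sendResponse.run(); }
}

// A reaper thread blocks on the queue and sends responses as delays expire,
// which is how produce/fetch requests from a violating client get slowed down.
class ThrottledRequestReaper extends Thread {
    private final DelayQueue<ThrottledRequest> queue;
    ThrottledRequestReaper(DelayQueue<ThrottledRequest> queue) { this.queue = queue; }
    public void run() {
        try {
            while (true) queue.take().complete();
        } catch (InterruptedException e) { /* shutdown */ }
    }
}
{code}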
[jira] [Commented] (KAFKA-2084) byte rate metrics per client ID (producer and consumer)
[ https://issues.apache.org/jira/browse/KAFKA-2084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14529672#comment-14529672 ] Aditya A Auradkar commented on KAFKA-2084: -- Updated reviewboard https://reviews.apache.org/r/33049/diff/ against branch origin/trunk byte rate metrics per client ID (producer and consumer) --- Key: KAFKA-2084 URL: https://issues.apache.org/jira/browse/KAFKA-2084 Project: Kafka Issue Type: Sub-task Reporter: Aditya Auradkar Assignee: Aditya Auradkar Attachments: KAFKA-2084.patch, KAFKA-2084_2015-04-09_18:10:56.patch, KAFKA-2084_2015-04-10_17:24:34.patch, KAFKA-2084_2015-04-21_12:21:18.patch, KAFKA-2084_2015-04-21_12:28:05.patch, KAFKA-2084_2015-05-05_15:27:35.patch, KAFKA-2084_2015-05-05_17:52:02.patch We need to be able to track the bytes-in/bytes-out rate on a per-client ID basis. This is necessary for quotas. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2084) byte rate metrics per client ID (producer and consumer)
[ https://issues.apache.org/jira/browse/KAFKA-2084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aditya A Auradkar updated KAFKA-2084: - Attachment: KAFKA-2084_2015-05-05_17:52:02.patch byte rate metrics per client ID (producer and consumer) --- Key: KAFKA-2084 URL: https://issues.apache.org/jira/browse/KAFKA-2084 Project: Kafka Issue Type: Sub-task Reporter: Aditya Auradkar Assignee: Aditya Auradkar Attachments: KAFKA-2084.patch, KAFKA-2084_2015-04-09_18:10:56.patch, KAFKA-2084_2015-04-10_17:24:34.patch, KAFKA-2084_2015-04-21_12:21:18.patch, KAFKA-2084_2015-04-21_12:28:05.patch, KAFKA-2084_2015-05-05_15:27:35.patch, KAFKA-2084_2015-05-05_17:52:02.patch We need to be able to track the bytes-in/bytes-out rate on a per-client ID basis. This is necessary for quotas. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1646) Improve consumer read performance for Windows
[ https://issues.apache.org/jira/browse/KAFKA-1646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14529694#comment-14529694 ] Honghai Chen commented on KAFKA-1646: - When trying to add a test case for Log, I got test failures for existing test cases on Windows. Improve consumer read performance for Windows - Key: KAFKA-1646 URL: https://issues.apache.org/jira/browse/KAFKA-1646 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.8.1.1 Environment: Windows Reporter: xueqiang wang Assignee: xueqiang wang Labels: newbie, patch Attachments: Improve consumer read performance for Windows.patch, KAFKA-1646-truncate-off-trailing-zeros-on-broker-restart-if-bro.patch, KAFKA-1646.patch, KAFKA-1646_20141216_163008.patch, KAFKA-1646_20150306_005526.patch, KAFKA-1646_20150312_200352.patch, KAFKA-1646_20150414_035415.patch, KAFKA-1646_20150414_184503.patch, KAFKA-1646_20150422.patch This patch is for the Windows platform only. On Windows, if there is more than one replica writing to disk, the segment log files will not be consistent on disk and consumer reading performance will drop greatly. This fix allocates more disk space when rolling a new segment, which improves consumer reading performance on the NTFS file system. This patch doesn't affect file allocation on other filesystems, as it only adds statements like 'if(Os.iswindow)' or adds methods used on Windows. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (KAFKA-1646) Improve consumer read performance for Windows
[ https://issues.apache.org/jira/browse/KAFKA-1646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14529694#comment-14529694 ] Honghai Chen edited comment on KAFKA-1646 at 5/6/15 1:13 AM: - When trying to add a test case for Log, I got test failures for existing test cases on Windows. https://issues.apache.org/jira/browse/KAFKA-2170 was (Author: waldenchen): When trying to add a test case for Log, I got test failures for existing test cases on Windows. Improve consumer read performance for Windows - Key: KAFKA-1646 URL: https://issues.apache.org/jira/browse/KAFKA-1646 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.8.1.1 Environment: Windows Reporter: xueqiang wang Assignee: xueqiang wang Labels: newbie, patch Attachments: Improve consumer read performance for Windows.patch, KAFKA-1646-truncate-off-trailing-zeros-on-broker-restart-if-bro.patch, KAFKA-1646.patch, KAFKA-1646_20141216_163008.patch, KAFKA-1646_20150306_005526.patch, KAFKA-1646_20150312_200352.patch, KAFKA-1646_20150414_035415.patch, KAFKA-1646_20150414_184503.patch, KAFKA-1646_20150422.patch This patch is for the Windows platform only. On Windows, if there is more than one replica writing to disk, the segment log files will not be consistent on disk and consumer reading performance will drop greatly. This fix allocates more disk space when rolling a new segment, which improves consumer reading performance on the NTFS file system. This patch doesn't affect file allocation on other filesystems, as it only adds statements like 'if(Os.iswindow)' or adds methods used on Windows. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
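[Editor's note] The preallocation idea in this patch can be sketched with standard Java file APIs. This is an assumption-laden illustration of the approach only (the real patch is in the broker's Scala log layer), guarded by an OS check like the quoted 'if(Os.iswindow)':
{code}
import java.io.IOException;
import java.io.RandomAccessFile;

class SegmentPreallocationSketch {
    static boolean isWindows() {
        return System.getProperty("os.name").toLowerCase().contains("windows");
    }

    // Reserve segmentSize bytes up front when rolling a new segment so NTFS
    // keeps the file contiguous even with multiple replicas writing; trailing
    // zeros then need truncating on restart (the companion patch in the
    // attachments addresses that).
    static RandomAccessFile openSegment(String path, long segmentSize) throws IOException {
        RandomAccessFile file = new RandomAccessFile(path, "rw");
        if (isWindows())
            file.setLength(segmentSize);
        return file;
    }
}
{code}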
Re: Kafka KIP hangout May 5
Below are my notes from today’s KIP hangout. Please feel free to add/correct the content. Kafka KIP Discussion (May 5) KIP-4 (admin commands): any remaining issues - The only issue left is how to handle multiple instructions for one topic. The suggestion here was to silently ignore duplicate instructions: use a map with topic as key and execute the last one. - Andrii will update the KIP KIP-11 (authorization): any remaining issues - Parth added the diagrams asked for by Joe - Does the current KIP prevent us from adding SAML? SAML can be added as another port for authentication. Getting roles and groups for SAML will require a SAML-specific authentication implementation. The Session object can be used to capture required information. - Is the current interface, Authorizer, bound to ACLs? Based on the discussion, it does not look like it. If some implementation decides not to use ACLs at all, it can ignore all other methods and implement just the authorize method. ACLs are in the interface because an ACL CLI is a requirement for the Kafka community. - The Authorizer interface does not have to change for different implementations; the session implementation, using a session builder, should take care of what goes into the session. - Harsha is right now picking up the authorized id, a string, based on the client’s principal name in the SASL implementation and getting the peer principal from the SSL session for authentication; he can add the subject as part of the session. However, one can use the session builder to add whatever information to the session and use it for authentication. Just getting the principal from the SSL session is not enough for LinkedIn. Harsha will add a comment to the code to make this clear. - Joel had some concerns on deny taking precedence in authorization. - If no authorizer is configured, the security code path won’t be exercised at all. If an authorizer is configured, by default topics without ACLs will be disallowed for every user except for superusers. The admin will have to set a config parameter to change the default behaviour and allow anyone to access topics that do not have any ACLs on them. KIP-21 (configuration management) - To have the dynamic configuration mechanism pick up changes, do we need multiple backends, like configs in ZK, a config file, configs in a topic, etc.? This might lead to multiple sources of truth, usually not a good thing to have. - If multiple backends are required, do they need to be active at the same time? - People in general were concerned that existing tools like Puppet, etc. might not work if we move to a ZK-based backend for configs. - One suggestion was to have users/admins update the current config file and support a reloadConfig command that would make the broker re-read the config file. - The plan is to have more discussions around this on the DISCUSS mail thread. On Mon, May 4, 2015 at 7:19 AM, Jun Rao j...@confluent.io wrote: Hi, Everyone, We will have a KIP hangout at 11 PST on May 5. The following is the agenda. If you want to attend and are not on the invite, please let me know. Agenda: KIP-4 (admin commands): any remaining issues KIP-11 (authorization): any remaining issues KIP-21 (configuration management) Thanks, Jun -- Regards, Ashish
Re: [DISCUSS] KIP-21 Configuration Management
Sorry I missed the call today :) I think an additional requirement would be: Make sure that traditional deployment tools (Puppet, Chef, etc) are still capable of managing Kafka configuration. For this reason, I'd like the configuration refresh to be pretty close to what most Linux services are doing to force a reload of configuration. AFAIK, this involves handling the HUP signal in the main thread to reload configuration. Then packaging scripts can add something nice like service kafka reload. (See Apache web server: https://github.com/apache/httpd/blob/trunk/build/rpm/httpd.init#L101) Gwen On Tue, May 5, 2015 at 8:54 AM, Joel Koshy jjkosh...@gmail.com wrote: Good discussion. Since we will be talking about this at 11am, I wanted to organize these comments into requirements to see if we are all on the same page. REQUIREMENT 1: Needs to accept dynamic config changes. This needs to be general enough to work for all configs that we envision may need to accept changes at runtime. e.g., log (topic), broker, client (quotas), etc. Possible options include: - ZooKeeper watcher - Kafka topic - Direct RPC to controller (or config coordinator) The current KIP is really focused on REQUIREMENT 1 and I think that is reasonable as long as we don't come up with something that requires significant re-engineering to support the other requirements. REQUIREMENT 2: Provide consistency of configs across brokers (modulo per-broker overrides) or at least be able to verify consistency. What this effectively means is that config changes must be seen by all brokers eventually and we should be able to easily compare the full config of each broker. REQUIREMENT 3: Central config store. Needs to work with plain file-based configs and other systems (e.g., puppet). Ideally, should not bring in other dependencies (e.g., a DB). Possible options: - ZooKeeper - Kafka topic - other? E.g. making it pluggable? Any other requirements? Thanks, Joel On Tue, May 05, 2015 at 01:38:09AM +0000, Aditya Auradkar wrote: Hey Neha, Thanks for the feedback. 1. In my earlier exchange with Jay, I mentioned the broker writing all its configs to ZK (while respecting the overrides). Then ZK can be used to view all configs. 2. Need to think about this a bit more. Perhaps we can discuss this during the hangout tomorrow? 3 & 4) I viewed these config changes as mainly administrative operations. In that case, it may be reasonable to assume that the ZK port is available for communication from the machines these commands are run on. Having a ConfigChangeRequest (or similar) is nice to have, but having a new API and sending requests to the controller also changes how we do topic-based configuration currently. I was hoping to keep this KIP as minimal as possible and provide a means to represent and modify client and broker based configs in a central place. Are there any concerns if we tackle these things in a later KIP? Thanks, Aditya From: Neha Narkhede [n...@confluent.io] Sent: Sunday, May 03, 2015 9:48 AM To: dev@kafka.apache.org Subject: Re: [DISCUSS] KIP-21 Configuration Management Thanks for starting this discussion, Aditya. Few questions/comments 1. If you change the default values like it's mentioned in the KIP, do you also overwrite the local config file as part of updating the default value? If not, where does the admin look to find the default values, ZK or the local Kafka config file? What if a config value is different in both places? 2. I share Gwen's concern around making sure that popular config management tools continue to work with this change.
Would love to see how each of those would work with the proposal in the KIP. I don't know enough about each of the tools but it seems like in some of the tools you have to define some sort of class with parameter names as config names. How will such tools find out about the config values? In Puppet, if this means that each Puppet agent has to read it from ZK, this means the ZK port has to be open to pretty much every machine in the DC. This is a bummer and a very confusing requirement. Not sure if this is really a problem or not (each of those tools might behave differently), though pointing out that this is something worth paying attention to. 3. The wrapper tools that let users read/change configs should not depend on ZK for the reason mentioned above. It's a pain to assume that the ZK port is open from any machine that needs to run this tool. Ideally what users want is a REST API to the brokers to change or read the config (a la Elasticsearch), but in the absence of the REST API, we should think about whether we can write the tool such that it just requires talking to the Kafka broker port. This will require a config RPC. 4. Not sure if a KIP is the right place to discuss the design of propagating the config changes to the brokers, but have you
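[Editor's note] Gwen's HUP-reload suggestion would look roughly like the following sketch. sun.misc.Signal is a JVM-internal API and reloadDynamicConfigs() is a hypothetical hook, so treat this as an illustration of the mechanism, not a proposed implementation:
{code}
import sun.misc.Signal;

public class HupReloadSketch {
    public static void main(String[] args) {
        // "service kafka reload" would send SIGHUP to this process, like
        // httpd's init script does for the Apache web server.
        Signal.handle(new Signal("HUP"), sig -> {
            System.out.println("SIGHUP received; re-reading server.properties");
            reloadDynamicConfigs();  // hypothetical: re-parse file, apply dynamic keys
        });
        // ... start the broker and block ...
    }

    static void reloadDynamicConfigs() { /* re-read the config file here */ }
}
{code}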
Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations (Thread 2)
Guys, I've updated the wiki to reflect all previously discussed items (regarding the schema only - this is included in phase 1). https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations I think we can have the final discussion today (for phase 1) and in case there are no new remarks I will start the voting thread. With regards to AlterTopicRequest semantics: I agree with Jun, and I think it's my bad that I focused on multiple topics in one request. The same situation is possible in ProduceRequest, Fetch, TopicMetadata, and we handle it naturally and in the most transparent way - we put all separate instructions into a map and thus silently ignore duplicates. This also makes the Response part simple - it's just a map Topic -> ErrorCode. I think we need to follow the same approach for Alter (and Create, Delete) requests. With this we add nothing new in terms of batch request semantics. Thanks, Andrii Biletskyi On Thu, Apr 30, 2015 at 4:31 PM, Jun Rao j...@confluent.io wrote: The following is a description of some of my concerns about allowing the same topic multiple times in AlterTopicRequest. ATR has an array of entries, each corresponding to a topic. We allow multiple changes to a topic in a single entry. Those changes may fail to apply independently (e.g., the config change may succeed, but the replica assignment change may fail). If there is an issue applying one of the changes, we will set an error code for that entry in the response. If we allow the same topic to be specified multiple times in ATR, it can happen that the first entry succeeds, but the second entry fails partially. Now, from the admin's perspective, it's a bit hard to do the verification. Ideally, you want to wait for the changes in the first entry to be applied. However, the second entry may have part of the changes applied successfully. About putting restrictions on the requests: currently, we effectively expect a topic-partition to be specified only once in the FetchRequest. Allowing the same topic-partition to be specified multiple times in FetchRequest would be confusing and would complicate the implementation (e.g., putting the request in purgatory). A few other requests probably have similar implicit assumptions on topic or topic-partition being unique in each request. Thanks, Jun On Tue, Apr 28, 2015 at 5:26 PM, Andrii Biletskyi andrii.bilets...@stealth.ly wrote: Guys, A quick summary of our today's meeting. There were no additional issues/questions. The only item about which we are not 100% sure is the multiple-instructions-for-one-topic-in-one-request case. It was proposed by Jun to explain the reasons behind not allowing users to do that again here on the mailing list, and in case we implement it in the final version, document it well so API clients understand what exactly is not allowed and why. In the meantime I will update the KIP. After that I will start the voting thread. Thanks, Andrii Biletskyi On Tue, Apr 28, 2015 at 10:33 PM, Andrii Biletskyi andrii.bilets...@stealth.ly wrote: Guys, It seems that there are no open questions left so prior to our weekly call let me summarize what I'm going to implement as part of phase one for KIP-4. 1. Add 3 new Wire Protocol requests - Create-, Alter- and DeleteTopicRequest 2. Topic requests are batch requests, errors are returned per topic as part of the batch response. 3. Topic requests are asynchronous - respective commands are only started and the server is not blocked until the command is finished. 4.
It will not be allowed to specify multiple mutations for the same topic within one batch request - a special error will be returned for such a topic. 5. There will be no dedicated request for reassign-partitions - it is simulated with the AlterTopicRequest.ReplicaAssignment field. 6. Preferred-replica-leader-election is not supported since there is no need to have a public API to trigger such an operation. 7. TopicMetadataRequest will be evolved to version 1 - topic-level configuration per topic will be included and the ISR field will be removed. Automatic topic-creation logic will be removed (we will use CreateTopicRequest for that). Thanks, Andrii Biletskyi On Tue, Apr 28, 2015 at 12:23 AM, Jun Rao j...@confluent.io wrote: Yes, to verify if a partition reassignment completes or not, we just need to make sure AR == RAR. So, we don't need ISR for this. It's probably still useful to know ISR for monitoring in general though. Thanks, Jun On Mon, Apr 27, 2015 at 4:15 AM, Andrii Biletskyi andrii.bilets...@stealth.ly wrote: Okay, I had some doubts in terms of the reassign-partitions case. I was not sure whether we need ISR to check the post condition of partition reassignment. But I think we can rely on assigned replicas - the workflow in
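[Editor's note] The "map with topic as key" semantics Andrii describes (last instruction wins, duplicates silently ignored, response is a Topic -> ErrorCode map) can be sketched as follows; AlterTopicInstruction is an illustrative type, not the wire-protocol class. Note the alternative also discussed in this thread: rejecting duplicate topics with a per-topic error instead.
{code}
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class AlterTopicInstruction {
    final String topic;
    AlterTopicInstruction(String topic) { this.topic = topic; }
}

class AlterTopicBatch {
    // Later instructions for the same topic silently overwrite earlier ones,
    // mirroring how duplicate entries collapse in other map-keyed requests.
    static Map<String, AlterTopicInstruction> dedupe(List<AlterTopicInstruction> instructions) {
        Map<String, AlterTopicInstruction> byTopic = new LinkedHashMap<>();
        for (AlterTopicInstruction instruction : instructions)
            byTopic.put(instruction.topic, instruction);
        return byTopic;
    }
}
{code}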
[jira] [Commented] (KAFKA-2084) byte rate metrics per client ID (producer and consumer)
[ https://issues.apache.org/jira/browse/KAFKA-2084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14529422#comment-14529422 ] Aditya A Auradkar commented on KAFKA-2084: -- Updated reviewboard https://reviews.apache.org/r/33049/diff/ against branch origin/trunk byte rate metrics per client ID (producer and consumer) --- Key: KAFKA-2084 URL: https://issues.apache.org/jira/browse/KAFKA-2084 Project: Kafka Issue Type: Sub-task Reporter: Aditya Auradkar Assignee: Aditya Auradkar Attachments: KAFKA-2084.patch, KAFKA-2084_2015-04-09_18:10:56.patch, KAFKA-2084_2015-04-10_17:24:34.patch, KAFKA-2084_2015-04-21_12:21:18.patch, KAFKA-2084_2015-04-21_12:28:05.patch, KAFKA-2084_2015-05-05_15:27:35.patch We need to be able to track the bytes-in/bytes-out rate on a per-client ID basis. This is necessary for quotas. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 33049: Patch for KAFKA-2084
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33049/ --- (Updated May 5, 2015, 10:27 p.m.) Review request for kafka, Joel Koshy and Jun Rao. Bugs: KAFKA-2084 https://issues.apache.org/jira/browse/KAFKA-2084 Repository: kafka Description (updated) --- This is currently not being used anywhere in the code because I haven't yet figured out how to enforce delays i.e. purgatory vs delay queue. I'll have a better idea once I look at the new purgatory implementation. Hopefully, this smaller patch is easier to review. Added more test cases Some locking changes for reading/creating the sensors WIP patch Sample usage in ReplicaManager Updated patch for quotas. This patch does the following: 1. Add per-client metrics for both producers and consumers 2. Add configuration for quotas 3. Compute delay times in the metrics package and return the delay times in QuotaViolationException 4. Add a DelayQueue in KafkaApis that can be used to throttle any type of request. Implemented request throttling for produce and fetch requests. 5. Added unit and integration test cases. I've not yet added integration test cases testing the consumer delays... will update the patch once those are ready Incorporated Jun's comments Adding javadoc KAFKA-2084 - Moved the callbacks to ClientQuotaMetrics Diffs (updated) - clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java dfa1b0a11042ad9d127226f0e0cec8b1d42b8441 clients/src/main/java/org/apache/kafka/common/metrics/Quota.java d82bb0c055e631425bc1ebbc7d387baac76aeeaa clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java a451e5385c9eca76b38b425e8ac856b2715fcffe clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java ca823fd4639523018311b814fde69b6177e73b97 clients/src/test/java/org/apache/kafka/common/utils/MockTime.java core/src/main/scala/kafka/server/ClientQuotaMetrics.scala PRE-CREATION core/src/main/scala/kafka/server/KafkaApis.scala 417960dd1ab407ebebad8fdb0e97415db3e91a2f core/src/main/scala/kafka/server/KafkaConfig.scala 9efa15ca5567b295ab412ee9eea7c03eb4cdc18b core/src/main/scala/kafka/server/KafkaServer.scala b7d2a2842e17411a823b93bdedc84657cbd62be1 core/src/main/scala/kafka/server/ReplicaManager.scala 59c9bc3ac3a8afc07a6f8c88c5871304db588d17 core/src/main/scala/kafka/server/ThrottledRequest.scala PRE-CREATION core/src/main/scala/kafka/utils/ShutdownableThread.scala fc226c863095b7761290292cd8755cd7ad0f155c core/src/test/scala/integration/kafka/api/QuotasTest.scala PRE-CREATION core/src/test/scala/unit/kafka/server/ClientQuotaMetricsTest.scala PRE-CREATION core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 8014a5a6c362785539f24eb03d77278434614fe6 core/src/test/scala/unit/kafka/server/ThrottledRequestExpirationTest.scala PRE-CREATION Diff: https://reviews.apache.org/r/33049/diff/ Testing --- Thanks, Aditya Auradkar
Re: Review Request 33049: Patch for KAFKA-2084
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33049/ --- (Updated May 5, 2015, 10:29 p.m.) Review request for kafka, Joel Koshy and Jun Rao. Bugs: KAFKA-2084 https://issues.apache.org/jira/browse/KAFKA-2084 Repository: kafka Description (updated) --- Updated patch for quotas. This patch does the following: 1. Add per-client metrics for both producers and consumers 2. Add configuration for quotas 3. Compute delay times in the metrics package and return the delay times in QuotaViolationException 4. Add a DelayQueue in KafkaApis that can be used to throttle any type of request. Implemented request throttling for produce and fetch requests. 5. Added unit and integration test cases. I've not yet added integration test cases testing the consumer delays... will update the patch once those are ready Incorporated Jun's comments KAFKA-2084 - Moved the callbacks to ClientQuotaMetrics Diffs - clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java dfa1b0a11042ad9d127226f0e0cec8b1d42b8441 clients/src/main/java/org/apache/kafka/common/metrics/Quota.java d82bb0c055e631425bc1ebbc7d387baac76aeeaa clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java a451e5385c9eca76b38b425e8ac856b2715fcffe clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java ca823fd4639523018311b814fde69b6177e73b97 clients/src/test/java/org/apache/kafka/common/utils/MockTime.java core/src/main/scala/kafka/server/ClientQuotaMetrics.scala PRE-CREATION core/src/main/scala/kafka/server/KafkaApis.scala 417960dd1ab407ebebad8fdb0e97415db3e91a2f core/src/main/scala/kafka/server/KafkaConfig.scala 9efa15ca5567b295ab412ee9eea7c03eb4cdc18b core/src/main/scala/kafka/server/KafkaServer.scala b7d2a2842e17411a823b93bdedc84657cbd62be1 core/src/main/scala/kafka/server/ReplicaManager.scala 59c9bc3ac3a8afc07a6f8c88c5871304db588d17 core/src/main/scala/kafka/server/ThrottledRequest.scala PRE-CREATION core/src/main/scala/kafka/utils/ShutdownableThread.scala fc226c863095b7761290292cd8755cd7ad0f155c core/src/test/scala/integration/kafka/api/QuotasTest.scala PRE-CREATION core/src/test/scala/unit/kafka/server/ClientQuotaMetricsTest.scala PRE-CREATION core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 8014a5a6c362785539f24eb03d77278434614fe6 core/src/test/scala/unit/kafka/server/ThrottledRequestExpirationTest.scala PRE-CREATION Diff: https://reviews.apache.org/r/33049/diff/ Testing --- Thanks, Aditya Auradkar
[jira] [Updated] (KAFKA-2084) byte rate metrics per client ID (producer and consumer)
[ https://issues.apache.org/jira/browse/KAFKA-2084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aditya A Auradkar updated KAFKA-2084: - Attachment: KAFKA-2084_2015-05-05_15:27:35.patch byte rate metrics per client ID (producer and consumer) --- Key: KAFKA-2084 URL: https://issues.apache.org/jira/browse/KAFKA-2084 Project: Kafka Issue Type: Sub-task Reporter: Aditya Auradkar Assignee: Aditya Auradkar Attachments: KAFKA-2084.patch, KAFKA-2084_2015-04-09_18:10:56.patch, KAFKA-2084_2015-04-10_17:24:34.patch, KAFKA-2084_2015-04-21_12:21:18.patch, KAFKA-2084_2015-04-21_12:28:05.patch, KAFKA-2084_2015-05-05_15:27:35.patch We need to be able to track the bytes-in/bytes-out rate on a per-client ID basis. This is necessary for quotas. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-2172) Round-robin partition assignment strategy too restrictive
Jason Rosenberg created KAFKA-2172: -- Summary: Round-robin partition assignment strategy too restrictive Key: KAFKA-2172 URL: https://issues.apache.org/jira/browse/KAFKA-2172 Project: Kafka Issue Type: Bug Reporter: Jason Rosenberg The round-robin partition assignment strategy was introduced for the high-level consumer, starting with 0.8.2.1. This appears to be a very attractive feature, but it has an unfortunate restriction which prevents it from being easily utilized. That is, it requires that all consumers in the consumer group have identical topic regex selectors and the same number of consumer threads. It turns out this is not always the case for our deployments. It's not unusual to run multiple consumers within a single process (with different topic selectors), or we might have multiple processes dedicated to different topic subsets. Agreed, we could change these to have separate group ids for each sub topic selector (but unfortunately, that's easier said than done). In several cases, we do at least have separate client.ids set for each sub-consumer, so it would be incrementally better if we could at least loosen the requirement such that each set of topics selected by a groupid/clientid pair are the same. But, if we want to do a rolling restart for a new version of a consumer config, the cluster will likely be in a state where it's not possible to have a single config until the full rolling restart completes across all nodes. This results in a consumer outage while the rolling restart is happening. Finally, it's especially problematic if we want to canary a new version for a period before rolling to the whole cluster. I'm not sure why this restriction should exist (as it obviously does not exist for the 'range' assignment strategy). It seems it could be made to work reasonably well with heterogeneous topic selection and heterogeneous thread counts. The documentation states that "The round-robin partition assignor lays out all the available partitions and all the available consumer threads. It then proceeds to do a round-robin assignment from partition to consumer thread." If the assignor can lay out all the available partitions and all the available consumer threads, it should be able to uniformly assign partitions to the available threads. In each case, if a thread belongs to a consumer that doesn't have that partition selected, just move to the next available thread that does have the selection, etc. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2172) Round-robin partition assignment strategy too restrictive
[ https://issues.apache.org/jira/browse/KAFKA-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14529436#comment-14529436 ] Jason Rosenberg commented on KAFKA-2172: as a side note, the documentation here: http://kafka.apache.org/documentation.html#consumerconfigs currently has duplicate entries for 'partition.assignment.strategy', one of which is far more in-depth than the other! Round-robin partition assignment strategy too restrictive - Key: KAFKA-2172 URL: https://issues.apache.org/jira/browse/KAFKA-2172 Project: Kafka Issue Type: Bug Reporter: Jason Rosenberg The round-robin partition assignment strategy was introduced for the high-level consumer, starting with 0.8.2.1. This appears to be a very attractive feature, but it has an unfortunate restriction which prevents it from being easily utilized. That is, it requires that all consumers in the consumer group have identical topic regex selectors and the same number of consumer threads. It turns out this is not always the case for our deployments. It's not unusual to run multiple consumers within a single process (with different topic selectors), or we might have multiple processes dedicated to different topic subsets. Agreed, we could change these to have separate group ids for each sub topic selector (but unfortunately, that's easier said than done). In several cases, we do at least have separate client.ids set for each sub-consumer, so it would be incrementally better if we could at least loosen the requirement such that each set of topics selected by a groupid/clientid pair are the same. But, if we want to do a rolling restart for a new version of a consumer config, the cluster will likely be in a state where it's not possible to have a single config until the full rolling restart completes across all nodes. This results in a consumer outage while the rolling restart is happening. Finally, it's especially problematic if we want to canary a new version for a period before rolling to the whole cluster. I'm not sure why this restriction should exist (as it obviously does not exist for the 'range' assignment strategy). It seems it could be made to work reasonably well with heterogeneous topic selection and heterogeneous thread counts. The documentation states that "The round-robin partition assignor lays out all the available partitions and all the available consumer threads. It then proceeds to do a round-robin assignment from partition to consumer thread." If the assignor can lay out all the available partitions and all the available consumer threads, it should be able to uniformly assign partitions to the available threads. In each case, if a thread belongs to a consumer that doesn't have that partition selected, just move to the next available thread that does have the selection, etc. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
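[Editor's note] Jason's "just move to the next available thread that does have the selection" idea can be sketched as follows, assuming partitions named topic-N and a per-thread subscription set (all names illustrative):
{code}
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

class HeterogeneousRoundRobinSketch {
    // partitions: e.g. "topicA-0", "topicA-1", ...; threads: thread id -> subscribed topics.
    static Map<String, List<String>> assign(List<String> partitions,
                                            Map<String, Set<String>> threads) {
        Map<String, List<String>> assignment = new HashMap<>();
        List<String> threadIds = new ArrayList<>(threads.keySet());
        Collections.sort(threadIds);  // deterministic order across members
        int cursor = 0;
        for (String partition : partitions) {
            String topic = partition.substring(0, partition.lastIndexOf('-'));
            // Advance circularly, at most one full cycle, to the next thread
            // whose subscription actually includes this partition's topic.
            for (int i = 0; i < threadIds.size(); i++) {
                String thread = threadIds.get((cursor + i) % threadIds.size());
                if (threads.get(thread).contains(topic)) {
                    assignment.computeIfAbsent(thread, k -> new ArrayList<>()).add(partition);
                    cursor = (cursor + i + 1) % threadIds.size();
                    break;
                }
            }
        }
        return assignment;
    }
}
{code}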
[VOTE] KIP-4 Admin Commands / Phase-1
Hi all, This is a voting thread for KIP-4 Phase-1. It will include Wire protocol changes and server side handling code. https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations Thanks, Andrii Biletskyi
Re: Review Request 33088: add heartbeat to coordinator
On April 28, 2015, 12:13 a.m., Guozhang Wang wrote: core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, lines 281-355 https://reviews.apache.org/r/33088/diff/9/?file=941787#file941787line281 If we add the CoordinatorMetadata class, these functions can all be moved to that class. I made a CoordinatorMetadata that only manages groups and keeps track of which topics to listen to changes for. It delegates as much group and consumer logic as possible to the ConsumerCoordinator (stuff like group synchronization, adding/removing/updating consumers and the corresponding topic bind/unbind operations), as I wanted to keep all the group logic in one place (ConsumerCoordinator). On April 28, 2015, 12:13 a.m., Guozhang Wang wrote: core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, line 463 https://reviews.apache.org/r/33088/diff/9/?file=941787#file941787line463 Let's call finishCurrentHeartbeat upon receiving the join group request (line 155 above), and only expectNextHeartbeat here. Because otherwise, if the rebalance is waiting long, this consumer will be unnecessarily marked as dead, right? Just to make sure I understand your concern, is your concern that: 1. we scheduleHeartbeatExpiration for a consumer c in group g 2. g for some reason transitions to PreparingRebalance 3. c rejoins and is waiting for the rebalance to complete. 4. PreparingRebalance is taking a long time. The DelayedHeartbeat expires. c is marked dead. 5. g rebalances and stabilizes without c? If this is your concern, it won't happen. Expired DelayedHeartbeats won't mark rejoined consumers that are awaiting rebalance as dead due to the following concept from the [wiki](https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design#Kafka0.9ConsumerRewriteDesign-Heartbeatsduringrebalance): The co-ordinator pauses failure detection for a consumer that has sent a JoinGroupRequest until a JoinGroupResponse is sent out. It restarts the heartbeat timer once the JoinGroupResponse is sent out and marks a consumer dead if it does not receive a HeartbeatRequest from that time for another session.timeout.ms milliseconds. I implemented this pause by making sure an expired DelayedHeartbeat on c won't mark c as dead if c is awaitingRebalance (line 353). On April 28, 2015, 12:13 a.m., Guozhang Wang wrote: core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, line 499 https://reviews.apache.org/r/33088/diff/9/?file=941787#file941787line499 I think once we split the scheduleHeartbeatExpiration function, we do not need to check here, as after the join-group is received but before the rebalance is finished, we will not expect any heartbeat anymore. It's true that failure detection will pause during rebalance. But as stated above, some sort of check is still needed in onExpirationHeartbeat to prevent DelayedHeartbeats that were scheduled before rebalance and expired during rebalance from marking rejoined consumers as dead, regardless of how the scheduleHeartbeatExpiration method is split up. On April 28, 2015, 12:13 a.m., Onur Karaman wrote: Some general comments: 1. Could you add some step-by-step and if-else comments in the coordinator's functions to make them more self-explanatory? 2. Note that onCompleteXXX functions will be called from purgatory reaper threads, which will in turn call other internal functions like rebalance(), while at other times these functions will be called by the request handler threads. Are there any possible deadlock issues?
I ask for another think-through just because we have been bitten before by this. For 2: I see two types of resources here: group locks and the metadataLock. We either only acquire a group lock, only acquire the metadataLock, or nest the metadataLock inside a group lock. We never nest group locks, so we don't get circular waits between group locks. We also never nest a group lock inside the metadataLock, so we don't get circular waits between group locks and the metadataLock. ZkClient doesn't cause any circular waits because internally it just puts all ZkEvents into a queue and has a dedicated thread processing each event's corresponding TopicPartitionChangeListener handleDataChange or handleDataDeleted callbacks one by one, and these callbacks adhere to the locking patterns above. Since there don't seem to be any circular waits, it looks like there are no deadlocks. - Onur --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33088/#review81743 --- On May 5, 2015, 5:50 p.m., Onur Karaman wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33088/
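[Editor's note] The pause-during-rebalance rule in this review reduces to a small predicate at heartbeat-expiration time. A sketch with illustrative names (the real check lives in the coordinator, around line 353 of the patch):
{code}
// An expired DelayedHeartbeat must not mark a consumer dead while it has a
// JoinGroupRequest in flight (i.e. it is awaiting rebalance).
class ConsumerState {
    volatile boolean awaitingRebalance;
    volatile long lastHeartbeatMs;
}

class HeartbeatExpirationCheck {
    // Decides whether a consumer should be marked dead when its
    // DelayedHeartbeat expires at nowMs for the given session timeout.
    static boolean shouldMarkDead(ConsumerState consumer, long nowMs, long sessionTimeoutMs) {
        if (consumer.awaitingRebalance)
            return false;  // failure detection paused until JoinGroupResponse is sent
        return nowMs - consumer.lastHeartbeatMs > sessionTimeoutMs;
    }
}
{code}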
[jira] [Updated] (KAFKA-2160) DelayedOperationPurgatory should remove the pair in watchersForKey with empty watchers list
[ https://issues.apache.org/jira/browse/KAFKA-2160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-2160: --- Reviewer: Jun Rao DelayedOperationPurgatory should remove the pair in watchersForKey with empty watchers list --- Key: KAFKA-2160 URL: https://issues.apache.org/jira/browse/KAFKA-2160 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Assignee: Guozhang Wang Attachments: KAFKA-2160.patch, KAFKA-2160_2015-04-30_15:20:14.patch With purgatory usage in the consumer coordinator, it will be common that watcher lists are very short and live only for a short time. So we'd better clean them from the watchersForKey Pool once the list becomes empty in checkAndComplete() calls. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1387) Kafka getting stuck creating ephemeral node it has already created when two zookeeper sessions are established in a very short period of time
[ https://issues.apache.org/jira/browse/KAFKA-1387?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1387: --- Reviewer: Jun Rao Kafka getting stuck creating ephemeral node it has already created when two zookeeper sessions are established in a very short period of time - Key: KAFKA-1387 URL: https://issues.apache.org/jira/browse/KAFKA-1387 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1.1 Reporter: Fedor Korotkiy Priority: Blocker Labels: newbie, patch, zkclient-problems Attachments: kafka-1387.patch The Kafka broker re-registers itself in zookeeper every time the handleNewSession() callback is invoked. https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/server/KafkaHealthcheck.scala Now imagine the following sequence of events. 1) The Zookeeper session reestablishes. The handleNewSession() callback is queued by the zkClient, but not invoked yet. 2) The Zookeeper session reestablishes again, queueing the callback a second time. 3) The first callback is invoked, creating the /broker/[id] ephemeral path. 4) The second callback is invoked and it tries to create the /broker/[id] path using the createEphemeralPathExpectConflictHandleZKBug() function. But the path already exists, so createEphemeralPathExpectConflictHandleZKBug() gets stuck in an infinite loop. Seems like the controller election code has the same issue. I'm able to reproduce this issue on the 0.8.1 branch from github using the following configs. # zookeeper tickTime=10 dataDir=/tmp/zk/ clientPort=2101 maxClientCnxns=0 # kafka broker.id=1 log.dir=/tmp/kafka zookeeper.connect=localhost:2101 zookeeper.connection.timeout.ms=100 zookeeper.sessiontimeout.ms=100 Just start kafka and zookeeper and then pause zookeeper several times using Ctrl-Z. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
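[Editor's note] A sketch of the failure mode using the zkclient API: the retry loop can never succeed once the node was already created by an earlier callback of the same session, so one fix direction is to treat "exists with my own data" as success. The method below is hypothetical and shown only to illustrate the race, not the actual patch:
{code}
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;

class BrokerRegistrationSketch {
    static void registerBrokerInZk(ZkClient zk, String path, String data) {
        while (true) {
            try {
                zk.createEphemeral(path, data);
                return;
            } catch (ZkNodeExistsException e) {
                String existing = zk.readData(path, true);
                if (data.equals(existing))
                    return;  // already created by this session's earlier callback
                // Otherwise the node belongs to a stale session; without the
                // check above, the second callback spins here forever.
            }
        }
    }
}
{code}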
[jira] [Updated] (KAFKA-2165) ReplicaFetcherThread: data loss on unknown exception
[ https://issues.apache.org/jira/browse/KAFKA-2165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-2165: --- Reviewer: Jun Rao ReplicaFetcherThread: data loss on unknown exception Key: KAFKA-2165 URL: https://issues.apache.org/jira/browse/KAFKA-2165 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2.1 Reporter: Alexey Ozeritskiy Attachments: KAFKA-2165.patch Sometimes in our cluster some replica gets out of the ISR. Then the broker re-downloads the partition from the beginning. We got the following messages in logs: {code} # The leader: [2015-03-25 11:11:07,796] ERROR [Replica Manager on Broker 21]: Error when processing fetch request for partition [topic,11] offset 54369274 from follower with correlation id 2634499. Possible cause: Request for offset 54369274 but we only have log segments in the range 49322124 to 54369273. (kafka.server.ReplicaManager) {code} {code} # The follower: [2015-03-25 11:11:08,816] WARN [ReplicaFetcherThread-0-21], Replica 31 for partition [topic,11] reset its fetch offset from 49322124 to current leader 21's start offset 49322124 (kafka.server.ReplicaFetcherThread) [2015-03-25 11:11:08,816] ERROR [ReplicaFetcherThread-0-21], Current offset 54369274 for partition [topic,11] out of range; reset offset to 49322124 (kafka.server.ReplicaFetcherThread) {code} This occurs because we update fetchOffset [here|https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L124] and then try to process the message. If any exception except OffsetOutOfRangeCode occurs, we get an unsynchronized fetchOffset and replica.logEndOffset. On the next fetch iteration we can get fetchOffset > replica.logEndOffset == leaderEndOffset and OffsetOutOfRangeCode. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
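The fix direction implied by the report is to advance the in-memory fetch offset only after the fetched data has been processed successfully, e.g. (a hedged sketch, not the actual AbstractFetcherThread change):
{code}
// Hypothetical sketch: an unexpected exception in appendToLog() now leaves
// fetchOffset untouched, so it can never run ahead of the replica's log end.
class FetcherSketch {
    private long fetchOffset;

    void handleFetchedData(byte[] messageSet, long nextOffset) {
        appendToLog(messageSet);   // may throw; offset not yet advanced
        fetchOffset = nextOffset;  // advance only once processing succeeded
    }

    private void appendToLog(byte[] messageSet) {
        // stand-in for the real log append
    }
}
{code}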
[jira] [Updated] (KAFKA-1690) new java producer needs ssl support as a client
[ https://issues.apache.org/jira/browse/KAFKA-1690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1690: --- Reviewer: Jun Rao new java producer needs ssl support as a client --- Key: KAFKA-1690 URL: https://issues.apache.org/jira/browse/KAFKA-1690 Project: Kafka Issue Type: Sub-task Reporter: Joe Stein Assignee: Sriharsha Chintalapani Fix For: 0.8.3 Attachments: KAFKA-1690.patch, KAFKA-1690.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1928) Move kafka.network over to using the network classes in org.apache.kafka.common.network
[ https://issues.apache.org/jira/browse/KAFKA-1928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1928: --- Reviewer: Jay Kreps Move kafka.network over to using the network classes in org.apache.kafka.common.network --- Key: KAFKA-1928 URL: https://issues.apache.org/jira/browse/KAFKA-1928 Project: Kafka Issue Type: Sub-task Components: security Reporter: Jay Kreps Assignee: Gwen Shapira Attachments: KAFKA-1928.patch, KAFKA-1928_2015-04-28_00:09:40.patch, KAFKA-1928_2015-04-30_17:48:33.patch, KAFKA-1928_2015-05-01_15:45:24.patch As part of the common package we introduced a bunch of network related code and abstractions. We should look into replacing a lot of what is in kafka.network with this code. Duplicate classes include things like Receive, Send, etc. It is likely possible to also refactor the SocketServer to make use of Selector, which should significantly simplify its code. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2161) Fix a few copyrights
[ https://issues.apache.org/jira/browse/KAFKA-2161?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-2161: --- Reviewer: Joe Stein Fix a few copyrights Key: KAFKA-2161 URL: https://issues.apache.org/jira/browse/KAFKA-2161 Project: Kafka Issue Type: Bug Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Priority: Trivial Attachments: KAFKA-2161.patch I noticed that I accidentally let some incorrect copyright headers slip in with the KAFKA-1501 patch. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 33760: Patch for KAFKA-2121
On May 4, 2015, 5:31 p.m., Neha Narkhede wrote: clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java, line 25 https://reviews.apache.org/r/33760/diff/2/?file=947511#file947511line25 This and also the Deserializer should extend Configurable too, right? We need to pass in the info of whether the serializer/deserializer is for the key or value during configuration time. That's why we chose not to use the general Configurable interface. - Jun --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33760/#review82399 --- On May 1, 2015, 10:42 p.m., Steven Wu wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33760/ --- (Updated May 1, 2015, 10:42 p.m.) Review request for kafka. Bugs: KAFKA-2121 https://issues.apache.org/jira/browse/KAFKA-2121 Repository: kafka Description --- override the java.io.Closeable#close method in the Serializer and Deserializer interfaces without throwing a checked IOException; this is to avoid breaking source compatibility. add a test for checking that the Serializer is closed during KafkaProducer#close. added missing copyright header from a previous check-in. removed throws Exception for test methods. Diffs - clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java 9a57579f87cb19cb6affe6d157ff8446c23e3551 clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c44054038066f0d0829d05f082b2ee42b34cded7 clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java eea2c28450736d1668c68828f77a49470a82c3d0 clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java 49f1427bcbe43c773920a25aa69a71d0329296b7 clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java 6f948f240c906029a0f972bf770f288f390ea714 clients/src/test/java/org/apache/kafka/test/MockSerializer.java PRE-CREATION Diff: https://reviews.apache.org/r/33760/diff/ Testing --- Thanks, Steven Wu
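For readers following the patch description: the trick is Java's covariant exception narrowing, where an interface can extend Closeable yet redeclare close() without the checked IOException. A minimal sketch (not the actual Kafka source):
{code}
import java.io.Closeable;

// Sketch: redeclaring close() without "throws IOException" keeps the type
// usable wherever a Closeable is expected (e.g. try-with-resources) while
// sparing callers a checked exception that serializers never actually throw.
interface SerializerSketch<T> extends Closeable {
    byte[] serialize(String topic, T data);

    @Override
    void close(); // narrows java.io.Closeable#close: no checked IOException
}
{code}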
[jira] [Updated] (KAFKA-2129) Consumer could make multiple concurrent metadata requests
[ https://issues.apache.org/jira/browse/KAFKA-2129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-2129: --- Reviewer: Guozhang Wang Consumer could make multiple concurrent metadata requests - Key: KAFKA-2129 URL: https://issues.apache.org/jira/browse/KAFKA-2129 Project: Kafka Issue Type: Bug Components: clients Reporter: Tim Brooks Assignee: Tim Brooks Attachments: KAFKA-2129.patch The NetworkClient's metadataFetchInProgress is neither volatile nor atomic. This flag protects against multiple metadata requests being made; it is read in poll() on the NetworkClient and written to when a request is initiated. This is fine for the producer, which seems to have a single writer thread. The KafkaConsumer's poll() method is synchronized, so there will not be more than one writer entering from there. However, the NetworkClient's poll() method is also reached via the consumer's partitionsFor() method, which could be accessed by a separate thread. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
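A conventional remedy for the race Tim describes, hedged since it may not match the committed fix, is an atomic flag claimed with compareAndSet:
{code}
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch: compareAndSet guarantees at most one in-flight metadata request
// even when poll() is reached concurrently via partitionsFor() and poll().
class MetadataRequestGuard {
    private final AtomicBoolean metadataFetchInProgress = new AtomicBoolean(false);

    void maybeSendMetadataRequest() {
        if (metadataFetchInProgress.compareAndSet(false, true)) {
            // ...build and send the metadata request...
        }
    }

    void onMetadataResponse() {
        metadataFetchInProgress.set(false); // release the claim on completion
    }
}
{code}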
Re: [DISCUSS] KIP-21 Configuration Management
Good discussion. Since we will be talking about this at 11am, I wanted to organize these comments into requirements to see if we are all on the same page. REQUIREMENT 1: Needs to accept dynamic config changes. This needs to be general enough to work for all configs that we envision may need to accept changes at runtime. e.g., log (topic), broker, client (quotas), etc. Possible options include: - ZooKeeper watcher - Kafka topic - Direct RPC to controller (or config coordinator) The current KIP is really focused on REQUIREMENT 1 and I think that is reasonable as long as we don't come up with something that requires significant re-engineering to support the other requirements. REQUIREMENT 2: Provide consistency of configs across brokers (modulo per-broker overrides) or at least be able to verify consistency. What this effectively means is that config changes must be seen by all brokers eventually and we should be able to easily compare the full config of each broker. REQUIREMENT 3: Central config store. Needs to work with plain file-based configs and other systems (e.g., puppet). Ideally, should not bring in other dependencies (e.g., a DB). Possible options: - ZooKeeper - Kafka topic - other? E.g. making it pluggable? Any other requirements? Thanks, Joel On Tue, May 05, 2015 at 01:38:09AM +0000, Aditya Auradkar wrote: Hey Neha, Thanks for the feedback. 1. In my earlier exchange with Jay, I mentioned the broker writing all its configs to ZK (while respecting the overrides). Then ZK can be used to view all configs. 2. Need to think about this a bit more. Perhaps we can discuss this during the hangout tomorrow? 3 & 4) I viewed these config changes as mainly administrative operations. In that case, it may be reasonable to assume that the ZK port is available for communication from the machine these commands are run on. Having a ConfigChangeRequest (or similar) is nice to have, but having a new API and sending requests to the controller also changes how we do topic-based configuration currently. I was hoping to keep this KIP as minimal as possible and provide a means to represent and modify client and broker based configs in a central place. Are there any concerns if we tackle these things in a later KIP? Thanks, Aditya From: Neha Narkhede [n...@confluent.io] Sent: Sunday, May 03, 2015 9:48 AM To: dev@kafka.apache.org Subject: Re: [DISCUSS] KIP-21 Configuration Management Thanks for starting this discussion, Aditya. A few questions/comments 1. If you change the default values like it's mentioned in the KIP, do you also overwrite the local config file as part of updating the default value? If not, where does the admin look to find the default values, ZK or the local Kafka config file? What if a config value is different in both places? 2. I share Gwen's concern around making sure that popular config management tools continue to work with this change. Would love to see how each of those would work with the proposal in the KIP. I don't know enough about each of the tools but it seems like in some of the tools, you have to define some sort of class with parameter names as config names. How will such tools find out about the config values? In Puppet, if this means that each Puppet agent has to read it from ZK, this means the ZK port has to be open to pretty much every machine in the DC. This is a bummer and a very confusing requirement. Not sure if this is really a problem or not (each of those tools might behave differently), though I'm pointing out that this is something worth paying attention to. 3.
The wrapper tools that let users read/change configs should not depend on ZK for the reason mentioned above. It's a pain to assume that the ZK port is open from any machine that needs to run this tool. Ideally what users want is a REST API to the brokers to change or read the config (a la Elasticsearch), but in the absence of the REST API, we should think about whether we can write the tool such that it just requires talking to the Kafka broker port. This will require a config RPC. 4. Not sure if a KIP is the right place to discuss the design of propagating the config changes to the brokers, but have you thought about just letting the controller oversee the config changes and propagate via RPC to the brokers? That way, there is an easier way to express config changes that require all brokers to change it for it to be called complete. Maybe this is not required, but it is hard to say if we don't discuss the full set of configs that need to be dynamic. Thanks, Neha On Fri, May 1, 2015 at 12:53 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey Aditya, This is great! A couple of comments: 1. Leaving the file config in place is definitely the least disturbance. But let's really think about getting rid of the files and just have one config mechanism. There is always a tendency to make everything pluggable
[jira] [Updated] (KAFKA-2123) Make new consumer offset commit API use callback + future
[ https://issues.apache.org/jira/browse/KAFKA-2123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-2123: --- Reviewer: Jay Kreps Make new consumer offset commit API use callback + future - Key: KAFKA-2123 URL: https://issues.apache.org/jira/browse/KAFKA-2123 Project: Kafka Issue Type: Improvement Components: clients, consumer Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Fix For: 0.8.3 Attachments: KAFKA-2123.patch, KAFKA-2123_2015-04-30_11:23:05.patch, KAFKA-2123_2015-05-01_19:33:19.patch, KAFKA-2123_2015-05-04_09:39:50.patch, KAFKA-2123_2015-05-04_22:51:48.patch The current version of the offset commit API in the new consumer is void commit(offsets, commit type) where the commit type is either sync or async. This means you need to use sync if you ever want confirmation that the commit succeeded. Some applications will want to use asynchronous offset commit, but be able to tell when the commit completes. This is basically the same problem that had to be fixed going from old consumer -> new consumer and I'd suggest the same fix using a callback + future combination. The new API would be Future<Void> commit(Map<TopicPartition, Long> offsets, ConsumerCommitCallback callback); where ConsumerCommitCallback contains a single method: public void onCompletion(Exception exception); We can provide shorthand variants of commit() for eliding the different arguments. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
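To make the proposal concrete, calling code against the suggested (not yet existing) API might look like this; consumer, commit() and ConsumerCommitCallback are the proposed pieces, not a shipped interface:
{code}
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;
import org.apache.kafka.common.TopicPartition;

// Sketch against the proposed API: commit asynchronously, yet still learn
// when (and whether) the commit completed via the callback and/or future.
Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
offsets.put(new TopicPartition("my-topic", 0), 42L);

Future<Void> done = consumer.commit(offsets, new ConsumerCommitCallback() {
    @Override
    public void onCompletion(Exception exception) {
        if (exception != null)
            System.err.println("commit failed: " + exception);
    }
});
// done.get() would recover the old synchronous behavior when needed
{code}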
[jira] [Updated] (KAFKA-2121) prevent potential resource leak in KafkaProducer and KafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-2121: --- Reviewer: Guozhang Wang prevent potential resource leak in KafkaProducer and KafkaConsumer -- Key: KAFKA-2121 URL: https://issues.apache.org/jira/browse/KAFKA-2121 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2.0 Reporter: Steven Zhen Wu Assignee: Steven Zhen Wu Fix For: 0.8.3 Attachments: KAFKA-2121.patch, KAFKA-2121.patch, KAFKA-2121.patch, KAFKA-2121_2015-04-16_09:55:14.patch, KAFKA-2121_2015-04-16_10:43:55.patch, KAFKA-2121_2015-04-18_20:09:20.patch, KAFKA-2121_2015-04-19_20:08:45.patch, KAFKA-2121_2015-04-19_20:30:18.patch, KAFKA-2121_2015-04-20_09:06:09.patch, KAFKA-2121_2015-04-20_09:51:51.patch, KAFKA-2121_2015-04-20_09:52:46.patch, KAFKA-2121_2015-04-20_09:57:49.patch, KAFKA-2121_2015-04-20_22:48:31.patch, KAFKA-2121_2015-05-01_15:42:30.patch On Mon, Apr 13, 2015 at 7:17 PM, Guozhang Wang wangg...@gmail.com wrote: It is a valid problem and we should correct it as soon as possible, I'm with Ewen regarding the solution. On Mon, Apr 13, 2015 at 5:05 PM, Ewen Cheslack-Postava e...@confluent.io wrote: Steven, Looks like there is even more that could potentially be leaked -- since key and value serializers are created and configured at the end, even the IO thread allocated by the producer could leak. Given that, I think 1 isn't a great option since, as you said, it doesn't really address the underlying issue. 3 strikes me as bad from a user experience perspective. It's true we might want to introduce additional constructors to make testing easier, but the more components I need to allocate myself and inject into the producer's constructor, the worse the default experience is. And since you would have to inject the dependencies to get correct, non-leaking behavior, it will always be more code than previously (and a backwards incompatible change). Additionally, the code creating the producer would have to be more complicated since it would have to deal with the cleanup carefully, whereas it previously just had to deal with the exception. Besides, for testing specifically, you can avoid exposing more constructors just for testing by using something like PowerMock that lets you mock private methods. That requires a bit of code reorganization, but doesn't affect the public interface at all. So my take is that a variant of 2 is probably best. I'd probably do two things. First, make close() safe to call even if some fields haven't been initialized, which presumably just means checking for null fields. (You might also want to figure out if all the methods close() calls are idempotent and decide whether some fields should be marked non-final and cleared to null when close() is called). Second, add the try/catch as you suggested, but just use close(). -Ewen On Mon, Apr 13, 2015 at 3:53 PM, Steven Wu stevenz...@gmail.com wrote: Here is the resource leak problem that we have encountered when the 0.8.2 java KafkaProducer failed in its constructor. Here is the code snippet of KafkaProducer to illustrate the problem.
--- public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) { // create metrics reporters via reflection List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class); // validate bootstrap servers List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); } --- let's say MyMetricsReporter creates a thread in its constructor. if hostname validation threw an exception, the constructor won't call the close method of MyMetricsReporter to clean up the resource. as a result, we created a thread leak issue. this becomes worse when we try to auto recover (i.e. keep creating KafkaProducer again -> failing again -> more thread leaks). there are multiple options for fixing this. 1) just move the hostname validation to the beginning. but this only fixes one symptom. it didn't fix the fundamental problem. what if some other lines throw an exception. 2) use try-catch. in the catch section, try to call close methods for any non-null objects constructed so far. 3) explicitly declare the dependency in the constructor. this way, when KafkaProducer threw an exception, I can call close method of metrics reporters for releasing resources. KafkaProducer(...,
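Ewen's preferred variant of option 2 might look roughly like this (a hedged sketch with stand-in fields, not the actual patch):
{code}
// Sketch of option 2: wrap construction in try/catch and release whatever
// was built; close() must tolerate partially-initialized (null) fields.
public class ProducerSketch {
    private AutoCloseable metricsReporters; // stand-ins for the real fields
    private AutoCloseable ioThread;

    public ProducerSketch() {
        try {
            metricsReporters = createReporters();
            validateBootstrapServers();      // may throw at any point
            ioThread = startIoThread();
        } catch (RuntimeException e) {
            close();                         // free what was already created
            throw e;
        }
    }

    public void close() {
        closeQuietly(metricsReporters);
        closeQuietly(ioThread);
    }

    private static void closeQuietly(AutoCloseable c) {
        if (c == null) return;               // tolerate partial construction
        try { c.close(); } catch (Exception ignored) { }
    }

    private AutoCloseable createReporters() { return () -> { }; }
    private void validateBootstrapServers() { }
    private AutoCloseable startIoThread() { return () -> { }; }
}
{code}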
Re: Adding multi-tenancy capabilities to Kafka
Adrian, Trying to follow up the discussion here. Is my understanding correct that if we have topic hierarchies then we can do without namespaces? To me a namespace is an abstraction; it can be implemented with topic hierarchies as well, would you agree? If so, I guess topic hierarchies are the way to go. One more thing I would like to put forward as an advantage of having a topic hierarchy is that we can support ACL/permission inheritance in topic hierarchies. This will avoid bootstrapping ACLs for each new topic. On Wed, Apr 29, 2015 at 7:42 AM, Adrian Preston prest...@uk.ibm.com wrote: Thanks for your response. I agree. I think it would be useful to get consensus on how namespaces and topic-hierarchies relate to one another. To seed the discussion - here's my viewpoint, which I hope others will challenge: I see namespaces as something the broker provides to ensure that two tenants can never interact with one another - even if, for example, they both choose to use a topic called 'TOPIC1'. I imagine that this would be achieved by having the broker silently add a per-tenant prefix to the topic name in each request, and strip it off in the response. So, for example, if 'TENANT1' sends a message to 'TOPIC1', this would be re-written so that the send is to 'TENANT1-TOPIC1'. If topic-hierarchies were available, then I think the prefix, added/removed to implement namespaces, would be the first level of qualification in the hierarchy. So, for example, if 'TENANT1' sends a message to 'TOPIC1', this would be re-written so that the send is to 'TENANT1/TOPIC1'. Extrapolating from what's currently in KIP-21 (dynamic configuration) I guess that topic-hierarchies might result in the possibility for even finer-grained topic configuration - e.g. a ZNode structure of: '/config/topics/topic_level1/topic_level2/...'. This would work best with an implementation of namespaces that was based on top of topic-hierarchies, allowing configuration to be applied at the scope of: all tenants, one tenant, or one tenant's topics. So in summary: I think that namespaces can be usefully implemented independently of topic-hierarchies, and when topic-hierarchies are implemented would be easily integrated. Regards - Adrian -Gwen Shapira gshap...@cloudera.com wrote: - To: dev@kafka.apache.org dev@kafka.apache.org From: Gwen Shapira gshap...@cloudera.com Date: 04/28/2015 06:54PM Subject: Re: Adding multi-tenancy capabilities to Kafka I think recent discussion showed some need for topic namespaces - for example, Jun's use case for reserving topic names for specific users discussed under authorization. I think some discussion should happen on namespaces vs more full-fledged topic-hierarchy. I like the simplicity of namespaces, but there may be other requirements (such as inheriting configuration). Gwen On Tue, Apr 28, 2015 at 10:42 AM, Adrian Preston prest...@uk.ibm.com wrote: Hi all, I've been looking at how a Kafka cluster could be deployed so that it can be used by multiple tenants. Specifically: a scheme where the clients belonging to each tenant receive the impression they are operating against their own cluster. The ongoing security and quota work looks like it might provide a lot of the isolation requirements, but each tenant would currently share the same namespace for topics and consumer groups. So the illusion of 'it is my own cluster' is broken as soon as two tenants independently try to use the same topic name. I wondered if other people on this list are interested in being able to support multiple tenants in this way?
And / or if the ability to avoid clashes in the topic namespace would be useful? I am considering submitting a KIP in this area - but first wanted to get a feeling for whether these kinds of capabilities are of interest to others. Thanks in advance, - Adrian Unless stated otherwise above: IBM United Kingdom Limited - Registered in England and Wales with number 741598. Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU -- Regards, Ashish
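A trivial sketch of the broker-side rewrite Adrian describes (hypothetical helper, not proposed code):
{code}
// Hypothetical helper: qualify the client-visible topic name with the
// tenant's principal on requests, and strip it again on responses.
final class TenantNamespaceSketch {
    static String qualify(String principal, String topic) {
        return principal + "-" + topic;  // "TENANT1" + "TOPIC1" -> "TENANT1-TOPIC1"
    }

    static String strip(String principal, String qualifiedTopic) {
        return qualifiedTopic.substring(principal.length() + 1);
    }
}
{code}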
Re: Review Request 33088: add heartbeat to coordinator
On April 28, 2015, 12:13 a.m., Guozhang Wang wrote: clients/src/main/java/org/apache/kafka/common/protocol/Errors.java, lines 73-76 https://reviews.apache.org/r/33088/diff/9/?file=941786#file941786line73 Maybe we can merge them into one error code, UNKNOWN_OR_INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY? I have mixed feelings on this. The growing number of error codes bothers me, but it also seems like the distinct error codes will be helpful in consumer logs. This issue comes up again with roundrobin where a group is doing roundrobin and a new consumer joining the group subscribes to a different set of topics from the others in the group. It's not really INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY, at least not with the current naming and exception message. On April 28, 2015, 12:13 a.m., Guozhang Wang wrote: core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, lines 60-66 https://reviews.apache.org/r/33088/diff/9/?file=941787#file941787line60 Is removing the comments intentional? Yeah it was intentional. The comments were almost exactly repeating the same information as the variable types. On April 28, 2015, 12:13 a.m., Guozhang Wang wrote: core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, line 200 https://reviews.apache.org/r/33088/diff/9/?file=941787#file941787line200 Some coding style suggestion: if { // comments ... } else { // comments ... } Will do. Somewhat of a tangent: It looks like we started using a checkstyle on kafka-clients. It would be nice if kafka came with eclipse and intellij style schemes for java and scala to reduce the style related comments and have the IDEs more proactively do the fixes for us. On April 28, 2015, 12:13 a.m., Guozhang Wang wrote: core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, lines 374-382 https://reviews.apache.org/r/33088/diff/9/?file=941787#file941787line374 It seems the transition from Rebalancing to Stable is trivial now. In this case shall we just remove the Rebalancing state, or it is actually meaningful to keep it? I prefer having each state representing one action for that group. If we reduce it down to something like Dead, Stable, and Rebalancing as I think you're suggesting, then the Rebalancing state would mean that we're either waiting for consumers to rejoin OR we're recomputing the partition assignments, but we don't know which. With Dead, Stable, PreparingRebalance, and Rebalancing, we reduce the amount of guessing for what the group is doing within a given state. On April 28, 2015, 12:13 a.m., Guozhang Wang wrote: core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, lines 400-401 https://reviews.apache.org/r/33088/diff/9/?file=941787#file941787line400 I think we need to add a broker config for maximum heartbeat failure here; right now it is just 1. Paraphrased offline discussion I had with Guozhang: The only use case I can imagine for the maximum failures concept is where we already have consumers out in the wild across many different services and we've found that the default session timeout is too quick, so we end up getting a lot of group membership churn. It may be hard to get all the services to change their consumer configs, so we'd rolling bounce brokers with a higher maxAllowedExpiredHeartbeats. Pros of having the maximum failures concept: - It's a sort of hotfix means of giving the brokers some coarse-grained control over reducing group membership churn. 
Cons of having the maximum failures concept: - It's a sort of hotfix means of giving the brokers some coarse-grained control over reducing group membership churn. - Consumers don't know the broker's maxAllowedExpiredHeartbeats, so it seems odd from the consumer's perspective that they'd specify the session timeout for session expiration, but it may end up actually being some multiple of that value (and this entirely depends on which broker they happen to communicate with). - If we accept session timeouts in the range [low, high], then really the consumer failures would happen in a time range of [low * maxAllowedExpiredHeartbeats, high * maxAllowedExpiredHeartbeats]. This gap can end up uncomfortably large. Guozhang and I agreed to remove this concept for now. If we see later on that we need to make the coordinator more conservative in marking consumers as dead, we can add similar logic in later. P.S.: If we want to eventually use this maximum failures concept, there's a bug in onConsumerHeartbeatExpired. If we have NOT exceeded the maxAllowedExpiredHeartbeats, then we should schedule another DelayedHeartbeat and not update consumer.latestHeartbeat. On April 28, 2015, 12:13 a.m., Guozhang Wang wrote: core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, lines 451-453
Re: Adding multi-tenancy capabilities to Kafka
Jay, I agree with you. This will kind of depend on how KIP-11 and KIP-12 shape up, but we can definitely start putting together the ideas. Would you suggest starting a KIP or just a generic shared document that can be later translated into a KIP? On Tuesday, May 5, 2015, Jay Kreps jay.kr...@gmail.com wrote: Hey guys, It would be nice to get a design around this. Though there are currently so many big things in flight I do wonder if we should start another parallel thing...? But working out a design can't hurt. Personally I think since one of the goals of Kafka is data integration we really want to support a central cluster with many users. So I think we really want to have full hierarchies with all users being part of the same tree of streams. This way granting permissions on a new stream just means changing permissions on what topic paths a client has access to rather than needing a new client instance that connects to that separate namespace (a la zookeeper). I think this will be a nice way to share config and acl defaults too. -Jay On Tue, May 5, 2015 at 10:36 AM, Ashish Singh asi...@cloudera.com wrote: Adrian, Trying to follow up the discussion here. Is my understanding correct that if we have topic hierarchies then we can do without namespaces. To me namespace is an abstraction, it can be implemented with topic hierarchies as well, would you agree? If so I guess topic hierarchies is the way to go. One more thing I would like to put forward as an advantage of having topic hierarchy is that we can support acls/ permissions inheritance in topic hierarchies. This will avoid bootstrapping acls for each new topic. On Wed, Apr 29, 2015 at 7:42 AM, Adrian Preston prest...@uk.ibm.com wrote: Thanks for your response. I agree. I think it would be useful to get consensus on how namespaces and topic-hierarchies relate to one another. To seed the discussion - here's my viewpoint, which I hope others will challenge: I see namespaces as something the broker provides to ensure that two tenants can never interact with one another - even if, for example, they both choose to use a topic called 'TOPIC1'. I imagine that this would be achieved by having the broker silently add a per tenant prefix to the topic name in each request, and strip it off in the response. So, for example, if 'TENANT1' sends a message to 'TOPIC1', this would be re-written so that the send is to 'TENANT1-TOPIC1'. If topic-hierarchies were available, then I think the prefix, added/removed to implement namespaces, would be the first level of qualification in the hierarchy. So, for example, if 'TENANT1' sends a message to 'TOPIC1', this would be re-written so that the send is to 'TENANT1/TOPIC1'. Extrapolating from what's currently in KIP-21 (dynamic configuration) I guess that topic-hierarchies might result in the possibility for even finer grain topic configuration - e.g. a ZNode structure of: '/config/topics/topic_level1/topic_level2/...'. This would work best with an implementation of namespaces that was based on-top of topic-hierarchies, allowing configuration to be applied at the scope of: all tenants, one tenant, or one tenant's topics. So in summary: I think that namespaces can be usefully implemented independently of topic-hierarchies, and when topic-hierarchies are implemented would be easily integrated.
Regards - Adrian -Gwen Shapira gshap...@cloudera.com wrote: - To: dev@kafka.apache.org dev@kafka.apache.org From: Gwen Shapira gshap...@cloudera.com Date: 04/28/2015 06:54PM Subject: Re: Adding multi-tenancy capabilities to Kafka I think recent discussion showed some need for topic namespaces - for example, Jun's use case for reserving topic names for specific users discussed under authorization. I think some discussion should happen on namespaces vs more full-fledged topic-hierarchy. I like the simplicity of namespaces, but there may be other requirements (such as inheriting configuration). Gwen On Tue, Apr 28, 2015 at 10:42 AM, Adrian Preston prest...@uk.ibm.com wrote: Hi all, I've been looking at how a Kafka cluster could be used be deployed so that it can be used by multiple tenants. Specifically: a scheme where the clients belonging to each tenant receive the impression they are operating against their own cluster. The ongoing security and quota work looks like it might provide a lot of the isolation requirements, but each tenant would currently share the same namespace for topics and consumer groups. So the illusion of it is my own cluster is broken as soon as two tenants try independent to use the same topic name. I wondered if other people on
Re: Adding multi-tenancy capabilities to Kafka
I agree. If we can really do pluggable authorization (and even pluggable authentication), it would not actually be hard to effectively implement a multi-tenant solution. I am hoping to attempt to implement something like this once there is code / patches for KIP-11 and KIP-12. So I wonder if we would actually need a KIP for this versus requirements for an authorization module (and possibly an authentication module) and then document a best practice for setting up a multi-tenant deployment. I am a bit worried that KIPs 11/12 might not give us everything we need, but I think we can work through that as the implementation of both progresses On Tue, May 5, 2015 at 3:23 PM, Ashish Singh asi...@cloudera.com wrote: Jay, I agree with you. This will kind of depend on how KIP-11 and KIP-12 shapes up, but we can definitely start putting together the ideas. Would you suggest starting a kip or just a generic shared document that can be later translated into a KIP? On Tuesday, May 5, 2015, Jay Kreps jay.kr...@gmail.com wrote: Hey guys, It would be nice to get a design around this. Though there are currently so many big things in flight I do wonder if we should start another parallel thing...? But working out a design can't hurt. Personally I think since one of the goals of Kafka is data integration we really want to support a central cluster with many users. So I think we really want to have full hierarchies with all users being part of the same tree of streams. This way granting permissions on a new stream just means changing permissions on what topic paths a client has access to rather than needing a new client instance that connects to that separate namespace (a la zookeeper). I think this will be a nice way to share config and acl defaults too. -Jay On Tue, May 5, 2015 at 10:36 AM, Ashish Singh asi...@cloudera.com wrote: Adrian, Trying to follow up the discussion here. Is my understanding correct that if we have topic hierarchies then we can do without namespaces. To me namespace is an abstraction, it can be implemented with topic hierarchies as well, would you agree? If so I guess topic hierarchies is the way to go. One more thing I would like to put forward as an advantage of having topic hierarchy is that we can support acls/ permissions inheritance in topic hierarchies. This will avoid bootstrapping acls for each new topic. On Wed, Apr 29, 2015 at 7:42 AM, Adrian Preston prest...@uk.ibm.com wrote: Thanks for your response. I agree. I think it would be useful to get consensus on how namespaces and topic-hierarchies relate to one another. To seed the discussion - here's my viewpoint, which I hope others will challenge: I see namespaces as something the broker provides to ensure that two tenants can never interact with one another - even if, for example, they both choose to use a topic called 'TOPIC1'. I imagine that this would be achieved by having the broker silently add a per tenant prefix to the topic name in each request, and strip it off in the response. So, for example, if 'TENANT1' sends a message to 'TOPIC1', this would be re-written so that the send is to 'TENANT1-TOPIC1'. If topic-hierarchies were available, then I think the prefix, added/removed to implement namespaces, would be the first level of qualification in the hierarchy. So, for example, if 'TENANT1' sends a message to 'TOPIC1', this would be re-written so that the send is to 'TENANT1/TOPIC1'.
Extrapolating from what's currently in KIP-21 (dynamic configuration) I guess that topic-hierarchies might result in the possibility for even finer grain topic configuration - e.g. a ZNode structure of: '/config/topics/topic_level1/topic_level2/...'. This would work best with an implementation of namespaces that was based on-top of topic-hierarchies, allowing configuration to be applied at the scope of: all tenants, one tenant, or one tenant's topics. So in summary: I think that namespaces can be usefully implemented independently of topic-hierarchies, and when topic-hierarchies are implemented would be easily integrated. Regards - Adrian -Gwen Shapira gshap...@cloudera.com wrote: - To: dev@kafka.apache.org dev@kafka.apache.org From: Gwen Shapira gshap...@cloudera.com Date: 04/28/2015 06:54PM Subject: Re: Adding multi-tenancy capabilities to Kafka I think recent discussion showed some need for topic namespaces - for example, Jun's use case for reserving topic names for specific users discussed under authorization. I think some discussion should happen on namespaces vs more full-fledged topic-hierarchy. I like the simplicity of namespaces, but there may be other
RE: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system
Using config.segmentSize should be OK. Previously I added that to make sure the file does not exceed config.segmentSize; actually the function maybeRoll already ensures that. While trying to add a test case for recovery, I was blocked by the rename-related issue and opened a JIRA at https://issues.apache.org/jira/browse/KAFKA-2170 , any recommendation for fixing that issue? Thanks, Honghai Chen -Original Message- From: Jun Rao [mailto:j...@confluent.io] Sent: Tuesday, May 5, 2015 12:51 PM To: dev@kafka.apache.org Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system This seems similar to what's in https://issues.apache.org/jira/browse/KAFKA-1065. Also, could you explain why the preallocated size is set to config.segmentSize - 2 * config.maxMessageSize, instead of just config.segmentSize? Thanks, Jun On Mon, May 4, 2015 at 8:12 PM, Honghai Chen honghai.c...@microsoft.com wrote: Hi guys, I'm trying to add test cases, but the case below crashed at line segReopen.recover(64*1024) --> index.trimToValidSize(), any idea about it? Appreciate your help. The case assumes Kafka suddenly crashes and needs to recover the last segment. kafka.log.LogSegmentTest testCreateWithInitFileSizeCrash FAILED java.io.IOException: The requested operation cannot be performed on a file with a user-mapped section open at java.io.RandomAccessFile.setLength(Native Method) at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:292) at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:283) at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:288) at kafka.log.OffsetIndex.resize(OffsetIndex.scala:283) at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(OffsetIndex.scala:272) at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:272) at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:272) at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:288) at kafka.log.OffsetIndex.trimToValidSize(OffsetIndex.scala:271) at kafka.log.LogSegment.recover(LogSegment.scala:199) at kafka.log.LogSegmentTest.testCreateWithInitFileSizeCrash(LogSegmentTest.scala:306) def recover(maxMessageSize: Int): Int = { index.truncate() index.resize(index.maxIndexSize) var validBytes = 0 var lastIndexEntry = 0 val iter = log.iterator(maxMessageSize) try { while(iter.hasNext) { val entry = iter.next entry.message.ensureValid() if(validBytes - lastIndexEntry > indexIntervalBytes) { // we need to decompress the message, if required, to get the offset of the first uncompressed message val startOffset = entry.message.compressionCodec match { case NoCompressionCodec => entry.offset case _ => ByteBufferMessageSet.deepIterator(entry.message).next().offset } index.append(startOffset, validBytes) lastIndexEntry = validBytes } validBytes += MessageSet.entrySize(entry.message) } } catch { case e: InvalidMessageException => logger.warn("Found invalid messages in log segment %s at byte offset %d: %s.".format(log.file.getAbsolutePath, validBytes, e.getMessage)) } val truncated = log.sizeInBytes - validBytes log.truncateTo(validBytes) index.trimToValidSize() truncated } /* create a segment with preallocation and crash */ @Test def testCreateWithInitFileSizeCrash() { val tempDir = TestUtils.tempDir() val seg = new LogSegment(tempDir, 40, 1, 1000, 0, SystemTime, false, 512*1024*1024, true) val ms = messages(50, "hello", "there") seg.append(50, ms) val ms2 = messages(60, "alpha", "beta") seg.append(60, ms2) val read = seg.read(startOffset = 55, maxSize = 200, maxOffset =
None) assertEquals(ms2.toList, read.messageSet.toList) val oldSize = seg.log.sizeInBytes() val oldPosition = seg.log.channel.position val oldFileSize = seg.log.file.length assertEquals(512*1024*1024, oldFileSize) seg.flush() seg.log.channel.close() seg.index.close() val segReopen = new LogSegment(tempDir, 40, 1, 1000, 0, SystemTime, true) segReopen.recover(64*1024) val size = segReopen.log.sizeInBytes() val position = segReopen.log.channel.position val fileSize = segReopen.log.file.length assertEquals(oldPosition, position) assertEquals(oldSize, size) assertEquals(size, fileSize) } Thanks, Honghai Chen -Original Message- From: Sriram Subramanian [mailto:srsubraman...@linkedin.com.INVALID] Sent: Friday, April 24, 2015 12:57 AM To: dev@kafka.apache.org Cc: Roshan Naik Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve
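For readers new to the KIP: preallocation itself is essentially a file-length call; the subtlety the test above exercises is that after a crash the file still has its full preallocated length, so recover() must scan for the true end of valid data instead of trusting the file size. A hedged Java sketch of the idea:
{code}
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;

// Sketch: grow the segment file to its target size up front (cheap on NTFS
// and ext4), then append from position 0. After a crash the file length stays
// at the preallocated size, which is why recovery truncates to validBytes.
class PreallocateSketch {
    static FileChannel openPreallocated(File file, long size) throws IOException {
        RandomAccessFile raf = new RandomAccessFile(file, "rw");
        raf.setLength(size);            // e.g. 512 * 1024 * 1024
        FileChannel channel = raf.getChannel();
        channel.position(0);            // writes start at the beginning
        return channel;
    }
}
{code}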
[jira] [Created] (KAFKA-2170) 10 LogTest cases failed for file.renameTo failed under windows
Honghai Chen created KAFKA-2170: --- Summary: 10 LogTest cases failed for file.renameTo failed under windows Key: KAFKA-2170 URL: https://issues.apache.org/jira/browse/KAFKA-2170 Project: Kafka Issue Type: Bug Components: log Affects Versions: 0.9.0 Environment: Windows Reporter: Honghai Chen Assignee: Jay Kreps Got the latest code from trunk, then ran the test gradlew -i core:test --tests kafka.log.LogTest. Got 10 cases failing for the same reason: kafka.common.KafkaStorageException: Failed to change the log file suffix from to .deleted for log segment 0 at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:259) at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:756) at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:747) at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:514) at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:514) at scala.collection.immutable.List.foreach(List.scala:318) at kafka.log.Log.deleteOldSegments(Log.scala:514) at kafka.log.LogTest.testAsyncDelete(LogTest.scala:633) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:31) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:44) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:180) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:41) at org.junit.runners.ParentRunner$1.evaluate(ParentRunner.java:173) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:31) at org.junit.runners.ParentRunner.run(ParentRunner.java:220) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:86) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:49) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:69) at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:48) at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35) at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) at org.gradle.messaging.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32) at org.gradle.messaging.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93) at $Proxy2.processTestClass(Unknown Source) at
org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:105) at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35) at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) at org.gradle.messaging.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:355) at org.gradle.internal.concurrent.DefaultExecutorFactory$StoppableExecutorImpl$1.run(DefaultExecutorFactory.java:64) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) at
Re: Adding multi-tenancy capabilities to Kafka
I don't agree. At least not completely ;) Pluggable authorization (or hierarchies with the ability to define ACLs at each level) would allow you to isolate different groups of users. However each user will still be aware of one another's presence in the topic namespace - and could also be a noisy neighbor. For example: say I configure Kafka with an authorization plugin that only allows access to topics that start with the user's security principal. That partitions up the topic namespace so that one user can't clash with another - but has some limitations: 1) Applications need to be aware of the scheme. They can't just use 'topic1' - they need to know/find out their principal and prefix their topics appropriately. 2) There's no quotas (or similar scheme) that stop one user from impacting on another's available bandwidth. 3) Consumer groups would still be a shared namespace that users could clash on. How to move this forward? I agree with Gari that KIPs 11, 12 (and 13) are still in-flight - so it might be difficult to write another related KIP and keep it in step. The shared document approach that Ashish suggested sounds like a good fit. We can record our intent - and work out whether this needs to be written up as a KIP, or raised as improvement JIRAs, once the dust settles on 11, 12 and 13. - Adrian. On 05/05/15 20:33, Gari Singh wrote: I agree. If we can really do pluggable authorization (and even pluggable authentication), it would not actually be hard to effectively implement a multi-tenant solution. I am hoping to attempt to implement something like this once there is code / patches for KIP-11 and KIP-12. So I wonder if we would actually need a KIP for this versus requirements for an authorization module (and possibly an authentication module) and then document a best practice for setting up a multi-tenant deployment. I am a bit worried that KIPs 11/12 might not give us everything we need, but I think we can work through that as the implementation of both progresses On Tue, May 5, 2015 at 3:23 PM, Ashish Singh asi...@cloudera.com wrote: Jay, I agree with you. This will kind of depend on how KIP-11 and KIP-12 shapes up, but we can definitely start putting together the ideas. Would you suggest starting a kip or just a generic shared document that can be later translated into a KIP? On Tuesday, May 5, 2015, Jay Kreps jay.kr...@gmail.com wrote: Hey guys, It would be nice to get a design around this. Though there are currently so many big things in flight I do wonder if we should start another parallel thing...? But working out a design can't hurt. Personally I think since one of the goals of Kafka is data integration we really want to support a central cluster with many users. So I think we really want to have full hierarchies with all users being part of the same tree of streams. This way granting permissions on a new stream just means changing permissions on what topic paths a client has access to rather than needing a new client instance that connects to that separate namespace (a la zookeeper). I think this will be a nice way to share config and acl defaults too. -Jay On Tue, May 5, 2015 at 10:36 AM, Ashish Singh asi...@cloudera.com wrote: Adrian, Trying to follow up the discussion here. Is my understanding correct that if we have topic hierarchies then we can do without namespaces. To me namespace is an abstraction, it can be implemented with topic hierarchies as well, would you agree? If so I guess topic hierarchies is the way to go.
One more thing I would like to put forward as an advantage of having topic hierarchy is that we can support acls/ permissions inheritance in topic hierarchies. This will avoid bootstrapping acls for each new topic. On Wed, Apr 29, 2015 at 7:42 AM, Adrian Preston prest...@uk.ibm.com wrote: Thanks for your response. I agree. I think it would be useful to get consensus on how namespaces and topic-hierarchies relate to one another. To seed the discussion - here's my viewpoint, which I hope others will challenge: I see namespaces as something the broker provides to ensure that two tenants can never interact with one another - even if, for example, they both choose to use a topic called 'TOPIC1'. I imagine that this would be achieved by having the broker silently add a per tenant prefix to the topic name in each request, and strip it off in the response. So, for example, if 'TENANT1' sends a message to 'TOPIC1', this would be re-written so that the send is to 'TENANT1-TOPIC1'. If topic-hierarchies were available, then I think the prefix, added/removed to implement namespaces, would be the first level of qualification in the hierarchy. So, for example, if 'TENANT1' sends a message to 'TOPIC1', this would be re-written so that the send is to 'TENANT1/TOPIC1'. Extrapolating from what's currently in KIP-21 (dynamic configuration) I guess that topic-hierarchies might
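For concreteness, the prefix-based scheme Adrian critiques above could be expressed as an authorizer along these lines (hypothetical shape; KIP-11's actual Authorizer interface may differ):
{code}
// Hypothetical prefix-based authorizer: it isolates topic names per
// principal, but per the limitations listed above it does nothing for
// quotas or consumer-group clashes, and clients must know the convention.
class PrefixAuthorizerSketch {
    boolean authorize(String principal, String topic) {
        return topic.startsWith(principal + "-");
    }
}
{code}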
[DISCUSSION] Reuse o.a.k.clients.NetworkClient in controller.
I am trying to see if we can reuse the NetworkClient class for controller-to-broker communication. (Also, we can probably use KafkaConsumer, which is already using NetworkClient, in replica fetchers). Currently NetworkClient does the following things in addition to sending requests. 1. Connection state management. 2. Flow control (in-flight requests) 3. Metadata refresh In the controller we need (1) and (2) but not (3). NetworkClient is tightly coupled with metadata now and this is the major blocker to reusing NetworkClient in the controller. For the controller, we don't need NetworkClient to manage any metadata because the controller has listeners to monitor the cluster state and has all the information about topic metadata. I am thinking we can add a disable-metadata-refresh flag to NetworkClient or set the metadata refresh interval to Long.MAX_VALUE, so the metadata will be managed outside NetworkClient. This needs minimal change to allow NetworkClient to be reused, but the ugly part is that NetworkClient still has the entire Metadata while it actually only needs a NodeList. Want to see what people think about this. Thanks. Jiangjie (Becket) Qin
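A sketch of the interim workaround described above; the Metadata and update() signatures are taken from the clients code of the time and should be treated as assumptions, and clusterFromControllerContext() is a made-up stand-in:
{code}
// Assumption-laden sketch: give Metadata an effectively infinite expiry so
// NetworkClient never schedules its own refresh, then push cluster changes
// into it from the controller's existing ZK listeners.
Metadata metadata = new Metadata(refreshBackoffMs, Long.MAX_VALUE);
metadata.update(clusterFromControllerContext(), time.milliseconds());
// A NetworkClient built with this Metadata keeps (1) connection management
// and (2) in-flight request limits, while (3) metadata refresh is inert.
{code}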
Re: [DISCUSS] KIP-21 Configuration Management
Joel, thanks for summarizing the requirements. It makes sense for the KIP to focus on Req #1, unless making future configs dynamic warrants a completely different design. My main concern is settling on a design with only quotas in mind and then continuing to shoehorn other dynamic configs into that model even if it doesn't work that well. 1. In my earlier exchange with Jay, I mentioned the broker writing all its configs to ZK (while respecting the overrides). Then ZK can be used to view all configs. My concern with supporting both is that it will be confusing for anyone to know where to look to find the final value of a config, or to be able to tell whether a particular broker hasn't picked up a config value. Maybe you have thought about this; I am unclear about the fix you have in mind. 2. Need to think about this a bit more. Perhaps we can discuss this during the hangout tomorrow? This isn't relevant for LI but is important for a lot of users. So we should definitely state in the KIP how those tools would continue to work with this change. 3 & 4) I viewed these config changes as mainly administrative operations. In that case, it may be reasonable to assume that the ZK port is available for communication from the machines these commands are run on. I'm not so sure about this assumption. Having a ConfigChangeRequest (or similar) is nice to have, but having a new API and sending requests to the controller also changes how we do topic-based configuration currently. I was hoping to keep this KIP as minimal as possible and provide a means to represent and modify client- and broker-based configs in a central place. Are there any concerns if we tackle these things in a later KIP? I don't have concerns about reducing the scope of this KIP as long as we are sure the approach we pick is the right direction for dynamic config and further changes are just incremental, not a redesign. On Tue, May 5, 2015 at 8:54 AM, Joel Koshy jjkosh...@gmail.com wrote: Good discussion. Since we will be talking about this at 11am, I wanted to organize these comments into requirements to see if we are all on the same page. REQUIREMENT 1: Needs to accept dynamic config changes. This needs to be general enough to work for all configs that we envision may need to accept changes at runtime. e.g., log (topic), broker, client (quotas), etc. Possible options include: - ZooKeeper watcher - Kafka topic - Direct RPC to controller (or config coordinator) The current KIP is really focused on REQUIREMENT 1, and I think that is reasonable as long as we don't come up with something that requires significant re-engineering to support the other requirements. REQUIREMENT 2: Provide consistency of configs across brokers (modulo per-broker overrides) or at least be able to verify consistency. What this effectively means is that config changes must be seen by all brokers eventually, and we should be able to easily compare the full config of each broker. REQUIREMENT 3: Central config store. Needs to work with plain file-based configs and other systems (e.g., puppet). Ideally, should not bring in other dependencies (e.g., a DB). Possible options: - ZooKeeper - Kafka topic - other? E.g. making it pluggable? Any other requirements? Thanks, Joel On Tue, May 05, 2015 at 01:38:09AM +0000, Aditya Auradkar wrote: Hey Neha, Thanks for the feedback. 1. In my earlier exchange with Jay, I mentioned the broker writing all its configs to ZK (while respecting the overrides). Then ZK can be used to view all configs. 2. Need to think about this a bit more.
Perhaps we can discuss this during the hangout tomorrow? 3 & 4) I viewed these config changes as mainly administrative operations. In that case, it may be reasonable to assume that the ZK port is available for communication from the machines these commands are run on. Having a ConfigChangeRequest (or similar) is nice to have, but having a new API and sending requests to the controller also changes how we do topic-based configuration currently. I was hoping to keep this KIP as minimal as possible and provide a means to represent and modify client- and broker-based configs in a central place. Are there any concerns if we tackle these things in a later KIP? Thanks, Aditya From: Neha Narkhede [n...@confluent.io] Sent: Sunday, May 03, 2015 9:48 AM To: dev@kafka.apache.org Subject: Re: [DISCUSS] KIP-21 Configuration Management Thanks for starting this discussion, Aditya. A few questions/comments: 1. If you change the default values like it's mentioned in the KIP, do you also overwrite the local config file as part of updating the default value? If not, where does the admin look to find the default values, ZK or the local Kafka config file? What if a config value is different in both places? 2. I share Gwen's concern around making sure that popular config management tools continue to work
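For reference, per-topic overrides are already stored in ZooKeeper under /config/topics/<topic> as JSON; one way to picture the central store being discussed is as an extension of that layout. The client and broker paths below are hypothetical, sketched only to make the idea concrete:

    /config/topics/my-topic -> {"version":1,"config":{"retention.ms":"86400000"}}
    /config/clients/clientA -> {"version":1,"config":{...}}   (hypothetical)
    /config/brokers/0       -> {"version":1,"config":{...}}   (hypothetical)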
Re: Adding multi-tenancy capabilities to Kafka
Hey guys, It would be nice to get a design around this. Though there are currently so many big things in flight I do wonder if we should start another parallel thing...? But working out a design can't hurt. Personally I think since one of the goals of Kafka is data integration we really want to support a central cluster with many users. So I think we really want to have full hierarchies with all users being part of the same tree of streams. This way granting permissions on a new stream just means changing permissions on what topic paths a client has access to rather than needing a new client instance that connects to that separate namespace (a la zookeeper). I think this will be a nice way to share config and ACL defaults too. -Jay On Tue, May 5, 2015 at 10:36 AM, Ashish Singh asi...@cloudera.com wrote: Adrian, Trying to follow up on the discussion here. Is my understanding correct that if we have topic hierarchies then we can do without namespaces? To me a namespace is an abstraction; it can be implemented with topic hierarchies as well - would you agree? If so, I guess topic hierarchies are the way to go. One more thing I would like to put forward as an advantage of topic hierarchies is that we can support ACL/permission inheritance within the hierarchy. This would avoid bootstrapping ACLs for each new topic. On Wed, Apr 29, 2015 at 7:42 AM, Adrian Preston prest...@uk.ibm.com wrote: Thanks for your response. I agree. I think it would be useful to get consensus on how namespaces and topic-hierarchies relate to one another. To seed the discussion - here's my viewpoint, which I hope others will challenge: I see namespaces as something the broker provides to ensure that two tenants can never interact with one another - even if, for example, they both choose to use a topic called 'TOPIC1'. I imagine that this would be achieved by having the broker silently add a per-tenant prefix to the topic name in each request, and strip it off in the response. So, for example, if 'TENANT1' sends a message to 'TOPIC1', this would be re-written so that the send is to 'TENANT1-TOPIC1'. If topic-hierarchies were available, then I think the prefix, added/removed to implement namespaces, would be the first level of qualification in the hierarchy. So, for example, if 'TENANT1' sends a message to 'TOPIC1', this would be re-written so that the send is to 'TENANT1/TOPIC1'. Extrapolating from what's currently in KIP-21 (dynamic configuration), I guess that topic-hierarchies might result in the possibility of even finer-grained topic configuration - e.g. a ZNode structure of: '/config/topics/topic_level1/topic_level2/...'. This would work best with an implementation of namespaces that was based on top of topic-hierarchies, allowing configuration to be applied at the scope of: all tenants, one tenant, or one tenant's topics. So in summary: I think that namespaces can be usefully implemented independently of topic-hierarchies, and could be easily integrated when topic-hierarchies are implemented. Regards - Adrian On 04/28/2015 06:54 PM, Gwen Shapira gshap...@cloudera.com wrote: I think recent discussion showed some need for topic namespaces - for example, Jun's use case for reserving topic names for specific users, discussed under authorization. I think some discussion should happen on namespaces vs. a more full-fledged topic-hierarchy.
I like the simplicity of namespaces, but there may be other requirements (such as inheriting configuration). Gwen On Tue, Apr 28, 2015 at 10:42 AM, Adrian Preston prest...@uk.ibm.com wrote: Hi all, I've been looking at how a Kafka cluster could be deployed so that it can be used by multiple tenants. Specifically: a scheme where the clients belonging to each tenant receive the impression they are operating against their own cluster. The ongoing security and quota work looks like it might provide a lot of the isolation requirements, but each tenant would currently share the same namespace for topics and consumer groups. So the illusion of "it is my own cluster" is broken as soon as two tenants independently try to use the same topic name. I wondered if other people on this list are interested in being able to support multiple tenants in this way? And / or if the ability to avoid clashes in the topic namespace would be useful? I am considering submitting a KIP in this area - but first wanted to get a feeling for whether these kinds of capabilities are of interest to others. Thanks in advance, - Adrian
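A minimal sketch of the broker-side rewrite Adrian describes - illustrative only, since no such hook exists in the broker today:

    // Prefix the tenant on the way in, strip it on the way out.
    def toNamespaced(tenant: String, topic: String): String =
      tenant + "/" + topic

    def fromNamespaced(tenant: String, namespacedTopic: String): String =
      namespacedTopic.stripPrefix(tenant + "/")

    // e.g. toNamespaced("TENANT1", "TOPIC1") == "TENANT1/TOPIC1"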
[jira] [Created] (KAFKA-2171) System Test for Quotas
Dong Lin created KAFKA-2171: --- Summary: System Test for Quotas Key: KAFKA-2171 URL: https://issues.apache.org/jira/browse/KAFKA-2171 Project: Kafka Issue Type: Sub-task Reporter: Dong Lin Assignee: Dong Lin Motivation and goal: We want to make sure that the following features are working properly for both consumer and producer: default quota, client-specific quota, and quota sharing per clientId. The tests and configuration described aim to cover most of the scenarios. More test cases with varying configurations (e.g. ackNum) can be added if there is good reason to do so. Initial setup and configuration: In all scenarios, we first create kafka brokers and a topic as follows: - create two kafka broker processes (by default local) - create a topic with replication factor = 2 and ackNum = -1 - let max_read = max_write = 5MB. The test machine is expected to provide read (write) throughput of at least max_read (max_write). - we consider two rates approximately the same if they differ by at most 5%. Scenario 1: Validate that max_read and max_write are provided by the test machine(s) using 1 producer and 1 consumer 1) produce data to the topic without rate limit for 30 seconds 2) record the rate of the producer 3) then, consume data from the topic without rate limit until it finishes 4) record the rate of the consumer 5) verify that the data consumed is identical to the data produced 6) verify that producer rate >= max_write and consumer rate >= max_read Scenario 2: validate the effectiveness of default write and read quotas using 1 producer and 1 consumer 1) configure brokers to use max_write/2 as default write quota and max_read/2 as default read quota 2) produce data to the topic for 30 seconds 3) record the rate of the producer 4) then, consume data from the topic until it finishes 5) record the rate of the consumer 6) verify that the data consumed is identical to the data produced 7) verify that the recorded write (read) rate is within 5% of max_write/2 (max_read/2). Scenario 3: validate the effectiveness of client-specific write and read quotas using 2 producers and 2 consumers 1) configure brokers to use max_write/2 as default write quota and max_read/2 as default read quota; configure brokers to use max_write/4 for producer_2 and max_read/4 for consumer_2 2) both producers produce data to the topic for 30 seconds. They use different clientIds. 3) record the rate of each producer 4) both consumers consume data from the topic until they finish. They use different clientIds and groupIds. 5) record the rate of each consumer 6) verify that the data consumed is identical to the data produced 7) verify that producer_1 and producer_2 rates are approximately max_write/2 and max_write/4; verify that consumer_1 and consumer_2 rates are approximately max_read/2 and max_read/4. Scenario 4: validate the effectiveness of write and read quota sharing among clients of the same clientId using 2 producers and 2 consumers 1) configure brokers to use max_write/2 as default write quota and max_read/2 as default read quota 2) both producers produce data to the topic for 30 seconds. They use the same clientId. 3) record the rate of each producer 4) both consumers consume data from the topic until they finish. They use the same clientId but different groupIds. 5) record the rate of each consumer 6) verify that the data consumed is identical to the data produced 7) verify that the total rate of producer_1 and producer_2 is approximately max_write/2; verify that the total rate of consumer_1 and consumer_2 is approximately max_read/2.
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
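A small helper for the 5% tolerance check the scenarios above rely on could look like this (a sketch, not part of the proposed test code):

    // Two rates are "approximately the same" if they differ by at most 5%.
    def approxEqual(observed: Double, expected: Double, tolerance: Double = 0.05): Boolean =
      math.abs(observed - expected) <= tolerance * expected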
Re: Review Request 33088: add heartbeat to coordinator
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33088/ --- (Updated May 5, 2015, 5:50 p.m.) Review request for kafka. Bugs: KAFKA-1334 https://issues.apache.org/jira/browse/KAFKA-1334 Repository: kafka Description --- add heartbeat to coordinator todo: - see how it performs under real load - add error code handling on the consumer side Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java e55ab11df4db0b0084f841a74cbcf819caf780d5 clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 36aa412404ff1458c7bef0feecaaa8bc45bed9c7 core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 456b602245e111880e1b8b361319cabff38ee0e9 core/src/main/scala/kafka/coordinator/ConsumerRegistry.scala 2f5797064d4131ecfc9d2750d9345a9fa3972a9a core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala PRE-CREATION core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala 6a6bc7bc4ceb648b67332e789c2c33de88e4cd86 core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala df60cbc35d09937b4e9c737c67229889c69d8698 core/src/main/scala/kafka/coordinator/DelayedRebalance.scala 8defa2e41c92f1ebe255177679d275c70dae5b3e core/src/main/scala/kafka/coordinator/Group.scala PRE-CREATION core/src/main/scala/kafka/coordinator/GroupRegistry.scala 94ef5829b3a616c90018af1db7627bfe42e259e5 core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala 821e26e97eaa97b5f4520474fff0fedbf406c82a core/src/main/scala/kafka/coordinator/PartitionAssignor.scala PRE-CREATION core/src/main/scala/kafka/server/DelayedOperationKey.scala b673e43b0ba401b2e22f27aef550e3ab0ef4323c core/src/main/scala/kafka/server/KafkaApis.scala b4004aa3a1456d337199aa1245fb0ae61f6add46 core/src/main/scala/kafka/server/KafkaServer.scala c63f4ba9d622817ea8636d4e6135fba917ce085a core/src/main/scala/kafka/server/OffsetManager.scala 18680ce100f10035175cc0263ba7787ab0f6a17a core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala PRE-CREATION core/src/test/scala/unit/kafka/coordinator/GroupTest.scala PRE-CREATION core/src/test/scala/unit/kafka/coordinator/PartitionAssignorTest.scala PRE-CREATION Diff: https://reviews.apache.org/r/33088/diff/ Testing --- Thanks, Onur Karaman
[jira] [Commented] (KAFKA-1334) Add failure detection capability to the coordinator / consumer
[ https://issues.apache.org/jira/browse/KAFKA-1334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14528896#comment-14528896 ] Onur Karaman commented on KAFKA-1334: - Updated reviewboard https://reviews.apache.org/r/33088/diff/ against branch origin/trunk Add failure detection capability to the coordinator / consumer -- Key: KAFKA-1334 URL: https://issues.apache.org/jira/browse/KAFKA-1334 Project: Kafka Issue Type: Sub-task Components: consumer Affects Versions: 0.9.0 Reporter: Neha Narkhede Assignee: Onur Karaman Attachments: KAFKA-1334.patch, KAFKA-1334_2015-04-11_22:47:27.patch, KAFKA-1334_2015-04-13_11:55:06.patch, KAFKA-1334_2015-04-13_11:58:53.patch, KAFKA-1334_2015-04-18_10:16:23.patch, KAFKA-1334_2015-04-18_12:16:39.patch, KAFKA-1334_2015-04-24_22:46:15.patch, KAFKA-1334_2015-04-25_15:21:25.patch, KAFKA-1334_2015-04-25_17:57:37.patch, KAFKA-1334_2015-05-05_10:50:00.patch 1) Add coordinator discovery and failure detection to the consumer. 2) Add failure detection capability to the coordinator when group management is used. This will not include any rebalancing logic, just the logic to detect consumer failures using session.timeout.ms. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2103) kafka.producer.AsyncProducerTest failure.
[ https://issues.apache.org/jira/browse/KAFKA-2103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14529033#comment-14529033 ] Ewen Cheslack-Postava commented on KAFKA-2103: -- Agreed with [~becket_qin] that the test shouldn't be assuming things out of its control. I made some changes to this test in KAFKA-1501, and part of the point of the patch that ultimately resolved 1501 is that it removes as many uses of a getNextUnusedPort()-style approach as possible. I'd discourage trying to use it here (one does exist in integration.kafka.api.FixedPortTestUtils, but note that it's under the *integration* package for a reason). Some notes: * I left a note when refactoring that using fixed values here was ok because things were mocked. I didn't notice that if you work through a couple of layers of that code, it ends up parsing the broker list for getting metadata -- not all of the code is properly mocked for the unit test. * It turns out that TestUtils.createBrokerConfig was broken -- it was not properly setting the port in the config. This seems to be the real issue for this particular test, since I chose a broker port that is incredibly unlikely to be bound, whereas before it was using the default 9092, which is probably commonly bound on developers' machines. I've submitted that patch as a temporary fix, but it doesn't really resolve the underlying issue that this unit test is actually trying to communicate with the server rather than using a mocked-out implementation. * I tried an approach where we allocate ports but don't close them, use those port numbers, and then clean up during teardown. I think this is the only way to safely use real ports allocated dynamically. This works, but it has to wait for the connections that it's trying to start to time out. We could use FixedPortTestUtils' existing method, but that can run into the same issues we were trying to resolve in KAFKA-1501. kafka.producer.AsyncProducerTest failure. - Key: KAFKA-2103 URL: https://issues.apache.org/jira/browse/KAFKA-2103 Project: Kafka Issue Type: Sub-task Reporter: Jiangjie Qin Assignee: Ewen Cheslack-Postava Attachments: KAFKA-2103.patch I saw this test consistently failing on trunk. The recent changes are KAFKA-2099, KAFKA-1926, KAFKA-1809. kafka.producer.AsyncProducerTest testNoBroker FAILED org.scalatest.junit.JUnitTestFailedError: Should fail with FailedToSendMessageException at org.scalatest.junit.AssertionsForJUnit$class.newAssertionFailedException(AssertionsForJUnit.scala:101) at org.scalatest.junit.JUnit3Suite.newAssertionFailedException(JUnit3Suite.scala:149) at org.scalatest.Assertions$class.fail(Assertions.scala:711) at org.scalatest.junit.JUnit3Suite.fail(JUnit3Suite.scala:149) at kafka.producer.AsyncProducerTest.testNoBroker(AsyncProducerTest.scala:300) kafka.producer.AsyncProducerTest testIncompatibleEncoder PASSED kafka.producer.AsyncProducerTest testRandomPartitioner PASSED kafka.producer.AsyncProducerTest testFailedSendRetryLogic FAILED kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries. at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:91) at kafka.producer.AsyncProducerTest.testFailedSendRetryLogic(AsyncProducerTest.scala:415) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
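For context, the JVM idiom underlying both getNextUnusedPort() and the allocate-but-don't-close approach described above is binding to port 0 and letting the OS pick (sketch only; the variable names are mine):

    import java.net.ServerSocket

    // Ask the OS for any free port. Closing the socket immediately (the classic
    // getNextUnusedPort pattern) is racy: another process may grab the port before
    // the test binds it. The variant described above instead keeps the socket open
    // for the test's lifetime (fine when the broker is mocked and connections are
    // expected to fail) and closes it during teardown.
    val socket = new ServerSocket(0)
    val port = socket.getLocalPort
    // ... hand `port` to the test config, run the test ...
    socket.close() // teardown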
Review Request 33857: Patch for KAFKA-2103
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33857/ --- Review request for kafka. Bugs: KAFKA-2103 https://issues.apache.org/jira/browse/KAFKA-2103 Repository: kafka Description --- KAFKA-2103: Fix TestUtils.createBrokerConfig to include the port. Diffs - core/src/test/scala/unit/kafka/utils/TestUtils.scala faae0e907596a16c47e8d49a82b6a3c82797c96d Diff: https://reviews.apache.org/r/33857/diff/ Testing --- Thanks, Ewen Cheslack-Postava
[jira] [Updated] (KAFKA-2103) kafka.producer.AsyncProducerTest failure.
[ https://issues.apache.org/jira/browse/KAFKA-2103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-2103: - Attachment: KAFKA-2103.patch
[jira] [Updated] (KAFKA-2103) kafka.producer.AsyncProducerTest failure.
[ https://issues.apache.org/jira/browse/KAFKA-2103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-2103: - Assignee: Ewen Cheslack-Postava (was: Dong Lin) Status: Patch Available (was: Open)
[jira] [Commented] (KAFKA-2103) kafka.producer.AsyncProducerTest failure.
[ https://issues.apache.org/jira/browse/KAFKA-2103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14529032#comment-14529032 ] Ewen Cheslack-Postava commented on KAFKA-2103: -- Created reviewboard https://reviews.apache.org/r/33857/diff/ against branch origin/trunk
[jira] [Commented] (KAFKA-1690) new java producer needs ssl support as a client
[ https://issues.apache.org/jira/browse/KAFKA-1690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14529043#comment-14529043 ] Sriharsha Chintalapani commented on KAFKA-1690: --- [~junrao] [~jjkoshy] I got feedback from Joel. I'll update the patch and add more tests in the next 2 days. Thanks. new java producer needs ssl support as a client --- Key: KAFKA-1690 URL: https://issues.apache.org/jira/browse/KAFKA-1690 Project: Kafka Issue Type: Sub-task Reporter: Joe Stein Assignee: Sriharsha Chintalapani Fix For: 0.8.3 Attachments: KAFKA-1690.patch, KAFKA-1690.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2156) Possibility to plug in custom MetricRegistry
[ https://issues.apache.org/jira/browse/KAFKA-2156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14528203#comment-14528203 ] Andras Sereny commented on KAFKA-2156: -- Thanks Neha. This might already work with Kafka Metrics, as far as I can see: get the appropriate Sensors from the Metrics (this has to be accessible somehow) and then .add a custom MeasurableStat to the Sensor. Not too elegant (have to get all the necessary Sensors), but workable. Is there a timeline for converting to Kafka Metrics? Thanks, András Possibility to plug in custom MetricRegistry Key: KAFKA-2156 URL: https://issues.apache.org/jira/browse/KAFKA-2156 Project: Kafka Issue Type: Improvement Components: producer Affects Versions: 0.8.1.2 Reporter: Andras Sereny Assignee: Jun Rao The trait KafkaMetricsGroup refers to Metrics.defaultRegistry() throughout. It would be nice to be able to inject any MetricsRegistry instead of the default one. (My usecase is that I'd like to channel Kafka metrics into our application's metrics system, for which I'd need custom implementations of com.yammer.metrics.core.Metric.) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
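What Andras describes might look roughly like this against the new org.apache.kafka.common.metrics API, assuming the relevant Sensor can actually be reached (the external-registry bridge is hypothetical):

    import org.apache.kafka.common.metrics.{MeasurableStat, MetricConfig}

    // A custom stat that forwards every recorded value to an external
    // metrics system while still being measurable by Kafka.
    class ForwardingStat extends MeasurableStat {
      @volatile private var last = 0.0
      override def record(config: MetricConfig, value: Double, timeMs: Long): Unit = {
        last = value
        // externalRegistry.record(value) // hypothetical bridge call
      }
      override def measure(config: MetricConfig, now: Long): Double = last
    }

    // sensor.add(metricName, new ForwardingStat()) // attach to an existing Sensor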
[jira] [Commented] (KAFKA-2170) 10 LogTest cases failed for file.renameTo failed under windows
[ https://issues.apache.org/jira/browse/KAFKA-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14529337#comment-14529337 ] Jun Rao commented on KAFKA-2170: We do have the logic in OffsetIndex.resize() to unmap before setting the file length, if the platform is windows. Could you check if that logic is actually triggered? 10 LogTest cases failed for file.renameTo failed under windows --- Key: KAFKA-2170 URL: https://issues.apache.org/jira/browse/KAFKA-2170 Project: Kafka Issue Type: Bug Components: log Affects Versions: 0.9.0 Environment: Windows Reporter: Honghai Chen Assignee: Jay Kreps get latest code from trunk, then run test gradlew -i core:test --tests kafka.log.LogTest Got 10 cases failed for same reason: kafka.common.KafkaStorageException: Failed to change the log file suffix from to .deleted for log segment 0 at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:259) at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:756) at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:747) at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:514) at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:514) at scala.collection.immutable.List.foreach(List.scala:318) at kafka.log.Log.deleteOldSegments(Log.scala:514) at kafka.log.LogTest.testAsyncDelete(LogTest.scala:633) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:31) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:44) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:180) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:41) at org.junit.runners.ParentRunner$1.evaluate(ParentRunner.java:173) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:31) at org.junit.runners.ParentRunner.run(ParentRunner.java:220) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:86) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:49) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:69) at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:48) at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35) at 
org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) at org.gradle.messaging.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32) at org.gradle.messaging.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93) at $Proxy2.processTestClass(Unknown Source) at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:105) at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35) at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) at
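For anyone verifying Jun's suggestion, the unmap step in question follows the usual JVM workaround, roughly as below (a sketch only; like the real OffsetIndex code, it relies on JVM internals):

    import java.nio.MappedByteBuffer

    // On Windows, a mapped buffer must be unmapped before
    // RandomAccessFile.setLength(), or the resize fails with the
    // "user-mapped section open" error seen in the stack trace above.
    def forceUnmap(m: MappedByteBuffer): Unit = m match {
      case db: sun.nio.ch.DirectBuffer => db.cleaner().clean()
      case _ => // not a direct buffer: nothing to unmap
    }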
Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system
Thanks. Could you update the wiki? Also, I commented on the jira. Jun On Tue, May 5, 2015 at 12:48 AM, Honghai Chen honghai.c...@microsoft.com wrote: Using config.segmentSize should be ok. I previously added that to make sure the file did not exceed config.segmentSize; actually the function maybeRoll already makes sure of that. When trying to add a test case for recover, I was blocked by the rename-related issue and opened a jira at https://issues.apache.org/jira/browse/KAFKA-2170 - any recommendation for fixing that issue? Thanks, Honghai Chen -Original Message- From: Jun Rao [mailto:j...@confluent.io] Sent: Tuesday, May 5, 2015 12:51 PM To: dev@kafka.apache.org Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system This seems similar to what's in https://issues.apache.org/jira/browse/KAFKA-1065. Also, could you explain why the preallocated size is set to config.segmentSize - 2 * config.maxMessageSize, instead of just config.segmentSize? Thanks, Jun On Mon, May 4, 2015 at 8:12 PM, Honghai Chen honghai.c...@microsoft.com wrote: Hi guys, I'm trying to add test cases, but the case below crashed at line segReopen.recover(64*1024) -- index.trimToValidSize(); any idea? Appreciate your help. The case assumes Kafka crashed suddenly and needs to recover the last segment.

kafka.log.LogSegmentTest testCreateWithInitFileSizeCrash FAILED
    java.io.IOException: The requested operation cannot be performed on a file with a user-mapped section open
        at java.io.RandomAccessFile.setLength(Native Method)
        at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:292)
        at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:283)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:288)
        at kafka.log.OffsetIndex.resize(OffsetIndex.scala:283)
        at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(OffsetIndex.scala:272)
        at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:272)
        at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:272)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:288)
        at kafka.log.OffsetIndex.trimToValidSize(OffsetIndex.scala:271)
        at kafka.log.LogSegment.recover(LogSegment.scala:199)
        at kafka.log.LogSegmentTest.testCreateWithInitFileSizeCrash(LogSegmentTest.scala:306)

def recover(maxMessageSize: Int): Int = {
  index.truncate()
  index.resize(index.maxIndexSize)
  var validBytes = 0
  var lastIndexEntry = 0
  val iter = log.iterator(maxMessageSize)
  try {
    while(iter.hasNext) {
      val entry = iter.next
      entry.message.ensureValid()
      if(validBytes - lastIndexEntry > indexIntervalBytes) {
        // we need to decompress the message, if required, to get the offset of the first uncompressed message
        val startOffset = entry.message.compressionCodec match {
          case NoCompressionCodec => entry.offset
          case _ => ByteBufferMessageSet.deepIterator(entry.message).next().offset
        }
        index.append(startOffset, validBytes)
        lastIndexEntry = validBytes
      }
      validBytes += MessageSet.entrySize(entry.message)
    }
  } catch {
    case e: InvalidMessageException =>
      logger.warn("Found invalid messages in log segment %s at byte offset %d: %s.".format(log.file.getAbsolutePath, validBytes, e.getMessage))
  }
  val truncated = log.sizeInBytes - validBytes
  log.truncateTo(validBytes)
  index.trimToValidSize()
  truncated
}

/* create a segment with pre-allocate and crash */
@Test
def testCreateWithInitFileSizeCrash() {
  val tempDir = TestUtils.tempDir()
  val seg = new LogSegment(tempDir, 40, 1, 1000, 0, SystemTime, false, 512*1024*1024, true)
  val ms = messages(50, "hello", "there")
  seg.append(50, ms)
  val ms2 = messages(60, "alpha", "beta")
  seg.append(60, ms2)
  val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
  assertEquals(ms2.toList, read.messageSet.toList)
  val oldSize = seg.log.sizeInBytes()
  val oldPosition = seg.log.channel.position
  val oldFileSize = seg.log.file.length
  assertEquals(512*1024*1024, oldFileSize)
  seg.flush()
  seg.log.channel.close()
  seg.index.close()
  val segReopen = new LogSegment(tempDir, 40, 1, 1000, 0, SystemTime, true)
  segReopen.recover(64*1024)
  val size = segReopen.log.sizeInBytes()
  val position = segReopen.log.channel.position
  val fileSize = segReopen.log.file.length
  assertEquals(oldPosition, position)
  assertEquals(oldSize, size)
  assertEquals(size, fileSize)
}
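For readers following the thread, the preallocation being discussed boils down to sizing the segment file up front; a minimal sketch (the helper name and parameter are mine, not the actual patch):

    import java.io.{File, RandomAccessFile}

    // Extend the file to its full size at creation so appends on NTFS / older
    // Linux filesystems stay sequential instead of repeatedly growing the file.
    def preallocate(file: File, initFileSize: Long): Unit = {
      val raf = new RandomAccessFile(file, "rw")
      try raf.setLength(initFileSize) // e.g. config.segmentSize when preallocate=true
      finally raf.close()
    }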
[jira] [Comment Edited] (KAFKA-2156) Possibility to plug in custom MetricRegistry
[ https://issues.apache.org/jira/browse/KAFKA-2156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14528203#comment-14528203 ] Andras Sereny edited comment on KAFKA-2156 at 5/5/15 10:58 AM: --- Thanks Neha. This might already work with Kafka Metrics, as far as I can see: get the appropriate Sensors from the Metrics (this has to be accessible somehow) and then .add a custom MeasurableStat to the Sensor. Not too elegant (have to get all the necessary Sensors), but workable. Is there a timeline for migrating to Kafka Metrics? Thanks, András was (Author: sandris): Thanks Neha. This might already work with Kafka Metrics, as far as I can see: get the appropriate Sensors from the Metrics (this has to be accessible somehow) and then .add a custom MeasurableStat to the Sensor. Not too elegant (have to get all the necessary Sensors), but workable. Is there a timeline for converting to Kafka Metrics? Thanks, András Possibility to plug in custom MetricRegistry Key: KAFKA-2156 URL: https://issues.apache.org/jira/browse/KAFKA-2156 Project: Kafka Issue Type: Improvement Components: producer Affects Versions: 0.8.1.2 Reporter: Andras Sereny Assignee: Jun Rao The trait KafkaMetricsGroup refers to Metrics.defaultRegistry() throughout. It would be nice to be able to inject any MetricsRegistry instead of the default one. (My usecase is that I'd like to channel Kafka metrics into our application's metrics system, for which I'd need custom implementations of com.yammer.metrics.core.Metric.) -- This message was sent by Atlassian JIRA (v6.3.4#6332)