Re: Review Request 32650: Patch for KAFKA-2000
On April 20, 2015, 11:18 p.m., Joel Koshy wrote:

core/src/main/scala/kafka/server/OffsetManager.scala, line 124
https://reviews.apache.org/r/32650/diff/1/?file=909897#file909897line124

I think there is an issue in relying on the metadata cache, mainly due to start-up. E.g., when we start up the broker (and offset manager) the metadata cache will actually be empty, so this would delete _all_ the offsets. Unfortunately, even after start-up there is no _guarantee_ that you have the most current information in the cache (say, if the controller failed to send an UpdateMetadataRequest to the broker by the time the compactor task runs).

Actually - I think what you have is correct. The offset cache would be empty at start-up and would only be populated on becoming leader. However, we just need to make sure that we get the complete cluster topic metadata before the compactor thread runs. I'll take another look tomorrow.

- Joel

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32650/#review80857
---

On March 30, 2015, 9:47 p.m., Sriharsha Chintalapani wrote:

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32650/
---

(Updated March 30, 2015, 9:47 p.m.)

Review request for kafka.

Bugs: KAFKA-2000
    https://issues.apache.org/jira/browse/KAFKA-2000

Repository: kafka

Description
---

KAFKA-2000. Delete consumer offsets from kafka once the topic is deleted.

Diffs
---

  core/src/main/scala/kafka/server/OffsetManager.scala 395b1dbe43a5db47151e72a1b588d72f03cef963

Diff: https://reviews.apache.org/r/32650/diff/

Testing
---

Thanks,
Sriharsha Chintalapani
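Joel's concern - that pruning offsets against an empty metadata cache at start-up would wipe every stored offset - can be illustrated with a small guard. This is a hypothetical sketch in Java, not the actual Scala OffsetManager code; `OffsetCleanupSketch`, `metadataCacheInitialized`, and `pruneDeletedTopics` are invented names.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the cleanup being discussed: offsets are only
// pruned once the metadata cache has been populated with cluster-wide
// topic metadata, so an empty cache at start-up does not delete
// every stored offset.
class OffsetCleanupSketch {
    final Map<String, Long> offsetsByTopic = new ConcurrentHashMap<>();
    volatile boolean metadataCacheInitialized = false;

    void pruneDeletedTopics(Set<String> topicsInMetadataCache) {
        if (!metadataCacheInitialized)
            return; // cache still empty at start-up: deleting now would drop all offsets
        offsetsByTopic.keySet().removeIf(t -> !topicsInMetadataCache.contains(t));
    }
}
```

The guard corresponds to Joel's follow-up point: the check is safe only once the broker is known to have received the complete cluster topic metadata.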
Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations (Thread 2)
1. Yes, this will be much easier. Okay, let's add it.

2. Okay. This will differ a little from the way kafka-topics.sh currently handles the alter-topic command, but I think it's a reasonable restriction. I'll update the KIP accordingly after our weekly call.

Thanks,
Andrii Biletskyi

On Mon, Apr 20, 2015 at 10:56 PM, Jun Rao j...@confluent.io wrote:

1. Yes, lag is probably only going to be useful for the admin client. However, so is isr. It seems to me that we should get lag and isr from the same request. I was thinking that we can just extend TMR by changing replicas from an array of int to an array of (int, lag) pairs. Is that too complicated?

3. I was thinking that we just don't allow the cli to change more than one thing at a time. So, you will get an error if you want to change both partitions and configs.

Thanks,
Jun

On Sun, Apr 19, 2015 at 8:22 AM, Andrii Biletskyi andrii.bilets...@stealth.ly wrote:

Jun,

1. Yes, it seems we can add lag info to the TMR. But before that, I wonder whether there are other reasons we need this info besides the reassign-partitions command. As we discussed earlier, the problem of poor monitoring capabilities for reassign-partitions (currently we only inform users Completed/In Progress per partition) may require a separate solution. We were thinking about a separate wire protocol request, and I actually like your idea about adding some sort of BrokerMetadataRequest for these purposes. I think it could also cover some other items (like rack-awareness), but to me it deserves a separate KIP. Also, adding a replica-lag map per partition would make TopicMetadataResponse very sophisticated: Map[TopicName, Map[PartitionId, Map[ReplicaId, Lag]]]. Maybe we should leave it for the moment and propose a new request rather than taking another step towards one monster request.

2. Yes, an error per topic. The only question is whether we should execute at least the very first alter-topic command from the duplicated topic set or return an error for all ...
I think the more predictable and reasonable option for clients would be returning errors for all duplicated topics.

3. Hm, yes. Actually, we also have change-topic-config there, but it is not related to replication commands such as increase replicas or change replica assignment. This will make the CLI implementation a bit strange: if a user specifies increase-partitions and change-topic-config on one line then, taking 2. into account, we will have to create two separate alter-topic requests, which were designed as batch requests :), but we can probably live with it. Okay, I will think about a separate error code to cover such cases.

4. We will need InvalidArgumentTopic (e.g. contains prohibited chars), IAPartitions, IAReplicas, IAReplicaAssignment, IATopicConfiguration. A server-side implementation will be a little messy (dozens of if-this-then-this-error-code branches), but maybe we should think about clients in the first place here.

Thanks,
Andrii Biletskyi

On Fri, Apr 17, 2015 at 1:46 AM, Jun Rao j...@confluent.io wrote:

1. For the lags, we can add a new field, lags, per partition. It will return, for each replica that's not in isr, the replica id and the lag in messages. Also, if TMR is sent to a non-leader, the response can just include an empty array for isr and lags.

2. So, we will just return a topic-level error for the duplicated topics, right?

3. Yes, it's true that today one can specify both partitions and replicaAssignment in the TopicCommand. However, partitions is actually ignored. So it will be clearer if we don't allow users to do this.

4. How many specific error codes like InvalidPartitions and InvalidReplicas are needed? If it's not that many, giving out more specific errors will be useful for non-java clients.

Thanks,
Jun

On Wed, Apr 15, 2015 at 10:23 AM, Andrii Biletskyi andrii.bilets...@stealth.ly wrote:

Guys,

Thanks for the discussion! Summary:

1. Q: How can KAFKA-1367 (isr is inconsistent in brokers' metadata cache) affect the implementation?
A: We can fix this issue for the leading broker - the ReplicaManager (or Partition) component should have an accurate isr list. Then, with the leading broker having correct info, to do a describe-topic we will need to determine the leading brokers for the partitions and ask those for a correct isr list. Also, we should consider adding lag information to TMR for each follower, for partition reassignment, as Jun suggested above.

2. Q: What if a user adds different alter commands for the same topic in the scope of one batch request?

A: Because of the async nature of AlterTopicRequest it will be very hard then to assemble the expected (in terms of checking
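The nested replica-lag structure Andrii calls "very sophisticated" - Map[TopicName, Map[PartitionId, Map[ReplicaId, Lag]]] - can be sketched in Java to show what carrying it in TopicMetadataResponse would look like. This is illustrative only; `ReplicaLagSketch` and `record` are invented names, not the actual TMR schema.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: the per-replica lag map proposed for
// TopicMetadataResponse. The triple nesting has to be spelled out at
// every use site, which is part of why a separate request was floated.
class ReplicaLagSketch {
    // topic -> partition -> replicaId -> lag (in messages)
    static final Map<String, Map<Integer, Map<Integer, Long>>> lags = new HashMap<>();

    static void record(String topic, int partition, int replica, long lag) {
        lags.computeIfAbsent(topic, t -> new HashMap<>())
            .computeIfAbsent(partition, p -> new HashMap<>())
            .put(replica, lag);
    }
}
```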
Re: Review Request 33383: Patch for KAFKA-1595
---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33383/
---

(Updated April 21, 2015, 7:03 a.m.)

Review request for kafka.

Bugs: KAFKA-1595
    https://issues.apache.org/jira/browse/KAFKA-1595

Repository: kafka

Description
---

Use the same `scalatest` version for all Scala versions and remove unused code.

Introduce `testJsonParse`: a simple test that shows existing behaviour.

KAFKA-1595; Remove deprecated and slower scala JSON parser from kafka.consumer.TopicCount. A combination of spray-json's AST and jawn's parser is used as the replacement. Note that both libraries have no dependencies and are relatively simple. We use `jawn` for its performance, but it could be dropped by changing one line in `Json.parseFull`. An attempt has been made to maintain the existing behaviour regarding when exceptions are thrown. There are a number of cases where `DeserializationException` will be thrown instead of `ClassCastException`, however. It is expected that users would not try to catch `ClassCastException`.
Minor clean-ups in `Json.encode` Diffs - README.md 946ec62cc71df93c905c5f35caf5cdb9c78e5c10 build.gradle 4775ee46c480eab7b8250e61ba1705d00f72a6aa core/src/main/scala/kafka/admin/AdminUtils.scala eee80f9c2c12da8e4879e96785f3b75a8ff7d1cd core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 1c3b3802ac221d570e7610458e50518b4499e7ed core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala 3b3cd67d890e05c00d2a36a577f940347a0d387a core/src/main/scala/kafka/cluster/Broker.scala 79e16c167f67cfdef8a90212bc1c7607f989d102 core/src/main/scala/kafka/consumer/TopicCount.scala 6994c8e89055b0bb300da6346c058c8fbbea2c29 core/src/main/scala/kafka/controller/KafkaController.scala 3a09377611b48198c4c3cd1a118fc12eda0543d4 core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala d2bac85e16a247b1326f63619711fb0bbbd2e82a core/src/main/scala/kafka/utils/Json.scala d1102844748f2e88f79932281fe95583a57d2d16 core/src/main/scala/kafka/utils/ReplicationUtils.scala 60687332b4c9bee4d4c0851314cfb4b02d5d3489 core/src/main/scala/kafka/utils/ZkUtils.scala 5685a1eddb218baee617161f269cd1aee67bab9f core/src/test/scala/unit/kafka/utils/JsonTest.scala 93550e8f24071f88eb1ea5b41373efee27e4b8b7 Diff: https://reviews.apache.org/r/33383/diff/ Testing (updated) --- `testAll` succeeded eventually (it seems like some tests that rely on timings can sometimes fail). Thanks, Ismael Juma
Re: [KIP-DISCUSSION] KIP-13 Quotas
Given the caveats, it may be worth doing further investigation on the alternate approach, which is to use a dedicated DelayQueue for requests that violate quota, and compare pros/cons. So the approach is the following: all request handling occurs normally (i.e., unchanged from what we do today) - purgatories will be unchanged. After handling a request and before sending the response, check if the request has violated a quota. If so, enqueue the response into a DelayQueue. All responses can share the same DelayQueue. Send those responses out after the delay has been met.

There are some benefits to doing this:

- We will eventually want to quota other requests as well. The above seems to be a clean staged approach that should work uniformly for all requests: parse request - handle request normally - check quota - hold in delay queue if quota violated - respond. All requests can share the same DelayQueue. (In contrast, with the current proposal we could end up with a bunch of purgatories, or a combination of purgatories and delay queues.)

- Since this approach does not need any fundamental modifications to the current request handling, it addresses the caveats that Adi noted (holding producer/fetch requests longer than strictly necessary if quota is violated, since the proposal was to not watch on keys in that case). Likewise, it addresses the caveat that Guozhang noted (we may return no error if the request is held long enough due to quota violation, and satisfy a producer request that may have in fact exceeded the ack timeout), although it is probably reasonable to hide this case from the user.

- By avoiding the caveats, it also avoids the suggested work-around, which is effectively to add a min-hold-time to the purgatory. Although this is not a lot of code, it adds a quota-driven feature to the purgatory, which is already non-trivial and should ideally remain unassociated with quota enforcement.
For this to work well, we need to be sure that we don't hold a lot of data in the DelayQueue - and therein lies a quirk to this approach. Producer responses (and most other responses) are very small, so there is no issue. Fetch responses are fine as well, since we read off a FileMessageSet in the response (zero-copy). This will remain true even when we support SSL, since encryption occurs at the session layer (not the application layer). Topic metadata responses can be a problem, though. For these we ideally want to build the response only when we are ready to respond. So for metadata-style requests, which could produce large response objects, we may want to put the quota check and delay queue _before_ handling the request.

So the design in this approach would need an amendment: provide a choice of where to put a request in the delay queue - either before handling, or after handling (before response). So for:

- small request, large response: delay queue before handling
- large request, small response: delay queue after handling, before response
- small request, small response: either is fine
- large request, large response: we really cannot do anything here, but we don't really have this scenario yet

So the design would look like this:

- parse request
- before handling the request, check if the quota is violated; if so, compute two delay numbers: a before-handling delay and a before-response delay
- if the before-handling delay > 0, insert into the before-handling delay queue
- handle the request
- if the before-response delay > 0, insert into the before-response delay queue
- respond

Just throwing this out there for discussion.

Thanks,
Joel

On Thu, Apr 16, 2015 at 02:56:23PM -0700, Jun Rao wrote:

The quota check for the fetch request is a bit different from the produce request. I assume that for the fetch request, we will first get an estimated fetch response size to do the quota check. There are two things to think about.
First, when we actually send the response, we probably don't want to record the metric again since it would double count. Second, the bytes that the fetch response actually sends could be more than the estimate, which means the metric may not be 100% accurate. We may be able to limit the fetch size of each partition to what's in the original estimate.

For the produce request, I was thinking that another way to do this is to first figure out the quota_timeout. Then wait in Purgatory for quota_timeout with no key. If the request is not satisfied in quota_timeout and (request_timeout > quota_timeout), wait in Purgatory for (request_timeout - quota_timeout) with the original keys.

Thanks,
Jun

On Tue, Apr 14, 2015 at 5:01 PM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote:

This is an implementation proposal for delaying requests in quotas using the current purgatory. I'll discuss the usage for produce and fetch requests separately.

1. Delayed Produce Requests - Here, the proposal is basically to
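Joel's shared-DelayQueue idea maps directly onto `java.util.concurrent.DelayQueue`, which only hands out elements whose delay has elapsed. Below is an illustrative toy, not Kafka's actual response path; `ThrottledResponse` and its `payload` field are invented names standing in for the real response object.

```java
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Illustrative sketch of the proposal: a response that violated its quota
// is wrapped with a computed delay and parked in a shared DelayQueue.
// DelayQueue.take()/poll() will not release it until getDelay() <= 0.
class ThrottledResponse implements Delayed {
    final long sendAtNanos;
    final String payload; // stands in for the real response object

    ThrottledResponse(long delayMs, String payload) {
        this.sendAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMs);
        this.payload = payload;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(sendAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                            other.getDelay(TimeUnit.NANOSECONDS));
    }
}
```

A sender thread would loop on `queue.take()` and write each released response out; because all request types share one queue, no per-request-type purgatory is needed for throttling.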
Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system
Could you describe how recovery works in this mode? Say we had a 250 MB preallocated segment and we wrote up to 50 MB and crashed. Up to what point do we recover? Also, on startup, how is the append end pointer set, even on a clean shutdown? How does the FileChannel end position get set to 50 MB instead of 250 MB? The existing code might just work for it, but explaining that would be useful.

On 4/21/15 9:40 AM, Neha Narkhede n...@confluent.io wrote:

+1. I've tried this on Linux and it helps reduce the spikes in append (and hence producer) latency for high-throughput writes. I am not entirely sure why, but my suspicion is that in the absence of preallocation, you see spikes when writes need to happen faster than the time it takes Linux to allocate the next block to the file. It will be great to see some performance test results too.

On Tue, Apr 21, 2015 at 9:23 AM, Jay Kreps jay.kr...@gmail.com wrote:

I'm also +1 on this. The change is quite small and may actually help perf on Linux as well (we've never tried this). I have a lot of concerns about testing the various failure conditions, but since it will be off by default I think the risk is not too high.

-Jay

On Mon, Apr 20, 2015 at 6:58 PM, Honghai Chen honghai.c...@microsoft.com wrote:

I wrote a KIP for this after some discussion on KAFKA-1646.
https://issues.apache.org/jira/browse/KAFKA-1646
https://cwiki.apache.org/confluence/display/KAFKA/KIP-20+-+Enable+log+preallocate+to+improve+consume+performance+under+windows+and+some+old+Linux+file+system

The RB is here: https://reviews.apache.org/r/33204/diff/

Thanks,
Honghai

--
Thanks,
Neha
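The recovery question above - how the append pointer ends up at 50 MB rather than the preallocated 250 MB - can be sketched with plain NIO. This is a hedged illustration of the general mechanism, not the KIP-20 patch: `PreallocatedSegmentSketch`, the `setLength` preallocation, and `recoveredEnd` are assumptions for the example (the real patch may preallocate differently, and the recovered end would come from scanning the log for the last valid entry).

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.file.Path;

// Sketch: size the segment file up front, then on open position the
// channel at the end of *recovered* data rather than the end of the
// preallocated file, so appends continue from the right place.
class PreallocatedSegmentSketch {
    static FileChannel open(Path file, long preallocateBytes, long recoveredEnd)
            throws IOException {
        RandomAccessFile raf = new RandomAccessFile(file.toFile(), "rw");
        if (raf.length() < preallocateBytes)
            raf.setLength(preallocateBytes); // preallocate (sparse on many filesystems)
        FileChannel ch = raf.getChannel();
        ch.position(recoveredEnd); // append after the last recovered byte, not at 250 MB
        return ch;
    }
}
```

The key point is that a `FileChannel`'s position is independent of the file's length, so the file can stay 250 MB on disk while writes resume at byte 50M.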
Re: Review Request 33049: Patch for KAFKA-2084
---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33049/
---

(Updated April 21, 2015, 7:33 p.m.)

Review request for kafka and Joel Koshy.

Bugs: KAFKA-2084
    https://issues.apache.org/jira/browse/KAFKA-2084

Repository: kafka

Description (updated)
---

WIP: First patch for quotas. Changes are:
1. Adding per-client throttle time and quota metrics in ClientQuotaMetrics.scala
2. Making changes in QuotaViolationException and Sensor to return delay time changes.
3. Added configuration needed so far for quotas in KafkaConfig.
4. Unit tests

This is currently not being used anywhere in the code because I haven't yet figured out how to enforce delays, i.e. purgatory vs delay queue. I'll have a better idea once I look at the new purgatory implementation. Hopefully, this smaller patch is easier to review.

Please read: this patch has 2 approaches for managing quotas, in ClientQuotaMetrics and ClientQuotaMetrics2, along with some example usage in ReplicaManager. This code will have to be cleaned up significantly in order to commit, but I'm looking for feedback on which approach to use.

Approach 1: ClientQuotaMetrics wraps everything into a single class. Adding new metrics is much clumsier.

Approach 2: ClientQuotaMetrics2 only maintains per-client metrics for a single entity (producer, consumer, etc.). This makes the code easier to use. For throttling on a new dimension, e.g. requests per second, we only need to create this object with a new quota and it will just work.
Diffs - clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java dfa1b0a11042ad9d127226f0e0cec8b1d42b8441 clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b3d3d7c56acb445be16a3fbe00f05eaba659be46 clients/src/main/java/org/apache/kafka/common/metrics/Quota.java d82bb0c055e631425bc1ebbc7d387baac76aeeaa clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java a451e5385c9eca76b38b425e8ac856b2715fcffe clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java ca823fd4639523018311b814fde69b6177e73b97 core/src/main/scala/kafka/server/ClientQuotaMetrics.scala PRE-CREATION core/src/main/scala/kafka/server/ClientQuotaMetrics2.scala PRE-CREATION core/src/main/scala/kafka/server/KafkaConfig.scala 69b772c1941865fbe15b34bb2784c511f8ce519a core/src/main/scala/kafka/server/KafkaServer.scala c63f4ba9d622817ea8636d4e6135fba917ce085a core/src/main/scala/kafka/server/ReplicaManager.scala 8ddd325015de4245fd2cf500d8b0e8c1fd2bc7e8 core/src/test/scala/unit/kafka/server/ClientQuotaMetricsTest.scala PRE-CREATION core/src/test/scala/unit/kafka/server/ClientQuotaMetricsTest2.scala PRE-CREATION Diff: https://reviews.apache.org/r/33049/diff/ Testing --- Thanks, Aditya Auradkar
[jira] [Commented] (KAFKA-2084) byte rate metrics per client ID (producer and consumer)
[ https://issues.apache.org/jira/browse/KAFKA-2084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14505563#comment-14505563 ] Aditya A Auradkar commented on KAFKA-2084: -- Updated reviewboard https://reviews.apache.org/r/33049/diff/ against branch trunk byte rate metrics per client ID (producer and consumer) --- Key: KAFKA-2084 URL: https://issues.apache.org/jira/browse/KAFKA-2084 Project: Kafka Issue Type: Sub-task Reporter: Aditya Auradkar Assignee: Aditya Auradkar Attachments: KAFKA-2084.patch, KAFKA-2084_2015-04-09_18:10:56.patch, KAFKA-2084_2015-04-10_17:24:34.patch, KAFKA-2084_2015-04-21_12:21:18.patch We need to be able to track the bytes-in/bytes-out rate on a per-client ID basis. This is necessary for quotas. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 33049: Patch for KAFKA-2084
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33049/ --- (Updated April 21, 2015, 7:21 p.m.) Review request for kafka and Joel Koshy. Bugs: KAFKA-2084 https://issues.apache.org/jira/browse/KAFKA-2084 Repository: kafka Description (updated) --- This is currently not being used anywhere in the code because I haven't yet figured out how to enforce delays i.e. purgatory vs delay queue. I'll have a better idea once I look at the new purgatory implementation. Hopefully, this smaller patch is easier to review. Added more testcases Some locking changes for reading/creating the sensors WIP patch Diffs (updated) - clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java dfa1b0a11042ad9d127226f0e0cec8b1d42b8441 clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b3d3d7c56acb445be16a3fbe00f05eaba659be46 clients/src/main/java/org/apache/kafka/common/metrics/Quota.java d82bb0c055e631425bc1ebbc7d387baac76aeeaa clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java a451e5385c9eca76b38b425e8ac856b2715fcffe clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java ca823fd4639523018311b814fde69b6177e73b97 core/src/main/scala/kafka/server/ClientQuotaMetrics.scala PRE-CREATION core/src/main/scala/kafka/server/KafkaConfig.scala 69b772c1941865fbe15b34bb2784c511f8ce519a core/src/main/scala/kafka/server/KafkaServer.scala c63f4ba9d622817ea8636d4e6135fba917ce085a core/src/main/scala/kafka/server/ReplicaManager.scala 8ddd325015de4245fd2cf500d8b0e8c1fd2bc7e8 core/src/test/scala/unit/kafka/server/ClientQuotaMetricsTest.scala PRE-CREATION Diff: https://reviews.apache.org/r/33049/diff/ Testing --- Thanks, Aditya Auradkar
[jira] [Commented] (KAFKA-2132) Move Log4J appender to clients module
[ https://issues.apache.org/jira/browse/KAFKA-2132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14505370#comment-14505370 ]

Joe Stein commented on KAFKA-2132:
--

Can we put it in the new admin client tools jar that KAFKA-1694 is creating? tools/src/main/java/org/apache/kafka/loggers/KafkaLog4JAppenderBasic.java or something... That is all Java code, and I think having the Log4j appender in Java code would be preferable.

Move Log4J appender to clients module
-

Key: KAFKA-2132
URL: https://issues.apache.org/jira/browse/KAFKA-2132
Project: Kafka
Issue Type: Improvement
Reporter: Gwen Shapira
Assignee: Ashish K Singh

Log4j appender is just a producer. Since we have a new producer in the clients module, there is no need to keep the Log4J appender in core and force people to package all of Kafka with their apps. Let's move the Log4jAppender to the clients module.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2132) Move Log4J appender to clients module
[ https://issues.apache.org/jira/browse/KAFKA-2132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14505376#comment-14505376 ] Jay Kreps commented on KAFKA-2132: -- Shouldn't the admin stuff be part of the main client jar? Kind of makes sense to have producer, consumer, admin, plus any future clients provided they are all clean dependency-wise. And wouldn't including this then drag in log4j as a dependency for the admin tools? Move Log4J appender to clients module - Key: KAFKA-2132 URL: https://issues.apache.org/jira/browse/KAFKA-2132 Project: Kafka Issue Type: Improvement Reporter: Gwen Shapira Assignee: Ashish K Singh Log4j appender is just a producer. Since we have a new producer in the clients module, no need to keep Log4J appender in core and force people to package all of Kafka with their apps. Lets move the Log4jAppender to clients module. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 33049: Patch for KAFKA-2084
On April 17, 2015, 11:21 p.m., Joel Koshy wrote:

core/src/main/scala/kafka/server/ClientQuotaMetrics.scala, line 33
https://reviews.apache.org/r/33049/diff/3/?file=924194#file924194line33

Is this necessary?

Not strictly, but I felt it was a nice-to-have. I can make the default 1 if it helps. I'm envisioning a scenario where we discover in production that our throttle time computation isn't aggressive enough, and this can help in such a case.

On April 17, 2015, 11:21 p.m., Joel Koshy wrote:

core/src/main/scala/kafka/server/ClientQuotaMetrics.scala, line 81
https://reviews.apache.org/r/33049/diff/3/?file=924194#file924194line81

Overall, I think this should work, but it seems slightly high-touch, no? I.e., do we really need to wrap everything? You definitely need a clientQuotaMetricsConfig, but it seems many of the wrapper routines here can be folded into the core metrics package. E.g., otherwise, for every metric that we want to quota on, we are forced to add new record* methods. If I'm reading this right, a motivation for having the wrappers is to getOrCreate the sensors. Can we just pre-emptively (at the beginning) create the per-client sensors and then avoid the wrapper routines? This would also help avoid the need for the extra quota map and the synchronization logic in creating the sensors.

It isn't possible to proactively create all the sensors, since we don't know in advance what clients will connect to the system. Eventually, when we have dynamic configs, we can't even proactively create sensors for the overridden ones. I've published a new patch that will make this easier to use, i.e. not have typed methods per metric we want to throttle.

- Aditya

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33049/#review80130
---

On April 21, 2015, 7:21 p.m., Aditya Auradkar wrote:

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33049/
---

(Updated April 21, 2015, 7:21 p.m.)
Review request for kafka and Joel Koshy. Bugs: KAFKA-2084 https://issues.apache.org/jira/browse/KAFKA-2084 Repository: kafka Description --- This is currently not being used anywhere in the code because I haven't yet figured out how to enforce delays i.e. purgatory vs delay queue. I'll have a better idea once I look at the new purgatory implementation. Hopefully, this smaller patch is easier to review. Added more testcases Some locking changes for reading/creating the sensors WIP patch Diffs - clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java dfa1b0a11042ad9d127226f0e0cec8b1d42b8441 clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b3d3d7c56acb445be16a3fbe00f05eaba659be46 clients/src/main/java/org/apache/kafka/common/metrics/Quota.java d82bb0c055e631425bc1ebbc7d387baac76aeeaa clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java a451e5385c9eca76b38b425e8ac856b2715fcffe clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java ca823fd4639523018311b814fde69b6177e73b97 core/src/main/scala/kafka/server/ClientQuotaMetrics.scala PRE-CREATION core/src/main/scala/kafka/server/KafkaConfig.scala 69b772c1941865fbe15b34bb2784c511f8ce519a core/src/main/scala/kafka/server/KafkaServer.scala c63f4ba9d622817ea8636d4e6135fba917ce085a core/src/main/scala/kafka/server/ReplicaManager.scala 8ddd325015de4245fd2cf500d8b0e8c1fd2bc7e8 core/src/test/scala/unit/kafka/server/ClientQuotaMetricsTest.scala PRE-CREATION Diff: https://reviews.apache.org/r/33049/diff/ Testing --- Thanks, Aditya Auradkar
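The getOrCreate pattern debated in the review above - sensors cannot be created up front because the set of client ids is unknown - is commonly handled with `ConcurrentHashMap.computeIfAbsent`, which gives atomic create-on-first-use without explicit locking. This is an illustrative sketch, not the patch's actual code; `ClientSensorRegistry` and the inner `Sensor` stand-in are invented names.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Sketch of lazy per-client sensor creation: the first request from a
// client id creates its sensor; later requests (from any thread) reuse
// the same instance. computeIfAbsent guarantees the factory runs at most
// once per key, replacing hand-rolled synchronization.
class ClientSensorRegistry {
    static class Sensor { // stand-in for the real metrics Sensor class
        final String name;
        Sensor(String name) { this.name = name; }
    }

    private final ConcurrentMap<String, Sensor> sensors = new ConcurrentHashMap<>();

    Sensor getOrCreate(String clientId) {
        return sensors.computeIfAbsent(clientId, id -> new Sensor(id + "-byte-rate"));
    }
}
```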
[jira] [Commented] (KAFKA-2132) Move Log4J appender to clients module
[ https://issues.apache.org/jira/browse/KAFKA-2132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14505329#comment-14505329 ]

Jay Kreps commented on KAFKA-2132:
--

Yeah, it definitely makes sense to move the log4j appender out of core. I don't think we should add it to clients, though, as clients currently depends on slf4j (the whole point of which is to avoid a direct log4j dependency). Arguably the whole java logging thing is silly since everyone ends up depending on log4j in the end anyway, but I guess for appearances' sake we should try not to be the library that causes that? :-) So I think maybe just making a separate log4j module is the best approach? Kind of silly to have a whole jar with just one class, but I don't see another option.

Move Log4J appender to clients module
-

Key: KAFKA-2132
URL: https://issues.apache.org/jira/browse/KAFKA-2132
Project: Kafka
Issue Type: Improvement
Reporter: Gwen Shapira
Assignee: Ashish K Singh

Log4j appender is just a producer. Since we have a new producer in the clients module, there is no need to keep the Log4J appender in core and force people to package all of Kafka with their apps. Let's move the Log4jAppender to the clients module.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2084) byte rate metrics per client ID (producer and consumer)
[ https://issues.apache.org/jira/browse/KAFKA-2084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aditya A Auradkar updated KAFKA-2084: - Attachment: KAFKA-2084_2015-04-21_12:21:18.patch byte rate metrics per client ID (producer and consumer) --- Key: KAFKA-2084 URL: https://issues.apache.org/jira/browse/KAFKA-2084 Project: Kafka Issue Type: Sub-task Reporter: Aditya Auradkar Assignee: Aditya Auradkar Attachments: KAFKA-2084.patch, KAFKA-2084_2015-04-09_18:10:56.patch, KAFKA-2084_2015-04-10_17:24:34.patch, KAFKA-2084_2015-04-21_12:21:18.patch We need to be able to track the bytes-in/bytes-out rate on a per-client ID basis. This is necessary for quotas. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2084) byte rate metrics per client ID (producer and consumer)
[ https://issues.apache.org/jira/browse/KAFKA-2084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aditya A Auradkar updated KAFKA-2084: - Attachment: KAFKA-2084_2015-04-21_12:28:05.patch byte rate metrics per client ID (producer and consumer) --- Key: KAFKA-2084 URL: https://issues.apache.org/jira/browse/KAFKA-2084 Project: Kafka Issue Type: Sub-task Reporter: Aditya Auradkar Assignee: Aditya Auradkar Attachments: KAFKA-2084.patch, KAFKA-2084_2015-04-09_18:10:56.patch, KAFKA-2084_2015-04-10_17:24:34.patch, KAFKA-2084_2015-04-21_12:21:18.patch, KAFKA-2084_2015-04-21_12:28:05.patch We need to be able to track the bytes-in/bytes-out rate on a per-client ID basis. This is necessary for quotas. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 33049: Patch for KAFKA-2084
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33049/ --- (Updated April 21, 2015, 7:28 p.m.) Review request for kafka and Joel Koshy. Bugs: KAFKA-2084 https://issues.apache.org/jira/browse/KAFKA-2084 Repository: kafka Description (updated) --- This is currently not being used anywhere in the code because I haven't yet figured out how to enforce delays i.e. purgatory vs delay queue. I'll have a better idea once I look at the new purgatory implementation. Hopefully, this smaller patch is easier to review. Added more testcases Some locking changes for reading/creating the sensors WIP patch Sample usage in ReplicaManager Diffs (updated) - clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java dfa1b0a11042ad9d127226f0e0cec8b1d42b8441 clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b3d3d7c56acb445be16a3fbe00f05eaba659be46 clients/src/main/java/org/apache/kafka/common/metrics/Quota.java d82bb0c055e631425bc1ebbc7d387baac76aeeaa clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java a451e5385c9eca76b38b425e8ac856b2715fcffe clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java ca823fd4639523018311b814fde69b6177e73b97 core/src/main/scala/kafka/server/ClientQuotaMetrics.scala PRE-CREATION core/src/main/scala/kafka/server/ClientQuotaMetrics2.scala PRE-CREATION core/src/main/scala/kafka/server/KafkaConfig.scala 69b772c1941865fbe15b34bb2784c511f8ce519a core/src/main/scala/kafka/server/KafkaServer.scala c63f4ba9d622817ea8636d4e6135fba917ce085a core/src/main/scala/kafka/server/ReplicaManager.scala 8ddd325015de4245fd2cf500d8b0e8c1fd2bc7e8 core/src/test/scala/unit/kafka/server/ClientQuotaMetricsTest.scala PRE-CREATION core/src/test/scala/unit/kafka/server/ClientQuotaMetricsTest2.scala PRE-CREATION Diff: https://reviews.apache.org/r/33049/diff/ Testing --- Thanks, Aditya Auradkar
[jira] [Commented] (KAFKA-2084) byte rate metrics per client ID (producer and consumer)
[ https://issues.apache.org/jira/browse/KAFKA-2084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14505574#comment-14505574 ] Aditya A Auradkar commented on KAFKA-2084: -- Updated reviewboard https://reviews.apache.org/r/33049/diff/ against branch trunk byte rate metrics per client ID (producer and consumer) --- Key: KAFKA-2084 URL: https://issues.apache.org/jira/browse/KAFKA-2084 Project: Kafka Issue Type: Sub-task Reporter: Aditya Auradkar Assignee: Aditya Auradkar Attachments: KAFKA-2084.patch, KAFKA-2084_2015-04-09_18:10:56.patch, KAFKA-2084_2015-04-10_17:24:34.patch, KAFKA-2084_2015-04-21_12:21:18.patch, KAFKA-2084_2015-04-21_12:28:05.patch We need to be able to track the bytes-in/bytes-out rate on a per-client ID basis. This is necessary for quotas. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2036) Consumer and broker have different networks
[ https://issues.apache.org/jira/browse/KAFKA-2036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Arsenii Krasikov updated KAFKA-2036: Attachment: patch2 Consumer and broker have different networks --- Key: KAFKA-2036 URL: https://issues.apache.org/jira/browse/KAFKA-2036 Project: Kafka Issue Type: Bug Components: network Affects Versions: 0.8.2.1 Environment: oracle java {7,8}, ipv6 only consumer, ipv4 + ipv6 broker Reporter: Arsenii Krasikov Assignee: Jun Rao Attachments: patch, patch2 If the broker is connected to several networks (for example ipv6 and ipv4) and not all of them are reachable to the consumer, then {{kafka.network.BlockingChannel}} gives up connecting after the first "Network is unreachable" error without trying the remaining networks -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2036) Consumer and broker have different networks
[ https://issues.apache.org/jira/browse/KAFKA-2036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14504906#comment-14504906 ] Arsenii Krasikov commented on KAFKA-2036: - {{patch}} has been tested in production, {{patch2}} has been tested only with the kafka tests; they complement each other rather than replace each other. Consumer and broker have different networks --- Key: KAFKA-2036 URL: https://issues.apache.org/jira/browse/KAFKA-2036 Project: Kafka Issue Type: Bug Components: network Affects Versions: 0.8.2.1 Environment: oracle java {7,8}, ipv6 only consumer, ipv4 + ipv6 broker Reporter: Arsenii Krasikov Assignee: Jun Rao Attachments: patch, patch2 If the broker is connected to several networks (for example ipv6 and ipv4) and not all of them are reachable to the consumer, then {{kafka.network.BlockingChannel}} gives up connecting after the first "Network is unreachable" error without trying the remaining networks -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2036) Consumer and broker have different networks
[ https://issues.apache.org/jira/browse/KAFKA-2036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Arsenii Krasikov updated KAFKA-2036: Attachment: (was: patch2) Consumer and broker have different networks --- Key: KAFKA-2036 URL: https://issues.apache.org/jira/browse/KAFKA-2036 Project: Kafka Issue Type: Bug Components: network Affects Versions: 0.8.2.1 Environment: oracle java {7,8}, ipv6 only consumer, ipv4 + ipv6 broker Reporter: Arsenii Krasikov Assignee: Jun Rao Attachments: patch, patch2 If the broker is connected to several networks (for example ipv6 and ipv4) and not all of them are reachable to the consumer, then {{kafka.network.BlockingChannel}} gives up connecting after the first "Network is unreachable" error without trying the remaining networks -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2024) Cleaner can generate unindexable log segments
[ https://issues.apache.org/jira/browse/KAFKA-2024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14504654#comment-14504654 ] Rajini Sivaram commented on KAFKA-2024: --- [~mgharat] 32-bit relative offsets are stored as explained in the javadoc for OffsetIndex.scala: {quote} The file format is a series of entries. The physical format is a 4 byte relative offset and a 4 byte file location for the message with that offset. The offset stored is relative to the base offset of the index file. So, for example, if the base offset was 50, then the offset 55 would be stored as 5. Using relative offsets in this way lets us use only 4 bytes for the offset. {quote} Cleaner can generate unindexable log segments - Key: KAFKA-2024 URL: https://issues.apache.org/jira/browse/KAFKA-2024 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2.0 Reporter: Gian Merlino Assignee: Rajini Sivaram Attachments: KAFKA-2024.patch It's possible for log cleaning to generate segments that have a gap of more than Int.MaxValue between their base offset and their last offset. It's not possible to index those segments since there are only 4 bytes available to store that difference. The broker will end up writing overflowed ints into the index, and doesn't detect that there is a problem until restarted, at which point you get one of these: 2015-03-16 20:35:49,632 FATAL [main] kafka.server.KafkaServerStartable - Fatal error during KafkaServerStartable startup. 
Prepare to shutdown java.lang.IllegalArgumentException: requirement failed: Corrupt index found, index file (/mnt/persistent/kafka-logs/topic/.index) has non-zero size but the last offset is -1634293959 and the base offset is 0 at scala.Predef$.require(Predef.scala:233) at kafka.log.OffsetIndex.sanityCheck(OffsetIndex.scala:352) at kafka.log.Log$$anonfun$loadSegments$5.apply(Log.scala:204) at kafka.log.Log$$anonfun$loadSegments$5.apply(Log.scala:203) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at kafka.log.Log.loadSegments(Log.scala:203) at kafka.log.Log.init(Log.scala:67) at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$7$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:142) at kafka.utils.Utils$$anon$1.run(Utils.scala:54) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
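The javadoc rule quoted above, and the overflow Gian describes, can be illustrated with a small sketch (illustrative names, not Kafka's actual OffsetIndex code): the index stores `offset - baseOffset` in an int, so a gap larger than Int.MaxValue silently wraps, which is exactly the negative "last offset" seen at startup.

```java
// Illustration (not Kafka's actual code) of the 4-byte relative-offset
// scheme described in the OffsetIndex javadoc, and of the overflow that
// KAFKA-2024 reports: when lastOffset - baseOffset exceeds Int.MaxValue
// the stored int wraps and can turn negative, which is exactly the
// "last offset is -1634293959" symptom seen in the sanity check at startup.
class RelativeOffset {
    /** Encode an absolute offset as the 4-byte value the index stores. */
    static int encode(long baseOffset, long offset) {
        return (int) (offset - baseOffset); // silently truncates on overflow
    }

    /** A safe variant would refuse to index an un-representable gap. */
    static boolean indexable(long baseOffset, long offset) {
        return offset - baseOffset <= Integer.MAX_VALUE;
    }
}
```

A fix along these lines would have the cleaner (or the index append path) check `indexable` before producing a segment with such a gap.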
Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations (Thread 2)
Hi all, I've updated the KIP-4 page to include all previously discussed items such as: new error codes, merged alter-topic and reassign-partitions requests, added TMR_V1. It'd be great if we concentrate on the Errors + Wire Protocol schema and discuss any remaining issues today, since the first patch will include only the server-side implementation. Thanks, Andrii Biletskyi On Tue, Apr 21, 2015 at 9:46 AM, Andrii Biletskyi andrii.bilets...@stealth.ly wrote: 1. Yes, this will be much easier. Okay, let's add it. 2. Okay. This will differ a little bit from the way kafka-topics.sh currently handles the alter-topic command, but I think it's a reasonable restriction. I'll update the KIP according to our weekly call. Thanks, Andrii Biletskyi On Mon, Apr 20, 2015 at 10:56 PM, Jun Rao j...@confluent.io wrote: 1. Yes, lag is probably only going to be useful for the admin client. However, so is isr. It seems to me that we should get lag and isr from the same request. I was thinking that we can just extend TMR by changing replicas from an array of int to an array of (int, lag) pairs. Is that too complicated? 3. I was thinking that we just don't allow the cli to change more than one thing at a time. So, you will get an error if you want to change both partitions and configs. Thanks, Jun On Sun, Apr 19, 2015 at 8:22 AM, Andrii Biletskyi andrii.bilets...@stealth.ly wrote: Jun, 1. Yes, it seems we can add lag info to the TMR. But before that I wonder whether there are other reasons we need this info besides the reassign-partitions command? As we discussed earlier, the problem of poor monitoring capabilities for reassign-partitions (currently we only inform users Completed/In Progress per partition) may require a separate solution. We were thinking about a separate wire protocol request. And I actually like your idea about adding some sort of BrokerMetadataRequest for these purposes. I actually think we can cover some other items (like rack-awareness) but for me it deserves a separate KIP really. 
Also, adding a Replica-Lag map per partition will make TopicMetadataResponse very sophisticated: Map[TopicName, Map[PartitionId, Map[ReplicaId, Lag]]]. Maybe we need to leave it for a moment and propose a new request rather than taking another step towards one monster request. 2. Yes, error per topic. The only question is whether we should execute at least the very first alter-topic command from the duplicated topic set or return an error for all ... I think the more predictable and reasonable option for clients would be returning errors for all duplicated topics. 3. Hm, yes. Actually we also have change topic config there. But it is not related to such replication commands as increase replicas or change replica assignment. This will make the CLI implementation a bit strange: if a user specifies increase partitions and change topic config in one line - taking into account 2. we will have to create two separate alter-topic requests, which were designed as batch requests :), but probably we can live with it. Okay, I will think about a separate error code to cover such cases. 4. We will need InvalidArgumentTopic (e.g. contains prohibited chars), IAPartitions, IAReplicas, IAReplicaAssignment, IATopicConfiguration. A server-side implementation will be a little bit messy (dozens of if-this-then-this-error-code branches) but maybe we should think about clients in the first place here. Thanks, Andrii Biletskyi On Fri, Apr 17, 2015 at 1:46 AM, Jun Rao j...@confluent.io wrote: 1. For the lags, we can add a new field lags per partition. It will return, for each replica that's not in isr, the replica id and the lag in messages. Also, if TMR is sent to a non-leader, the response can just include an empty array for isr and lags. 2. So, we will just return a topic level error for the duplicated topics, right? 3. Yes, it's true that today one can specify both partitions and replicaAssignment in the TopicCommand. However, partitions is actually ignored. 
So, it will be clearer if we don't allow users to do this. 4. How many specific error codes like InvalidPartitions and InvalidReplicas are needed? If it's not that many, giving out more specific error will be useful for non-java clients. Thanks, Jun On Wed, Apr 15, 2015 at 10:23 AM, Andrii Biletskyi andrii.bilets...@stealth.ly wrote: Guys, Thanks for the discussion! Summary: 1. Q: How KAFKA-1367 (isr is inconsistent in brokers' metadata cache) can affect implementation? A: We can fix this issue for the leading broker - ReplicaManager (or Partition) component should have accurate isr list, then with leading broker having correct info, to do a describe-topic we will need to define leading brokers for partitions and
[jira] [Commented] (KAFKA-2036) Consumer and broker have different networks
[ https://issues.apache.org/jira/browse/KAFKA-2036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14504876#comment-14504876 ] Arsenii Krasikov commented on KAFKA-2036: - I found that {{InetSocketAddress}} is only passed into clients/src/main/java/org/apache/kafka/common/network. {{org.apache.kafka.clients.NetworkClient.initiateConnect}}:499 incorrectly constructs an {{InetSocketAddress}} and passes it to {{Selectable}}. {{org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses}} has the same problem as {{BlockingChannel}}: we need to create the {{InetSocketAddress}} and test it at once; we can't just return it. So it seems the {{parseAndValidateAddresses}} logic is incorrect, and we need to return sockets or apply those resolvers/validators in place. There is a rough patch for {{org.apache.kafka.clients.NetworkClient.initiateConnect}}: {{this.connectionStates.connecting(node.id(), now)}} and {{selector.connect(...)}} are swapped around because {{selector.connect()}} can throw an error. Consumer and broker have different networks --- Key: KAFKA-2036 URL: https://issues.apache.org/jira/browse/KAFKA-2036 Project: Kafka Issue Type: Bug Components: network Affects Versions: 0.8.2.1 Environment: oracle java {7,8}, ipv6 only consumer, ipv4 + ipv6 broker Reporter: Arsenii Krasikov Assignee: Jun Rao Attachments: patch If the broker is connected to several networks (for example ipv6 and ipv4) and not all of them are reachable to the consumer, then {{kafka.network.BlockingChannel}} gives up connecting after the first "Network is unreachable" error without trying the remaining networks -- This message was sent by Atlassian JIRA (v6.3.4#6332)
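The direction of the fix under discussion — try every resolved address rather than aborting on the first unreachable network — might be sketched like this (a hypothetical helper, not the actual patch attached to the ticket):

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical sketch of the fix direction for KAFKA-2036: instead of
// connecting to the first resolved address and giving up on
// "Network is unreachable", resolve every address for the host (IPv4 and
// IPv6) and try each in turn, so a dual-stack broker remains reachable
// from a single-stack client.
class MultiHomedConnector {
    static Socket connect(String host, int port, int timeoutMs) throws IOException {
        IOException last = null;
        for (InetAddress addr : InetAddress.getAllByName(host)) {
            try {
                Socket socket = new Socket();
                socket.connect(new InetSocketAddress(addr, port), timeoutMs);
                return socket; // first address that works wins
            } catch (IOException e) {
                last = e; // e.g. network unreachable; try the next address
            }
        }
        throw last != null ? last : new IOException("no addresses for " + host);
    }
}
```

This is the same create-and-test-at-once shape Arsenii argues `parseAndValidateAddresses` needs: an `InetSocketAddress` cannot be validated independently of an actual connection attempt.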
[jira] [Updated] (KAFKA-2036) Consumer and broker have different networks
[ https://issues.apache.org/jira/browse/KAFKA-2036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Arsenii Krasikov updated KAFKA-2036: Attachment: patch2 Consumer and broker have different networks --- Key: KAFKA-2036 URL: https://issues.apache.org/jira/browse/KAFKA-2036 Project: Kafka Issue Type: Bug Components: network Affects Versions: 0.8.2.1 Environment: oracle java {7,8}, ipv6 only consumer, ipv4 + ipv6 broker Reporter: Arsenii Krasikov Assignee: Jun Rao Attachments: patch, patch2 If the broker is connected to several networks (for example ipv6 and ipv4) and not all of them are reachable to the consumer, then {{kafka.network.BlockingChannel}} gives up connecting after the first "Network is unreachable" error without trying the remaining networks -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-2137) New Kafka Producer not fully asynchronous
David Hay created KAFKA-2137: Summary: New Kafka Producer not fully asynchronous Key: KAFKA-2137 URL: https://issues.apache.org/jira/browse/KAFKA-2137 Project: Kafka Issue Type: Improvement Reporter: David Hay The new Producer client attempts to be fully asynchronous. However, it still has the potential to block at the start of the {{send}} method when it asks for the metadata for the topic ({{waitOnMetadata(record.topic(), this.metadataFetchTimeoutMs)}}). There is a timeout (60 seconds, by default), but it would be nice if this lookup was performed in the background thread as well. This way producers could fire and forget without any potential to block the sending thread. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [DISCUSS] KIP-11- Authorization design for kafka security
Harsha, Parth, Thanks for the clarification. This makes sense. Perhaps we can clarify the meaning of those rules in the wiki. Related to this, it seems that we need to support wildcards in the cli/request protocol for topics? Jun On Mon, Apr 20, 2015 at 9:07 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: The iptables on unix supports the DENY operator, not that it should matter. The deny operator can also be used to specify "allow user1 to READ from topic1 from all hosts but host1,host2". Again we could add a host group semantic and extra complexity around that, not sure if it's worth it. In addition, with the DENY operator you are now not forced to create a special group just to support the authorization use case. I am not convinced that the operator itself is really all that confusing. There are 3 practical use cases: - Resource with no acl whatsoever - allow access to everyone (just for backward compatibility; I would much rather fail closed and force users to explicitly grant acls that allow access to all users.) - Resource with some acl attached - only users that have a matching allow acl are allowed (i.e. "allow READ access to topic1 to user1 from all hosts", only user1 has READ access and no other user has access of any kind) - Resource with some allow and some deny acl attached - users are allowed to perform the operation only when they satisfy an allow acl and do not have a conflicting deny acl. Users that have no acl (allow or deny) will still not have any access. (i.e. "allow READ access to topic1 to user1 from all hosts except host1 and host2", only user1 has access but not from host1 and host2) I think we need to make a decision on deny primarily because, with the introduction of the acl management API, Acl is now a public class that will be used by Ranger/Sentry and other authorization providers. In the current design the acl has a permissionType enum field with possible values of Allow and Deny.
If we choose to remove deny we can assume all acls to be of allow type and remove the permissionType field completely. Thanks Parth On 4/20/15, 6:12 PM, Gwen Shapira gshap...@cloudera.com wrote: I think that's how it's done in pretty much any system I can think of.
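The three practical cases Parth lists can be sketched as an evaluation routine (hypothetical names, not the KIP-11 API): no ACLs on a resource means open access for backward compatibility; otherwise the user needs at least one matching allow, and any matching deny wins.

```java
import java.util.List;

// Sketch of the evaluation rules described above, with hypothetical
// names (not the KIP-11 Authorizer API).
class AclEvaluator {
    enum Type { ALLOW, DENY }

    static class Acl {
        final Type type; final String user; final String host; final String operation;
        Acl(Type type, String user, String host, String operation) {
            this.type = type; this.user = user; this.host = host; this.operation = operation;
        }
    }

    static boolean authorized(List<Acl> acls, String user, String host, String op) {
        if (acls.isEmpty())
            return true; // case 1: no ACL at all -> open access (backward compatibility)
        boolean allowed = false;
        for (Acl a : acls) {
            boolean match = a.user.equals(user)
                    && (a.host.equals("*") || a.host.equals(host))
                    && a.operation.equals(op);
            if (match && a.type == Type.DENY)
                return false;   // case 3: a matching deny always wins
            if (match && a.type == Type.ALLOW)
                allowed = true; // case 2: need at least one matching allow
        }
        return allowed; // users with no matching acl get no access
    }
}
```

Note how "allow user1 from all hosts except host1" falls out naturally: one ALLOW with host "*" plus one DENY for host1.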
Re: [KIP-DISCUSSION] KIP-13 Quotas
In either approach I'm not sure we considered being able to turn it off completely. IOW, no, it is not a plugin if that's what you are asking. We can set very high defaults, and in the absence of any overrides it would effectively be off. The quota enforcement is actually already part of the metrics package. The new code (that exercises it) will be added wherever the metrics are being measured. Thanks, Joel On Tue, Apr 21, 2015 at 03:04:07PM -0400, Tong Li wrote: Joel, Nice write-up. A couple of questions, not sure if they have been answered. Since we will have a call later today, I would like to ask here as well so that we can talk about them if they are not answered in the email discussion. 1. Where will the new code be plugged in, that is, where is the plugin point, KafkaApi? 2. Can this quota control be disabled/enabled without affecting anything else? From the design wiki page, it seems to me that each request will at least pay a penalty of checking quota enablement. Thanks. Tong Li OpenStack Kafka Community Development Building 501/B205 liton...@us.ibm.com From: Joel Koshy jjkosh...@gmail.com To: dev@kafka.apache.org Date: 04/21/2015 01:22 PM Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas Given the caveats, it may be worth doing further investigation on the alternate approach, which is to use a dedicated DelayQueue for requests that violate quota, and compare pros/cons. So the approach is the following: all request handling occurs normally (i.e., unchanged from what we do today). i.e., purgatories will be unchanged. After handling a request and before sending the response, check if the request has violated a quota. If so, then enqueue the response into a DelayQueue. All responses can share the same DelayQueue. Send those responses out after the delay has been met. There are some benefits to doing this: - We will eventually want to quota other requests as well. The above seems to be a clean staged approach that should work uniformly for all requests. 
i.e., parse request - handle request normally - check quota - hold in delay queue if quota violated - respond . All requests can share the same DelayQueue. (In contrast with the current proposal we could end up with a bunch of purgatories, or a combination of purgatories and delay queues.) - Since this approach does not need any fundamental modifications to the current request handling, it addresses the caveats that Adi noted (which is holding producer requests/fetch requests longer than strictly necessary if quota is violated since the proposal was to not watch on keys in that case). Likewise it addresses the caveat that Guozhang noted (we may return no error if the request is held long enough due to quota violation and satisfy a producer request that may have in fact exceeded the ack timeout) although it is probably reasonable to hide this case from the user. - By avoiding the caveats it also avoids the suggested work-around to the caveats which is effectively to add a min-hold-time to the purgatory. Although this is not a lot of code, I think it adds a quota-driven feature to the purgatory which is already non-trivial and should ideally remain unassociated with quota enforcement. For this to work well we need to be sure that we don't hold a lot of data in the DelayQueue - and therein lies a quirk to this approach. Producer responses (and most other responses) are very small so there is no issue. Fetch responses are fine as well - since we read off a FileMessageSet in response (zero-copy). This will remain true even when we support SSL since encryption occurs at the session layer (not the application layer). Topic metadata response can be a problem though. For this we ideally want to build the topic metadata response only when we are ready to respond. So for metadata-style responses which could contain large response objects we may want to put the quota check and delay queue _before_ handling the request. 
So the design in this approach would need an amendment: provide a choice of where to put a request in the delay queue: either before handling or after handling (before response). So for: small request, large response: delay queue before handling; large request, small response: delay queue after handling, before response; small request, small response: either is fine; large request, large response: we really cannot do anything here, but we don't really have this scenario yet. So the design would look like this: - parse request - before handling request check if quota violated; if so compute two delay numbers: - before-handling delay - before-response delay - if before-handling delay > 0 insert into before-handling delay queue - handle the request - if before-response delay > 0 insert into before-response delay queue - respond Just throwing this out there for discussion. Thanks, Joel
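The shared-DelayQueue idea above can be sketched with `java.util.concurrent.DelayQueue` (illustrative names; the real patch would hold actual response objects and have a sender thread draining the queue):

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Sketch of the alternative Joel describes: after handling a request,
// if the client violated its quota, park the response in one shared
// DelayQueue and only send it once the computed delay has elapsed.
// Names here are illustrative, not from the actual patch.
class ThrottledResponse implements Delayed {
    final String response;   // stand-in for a real response object
    final long sendAtMs;     // wall-clock time at which it may be sent

    ThrottledResponse(String response, long delayMs) {
        this.response = response;
        this.sendAtMs = System.currentTimeMillis() + delayMs;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(sendAtMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.MILLISECONDS),
                            other.getDelay(TimeUnit.MILLISECONDS));
    }
}
```

A sender thread would simply loop on `queue.take()`, which blocks until the head element's delay has expired; responses with no quota violation bypass the queue entirely.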
Review Request 33410: Patch for KAFKA-2034
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33410/ --- Review request for kafka. Bugs: KAFKA-2034 https://issues.apache.org/jira/browse/KAFKA-2034 Repository: kafka Description --- KAFKA-2034; Set sourceCompatibility in build.gradle Diffs - build.gradle 4775ee46c480eab7b8250e61ba1705d00f72a6aa Diff: https://reviews.apache.org/r/33410/diff/ Testing --- Thanks, Ismael Juma
Re: [DISCUSS] KIP-11- Authorization design for kafka security
Also, I think I may have missed this, but does READ imply you also have DESCRIBE? A reader will need access to both read offsets (to determine their own initial position) as well as commit offsets. Currently, though, fetching offsets is under DESCRIBE only and committing offsets is under READ. If READ implies DESCRIBE, are there any other implied permissions like that? -Jay On Tue, Apr 21, 2015 at 1:59 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey Parth, Great write-up! One super minor thing: could we change the EDIT permission to be called ALTER? The request name in KIP-4 is Alter and the command line tool has always been alter (or we could go the other way and change those to EDIT). Not sure that one is any better than the other but consistency is always nice. -Jay On Tue, Apr 21, 2015 at 9:06 AM, Jun Rao j...@confluent.io wrote: Harsha, Parth, Thanks for the clarification. This makes sense. Perhaps we can clarify the meaning of those rules in the wiki. Related to this, it seems that we need to support wildcards in the cli/request protocol for topics? Jun On Mon, Apr 20, 2015 at 9:07 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: The iptables on unix supports the DENY operator, not that it should matter. The deny operator can also be used to specify "allow user1 to READ from topic1 from all hosts but host1,host2". Again we could add a host group semantic and extra complexity around that, not sure if it's worth it. In addition, with the DENY operator you are now not forced to create a special group just to support the authorization use case. I am not convinced that the operator itself is really all that confusing. There are 3 practical use cases: - Resource with no acl whatsoever - allow access to everyone (just for backward compatibility; I would much rather fail closed and force users to explicitly grant acls that allow access to all users.) - Resource with some acl attached - only users that have a matching allow acl are allowed (i.e.
"allow READ access to topic1 to user1 from all hosts", only user1 has READ access and no other user has access of any kind) - Resource with some allow and some deny acl attached - users are allowed to perform the operation only when they satisfy an allow acl and do not have a conflicting deny acl. Users that have no acl (allow or deny) will still not have any access. (i.e. "allow READ access to topic1 to user1 from all hosts except host1 and host2", only user1 has access but not from host1 and host2) I think we need to make a decision on deny primarily because, with the introduction of the acl management API, Acl is now a public class that will be used by Ranger/Sentry and other authorization providers. In the current design the acl has a permissionType enum field with possible values of Allow and Deny. If we choose to remove deny we can assume all acls to be of allow type and remove the permissionType field completely. Thanks Parth On 4/20/15, 6:12 PM, Gwen Shapira gshap...@cloudera.com wrote: I think that's how it's done in pretty much any system I can think of.
[jira] [Created] (KAFKA-2138) KafkaProducer does not honor the retry backoff time.
Jiangjie Qin created KAFKA-2138: --- Summary: KafkaProducer does not honor the retry backoff time. Key: KAFKA-2138 URL: https://issues.apache.org/jira/browse/KAFKA-2138 Project: Kafka Issue Type: Bug Reporter: Jiangjie Qin Assignee: Jiangjie Qin Priority: Critical In KafkaProducer, we only check batch.lastAttemptMs in ready(), but we do not check it in drain(). The problem is that if we have two partitions both on the same node, suppose partition 1 should back off while partition 2 should not, then currently partition 1's backoff time will be ignored. We should check lastAttemptMs in drain() as well. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
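The missing check amounts to something like this (field and method names are illustrative, not the actual RecordAccumulator code): the same predicate that gates ready() must also gate each batch in drain(), otherwise a backing-off batch rides along with a ready sibling on the same node.

```java
// Minimal sketch of the KAFKA-2138 point: the backoff predicate the
// producer applies in ready() must also be applied per batch in drain(),
// otherwise a batch for partition 1 that should still be backing off gets
// sent merely because partition 2 (on the same node) is ready.
// Names are illustrative, not the actual RecordAccumulator fields.
class BatchBackoff {
    /** True if the batch must still wait: it was retried too recently. */
    static boolean inBackoff(long lastAttemptMs, int attempts, long retryBackoffMs, long nowMs) {
        return attempts > 0 && nowMs - lastAttemptMs < retryBackoffMs;
    }
}
```

In drain(), a node's batch list would then skip (not drop) any batch for which `inBackoff` is true, leaving it for a later drain pass.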
Re: [DISCUSS] KIP-11- Authorization design for kafka security
Changed Edit to Alter. I had not thought about it that way, but Sriharsha raised the same point in a private conversation and I agree it makes sense. If no one objects, I think in the default implementation we can infer that if a user has READ or WRITE access they get DESCRIBE for free. Thanks Parth On 4/21/15, 2:04 PM, Jay Kreps jay.kr...@gmail.com wrote: Also, I think I may have missed this, but does READ imply you also have DESCRIBE? A reader will need access to both read offsets (to determine their own initial position) as well as commit offsets. Currently, though, fetching offsets is under DESCRIBE only and committing offsets is under READ. If READ implies DESCRIBE, are there any other implied permissions like that? -Jay On Tue, Apr 21, 2015 at 1:59 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey Parth, Great write-up! One super minor thing: could we change the EDIT permission to be called ALTER? The request name in KIP-4 is Alter and the command line tool has always been alter (or we could go the other way and change those to EDIT). Not sure that one is any better than the other but consistency is always nice. -Jay On Tue, Apr 21, 2015 at 9:06 AM, Jun Rao j...@confluent.io wrote: Harsha, Parth, Thanks for the clarification. This makes sense. Perhaps we can clarify the meaning of those rules in the wiki. Related to this, it seems that we need to support wildcards in the cli/request protocol for topics? Jun On Mon, Apr 20, 2015 at 9:07 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: The iptables on unix supports the DENY operator, not that it should matter. The deny operator can also be used to specify "allow user1 to READ from topic1 from all hosts but host1,host2". Again we could add a host group semantic and extra complexity around that, not sure if it's worth it. In addition, with the DENY operator you are now not forced to create a special group just to support the authorization use case.
I am not convinced that the operator itself is really all that confusing. There are 3 practical use cases: - Resource with no acl whatsoever - allow access to everyone (just for backward compatibility; I would much rather fail closed and force users to explicitly grant acls that allow access to all users.) - Resource with some acl attached - only users that have a matching allow acl are allowed (i.e. "allow READ access to topic1 to user1 from all hosts", only user1 has READ access and no other user has access of any kind) - Resource with some allow and some deny acl attached - users are allowed to perform the operation only when they satisfy an allow acl and do not have a conflicting deny acl. Users that have no acl (allow or deny) will still not have any access. (i.e. "allow READ access to topic1 to user1 from all hosts except host1 and host2", only user1 has access but not from host1 and host2) I think we need to make a decision on deny primarily because, with the introduction of the acl management API, Acl is now a public class that will be used by Ranger/Sentry and other authorization providers. In the current design the acl has a permissionType enum field with possible values of Allow and Deny. If we choose to remove deny we can assume all acls to be of allow type and remove the permissionType field completely. Thanks Parth On 4/20/15, 6:12 PM, Gwen Shapira gshap...@cloudera.com wrote: I think that's how it's done in pretty much any system I can think of.
Re: Review Request 33410: Patch for KAFKA-2034
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33410/ --- (Updated April 21, 2015, 9 p.m.) Review request for kafka. Bugs: KAFKA-2034 https://issues.apache.org/jira/browse/KAFKA-2034 Repository: kafka Description --- KAFKA-2034; Set sourceCompatibility in build.gradle Diffs - build.gradle 4775ee46c480eab7b8250e61ba1705d00f72a6aa Diff: https://reviews.apache.org/r/33410/diff/ Testing (updated) --- Compiled with JDK 8 and verified that the target level in the bytecode was `50` (i.e. Java 6) instead of `52`: find . -name '*.class' | xargs javap -verbose | grep "major version:" | sort | uniq major version: 50 Thanks, Ismael Juma
Re: [DISCUSS] KIP-11- Authorization design for kafka security
Hey Parth, Great write-up! One super minor thing: could we change the EDIT permission to be called ALTER? The request name in KIP-4 is Alter and the command line tool has always been alter (or we could go the other way and change those to EDIT). Not sure that one is any better than the other but consistency is always nice. -Jay On Tue, Apr 21, 2015 at 9:06 AM, Jun Rao j...@confluent.io wrote: Harsha, Parth, Thanks for the clarification. This makes sense. Perhaps we can clarify the meaning of those rules in the wiki. Related to this, it seems that we need to support wildcards in the cli/request protocol for topics? Jun On Mon, Apr 20, 2015 at 9:07 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: The iptables on unix supports the DENY operator, not that it should matter. The deny operator can also be used to specify "allow user1 to READ from topic1 from all hosts but host1,host2". Again we could add a host group semantic and extra complexity around that, not sure if it's worth it. In addition, with the DENY operator you are now not forced to create a special group just to support the authorization use case. I am not convinced that the operator itself is really all that confusing. There are 3 practical use cases: - Resource with no acl whatsoever - allow access to everyone (just for backward compatibility; I would much rather fail closed and force users to explicitly grant acls that allow access to all users.) - Resource with some acl attached - only users that have a matching allow acl are allowed (i.e.
"allow READ access to topic1 to user1 from all hosts except host1 and host2", only user1 has access but not from host1 and host2) I think we need to make a decision on deny primarily because, with the introduction of the acl management API, Acl is now a public class that will be used by Ranger/Sentry and other authorization providers. In the current design the acl has a permissionType enum field with possible values of Allow and Deny. If we choose to remove deny we can assume all acls to be of allow type and remove the permissionType field completely. Thanks Parth On 4/20/15, 6:12 PM, Gwen Shapira gshap...@cloudera.com wrote: I think that's how it's done in pretty much any system I can think of.
Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system
My understanding of the patch is that clean shutdown truncates the file back to its true size (and reallocates it on startup). Hard crash is handled by the normal recovery, which should truncate off the empty portion of the file. On Tue, Apr 21, 2015 at 10:52 AM, Sriram Subramanian srsubraman...@linkedin.com.invalid wrote: Could you describe how recovery works in this mode? Say, we had a 250 MB preallocated segment and we wrote till 50MB and crashed. Till what point do we recover? Also, on startup, how is the append end pointer set even on a clean shutdown? How does the FileChannel end position get set to 50 MB instead of 250 MB? The existing code might just work for it but explaining that would be useful. On 4/21/15 9:40 AM, Neha Narkhede n...@confluent.io wrote: +1. I've tried this on Linux and it helps reduce the spikes in append (and hence producer) latency for high throughput writes. I am not entirely sure why, but my suspicion is that in the absence of preallocation, you see spikes when writes need to happen faster than the time it takes Linux to allocate the next block to the file. It will be great to see some performance test results too. On Tue, Apr 21, 2015 at 9:23 AM, Jay Kreps jay.kr...@gmail.com wrote: I'm also +1 on this. The change is quite small and may actually help perf on Linux as well (we've never tried this). I have a lot of concerns about testing the various failure conditions, but I think since it will be off by default the risk is not too high. -Jay On Mon, Apr 20, 2015 at 6:58 PM, Honghai Chen honghai.c...@microsoft.com wrote: I wrote a KIP for this after some discussion on KAFKA-1646. https://issues.apache.org/jira/browse/KAFKA-1646 https://cwiki.apache.org/confluence/display/KAFKA/KIP-20+-+Enable+log+preallocate+to+improve+consume+performance+under+windows+and+some+old+Linux+file+system The RB is here: https://reviews.apache.org/r/33204/diff/ Thanks, Honghai -- Thanks, Neha
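The clean-shutdown behavior described above — preallocate a large file, append, then truncate back to the true size on close — can be sketched in a few lines. This is only an illustration of the idea (the `PreallocDemo` class and sizes are made up), not Kafka's actual log segment code:

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Sketch of preallocate-then-truncate; illustrative only.
public class PreallocDemo {
    static final long PREALLOC_BYTES = 1024 * 1024; // pretend 1 MB "segment"

    /** Preallocate the file, append some data, then truncate back to the
     *  true size on clean shutdown. Returns the file size after "shutdown". */
    static long writeAndTruncate(Path path, byte[] data) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(path.toFile(), "rw")) {
            raf.setLength(PREALLOC_BYTES); // preallocate up front (file pointer stays at 0)
            raf.write(data);               // append from the start of the file
            raf.setLength(data.length);    // clean shutdown: drop the empty tail
        }
        return Files.size(path);
    }

    public static void main(String[] args) throws IOException {
        Path path = Files.createTempFile("segment", ".log");
        byte[] data = "some messages".getBytes(StandardCharsets.UTF_8);
        // After a clean shutdown the file is back to its real size,
        // so on restart the append pointer is simply the file length.
        System.out.println(writeAndTruncate(path, data) == data.length); // true
        Files.delete(path);
    }
}
```

On a hard crash the truncate never runs, which is why recovery has to scan and chop off the zeroed tail — the question Sriram raises above.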
Re: KIP hangout on Apr 21
Jun, Not sure why, in these invitations, I am not seeing the Google Hangout link. I am using IBM Notes, which is quite different from Gmail and Calendar. Is there any way that you could send the hangout link in the invitation body? Thanks. Tong Li OpenStack Kafka Community Development Building 501/B205 liton...@us.ibm.com From: Jun Rao j...@confluent.io To: dev@kafka.apache.org dev@kafka.apache.org Date: 04/18/2015 12:50 AM Subject: KIP hangout on Apr 21 Hi, We will have a KIP hangout at 3pm PST on Apr 21. The following is the tentative agenda. If you'd like to attend but haven't received an invite, please let me know. Agenda: KIP-4 (admin commands): wrap up any remaining issues KIP-11 (Authorization): KIP-12 (SSL/Kerberos): See if there is any blocker. jira backlog check Thanks, Jun
Re: [DISCUSS] KIP-11- Authorization design for kafka security
Hey Jun, Yes, and we support wildcards for all acl entities: principal, host, and operation. Thanks Parth On 4/21/15, 9:06 AM, Jun Rao j...@confluent.io wrote: Harsha, Parth, Thanks for the clarification. This makes sense. Perhaps we can clarify the meaning of those rules in the wiki. Related to this, it seems that we need to support wildcards in the cli/request protocol for topics? Jun On Mon, Apr 20, 2015 at 9:07 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: iptables on Unix supports the DENY operator, not that it should matter. The deny operator can also be used to specify "allow user1 to READ from topic1 from all hosts but host1, host2". Again, we could add a host-group semantic and extra complexity around that; not sure if it's worth it. In addition, with the DENY operator you are not forced to create a special group just to support the authorization use case. I am not convinced that the operator itself is really all that confusing. There are 3 practical use cases: - Resource with no acl whatsoever - allow access to everyone (just for backward compatibility; I would much rather fail closed and force users to explicitly grant acls that allow access to all users.) - Resource with some acl attached - only users that have a matching allow acl are allowed (i.e. "allow READ access to topic1 to user1 from all hosts": only user1 has READ access and no other user has access of any kind) - Resource with some allow and some deny acls attached - users are allowed to perform the operation only when they satisfy an allow acl and do not have a conflicting deny acl. Users that have no acl (allow or deny) will still not have any access. (i.e. "allow READ access to topic1 to user1 from all hosts except host1 and host2": only user1 has access, but not from host1 and host2.) I think we need to make a decision on deny primarily because, with the introduction of the acl management API, Acl is now a public class that will be used by Ranger/Sentry and other authorization providers.
In the current design the acl has a permissionType enum field with possible values of Allow and Deny. If we choose to remove deny, we can assume all acls to be of allow type and remove the permissionType field completely. Thanks Parth On 4/20/15, 6:12 PM, Gwen Shapira gshap...@cloudera.com wrote: I think that's how it's done in pretty much any system I can think of.
Re: [DISCUSS] KIP-12 - Kafka Sasl/Kerberos implementation
Hey Sriharsha, Thanks for the excellent write-up. Couple of minor questions: 1. Isn't the blocking handshake going to be a performance concern? Can we do the handshake non-blocking instead? If anything that causes connections to drop can incur blocking network roundtrips won't that eat up all the network threads immediately? I guess I would have to look at that code to know... 2. Do we need to support blocking channel at all? That is just for the old clients, and I think we should probably just leave those be to reduce scope here. 3. Can we change the APIs to drop the getters when that is not required by the API being implemented. In general we don't use setters and getters as a naming convention. The long explanation on that is that setters/getters kind of imply a style of java programming where you have simple structs with getters and setters for each field. In general we try to have access methods only when necessary, and rather than setters model the full change or action being carried out, and if possible disallow change entirely. This is more in line with modern java style I think. We aren't perfect in following this, but once you start with getters and setters people start just adding them everywhere and then using them. -Jay On Mon, Apr 20, 2015 at 10:42 AM, Sriharsha Chintalapani ka...@harsha.io wrote: Hi, I updated the KIP-12 with more details. Please take a look https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=51809888 Thanks, Harsha On February 11, 2015 at 10:02:43 AM, Harsha (ka...@harsha.io) wrote: Thanks Joe. It will be part of KafkaServer and will run on its own thread. Since each kafka server will run with a keytab we should make sure they are all getting renewed. On Wed, Feb 11, 2015, at 10:00 AM, Joe Stein wrote: Thanks Harsha, looks good so far. How were you thinking of running the KerberosTicketManager as a standalone process or like controller or is it a layer of code that does the plumbing pieces everywhere? 
~ Joestein On Wed, Feb 11, 2015 at 12:18 PM, Harsha ka...@harsha.io wrote: Hi, Here is the initial proposal for sasl/kerberos implementation for kafka https://cwiki.apache.org/confluence/x/YI4WAw and JIRA https://issues.apache.org/jira/browse/KAFKA-1686. I am currently working on prototype which will add more details to the KIP. Just opening the thread to say the work is in progress. I'll update the thread with a initial prototype patch. Thanks, Harsha
[jira] [Commented] (KAFKA-2034) sourceCompatibility not set in Kafka build.gradle
[ https://issues.apache.org/jira/browse/KAFKA-2034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14505742#comment-14505742 ] Ismael Juma commented on KAFKA-2034: Created reviewboard https://reviews.apache.org/r/33410/diff/ against branch upstream/trunk sourceCompatibility not set in Kafka build.gradle - Key: KAFKA-2034 URL: https://issues.apache.org/jira/browse/KAFKA-2034 Project: Kafka Issue Type: Bug Components: build Affects Versions: 0.8.2.1 Reporter: Derek Bassett Priority: Minor Labels: newbie Attachments: KAFKA-2034.patch Original Estimate: 4h Remaining Estimate: 4h The build.gradle file does not explicitly set the sourceCompatibility version. As a result, when Kafka is built with Java 1.8, the class files are emitted with the wrong target version. This would also allow Java 1.8 features to be merged into Kafka. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2034) sourceCompatibility not set in Kafka build.gradle
[ https://issues.apache.org/jira/browse/KAFKA-2034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-2034: --- Attachment: KAFKA-2034.patch sourceCompatibility not set in Kafka build.gradle - Key: KAFKA-2034 URL: https://issues.apache.org/jira/browse/KAFKA-2034 Project: Kafka Issue Type: Bug Components: build Affects Versions: 0.8.2.1 Reporter: Derek Bassett Priority: Minor Labels: newbie Attachments: KAFKA-2034.patch Original Estimate: 4h Remaining Estimate: 4h The build.gradle file does not explicitly set the sourceCompatibility version. As a result, when Kafka is built with Java 1.8, the class files are emitted with the wrong target version. This would also allow Java 1.8 features to be merged into Kafka. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2034) sourceCompatibility not set in Kafka build.gradle
[ https://issues.apache.org/jira/browse/KAFKA-2034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-2034: --- Assignee: Ismael Juma Status: Patch Available (was: Open) sourceCompatibility not set in Kafka build.gradle - Key: KAFKA-2034 URL: https://issues.apache.org/jira/browse/KAFKA-2034 Project: Kafka Issue Type: Bug Components: build Affects Versions: 0.8.2.1 Reporter: Derek Bassett Assignee: Ismael Juma Priority: Minor Labels: newbie Attachments: KAFKA-2034.patch Original Estimate: 4h Remaining Estimate: 4h The build.gradle file does not explicitly set the sourceCompatibility version. As a result, when Kafka is built with Java 1.8, the class files are emitted with the wrong target version. This would also allow Java 1.8 features to be merged into Kafka. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
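A fix along the lines the JIRA describes would pin the language level in build.gradle. A hypothetical fragment (the version numbers and `subprojects` placement are illustrative; the actual patch is the one on Review Board):

```groovy
// Hypothetical build.gradle fragment: pin the language level so a JDK 8
// build still emits Java 6 bytecode (class file major version 50).
subprojects {
  apply plugin: 'java'
  sourceCompatibility = 1.6
  targetCompatibility = 1.6
}
```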
Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations (Thread 2)
Hey Andrii, thanks for all the hard work on this, it has come a long way. A couple of questions and comments on this. For the errors, can we do the following: 1. Remove IllegalArgument from the name; we haven't used that convention for other errors. 2. Normalize this list with the existing errors. For example, elsewhere when you give an invalid topic name we give back an InvalidTopicException, but this is proposing a new error for that. It would be good if these kinds of errors were handled the same way across all requests in the protocol. Other comments: 3. I don't understand MultipleInstructionsForOneTopic and MultipleTopicInstructionsInOneBatch, and the description is quite vague. There is some implicit assumption in this proposal about how batching will be done that doesn't seem to be explained. 4. I think adding replica lag to the metadata request is out of place. Two reasons: a. This is something that can only be answered by the leader for that partition, so querying N partitions fundamentally means querying N brokers (roughly). This is different from the other properties, which are shared knowledge. b. This is a monitoring property, not a configuration/metadata property. I recommend we remove this here and in the future add an API that gets all the monitoring stats from the server, including lag. Adding all these to the metadata request won't make sense, right? 5. This includes a special request for preferred replica leader election. I feel that we should not expose an API for this because the user should not be in the business of managing leaders. We have gotten this feature to the point where preferred leadership election is enabled automatically. I think we should go further in that direction and do whatever work is required to make this the only option rather than trying to institute public APIs for manually controlling it. 6. The API changes we discussed for the java api still aren't reflected in the proposal.
-Jay On Tue, Apr 21, 2015 at 7:47 AM, Andrii Biletskyi andrii.bilets...@stealth.ly wrote: Hi all, I've updated the KIP-4 page to include all previously discussed items such as: new error codes, merged alter-topic and reassign-partitions requests, added TMR_V1. It'd be great if we concentrate on the Errors+Wire Protocol schema and discuss any remaining issues today, since the first patch will include only server-side implementation. Thanks, Andrii Biletskyi On Tue, Apr 21, 2015 at 9:46 AM, Andrii Biletskyi andrii.bilets...@stealth.ly wrote: 1. Yes, this will be much easier. Okay, let's add it. 2. Okay. This will differ a little bit from the way kafka-topics.sh currently handles the alter-topic command, but I think it's a reasonable restriction. I'll update the KIP accordingly after our weekly call. Thanks, Andrii Biletskyi On Mon, Apr 20, 2015 at 10:56 PM, Jun Rao j...@confluent.io wrote: 1. Yes, lag is probably only going to be useful for the admin client. However, so is isr. It seems to me that we should get lag and isr from the same request. I was thinking that we can just extend TMR by changing replicas from an array of int to an array of (int, lag) pairs. Is that too complicated? 3. I was thinking that we just don't allow the cli to change more than one thing at a time. So, you will get an error if you want to change both partitions and configs. Thanks, Jun On Sun, Apr 19, 2015 at 8:22 AM, Andrii Biletskyi andrii.bilets...@stealth.ly wrote: Jun, 1. Yes, seems we can add lag info to the TMR. But before that I wonder whether there are other reasons we need this info besides the reassign-partitions command? As we discussed earlier, the problem with poor monitoring capabilities for reassign-partitions (as currently we only inform users Completed/In Progress per partition) may require a separate solution. We were thinking about a separate Wire protocol request. And I actually like your idea about adding some sort of BrokerMetadataRequest for these purposes.
I actually think we can cover some other items (like rack-awareness) but for me it deserves a separate KIP really. Also, adding a Replica-Lag map per partition will make TopicMetadataResponse very sophisticated: Map[TopicName, Map[PartitionId, Map[ReplicaId, Lag]]]. Maybe we need to leave it for a moment and propose a new request rather than taking another step towards one monster request. 2. Yes, error per topic. The only question is whether we should execute at least the very first alter-topic command from the duplicated topic set or return an error for all ... I think the more predictable and reasonable option for clients would be returning errors for all duplicated topics. 3. Hm, yes. Actually we also have change topic config there. But it is not related to such replication commands as increase replicas or change replica
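To make the two shapes in this exchange concrete: Jun's suggestion turns each replica entry in TMR into a (replicaId, lag) pair, and the nested-map view Andrii is worried about falls straight out of it. A hypothetical sketch — `TmrShapes` and its record types are illustrative, not Kafka's actual wire-protocol classes:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative-only types for the two response shapes discussed above.
public class TmrShapes {
    // Jun's suggestion: each replica entry becomes a (replicaId, lag) pair.
    record ReplicaInfo(int replicaId, long lag) {}
    record PartitionMetadata(int partition, List<ReplicaInfo> replicas) {}

    // Andrii's concern spelled out as nested maps:
    // Map[TopicName, Map[PartitionId, Map[ReplicaId, Lag]]]
    static Map<String, Map<Integer, Map<Integer, Long>>> lagView(
            Map<String, List<PartitionMetadata>> topics) {
        Map<String, Map<Integer, Map<Integer, Long>>> out = new HashMap<>();
        topics.forEach((topic, partitions) -> {
            Map<Integer, Map<Integer, Long>> byPartition = new HashMap<>();
            for (PartitionMetadata pm : partitions) {
                Map<Integer, Long> byReplica = new HashMap<>();
                for (ReplicaInfo r : pm.replicas()) byReplica.put(r.replicaId(), r.lag());
                byPartition.put(pm.partition(), byReplica);
            }
            out.put(topic, byPartition);
        });
        return out;
    }

    public static void main(String[] args) {
        var topics = Map.of("topic1",
            List.of(new PartitionMetadata(0,
                List.of(new ReplicaInfo(1, 0L), new ReplicaInfo(2, 42L)))));
        System.out.println(lagView(topics));
    }
}
```

The pair encoding keeps the wire format flat (one extra field per replica), while clients that want the map view can build it locally, which is part of why extending TMR rather than nesting maps was on the table.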
Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations (Thread 2)
Guys, Thank you for your time. A short summary of our discussion. Answering previous items: 1. 2. I will double-check existing error codes to align the list of errors that need to be added. 3. We agreed to think again about the batch request semantics. The main concern is that users would expect us to allow executing multiple instructions for one topic in one batch. I will start implementation and check whether there are any impediments to handling it this way. The same for AlterTopicRequest - I will try to make the request semantics as easy as possible and allow users to change different things at one time - e.g. change the number of partitions and replicas in one instruction. 4. We agreed not to add lag information to TMR. 5. We discussed the preferred replica command, and it was pointed out that generally users shouldn't call this command manually now since this is automatically handled by the cluster. If there are no objections (especially from devops people) I will remove the respective request. 6. As discussed, the AdminClient API is phase 2 and will go after the Wire Protocol extensions. It will be finalized as java-doc after I complete the patch for phase 1 - Wire Protocol + server-side code handling requests. Thanks, Andrii Biletskyi On Wed, Apr 22, 2015 at 12:36 AM, Jay Kreps jay.kr...@gmail.com wrote: Hey Andrii, thanks for all the hard work on this, it has come a long way. A couple questions and comments on this. For the errors, can we do the following: 1. Remove IllegalArgument from the name, we haven't used that convention for other errors. 2. Normalize this list with the existing errors. For example, elsewhere when you give an invalid topic name we give back an InvalidTopicException but this is proposing a new error for that. It would be good that these kinds of errors are handled the same way across all requests in the protocol. Other comments: 3. I don't understand MultipleInstructionsForOneTopic and MultipleTopicInstructionsInOneBatch and the description is quite vague.
There is some implicit assumption in this proposal about how batching will be done that doesn't seem to be explained. 4. I think adding replica lag to the metadata request is out of place and should not be in the metadata request. Two reasons: a. This is something that can only be answered by the leader for that partition. So querying N partitions fundamentally means querying N brokers (roughly). This is different from the other properties which are shared knowledge. b. This is a monitoring property not a configuration/metadata property. I recommend we remove this here and in the future add an API that gets all the monitoring stats from the server including lag. Adding all these to the metadata request won't make sense, right? 5. This includes a special request for preferred replica leader election. I feel that we should not expose an API for this because the user should not be in the business of managing leaders. We have gotten this feature to the point where preferred leadership election is enabled automatically. I think we should go further in that direction and do whatever work is required to make this the only option rather than trying to institute public apis for manually controlling it. 6. The API changes we discussed for the java api still aren't reflected in the proposal. -Jay On Tue, Apr 21, 2015 at 7:47 AM, Andrii Biletskyi andrii.bilets...@stealth.ly wrote: Hi all, I've updated the KIP-4 page to include all previously discussed items such as: new error codes, merged alter-topic and reassign-partitions requests, added TMR_V1. It'd be great if we concentrate on the Errors+Wire Protocol schema and discuss any remaining issues today, since the first patch will include only server-side implementation. Thanks, Andrii Biletskyi On Tue, Apr 21, 2015 at 9:46 AM, Andrii Biletskyi andrii.bilets...@stealth.ly wrote: 1. Yes, this will be much easier. Okay, let's add it. 2. Okay.
This will differ a little bit from the way kafka-topics.sh currently handles the alter-topic command, but I think it's a reasonable restriction. I'll update the KIP accordingly after our weekly call. Thanks, Andrii Biletskyi On Mon, Apr 20, 2015 at 10:56 PM, Jun Rao j...@confluent.io wrote: 1. Yes, lag is probably only going to be useful for the admin client. However, so is isr. It seems to me that we should get lag and isr from the same request. I was thinking that we can just extend TMR by changing replicas from an array of int to an array of (int, lag) pairs. Is that too complicated? 3. I was thinking that we just don't allow the cli to change more than one thing at a time. So, you will get an error if you want to change both partitions and configs. Thanks, Jun On Sun, Apr 19, 2015 at 8:22 AM, Andrii Biletskyi andrii.bilets...@stealth.ly wrote: Jun, 1. Yes, seems we can add lag info to the TMR. But before
[jira] [Commented] (KAFKA-2141) Integrate checkstyle for Java code
[ https://issues.apache.org/jira/browse/KAFKA-2141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14506087#comment-14506087 ] Jay Kreps commented on KAFKA-2141: -- Do you mean adding it to cover java code that isn't in the clients/ directory? I tried to set that up but somehow the mixture of scala and java caused confusion... Integrate checkstyle for Java code -- Key: KAFKA-2141 URL: https://issues.apache.org/jira/browse/KAFKA-2141 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Assignee: Guozhang Wang As we move from Scala to more Java code, it is better to start firmly with coding style. So I propose integrating checkstyle to catch coding style issues at build / test time to save on reviewing efforts. It has been integrated to Kafka (KAFKA-1915), and my personal experience is that once the import / code configs are set appropriately it is worthwhile. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Metrics package discussion
I'm very late to this thread. I'm with Gwen about metrics being the public API (but often not treated as such, sadly). I don't know the details of the internal issues around how metrics are implemented but, for selfish reasons, would hate to see MBeans change - we spent weeks contributing more than a dozen iterations of patches for changing the old Kafka 0.8.1.x metrics to what they are now in 0.8.2. I wish somebody had mentioned these (known?) issues then - since metrics were so drastically changed then, we could have done it right immediately. Also, when you change MBean names and structure you force everyone to rewrite their MBean parsers (not your problem, but still something to be aware of). If metrics are going to be changing, would it be possible to enumerate the changes somewhere? Finally, I tried finding a JIRA issue for changing metrics, so I can watch it, but couldn't find it here: https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20fixVersion%20%3D%200.8.3%20AND%20resolution%20%3D%20Unresolved%20ORDER%20BY%20due%20ASC%2C%20priority%20DESC%2C%20created%20ASC Am I looking in the wrong place? Is there an issue for the changes discussed in this thread? Is the decision to do it in 0.8.3 final? Thanks, Otis -- Monitoring * Alerting * Anomaly Detection * Centralized Log Management Solr Elasticsearch Support * http://sematext.com/ On Tue, Mar 31, 2015 at 12:43 PM, Steven Wu stevenz...@gmail.com wrote: My main concern is that if we don't do the migration in 0.8.3, we will be left with some metrics in YM format and some others in KM format (as we start sharing client code on the broker). This is probably a worse situation to be in. +1. I am not sure how our servo adaptor will work if there are two formats for metrics, unless there is an easy way to check the format (YM/KM). On Tue, Mar 31, 2015 at 9:40 AM, Jun Rao j...@confluent.io wrote: (2) The metrics are clearly part of the client API and we are not changing that (at least for the new client).
Arguably, the metrics are also part of the broker-side API. However, since they affect fewer parties (mostly just the Kafka admins), it may be easier to make those changes. My main concern is that if we don't do the migration in 0.8.3, we will be left with some metrics in YM format and some others in KM format (as we start sharing client code on the broker). This is probably a worse situation to be in. Thanks, Jun On Tue, Mar 31, 2015 at 9:26 AM, Gwen Shapira gshap...@cloudera.com wrote: (2) I believe we agreed that our metrics are a public API. I believe we also agree we don't break APIs in minor releases. So, it seems obvious to me that we can't make breaking changes to metrics in minor releases. I'm not convinced "we did it in the past" is a good reason to do it again. Is there a strong reason to do it in the 0.8.3 time-frame? On Tue, Mar 31, 2015 at 7:59 AM, Jun Rao j...@confluent.io wrote: (2) Not sure why we can't do this in 0.8.3. We changed the metrics names in 0.8.2 already. Given that we need to share code between the client and the core, and we need to keep the metrics consistent on the broker, it seems that we have no choice but to migrate to KM. If so, it seems that the sooner we do this, the better. It is important to give people an easy path for migration. However, it may not be easy to keep the mbean names exactly the same. For example, YM has hardcoded attributes (e.g. 1-min-rate, 5-min-rate, 15-min-rate, etc. for rates) that are not available in KM. One benefit of this migration is that one can get the metrics in the client and the broker in the same way. Thanks, Jun On Mon, Mar 30, 2015 at 9:26 PM, Gwen Shapira gshap...@cloudera.com wrote: (1) It will be interesting to see what others use for monitoring integration, to see what is already covered with existing JMX integrations and what needs special support. (2) I think the migration story is more important - this is a non-compatible change, right?
So we can't do it in 0.8.3 timeframe, it has to be in 0.9? And we need to figure out how will users migrate - do we just tell everyone please reconfigure all your monitors from scratch - don't worry, it is worth it? I know you keep saying we did it before and our users are used to it, but I think there are a lot more users now, and some of them have different compatibility expectations. We probably need to find: * A least painful way to migrate - can we keep the names of at least most of the metrics intact? * Good explanation of what users gain from this painful migration (i.e. more accurate statistics due to gazillion histograms) On Mon, Mar 30, 2015 at 6:29 PM, Jun Rao j...@confluent.io wrote: If we are committed to migrating the broker side metrics
Re: Review Request 33410: Patch for KAFKA-2034
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33410/#review81109 --- Ship it! Ship It! - Ewen Cheslack-Postava On April 21, 2015, 9 p.m., Ismael Juma wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33410/ --- (Updated April 21, 2015, 9 p.m.) Review request for kafka. Bugs: KAFKA-2034 https://issues.apache.org/jira/browse/KAFKA-2034 Repository: kafka Description --- KAFKA-2034; Set sourceCompatibility in build.gradle Diffs - build.gradle 4775ee46c480eab7b8250e61ba1705d00f72a6aa Diff: https://reviews.apache.org/r/33410/diff/ Testing --- Compiled with JDK 8 and verified that the target level in the bytecode was `50` (ie Java 6) instead of `52`: find . -name '*.class' | xargs javap -verbose | grep "major version:" | sort | uniq, which printed: major version: 50 Thanks, Ismael Juma
Re: Review Request 33417: Patch for KAFKA-2138
On April 21, 2015, 11:56 p.m., Guozhang Wang wrote: This piece of logic has become quite complex and awkward to me now; for example, in ready() a node will only not be considered if ALL of its partitions are either not sendable or are in the backoff period, and the reason we want to get ready nodes before drain is to check if they are really ready or not. This is mainly because 1) we need to be careful about the timeout value when calling client.poll() later in order to avoid busy waiting, and 2) we need to make sure that if a metadata refresh is needed, it is sent at higher priority than other requests. I suggest re-writing this section of code to make it clearer, in the following process: 0. while handling the metadata response and updating the metadata, check whether ANY partition's leader is not known; if there is one, set metadata.requestUpdate. So we do not need to do this step anymore at the start of run(). 1. get all the ready nodes based on their connection state only (i.e. no peeking in RecordAccumulator), and record the node_backoff as min (reconnection_backoff - time_waited) of all nodes; if one of these nodes is connected or connecting, this backoff should be 0. 2. for each of the ready nodes, try to drain their corresponding partitions in RecordAccumulator while considering all kinds of conditions (full, expired, exhausted, etc...), and record the data_backoff as min (retry_backoff - time_waited) of all partitions; if one of the partitions is immediately sendable, this backoff should be 0. 3. formulate the produce request and call client.poll() with timeout = reconnection_backoff > 0 ? reconnection_backoff : retry_backoff. 4. in NetworkClient.poll(), the logic of maybeUpdateMetadata while updating the metadataTimeout can also be simplified. This may contain some flaws; Jiangjie / Ewen, let me know if you see any issues. Hi Guozhang, I think that makes sense. We should swap the checking order to check connection readiness first, then data readiness.
I'll try to submit a refactored patch and will throw questions if there is anything. Thanks. - Jiangjie --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33417/#review81097 --- On April 21, 2015, 10:51 p.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33417/ --- (Updated April 21, 2015, 10:51 p.m.) Review request for kafka. Bugs: KAFKA-2138 https://issues.apache.org/jira/browse/KAFKA-2138 Repository: kafka Description --- Patch for KAFKA-2138 honor retry backoff in KafkaProducer Diffs - clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 0e7ab29a07301309aa8ca5b75b32b8e05ddc3a94 clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java 05e2929c0a9fc8cf2a8ebe0776ca62d5f3962d5c Diff: https://reviews.apache.org/r/33417/diff/ Testing --- Thanks, Jiangjie Qin
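Steps 1-3 in Guozhang's outline boil down to computing two backoffs and picking a poll() timeout from them so the sender never busy-waits. A rough sketch with made-up names (this is not the actual Sender/NetworkClient code):

```java
// Hedged sketch of the backoff/timeout selection described in the review above.
public class PollTimeoutSketch {
    /** Step 1: min remaining connection backoff across nodes;
     *  0 if some node is already connected or connecting. */
    static long nodeBackoff(long[] remainingConnectBackoffMs, boolean anyNodeUsable) {
        return minBackoff(remainingConnectBackoffMs, anyNodeUsable);
    }

    /** Step 2: min remaining retry backoff across partitions;
     *  0 if some batch is immediately sendable. */
    static long dataBackoff(long[] remainingRetryBackoffMs, boolean anyBatchSendable) {
        return minBackoff(remainingRetryBackoffMs, anyBatchSendable);
    }

    private static long minBackoff(long[] remaining, boolean anyReadyNow) {
        if (anyReadyNow || remaining.length == 0) return 0;
        long min = Long.MAX_VALUE;
        for (long r : remaining) min = Math.min(min, Math.max(0, r));
        return min;
    }

    /** Step 3: block in poll() for the connection backoff when connections are
     *  throttled, otherwise for the retry backoff -- never spin with timeout 0. */
    static long pollTimeout(long nodeBackoffMs, long dataBackoffMs) {
        return nodeBackoffMs > 0 ? nodeBackoffMs : dataBackoffMs;
    }

    public static void main(String[] args) {
        long node = nodeBackoff(new long[]{200, 500}, false); // all nodes backing off -> 200
        long data = dataBackoff(new long[]{50}, false);       // batch retry pending -> 50
        System.out.println(pollTimeout(node, data)); // 200: wait out the connection backoff
        System.out.println(pollTimeout(0, data));    // 50: connections fine, honor retry backoff
    }
}
```

The key property is that a zero backoff on either side means "something is ready now", which is exactly the case where poll() should return promptly.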
Re: [DISCUSS] KIP-12 - Kafka Sasl/Kerberos implementation
Hi Jay, Thanks for the review. 1. Isn't the blocking handshake going to be a performance concern? Can we do the handshake non-blocking instead? If anything that causes connections to drop can incur blocking network roundtrips won't that eat up all the network threads immediately? I guess I would have to look at that code to know... I've implemented the non-blocking handshake on the server side as well as for the new producer client. A blocking handshake is only done for BlockingChannel.scala, and it just loops over the non-blocking handshake until the context is established. On the server side (SocketServer.scala) it goes through the steps and returns a "READ or WRITE" signal for the next step. For BlockingChannel the worst case I look at is the connection timeout, but most times this handshake will finish up much quicker. I am cleaning up the code and will send a patch in the next few days. 2. Do we need to support blocking channel at all? That is just for the old clients, and I think we should probably just leave those be to reduce scope here. The blocking channel is used not only by the simple consumer but also by ControllerChannelManager and controlled shutdown. Are we planning on deprecating it? I think at least for ControllerChannelManager it makes sense to have a blocking channel. If users want to lock down the cluster, i.e. no PLAINTEXT channels are allowed, then all communication has to go through either SSL or KERBEROS, so in this case we need to add this capability to BlockingChannel. 3. Can we change the APIs to drop the getters when that is not required by the API being implemented. In general we don't use setters and getters as a naming convention. My bad on adding getters and setters :). I'll work on removing them and change the KIP accordingly. I still need some accessor methods though. Thanks, Harsha On April 21, 2015 at 2:51:15 PM, Jay Kreps (jay.kr...@gmail.com) wrote: Hey Sriharsha, Thanks for the excellent write-up. Couple of minor questions: 1.
Isn't the blocking handshake going to be a performance concern? Can we do the handshake non-blocking instead? If anything that causes connections to drop can incur blocking network roundtrips won't that eat up all the network threads immediately? I guess I would have to look at that code to know... 2. Do we need to support blocking channel at all? That is just for the old clients, and I think we should probably just leave those be to reduce scope here. 3. Can we change the APIs to drop the getters when that is not required by the API being implemented. In general we don't use setters and getters as a naming convention. The long explanation on that is that setters/getters kind of imply a style of java programming where you have simple structs with getters and setters for each field. In general we try to have access methods only when necessary, and rather than setters model the full change or action being carried out, and if possible disallow change entirely. This is more in line with modern java style I think. We aren't perfect in following this, but once you start with getters and setters people start just adding them everywhere and then using them. -Jay On Mon, Apr 20, 2015 at 10:42 AM, Sriharsha Chintalapani ka...@harsha.io wrote: Hi, I updated the KIP-12 with more details. Please take a look https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=51809888 Thanks, Harsha On February 11, 2015 at 10:02:43 AM, Harsha (ka...@harsha.io) wrote: Thanks Joe. It will be part of KafkaServer and will run on its own thread. Since each kafka server will run with a keytab we should make sure they are all getting renewed. On Wed, Feb 11, 2015, at 10:00 AM, Joe Stein wrote: Thanks Harsha, looks good so far. How were you thinking of running the KerberosTicketManager as a standalone process or like controller or is it a layer of code that does the plumbing pieces everywhere? 
~ Joe Stein On Wed, Feb 11, 2015 at 12:18 PM, Harsha ka...@harsha.io wrote: Hi, Here is the initial proposal for sasl/kerberos implementation for kafka https://cwiki.apache.org/confluence/x/YI4WAw and JIRA https://issues.apache.org/jira/browse/KAFKA-1686. I am currently working on a prototype which will add more details to the KIP. Just opening the thread to say the work is in progress. I'll update the thread with an initial prototype patch. Thanks, Harsha
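The handshake scheme described in the thread above can be sketched as follows. This is a toy illustration, not Kafka's actual classes: the `Signal` enum and `step()` method are hypothetical stand-ins for the READ/WRITE signals SocketServer returns, and the blocking wrapper mimics how BlockingChannel loops over the non-blocking handshake until the context is established, bounded by the connection timeout.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class BlockingHandshakeSketch {
    enum Signal { READ, WRITE, FINISHED }

    // A canned sequence of signals standing in for a real SASL/SSL handshake.
    private final Queue<Signal> steps =
            new ArrayDeque<>(java.util.List.of(Signal.WRITE, Signal.READ, Signal.FINISHED));

    // One non-blocking handshake step; returns what the caller must do next.
    Signal step() { return steps.poll(); }

    // Blocking variant: loop until established or the connection timeout expires.
    boolean blockingHandshake(long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (step() != Signal.FINISHED) {
            if (System.currentTimeMillis() > deadline)
                return false; // worst case: connection timeout
            // On READ: wait until the socket is readable; on WRITE: flush pending
            // bytes. Elided here; a real server would drive this from a Selector.
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(new BlockingHandshakeSketch().blockingHandshake(1000)); // true
    }
}
```

The point of the design is that the server's network threads only ever pay for one `step()` at a time, while legacy blocking callers reuse the same state machine inside a loop.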
Re: Review Request 33088: add heartbeat to coordinator
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33088/#review79762 --- core/src/main/scala/kafka/api/RequestKeys.scala https://reviews.apache.org/r/33088/#comment129291 Can we just add these two requests into keyToNameAndDeserializerMap? core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala https://reviews.apache.org/r/33088/#comment129292 Move kafka imports above scala / external lib imports. core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala https://reviews.apache.org/r/33088/#comment129294 How about handleConsumerJoinGroup? core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala https://reviews.apache.org/r/33088/#comment129293 I think we use the if { } else { } convention here. core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala https://reviews.apache.org/r/33088/#comment129295 Ditto above. core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala https://reviews.apache.org/r/33088/#comment129299 Why do we try to complete any delayed heartbeat here? core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala https://reviews.apache.org/r/33088/#comment129304 Actually, can we use the group id as the delayed key here? Then by checking just one key, all delayed joins for consumers with this key will be completed. core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala https://reviews.apache.org/r/33088/#comment129306 We do not need to cut the socket connection from the coordinator. core/src/main/scala/kafka/network/RequestChannel.scala https://reviews.apache.org/r/33088/#comment129310 Seems ApiKeys is not used? - Guozhang Wang On April 18, 2015, 7:16 p.m., Onur Karaman wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33088/ --- (Updated April 18, 2015, 7:16 p.m.) Review request for kafka.
Bugs: KAFKA-1334 https://issues.apache.org/jira/browse/KAFKA-1334 Repository: kafka Description --- add heartbeat to coordinator todo: - see how it performs under real load - add error code handling on the consumer side - implement the partition assignors Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java e55ab11df4db0b0084f841a74cbcf819caf780d5 clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 36aa412404ff1458c7bef0feecaaa8bc45bed9c7 core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 456b602245e111880e1b8b361319cabff38ee0e9 core/src/main/scala/kafka/coordinator/ConsumerRegistry.scala 2f5797064d4131ecfc9d2750d9345a9fa3972a9a core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala 6a6bc7bc4ceb648b67332e789c2c33de88e4cd86 core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala df60cbc35d09937b4e9c737c67229889c69d8698 core/src/main/scala/kafka/coordinator/DelayedRebalance.scala 8defa2e41c92f1ebe255177679d275c70dae5b3e core/src/main/scala/kafka/coordinator/Group.scala PRE-CREATION core/src/main/scala/kafka/coordinator/GroupRegistry.scala 94ef5829b3a616c90018af1db7627bfe42e259e5 core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala 821e26e97eaa97b5f4520474fff0fedbf406c82a core/src/main/scala/kafka/coordinator/PartitionAssignor.scala PRE-CREATION core/src/main/scala/kafka/server/DelayedOperationKey.scala b673e43b0ba401b2e22f27aef550e3ab0ef4323c core/src/main/scala/kafka/server/KafkaApis.scala b4004aa3a1456d337199aa1245fb0ae61f6add46 core/src/main/scala/kafka/server/KafkaServer.scala c63f4ba9d622817ea8636d4e6135fba917ce085a core/src/main/scala/kafka/server/OffsetManager.scala 18680ce100f10035175cc0263ba7787ab0f6a17a core/src/test/scala/unit/kafka/coordinator/GroupTest.scala PRE-CREATION Diff: https://reviews.apache.org/r/33088/diff/ Testing --- Thanks, Onur Karaman
RE: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system
Hi Sriram, One line of code was missing; I will update the code review board and KIP soon. For LogSegment and FileMessageSet, we must use different constructors for an existing file and a new file; then the code channel.position(math.min(channel.size().toInt, end)) will make sure the position is at the end of an existing file. Thanks, Honghai Chen -Original Message- From: Jay Kreps [mailto:jay.kr...@gmail.com] Sent: Wednesday, April 22, 2015 5:22 AM To: dev@kafka.apache.org Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system My understanding of the patch is that clean shutdown truncates the file back to its true size (and reallocates it on startup). Hard crash is handled by the normal recovery, which should truncate off the empty portion of the file. On Tue, Apr 21, 2015 at 10:52 AM, Sriram Subramanian srsubraman...@linkedin.com.invalid wrote: Could you describe how recovery works in this mode? Say, we had a 250 MB preallocated segment and we wrote till 50MB and crashed. Till what point do we recover? Also, on startup, how is the append end pointer set even on a clean shutdown? How does the FileChannel end position get set to 50 MB instead of 250 MB? The existing code might just work for it but explaining that would be useful. On 4/21/15 9:40 AM, Neha Narkhede n...@confluent.io wrote: +1. I've tried this on Linux and it helps reduce the spikes in append (and hence producer) latency for high-throughput writes. I am not entirely sure why, but my suspicion is that in the absence of preallocation, you see spikes when writes need to happen faster than the time it takes Linux to allocate the next block to the file. It will be great to see some performance test results too. On Tue, Apr 21, 2015 at 9:23 AM, Jay Kreps jay.kr...@gmail.com wrote: I'm also +1 on this. The change is quite small and may actually help perf on Linux as well (we've never tried this).
I have a lot of concerns about testing the various failure conditions, but since it will be off by default the risk is not too high. -Jay On Mon, Apr 20, 2015 at 6:58 PM, Honghai Chen honghai.c...@microsoft.com wrote: I wrote a KIP for this after some discussion on KAFKA-1646. https://issues.apache.org/jira/browse/KAFKA-1646 https://cwiki.apache.org/confluence/display/KAFKA/KIP-20+-+Enable+log+preallocate+to+improve+consume+performance+under+windows+and+some+old+Linux+file+system The RB is here: https://reviews.apache.org/r/33204/diff/ Thanks, Honghai -- Thanks, Neha
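Sriram's recovery question above comes down to one line. The following runnable sketch (an assumption, not Kafka's actual LogSegment code) shows the constructor behavior Honghai describes: when reopening an existing preallocated file, position the channel at the end of the real data rather than at the preallocated size, so appends continue from the right place after a restart.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;

public class PreallocatedSegmentSketch {
    // The line discussed in the thread: min(file size, recovered logical end).
    static long positionForExistingFile(FileChannel channel, long end) throws IOException {
        channel.position(Math.min(channel.size(), end));
        return channel.position();
    }

    public static void main(String[] args) throws IOException {
        Path p = Files.createTempFile("segment", ".log");
        try (RandomAccessFile raf = new RandomAccessFile(p.toFile(), "rw")) {
            raf.setLength(250 * 1024);  // preallocated 250 KB segment
            long end = 50 * 1024;       // valid data ends at 50 KB (e.g. after recovery)
            System.out.println(positionForExistingFile(raf.getChannel(), end)); // 51200
        } finally {
            Files.deleteIfExists(p);
        }
    }
}
```

With the 250 MB / 50 MB example from the thread, `end` would come from recovery (or the clean-shutdown metadata), so the FileChannel position lands at 50 MB and not at the 250 MB preallocated length.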
[jira] [Commented] (KAFKA-1688) Add authorization interface and naive implementation
[ https://issues.apache.org/jira/browse/KAFKA-1688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14506469#comment-14506469 ] Parth Brahmbhatt commented on KAFKA-1688: - I would like to create 3 subtasks so I can break down the code reviews into more manageable pieces, but I don't see the option to add subtasks to this jira, maybe because it is already a subtask? I would like to create the following subtasks: * Public classes and interfaces with changes to KafkaServer and KafkaAPI. This should unblock any custom authorization work like Ranger or Sentry. * Default out-of-the-box implementation of Authorizer. * CLI for acl management. Add authorization interface and naive implementation Key: KAFKA-1688 URL: https://issues.apache.org/jira/browse/KAFKA-1688 Project: Kafka Issue Type: Sub-task Components: security Reporter: Jay Kreps Assignee: Parth Brahmbhatt Fix For: 0.8.3 Attachments: KAFKA-1688_2015-04-10_11:08:39.patch Add a PermissionManager interface as described here: https://cwiki.apache.org/confluence/display/KAFKA/Security (possibly there is a better name?) Implement calls to the PermissionsManager in KafkaApis for the main requests (FetchRequest, ProduceRequest, etc). We will need to add a new error code and exception to the protocol to indicate permission denied. Add a server configuration to give the class you want to instantiate that implements that interface. That class can define its own configuration properties from the main config file. Provide a simple implementation of this interface which just takes a user and ip whitelist and permits those in either of the whitelists to do anything, and denies all others. Rather than writing an integration test for this class we can probably just use this class for the TLS and SASL authentication testing. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Review Request 33420: Patch for KAFKA-2140
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33420/ --- Review request for kafka. Bugs: KAFKA-2140 https://issues.apache.org/jira/browse/KAFKA-2140 Repository: kafka Description ---
- Remove redundant `extends Object`
- Don't pass `char` to `StringBuilder` constructor (it ends up calling the `StringBuilder(int capacity)` constructor)
- Fix `equals` call to use the correct value
- Improve numeric coercions
- Remove redundant `val` modifiers in case classes
- Remove redundant `return` keywords
- Remove redundant semi-colons
- Use `forall` instead of `foldLeft`
- Remove `javaListToScalaBuffer` implicit (no need to support 2.8.x)
- Remove unnecessary casts
- Remove unnecessary `toSeq`
- Simplify boolean logic
- Use `sum` instead of `foldLeft`
- Use `foreach` instead of `map`
- Use `reverseMap` instead of `reverse.map`
- Use `contains` and fix comparison to use the same type for both parameters; also use pattern matching to extract `topic` and `partitions` more concisely
- Remove unnecessary `toInt`
- Fix inconsistency in calls to `format`
- Remove redundant `toString` call
Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 42c72198a0325e234cf1d428b687663099de884e clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 305ec8ee2b94251c25d0734d3f56495d270744de clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 5a575553d30c1c0bda9ffef9e9b9eafae28deba5 clients/src/main/java/org/apache/kafka/common/metrics/stats/Histogram.java cf91f5f90660fa9153939b02fcff29131dd58c00 clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java d3394ee669e1c2254403e95393511d0a17e6f250 clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java 66442ed38533af081897c05292efef4ca58b9bee clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java 8b926340de49bf6ff0910d129bd680e9e61b26de
contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java 1d0e0a917985736bef7af66c741e5807d8503121 core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala bbe3362b2b0eedb79ab80cfe47fd498f6c2941d6 core/src/main/scala/kafka/admin/TopicCommand.scala 60f0228e6735a492315842afe28bcfa317478968 core/src/main/scala/kafka/api/ControlledShutdownRequest.scala 5be393ab8272a49437b5057ed098ccdc42f352e5 core/src/main/scala/kafka/api/ControlledShutdownResponse.scala 5e0a1cf4f407aa859175d619c78d8fec5a0fbd0a core/src/main/scala/kafka/api/GenericRequestAndHeader.scala f40e19f4b2ad703e7df8378cb5bd5c78e3b7c6de core/src/main/scala/kafka/api/GenericResponseAndHeader.scala a4879e26b5362fb943cdd9768dfca3ad40472405 core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala 2fad585f126699ba8d26c901a41bcf6b8198bf62 core/src/main/scala/kafka/api/StopReplicaResponse.scala 3431f3f65d1aea694324d2c9fc8f48e752e70cd6 core/src/main/scala/kafka/api/TopicMetadata.scala 5e39f453b429cb42ab87f695b6b30576f9ce98ce core/src/main/scala/kafka/api/TopicMetadataRequest.scala 7dca09ce637a40e125de05703dc42e8b611971ac core/src/main/scala/kafka/client/ClientUtils.scala b66424b230463df6641a848b99bb73312ea66e33 core/src/main/scala/kafka/cluster/EndPoint.scala 3286f6d4f25ca2118105acbcaf94d308ac0a0c2e core/src/main/scala/kafka/common/ConsumerReblanceFailedException.scala core/src/main/scala/kafka/controller/ControllerChannelManager.scala 97acdb23f6e95554c3e0357aa112eddfc875efbc core/src/main/scala/kafka/controller/KafkaController.scala 3a09377611b48198c4c3cd1a118fc12eda0543d4 core/src/main/scala/kafka/controller/ReplicaStateMachine.scala e5c56e0618a04a2abf8bd02045a458901b75bb1d core/src/main/scala/kafka/coordinator/DelayedRebalance.scala 8defa2e41c92f1ebe255177679d275c70dae5b3e core/src/main/scala/kafka/javaapi/Implicits.scala 8baf4d468027a1592273134127768b095c71612f core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala 0125565c84aeaa345d37ed7e9e259ce7b00816ea 
core/src/main/scala/kafka/log/CleanerConfig.scala ade838672d671549771637e041cdfb22a03b17c1 core/src/main/scala/kafka/log/LogCleaner.scala 12eacdfa7b573988036a8359a156c03b62d22c72 core/src/main/scala/kafka/log/LogConfig.scala da55a348f37a3d6d99032c39398f7ccb11068f42 core/src/main/scala/kafka/log/OffsetIndex.scala 4ab22deec99dba7ccfffd6d9f5f7a305548eb875 core/src/main/scala/kafka/log/OffsetMap.scala 2940e47cb88e4577adedcc2d2131238e82e589c1 core/src/main/scala/kafka/log/OffsetPosition.scala 6cefde4d90e2885da3fa6e4ee30e738fdbc542fb core/src/main/scala/kafka/message/MessageSet.scala f1b8432f4a96fd5c8f9ad273e4b7e1590486e6a4 core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala 30fd0ea3ce061e735214e213ad6270a7f7a6092d
[jira] [Commented] (KAFKA-2029) Improving controlled shutdown for rolling updates
[ https://issues.apache.org/jira/browse/KAFKA-2029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14506078#comment-14506078 ] Jay Kreps commented on KAFKA-2029: -- I don't have a ton of context on this chunk of code. The controller is trying to model a state machine, so why does it have so much crazy ad hoc locking in it? Why can't the controller just be single-threaded? Is this due to the reliance on blocking I/O for the RPC? If so, can't we just move this over to the new network client and migrate the whole thing to a proper state machine? Improving controlled shutdown for rolling updates - Key: KAFKA-2029 URL: https://issues.apache.org/jira/browse/KAFKA-2029 Project: Kafka Issue Type: Improvement Components: controller Affects Versions: 0.8.1.1 Reporter: Dmitry Bugaychenko Assignee: Neha Narkhede Priority: Critical Attachments: KAFKA-2029.patch, KAFKA-2029.patch Controlled shutdown as implemented currently can cause numerous problems: deadlocks, local and global data loss, partitions without a leader, etc. In some cases the only way to restore the cluster is to stop it completely using kill -9 and start again. Note 1: I'm aware of KAFKA-1305, but the proposed workaround to increase the queue size makes things much worse (see discussion there). Note 2: The problems described here can occur in any setup, but they are extremely painful in setups with large brokers (36 disks, 1000+ assigned partitions per broker in our case). Note 3: These improvements are actually workarounds, and it is worth considering a global refactoring of the controller (make it single-threaded, or even get rid of it in favour of ZK leader elections for partitions). The problems and improvements are: # Controlled shutdown takes a long time (10+ minutes): the broker sends multiple shutdown requests, finally considers the shutdown as failed and proceeds to an unclean shutdown, and the controller gets stuck for a while (holding a lock while waiting for free space in the controller-to-broker queue).
After the broker starts back up it receives a followers request and erases high watermarks (with a message that the replica does not exist - the controller hadn't yet sent a request with the replica assignment); then, when the controller starts replicas on the broker, it deletes all local data (due to the missing high watermarks). Furthermore, the controller starts processing the pending shutdown request and stops replicas on the broker, leaving it in a non-functional state. A solution to the problem might be to increase the time the broker waits for the controller's response to the shutdown request, but this timeout is taken from controller.socket.timeout.ms, which is global for all broker-controller communication, and setting it to 30 minutes is dangerous. *Proposed solution: introduce a dedicated config parameter for this timeout with a high default*. # If a broker goes down during controlled shutdown and does not come back, the controller gets stuck in a deadlock (one thread owns the lock and tries to add a message to the dead broker's queue, the send thread is in an infinite loop trying to retry the message to the dead broker, and the broker failure handler is waiting for the lock). There are numerous partitions without a leader and the only way out is to kill -9 the controller. *Proposed solution: add a timeout for adding a message to the broker's queue*. ControllerChannelManager.sendRequest:
{code}
def sendRequest(brokerId: Int, request: RequestOrResponse, callback: (RequestOrResponse) => Unit = null) {
  brokerLock synchronized {
    val stateInfoOpt = brokerStateInfo.get(brokerId)
    stateInfoOpt match {
      case Some(stateInfo) =>
        // ODKL Patch: prevent infinite hang on trying to send message to a dead broker.
        // TODO: Move timeout to config
        if (!stateInfo.messageQueue.offer((request, callback), 10, TimeUnit.SECONDS)) {
          error("Timed out trying to send message to broker " + brokerId.toString)
          // Do not throw, as it brings controller into completely non-functional state:
          // "Controller to broker state change requests batch is not empty while creating a new one"
          // throw new IllegalStateException("Timed out trying to send message to broker " + brokerId.toString)
        }
      case None =>
        warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId))
    }
  }
}
{code}
# When the broker which is the controller starts shutting down while auto leader rebalance is running, it deadlocks in the end (the shutdown thread owns the lock and waits for the rebalance thread to exit, and the rebalance thread waits for the lock). *Proposed solution: use a bounded wait in the rebalance thread*. KafkaController.scala:
{code}
// ODKL Patch to
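The queue-timeout fix proposed in the ticket (the patch itself is Scala) boils down to a standard `java.util.concurrent` idiom, sketched here in Java for illustration: use a timed `offer()` instead of an unbounded blocking `put()`, so a thread holding the controller lock cannot hang forever on a dead broker's full queue.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedSendSketch {
    // Returns false instead of blocking indefinitely when the queue stays full.
    static boolean trySend(BlockingQueue<String> queue, String request,
                           long timeoutMs) throws InterruptedException {
        return queue.offer(request, timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> deadBrokerQueue = new ArrayBlockingQueue<>(1);
        deadBrokerQueue.put("pending-request"); // queue is now full and never drained
        if (!trySend(deadBrokerQueue, "leader-and-isr", 100)) {
            // log and carry on rather than deadlocking the controller
            System.out.println("timed out sending to broker");
        }
    }
}
```

The design choice matches the patch's comment: failing the send and logging keeps the controller functional, whereas throwing (or blocking) while holding the broker lock is what produces the deadlock described above.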
[jira] [Resolved] (KAFKA-2141) Integrate checkstyle for Java code
[ https://issues.apache.org/jira/browse/KAFKA-2141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-2141. -- Resolution: Not A Problem Integrate checkstyle for Java code -- Key: KAFKA-2141 URL: https://issues.apache.org/jira/browse/KAFKA-2141 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Assignee: Guozhang Wang As we move from Scala to more Java code, it is better to establish a firm coding style from the start. So I propose integrating checkstyle to catch coding style issues at build / test time to save reviewing effort. It has been integrated into Kafka (KAFKA-1915), and my personal experience is that once the import / code configs are set appropriately it is worthwhile. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2141) Integrate checkstyle for Java code
[ https://issues.apache.org/jira/browse/KAFKA-2141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14506174#comment-14506174 ] Guozhang Wang commented on KAFKA-2141: -- I meant to create a SAMZA ticket but got mixed up; closing it now. Integrate checkstyle for Java code -- Key: KAFKA-2141 URL: https://issues.apache.org/jira/browse/KAFKA-2141 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Assignee: Guozhang Wang As we move from Scala to more Java code, it is better to establish a firm coding style from the start. So I propose integrating checkstyle to catch coding style issues at build / test time to save reviewing effort. It has been integrated into Kafka (KAFKA-1915), and my personal experience is that once the import / code configs are set appropriately it is worthwhile. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-2139) Add a separate controller message queue with higher priority on broker side
Jiangjie Qin created KAFKA-2139: --- Summary: Add a separate controller message queue with higher priority on broker side Key: KAFKA-2139 URL: https://issues.apache.org/jira/browse/KAFKA-2139 Project: Kafka Issue Type: Improvement Reporter: Jiangjie Qin This ticket is supposed to work together with KAFKA-2029. There are two issues with the current controller-to-broker messages. 1. On the controller side the messages are sent without synchronization. 2. On the broker side the controller messages share the same queue as client messages. The problem here is that brokers process the controller messages for the same partition at different times, and the variation can be big. This causes unnecessary data loss and prolongs preferred leader election / controlled shutdown / partition reassignment, etc. KAFKA-2029 was trying to add a boundary between messages for different partitions. For example, before the leader migration for the previous partition finishes, the leader migration for the next partition won't begin. This ticket is trying to let the broker process controller messages faster. So the idea is to have a separate queue to hold controller messages: if there are controller messages, the KafkaApi thread will first take care of those, otherwise it will process messages from clients. These two tickets are not the ultimate solution to the current controller problems, but just mitigate them with minor code changes. Moving forward, we still need to think about rewriting the controller in a cleaner way. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (KAFKA-2139) Add a separate controller message queue with higher priority on broker side
[ https://issues.apache.org/jira/browse/KAFKA-2139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiangjie Qin reassigned KAFKA-2139: --- Assignee: Jiangjie Qin Add a separate controller message queue with higher priority on broker side --- Key: KAFKA-2139 URL: https://issues.apache.org/jira/browse/KAFKA-2139 Project: Kafka Issue Type: Improvement Reporter: Jiangjie Qin Assignee: Jiangjie Qin This ticket is supposed to work together with KAFKA-2029. There are two issues with the current controller-to-broker messages. 1. On the controller side the messages are sent without synchronization. 2. On the broker side the controller messages share the same queue as client messages. The problem here is that brokers process the controller messages for the same partition at different times, and the variation can be big. This causes unnecessary data loss and prolongs preferred leader election / controlled shutdown / partition reassignment, etc. KAFKA-2029 was trying to add a boundary between messages for different partitions. For example, before the leader migration for the previous partition finishes, the leader migration for the next partition won't begin. This ticket is trying to let the broker process controller messages faster. So the idea is to have a separate queue to hold controller messages: if there are controller messages, the KafkaApi thread will first take care of those, otherwise it will process messages from clients. These two tickets are not the ultimate solution to the current controller problems, but just mitigate them with minor code changes. Moving forward, we still need to think about rewriting the controller in a cleaner way. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
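The queueing idea in KAFKA-2139 can be sketched in a few lines. This is a hypothetical illustration (not Kafka's request-channel code): the request-handler thread always drains the dedicated controller queue before touching the shared client queue, so controller messages never wait behind a backlog of client requests.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class PriorityRequestSketch {
    static final Queue<String> controllerQueue = new ConcurrentLinkedQueue<>();
    static final Queue<String> clientQueue = new ConcurrentLinkedQueue<>();

    // Controller messages win whenever any are pending; otherwise serve clients.
    static String nextRequest() {
        String req = controllerQueue.poll();
        return (req != null) ? req : clientQueue.poll();
    }

    public static void main(String[] args) {
        clientQueue.add("produce");             // arrived first
        controllerQueue.add("leader-and-isr");  // arrived later, but higher priority
        System.out.println(nextRequest());      // leader-and-isr
        System.out.println(nextRequest());      // produce
    }
}
```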
[jira] [Commented] (KAFKA-2138) KafkaProducer does not honor the retry backoff time.
[ https://issues.apache.org/jira/browse/KAFKA-2138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14506011#comment-14506011 ] Jiangjie Qin commented on KAFKA-2138: - [~junrao] [~jjkoshy] Do you think we should also check this into 0.8.2.2? KafkaProducer does not honor the retry backoff time. Key: KAFKA-2138 URL: https://issues.apache.org/jira/browse/KAFKA-2138 Project: Kafka Issue Type: Bug Reporter: Jiangjie Qin Assignee: Jiangjie Qin Priority: Critical Attachments: KAFKA-2138.patch In KafkaProducer, we only check batch.lastAttemptMs in ready(), but we do not check it in drain(). The problem is that if we have two partitions both on the same node, and partition 1 should back off while partition 2 should not, then currently partition 1's backoff time will be ignored. We should check lastAttemptMs in drain() as well. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
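The backoff rule at issue in the ticket is simple to state (the method and field names below are illustrative, not the actual KafkaProducer internals): a batch is sendable only once `retry.backoff.ms` has elapsed since its last attempt, and — per this bug — the check must apply in drain() as well as in ready().

```java
public class BackoffSketch {
    static boolean backoffElapsed(long lastAttemptMs, long retryBackoffMs, long nowMs) {
        return nowMs - lastAttemptMs >= retryBackoffMs;
    }

    public static void main(String[] args) {
        long retryBackoffMs = 100;
        // Partition 1 retried 50 ms ago on this node: must keep backing off,
        // even though partition 2 on the same node is sendable.
        System.out.println(backoffElapsed(950, retryBackoffMs, 1000)); // false
        // Partition 2 retried 200 ms ago: sendable again.
        System.out.println(backoffElapsed(800, retryBackoffMs, 1000)); // true
    }
}
```

Checking this only when a node becomes "ready" is what causes the bug: the node is ready because of partition 2, and partition 1 then gets drained early.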
Re: [DISCUSS] KIP-11- Authorization design for kafka security
Following up on the KIP discussion. Two options for authorizing consumers to read topic t as part of group g: 1. READ permission on resource /topic/t 2. READ permission on resource /topic/t AND WRITE permission on /group/g The advantage of (1) is that it is simpler. The disadvantage is that any member of any group that reads from t can commit offsets as any other member of a different group. This doesn't affect data security (who can access what) but it is a bit of a management issue--a malicious person can cause data loss or duplicates for another consumer by committing offsets. I think I favor (2) but it's worth thinking through. -Jay On Tue, Apr 21, 2015 at 2:43 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: Hey Jun, Yes, and we support wildcards for all acl entities: principal, hosts and operation. Thanks Parth On 4/21/15, 9:06 AM, Jun Rao j...@confluent.io wrote: Harsha, Parth, Thanks for the clarification. This makes sense. Perhaps we can clarify the meaning of those rules in the wiki. Related to this, it seems that we need to support wildcards in the cli/request protocol for topics? Jun On Mon, Apr 20, 2015 at 9:07 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: The iptables on unix supports the DENY operator, not that it should matter. The deny operator can also be used to specify "allow user1 to READ from topic1 from all hosts but host1,host2". Again, we could add a host-group semantic and extra complexity around that; not sure if it's worth it. In addition, with the DENY operator you are not forced to create a special group just to support the authorization use case. I am not convinced that the operator itself is really all that confusing. There are 3 practical use cases: - Resource with no acl whatsoever - allow access to everyone (just for backward compatibility; I would much rather fail closed and force users to explicitly grant acls that allow access to all users.)
- Resource with some acl attached - only users that have a matching allow acl are allowed (i.e. "allow READ access to topic1 to user1 from all hosts": only user1 has READ access and no other user has access of any kind) - Resource with some allow and some deny acls attached - users are allowed to perform an operation only when they satisfy an allow acl and do not have a conflicting deny acl. Users that have no acl (allow or deny) will still not have any access. (i.e. "allow READ access to topic1 to user1 from all hosts except host1 and host2": only user1 has access, but not from host1 and host2) I think we need to make a decision on deny primarily because, with the introduction of the acl management API, Acl is now a public class that will be used by Ranger/Sentry and other authorization providers. In the current design the acl has a permissionType enum field with possible values of Allow and Deny. If we choose to remove deny, we can assume all acls to be of allow type and remove the permissionType field completely. Thanks Parth On 4/20/15, 6:12 PM, Gwen Shapira gshap...@cloudera.com wrote: I think that's how it's done in pretty much any system I can think of.
Re: [DISCUSS] KIP-11- Authorization design for kafka security
Adding my notes from today's call to the thread: ** Deny or allow all by default? We will add a configuration to control this. The configuration will default to "allow" for backward compatibility. Security admins can set it to deny. ** Storing ACLs for default authorizers: We'll store them in ZK. We'll support pointing the authorizer to any ZK. The use of ZK will be internal to the default authorizer. The authorizer reads ACLs from its cache, refreshing it every hour. We proposed having a mechanism (possibly via a new ZK node) to tell the broker to refresh the cache immediately. ** Support deny as a permission type - we agreed to keep this. ** Mapping operations to API: We may need to add Group as a resource, with JoinGroup and OffsetCommit requiring privilege on the consumer group. This can be something we pass now and authorizers can support in the future. - Jay will write specifics to the mailing list discussion. On Tue, Apr 21, 2015 at 4:32 PM, Jay Kreps jay.kr...@gmail.com wrote: Following up on the KIP discussion. Two options for authorizing consumers to read topic t as part of group g: 1. READ permission on resource /topic/t 2. READ permission on resource /topic/t AND WRITE permission on /group/g The advantage of (1) is that it is simpler. The disadvantage is that any member of any group that reads from t can commit offsets as any other member of a different group. This doesn't affect data security (who can access what) but it is a bit of a management issue--a malicious person can cause data loss or duplicates for another consumer by committing offsets. I think I favor (2) but it's worth thinking through. -Jay On Tue, Apr 21, 2015 at 2:43 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: Hey Jun, Yes, and we support wildcards for all acl entities: principal, hosts and operation. Thanks Parth On 4/21/15, 9:06 AM, Jun Rao j...@confluent.io wrote: Harsha, Parth, Thanks for the clarification. This makes sense. Perhaps we can clarify the meaning of those rules in the wiki.
Related to this, it seems that we need to support wildcards in the cli/request protocol for topics? Jun On Mon, Apr 20, 2015 at 9:07 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: The iptables on unix supports the DENY operator, not that it should matter. The deny operator can also be used to specify "allow user1 to READ from topic1 from all hosts but host1,host2". Again, we could add a host-group semantic and extra complexity around that; not sure if it's worth it. In addition, with the DENY operator you are not forced to create a special group just to support the authorization use case. I am not convinced that the operator itself is really all that confusing. There are 3 practical use cases: - Resource with no acl whatsoever - allow access to everyone (just for backward compatibility; I would much rather fail closed and force users to explicitly grant acls that allow access to all users.) - Resource with some acl attached - only users that have a matching allow acl are allowed (i.e. "allow READ access to topic1 to user1 from all hosts": only user1 has READ access and no other user has access of any kind) - Resource with some allow and some deny acls attached - users are allowed to perform an operation only when they satisfy an allow acl and do not have a conflicting deny acl. Users that have no acl (allow or deny) will still not have any access. (i.e. "allow READ access to topic1 to user1 from all hosts except host1 and host2": only user1 has access, but not from host1 and host2) I think we need to make a decision on deny primarily because, with the introduction of the acl management API, Acl is now a public class that will be used by Ranger/Sentry and other authorization providers. In the current design the acl has a permissionType enum field with possible values of Allow and Deny. If we choose to remove deny, we can assume all acls to be of allow type and remove the permissionType field completely.
Thanks Parth On 4/20/15, 6:12 PM, Gwen Shapira gshap...@cloudera.com wrote: I think that's how it's done in pretty much any system I can think of.
[jira] [Created] (KAFKA-2140) Improve code readability
Ismael Juma created KAFKA-2140: -- Summary: Improve code readability Key: KAFKA-2140 URL: https://issues.apache.org/jira/browse/KAFKA-2140 Project: Kafka Issue Type: Improvement Reporter: Ismael Juma Assignee: Ismael Juma Priority: Minor There are a number of places where code could be written in a more readable and idiomatic form. It's easier to explain with a patch. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1810) Add IP Filtering / Whitelists-Blacklists
[ https://issues.apache.org/jira/browse/KAFKA-1810?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Holoman updated KAFKA-1810: Resolution: Won't Fix Status: Resolved (was: Patch Available) Add IP Filtering / Whitelists-Blacklists - Key: KAFKA-1810 URL: https://issues.apache.org/jira/browse/KAFKA-1810 Project: Kafka Issue Type: New Feature Components: core, network, security Reporter: Jeff Holoman Assignee: Jeff Holoman Priority: Minor Fix For: 0.8.3 Attachments: KAFKA-1810.patch, KAFKA-1810_2015-01-15_19:47:14.patch, KAFKA-1810_2015-03-15_01:13:12.patch While longer-term goals of security in Kafka are on the roadmap there exists some value for the ability to restrict connection to Kafka brokers based on IP address. This is not intended as a replacement for security but more of a precaution against misconfiguration and to provide some level of control to Kafka administrators about who is reading/writing to their cluster. 1) In some organizations software administration vs o/s systems administration and network administration is disjointed and not well choreographed. Providing software administrators the ability to configure their platform relatively independently (after initial configuration) from Systems administrators is desirable. 2) Configuration and deployment is sometimes error prone and there are situations when test environments could erroneously read/write to production environments 3) An additional precaution against reading sensitive data is typically welcomed in most large enterprise deployments. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 33417: Patch for KAFKA-2138
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33417/#review81097 --- This piece of logic has become quite complex and awkward to me now. For example, in ready() a node will only be excluded if ALL of its partitions are either not sendable or in the backoff period, and the reason we want to get ready nodes before drain is to check whether they are really ready or not. This is mainly because 1) we need to be careful about the timeout value when calling client.poll() later in order to avoid busy waiting, and 2) we need to make sure that if a metadata refresh is needed, it is sent at higher priority than other requests. I suggest re-writing this fraction of code to make it clearer, with the following process: 0. While handling the metadata response and updating the metadata, check for ANY partitions whether their leader is not known; if there are any, set metadata.requestUpdate. Then we no longer need to do this step at the start of run(). 1. Get all the ready nodes based on their connection state only (i.e. no peeking in RecordAccumulator), and record the node_backoff as min (reconnection_backoff - time_waited) over all nodes; if one of these nodes is connected or connecting, this backoff should be 0. 2. For each of the ready nodes, try to drain their corresponding partitions in RecordAccumulator while considering all kinds of conditions (full, expired, exhausted, etc...), and record the data_backoff as min (retry_backoff - time_waited) over all partitions; if one of the partitions is immediately sendable, this backoff should be 0. 3. Formulate the produce request and call client.poll() with timeout = node_backoff > 0 ? node_backoff : data_backoff. 4. In NetworkClient.poll(), the logic of maybeUpdateMetadata and the update of metadataTimeout can also be simplified. This may contain some flaw; Jiangjie / Ewen, let me know if you see any issues. 
- Guozhang Wang On April 21, 2015, 10:51 p.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33417/ --- (Updated April 21, 2015, 10:51 p.m.) Review request for kafka. Bugs: KAFKA-2138 https://issues.apache.org/jira/browse/KAFKA-2138 Repository: kafka Description --- Patch for KAFKA-2138 honor retry backoff in KafkaProducer Diffs - clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 0e7ab29a07301309aa8ca5b75b32b8e05ddc3a94 clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java 05e2929c0a9fc8cf2a8ebe0776ca62d5f3962d5c Diff: https://reviews.apache.org/r/33417/diff/ Testing --- Thanks, Jiangjie Qin
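Steps 1-3 of the proposal above boil down to choosing the poll timeout from whichever backoff applies: the connection backoff if we are still waiting on connections, otherwise the data (retry) backoff. A minimal sketch of that selection, with illustrative names (not the actual Sender/NetworkClient code):

```java
// Hypothetical sketch of the poll-timeout selection described in the review
// comment; method and parameter names are made up for illustration.
public class PollTimeoutSketch {

    // Step 1: connection-level backoff. 0 if any node is connected or
    // connecting; otherwise the smallest remaining reconnect wait across nodes.
    static long nodeBackoff(long[] remainingReconnectWaits, boolean anyConnectedOrConnecting) {
        if (anyConnectedOrConnecting) return 0L;
        long min = Long.MAX_VALUE;
        for (long w : remainingReconnectWaits) min = Math.min(min, Math.max(0L, w));
        return min == Long.MAX_VALUE ? 0L : min;
    }

    // Step 3: poll with the connection backoff when we are waiting on a
    // connection, otherwise with the data (retry) backoff.
    static long pollTimeout(long nodeBackoff, long dataBackoff) {
        return nodeBackoff > 0 ? nodeBackoff : dataBackoff;
    }
}
```

Both backoffs collapse to 0 as soon as any node or partition is immediately usable, which is what prevents client.poll() from sleeping when there is work to do.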
[jira] [Commented] (KAFKA-2139) Add a separate controller message queue with higher priority on broker side
[ https://issues.apache.org/jira/browse/KAFKA-2139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14506085#comment-14506085 ] Jay Kreps commented on KAFKA-2139: -- Do you want to sketch out the design you have in mind? I really want to make sure we don't add weird business logic inside the network layer if at all possible. I also think that for our sanity it is important to maintain the property that requests from a single connection are always processed in order (which adding to the end of the queue would violate). I agree that we want it to be the case that all traffic from the controller is prioritized over user requests but let's find a generic way to do this as a feature of the network layer without blurring those layers if possible. Add a separate controller message queue with higher priority on broker side --- Key: KAFKA-2139 URL: https://issues.apache.org/jira/browse/KAFKA-2139 Project: Kafka Issue Type: Improvement Reporter: Jiangjie Qin Assignee: Jiangjie Qin This ticket is supposed to be working together with KAFKA-2029. There are two issues with current controller to broker messages. 1. On the controller side the messages are sent without synchronization. 2. On the broker side the controller messages share the same queue as client messages. The problem here is that brokers process the controller messages for the same partition at different times and the variation could be big. This causes unnecessary data loss and prolongs the preferred leader election / controlled shutdown / partition reassignment, etc. KAFKA-2029 was trying to add a boundary between messages for different partitions. For example, before leader migration for the previous partition finishes, the leader migration for the next partition won't begin. This ticket is trying to let the broker process controller messages faster. 
So the idea is to have a separate queue to hold controller messages; if there are controller messages, the KafkaApi thread will first take care of those, otherwise it will process messages from clients. These two tickets are not the ultimate solution to the current controller problems, but just mitigate them with minor code changes. Moving forward, we still need to think about rewriting the controller in a cleaner way. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
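The two-queue idea described in the ticket can be sketched as below. This is a standalone illustration, not the actual broker code (KafkaApis pulls requests from a RequestChannel; the names here are made up):

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Illustrative sketch of the proposal: hold controller requests in their own
// queue and always drain it before the client request queue.
public class PriorityDispatch {
    final Queue<String> controllerQueue = new ArrayDeque<>();
    final Queue<String> clientQueue = new ArrayDeque<>();

    // The request handler thread prefers controller messages; client
    // messages are only served when no controller message is pending.
    String nextRequest() {
        String req = controllerQueue.poll();
        return req != null ? req : clientQueue.poll();
    }
}
```

Note this is exactly the trade-off Jay raises: requests from the controller connection stay ordered among themselves, but they jump ahead of client requests that arrived earlier.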
Review Request 33417: Patch for KAFKA-2138
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33417/ --- Review request for kafka. Bugs: KAFKA-2138 https://issues.apache.org/jira/browse/KAFKA-2138 Repository: kafka Description --- Patch for KAFKA-2138 honor retry backoff in KafkaProducer Diffs - clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 0e7ab29a07301309aa8ca5b75b32b8e05ddc3a94 clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java 05e2929c0a9fc8cf2a8ebe0776ca62d5f3962d5c Diff: https://reviews.apache.org/r/33417/diff/ Testing --- Thanks, Jiangjie Qin
[jira] [Updated] (KAFKA-2138) KafkaProducer does not honor the retry backoff time.
[ https://issues.apache.org/jira/browse/KAFKA-2138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiangjie Qin updated KAFKA-2138: Status: Patch Available (was: Open) KafkaProducer does not honor the retry backoff time. Key: KAFKA-2138 URL: https://issues.apache.org/jira/browse/KAFKA-2138 Project: Kafka Issue Type: Bug Reporter: Jiangjie Qin Assignee: Jiangjie Qin Priority: Critical Attachments: KAFKA-2138.patch In KafkaProducer, we only check the batch.lastAttemptMs in ready. But we are not checking it in drain() as well. The problem is that if we have two partitions both on the same node, suppose Partition 1 should backoff while partition 2 should not. Currently partition 1's backoff time will be ignored. We should check the lastAttemptMs in drain() as well. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
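The missing check the ticket describes, drain() ignoring a batch's retry backoff, can be sketched as follows. The field names (lastAttemptMs, retryBackoffMs) follow the bug description, but this is not the real RecordAccumulator code:

```java
// Minimal sketch of the per-batch backoff check that the ticket says
// drain() is missing (ready() already performs an equivalent check).
public class DrainBackoffCheck {

    // A batch that has been attempted before must wait out the retry backoff.
    static boolean inBackoff(int attempts, long lastAttemptMs, long retryBackoffMs, long nowMs) {
        return attempts > 0 && nowMs < lastAttemptMs + retryBackoffMs;
    }

    // drain() should skip a batch that is still backing off, even when
    // another partition on the same node made the node "ready".
    static boolean drainable(int attempts, long lastAttemptMs, long retryBackoffMs, long nowMs) {
        return !inBackoff(attempts, lastAttemptMs, retryBackoffMs, nowMs);
    }
}
```

This captures the two-partition scenario from the report: partition 2 on the node is sendable, so the node passes ready(), but partition 1's batch should still be held back until its backoff expires.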
[jira] [Commented] (KAFKA-2138) KafkaProducer does not honor the retry backoff time.
[ https://issues.apache.org/jira/browse/KAFKA-2138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14505955#comment-14505955 ] Jiangjie Qin commented on KAFKA-2138: - Created reviewboard https://reviews.apache.org/r/33417/diff/ against branch origin/trunk KafkaProducer does not honor the retry backoff time. Key: KAFKA-2138 URL: https://issues.apache.org/jira/browse/KAFKA-2138 Project: Kafka Issue Type: Bug Reporter: Jiangjie Qin Assignee: Jiangjie Qin Priority: Critical Attachments: KAFKA-2138.patch In KafkaProducer, we only check the batch.lastAttemptMs in ready. But we are not checking it in drain() as well. The problem is that if we have two partitions both on the same node, suppose Partition 1 should backoff while partition 2 should not. Currently partition 1's backoff time will be ignored. We should check the lastAttemptMs in drain() as well. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2138) KafkaProducer does not honor the retry backoff time.
[ https://issues.apache.org/jira/browse/KAFKA-2138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiangjie Qin updated KAFKA-2138: Attachment: KAFKA-2138.patch KafkaProducer does not honor the retry backoff time. Key: KAFKA-2138 URL: https://issues.apache.org/jira/browse/KAFKA-2138 Project: Kafka Issue Type: Bug Reporter: Jiangjie Qin Assignee: Jiangjie Qin Priority: Critical Attachments: KAFKA-2138.patch In KafkaProducer, we only check the batch.lastAttemptMs in ready. But we are not checking it in drain() as well. The problem is that if we have two partitions both on the same node, suppose Partition 1 should backoff while partition 2 should not. Currently partition 1's backoff time will be ignored. We should check the lastAttemptMs in drain() as well. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2140) Improve code readability
[ https://issues.apache.org/jira/browse/KAFKA-2140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14506066#comment-14506066 ] Ismael Juma commented on KAFKA-2140: Created reviewboard https://reviews.apache.org/r/33420/diff/ against branch upstream/trunk Improve code readability Key: KAFKA-2140 URL: https://issues.apache.org/jira/browse/KAFKA-2140 Project: Kafka Issue Type: Improvement Reporter: Ismael Juma Assignee: Ismael Juma Priority: Minor Attachments: KAFKA-2140.patch There are a number of places where code could be written in a more readable and idiomatic form. It's easier to explain with a patch. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-2141) Integrate checkstyle for Java code
Guozhang Wang created KAFKA-2141: Summary: Integrate checkstyle for Java code Key: KAFKA-2141 URL: https://issues.apache.org/jira/browse/KAFKA-2141 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Assignee: Guozhang Wang As we move from Scala to more Java code, it is better to start firmly with coding style. So I propose integrating checkstyle to catch coding style issues at build / test time to save on reviewing efforts. It has been integrated to Kafka (KAFKA-1915), and my personal experience is that once the import / code configs are set appropriately it is worthwhile. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2140) Improve code readability
[ https://issues.apache.org/jira/browse/KAFKA-2140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14506067#comment-14506067 ] Ismael Juma commented on KAFKA-2140: This PR shows the changes in an easy to review way: https://github.com/ijuma/kafka/pull/1 Is there interest in this kind of change? Improve code readability Key: KAFKA-2140 URL: https://issues.apache.org/jira/browse/KAFKA-2140 Project: Kafka Issue Type: Improvement Reporter: Ismael Juma Assignee: Ismael Juma Priority: Minor Attachments: KAFKA-2140.patch There are a number of places where code could be written in a more readable and idiomatic form. It's easier to explain with a patch. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2140) Improve code readability
[ https://issues.apache.org/jira/browse/KAFKA-2140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-2140: --- Status: Patch Available (was: Open) Improve code readability Key: KAFKA-2140 URL: https://issues.apache.org/jira/browse/KAFKA-2140 Project: Kafka Issue Type: Improvement Reporter: Ismael Juma Assignee: Ismael Juma Priority: Minor Attachments: KAFKA-2140.patch There are a number of places where code could be written in a more readable and idiomatic form. It's easier to explain with a patch. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2140) Improve code readability
[ https://issues.apache.org/jira/browse/KAFKA-2140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-2140: --- Attachment: KAFKA-2140.patch Improve code readability Key: KAFKA-2140 URL: https://issues.apache.org/jira/browse/KAFKA-2140 Project: Kafka Issue Type: Improvement Reporter: Ismael Juma Assignee: Ismael Juma Priority: Minor Attachments: KAFKA-2140.patch There are a number of places where code could be written in a more readable and idiomatic form. It's easier to explain with a patch. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2114) Unable to change min.insync.replicas default
[ https://issues.apache.org/jira/browse/KAFKA-2114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gwen Shapira updated KAFKA-2114: Attachment: KAFKA-2114.patch Unable to change min.insync.replicas default Key: KAFKA-2114 URL: https://issues.apache.org/jira/browse/KAFKA-2114 Project: Kafka Issue Type: Bug Reporter: Bryan Baugher Assignee: Gwen Shapira Fix For: 0.8.2.1 Attachments: KAFKA-2114.patch Following the comment here[1] I was unable to change the min.insync.replicas default value. I tested this by setting up a 3 node cluster, wrote to a topic with a replication factor of 3, using request.required.acks=-1 and setting min.insync.replicas=2 on the broker's server.properties. I then shutdown 2 brokers but I was still able to write successfully. Only after running the alter topic command setting min.insync.replicas=2 on the topic did I see write failures. [1] - http://mail-archives.apache.org/mod_mbox/kafka-users/201504.mbox/%3CCANZ-JHF71yqKE6%2BKKhWe2EGUJv6R3bTpoJnYck3u1-M35sobgg%40mail.gmail.com%3E -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [DISCUSS] KIP-11- Authorization design for kafka security
I have added the notes to KIP-11 Open question sections. Thanks Parth On 4/21/15, 4:49 PM, Gwen Shapira gshap...@cloudera.com wrote: Adding my notes from today's call to the thread: ** Deny or Allow all by default? We will add a configuration to control this. The configuration will default to “allow” for backward compatibility. Security admins can set it to deny. ** Storing ACLs for default authorizers: We'll store them in ZK. We'll support pointing the authorizer to any ZK. The use of ZK will be internal to the default authorizer. The authorizer reads ACLs from its cache every hour. We proposed having a mechanism (possibly via a new ZK node) to tell the broker to refresh the cache immediately. ** Support deny as a permission type - we agreed to keep this. ** Mapping operations to API: We may need to add Group as a resource, with JoinGroup and OffsetCommit requiring privilege on the consumer group. This can be something we pass now and authorizers can support in the future. - Jay will write specifics to the mailing list discussion. On Tue, Apr 21, 2015 at 4:32 PM, Jay Kreps jay.kr...@gmail.com wrote: Following up on the KIP discussion. Two options for authorizing consumers to read topic t as part of group g: 1. READ permission on resource /topic/t 2. READ permission on resource /topic/t AND WRITE permission on /group/g The advantage of (1) is that it is simpler. The disadvantage is that any member of any group that reads from t can commit offsets as any other member of a different group. This doesn't affect data security (who can access what) but it is a bit of a management issue--a malicious person can cause data loss or duplicates for another consumer by committing offsets. I think I favor (2) but it's worth it to think it through. -Jay On Tue, Apr 21, 2015 at 2:43 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: Hey Jun, Yes, and we support wildcards for all ACL entities: principal, hosts and operation. 
Thanks Parth On 4/21/15, 9:06 AM, Jun Rao j...@confluent.io wrote: Harsha, Parth, Thanks for the clarification. This makes sense. Perhaps we can clarify the meaning of those rules in the wiki. Related to this, it seems that we need to support wildcards in the cli/request protocol for topics? Jun On Mon, Apr 20, 2015 at 9:07 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: The iptables on unix supports the DENY operator, not that it should matter. The deny operator can also be used to specify "allow user1 to READ from topic1 from all hosts but host1,host2". Again we could add a host group semantic and extra complexity around that, not sure if it's worth it. In addition, with the DENY operator you are not forced to create a special group just to support the authorization use case. I am not convinced that the operator itself is really all that confusing. There are 3 practical use cases: - Resource with no acl whatsoever - allow access to everyone (just for backward compatibility; I would much rather fail closed and force users to explicitly grant acls that allow access to all users.) - Resource with some acl attached - only users that have a matching allow acl are allowed (i.e. "allow READ access to topic1 to user1 from all hosts": only user1 has READ access and no other user has access of any kind) - Resource with some allow and some deny acls attached - users are allowed to perform the operation only when they satisfy an allow acl and do not have a conflicting deny acl. Users that have no acl (allow or deny) still have no access. (i.e. "allow READ access to topic1 to user1 from all hosts except host1 and host2": only user1 has access, but not from host1 and host2) I think we need to make a decision on deny primarily because, with the introduction of the acl management API, Acl is now a public class that will be used by Ranger/Sentry and other authorization providers. In the current design the acl has a permissionType enum field with possible values of Allow and Deny. 
If we choose to remove deny, we can assume all acls to be of allow type and remove the permissionType field completely. Thanks Parth On 4/20/15, 6:12 PM, Gwen Shapira gshap...@cloudera.com wrote: I think that's how it's done in pretty much any system I can think of.
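The three cases enumerated in the thread reduce to: no ACL at all means open (the backward-compatibility case), otherwise a principal needs at least one matching allow ACL and no matching deny ACL. A minimal sketch under those assumed semantics; the Acl class and matching rules here are simplified placeholders, not the KIP-11 API:

```java
import java.util.List;

// Illustrative allow/deny evaluation following the three cases in the thread.
public class AclEval {
    public static class Acl {
        final String principal, host, op;
        final boolean allow; // false = deny
        public Acl(String principal, String host, String op, boolean allow) {
            this.principal = principal; this.host = host; this.op = op; this.allow = allow;
        }
    }

    public static boolean authorized(List<Acl> acls, String principal, String host, String op) {
        // Case 1: resource with no acl whatsoever - open, for backward compatibility.
        if (acls.isEmpty()) return true;
        boolean allowed = false;
        for (Acl a : acls) {
            if (!matches(a.principal, principal) || !matches(a.host, host) || !matches(a.op, op)) continue;
            if (!a.allow) return false; // Case 3: a matching deny always wins.
            allowed = true;             // Case 2: need at least one matching allow.
        }
        return allowed;
    }

    // Wildcard support for principal, host, and operation, as discussed above.
    static boolean matches(String pattern, String value) {
        return "*".equals(pattern) || pattern.equals(value);
    }
}
```

With this shape, "allow user1 to READ from topic1 from all hosts except host1" is one allow ACL (host "*") plus one deny ACL (host "host1"), with no need for a special host group.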
[jira] [Updated] (KAFKA-2114) Unable to change min.insync.replicas default
[ https://issues.apache.org/jira/browse/KAFKA-2114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gwen Shapira updated KAFKA-2114: Status: Patch Available (was: Open) Unable to change min.insync.replicas default Key: KAFKA-2114 URL: https://issues.apache.org/jira/browse/KAFKA-2114 Project: Kafka Issue Type: Bug Reporter: Bryan Baugher Assignee: Gwen Shapira Fix For: 0.8.2.1 Attachments: KAFKA-2114.patch Following the comment here[1] I was unable to change the min.insync.replicas default value. I tested this by setting up a 3 node cluster, wrote to a topic with a replication factor of 3, using request.required.acks=-1 and setting min.insync.replicas=2 on the broker's server.properties. I then shutdown 2 brokers but I was still able to write successfully. Only after running the alter topic command setting min.insync.replicas=2 on the topic did I see write failures. [1] - http://mail-archives.apache.org/mod_mbox/kafka-users/201504.mbox/%3CCANZ-JHF71yqKE6%2BKKhWe2EGUJv6R3bTpoJnYck3u1-M35sobgg%40mail.gmail.com%3E -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2114) Unable to change min.insync.replicas default
[ https://issues.apache.org/jira/browse/KAFKA-2114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14506081#comment-14506081 ] Gwen Shapira commented on KAFKA-2114: - Created reviewboard https://reviews.apache.org/r/33421/diff/ against branch trunk Unable to change min.insync.replicas default Key: KAFKA-2114 URL: https://issues.apache.org/jira/browse/KAFKA-2114 Project: Kafka Issue Type: Bug Reporter: Bryan Baugher Assignee: Gwen Shapira Fix For: 0.8.2.1 Attachments: KAFKA-2114.patch Following the comment here[1] I was unable to change the min.insync.replicas default value. I tested this by setting up a 3 node cluster, wrote to a topic with a replication factor of 3, using request.required.acks=-1 and setting min.insync.replicas=2 on the broker's server.properties. I then shutdown 2 brokers but I was still able to write successfully. Only after running the alter topic command setting min.insync.replicas=2 on the topic did I see write failures. [1] - http://mail-archives.apache.org/mod_mbox/kafka-users/201504.mbox/%3CCANZ-JHF71yqKE6%2BKKhWe2EGUJv6R3bTpoJnYck3u1-M35sobgg%40mail.gmail.com%3E -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2114) Unable to change min.insync.replicas default
[ https://issues.apache.org/jira/browse/KAFKA-2114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14506084#comment-14506084 ] Gwen Shapira commented on KAFKA-2114: - Ended up being very silly case of not passing the broker config for minISR to the log manager. Fixed and added a test. Unable to change min.insync.replicas default Key: KAFKA-2114 URL: https://issues.apache.org/jira/browse/KAFKA-2114 Project: Kafka Issue Type: Bug Reporter: Bryan Baugher Assignee: Gwen Shapira Fix For: 0.8.2.1 Attachments: KAFKA-2114.patch Following the comment here[1] I was unable to change the min.insync.replicas default value. I tested this by setting up a 3 node cluster, wrote to a topic with a replication factor of 3, using request.required.acks=-1 and setting min.insync.replicas=2 on the broker's server.properties. I then shutdown 2 brokers but I was still able to write successfully. Only after running the alter topic command setting min.insync.replicas=2 on the topic did I see write failures. [1] - http://mail-archives.apache.org/mod_mbox/kafka-users/201504.mbox/%3CCANZ-JHF71yqKE6%2BKKhWe2EGUJv6R3bTpoJnYck3u1-M35sobgg%40mail.gmail.com%3E -- This message was sent by Atlassian JIRA (v6.3.4#6332)
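For reference, the two configuration levels the reporter compares look like this (property name is real; the fix makes the broker-wide default actually reach the log manager, so it applies to topics without an override):

```properties
# server.properties - broker-wide default (the setting the bug ignored)
min.insync.replicas=2
```

The per-topic override that did work in the reporter's test was set via the topic tool, e.g. `kafka-topics.sh --alter --topic <topic> --config min.insync.replicas=2` (topic name illustrative).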
[jira] [Resolved] (KAFKA-2137) New Kafka Producer not fully asynchronous
[ https://issues.apache.org/jira/browse/KAFKA-2137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava resolved KAFKA-2137. -- Resolution: Duplicate This is the same issue as KAFKA-1835, which has a patch but might need additional review and follow up. New Kafka Producer not fully asynchronous - Key: KAFKA-2137 URL: https://issues.apache.org/jira/browse/KAFKA-2137 Project: Kafka Issue Type: Improvement Reporter: David Hay The new Producer client attempts to be fully asynchronous. However, it still has the potential to block at the start of the {{send}} method when it asks for the metadata for the topic. ({{waitOnMetadata(record.topic(), this.metadataFetchTimeoutMs)}}) There is a timeout (60 seconds, by default), but it would be nice if this lookup was performed in the background thread as well. This way producers could fire and forget without any potential to block the sending thread. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system
I'm also +1 on this. The change is quite small and may actually help perf on Linux as well (we've never tried this). I have a lot of concerns on testing the various failure conditions but I think since it will be off by default the risk is not too high. -Jay On Mon, Apr 20, 2015 at 6:58 PM, Honghai Chen honghai.c...@microsoft.com wrote: I wrote a KIP for this after some discussion on KAFKA-1646. https://issues.apache.org/jira/browse/KAFKA-1646 https://cwiki.apache.org/confluence/display/KAFKA/KIP-20+-+Enable+log+preallocate+to+improve+consume+performance+under+windows+and+some+old+Linux+file+system The RB is here: https://reviews.apache.org/r/33204/diff/ Thanks, Honghai
Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system
+1. I've tried this on Linux and it helps reduce the spikes in append (and hence producer) latency for high throughput writes. I am not entirely sure why, but my suspicion is that in the absence of preallocation, you see spikes when writes need to happen faster than the time it takes Linux to allocate the next block to the file. It will be great to see some performance test results too. On Tue, Apr 21, 2015 at 9:23 AM, Jay Kreps jay.kr...@gmail.com wrote: I'm also +1 on this. The change is quite small and may actually help perf on Linux as well (we've never tried this). I have a lot of concerns on testing the various failure conditions but I think since it will be off by default the risk is not too high. -Jay On Mon, Apr 20, 2015 at 6:58 PM, Honghai Chen honghai.c...@microsoft.com wrote: I wrote a KIP for this after some discussion on KAFKA-1646. https://issues.apache.org/jira/browse/KAFKA-1646 https://cwiki.apache.org/confluence/display/KAFKA/KIP-20+-+Enable+log+preallocate+to+improve+consume+performance+under+windows+and+some+old+Linux+file+system The RB is here: https://reviews.apache.org/r/33204/diff/ Thanks, Honghai -- Thanks, Neha
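For illustration, sizing a segment file up front in Java is a one-liner, though note that RandomAccessFile.setLength typically produces a sparse file on Linux, so the actual patch may need more than this to guarantee blocks are allocated. A standalone sketch, not the actual FileMessageSet change:

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

// Rough illustration of log-segment preallocation as KIP-20 proposes:
// reserve the full segment size before any append happens.
public class PreallocateDemo {

    // Preallocate a file to the given size; returns the resulting length,
    // or -1 on error (error handling kept trivial for the sketch).
    static long preallocate(File f, long bytes) {
        try (RandomAccessFile raf = new RandomAccessFile(f, "rw")) {
            raf.setLength(bytes); // reserve the segment size up front
            return raf.length();
        } catch (IOException e) {
            return -1L;
        }
    }

    // Create a throwaway temp file, preallocate it, and report the length.
    static long demo(long bytes) {
        try {
            File f = File.createTempFile("segment", ".log");
            f.deleteOnExit();
            long len = preallocate(f, bytes);
            f.delete();
            return len;
        } catch (IOException e) {
            return -1L;
        }
    }
}
```

If the file is sparse, appends can still trigger block allocation; that may be why the observed benefit varies by filesystem, and why performance test results were requested.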
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14505238#comment-14505238 ] David Hay commented on KAFKA-1835: -- This solution doesn't seem ideal to me. It requires an update to {{pre.initialize.topics}} every time we add a new topic to our system. Otherwise, if I publish to a topic that is not in the list, then the behavior is the same as now...blocking until the metadata is returned the first time. Ideally, as I mentioned in KAFKA-2137, the metadata refresh would happen in a background thread. Perhaps a better solution would be have the entire body of the {{send(ProducerRecord, Callback)}} method running in a separate thread (or thread pool)? Alternately, is there a way to submit the request to the Sender without (yet) knowing what partition we want to send to? Kafka new producer needs options to make blocking behavior explicit --- Key: KAFKA-1835 URL: https://issues.apache.org/jira/browse/KAFKA-1835 Project: Kafka Issue Type: Improvement Components: clients Affects Versions: 0.8.2.0, 0.8.3, 0.9.0 Reporter: Paul Pearcy Fix For: 0.8.3 Attachments: KAFKA-1835-New-producer--blocking_v0.patch, KAFKA-1835.patch Original Estimate: 504h Remaining Estimate: 504h The new (0.8.2 standalone) producer will block the first time it attempts to retrieve metadata for a topic. This is not the desired behavior in some use cases where async non-blocking guarantees are required and message loss is acceptable in known cases. Also, most developers will assume an API that returns a future is safe to call in a critical request path. Discussing on the mailing list, the most viable option is to have the following settings: pre.initialize.topics=x,y,z pre.initialize.timeout=x This moves potential blocking to the init of the producer and outside of some random request. 
The potential will still exist for blocking in a corner case where connectivity with Kafka is lost and a topic not included in pre-init has a message sent for the first time. There is the question of what to do when initialization fails. There are a couple of options that I'd like available: - Fail creation of the client - Fail all sends until the meta is available Open to input on how the above option should be expressed. It is also worth noting more nuanced solutions exist that could work without the extra settings, they just end up having extra complications and at the end of the day not adding much value. For instance, the producer could accept and queue messages(note: more complicated than I am making it sound due to storing all accepted messages in pre-partitioned compact binary form), but you're still going to be forced to choose to either start blocking or dropping messages at some point. I have some test cases I am going to port over to the Kafka producer integration ones and start from there. My current impl is in scala, but porting to Java shouldn't be a big deal (was using a promise to track init status, but will likely need to make that an atomic bool). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
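One shape of the "run the send body in a separate thread" idea David mentions is to wrap the blocking call in an executor so the caller returns immediately. This is a sketch of that workaround, not the proposed patch; blockingSend stands in for producer.send(), and it inherits exactly the complications noted above (the blocking and queueing just move to the pool thread):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Illustrative wrapper: the caller's thread never blocks on metadata;
// any blocking happens on the dedicated sender pool thread instead.
public class AsyncSendWrapper {
    private final ExecutorService pool = Executors.newSingleThreadExecutor();

    // blockingSend is a stand-in for the potentially blocking producer.send().
    public CompletableFuture<Long> send(Supplier<Long> blockingSend) {
        return CompletableFuture.supplyAsync(blockingSend, pool);
    }

    public void close() {
        pool.shutdown();
    }
}
```

The single-threaded pool keeps sends ordered, but an unbounded queue builds up if the broker is unreachable, which is the "start blocking or dropping messages at some point" choice the ticket describes.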