Will it be possible to apply quotas based on a security principal?
Hi, I've been investigating using Kafka for a multi-user system that applies quotas at a per-user level. Reading through KIP-13 and KAFKA-1682, I wondered: are there any plans to link together the security principal and client identifier in some way? Currently it appears these are separate concepts - so I can't see any way to apply a quota based on the authenticated identity of a user. Regards - Adrian Unless stated otherwise above: IBM United Kingdom Limited - Registered in England and Wales with number 741598. Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU
Re: [DISCUSS] KIP-11- Authorization design for kafka security
Currently the authorizer does not perform any dns lookups and uses the hostname it receives as part of request.session as is. So in a way we are allowing only ip addresses. The only match is equality based so no ip ranges yet but that is easy to add. However, I think it is ok to allow both ip addresses and hostnames. I am not sure why I would want to secure dns lookups, and the host lookups extending to the dns server are only necessary when the dns cache does not have the entry or the cache entry expires. This can be controlled by the networkaddress.cache.ttl setting in the jvm. Thanks Parth On 4/14/15, 10:56 PM, Don Bosco Durai bo...@apache.org wrote: I also feel, having just IP would be more appropriate. Host lookup will unnecessarily slow things down and would be insecure as you pointed out. With IP, it will also be possible to set up policies (in future if needed) with ranges or netmasks and it would be more scalable. Bosco On 4/14/15, 1:40 PM, Michael Herstine mherst...@linkedin.com.INVALID wrote: Hi Parth, Sorry to chime in so late, but I’ve got a minor question on the KIP. Several methods take a parameter named “host” of type String. Is that intended to be a hostname, or an IP address? If the former, I’m curious as to how that’s found (in my experience, when accepting an incoming socket connection, you only know the IP address, and there isn’t a way to map that to a hostname without a round trip to a DNS server, which is insecure anyway). On 3/25/15, 1:07 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: Hi all, I have modified the KIP to reflect the recent change request from the reviewers. I have been working on the code and I have the server side code for authorization ready. I am now modifying the command line utilities. I would really appreciate it if some of the committers can spend some time to review the KIP so we can make progress on this. Thanks Parth On 3/18/15, 2:20 PM, Michael Herstine mherst...@linkedin.com.INVALID wrote: Hi Parth, Thanks! A few questions: 1. Do you want to permit rules in your ACLs that DENY access as well as ALLOW? This can be handy for setting up rules that have exceptions. E.g. “Allow principal P to READ resource R from all hosts” with “Deny principal P READ access to resource R from host H1” in combination would allow P to READ R from all hosts *except* H1. 2. When a topic is newly created, will there be an ACL created for it? If not, would that not deny subsequent access to it? (nit) Maybe use Principal instead of String to represent principals? On 3/9/15, 11:48 AM, Don Bosco Durai bo...@apache.org wrote: Parth Overall it is looking good. Couple of questions: - Can you give an example of how the policies will look in the default implementation? - In the operations, can we support “CONNECT” also? This can be used during Session connection. - Regarding access control for “Topic Creation”, since we can’t do it on the server side, can we de-scope it for now and plan it as a future feature request? Thanks Bosco On 3/6/15, 8:10 AM, Harsha ka...@harsha.io wrote: Hi Parth, Thanks for putting this together. Overall it looks good to me. Although AdminUtils is a concern, KIP-4 can probably fix that part. Thanks, Harsha On Thu, Mar 5, 2015, at 10:39 AM, Parth Brahmbhatt wrote: Forgot to add links to wiki and jira.
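For illustration, a minimal Scala sketch of the equality-based host check described above, plus a possible netmask extension; HostMatcher and its methods are hypothetical names, not part of the KIP-11 proposal.
{code}
import java.math.BigInteger
import java.net.InetAddress

object HostMatcher {
  // Exact-match rule: the host string from request.session is compared as-is.
  def matchesExactly(aclHost: String, sessionHost: String): Boolean =
    aclHost == "*" || aclHost == sessionHost

  // Possible future extension (an assumption, not in the current proposal):
  // allow "10.0.0.0/24" style ranges. IPv4 literals only, so the
  // InetAddress.getByName call below never triggers a dns lookup.
  def matchesCidr(cidr: String, sessionHost: String): Boolean = {
    val Array(base, bits) = cidr.split("/")
    val mask = -1L << (32 - bits.toInt)
    def toLong(ip: String): Long =
      new BigInteger(1, InetAddress.getByName(ip).getAddress).longValue()
    (toLong(base) & mask) == (toLong(sessionHost) & mask)
  }
}
{code}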
Link to wiki: https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface Link to Jira: https://issues.apache.org/jira/browse/KAFKA-1688 Thanks Parth From: Parth Brahmbhatt pbrahmbh...@hortonworks.com Date: Thursday, March 5, 2015 at 10:33 AM To: dev@kafka.apache.org Subject: [DISCUSS] KIP-11- Authorization design for kafka security Hi, KIP-11 is open for discussion, I have updated the wiki with the design and open questions. Thanks Parth
[jira] [Commented] (KAFKA-2035) Add a topic config cache.
[ https://issues.apache.org/jira/browse/KAFKA-2035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14496673#comment-14496673 ] Jay Kreps commented on KAFKA-2035: -- Fair point. If we are going to model topics though let's make sure we do a really good job of it as that is a fairly fundamental change to the domain model. E.g. should we have a Topic object which has the set of associated Log instances as well as the topic-specific config...i.e. let's really try to make them a first class entity, maybe a TopicManager if that helps. I feel like adding a TopicConfigCache to KafkaApis is sort of an unhappy midpoint between not modeling topics and modeling them fully. Not sure if that makes any sense? Add a topic config cache. - Key: KAFKA-2035 URL: https://issues.apache.org/jira/browse/KAFKA-2035 Project: Kafka Issue Type: Task Reporter: Parth Brahmbhatt Assignee: Parth Brahmbhatt Attachments: KAFKA-2035_2015-03-31_10:52:12.patch Currently the topic config is all about Log configuration so we have a TopicConfigManager which takes in a Log instance and keeps updating that instance's config instance as and when the topic config is updated. The topic config update notifications are sent using zk watchers by Controller. I propose to introduce a TopicConfigCache which will be updated by TopicConfigManager on any config changes. The log instance and any other component (like the authorizer mentioned in KAFKA-1688) will have a reference to TopicConfigCache using which they will access the topic configs. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
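For readers following the ticket, a minimal Scala sketch of the TopicConfigCache idea described in the issue text; the class shape and method names are illustrative assumptions, not the committed implementation.
{code}
import java.util.Properties
import scala.collection.concurrent.TrieMap

class TopicConfigCache(defaultConfig: Properties) {
  private val overridesByTopic = TrieMap.empty[String, Properties]

  // Called by the config manager when a zk config-change notification arrives.
  def updateTopicConfig(topic: String, overrides: Properties): Unit =
    overridesByTopic.put(topic, overrides)

  // Called by the Log, the authorizer, or any other component that needs the
  // current per-topic config; falls back to the broker defaults.
  def topicConfig(topic: String): Properties = {
    val merged = new Properties()
    merged.putAll(defaultConfig)
    overridesByTopic.get(topic).foreach(o => merged.putAll(o))
    merged
  }
}
{code}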
[jira] [Created] (KAFKA-2126) New consumer does not correctly configure deserializers
Ewen Cheslack-Postava created KAFKA-2126: Summary: New consumer does not correctly configure deserializers Key: KAFKA-2126 URL: https://issues.apache.org/jira/browse/KAFKA-2126 Project: Kafka Issue Type: Bug Components: clients, consumer Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Fix For: 0.8.3 Since the new ser/de interfaces use a configure() method with an extra isKey parameter, they need to be manually configured after creation since getConfiguredInstances can't invoke configure() for us. The new consumer is missing this step when the deserializer is instantiated automatically by the consumer. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
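To make the missing step concrete, a small Scala sketch assuming the new consumer's Deserializer interface with configure(configs, isKey); the helper itself is hypothetical, not the actual client code.
{code}
import java.util.{Map => JMap}
import org.apache.kafka.common.serialization.Deserializer

object DeserializerFactory {
  // Hypothetical helper: after a Deserializer is instantiated reflectively,
  // configure(configs, isKey) still has to be invoked explicitly, because the
  // generic getConfiguredInstances path cannot pass the extra isKey flag.
  def createConfiguredDeserializer[T](clazz: Class[_ <: Deserializer[T]],
                                      configs: JMap[String, AnyRef],
                                      isKey: Boolean): Deserializer[T] = {
    val deserializer: Deserializer[T] = clazz.getDeclaredConstructor().newInstance()
    deserializer.configure(configs, isKey)
    deserializer
  }
}
{code}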
Re: Will it be possible to apply quotas based on a security principal?
I think this should be a fairly minor follow-up item to have the quotas key off of user rather than client id. The advantage of starting with client.id is that it decouples the security work from the quota work in the short term and provides a mechanism for those using Kafka without authentication to still enforce quotas. On Wed, Apr 15, 2015 at 6:15 AM, Adrian Preston prest...@uk.ibm.com wrote: Hi, I've been investigating using Kafka for a multi-user system that applies quotas at a per-user level. Reading through KIP-13 and KAFKA-1682, I wondered: are there any plans to link together the security principal and client identifier in some way? Currently it appears these are separate concepts - so I can't see any way to apply a quota based on the authenticated identity of a user. Regards - Adrian Unless stated otherwise above: IBM United Kingdom Limited - Registered in England and Wales with number 741598. Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU
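A purely illustrative Scala sketch of that decoupling (not the KIP-13 design, and all names hypothetical): the quota entity is the client.id today, and the same lookup could later key off the authenticated principal once the security work lands.
{code}
sealed trait QuotaEntity
case class ClientIdEntity(clientId: String) extends QuotaEntity
case class UserEntity(principal: String) extends QuotaEntity

object QuotaEntity {
  // Prefer the authenticated principal when one exists; fall back to client.id
  // for clusters running without authentication.
  def quotaEntityFor(principal: Option[String], clientId: String): QuotaEntity =
    principal.map(UserEntity(_)).getOrElse(ClientIdEntity(clientId))
}
{code}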
Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations (Thread 2)
Guys, Thanks for the discussion! Summary: 1. Q: How can KAFKA-1367 (isr is inconsistent in brokers' metadata cache) affect the implementation? A: We can fix this issue for the leading broker - the ReplicaManager (or Partition) component should have an accurate isr list; then, with the leading broker having correct info, to do a describe-topic we will need to determine the leading brokers for partitions and ask those for a correct isr list. Also, we should consider adding lag information to TMR for each follower for partition reassignment, as Jun suggested above. 2. Q: What if the user adds different alter commands for the same topic in the scope of one batch request? A: Because of the async nature of AlterTopicRequest it will be very hard then to assemble the expected (in terms of checking whether the request is complete) result if we let users do this. Also it will be very confusing. It was proposed not to let users do this (probably adding a new Error for such cases). 3. Q: AlterTopicRequest semantics: now that we have merged AlterTopic and ReassignPartitions, in which order should the AlterTopic fields be resolved? A: This item is not clear. There was a proposal to let the user change only one thing at a time, e.g. specify either new Replicas or ReplicaAssignment. This can be a simple solution, but it's a very strict rule. E.g. currently with TopicCommand the user can increase the number of partitions and define replica assignment for newly added partitions. Taking into account item 2, it will be even harder for the user to achieve this. 4. Q: Do we need such specific errors returned from the server: InvalidArgumentPartitions, InvalidArgumentReplicas etc.? A: I started implementing the proposed error codes and now I think InvalidArgumentError should probably be sufficient. We can do simple validations on the client side (e.g. AdminClient can ensure the number-of-partitions argument is positive); others - which can be covered only on the server (probably invalid topic config, replica assignment that includes a dead broker, etc.) - will be done on the server, and in case of an invalid argument we will return InvalidArgumentError without specifying the concrete field. It'd be great if we could cover these remaining issues; they look minor, at least related to specific messages, not the overall protocol. - I think with that I can update the confluence page and update the patch to reflect all discussed items. This patch will probably include the Wire protocol messages and server-side code to handle new requests. AdminClient and cli-tool implementation can be the next step. Thanks, Andrii Biletskyi On Wed, Apr 15, 2015 at 7:26 PM, Jun Rao j...@confluent.io wrote: Andrii, 500. I think what you suggested also sounds reasonable. Since ISR is only maintained accurately at the leader, TMR can return ISR if the broker is the leader of a partition. Otherwise, we can return an empty ISR. For partition reassignment, it would be useful to know the lag of each follower. Again, the leader knows this info. We can probably include that info in TMR as well. 300. I think it's probably reasonable to restrict AlterTopicRequest to change only one thing at a time, i.e., either partitions, replicas, replica assignment or config. Thanks, Jun On Mon, Apr 13, 2015 at 10:56 AM, Andrii Biletskyi andrii.bilets...@stealth.ly wrote: Jun, 404. Great, thanks! 500. If I understand correctly KAFKA-1367 says the ISR part of TMR may be inconsistent. If so, then I believe all admin commands except describeTopic are unaffected. Let me emphasize that it's about AdminClient operations, not about Wire Protocol requests.
What I mean: To verify AdminClient.createTopic we will need a (consistent) 'topics' set from TMR (we don't need isr) To verify alterTopic - again, probably 'topics' and 'assigned replicas' + configs To verify deleteTopic - only 'topics' To verify preferredReplica - 'leader', 'assigned replicas' To verify reassignPartitions - 'assigned replicas' ? (I'm not sure about this one) If everything above is correct, then AdminClient.describeTopic is the only command at risk. We can actually work around it - find out the leader broker and send TMR to that leading broker to get an up-to-date isr list. Bottom line: it looks like 1367 is a separate issue, and is not a blocker for this KIP. I'm a bit concerned about adding new requests as a must-have part of this KIP when we don't know what we want to include in those requests. Also, I'd like to write down the new AlterTopicRequest semantics (if we decide to include replicas there and merge it with ReassignPartitionsRequest) 300. AlterTopicRequest = [TopicName Partitions Replicas ReplicaAssignment [AddedConfigEntry] [DeletedConfig]] The fields are resolved in this sequence: 1. Either partition or replicas
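To make the quoted layout concrete, a Scala case-class rendering of the proposed AlterTopicRequest arguments; field names and types are illustrative and the final wire format may differ.
{code}
case class ConfigEntry(key: String, value: String)

case class AlterTopicArguments(
    topicName: String,
    partitions: Option[Int],                       // new partition count, if changing
    replicas: Option[Int],                         // new replication factor, if changing
    replicaAssignment: Option[Map[Int, Seq[Int]]], // partition -> replica broker ids
    addedConfigEntries: Seq[ConfigEntry],
    deletedConfigKeys: Seq[String])

// Per point 3 in the summary, one option is to reject any request that sets
// more than one of partitions, replicas, replicaAssignment, or config changes.
{code}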
[jira] [Updated] (KAFKA-2056) PartitionAssignorTest.testRangePartitionAssignor transient failure
[ https://issues.apache.org/jira/browse/KAFKA-2056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-2056: - Resolution: Fixed Status: Resolved (was: Patch Available) PartitionAssignorTest.testRangePartitionAssignor transient failure -- Key: KAFKA-2056 URL: https://issues.apache.org/jira/browse/KAFKA-2056 Project: Kafka Issue Type: Sub-task Reporter: Guozhang Wang Assignee: Fangmin Lv Labels: newbie Fix For: 0.8.3 Attachments: KAFKA-2056.patch, KAFKA-2056.patch {code} unit.kafka.consumer.PartitionAssignorTest testRangePartitionAssignor FAILED java.lang.NullPointerException at unit.kafka.consumer.PartitionAssignorTest$$anonfun$unit$kafka$consumer$PartitionAssignorTest$$assignAndVerify$1.apply(PartitionAssignorTest.scala:173) at unit.kafka.consumer.PartitionAssignorTest$$anonfun$unit$kafka$consumer$PartitionAssignorTest$$assignAndVerify$1.apply(PartitionAssignorTest.scala:172) at scala.collection.immutable.List.foreach(List.scala:318) at unit.kafka.consumer.PartitionAssignorTest$.unit$kafka$consumer$PartitionAssignorTest$$assignAndVerify(PartitionAssignorTest.scala:172) at unit.kafka.consumer.PartitionAssignorTest$$anonfun$testRangePartitionAssignor$1.apply$mcVI$sp(PartitionAssignorTest.scala:79) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) at unit.kafka.consumer.PartitionAssignorTest.testRangePartitionAssignor(PartitionAssignorTest.scala:60) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2056) PartitionAssignorTest.testRangePartitionAssignor transient failure
[ https://issues.apache.org/jira/browse/KAFKA-2056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14496624#comment-14496624 ] Guozhang Wang commented on KAFKA-2056: -- Thanks for the updated patch Fangmin. +1 and pushed to trunk. PartitionAssignorTest.testRangePartitionAssignor transient failure -- Key: KAFKA-2056 URL: https://issues.apache.org/jira/browse/KAFKA-2056 Project: Kafka Issue Type: Sub-task Reporter: Guozhang Wang Assignee: Fangmin Lv Labels: newbie Fix For: 0.8.3 Attachments: KAFKA-2056.patch, KAFKA-2056.patch {code} unit.kafka.consumer.PartitionAssignorTest testRangePartitionAssignor FAILED java.lang.NullPointerException at unit.kafka.consumer.PartitionAssignorTest$$anonfun$unit$kafka$consumer$PartitionAssignorTest$$assignAndVerify$1.apply(PartitionAssignorTest.scala:173) at unit.kafka.consumer.PartitionAssignorTest$$anonfun$unit$kafka$consumer$PartitionAssignorTest$$assignAndVerify$1.apply(PartitionAssignorTest.scala:172) at scala.collection.immutable.List.foreach(List.scala:318) at unit.kafka.consumer.PartitionAssignorTest$.unit$kafka$consumer$PartitionAssignorTest$$assignAndVerify(PartitionAssignorTest.scala:172) at unit.kafka.consumer.PartitionAssignorTest$$anonfun$testRangePartitionAssignor$1.apply$mcVI$sp(PartitionAssignorTest.scala:79) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) at unit.kafka.consumer.PartitionAssignorTest.testRangePartitionAssignor(PartitionAssignorTest.scala:60) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2056) PartitionAssignorTest.testRangePartitionAssignor transient failure
[ https://issues.apache.org/jira/browse/KAFKA-2056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-2056: - Fix Version/s: 0.8.3 PartitionAssignorTest.testRangePartitionAssignor transient failure -- Key: KAFKA-2056 URL: https://issues.apache.org/jira/browse/KAFKA-2056 Project: Kafka Issue Type: Sub-task Reporter: Guozhang Wang Assignee: Fangmin Lv Labels: newbie Fix For: 0.8.3 Attachments: KAFKA-2056.patch, KAFKA-2056.patch {code} unit.kafka.consumer.PartitionAssignorTest testRangePartitionAssignor FAILED java.lang.NullPointerException at unit.kafka.consumer.PartitionAssignorTest$$anonfun$unit$kafka$consumer$PartitionAssignorTest$$assignAndVerify$1.apply(PartitionAssignorTest.scala:173) at unit.kafka.consumer.PartitionAssignorTest$$anonfun$unit$kafka$consumer$PartitionAssignorTest$$assignAndVerify$1.apply(PartitionAssignorTest.scala:172) at scala.collection.immutable.List.foreach(List.scala:318) at unit.kafka.consumer.PartitionAssignorTest$.unit$kafka$consumer$PartitionAssignorTest$$assignAndVerify(PartitionAssignorTest.scala:172) at unit.kafka.consumer.PartitionAssignorTest$$anonfun$testRangePartitionAssignor$1.apply$mcVI$sp(PartitionAssignorTest.scala:79) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) at unit.kafka.consumer.PartitionAssignorTest.testRangePartitionAssignor(PartitionAssignorTest.scala:60) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2125) Infinite loop after controlled shutdown succeeds
[ https://issues.apache.org/jira/browse/KAFKA-2125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14496685#comment-14496685 ] Joel Koshy commented on KAFKA-2125: --- {noformat} 2015-04-14 05:56:10,951 INFO [Thread-38] server.KafkaServer - [Kafka Server 45], Controlled shutdown succeeded {noformat} What is the output of {{grep -i shut}} _after_ the above line? Also, did you happen to take a thread-dump as well? After controlled shutdown succeeds, the broker should proceed to shutdown all its components - socket server (which it appears to have done), replica manager, controller, etc. However, that sequence seems to have been aborted for some reason. Infinite loop after controlled shutdown succeeds Key: KAFKA-2125 URL: https://issues.apache.org/jira/browse/KAFKA-2125 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2.1 Reporter: Jason Rosenberg Priority: Blocker I have a 4 node cluster, running 0.8.2.1, that got into a bad state last night during a rolling restart. The first node to be restarted was the controller. Controlled Shutdown completed successfully, after about 800ms. But after that, the server failed to shutdown, and got into what appears to be an infinite cycle, involving the 'Controller-45-to-broker-45-send-thread', and the 'kafka-scheduler-0' thread. Ultimately, the shutdown timed out (after 3 minutes) and it was terminated by the deployment system, and the server was restarted. As expected, when it came back up it took some time re-syncing it's partitions, but eventually came back and all was well. However, I think there was something fundamentally wrong with the shutdown process. The controller didn't seem to give up controller status, for one thing, as part of the controlled shutdown (which I should think would be the first thing it should do?). Anyway, below are some log snippets. I do have full logs from each broker in the cluster, which I can provide (but would have to significantly anonymize the logs before forwarding along). Controlled shutdown starts and succeeds: {code} 2015-04-14 05:56:10,134 INFO [Thread-38] server.KafkaServer - [Kafka Server 45], shutting down 2015-04-14 05:56:10,136 INFO [Thread-38] server.KafkaServer - [Kafka Server 45], Starting controlled shutdown 2015-04-14 05:56:10,150 INFO [kafka-request-handler-0] controller.KafkaController - [Controller 45]: Shutting down broker 45 ... ... 2015-04-14 05:56:10,951 INFO [Thread-38] server.KafkaServer - [Kafka Server 45], Controlled shutdown succeeded {code} Subsequently, the 'Controller-45-to-broker-45-send-thread' starts repeatedly spamming the logs, like so: {code} 2015-04-14 05:56:11,281 WARN [Controller-45-to-broker-45-send-thread] controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], Controller 45 epoch 21 fails to send request Name:LeaderAndIsrRequest;Version:0;Controller:45;ControllerEpoch:21;CorrelationId:305175;ClientId:id_45-host_null-port_12345;Leaders:id:48,host:broker48.square,port:12345;PartitionState:(mytopic,1) - (LeaderAndIsrInfo:(Leader:48,ISR:48,LeaderEpoch:5,ControllerEpoch:21),ReplicationFactor:2),AllReplicas:45,48) to broker id:45,host:broker45.square,port:12345. Reconnecting to broker. 
java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132) at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) 2015-04-14 05:56:11,281 INFO [Controller-45-to-broker-45-send-thread] controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], Controller 45 connected to id:45,host:broker45.square,port:12345 for sending state change requests 2015-04-14 05:56:11,582 WARN [Controller-45-to-broker-45-send-thread] controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], Controller 45 epoch 21 fails to send request Name:LeaderAndIsrRequest;Version:0;Controller:45;ControllerEpoch:21;CorrelationId:305175;ClientId:id_45-host_null-port_12345;Leaders:id:48,host:broker48.square,port:12345;PartitionState:(mytopic,1) - (LeaderAndIsrInfo:(Leader:48,ISR:48,LeaderEpoch:5,ControllerEpoch:21),ReplicationFactor:2),AllReplicas:45,48) to broker id:45,host:broker45.square,port:12345. Reconnecting to broker. java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132) at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
Re: [DISCUSS] KIP-11- Authorization design for kafka security
Hi Michael, There is code in the kafka codebase that reads and interprets the topic config JSON which has acls, owner and logconfig properties. There are 3 use cases that we are supporting with the current proposal: * You use the out-of-box simpleAcl authorizer which is tied to the acl stored in the topic config and the format is locked down. * You have a custom authorizer and a custom ACL store. Ranger/Argus falls under this as they have their own acl store and ui that users use to configure acls on the cluster and cluster resources like topic. It is up to the custom authorizer to leverage the kafka acl configs or completely ignore them as they have set a user expectation that only acls configured via their ui/system will be effective. * You have a custom authorizer but no custom Acl store. You are completely tied to the Acl structure that we have provided in the out-of-box implementation. Thanks Parth On 4/15/15, 10:31 AM, Michael Herstine mherst...@linkedin.com.INVALID wrote: Hi Parth, One question that occurred to me at the end of today’s hangout: how tied are we to a particular ACL representation under your proposal? I know that TopicConfigCache will just contain JSON— if a particular site decides they want to represent their ACLs differently, and swap out the authorizer implementation, will that work? I guess what I’m asking is whether there’s any code in the Kafka codebase that will interpret that JSON, or does that logic live exclusively in the authorizer? On 4/14/15, 10:56 PM, Don Bosco Durai bo...@apache.org wrote: I also feel, having just IP would be more appropriate. Host lookup will unnecessarily slow things down and would be insecure as you pointed out. With IP, it will also be possible to set up policies (in future if needed) with ranges or netmasks and it would be more scalable. Bosco On 4/14/15, 1:40 PM, Michael Herstine mherst...@linkedin.com.INVALID wrote: Hi Parth, Sorry to chime in so late, but I’ve got a minor question on the KIP. Several methods take a parameter named “host” of type String. Is that intended to be a hostname, or an IP address? If the former, I’m curious as to how that’s found (in my experience, when accepting an incoming socket connection, you only know the IP address, and there isn’t a way to map that to a hostname without a round trip to a DNS server, which is insecure anyway). On 3/25/15, 1:07 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: Hi all, I have modified the KIP to reflect the recent change request from the reviewers. I have been working on the code and I have the server side code for authorization ready. I am now modifying the command line utilities. I would really appreciate it if some of the committers can spend some time to review the KIP so we can make progress on this. Thanks Parth On 3/18/15, 2:20 PM, Michael Herstine mherst...@linkedin.com.INVALID wrote: Hi Parth, Thanks! A few questions: 1. Do you want to permit rules in your ACLs that DENY access as well as ALLOW? This can be handy for setting up rules that have exceptions. E.g. “Allow principal P to READ resource R from all hosts” with “Deny principal P READ access to resource R from host H1” in combination would allow P to READ R from all hosts *except* H1. 2. When a topic is newly created, will there be an ACL created for it? If not, would that not deny subsequent access to it?
(nit) Maybe use Principal instead of String to represent principals? On 3/9/15, 11:48 AM, Don Bosco Durai bo...@apache.org wrote: Parth Overall it is looking good. Couple of questions: - Can you give an example of how the policies will look in the default implementation? - In the operations, can we support “CONNECT” also? This can be used during Session connection. - Regarding access control for “Topic Creation”, since we can’t do it on the server side, can we de-scope it for now and plan it as a future feature request? Thanks Bosco On 3/6/15, 8:10 AM, Harsha ka...@harsha.io wrote: Hi Parth, Thanks for putting this together. Overall it looks good to me. Although AdminUtils is a concern, KIP-4 can probably fix that part. Thanks, Harsha On Thu, Mar 5, 2015, at 10:39 AM, Parth Brahmbhatt wrote: Forgot to add links to wiki and jira. Link to wiki: https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface Link to Jira: https://issues.apache.org/jira/browse/KAFKA-1688 Thanks Parth From: Parth Brahmbhatt pbrahmbh...@hortonworks.com Date: Thursday, March 5, 2015 at 10:33 AM To: dev@kafka.apache.org Subject: [DISCUSS] KIP-11-
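As a concrete, purely hypothetical illustration of use case 2 above: a custom authorizer that ignores the ACL JSON stored in the topic config and consults its own policy store. The trait shapes below are stand-ins, not the KIP-11 interface or types.
{code}
trait Session { def principal: String; def host: String }

trait Authorizer {
  def authorize(session: Session, operation: String, resource: String): Boolean
}

class ExternalPolicyAuthorizer(policyStore: (String, String, String) => Boolean)
    extends Authorizer {
  // The external store is the single source of truth, matching the expectation
  // its own UI sets for users; the kafka-side ACL JSON is simply ignored.
  override def authorize(session: Session, operation: String, resource: String): Boolean =
    policyStore(session.principal, operation, resource)
}
{code}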
[jira] [Commented] (KAFKA-2125) Infinite loop after controlled shutdown succeeds
[ https://issues.apache.org/jira/browse/KAFKA-2125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14496865#comment-14496865 ] Joel Koshy commented on KAFKA-2125: --- For some reason, the socket server's processors shutdown got wedged which prevented the controller, replica-manager, etc. from shutting down. This fully explains the repetitive logs. A full threaddump would help understand why the socket server shutdown wedged - do you have the full dump? Also, do you have the full stack for the line that repeats 500 times in your attached grep output? Infinite loop after controlled shutdown succeeds Key: KAFKA-2125 URL: https://issues.apache.org/jira/browse/KAFKA-2125 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2.1 Reporter: Jason Rosenberg Priority: Blocker Attachments: grep_shut_edited.log I have a 4 node cluster, running 0.8.2.1, that got into a bad state last night during a rolling restart. The first node to be restarted was the controller. Controlled Shutdown completed successfully, after about 800ms. But after that, the server failed to shutdown, and got into what appears to be an infinite cycle, involving the 'Controller-45-to-broker-45-send-thread', and the 'kafka-scheduler-0' thread. Ultimately, the shutdown timed out (after 3 minutes) and it was terminated by the deployment system, and the server was restarted. As expected, when it came back up it took some time re-syncing it's partitions, but eventually came back and all was well. However, I think there was something fundamentally wrong with the shutdown process. The controller didn't seem to give up controller status, for one thing, as part of the controlled shutdown (which I should think would be the first thing it should do?). Anyway, below are some log snippets. I do have full logs from each broker in the cluster, which I can provide (but would have to significantly anonymize the logs before forwarding along). Controlled shutdown starts and succeeds: {code} 2015-04-14 05:56:10,134 INFO [Thread-38] server.KafkaServer - [Kafka Server 45], shutting down 2015-04-14 05:56:10,136 INFO [Thread-38] server.KafkaServer - [Kafka Server 45], Starting controlled shutdown 2015-04-14 05:56:10,150 INFO [kafka-request-handler-0] controller.KafkaController - [Controller 45]: Shutting down broker 45 ... ... 2015-04-14 05:56:10,951 INFO [Thread-38] server.KafkaServer - [Kafka Server 45], Controlled shutdown succeeded {code} Subsequently, the 'Controller-45-to-broker-45-send-thread' starts repeatedly spamming the logs, like so: {code} 2015-04-14 05:56:11,281 WARN [Controller-45-to-broker-45-send-thread] controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], Controller 45 epoch 21 fails to send request Name:LeaderAndIsrRequest;Version:0;Controller:45;ControllerEpoch:21;CorrelationId:305175;ClientId:id_45-host_null-port_12345;Leaders:id:48,host:broker48.square,port:12345;PartitionState:(mytopic,1) - (LeaderAndIsrInfo:(Leader:48,ISR:48,LeaderEpoch:5,ControllerEpoch:21),ReplicationFactor:2),AllReplicas:45,48) to broker id:45,host:broker45.square,port:12345. Reconnecting to broker. 
java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132) at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) 2015-04-14 05:56:11,281 INFO [Controller-45-to-broker-45-send-thread] controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], Controller 45 connected to id:45,host:broker45.square,port:12345 for sending state change requests 2015-04-14 05:56:11,582 WARN [Controller-45-to-broker-45-send-thread] controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], Controller 45 epoch 21 fails to send request Name:LeaderAndIsrRequest;Version:0;Controller:45;ControllerEpoch:21;CorrelationId:305175;ClientId:id_45-host_null-port_12345;Leaders:id:48,host:broker48.square,port:12345;PartitionState:(mytopic,1) - (LeaderAndIsrInfo:(Leader:48,ISR:48,LeaderEpoch:5,ControllerEpoch:21),ReplicationFactor:2),AllReplicas:45,48) to broker id:45,host:broker45.square,port:12345. Reconnecting to broker. java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132) at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131) at
[jira] [Commented] (KAFKA-2125) Infinite loop after controlled shutdown succeeds
[ https://issues.apache.org/jira/browse/KAFKA-2125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14496883#comment-14496883 ] Jason Rosenberg commented on KAFKA-2125: Joel, unfortunately I have no thread dump, and the stack traces I listed above are the full stack traces that I have. Going forward, we've removed the 3 minute automatic jvm shutdown, so next time, if it spins out of control, I can manually pull a thread dump. Infinite loop after controlled shutdown succeeds Key: KAFKA-2125 URL: https://issues.apache.org/jira/browse/KAFKA-2125 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2.1 Reporter: Jason Rosenberg Priority: Blocker Attachments: grep_shut_edited.log I have a 4 node cluster, running 0.8.2.1, that got into a bad state last night during a rolling restart. The first node to be restarted was the controller. Controlled Shutdown completed successfully, after about 800ms. But after that, the server failed to shutdown, and got into what appears to be an infinite cycle, involving the 'Controller-45-to-broker-45-send-thread', and the 'kafka-scheduler-0' thread. Ultimately, the shutdown timed out (after 3 minutes) and it was terminated by the deployment system, and the server was restarted. As expected, when it came back up it took some time re-syncing it's partitions, but eventually came back and all was well. However, I think there was something fundamentally wrong with the shutdown process. The controller didn't seem to give up controller status, for one thing, as part of the controlled shutdown (which I should think would be the first thing it should do?). Anyway, below are some log snippets. I do have full logs from each broker in the cluster, which I can provide (but would have to significantly anonymize the logs before forwarding along). Controlled shutdown starts and succeeds: {code} 2015-04-14 05:56:10,134 INFO [Thread-38] server.KafkaServer - [Kafka Server 45], shutting down 2015-04-14 05:56:10,136 INFO [Thread-38] server.KafkaServer - [Kafka Server 45], Starting controlled shutdown 2015-04-14 05:56:10,150 INFO [kafka-request-handler-0] controller.KafkaController - [Controller 45]: Shutting down broker 45 ... ... 2015-04-14 05:56:10,951 INFO [Thread-38] server.KafkaServer - [Kafka Server 45], Controlled shutdown succeeded {code} Subsequently, the 'Controller-45-to-broker-45-send-thread' starts repeatedly spamming the logs, like so: {code} 2015-04-14 05:56:11,281 WARN [Controller-45-to-broker-45-send-thread] controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], Controller 45 epoch 21 fails to send request Name:LeaderAndIsrRequest;Version:0;Controller:45;ControllerEpoch:21;CorrelationId:305175;ClientId:id_45-host_null-port_12345;Leaders:id:48,host:broker48.square,port:12345;PartitionState:(mytopic,1) - (LeaderAndIsrInfo:(Leader:48,ISR:48,LeaderEpoch:5,ControllerEpoch:21),ReplicationFactor:2),AllReplicas:45,48) to broker id:45,host:broker45.square,port:12345. Reconnecting to broker. 
java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132) at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) 2015-04-14 05:56:11,281 INFO [Controller-45-to-broker-45-send-thread] controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], Controller 45 connected to id:45,host:broker45.square,port:12345 for sending state change requests 2015-04-14 05:56:11,582 WARN [Controller-45-to-broker-45-send-thread] controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], Controller 45 epoch 21 fails to send request Name:LeaderAndIsrRequest;Version:0;Controller:45;ControllerEpoch:21;CorrelationId:305175;ClientId:id_45-host_null-port_12345;Leaders:id:48,host:broker48.square,port:12345;PartitionState:(mytopic,1) - (LeaderAndIsrInfo:(Leader:48,ISR:48,LeaderEpoch:5,ControllerEpoch:21),ReplicationFactor:2),AllReplicas:45,48) to broker id:45,host:broker45.square,port:12345. Reconnecting to broker. java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132) at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) 2015-04-14 05:56:11,582 INFO [Controller-45-to-broker-45-send-thread] controller.RequestSendThread
[jira] [Commented] (KAFKA-2125) Infinite loop after controlled shutdown succeeds
[ https://issues.apache.org/jira/browse/KAFKA-2125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14496898#comment-14496898 ] Jason Rosenberg commented on KAFKA-2125: Ooh, it looks like before the 500 repeated exceptions, the first one was a little different, which I neglected to include previously: {code} 2015-04-14 05:56:10,973 INFO [kafka-request-handler-6] log.Log - Truncating log mytopic-1 to offset 13860934. 2015-04-14 05:56:10,980 WARN [Controller-45-to-broker-45-send-thread] controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], Controller 45 epoch 21 fails to send request Name:LeaderAndIsrRequest;Version:0;Controller:45;ControllerEpoch:21;CorrelationId:305175;ClientId:id_45-host_null-port_12345;Leaders:id:48,host:broker48.square,port:12345;PartitionState:(mytopic,1) - (LeaderAndIsrInfo:(Leader:48,ISR:48,LeaderEpoch:5,ControllerEpoch:21),ReplicationFactor:2),AllReplicas:45,48) to broker id:45,host:broker45.square,port:12345. Reconnecting to broker. java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. at kafka.utils.Utils$.read(Utils.scala:381) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Receive$class.readCompletely(Transmission.scala:56) at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111) at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:133) at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) 2015-04-14 05:56:10,981 INFO [Controller-45-to-broker-45-send-thread] controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], Controller 45 connected to id:45,host:broker45.square,port:12345 for sending state change requests 2015-04-14 05:56:10,993 ERROR [kafka-network-thread-27330-2] utils.Utils$ - Uncaught exception in thread 'kafka-network-thread-12345-2': java.util.NoSuchElementException: None.get at scala.None$.get(Option.scala:347) at scala.None$.get(Option.scala:345) at kafka.network.ConnectionQuotas.dec(SocketServer.scala:524) at kafka.network.AbstractServerThread.close(SocketServer.scala:165) at kafka.network.AbstractServerThread.close(SocketServer.scala:157) at kafka.network.Processor.close(SocketServer.scala:374) at kafka.network.AbstractServerThread.closeAll(SocketServer.scala:180) at kafka.network.Processor.run(SocketServer.scala:364) at java.lang.Thread.run(Thread.java:745) {code} Infinite loop after controlled shutdown succeeds Key: KAFKA-2125 URL: https://issues.apache.org/jira/browse/KAFKA-2125 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2.1 Reporter: Jason Rosenberg Priority: Blocker Attachments: grep_shut_edited.log I have a 4 node cluster, running 0.8.2.1, that got into a bad state last night during a rolling restart. The first node to be restarted was the controller. Controlled Shutdown completed successfully, after about 800ms. But after that, the server failed to shutdown, and got into what appears to be an infinite cycle, involving the 'Controller-45-to-broker-45-send-thread', and the 'kafka-scheduler-0' thread. Ultimately, the shutdown timed out (after 3 minutes) and it was terminated by the deployment system, and the server was restarted. As expected, when it came back up it took some time re-syncing it's partitions, but eventually came back and all was well. 
However, I think there was something fundamentally wrong with the shutdown process. The controller didn't seem to give up controller status, for one thing, as part of the controlled shutdown (which I should think would be the first thing it should do?). Anyway, below are some log snippets. I do have full logs from each broker in the cluster, which I can provide (but would have to significantly anonymize the logs before forwarding along). Controlled shutdown starts and succeeds: {code} 2015-04-14 05:56:10,134 INFO [Thread-38] server.KafkaServer - [Kafka Server 45], shutting down 2015-04-14 05:56:10,136 INFO [Thread-38] server.KafkaServer - [Kafka Server 45], Starting controlled shutdown 2015-04-14 05:56:10,150 INFO [kafka-request-handler-0] controller.KafkaController - [Controller 45]: Shutting down broker 45 ... ... 2015-04-14 05:56:10,951 INFO [Thread-38] server.KafkaServer - [Kafka Server 45], Controlled shutdown succeeded {code} Subsequently, the 'Controller-45-to-broker-45-send-thread' starts repeatedly
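The None.get at the top of the stack trace above is the usual failure mode of calling Option.get on a missing map entry. A minimal, non-Kafka reproduction of that pattern (this is not the actual ConnectionQuotas code):
{code}
import scala.collection.mutable

object ConnectionCountRepro {
  val connectionCounts = mutable.Map.empty[String, Int]

  def dec(address: String): Unit = {
    // Option.get on a missing key throws NoSuchElementException("None.get").
    val current = connectionCounts.get(address).get
    if (current == 1) connectionCounts.remove(address)
    else connectionCounts.put(address, current - 1)
  }

  def main(args: Array[String]): Unit =
    dec("10.0.0.1") // throws java.util.NoSuchElementException: None.get
}
{code}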
[jira] [Updated] (KAFKA-2125) Infinite loop after controlled shutdown succeeds
[ https://issues.apache.org/jira/browse/KAFKA-2125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Rosenberg updated KAFKA-2125: --- Attachment: grep_shut.log Infinite loop after controlled shutdown succeeds Key: KAFKA-2125 URL: https://issues.apache.org/jira/browse/KAFKA-2125 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2.1 Reporter: Jason Rosenberg Priority: Blocker Attachments: grep_shut.log I have a 4 node cluster, running 0.8.2.1, that got into a bad state last night during a rolling restart. The first node to be restarted was the controller. Controlled Shutdown completed successfully, after about 800ms. But after that, the server failed to shutdown, and got into what appears to be an infinite cycle, involving the 'Controller-45-to-broker-45-send-thread', and the 'kafka-scheduler-0' thread. Ultimately, the shutdown timed out (after 3 minutes) and it was terminated by the deployment system, and the server was restarted. As expected, when it came back up it took some time re-syncing it's partitions, but eventually came back and all was well. However, I think there was something fundamentally wrong with the shutdown process. The controller didn't seem to give up controller status, for one thing, as part of the controlled shutdown (which I should think would be the first thing it should do?). Anyway, below are some log snippets. I do have full logs from each broker in the cluster, which I can provide (but would have to significantly anonymize the logs before forwarding along). Controlled shutdown starts and succeeds: {code} 2015-04-14 05:56:10,134 INFO [Thread-38] server.KafkaServer - [Kafka Server 45], shutting down 2015-04-14 05:56:10,136 INFO [Thread-38] server.KafkaServer - [Kafka Server 45], Starting controlled shutdown 2015-04-14 05:56:10,150 INFO [kafka-request-handler-0] controller.KafkaController - [Controller 45]: Shutting down broker 45 ... ... 2015-04-14 05:56:10,951 INFO [Thread-38] server.KafkaServer - [Kafka Server 45], Controlled shutdown succeeded {code} Subsequently, the 'Controller-45-to-broker-45-send-thread' starts repeatedly spamming the logs, like so: {code} 2015-04-14 05:56:11,281 WARN [Controller-45-to-broker-45-send-thread] controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], Controller 45 epoch 21 fails to send request Name:LeaderAndIsrRequest;Version:0;Controller:45;ControllerEpoch:21;CorrelationId:305175;ClientId:id_45-host_null-port_12345;Leaders:id:48,host:broker48.square,port:12345;PartitionState:(mytopic,1) - (LeaderAndIsrInfo:(Leader:48,ISR:48,LeaderEpoch:5,ControllerEpoch:21),ReplicationFactor:2),AllReplicas:45,48) to broker id:45,host:broker45.square,port:12345. Reconnecting to broker. 
java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132) at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) 2015-04-14 05:56:11,281 INFO [Controller-45-to-broker-45-send-thread] controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], Controller 45 connected to id:45,host:broker45.square,port:12345 for sending state change requests 2015-04-14 05:56:11,582 WARN [Controller-45-to-broker-45-send-thread] controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], Controller 45 epoch 21 fails to send request Name:LeaderAndIsrRequest;Version:0;Controller:45;ControllerEpoch:21;CorrelationId:305175;ClientId:id_45-host_null-port_12345;Leaders:id:48,host:broker48.square,port:12345;PartitionState:(mytopic,1) - (LeaderAndIsrInfo:(Leader:48,ISR:48,LeaderEpoch:5,ControllerEpoch:21),ReplicationFactor:2),AllReplicas:45,48) to broker id:45,host:broker45.square,port:12345. Reconnecting to broker. java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132) at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) 2015-04-14 05:56:11,582 INFO [Controller-45-to-broker-45-send-thread] controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], Controller 45 connected to id:45,host:broker45.square,port:12345 for sending state change requests {code} This seems to repeat every 300 ms or so, and continues until the server is forcibly shutdown (for about 3 minutes in this case). The
[jira] [Commented] (KAFKA-2035) Add a topic config cache.
[ https://issues.apache.org/jira/browse/KAFKA-2035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14496810#comment-14496810 ] Parth Brahmbhatt commented on KAFKA-2035: - It does, and if I understand correctly you are proposing to create something like the following: class Topic(val config: TopicConfig, val logs: Set[Log], ... all other topic related stuff). I am not sure why this does not exist right now but I think moving in that direction is a good idea. At the same time I also think this will be a lot more work and we can probably take an incremental approach without a lot of (or probably any) throw-away work by starting to model one piece at a time so other jiras don't need to be blocked until we settle on a top-level Topic model. After all the top-level Topic will just be a composition of all these small things that make up a topic. If you think it is essential to nail the Topic structure before making progress on this jira and other related jiras let me know and I can file a new jira for the same. Add a topic config cache. - Key: KAFKA-2035 URL: https://issues.apache.org/jira/browse/KAFKA-2035 Project: Kafka Issue Type: Task Reporter: Parth Brahmbhatt Assignee: Parth Brahmbhatt Attachments: KAFKA-2035_2015-03-31_10:52:12.patch Currently the topic config is all about Log configuration so we have a TopicConfigManager which takes in a Log instance and keeps updating that instance's config instance as and when the topic config is updated. The topic config update notifications are sent using zk watchers by Controller. I propose to introduce a TopicConfigCache which will be updated by TopicConfigManager on any config changes. The log instance and any other component (like the authorizer mentioned in KAFKA-1688) will have a reference to TopicConfigCache using which they will access the topic configs. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
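A minimal Scala sketch of the first-class Topic shape floated in this comment; TopicConfig and Log below are placeholders for whatever the real model would use, and nothing here is committed Kafka code.
{code}
case class TopicConfig(overrides: Map[String, String])
trait Log { def partitionId: Int }

class Topic(val name: String, initialConfig: TopicConfig, val logs: Set[Log]) {
  @volatile private var currentConfig: TopicConfig = initialConfig

  def config: TopicConfig = currentConfig

  // A TopicManager (or the existing config manager) would call this on a zk
  // config-change notification, so Logs and the authorizer share one
  // up-to-date view instead of each caching the config separately.
  def updateConfig(newConfig: TopicConfig): Unit = { currentConfig = newConfig }
}
{code}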
[jira] [Updated] (KAFKA-2125) Infinite loop after controlled shutdown succeeds
[ https://issues.apache.org/jira/browse/KAFKA-2125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Rosenberg updated KAFKA-2125: --- Attachment: (was: grep_shut.log) Infinite loop after controlled shutdown succeeds Key: KAFKA-2125 URL: https://issues.apache.org/jira/browse/KAFKA-2125 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2.1 Reporter: Jason Rosenberg Priority: Blocker Attachments: grep_shut_edited.log I have a 4 node cluster, running 0.8.2.1, that got into a bad state last night during a rolling restart. The first node to be restarted was the controller. Controlled Shutdown completed successfully, after about 800ms. But after that, the server failed to shutdown, and got into what appears to be an infinite cycle, involving the 'Controller-45-to-broker-45-send-thread', and the 'kafka-scheduler-0' thread. Ultimately, the shutdown timed out (after 3 minutes) and it was terminated by the deployment system, and the server was restarted. As expected, when it came back up it took some time re-syncing it's partitions, but eventually came back and all was well. However, I think there was something fundamentally wrong with the shutdown process. The controller didn't seem to give up controller status, for one thing, as part of the controlled shutdown (which I should think would be the first thing it should do?). Anyway, below are some log snippets. I do have full logs from each broker in the cluster, which I can provide (but would have to significantly anonymize the logs before forwarding along). Controlled shutdown starts and succeeds: {code} 2015-04-14 05:56:10,134 INFO [Thread-38] server.KafkaServer - [Kafka Server 45], shutting down 2015-04-14 05:56:10,136 INFO [Thread-38] server.KafkaServer - [Kafka Server 45], Starting controlled shutdown 2015-04-14 05:56:10,150 INFO [kafka-request-handler-0] controller.KafkaController - [Controller 45]: Shutting down broker 45 ... ... 2015-04-14 05:56:10,951 INFO [Thread-38] server.KafkaServer - [Kafka Server 45], Controlled shutdown succeeded {code} Subsequently, the 'Controller-45-to-broker-45-send-thread' starts repeatedly spamming the logs, like so: {code} 2015-04-14 05:56:11,281 WARN [Controller-45-to-broker-45-send-thread] controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], Controller 45 epoch 21 fails to send request Name:LeaderAndIsrRequest;Version:0;Controller:45;ControllerEpoch:21;CorrelationId:305175;ClientId:id_45-host_null-port_12345;Leaders:id:48,host:broker48.square,port:12345;PartitionState:(mytopic,1) - (LeaderAndIsrInfo:(Leader:48,ISR:48,LeaderEpoch:5,ControllerEpoch:21),ReplicationFactor:2),AllReplicas:45,48) to broker id:45,host:broker45.square,port:12345. Reconnecting to broker. 
java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132) at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) 2015-04-14 05:56:11,281 INFO [Controller-45-to-broker-45-send-thread] controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], Controller 45 connected to id:45,host:broker45.square,port:12345 for sending state change requests 2015-04-14 05:56:11,582 WARN [Controller-45-to-broker-45-send-thread] controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], Controller 45 epoch 21 fails to send request Name:LeaderAndIsrRequest;Version:0;Controller:45;ControllerEpoch:21;CorrelationId:305175;ClientId:id_45-host_null-port_12345;Leaders:id:48,host:broker48.square,port:12345;PartitionState:(mytopic,1) - (LeaderAndIsrInfo:(Leader:48,ISR:48,LeaderEpoch:5,ControllerEpoch:21),ReplicationFactor:2),AllReplicas:45,48) to broker id:45,host:broker45.square,port:12345. Reconnecting to broker. java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132) at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) 2015-04-14 05:56:11,582 INFO [Controller-45-to-broker-45-send-thread] controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], Controller 45 connected to id:45,host:broker45.square,port:12345 for sending state change requests {code} This seems to repeat every 300 ms or so, and continues until the server is forcibly shutdown (for about 3 minutes in this
[jira] [Updated] (KAFKA-2125) Infinite loop after controlled shutdown succeeds
[ https://issues.apache.org/jira/browse/KAFKA-2125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Rosenberg updated KAFKA-2125: --- Attachment: grep_shut_edited.log Infinite loop after controlled shutdown succeeds Key: KAFKA-2125 URL: https://issues.apache.org/jira/browse/KAFKA-2125 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2.1 Reporter: Jason Rosenberg Priority: Blocker Attachments: grep_shut_edited.log I have a 4 node cluster, running 0.8.2.1, that got into a bad state last night during a rolling restart. The first node to be restarted was the controller. Controlled Shutdown completed successfully, after about 800ms. But after that, the server failed to shutdown, and got into what appears to be an infinite cycle, involving the 'Controller-45-to-broker-45-send-thread', and the 'kafka-scheduler-0' thread. Ultimately, the shutdown timed out (after 3 minutes) and it was terminated by the deployment system, and the server was restarted. As expected, when it came back up it took some time re-syncing it's partitions, but eventually came back and all was well. However, I think there was something fundamentally wrong with the shutdown process. The controller didn't seem to give up controller status, for one thing, as part of the controlled shutdown (which I should think would be the first thing it should do?). Anyway, below are some log snippets. I do have full logs from each broker in the cluster, which I can provide (but would have to significantly anonymize the logs before forwarding along). Controlled shutdown starts and succeeds: {code} 2015-04-14 05:56:10,134 INFO [Thread-38] server.KafkaServer - [Kafka Server 45], shutting down 2015-04-14 05:56:10,136 INFO [Thread-38] server.KafkaServer - [Kafka Server 45], Starting controlled shutdown 2015-04-14 05:56:10,150 INFO [kafka-request-handler-0] controller.KafkaController - [Controller 45]: Shutting down broker 45 ... ... 2015-04-14 05:56:10,951 INFO [Thread-38] server.KafkaServer - [Kafka Server 45], Controlled shutdown succeeded {code} Subsequently, the 'Controller-45-to-broker-45-send-thread' starts repeatedly spamming the logs, like so: {code} 2015-04-14 05:56:11,281 WARN [Controller-45-to-broker-45-send-thread] controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], Controller 45 epoch 21 fails to send request Name:LeaderAndIsrRequest;Version:0;Controller:45;ControllerEpoch:21;CorrelationId:305175;ClientId:id_45-host_null-port_12345;Leaders:id:48,host:broker48.square,port:12345;PartitionState:(mytopic,1) - (LeaderAndIsrInfo:(Leader:48,ISR:48,LeaderEpoch:5,ControllerEpoch:21),ReplicationFactor:2),AllReplicas:45,48) to broker id:45,host:broker45.square,port:12345. Reconnecting to broker. 
java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132) at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) 2015-04-14 05:56:11,281 INFO [Controller-45-to-broker-45-send-thread] controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], Controller 45 connected to id:45,host:broker45.square,port:12345 for sending state change requests 2015-04-14 05:56:11,582 WARN [Controller-45-to-broker-45-send-thread] controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], Controller 45 epoch 21 fails to send request Name:LeaderAndIsrRequest;Version:0;Controller:45;ControllerEpoch:21;CorrelationId:305175;ClientId:id_45-host_null-port_12345;Leaders:id:48,host:broker48.square,port:12345;PartitionState:(mytopic,1) - (LeaderAndIsrInfo:(Leader:48,ISR:48,LeaderEpoch:5,ControllerEpoch:21),ReplicationFactor:2),AllReplicas:45,48) to broker id:45,host:broker45.square,port:12345. Reconnecting to broker. java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132) at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) 2015-04-14 05:56:11,582 INFO [Controller-45-to-broker-45-send-thread] controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], Controller 45 connected to id:45,host:broker45.square,port:12345 for sending state change requests {code} This seems to repeat every 300 ms or so, and continues until the server is forcibly shutdown (for about 3 minutes in this case).
[jira] [Comment Edited] (KAFKA-2125) Infinite loop after controlled shutdown succeeds
[ https://issues.apache.org/jira/browse/KAFKA-2125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14496794#comment-14496794 ] Jason Rosenberg edited comment on KAFKA-2125 at 4/15/15 8:04 PM: - I uploaded an edited result of doing 'grep -i shut' on the log file after the controlled shutdown succeeded. (see 'grep_shut_edited.log'). I added a few comments, on lines preceded with '//' Also, I included an InterruptedException at the end, which seems to indicate where a thread was hung at the time the thread was shutdown by the jvm. was (Author: jbrosenb...@gmail.com): I uploaded an edited result of doing 'grep -i shut' on the log file after the controlled shutdown succeeded. (see 'grep_shut.log'). I added a few comments, on lines preceded with '//' Also, I included an InterruptedException at the end, which seems to indicate where a thread was hung at the time the thread was shutdown by the jvm. Infinite loop after controlled shutdown succeeds Key: KAFKA-2125 URL: https://issues.apache.org/jira/browse/KAFKA-2125 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2.1 Reporter: Jason Rosenberg Priority: Blocker Attachments: grep_shut_edited.log I have a 4 node cluster, running 0.8.2.1, that got into a bad state last night during a rolling restart. The first node to be restarted was the controller. Controlled Shutdown completed successfully, after about 800ms. But after that, the server failed to shutdown, and got into what appears to be an infinite cycle, involving the 'Controller-45-to-broker-45-send-thread', and the 'kafka-scheduler-0' thread. Ultimately, the shutdown timed out (after 3 minutes) and it was terminated by the deployment system, and the server was restarted. As expected, when it came back up it took some time re-syncing it's partitions, but eventually came back and all was well. However, I think there was something fundamentally wrong with the shutdown process. The controller didn't seem to give up controller status, for one thing, as part of the controlled shutdown (which I should think would be the first thing it should do?). Anyway, below are some log snippets. I do have full logs from each broker in the cluster, which I can provide (but would have to significantly anonymize the logs before forwarding along). Controlled shutdown starts and succeeds: {code} 2015-04-14 05:56:10,134 INFO [Thread-38] server.KafkaServer - [Kafka Server 45], shutting down 2015-04-14 05:56:10,136 INFO [Thread-38] server.KafkaServer - [Kafka Server 45], Starting controlled shutdown 2015-04-14 05:56:10,150 INFO [kafka-request-handler-0] controller.KafkaController - [Controller 45]: Shutting down broker 45 ... ... 2015-04-14 05:56:10,951 INFO [Thread-38] server.KafkaServer - [Kafka Server 45], Controlled shutdown succeeded {code} Subsequently, the 'Controller-45-to-broker-45-send-thread' starts repeatedly spamming the logs, like so: {code} 2015-04-14 05:56:11,281 WARN [Controller-45-to-broker-45-send-thread] controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], Controller 45 epoch 21 fails to send request Name:LeaderAndIsrRequest;Version:0;Controller:45;ControllerEpoch:21;CorrelationId:305175;ClientId:id_45-host_null-port_12345;Leaders:id:48,host:broker48.square,port:12345;PartitionState:(mytopic,1) - (LeaderAndIsrInfo:(Leader:48,ISR:48,LeaderEpoch:5,ControllerEpoch:21),ReplicationFactor:2),AllReplicas:45,48) to broker id:45,host:broker45.square,port:12345. Reconnecting to broker. 
java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132) at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) 2015-04-14 05:56:11,281 INFO [Controller-45-to-broker-45-send-thread] controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], Controller 45 connected to id:45,host:broker45.square,port:12345 for sending state change requests 2015-04-14 05:56:11,582 WARN [Controller-45-to-broker-45-send-thread] controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], Controller 45 epoch 21 fails to send request Name:LeaderAndIsrRequest;Version:0;Controller:45;ControllerEpoch:21;CorrelationId:305175;ClientId:id_45-host_null-port_12345;Leaders:id:48,host:broker48.square,port:12345;PartitionState:(mytopic,1) - (LeaderAndIsrInfo:(Leader:48,ISR:48,LeaderEpoch:5,ControllerEpoch:21),ReplicationFactor:2),AllReplicas:45,48) to broker id:45,host:broker45.square,port:12345.
Review Request 33239: Patch for KAFKA-2126
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33239/ --- Review request for kafka. Bugs: KAFKA-2126 https://issues.apache.org/jira/browse/KAFKA-2126 Repository: kafka Description --- KAFKA-2126: Configure automatically instantiated deserializers in new consumer. Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 21243345311a106f0802ce96c026ba6e815ccf99 Diff: https://reviews.apache.org/r/33239/diff/ Testing --- Thanks, Ewen Cheslack-Postava
[jira] [Updated] (KAFKA-2126) New consumer does not correctly configure deserializers
[ https://issues.apache.org/jira/browse/KAFKA-2126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-2126: - Status: Patch Available (was: Open) New consumer does not correctly configure deserializers --- Key: KAFKA-2126 URL: https://issues.apache.org/jira/browse/KAFKA-2126 Project: Kafka Issue Type: Bug Components: clients, consumer Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Fix For: 0.8.3 Attachments: KAFKA-2126.patch Since the new ser/de interfaces use a configure() method with an extra isKey parameter, they need to be manually configured after creation since getConfiguredInstances can't invoke configure() for us. The new consumer is missing this step when the deserializer is instantiated automatically by the consumer. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
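To make the missing step concrete: the Deserializer interface in the clients library takes the extra isKey flag in configure(), so a consumer that instantiates the deserializers itself has to call configure() explicitly for the key and value sides. A minimal sketch, assuming the kafka-clients jar is on the classpath; the helper class and parameter names are illustrative only, not the actual KafkaConsumer code or the attached patch:

{code}
import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;

// Illustrative helper only: the point is the explicit configure(configs, isKey)
// calls, which getConfiguredInstances() cannot make on the consumer's behalf.
final class DeserializerSetup {
    static <K, V> void configure(Map<String, ?> consumerConfigs,
                                 Deserializer<K> keyDeserializer,
                                 Deserializer<V> valueDeserializer) {
        keyDeserializer.configure(consumerConfigs, true);    // isKey = true for the key side
        valueDeserializer.configure(consumerConfigs, false); // isKey = false for the value side
    }
}
{code}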
[jira] [Updated] (KAFKA-2126) New consumer does not correctly configure deserializers
[ https://issues.apache.org/jira/browse/KAFKA-2126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-2126: - Attachment: KAFKA-2126.patch New consumer does not correctly configure deserializers --- Key: KAFKA-2126 URL: https://issues.apache.org/jira/browse/KAFKA-2126 Project: Kafka Issue Type: Bug Components: clients, consumer Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Fix For: 0.8.3 Attachments: KAFKA-2126.patch Since the new ser/de interfaces use a configure() method with an extra isKey parameter, they need to be manually configured after creation since getConfiguredInstances can't invoke configure() for us. The new consumer is missing this step when the deserializer is instantiated automatically by the consumer. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2126) New consumer does not correctly configure deserializers
[ https://issues.apache.org/jira/browse/KAFKA-2126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14496858#comment-14496858 ] Ewen Cheslack-Postava commented on KAFKA-2126: -- Created reviewboard https://reviews.apache.org/r/33239/diff/ against branch origin/trunk New consumer does not correctly configure deserializers --- Key: KAFKA-2126 URL: https://issues.apache.org/jira/browse/KAFKA-2126 Project: Kafka Issue Type: Bug Components: clients, consumer Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Fix For: 0.8.3 Attachments: KAFKA-2126.patch Since the new ser/de interfaces use a configure() method with an extra isKey parameter, they need to be manually configured after creation since getConfiguredInstances can't invoke configure() for us. The new consumer is missing this step when the deserializer is instantiated automatically by the consumer. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2125) Infinite loop after controlled shutdown succeeds
[ https://issues.apache.org/jira/browse/KAFKA-2125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14496794#comment-14496794 ] Jason Rosenberg commented on KAFKA-2125: I uploaded an edited result of doing 'grep -i shut' on the log file after the controlled shutdown succeeded. (see 'grep_shut.log'). I added a few comments, on lines preceded with '//' Also, I included an InterruptedException at the end, which seems to indicate where a thread was hung at the time the thread was shutdown by the jvm. Infinite loop after controlled shutdown succeeds Key: KAFKA-2125 URL: https://issues.apache.org/jira/browse/KAFKA-2125 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2.1 Reporter: Jason Rosenberg Priority: Blocker Attachments: grep_shut.log I have a 4 node cluster, running 0.8.2.1, that got into a bad state last night during a rolling restart. The first node to be restarted was the controller. Controlled Shutdown completed successfully, after about 800ms. But after that, the server failed to shutdown, and got into what appears to be an infinite cycle, involving the 'Controller-45-to-broker-45-send-thread', and the 'kafka-scheduler-0' thread. Ultimately, the shutdown timed out (after 3 minutes) and it was terminated by the deployment system, and the server was restarted. As expected, when it came back up it took some time re-syncing it's partitions, but eventually came back and all was well. However, I think there was something fundamentally wrong with the shutdown process. The controller didn't seem to give up controller status, for one thing, as part of the controlled shutdown (which I should think would be the first thing it should do?). Anyway, below are some log snippets. I do have full logs from each broker in the cluster, which I can provide (but would have to significantly anonymize the logs before forwarding along). Controlled shutdown starts and succeeds: {code} 2015-04-14 05:56:10,134 INFO [Thread-38] server.KafkaServer - [Kafka Server 45], shutting down 2015-04-14 05:56:10,136 INFO [Thread-38] server.KafkaServer - [Kafka Server 45], Starting controlled shutdown 2015-04-14 05:56:10,150 INFO [kafka-request-handler-0] controller.KafkaController - [Controller 45]: Shutting down broker 45 ... ... 2015-04-14 05:56:10,951 INFO [Thread-38] server.KafkaServer - [Kafka Server 45], Controlled shutdown succeeded {code} Subsequently, the 'Controller-45-to-broker-45-send-thread' starts repeatedly spamming the logs, like so: {code} 2015-04-14 05:56:11,281 WARN [Controller-45-to-broker-45-send-thread] controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], Controller 45 epoch 21 fails to send request Name:LeaderAndIsrRequest;Version:0;Controller:45;ControllerEpoch:21;CorrelationId:305175;ClientId:id_45-host_null-port_12345;Leaders:id:48,host:broker48.square,port:12345;PartitionState:(mytopic,1) - (LeaderAndIsrInfo:(Leader:48,ISR:48,LeaderEpoch:5,ControllerEpoch:21),ReplicationFactor:2),AllReplicas:45,48) to broker id:45,host:broker45.square,port:12345. Reconnecting to broker. 
java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132) at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) 2015-04-14 05:56:11,281 INFO [Controller-45-to-broker-45-send-thread] controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], Controller 45 connected to id:45,host:broker45.square,port:12345 for sending state change requests 2015-04-14 05:56:11,582 WARN [Controller-45-to-broker-45-send-thread] controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], Controller 45 epoch 21 fails to send request Name:LeaderAndIsrRequest;Version:0;Controller:45;ControllerEpoch:21;CorrelationId:305175;ClientId:id_45-host_null-port_12345;Leaders:id:48,host:broker48.square,port:12345;PartitionState:(mytopic,1) - (LeaderAndIsrInfo:(Leader:48,ISR:48,LeaderEpoch:5,ControllerEpoch:21),ReplicationFactor:2),AllReplicas:45,48) to broker id:45,host:broker45.square,port:12345. Reconnecting to broker. java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132) at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) 2015-04-14 05:56:11,582 INFO
Re: [DISCUSS] KIP-11- Authorization design for kafka security
Parth, If one wants to use his or her own access control including authentication system, with this design what will be needed to be done? Can one completely turn this off so that the system behaves exactly same as it is today? Thanks. Tong Sent from my iPhone On Apr 15, 2015, at 1:51 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: Hi Michael, There is code in kafka codebase that reads and interprets the topic config JSON which has acls, owner and logconfig properties. There are 3 use cases that we are supporting with current proposal: * You use out of box simpleAcl authorizer which is tied to the acl stored in topic config and the format is locked down. * You have a custom authorizer and a custom ACL store. Ranger/Argus falls under this as they have their own acl store and ui that users use to configure acls on the cluster and cluster resources like topic. It is upto the custom authorizer to leverage the kafka acl configs or completely ignore them as they have set a user expectation that only acls configured via their ui/system will be effective. * You have a custom authorizer but no custom Acl store. You are completely tied to Acl structure that we have provided in out of box implementation. Thanks Parth On 4/15/15, 10:31 AM, Michael Herstine mherst...@linkedin.com.INVALIDmailto:mherst...@linkedin.com.INVALID wrote: Hi Parth, One question that occurred to me at the end of today’s hangout: how tied are we to a particular ACL representation under your proposal? I know that TopicConfigCache will just contain JSON— if a particular site decides they want to represent their ACLs differently, and swap out the authorizer implementation, will that work? I guess what I’m asking is whether there’s any code in the Kafka codebase that will interpret that JSON, or does that logic live exclusively in the authorizer? On 4/14/15, 10:56 PM, Don Bosco Durai bo...@apache.orgmailto:bo...@apache.org wrote: I also feel, having just IP would be more appropriate. Host lookup will unnecessary slow things down and would be insecure as you pointed out. With IP, it will be also able to setup policies (in future if needed) with ranges or netmasks and it would be more scalable. Bosco On 4/14/15, 1:40 PM, Michael Herstine mherst...@linkedin.com.INVALIDmailto:mherst...@linkedin.com.INVALID wrote: Hi Parth, Sorry to chime in so late, but I’ve got a minor question on the KIP. Several methods take a parameter named “host” of type String. Is that intended to be a hostname, or an IP address? If the former, I’m curious as to how that’s found (in my experience, when accepting an incoming socket connection, you only know the IP address, and there isn’t a way to map that to a hostname without a round trip to a DNS server, which is insecure anyway). On 3/25/15, 1:07 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.commailto:pbrahmbh...@hortonworks.com wrote: Hi all, I have modified the KIP to reflect the recent change request from the reviewers. I have been working on the code and I have the server side code for authorization ready. I am now modifying the command line utilities. I would really appreciate if some of the committers can spend sometime to review the KIP so we can make progress on this. Thanks Parth On 3/18/15, 2:20 PM, Michael Herstine mherst...@linkedin.com.INVALIDmailto:mherst...@linkedin.com.INVALID wrote: Hi Parth, Thanks! A few questions: 1. Do you want to permit rules in your ACLs that DENY access as well as ALLOW? This can be handy setting up rules that have exceptions. E.g. 
“Allow principal P to READ resource R from all hosts” with “Deny principal P READ access to resource R from host H1” in combination would allow P to READ R from all hosts *except* H1. 2. When a topic is newly created, will there be an ACL created for it? If not, would that not deny subsequent access to it? (nit) Maybe use Principal instead of String to represent principals? On 3/9/15, 11:48 AM, Don Bosco Durai bo...@apache.org wrote: Parth Overall it is looking good. Couple of questions… - Can you give an example how the policies will look like in the default implementation? - In the operations, can we support “CONNECT” also? This can be used during Session connection - Regarding access control for “Topic Creation”, since we can’t do it on the server side, can we de-scope it for? And plan it as a future feature request? Thanks Bosco On 3/6/15, 8:10 AM, Harsha ka...@harsha.io wrote: Hi Parth, Thanks for putting this together. Overall it looks good to me. Although AdminUtils is a concern KIP-4 can probably fix that part. Thanks, Harsha On Thu, Mar 5, 2015, at 10:39 AM, Parth Brahmbhatt wrote: Forgot to add links to wiki and jira. Link to wiki: https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization
Jenkins build is back to normal : KafkaPreCommit #75
See https://builds.apache.org/job/KafkaPreCommit/75/changes
Re: [KIP-DISCUSSION] KIP-13 Quotas
Thanks for the summary. A few comments below: 1. Say a produce request has replication timeout X, and upon finishing the local append it is determined to be throttled Y ms where Y > X, then after it has timed out in the purgatory after Y ms we should still check if the #.acks has been fulfilled in order to set the correct error codes in the response. 2. I think it is actually common that the calculated throttle time Y is less than the replication timeout X, which will be a tricky case since we need to make sure 1) at least the request is held in the purgatory for Y ms, 2) after Y ms elapsed, if the #.acks has been fulfilled within X ms then set no-error-code and return immediately, 3) after X ms elapsed, set timeout-error-code and return. Guozhang On Tue, Apr 14, 2015 at 5:01 PM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote: This is an implementation proposal for delaying requests in quotas using the current purgatory. I'll discuss the usage for produce and fetch requests separately. 1. Delayed Produce Requests - Here, the proposal is basically to reuse DelayedProduce objects and insert them into the purgatory with no watcher keys if the request is being throttled. The timeout used in the request should be the Max(quota_delay_time, replication_timeout). In most cases, the quota timeout should be greater than the existing timeout but in order to be safe, we can use the maximum of these values. Having no watch keys will allow the operation to be enqueued directly into the timer and will not add any overhead in terms of watching keys (which was a concern). In this case, having watch keys is not beneficial since the operation must be delayed for a fixed amount of time and there is no possibility for the operation to complete before the timeout i.e. tryComplete() can never return true before the timeout. On timeout, since the operation is a TimerTask, the timer will call run() which calls onComplete(). In onComplete, the DelayedProduce can repeat the check in tryComplete() (only if acks=-1 whether all replicas fetched upto a certain offset) and return the response immediately. Code will be structured as follows in ReplicaManager:appendMessages() if(isThrottled) { fetch = new DelayedProduce(timeout) purgatory.tryCompleteElseWatch(fetch, Seq()) } else if(delayedRequestRequired()) { // Insert into purgatory with watched keys for unthrottled requests } In this proposal, we avoid adding unnecessary watches because there is no possibility of early completion and this avoids any potential performance penalties we were concerned about earlier. 2. Delayed Fetch Requests - Similarly, the proposal here is to reuse the DelayedFetch objects and insert them into the purgatory with no watcher keys if the request is throttled. Timeout used is the Max(quota_delay_time, max_wait_timeout). Having no watch keys provides the same benefits as described above. Upon timeout, the onComplete() is called and the operation proceeds normally i.e. perform a readFromLocalLog and return a response. The caveat here is that if the request is throttled but the throttle time is less than the max_wait timeout on the fetch request, the request will be delayed to a Max(quota_delay_time, max_wait_timeout). This may be more than strictly necessary (since we are not watching for satisfaction on any keys). I added some testcases to DelayedOperationTest to verify that it is possible to schedule operations with no watcher keys. By inserting elements with no watch keys, the purgatory simply becomes a delay queue.
It may also make sense to add a new API to the purgatory called delayFor() that basically accepts an operation without any watch keys (Thanks for the suggestion Joel). Thoughts? Thanks, Aditya From: Guozhang Wang [wangg...@gmail.com] Sent: Monday, April 13, 2015 7:27 PM To: dev@kafka.apache.org Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas I think KAFKA-2063 (bounding fetch response) is still under discussion, and may not be got it in time with KAFKA-1927. On Thu, Apr 9, 2015 at 4:49 PM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote: I think it's reasonable to batch the protocol changes together. In addition to the protocol changes, is someone actively driving the server side changes/KIP process for KAFKA-2063? Thanks, Aditya From: Jun Rao [j...@confluent.io] Sent: Thursday, April 09, 2015 8:59 AM To: dev@kafka.apache.org Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas Since we are also thinking about evolving the fetch request protocol in KAFKA-2063 (bound fetch response size), perhaps it's worth thinking through if we can just evolve the protocol once. Thanks, Jun On Wed, Apr 8, 2015 at 10:43 AM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote: Thanks for the detailed review. I've addressed your comments. For rejected
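As an aside for readers who have not worked with the purgatory: the observation above that an operation inserted with no watch keys "simply becomes a delay queue" can be illustrated with a self-contained toy example. This is plain java.util.concurrent, not Kafka's DelayedOperationPurgatory, and the class names are invented for illustration:

{code}
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Toy stand-in for a throttled operation: with no watch keys there is nothing to
// complete it early, so it just sits until its delay expires and onComplete() runs.
class ThrottledOp implements Delayed {
    private final long expiresAtMs;

    ThrottledOp(long delayMs) { this.expiresAtMs = System.currentTimeMillis() + delayMs; }

    @Override public long getDelay(TimeUnit unit) {
        return unit.convert(expiresAtMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
    }

    void onComplete() { /* e.g. re-check acks for acks=-1 and send the response */ }
}

public class DelayQueueToy {
    public static void main(String[] args) throws InterruptedException {
        DelayQueue<ThrottledOp> purgatory = new DelayQueue<>();
        purgatory.put(new ThrottledOp(300));  // max(quota_delay_time, replication_timeout)
        purgatory.take().onComplete();        // only returns once the delay has elapsed
    }
}
{code}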
[jira] [Assigned] (KAFKA-2098) Gradle Wrapper Jar gone missing in 0.8.2.1
[ https://issues.apache.org/jira/browse/KAFKA-2098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rekha Joshi reassigned KAFKA-2098: -- Assignee: Rekha Joshi Gradle Wrapper Jar gone missing in 0.8.2.1 -- Key: KAFKA-2098 URL: https://issues.apache.org/jira/browse/KAFKA-2098 Project: Kafka Issue Type: Bug Components: build Affects Versions: 0.8.2.1 Reporter: Rekha Joshi Assignee: Rekha Joshi ./gradlew idea Error: Could not find or load main class org.gradle.wrapper.GradleWrapperMain This was working in 0.8.2. Attaching patch. Thanks -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2121) error handling in KafkaProducer constructor
[ https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steven Zhen Wu updated KAFKA-2121: -- Attachment: KAFKA-2121.patch error handling in KafkaProducer constructor --- Key: KAFKA-2121 URL: https://issues.apache.org/jira/browse/KAFKA-2121 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2.0 Reporter: Steven Zhen Wu Assignee: Jun Rao Attachments: KAFKA-2121.patch, KAFKA-2121.patch.2 On Mon, Apr 13, 2015 at 7:17 PM, Guozhang Wang wangg...@gmail.com wrote: It is a valid problem and we should correct it as soon as possible, I'm with Ewen regarding the solution. On Mon, Apr 13, 2015 at 5:05 PM, Ewen Cheslack-Postava e...@confluent.io wrote: Steven, Looks like there is even more that could potentially be leaked -- since key and value serializers are created and configured at the end, even the IO thread allocated by the producer could leak. Given that, I think 1 isn't a great option since, as you said, it doesn't really address the underlying issue. 3 strikes me as bad from a user experience perspective. It's true we might want to introduce additional constructors to make testing easier, but the more components I need to allocate myself and inject into the producer's constructor, the worse the default experience is. And since you would have to inject the dependencies to get correct, non-leaking behavior, it will always be more code than previously (and a backwards incompatible change). Additionally, the code creating a the producer would have be more complicated since it would have to deal with the cleanup carefully whereas it previously just had to deal with the exception. Besides, for testing specifically, you can avoid exposing more constructors just for testing by using something like PowerMock that let you mock private methods. That requires a bit of code reorganization, but doesn't affect the public interface at all. So my take is that a variant of 2 is probably best. I'd probably do two things. First, make close() safe to call even if some fields haven't been initialized, which presumably just means checking for null fields. (You might also want to figure out if all the methods close() calls are idempotent and decide whether some fields should be marked non-final and cleared to null when close() is called). Second, add the try/catch as you suggested, but just use close(). -Ewen On Mon, Apr 13, 2015 at 3:53 PM, Steven Wu stevenz...@gmail.com wrote: Here is the resource leak problem that we have encountered when 0.8.2 java KafkaProducer failed in constructor. here is the code snippet of KafkaProducer to illustrate the problem. --- public KafkaProducer(ProducerConfig config, SerializerK keySerializer, SerializerV valueSerializer) { // create metrcis reporter via reflection ListMetricsReporter reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class); // validate bootstrap servers ListInetSocketAddress addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); } --- let's say MyMetricsReporter creates a thread in constructor. if hostname validation threw an exception, constructor won't call the close method of MyMetricsReporter to clean up the resource. as a result, we created thread leak issue. this becomes worse when we try to auto recovery (i.e. keep creating KafkaProducer again - failing again - more thread leaks). there are multiple options of fixing this. 
1) just move the hostname validation to the beginning. but this only fixes one symptom. it doesn't fix the fundamental problem. what if some other lines throw an exception. 2) use try-catch. in the catch section, try to call close methods for any non-null objects constructed so far. 3) explicitly declare the dependency in the constructor. this way, when KafkaProducer throws an exception, I can call the close method of the metrics reporters for releasing resources. KafkaProducer(..., List<MetricsReporter> reporters) we don't have to use a dependency injection framework. but generally hiding dependencies is a bad coding practice. it is also hard to plug in mocks for dependencies. this is probably the most intrusive change. I am willing to submit a patch. but like to hear your opinions on how we should fix the issue. Thanks, Steven -- Thanks, Ewen -- -- Guozhang -- This message was sent by Atlassian JIRA (v6.3.4#6332)
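A rough sketch of option 2 combined with Ewen's suggestion above (wrap construction in try/catch and make close() tolerate a partially-initialized instance). The class and field names below are invented for illustration; this is not the actual KafkaProducer internals or the attached patch:

{code}
// Sketch only: the pattern is (a) close() tolerates a half-built instance,
// (b) the constructor releases whatever it already allocated before rethrowing.
public class LeakFreeClient implements AutoCloseable {
    private BackgroundReporter reporter;  // may remain null if construction fails early

    public LeakFreeClient(String bootstrapServers) {
        try {
            this.reporter = new BackgroundReporter();           // allocates a thread
            if (bootstrapServers == null || bootstrapServers.isEmpty()) {
                throw new IllegalArgumentException("invalid bootstrap servers");
            }
        } catch (Throwable t) {
            close();                                            // release partial state
            throw new RuntimeException("Failed to construct client", t);
        }
    }

    @Override
    public void close() {
        if (reporter != null) {  // null-safe, so it can be called from the constructor's catch block
            reporter.shutdown();
            reporter = null;
        }
    }
}

// Stand-in for a MetricsReporter-like dependency that owns a thread.
class BackgroundReporter {
    private final Thread worker = new Thread(() -> { /* report metrics */ });
    BackgroundReporter() { worker.start(); }
    void shutdown() { worker.interrupt(); }
}
{code}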
Review Request 33242: Patch for KAFKA-2121
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33242/ --- Review request for kafka. Bugs: KAFKA-2121 https://issues.apache.org/jira/browse/KAFKA-2121 Repository: kafka Description --- add a unit test file Diffs - clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java PRE-CREATION Diff: https://reviews.apache.org/r/33242/diff/ Testing --- Thanks, Steven Wu
Re: [DISCUSS] KIP-11- Authorization design for kafka security
Hi Parth, I’m a little confused: why would Kafka need to interpret the JSON? IIRC KIP-11 even says that the TopicConfigData will just store the JSON. I’m not really making a design recommendation here, just trying to understand what you’re proposing. On 4/15/15, 11:20 AM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: Hi Michael, There is code in kafka codebase that reads and interprets the topic config JSON which has acls, owner and logconfig properties. There are 3 use cases that we are supporting with current proposal: * You use out of box simpleAcl authorizer which is tied to the acl stored in topic config and the format is locked down. * You have a custom authorizer and a custom ACL store. Ranger/Argus falls under this as they have their own acl store and ui that users use to configure acls on the cluster and cluster resources like topic. It is upto the custom authorizer to leverage the kafka acl configs or completely ignore them as they have set a user expectation that only acls configured via their ui/system will be effective. * You have a custom authorizer but no custom Acl store. You are completely tied to Acl structure that we have provided in out of box implementation. Thanks Parth On 4/15/15, 10:31 AM, Michael Herstine mherst...@linkedin.com.INVALIDmailto:mherst...@linkedin.com.INVALID wrote: Hi Parth, One question that occurred to me at the end of today’s hangout: how tied are we to a particular ACL representation under your proposal? I know that TopicConfigCache will just contain JSON— if a particular site decides they want to represent their ACLs differently, and swap out the authorizer implementation, will that work? I guess what I’m asking is whether there’s any code in the Kafka codebase that will interpret that JSON, or does that logic live exclusively in the authorizer? On 4/14/15, 10:56 PM, Don Bosco Durai bo...@apache.orgmailto:bo...@apache.org wrote: I also feel, having just IP would be more appropriate. Host lookup will unnecessary slow things down and would be insecure as you pointed out. With IP, it will be also able to setup policies (in future if needed) with ranges or netmasks and it would be more scalable. Bosco On 4/14/15, 1:40 PM, Michael Herstine mherst...@linkedin.com.INVALIDmailto:mherst...@linkedin.com.INVALID wrote: Hi Parth, Sorry to chime in so late, but I’ve got a minor question on the KIP. Several methods take a parameter named “host” of type String. Is that intended to be a hostname, or an IP address? If the former, I’m curious as to how that’s found (in my experience, when accepting an incoming socket connection, you only know the IP address, and there isn’t a way to map that to a hostname without a round trip to a DNS server, which is insecure anyway). On 3/25/15, 1:07 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.commailto:pbrahmbh...@hortonworks.com wrote: Hi all, I have modified the KIP to reflect the recent change request from the reviewers. I have been working on the code and I have the server side code for authorization ready. I am now modifying the command line utilities. I would really appreciate if some of the committers can spend sometime to review the KIP so we can make progress on this. Thanks Parth On 3/18/15, 2:20 PM, Michael Herstine mherst...@linkedin.com.INVALIDmailto:mherst...@linkedin.com.INVALID wrote: Hi Parth, Thanks! A few questions: 1. Do you want to permit rules in your ACLs that DENY access as well as ALLOW? This can be handy setting up rules that have exceptions. E.g. 
“Allow principal P to READ resource R from all hosts” with “Deny principal P READ access to resource R from host H1” in combination would allow P to READ R from all hosts *except* H1. 2. When a topic is newly created, will there be an ACL created for it? If not, would that not deny subsequent access to it? (nit) Maybe use Principal instead of String to represent principals? On 3/9/15, 11:48 AM, Don Bosco Durai bo...@apache.org wrote: Parth Overall it is looking good. Couple of questions… - Can you give an example how the policies will look like in the default implementation? - In the operations, can we support “CONNECT” also? This can be used during Session connection - Regarding access control for “Topic Creation”, since we can’t do it on the server side, can we de-scope it for? And plan it as a future feature request? Thanks Bosco On 3/6/15, 8:10 AM, Harsha ka...@harsha.io wrote: Hi Parth, Thanks for putting this together. Overall it looks good to me. Although AdminUtils is a concern KIP-4 can probably fix that part. Thanks, Harsha On Thu, Mar 5, 2015, at 10:39 AM, Parth Brahmbhatt wrote: Forgot to add links to wiki and jira. Link to wiki: https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface Link to Jira: https://issues.apache.org/jira/browse/KAFKA-1688 Thanks Parth From: Parth Brahmbhatt
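For what it's worth, the ALLOW/DENY semantics discussed in this thread (a DENY entry carving an exception out of a broader ALLOW) can be sketched with a small, self-contained evaluator. None of the types below are the KIP-11 interfaces; they only illustrate the precedence rule:

{code}
import java.util.List;

// Toy ACL entry; a real entry would use Principal/Operation/Resource types.
record AclEntry(String principal, String operation, String resource, String host, boolean deny) {}

final class ToyAuthorizer {
    // DENY entries take precedence: one matching DENY rejects the request
    // even if a broader ALLOW also matches.
    static boolean authorize(List<AclEntry> acls, String principal, String operation,
                             String resource, String host) {
        boolean allowed = false;
        for (AclEntry acl : acls) {
            boolean matches = acl.principal().equals(principal)
                    && acl.operation().equals(operation)
                    && acl.resource().equals(resource)
                    && (acl.host().equals("*") || acl.host().equals(host));
            if (matches && acl.deny()) {
                return false;
            }
            if (matches) {
                allowed = true;
            }
        }
        return allowed;
    }
}
{code}

With the two example entries ("allow P READ R from *" and "deny P READ R from H1"), this evaluator permits P to read R from any host except H1.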
Re: Will it be possible to apply quotas based on a security principal?
Jay, Thanks for the explanation - makes sense to me. I'd certainly be happy to work on a patch that implement this option. Regards - Adrian -Jay Kreps jay.kr...@gmail.com wrote: - To: dev@kafka.apache.org dev@kafka.apache.org From: Jay Kreps jay.kr...@gmail.com Date: 04/15/2015 07:42PM Subject: Re: Will it be possible to apply quotas based on a security principal? I think this should be a fairly minor follow-up item to have the quotas key off of user rather than client id. The advantage of starting with client.id is that it decouples the security work from the quota work in the short term and provides a mechanism for those using Kafka without authentication to still enforce quotas. On Wed, Apr 15, 2015 at 6:15 AM, Adrian Preston prest...@uk.ibm.com wrote: Hi, I've been investigating using Kafka for a multi-user system that applies quotas at a per-user level. Reading through KIP-13 and KAFKA-1682, I wondered: are there any plans to link together the security principal and client identifier in some way? Currently it appears these are separate concepts - so I can't see any way to apply a quota based on the authenticated identity of a user. Regards - Adrian Unless stated otherwise above: IBM United Kingdom Limited - Registered in England and Wales with number 741598. Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU Unless stated otherwise above: IBM United Kingdom Limited - Registered in England and Wales with number 741598. Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU
[jira] [Commented] (KAFKA-2098) Gradle Wrapper Jar gone missing in 0.8.2.1
[ https://issues.apache.org/jira/browse/KAFKA-2098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14497159#comment-14497159 ] alexcb commented on KAFKA-2098: --- The comment about not committing binaries goes directly against the user guide of Gradle: from http://gradle.org/docs/current/userguide/gradle_wrapper.html {quote} All of these files should be submitted to your version control system. This only needs to be done once. After these files have been added to the project, the project should then be built with the added gradlew command. The gradlew command can be used exactly the same way as the gradle command. {quote} The Kafka README states that one must first run gradle, then run ./gradlew jar. The gradle command generates the gradlew script. If we don't want to add binaries to the source revision (a noble cause), then perhaps one should also avoid adding generated files such as the gradlew file. I personally lost an hour on this issue already. Gradle Wrapper Jar gone missing in 0.8.2.1 -- Key: KAFKA-2098 URL: https://issues.apache.org/jira/browse/KAFKA-2098 Project: Kafka Issue Type: Bug Components: build Affects Versions: 0.8.2.1 Reporter: Rekha Joshi ./gradlew idea Error: Could not find or load main class org.gradle.wrapper.GradleWrapperMain This was working in 0.8.2. Attaching patch. Thanks -- This message was sent by Atlassian JIRA (v6.3.4#6332)
RE: [KIP-DISCUSSION] KIP-13 Quotas
Thanks for the review Guozhang. 1. Agreed. 2. This proposal actually waits for the maximum of the 2 timeouts. This reduces implementation complexity at the cost of waiting longer than strictly needed for quotas. Note that this is only for the case where acks=-1. However we can solve this if it is a significant concern by adding watcher keys for all partitions (only if acks=-1). These are the keys we would normally add while waiting for acknowledgements. We can change the tryComplete() function to return false until 'quota_timeout' time has elapsed AND all the acknowledgements have been received. Thanks, Aditya From: Guozhang Wang [wangg...@gmail.com] Sent: Wednesday, April 15, 2015 3:42 PM To: dev@kafka.apache.org Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas Thanks for the summary. A few comments below: 1. Say a produce request has replication timeout X, and upon finishing the local append it is determined to be throttled Y ms where Y X, then after it has timed out in the purgatory after Y ms we should still check if the #.acks has fulfilled in order to set the correct error codes in the response. 2. I think it is actually common that the calculated throttle time Y is less than the replication timeout X, which will be a tricky case since we need to make sure 1) at least the request it held in the purgatory for Y ms, 2) after Y ms elapsed, if the #.acks has fulfilled within X ms then set no-error-code and return immediately, 3) after X ms elapsed, set timeout-error-code and return. Guozhang On Tue, Apr 14, 2015 at 5:01 PM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote: This is an implementation proposal for delaying requests in quotas using the current purgatory. I'll discuss the usage for produce and fetch requests separately. 1. Delayed Produce Requests - Here, the proposal is basically to reuse DelayedProduce objects and insert them into the purgatory with no watcher keys if the request is being throttled. The timeout used in the request should be the Max(quota_delay_time, replication_timeout). In most cases, the quota timeout should be greater than the existing timeout but in order to be safe, we can use the maximum of these values. Having no watch keys will allow the operation to be enqueued directly into the timer and will not add any overhead in terms of watching keys (which was a concern). In this case, having watch keys is not beneficial since the operation must be delayed for a fixed amount of time and there is no possibility for the operation to complete before the timeout i.e. tryComplete() can never return true before the timeout. On timeout, since the operation is a TimerTask, the timer will call run() which calls onComplete(). In onComplete, the DelayedProduce can repeat the check in tryComplete() (only if acks=-1 whether all replicas fetched upto a certain offset) and return the response immediately. Code will be structured as follows in ReplicaManager:appendMessages() if(isThrottled) { fetch = new DelayedProduce(timeout) purgatory.tryCompleteElseWatch(fetch, Seq()) } else if(delayedRequestRequired()) { // Insert into purgatory with watched keys for unthrottled requests } In this proposal, we avoid adding unnecessary watches because there is no possibility of early completion and this avoids any potential performance penalties we were concerned about earlier. 2. Delayed Fetch Requests - Similarly, the proposal here is to reuse the DelayedFetch objects and insert them into the purgatory with no watcher keys if the request is throttled. 
Timeout used is the Max(quota_delay_time, max_wait_timeout). Having no watch keys provides the same benefits as described above. Upon timeout, the onComplete() is called and the operation proceeds normally i.e. perform a readFromLocalLog and return a response. The caveat here is that if the request is throttled but the throttle time is less than the max_wait timeout on the fetch request, the request will be delayed to a Max(quota_delay_time, max_wait_timeout). This may be more than strictly necessary (since we are not watching for satisfaction on any keys). I added some testcases to DelayedOperationTest to verify that it is possible to schedule operations with no watcher keys. By inserting elements with no watch keys, the purgatory simply becomes a delay queue. It may also make sense to add a new API to the purgatory called delayFor() that basically accepts an operation without any watch keys (Thanks for the suggestion Joel). Thoughts? Thanks, Aditya From: Guozhang Wang [wangg...@gmail.com] Sent: Monday, April 13, 2015 7:27 PM To: dev@kafka.apache.org Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas I think KAFKA-2063 (bounding fetch response) is still under discussion, and may not be got it in time with KAFKA-1927. On Thu, Apr 9, 2015 at 4:49 PM, Aditya Auradkar
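A compressed sketch of the tryComplete() change suggested above, i.e. the operation cannot complete until both the quota delay has elapsed and (for acks=-1) the acknowledgements have arrived. This is illustrative pseudologic with invented names, not the actual DelayedProduce code:

{code}
// Illustrative only: the operation may complete as soon as the quota delay has
// passed and the acks are in, but never before the quota delay, regardless of
// how quickly the acknowledgements arrive.
final class ThrottledProduceGate {
    private final long quotaDeadlineMs;

    ThrottledProduceGate(long quotaDelayMs) {
        this.quotaDeadlineMs = System.currentTimeMillis() + quotaDelayMs;
    }

    boolean tryComplete(boolean acksSatisfied) {
        boolean quotaElapsed = System.currentTimeMillis() >= quotaDeadlineMs;
        return quotaElapsed && acksSatisfied;
    }
}
{code}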
[jira] [Commented] (KAFKA-2121) error handling in KafkaProducer constructor
[ https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14497057#comment-14497057 ] Steven Zhen Wu commented on KAFKA-2121: --- finally, I was able to post to review board. had some issues with my python installation. finally used virtualenv to make it work. I haven't yet addressed [~ewencp] comments yet. will update the reviewboard. error handling in KafkaProducer constructor --- Key: KAFKA-2121 URL: https://issues.apache.org/jira/browse/KAFKA-2121 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2.0 Reporter: Steven Zhen Wu Assignee: Jun Rao Attachments: KAFKA-2121.patch, KAFKA-2121.patch.2 On Mon, Apr 13, 2015 at 7:17 PM, Guozhang Wang wangg...@gmail.com wrote: It is a valid problem and we should correct it as soon as possible, I'm with Ewen regarding the solution. On Mon, Apr 13, 2015 at 5:05 PM, Ewen Cheslack-Postava e...@confluent.io wrote: Steven, Looks like there is even more that could potentially be leaked -- since key and value serializers are created and configured at the end, even the IO thread allocated by the producer could leak. Given that, I think 1 isn't a great option since, as you said, it doesn't really address the underlying issue. 3 strikes me as bad from a user experience perspective. It's true we might want to introduce additional constructors to make testing easier, but the more components I need to allocate myself and inject into the producer's constructor, the worse the default experience is. And since you would have to inject the dependencies to get correct, non-leaking behavior, it will always be more code than previously (and a backwards incompatible change). Additionally, the code creating a the producer would have be more complicated since it would have to deal with the cleanup carefully whereas it previously just had to deal with the exception. Besides, for testing specifically, you can avoid exposing more constructors just for testing by using something like PowerMock that let you mock private methods. That requires a bit of code reorganization, but doesn't affect the public interface at all. So my take is that a variant of 2 is probably best. I'd probably do two things. First, make close() safe to call even if some fields haven't been initialized, which presumably just means checking for null fields. (You might also want to figure out if all the methods close() calls are idempotent and decide whether some fields should be marked non-final and cleared to null when close() is called). Second, add the try/catch as you suggested, but just use close(). -Ewen On Mon, Apr 13, 2015 at 3:53 PM, Steven Wu stevenz...@gmail.com wrote: Here is the resource leak problem that we have encountered when 0.8.2 java KafkaProducer failed in constructor. here is the code snippet of KafkaProducer to illustrate the problem. --- public KafkaProducer(ProducerConfig config, SerializerK keySerializer, SerializerV valueSerializer) { // create metrcis reporter via reflection ListMetricsReporter reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class); // validate bootstrap servers ListInetSocketAddress addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); } --- let's say MyMetricsReporter creates a thread in constructor. if hostname validation threw an exception, constructor won't call the close method of MyMetricsReporter to clean up the resource. 
as a result, we created thread leak issue. this becomes worse when we try to auto recovery (i.e. keep creating KafkaProducer again - failing again - more thread leaks). there are multiple options of fixing this. 1) just move the hostname validation to the beginning. but this is only fix one symtom. it didn't fix the fundamental problem. what if some other lines throw an exception. 2) use try-catch. in the catch section, try to call close methods for any non-null objects constructed so far. 3) explicitly declare the dependency in the constructor. this way, when KafkaProducer threw an exception, I can call close method of metrics reporters for releasing resources. KafkaProducer(..., ListMetricsReporter reporters) we don't have to dependency injection framework. but generally hiding dependency is a bad coding practice. it is also hard to plug in mocks for dependencies. this is probably the most intrusive change. I am willing to submit a patch. but like to
[jira] [Commented] (KAFKA-2121) error handling in KafkaProducer constructor
[ https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14497051#comment-14497051 ] Steven Zhen Wu commented on KAFKA-2121: --- Created reviewboard https://reviews.apache.org/r/33242/diff/ against branch apache/trunk error handling in KafkaProducer constructor --- Key: KAFKA-2121 URL: https://issues.apache.org/jira/browse/KAFKA-2121 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2.0 Reporter: Steven Zhen Wu Assignee: Jun Rao Attachments: KAFKA-2121.patch, KAFKA-2121.patch.2 On Mon, Apr 13, 2015 at 7:17 PM, Guozhang Wang wangg...@gmail.com wrote: It is a valid problem and we should correct it as soon as possible, I'm with Ewen regarding the solution. On Mon, Apr 13, 2015 at 5:05 PM, Ewen Cheslack-Postava e...@confluent.io wrote: Steven, Looks like there is even more that could potentially be leaked -- since key and value serializers are created and configured at the end, even the IO thread allocated by the producer could leak. Given that, I think 1 isn't a great option since, as you said, it doesn't really address the underlying issue. 3 strikes me as bad from a user experience perspective. It's true we might want to introduce additional constructors to make testing easier, but the more components I need to allocate myself and inject into the producer's constructor, the worse the default experience is. And since you would have to inject the dependencies to get correct, non-leaking behavior, it will always be more code than previously (and a backwards incompatible change). Additionally, the code creating a the producer would have be more complicated since it would have to deal with the cleanup carefully whereas it previously just had to deal with the exception. Besides, for testing specifically, you can avoid exposing more constructors just for testing by using something like PowerMock that let you mock private methods. That requires a bit of code reorganization, but doesn't affect the public interface at all. So my take is that a variant of 2 is probably best. I'd probably do two things. First, make close() safe to call even if some fields haven't been initialized, which presumably just means checking for null fields. (You might also want to figure out if all the methods close() calls are idempotent and decide whether some fields should be marked non-final and cleared to null when close() is called). Second, add the try/catch as you suggested, but just use close(). -Ewen On Mon, Apr 13, 2015 at 3:53 PM, Steven Wu stevenz...@gmail.com wrote: Here is the resource leak problem that we have encountered when 0.8.2 java KafkaProducer failed in constructor. here is the code snippet of KafkaProducer to illustrate the problem. --- public KafkaProducer(ProducerConfig config, SerializerK keySerializer, SerializerV valueSerializer) { // create metrcis reporter via reflection ListMetricsReporter reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class); // validate bootstrap servers ListInetSocketAddress addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); } --- let's say MyMetricsReporter creates a thread in constructor. if hostname validation threw an exception, constructor won't call the close method of MyMetricsReporter to clean up the resource. as a result, we created thread leak issue. this becomes worse when we try to auto recovery (i.e. 
keep creating KafkaProducer again - failing again - more thread leaks). there are multiple options of fixing this. 1) just move the hostname validation to the beginning. but this is only fix one symtom. it didn't fix the fundamental problem. what if some other lines throw an exception. 2) use try-catch. in the catch section, try to call close methods for any non-null objects constructed so far. 3) explicitly declare the dependency in the constructor. this way, when KafkaProducer threw an exception, I can call close method of metrics reporters for releasing resources. KafkaProducer(..., ListMetricsReporter reporters) we don't have to dependency injection framework. but generally hiding dependency is a bad coding practice. it is also hard to plug in mocks for dependencies. this is probably the most intrusive change. I am willing to submit a patch. but like to hear your opinions on how we should fix the issue. Thanks, Steven -- Thanks, Ewen -- --
Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient
Checked the code again. It seems that the disconnected channel is not detected by the selector as expected. Currently we are depending on the o.a.k.common.network.Selector.disconnected set to see if we need to do something for a disconnected channel. However, the Selector.disconnected set is only updated when: 1. A write/read/connect to the channel failed. 2. A key is canceled. However, when a broker is down before it sends back the response, the client seems not to be able to detect this failure. I did a simple test below: 1. Run a selector on one machine and an echo server on another machine. Connect the selector to the echo server. 2. Send a message to the echo server using the selector, then let the selector poll() every 10 seconds. 3. After the server received the message, unplug the cable on the echo server. 4. After waiting for 45 min, the selector still did not detect the network failure. lsof on the selector machine shows that the TCP connection is still considered ESTABLISHED. I’m not sure in this case what we should expect from java.nio.channels.Selector. According to the documentation, the selector does not verify the status of the associated channel. In my test case it looks even worse: the OS did not consider the socket to be disconnected. Anyway, it seems adding the client-side request timeout is necessary. I’ve updated the KIP page to clarify the problem we want to solve according to Ewen’s comments. Thanks. Jiangjie (Becket) Qin On 4/14/15, 3:38 PM, Ewen Cheslack-Postava e...@confluent.io wrote: On Tue, Apr 14, 2015 at 1:57 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hi Ewen, thanks for the comments. Very good points! Please see replies inline. On 4/13/15, 11:19 PM, Ewen Cheslack-Postava e...@confluent.io wrote: Jiangjie, Great start. I have a couple of comments. Under the motivation section, is it really true that the request will never be completed? Presumably if the broker goes down the connection will be severed, at worst by a TCP timeout, which should clean up the connection and any outstanding requests, right? I think the real reason we need a different timeout is that the default TCP timeouts are ridiculously long in this context. Yes, when broker is completely down the request should be cleared as you said. The case we encountered looks like the broker was just not responding but TCP connection was still alive though. Ok, that makes sense. My second question is about whether this is the right level to tackle the issue/what user-facing changes need to be made. A related problem came up in https://issues.apache.org/jira/browse/KAFKA-1788 where producer records get stuck indefinitely because there's no client-side timeout. This KIP wouldn't fix that problem or any problems caused by lack of connectivity since this would only apply to in flight requests, which by definition must have been sent on an active connection. I suspect both types of problems probably need to be addressed separately by introducing explicit timeouts. However, because the settings introduced here are very much about the internal implementations of the clients, I'm wondering if this even needs to be a user-facing setting, especially if we have to add other timeouts anyway. For example, would a fixed, generous value that's still much shorter than a TCP timeout, say 15s, be good enough? If other timeouts would allow, for example, the clients to properly exit even if requests have not hit their timeout, then what's the benefit of being able to configure the request-level timeout? That is a very good point.
We have three places that we might be able to enforce timeout for a message send: 1. Before append to accumulator - handled by metadata timeout on per message level. 2. Batch of messages inside accumulator - no timeout mechanism now. 3. Request of batches after messages leave the accumulator - we have a broker side timeout but no client side timeout for now. My current proposal only address (3) but not (2). Honestly I do not have a very clear idea about what should we do with (2) right now. But I am with you that we should not expose too many configurations to users. What I am thinking now to handle (2) is when user call send, if we know that a partition is offline, we should throw exception immediately instead of putting it into accumulator. This would protect further memory consumption. We might also want to fail all the batches in the dequeue once we found a partition is offline. That said, I feel timeout might not be quite applicable to (2). Do you have any suggestion on this? Right, I didn't actually mean to solve 2 here, but was trying to figure out if a solution to 2 would reduce what we needed to do to address 3. (And depending on how they are implemented, fixing 1 might also address 2). It sounds like you hit hang that I wasn't really expecting. This probably just means the KIP motivation needs to be a bit clearer about what type of situation this
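A self-contained illustration of the client-side request timeout being discussed for case (3): track when each in-flight request was sent and, on every poll, fail the ones that have been outstanding longer than the timeout, since (as the test above shows) the socket can stay ESTABLISHED indefinitely. The names below are invented for illustration; this is not the NetworkClient implementation:

{code}
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Toy in-flight request tracker with a request-level timeout.
final class InFlightTracker {
    record InFlight(long sentAtMs, Runnable onTimeout) {}

    private final long requestTimeoutMs;
    private final List<InFlight> inFlight = new ArrayList<>();

    InFlightTracker(long requestTimeoutMs) { this.requestTimeoutMs = requestTimeoutMs; }

    void onSend(long nowMs, Runnable onTimeout) {
        inFlight.add(new InFlight(nowMs, onTimeout));
    }

    // Called from the client's poll() loop: requests older than the timeout are
    // failed locally even though the TCP connection may still look healthy.
    void expireOverdue(long nowMs) {
        for (Iterator<InFlight> it = inFlight.iterator(); it.hasNext(); ) {
            InFlight request = it.next();
            if (nowMs - request.sentAtMs() > requestTimeoutMs) {
                it.remove();
                request.onTimeout().run();  // e.g. disconnect and raise a timeout error
            }
        }
    }
}
{code}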
[jira] [Updated] (KAFKA-2102) Remove unnecessary synchronization when managing metadata
[ https://issues.apache.org/jira/browse/KAFKA-2102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tim Brooks updated KAFKA-2102: -- Attachment: KAFKA-2102_2015-04-15_19:55:45.patch Remove unnecessary synchronization when managing metadata - Key: KAFKA-2102 URL: https://issues.apache.org/jira/browse/KAFKA-2102 Project: Kafka Issue Type: Improvement Reporter: Tim Brooks Assignee: Tim Brooks Attachments: KAFKA-2102.patch, KAFKA-2102_2015-04-08_00:20:33.patch, KAFKA-2102_2015-04-15_19:55:45.patch, eight-threads-patch.txt, eight-threads-trunk.txt, five-threads-patch.txt, five-threads-trunk.txt Usage of the org.apache.kafka.clients.Metadata class is synchronized. It seems like the current functionality could be maintained without synchronizing the whole class. I have been working on improving this by moving to finer grained locks and using atomic operations. My initial benchmarking of the producer is that this will improve latency (using HDRHistogram) on submitting messages. I have produced an initial patch. I do not necessarily believe this is complete. And I want to definitely produce some more benchmarks. However, I wanted to get early feedback because this change could be deceptively tricky. I am interested in knowing if this is: 1. Something that is of interest to the maintainers/community. 2. Along the right track 3. If there are any gotchas that make my current approach naive. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2102) Remove unnecessary synchronization when managing metadata
[ https://issues.apache.org/jira/browse/KAFKA-2102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tim Brooks updated KAFKA-2102: -- Status: Patch Available (was: In Progress) Remove unnecessary synchronization when managing metadata - Key: KAFKA-2102 URL: https://issues.apache.org/jira/browse/KAFKA-2102 Project: Kafka Issue Type: Improvement Reporter: Tim Brooks Assignee: Tim Brooks Attachments: KAFKA-2102.patch, KAFKA-2102_2015-04-08_00:20:33.patch, KAFKA-2102_2015-04-15_19:55:45.patch, eight-threads-patch.txt, eight-threads-trunk.txt, five-threads-patch.txt, five-threads-trunk.txt Usage of the org.apache.kafka.clients.Metadata class is synchronized. It seems like the current functionality could be maintained without synchronizing the whole class. I have been working on improving this by moving to finer grained locks and using atomic operations. My initial benchmarking of the producer is that this will improve latency (using HDRHistogram) on submitting messages. I have produced an initial patch. I do not necessarily believe this is complete. And I want to definitely produce some more benchmarks. However, I wanted to get early feedback because this change could be deceptively tricky. I am interested in knowing if this is: 1. Something that is of interest to the maintainers/community. 2. Along the right track 3. If there are any gotchas that make my current approach naive. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2102) Remove unnecessary synchronization when managing metadata
[ https://issues.apache.org/jira/browse/KAFKA-2102?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14497396#comment-14497396 ] Tim Brooks commented on KAFKA-2102: --- Updated reviewboard https://reviews.apache.org/r/32937/diff/ against branch origin/trunk Remove unnecessary synchronization when managing metadata - Key: KAFKA-2102 URL: https://issues.apache.org/jira/browse/KAFKA-2102 Project: Kafka Issue Type: Improvement Reporter: Tim Brooks Assignee: Tim Brooks Attachments: KAFKA-2102.patch, KAFKA-2102_2015-04-08_00:20:33.patch, KAFKA-2102_2015-04-15_19:55:45.patch, eight-threads-patch.txt, eight-threads-trunk.txt, five-threads-patch.txt, five-threads-trunk.txt Usage of the org.apache.kafka.clients.Metadata class is synchronized. It seems like the current functionality could be maintained without synchronizing the whole class. I have been working on improving this by moving to finer grained locks and using atomic operations. My initial benchmarking of the producer is that this will improve latency (using HDRHistogram) on submitting messages. I have produced an initial patch. I do not necessarily believe this is complete. And I want to definitely produce some more benchmarks. However, I wanted to get early feedback because this change could be deceptively tricky. I am interested in knowing if this is: 1. Something that is of interest to the maintainers/community. 2. Along the right track 3. If there are any gotchas that make my current approach naive. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 32937: Patch for KAFKA-2102
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/32937/ --- (Updated April 16, 2015, 12:56 a.m.) Review request for kafka. Bugs: KAFKA-2102 https://issues.apache.org/jira/browse/KAFKA-2102 Repository: kafka Description (updated) --- Method does not need to be synchronized Do not synchronize contains topic method Continue removing the need to synchronize the metadata object Store both last refresh and need to refresh in same variable Fix synchronize issue Version needs to be volatile rework how signally happens remove unnecessary creation of new set initialize 0 at the field level Fix the build Start moving synchronization of metadata to different class Start moving synchronization work to new class Remove unused code Functionality works. Not threadsafe move version into metadata synchronizer Make version volatile Rename classes move to finergrained locking Use locks in bookkeeper Only use atomic variabled use successful metadata in metrics Change these things back to trunk Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/Metadata.java 07f1cdb1fe920b0c7a5f2d101ddc40c689e1b247 clients/src/main/java/org/apache/kafka/clients/MetadataBookkeeper.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b7ae595f2cc46e5dfe728bc3ce6082e9cd0b6d36 clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a Diff: https://reviews.apache.org/r/32937/diff/ Testing --- Thanks, Tim Brooks
[jira] [Updated] (KAFKA-2127) Running TopicCommand --alter causes connection close/reset errors in kafka logs
[ https://issues.apache.org/jira/browse/KAFKA-2127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Rosenberg updated KAFKA-2127: --- Description: I am using 0.8.2.1. I've been noticing that any time I use TopicCommand to alter a topic (e.g. add partitions) or delete a topic, the broker logs show a bunch of closed connections, and usually 2 or 3 Connection reset exceptions. It logs these with ERROR status. I recently used the kafka.admin.TopicCommand tool to increase the partitions for a topic from 1 to 4. So I ran: {code} java -cp kafka.jar kafka.admin.TopicCommand --zookeeper myzkserver:12345 --topic mytopic --alter --partitions 4 {code} This resulted in the following sequence in the broker log (repeated pretty much in the logs of each broker): {code} 2015-04-16 03:51:26,156 INFO [kafka-network-thread-27330-1] network.Processor - Closing socket connection to /1.2.3.12. 2015-04-16 03:51:26,169 INFO [kafka-network-thread-27330-0] network.Processor - Closing socket connection to /1.2.3.89. 2015-04-16 03:51:26,169 INFO [kafka-network-thread-27330-0] network.Processor - Closing socket connection to /1.2.3.95. 2015-04-16 03:51:26,176 ERROR [kafka-network-thread-27330-2] network.Processor - Closing socket for /1.2.4.34 because of error java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:197) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) at kafka.utils.Utils$.read(Utils.scala:380) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Processor.read(SocketServer.scala:444) at kafka.network.Processor.run(SocketServer.scala:340) at java.lang.Thread.run(Thread.java:745) 2015-04-16 03:51:26,178 ERROR [kafka-network-thread-27330-1] network.Processor - Closing socket for /1.2.4.59 because of error java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:197) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) at kafka.utils.Utils$.read(Utils.scala:380) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Processor.read(SocketServer.scala:444) at kafka.network.Processor.run(SocketServer.scala:340) at java.lang.Thread.run(Thread.java:745) 2015-04-16 03:51:26,192 ERROR [kafka-network-thread-27330-1] network.Processor - Closing socket for /1.2.3.11 because of error java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:197) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) at kafka.utils.Utils$.read(Utils.scala:380) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Processor.read(SocketServer.scala:444) at kafka.network.Processor.run(SocketServer.scala:340) at java.lang.Thread.run(Thread.java:745) 2015-04-16 03:51:26,451 INFO [kafka-request-handler-3] server.ReplicaFetcherManager - [ReplicaFetcherManager on broker 45] Removed fetcher for partitions [mytopic,2] 2015-04-16 03:51:26,453 
INFO [kafka-request-handler-3] log.Log - Completed load of log mytopic-2 with log end offset 0 2015-04-16 03:51:26,454 INFO [kafka-request-handler-3] log.LogManager - Created log for partition [mytopic,2] in /data/kafka_logs with properties {segment.index.bytes - 10485760, file.delete.delay.ms - 6, segment.bytes - 1073741824, flush.ms - 9223372036854775807, delete.retention.ms - 8640, index.interval.bytes - 4096, retention.bytes - 500, min.insync.replicas - 1, cleanup.policy - delete, unclean.leader.election.enable - true, segment.ms - 60480, max.message.bytes - 112, flush.messages - 9223372036854775807, min.cleanable.dirty.ratio - 0.5, retention.ms - 8640, segment.jitter.ms - 0}. 2015-04-16 03:51:26,454 WARN [kafka-request-handler-3] cluster.Partition - Partition [mytopic,2] on broker 45: No checkpointed highwatermark is found for partition [mytopic,2] 2015-04-16 03:51:26,558 INFO [kafka-network-thread-27330-0] network.Processor - Closing socket connection to /1.2.4.34. 2015-04-16 03:51:26,658 INFO [kafka-network-thread-27330-0] network.Processor - Closing socket
[jira] [Comment Edited] (KAFKA-2127) Running TopicCommand --alter causes connection close/reset errors in kafka logs
[ https://issues.apache.org/jira/browse/KAFKA-2127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14497570#comment-14497570 ] Jason Rosenberg edited comment on KAFKA-2127 at 4/16/15 5:19 AM: - the ip's that show in the 'Closing socket' and 'Connection reset' log lines appear to be from hosts which have kafka consumers running, so maybe that's the explanation. Consumers consuming the topic had their connections closed by the broker (except in some cases the consumer app gets a rebalance event first, which causes the connection to be closed remotely first)? I'm wondering if we can get rid of the ERROR logging for the connection resets however. Since it is essentially expected behavior when adding/deleting partitions, that connections will be closed, it seems we don't need to log the stack trace for the connection reset and log it as ERROR. was (Author: jbrosenb...@gmail.com): the ip's that show in the 'Closing socket' and 'Connection reset' log lines appear to be random hosts in our deployment, hosts which produce messages to kafka, but are otherwise not part of the kafka cluster or zookeeper, etc. Running TopicCommand --alter causes connection close/reset errors in kafka logs --- Key: KAFKA-2127 URL: https://issues.apache.org/jira/browse/KAFKA-2127 Project: Kafka Issue Type: Bug Reporter: Jason Rosenberg Priority: Minor I am using 0.8.2.1. I've been noticing that any time I use TopicCommand to alter a topic (e.g. add partitions) or delete a topic, the broker logs show a bunch of closed connections, and usually 2 or 3 Connection reset exceptions. It logs these with ERROR status. I recently used the kafka.admin.TopicCommand tool to increase the partitions for a topic from 1 to 4. So I ran: {code} java -cp kafka.jar kafka.admin.TopicCommand --zookeeper myzkserver:12345 --topic mytopic --alter --partitions 4 {code} This resulted in the following sequence in the broker log (repeated pretty much in the logs of each broker): {code} 2015-04-16 03:51:26,156 INFO [kafka-network-thread-27330-1] network.Processor - Closing socket connection to /1.2.3.12. 2015-04-16 03:51:26,169 INFO [kafka-network-thread-27330-0] network.Processor - Closing socket connection to /1.2.3.89. 2015-04-16 03:51:26,169 INFO [kafka-network-thread-27330-0] network.Processor - Closing socket connection to /1.2.3.95. 
2015-04-16 03:51:26,176 ERROR [kafka-network-thread-27330-2] network.Processor - Closing socket for /1.2.4.34 because of error java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:197) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) at kafka.utils.Utils$.read(Utils.scala:380) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Processor.read(SocketServer.scala:444) at kafka.network.Processor.run(SocketServer.scala:340) at java.lang.Thread.run(Thread.java:745) 2015-04-16 03:51:26,178 ERROR [kafka-network-thread-27330-1] network.Processor - Closing socket for /1.2.4.59 because of error java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:197) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) at kafka.utils.Utils$.read(Utils.scala:380) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Processor.read(SocketServer.scala:444) at kafka.network.Processor.run(SocketServer.scala:340) at java.lang.Thread.run(Thread.java:745) 2015-04-16 03:51:26,192 ERROR [kafka-network-thread-27330-1] network.Processor - Closing socket for /1.2.3.11 because of error java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:197) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) at kafka.utils.Utils$.read(Utils.scala:380) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at
[jira] [Created] (KAFKA-2127) Running TopicCommand --alter causes connection close/reset errors in kafka logs
Jason Rosenberg created KAFKA-2127: -- Summary: Running TopicCommand --alter causes connection close/reset errors in kafka logs Key: KAFKA-2127 URL: https://issues.apache.org/jira/browse/KAFKA-2127 Project: Kafka Issue Type: Bug Reporter: Jason Rosenberg Priority: Minor I am using 0.8.2.1. I've been noticing that any time I use TopicCommand to alter a topic (e.g. add partitions) or delete a topic, the broker logs show a bunch of closed connections, and usually 2 or 3 Connection reset exceptions. It logs these with ERROR status. I recently used the kafka.admin.TopicCommand tool to increase the partitions for a topic from 1 to 4. So I ran: {code} java -cp kafka.jar kafka.admin.TopicCommand --zookeeper myzkserver:12345 --topic mytopic --alter --partitions 4 {code} {code} 2015-04-16 03:51:26,156 INFO [kafka-network-thread-27330-1] network.Processor - Closing socket connection to /1.2.3.12. 2015-04-16 03:51:26,169 INFO [kafka-network-thread-27330-0] network.Processor - Closing socket connection to /1.2.3.89. 2015-04-16 03:51:26,169 INFO [kafka-network-thread-27330-0] network.Processor - Closing socket connection to /1.2.3.95. 2015-04-16 03:51:26,176 ERROR [kafka-network-thread-27330-2] network.Processor - Closing socket for /1.2.4.34 because of error java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:197) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) at kafka.utils.Utils$.read(Utils.scala:380) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Processor.read(SocketServer.scala:444) at kafka.network.Processor.run(SocketServer.scala:340) at java.lang.Thread.run(Thread.java:745) 2015-04-16 03:51:26,178 ERROR [kafka-network-thread-27330-1] network.Processor - Closing socket for /1.2.4.59 because of error java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:197) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) at kafka.utils.Utils$.read(Utils.scala:380) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Processor.read(SocketServer.scala:444) at kafka.network.Processor.run(SocketServer.scala:340) at java.lang.Thread.run(Thread.java:745) 2015-04-16 03:51:26,192 ERROR [kafka-network-thread-27330-1] network.Processor - Closing socket for /1.2.3.11 because of error java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:197) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) at kafka.utils.Utils$.read(Utils.scala:380) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Processor.read(SocketServer.scala:444) at kafka.network.Processor.run(SocketServer.scala:340) at java.lang.Thread.run(Thread.java:745) 2015-04-16 03:51:26,451 INFO [kafka-request-handler-3] server.ReplicaFetcherManager - [ReplicaFetcherManager on broker 45] Removed fetcher for partitions [mytopic,2] 2015-04-16 03:51:26,453 INFO 
[kafka-request-handler-3] log.Log - Completed load of log mytopic-2 with log end offset 0 2015-04-16 03:51:26,454 INFO [kafka-request-handler-3] log.LogManager - Created log for partition [mytopic,2] in /data_sdd/app/samsa-kafkaserver-ng/kafka_logs_ng with properties {segment.index.bytes - 10485760, file.delete.delay.ms - 6, segment.bytes - 1073741824, flush.ms - 9223372036854775807, delete.retention.ms - 8640, index.interval.bytes - 4096, retention.bytes - 500, min.insync.replicas - 1, cleanup.policy - delete, unclean.leader.election.enable - true, segment.ms - 60480, max.message.bytes - 112, flush.messages - 9223372036854775807, min.cleanable.dirty.ratio - 0.5, retention.ms - 8640, segment.jitter.ms - 0}. 2015-04-16 03:51:26,454 WARN [kafka-request-handler-3] cluster.Partition - Partition [mytopic,2] on broker 45: No checkpointed highwatermark is found for partition [mytopic,2] 2015-04-16 03:51:26,558 INFO [kafka-network-thread-27330-0] network.Processor - Closing socket connection to
[jira] [Commented] (KAFKA-2127) Running TopicCommand --alter causes connection close/reset errors in kafka logs
[ https://issues.apache.org/jira/browse/KAFKA-2127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14497576#comment-14497576 ] Jason Rosenberg commented on KAFKA-2127: Also, unrelated to the socket errors, there's this line: {code} 2015-04-16 03:51:26,451 INFO [kafka-request-handler-3] server.ReplicaFetcherManager - [ReplicaFetcherManager on broker 45] Removed fetcher for partitions [mytopic,2] {code} Since the new partition 'mytopic,2' was just created, I'm not sure why we have this log line (as the fetcher should not have even been created yet for that partition). Running TopicCommand --alter causes connection close/reset errors in kafka logs --- Key: KAFKA-2127 URL: https://issues.apache.org/jira/browse/KAFKA-2127 Project: Kafka Issue Type: Bug Reporter: Jason Rosenberg Priority: Minor I am using 0.8.2.1. I've been noticing that any time I use TopicCommand to alter a topic (e.g. add partitions) or delete a topic, the broker logs show a bunch of closed connections, and usually 2 or 3 Connection reset exceptions. It logs these with ERROR status. I recently used the kafka.admin.TopicCommand tool to increase the partitions for a topic from 1 to 4. So I ran: {code} java -cp kafka.jar kafka.admin.TopicCommand --zookeeper myzkserver:12345 --topic mytopic --alter --partitions 4 {code} This resulted in the following sequence in the broker log (repeated pretty much in the logs of each broker): {code} 2015-04-16 03:51:26,156 INFO [kafka-network-thread-27330-1] network.Processor - Closing socket connection to /1.2.3.12. 2015-04-16 03:51:26,169 INFO [kafka-network-thread-27330-0] network.Processor - Closing socket connection to /1.2.3.89. 2015-04-16 03:51:26,169 INFO [kafka-network-thread-27330-0] network.Processor - Closing socket connection to /1.2.3.95. 
2015-04-16 03:51:26,176 ERROR [kafka-network-thread-27330-2] network.Processor - Closing socket for /1.2.4.34 because of error java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:197) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) at kafka.utils.Utils$.read(Utils.scala:380) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Processor.read(SocketServer.scala:444) at kafka.network.Processor.run(SocketServer.scala:340) at java.lang.Thread.run(Thread.java:745) 2015-04-16 03:51:26,178 ERROR [kafka-network-thread-27330-1] network.Processor - Closing socket for /1.2.4.59 because of error java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:197) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) at kafka.utils.Utils$.read(Utils.scala:380) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Processor.read(SocketServer.scala:444) at kafka.network.Processor.run(SocketServer.scala:340) at java.lang.Thread.run(Thread.java:745) 2015-04-16 03:51:26,192 ERROR [kafka-network-thread-27330-1] network.Processor - Closing socket for /1.2.3.11 because of error java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:197) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) at kafka.utils.Utils$.read(Utils.scala:380) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Processor.read(SocketServer.scala:444) at kafka.network.Processor.run(SocketServer.scala:340) at java.lang.Thread.run(Thread.java:745) 2015-04-16 03:51:26,451 INFO [kafka-request-handler-3] server.ReplicaFetcherManager - [ReplicaFetcherManager on broker 45] Removed fetcher for partitions [mytopic,2] 2015-04-16 03:51:26,453 INFO [kafka-request-handler-3] log.Log - Completed load of log mytopic-2 with log end offset 0 2015-04-16 03:51:26,454 INFO [kafka-request-handler-3] log.LogManager - Created log for partition [mytopic,2] in
[jira] [Commented] (KAFKA-2098) Gradle Wrapper Jar gone missing in 0.8.2.1
[ https://issues.apache.org/jira/browse/KAFKA-2098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14497350#comment-14497350 ] Rekha Joshi commented on KAFKA-2098: Hi. I agree [~alexcb], people all over the world are losing time on it :), though I also understand where [~gwenshap] is coming from. From a user's point of view it is better to have it in, especially as many standard open source projects based on Gradle ship it; one that comes to mind immediately is Samza - https://github.com/apache/samza/tree/master/gradle/wrapper Anyhow I will let the Kafka committers and [~jkreps] decide what works best. Thanks Rekha Gradle Wrapper Jar gone missing in 0.8.2.1 -- Key: KAFKA-2098 URL: https://issues.apache.org/jira/browse/KAFKA-2098 Project: Kafka Issue Type: Bug Components: build Affects Versions: 0.8.2.1 Reporter: Rekha Joshi Assignee: Rekha Joshi ./gradlew idea Error: Could not find or load main class org.gradle.wrapper.GradleWrapperMain This was working in 0.8.2. Attaching patch. Thanks -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2102) Remove unnecessary synchronization when managing metadata
[ https://issues.apache.org/jira/browse/KAFKA-2102?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14497392#comment-14497392 ] Tim Brooks commented on KAFKA-2102: --- I added an updated patch. This patch includes a few things: 1. I moved to using a finer locking strategy opposed to attempting to use all atomic instructions. None of the methods are synchronized. 2. I delegated the synchronization code and data about when the last update was, etc to a new MetadataBookkeeper. When I was first reading the old code I had some issues parsing the mixture of cluster state, topic state, state about when to do the next update, and state about when the last update had been completed. Maybe my changes make this easier to parse. Maybe they don't. 3. I moved lastNoNodeAvailableMs in the NetworkClient state into the MetadataBookkeeper. Since this variable was essentially a failed attempt to update metadata, and it was not accessed in any different way for distinct metrics, it seemed like it would be nicer to keep state about when the next metadata update should happen together. 4. No one has responded to KAFKA-2101. But it was highly relevant to what I was working on, so it is affected by this patch. I created a distinction between successful metadata update and a metadata update attempt. The metadata-age metric only uses the last successful update in its reports. This seemed like the correct approach based on the name of that metric. Since a failed update does not make the metadata any younger. The performance improvements are primarily in the 90+ percentile. I ran a producer test with both five and eight threads pushing 10,000 messages to kafka. And I repeated it ten times. I recorded the time with HDRHistogram. The improvements were somewhere between 4-30% reduced latency in the 90+%. For example at the 0.99062500 percentile on the five thread test the latency was reduced from 14.223 microseconds to 9.775 (31%). At the 0.9000 percentile the latency was reduced from 2.947 to 2.837 (3.9%) So certainly not a lot. But pretty consistently across the higher percentiles, the latency is improved. In the five thread test the mean decreased 4.8%. In the eight thread test the mean decreased 7.8%. The code for the latency test can be found here: https://github.com/tbrooks8/kafka-latency-test Remove unnecessary synchronization when managing metadata - Key: KAFKA-2102 URL: https://issues.apache.org/jira/browse/KAFKA-2102 Project: Kafka Issue Type: Improvement Reporter: Tim Brooks Assignee: Tim Brooks Attachments: KAFKA-2102.patch, KAFKA-2102_2015-04-08_00:20:33.patch Usage of the org.apache.kafka.clients.Metadata class is synchronized. It seems like the current functionality could be maintained without synchronizing the whole class. I have been working on improving this by moving to finer grained locks and using atomic operations. My initial benchmarking of the producer is that this will improve latency (using HDRHistogram) on submitting messages. I have produced an initial patch. I do not necessarily believe this is complete. And I want to definitely produce some more benchmarks. However, I wanted to get early feedback because this change could be deceptively tricky. I am interested in knowing if this is: 1. Something that is of interest to the maintainers/community. 2. Along the right track 3. If there are any gotchas that make my current approach naive. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
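To illustrate the general direction described in this comment, here is a hypothetical standalone sketch of finer-grained metadata bookkeeping built on a dedicated lock, a condition, and a volatile version counter. The class, field, and method names are invented for illustration and do not correspond to the actual MetadataBookkeeper in the attached patch.
{code}
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: bookkeeping for metadata refreshes without synchronizing every method.
public class MetadataBookkeeperSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition updated = lock.newCondition();
    private volatile long version = 0;               // readable without taking the lock
    private volatile long lastSuccessfulUpdateMs = 0;
    private boolean updateRequested = false;         // guarded by lock

    public long version() {
        return version;                              // plain volatile read, no lock contention
    }

    public long lastSuccessfulUpdateMs() {
        return lastSuccessfulUpdateMs;               // only successful updates feed metadata-age
    }

    public void requestUpdate() {
        lock.lock();
        try {
            updateRequested = true;
        } finally {
            lock.unlock();
        }
    }

    // Called when a metadata response has been successfully processed.
    public void recordSuccessfulUpdate(long nowMs) {
        lock.lock();
        try {
            updateRequested = false;
            lastSuccessfulUpdateMs = nowMs;
            version++;                               // safe: all writes happen under the lock
            updated.signalAll();                     // wake threads waiting for a newer version
        } finally {
            lock.unlock();
        }
    }

    // Block until the version moves past lastSeenVersion or the timeout elapses.
    public void awaitUpdate(long lastSeenVersion, long maxWaitMs) throws InterruptedException {
        lock.lock();
        try {
            long remainingNs = TimeUnit.MILLISECONDS.toNanos(maxWaitMs);
            while (version <= lastSeenVersion && remainingNs > 0) {
                remainingNs = updated.awaitNanos(remainingNs);
            }
        } finally {
            lock.unlock();
        }
    }
}
{code}
The point of the separation, as described above, is that hot-path readers touch only volatile fields, while the update/request state and the signalling stay behind one small lock.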
[jira] [Updated] (KAFKA-2102) Remove unnecessary synchronization when managing metadata
[ https://issues.apache.org/jira/browse/KAFKA-2102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tim Brooks updated KAFKA-2102: -- Attachment: five-threads-trunk.txt five-threads-patch.txt eight-threads-trunk.txt eight-threads-patch.txt HDRHistogram results Remove unnecessary synchronization when managing metadata - Key: KAFKA-2102 URL: https://issues.apache.org/jira/browse/KAFKA-2102 Project: Kafka Issue Type: Improvement Reporter: Tim Brooks Assignee: Tim Brooks Attachments: KAFKA-2102.patch, KAFKA-2102_2015-04-08_00:20:33.patch, eight-threads-patch.txt, eight-threads-trunk.txt, five-threads-patch.txt, five-threads-trunk.txt Usage of the org.apache.kafka.clients.Metadata class is synchronized. It seems like the current functionality could be maintained without synchronizing the whole class. I have been working on improving this by moving to finer grained locks and using atomic operations. My initial benchmarking of the producer is that this will improve latency (using HDRHistogram) on submitting messages. I have produced an initial patch. I do not necessarily believe this is complete. And I want to definitely produce some more benchmarks. However, I wanted to get early feedback because this change could be deceptively tricky. I am interested in knowing if this is: 1. Something that is of interest to the maintainers/community. 2. Along the right track 3. If there are any gotchas that make my current approach naive. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2127) Running TopicCommand --alter causes connection close/reset errors in kafka logs
[ https://issues.apache.org/jira/browse/KAFKA-2127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Rosenberg updated KAFKA-2127: --- Description: I am using 0.8.2.1. I've been noticing that any time I use TopicCommand to alter a topic (e.g. add partitions) or delete a topic, the broker logs show a bunch of closed connections, and usually 2 or 3 Connection reset exceptions. It logs these with ERROR status. I recently used the kafka.admin.TopicCommand tool to increase the partitions for a topic from 1 to 4. So I ran: {code} java -cp kafka.jar kafka.admin.TopicCommand --zookeeper myzkserver:12345 --topic mytopic --alter --partitions 4 {code} This resulted in the following sequence in the broker log (repeated pretty much in the logs of each broker): {code} 2015-04-16 03:51:26,156 INFO [kafka-network-thread-27330-1] network.Processor - Closing socket connection to /1.2.3.12. 2015-04-16 03:51:26,169 INFO [kafka-network-thread-27330-0] network.Processor - Closing socket connection to /1.2.3.89. 2015-04-16 03:51:26,169 INFO [kafka-network-thread-27330-0] network.Processor - Closing socket connection to /1.2.3.95. 2015-04-16 03:51:26,176 ERROR [kafka-network-thread-27330-2] network.Processor - Closing socket for /1.2.4.34 because of error java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:197) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) at kafka.utils.Utils$.read(Utils.scala:380) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Processor.read(SocketServer.scala:444) at kafka.network.Processor.run(SocketServer.scala:340) at java.lang.Thread.run(Thread.java:745) 2015-04-16 03:51:26,178 ERROR [kafka-network-thread-27330-1] network.Processor - Closing socket for /1.2.4.59 because of error java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:197) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) at kafka.utils.Utils$.read(Utils.scala:380) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Processor.read(SocketServer.scala:444) at kafka.network.Processor.run(SocketServer.scala:340) at java.lang.Thread.run(Thread.java:745) 2015-04-16 03:51:26,192 ERROR [kafka-network-thread-27330-1] network.Processor - Closing socket for /1.2.3.11 because of error java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:197) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) at kafka.utils.Utils$.read(Utils.scala:380) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Processor.read(SocketServer.scala:444) at kafka.network.Processor.run(SocketServer.scala:340) at java.lang.Thread.run(Thread.java:745) 2015-04-16 03:51:26,451 INFO [kafka-request-handler-3] server.ReplicaFetcherManager - [ReplicaFetcherManager on broker 45] Removed fetcher for partitions [mytopic,2] 2015-04-16 03:51:26,453 
INFO [kafka-request-handler-3] log.Log - Completed load of log mytopic-2 with log end offset 0 2015-04-16 03:51:26,454 INFO [kafka-request-handler-3] log.LogManager - Created log for partition [mytopic,2] in /data_sdd/app/samsa-kafkaserver-ng/kafka_logs_ng with properties {segment.index.bytes - 10485760, file.delete.delay.ms - 6, segment.bytes - 1073741824, flush.ms - 9223372036854775807, delete.retention.ms - 8640, index.interval.bytes - 4096, retention.bytes - 500, min.insync.replicas - 1, cleanup.policy - delete, unclean.leader.election.enable - true, segment.ms - 60480, max.message.bytes - 112, flush.messages - 9223372036854775807, min.cleanable.dirty.ratio - 0.5, retention.ms - 8640, segment.jitter.ms - 0}. 2015-04-16 03:51:26,454 WARN [kafka-request-handler-3] cluster.Partition - Partition [mytopic,2] on broker 45: No checkpointed highwatermark is found for partition [mytopic,2] 2015-04-16 03:51:26,558 INFO [kafka-network-thread-27330-0] network.Processor - Closing socket connection to /1.2.4.34. 2015-04-16 03:51:26,658 INFO [kafka-network-thread-27330-0]
Re: [DISCUSS] KIP-11- Authorization design for kafka security
Kafka currently stores logConfig overrides specified during topic creation in zookeeper, its just an instance of java.util.Properties converted to json. I am proposing in addition to that we store acls and owner as well as part of same Properties map. There is some infrastructure around reading this config, converting it back to Properties map and most importantly propagating any changes efficiently which we will be able to leverage. As this infrastructure is common to the cluster the reading (not interpreting) of config happens outside of any authorization code. If the TopicConfigCache just kept the json representation and left it to authorizer to parse it, the authorizer will have to either parse the json for each request(not acceptable) or it will have to keep one more layer of parsed ACL instance cache. Assuming authorizer will keep an additional caching layer we will now have to implement some way to invalidate the cache which means the TopicConfigCache will have to be an observable which the Authorizer observes and invalidates its cache entries when topicConfigCache gets updated. Seemed like unnecessary complexity with not lot to gain so I went with TopicConfigCache interpreting the json and caching a higher level modeled object. In summary, the interpretation is done for both optimization and simplicity. If you think it is important to allow custom ACL format support we can add one more pluggable config(acl.parser) and interface(AclParser) or it could just be another method in Authorizer. One thing to note the current ACL json is versioned so it is easy to make changes to it however it won’t be possible to support custom ACL formats with the current design. Thanks Parth On 4/15/15, 4:29 PM, Michael Herstine mherst...@linkedin.com.INVALID wrote: Hi Parth, I’m a little confused: why would Kafka need to interpret the JSON? IIRC KIP-11 even says that the TopicConfigData will just store the JSON. I’m not really making a design recommendation here, just trying to understand what you’re proposing. On 4/15/15, 11:20 AM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: Hi Michael, There is code in kafka codebase that reads and interprets the topic config JSON which has acls, owner and logconfig properties. There are 3 use cases that we are supporting with current proposal: * You use out of box simpleAcl authorizer which is tied to the acl stored in topic config and the format is locked down. * You have a custom authorizer and a custom ACL store. Ranger/Argus falls under this as they have their own acl store and ui that users use to configure acls on the cluster and cluster resources like topic. It is upto the custom authorizer to leverage the kafka acl configs or completely ignore them as they have set a user expectation that only acls configured via their ui/system will be effective. * You have a custom authorizer but no custom Acl store. You are completely tied to Acl structure that we have provided in out of box implementation. Thanks Parth On 4/15/15, 10:31 AM, Michael Herstine mherst...@linkedin.com.INVALIDmailto:mherst...@linkedin.com.INVALID wrote: Hi Parth, One question that occurred to me at the end of today’s hangout: how tied are we to a particular ACL representation under your proposal? I know that TopicConfigCache will just contain JSON— if a particular site decides they want to represent their ACLs differently, and swap out the authorizer implementation, will that work? 
I guess what I’m asking is whether there’s any code in the Kafka codebase that will interpret that JSON, or does that logic live exclusively in the authorizer? On 4/14/15, 10:56 PM, Don Bosco Durai bo...@apache.orgmailto:bo...@apache.org wrote: I also feel, having just IP would be more appropriate. Host lookup will unnecessary slow things down and would be insecure as you pointed out. With IP, it will be also able to setup policies (in future if needed) with ranges or netmasks and it would be more scalable. Bosco On 4/14/15, 1:40 PM, Michael Herstine mherst...@linkedin.com.INVALIDmailto:mherst...@linkedin.com.INVALID wrote: Hi Parth, Sorry to chime in so late, but I’ve got a minor question on the KIP. Several methods take a parameter named “host” of type String. Is that intended to be a hostname, or an IP address? If the former, I’m curious as to how that’s found (in my experience, when accepting an incoming socket connection, you only know the IP address, and there isn’t a way to map that to a hostname without a round trip to a DNS server, which is insecure anyway). On 3/25/15, 1:07 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.commailto:pbrahmbh...@hortonworks.com wrote: Hi all, I have modified the KIP to reflect the recent change request from the reviewers. I have been working on the code and I have the server side code for authorization ready. I am now modifying the command line utilities. I would really appreciate if some of the committers can spend sometime to review the KIP so
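To make the trade-off above more concrete, here is a rough sketch of the first option being discussed: the topic config cache interprets the ACL entry once per change notification, so the authorizer never parses JSON on the request path. The property keys, class names, and methods are invented for illustration and are not the actual KIP-11 interfaces; the JSON parsing itself is deliberately elided.
{code}
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a topic config cache that caches a modeled object, not raw JSON.
public class TopicConfigCacheSketch {

    public static final class Acl {
        public final String principal;
        public final String operation;
        public final String host;

        public Acl(String principal, String operation, String host) {
            this.principal = principal;
            this.operation = operation;
            this.host = host;
        }
    }

    public static final class TopicConfig {
        public final String owner;
        public final List<Acl> acls;

        public TopicConfig(String owner, List<Acl> acls) {
            this.owner = owner;
            this.acls = acls;
        }
    }

    private final Map<String, TopicConfig> cache = new ConcurrentHashMap<String, TopicConfig>();

    // Invoked when the broker propagates a changed Properties map for a topic.
    public void onTopicConfigChange(String topic, Properties overrides) {
        String owner = overrides.getProperty("owner", "");      // hypothetical key
        String rawAcls = overrides.getProperty("acls", "");     // versioned JSON in the proposal
        cache.put(topic, new TopicConfig(owner, parseAcls(rawAcls)));
    }

    public TopicConfig get(String topic) {
        return cache.get(topic);
    }

    private static List<Acl> parseAcls(String rawJson) {
        // Placeholder: a real implementation would parse the versioned ACL JSON here.
        // If the raw string were cached instead, every authorize() call would either
        // re-parse it or maintain its own invalidation-aware cache, as noted above.
        return Collections.emptyList();
    }
}
{code}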
Re: Review Request 32937: Patch for KAFKA-2102
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/32937/#review80298 --- This version looks a lot better than the original patch -- it's a lot clearer what's going on and some of the error-prone tricks have been removed. I also like the separation of the management of the Cluster object from the request/versioning stuff as long as we can convince ourselves that it's safe to do so, I also found the combination confusing to reason about. I filed a couple of issues. I'd probably also want to double check the last update/timeout code -- it was very difficult to get that working properly under all conditions, so I'd want to be certain we didn't lose anything in moving that around and changing how successes/failures are handled. clients/src/main/java/org/apache/kafka/clients/MetadataBookkeeper.java https://reviews.apache.org/r/32937/#comment130108 This patch will definitely need a comment somewhere explaining the locking strategy and the reasoning behind it. It's won't be obvious even to someone familiar with other client code why this all works the way it does. clients/src/main/java/org/apache/kafka/clients/MetadataBookkeeper.java https://reviews.apache.org/r/32937/#comment130115 Is there any benefit to using a Lock + Condition instead of the monitor for the object? Seems like it would make the code a bit simpler - you'd get rid of all the try/finally blocks. clients/src/main/java/org/apache/kafka/clients/MetadataBookkeeper.java https://reviews.apache.org/r/32937/#comment130111 Are the lazySets accomplishing anything performance-wise? I doubt most people even know the method exists, let alone the implications. Some seem like they're probably ok, e.g. none of the code that uses the version number relies on seeing the updated value immediately. But others I'm less sure about (and the lack of documentation on lazySet makes it hard to be certain). For example, using updateRequested.lazySet() might work, but could give incorrect results if the old value is returned to a different thread. I think this only affects the network thread in the producer, but the consumer can potentially be called from different threads. Are we sure the lazySet works as expected in that case? - Ewen Cheslack-Postava On April 16, 2015, 12:56 a.m., Tim Brooks wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/32937/ --- (Updated April 16, 2015, 12:56 a.m.) Review request for kafka. Bugs: KAFKA-2102 https://issues.apache.org/jira/browse/KAFKA-2102 Repository: kafka Description --- Method does not need to be synchronized Do not synchronize contains topic method Continue removing the need to synchronize the metadata object Store both last refresh and need to refresh in same variable Fix synchronize issue Version needs to be volatile rework how signally happens remove unnecessary creation of new set initialize 0 at the field level Fix the build Start moving synchronization of metadata to different class Start moving synchronization work to new class Remove unused code Functionality works. 
Not threadsafe move version into metadata synchronizer Make version volatile Rename classes move to finergrained locking Use locks in bookkeeper Only use atomic variabled use successful metadata in metrics Change these things back to trunk Diffs - clients/src/main/java/org/apache/kafka/clients/Metadata.java 07f1cdb1fe920b0c7a5f2d101ddc40c689e1b247 clients/src/main/java/org/apache/kafka/clients/MetadataBookkeeper.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b7ae595f2cc46e5dfe728bc3ce6082e9cd0b6d36 clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a Diff: https://reviews.apache.org/r/32937/diff/ Testing --- Thanks, Tim Brooks
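For readers unfamiliar with lazySet, the distinction raised in the review comment above is roughly the following; this is a generic illustration, not code from the patch.
{code}
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration of set() vs lazySet() on an update-requested flag.
public class LazySetExample {
    private final AtomicBoolean updateRequested = new AtomicBoolean(false);

    public void requestUpdateEager() {
        // Full volatile write: any thread that subsequently reads updateRequested
        // is guaranteed to observe 'true' immediately.
        updateRequested.set(true);
    }

    public void requestUpdateLazy() {
        // Cheaper ordered (release) write with no full fence: other threads will
        // eventually see 'true', but a read issued around the same time may still
        // return the old value. That is fine when a single network thread is the
        // only reader, but risky if other threads depend on seeing the flag promptly.
        updateRequested.lazySet(true);
    }
}
{code}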
[jira] [Comment Edited] (KAFKA-2127) Running TopicCommand --alter causes connection close/reset errors in kafka logs
[ https://issues.apache.org/jira/browse/KAFKA-2127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14497570#comment-14497570 ] Jason Rosenberg edited comment on KAFKA-2127 at 4/16/15 5:13 AM: - the ip's that show in the 'Closing socket' and 'Connection reset' log lines appear to be random hosts in our deployment, hosts which produce messages to kafka, but are otherwise not part of the kafka cluster or zookeeper, etc. was (Author: jbrosenb...@gmail.com): the ip's that show in the 'Closing socket' and 'Connection reset' log lines appear to be random hosts in our deployment, hosts which produce messages to kafka, but are otherwise not part of the kafka deployment. Running TopicCommand --alter causes connection close/reset errors in kafka logs --- Key: KAFKA-2127 URL: https://issues.apache.org/jira/browse/KAFKA-2127 Project: Kafka Issue Type: Bug Reporter: Jason Rosenberg Priority: Minor I am using 0.8.2.1. I've been noticing that any time I use TopicCommand to alter a topic (e.g. add partitions) or delete a topic, the broker logs show a bunch of closed connections, and usually 2 or 3 Connection reset exceptions. It logs these with ERROR status. I recently used the kafka.admin.TopicCommand tool to increase the partitions for a topic from 1 to 4. So I ran: {code} java -cp kafka.jar kafka.admin.TopicCommand --zookeeper myzkserver:12345 --topic mytopic --alter --partitions 4 {code} This resulted in the following sequence in the broker log (repeated pretty much in the logs of each broker): {code} 2015-04-16 03:51:26,156 INFO [kafka-network-thread-27330-1] network.Processor - Closing socket connection to /1.2.3.12. 2015-04-16 03:51:26,169 INFO [kafka-network-thread-27330-0] network.Processor - Closing socket connection to /1.2.3.89. 2015-04-16 03:51:26,169 INFO [kafka-network-thread-27330-0] network.Processor - Closing socket connection to /1.2.3.95. 
2015-04-16 03:51:26,176 ERROR [kafka-network-thread-27330-2] network.Processor - Closing socket for /1.2.4.34 because of error java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:197) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) at kafka.utils.Utils$.read(Utils.scala:380) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Processor.read(SocketServer.scala:444) at kafka.network.Processor.run(SocketServer.scala:340) at java.lang.Thread.run(Thread.java:745) 2015-04-16 03:51:26,178 ERROR [kafka-network-thread-27330-1] network.Processor - Closing socket for /1.2.4.59 because of error java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:197) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) at kafka.utils.Utils$.read(Utils.scala:380) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Processor.read(SocketServer.scala:444) at kafka.network.Processor.run(SocketServer.scala:340) at java.lang.Thread.run(Thread.java:745) 2015-04-16 03:51:26,192 ERROR [kafka-network-thread-27330-1] network.Processor - Closing socket for /1.2.3.11 because of error java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:197) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) at kafka.utils.Utils$.read(Utils.scala:380) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Processor.read(SocketServer.scala:444) at kafka.network.Processor.run(SocketServer.scala:340) at java.lang.Thread.run(Thread.java:745) 2015-04-16 03:51:26,451 INFO [kafka-request-handler-3] server.ReplicaFetcherManager - [ReplicaFetcherManager on broker 45] Removed fetcher for partitions [mytopic,2] 2015-04-16 03:51:26,453 INFO [kafka-request-handler-3] log.Log - Completed load of log mytopic-2 with log end offset 0
[jira] [Commented] (KAFKA-2127) Running TopicCommand --alter causes connection close/reset errors in kafka logs
[ https://issues.apache.org/jira/browse/KAFKA-2127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14497570#comment-14497570 ] Jason Rosenberg commented on KAFKA-2127: the ip's that show in the 'Closing socket' and 'Connection reset' log lines appear to be random hosts in our deployment, hosts which produce messages to kafka, but are otherwise not part of the kafka deployment. Running TopicCommand --alter causes connection close/reset errors in kafka logs --- Key: KAFKA-2127 URL: https://issues.apache.org/jira/browse/KAFKA-2127 Project: Kafka Issue Type: Bug Reporter: Jason Rosenberg Priority: Minor I am using 0.8.2.1. I've been noticing that any time I use TopicCommand to alter a topic (e.g. add partitions) or delete a topic, the broker logs show a bunch of closed connections, and usually 2 or 3 Connection reset exceptions. It logs these with ERROR status. I recently used the kafka.admin.TopicCommand tool to increase the partitions for a topic from 1 to 4. So I ran: {code} java -cp kafka.jar kafka.admin.TopicCommand --zookeeper myzkserver:12345 --topic mytopic --alter --partitions 4 {code} This resulted in the following sequence in the broker log (repeated pretty much in the logs of each broker): {code} 2015-04-16 03:51:26,156 INFO [kafka-network-thread-27330-1] network.Processor - Closing socket connection to /1.2.3.12. 2015-04-16 03:51:26,169 INFO [kafka-network-thread-27330-0] network.Processor - Closing socket connection to /1.2.3.89. 2015-04-16 03:51:26,169 INFO [kafka-network-thread-27330-0] network.Processor - Closing socket connection to /1.2.3.95. 2015-04-16 03:51:26,176 ERROR [kafka-network-thread-27330-2] network.Processor - Closing socket for /1.2.4.34 because of error java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:197) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) at kafka.utils.Utils$.read(Utils.scala:380) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Processor.read(SocketServer.scala:444) at kafka.network.Processor.run(SocketServer.scala:340) at java.lang.Thread.run(Thread.java:745) 2015-04-16 03:51:26,178 ERROR [kafka-network-thread-27330-1] network.Processor - Closing socket for /1.2.4.59 because of error java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:197) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) at kafka.utils.Utils$.read(Utils.scala:380) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Processor.read(SocketServer.scala:444) at kafka.network.Processor.run(SocketServer.scala:340) at java.lang.Thread.run(Thread.java:745) 2015-04-16 03:51:26,192 ERROR [kafka-network-thread-27330-1] network.Processor - Closing socket for /1.2.3.11 because of error java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:197) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) 
at kafka.utils.Utils$.read(Utils.scala:380) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Processor.read(SocketServer.scala:444) at kafka.network.Processor.run(SocketServer.scala:340) at java.lang.Thread.run(Thread.java:745) 2015-04-16 03:51:26,451 INFO [kafka-request-handler-3] server.ReplicaFetcherManager - [ReplicaFetcherManager on broker 45] Removed fetcher for partitions [mytopic,2] 2015-04-16 03:51:26,453 INFO [kafka-request-handler-3] log.Log - Completed load of log mytopic-2 with log end offset 0 2015-04-16 03:51:26,454 INFO [kafka-request-handler-3] log.LogManager - Created log for partition [mytopic,2] in /data_sdd/app/samsa-kafkaserver-ng/kafka_logs_ng with properties {segment.index.bytes - 10485760, file.delete.delay.ms - 6, segment.bytes - 1073741824, flush.ms - 9223372036854775807,
Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient
Thanks for the update Jiangjie, I think it is actually NOT expected that hardware disconnection will be detected by the selector, but rather will only be revealed upon TCP timeout, which could be hours. A couple of comments on the wiki: 1. For KafkaProducer.close() and KafkaProducer.flush() we need the request timeout as implict timeout. I am not very clear what does this mean? 2. Currently the producer already has a TIMEOUT_CONFIG which should really be REPLICATION_TIMEOUT_CONFIG. So if we decide to add REQUEST_TIMEOUT_CONFIG, I suggest we also make this renaming: admittedly it will change the config names but will reduce confusions moving forward. Guozhang On Wed, Apr 15, 2015 at 6:48 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Checked the code again. It seems that the disconnected channel is not detected by selector as expected. Currently we are depending on the o.a.k.common.network.Selector.disconnected set to see if we need to do something for a disconnected channel. However Selector.disconnected set is only updated when: 1. A write/read/connect to channel failed. 2. A Key is canceled However when a broker is down before it sends back the response, the client seems not be able to detect this failure. I did a simple test below: 1. Run a selector on one machine and an echo server on another machine. Connect a selector to an echo server 2. Send a message to echo server using selector, then let the selector poll() every 10 seconds. 3. After the sever received the message, unplug cable on the echo server. 4. After waiting for 45 min. The selector still did not detected the network failure. Lsof on selector machine shows that the TCP connection is still considered ESTABLISHED. I’m not sure in this case what should we expect from the java.nio.channels.Selector. According to the document, the selector does not verify the status of the associated channel. In my test case it looks even worse that OS did not think of the socket has been disconnected. Anyway. It seems adding the client side request timeout is necessary. I’ve updated the KIP page to clarify the problem we want to solve according to Ewen’s comments. Thanks. Jiangjie (Becket) Qin On 4/14/15, 3:38 PM, Ewen Cheslack-Postava e...@confluent.io wrote: On Tue, Apr 14, 2015 at 1:57 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hi Ewen, thanks for the comments. Very good points! Please see replies inline. On 4/13/15, 11:19 PM, Ewen Cheslack-Postava e...@confluent.io wrote: Jiangjie, Great start. I have a couple of comments. Under the motivation section, is it really true that the request will never be completed? Presumably if the broker goes down the connection will be severed, at worst by a TCP timeout, which should clean up the connection and any outstanding requests, right? I think the real reason we need a different timeout is that the default TCP timeouts are ridiculously long in this context. Yes, when broker is completely down the request should be cleared as you said. The case we encountered looks like the broker was just not responding but TCP connection was still alive though. Ok, that makes sense. My second question is about whether this is the right level to tackle the issue/what user-facing changes need to be made. A related problem came up in https://issues.apache.org/jira/browse/KAFKA-1788 where producer records get stuck indefinitely because there's no client-side timeout. 
This KIP wouldn't fix that problem or any problems caused by lack of connectivity since this would only apply to in flight requests, which by definition must have been sent on an active connection. I suspect both types of problems probably need to be addressed separately by introducing explicit timeouts. However, because the settings introduced here are very much about the internal implementations of the clients, I'm wondering if this even needs to be a user-facing setting, especially if we have to add other timeouts anyway. For example, would a fixed, generous value that's still much shorter than a TCP timeout, say 15s, be good enough? If other timeouts would allow, for example, the clients to properly exit even if requests have not hit their timeout, then what's the benefit of being able to configure the request-level timeout? That is a very good point. We have three places that we might be able to enforce timeout for a message send: 1. Before append to accumulator - handled by metadata timeout on per message level. 2. Batch of messages inside accumulator - no timeout mechanism now. 3. Request of batches after messages leave the accumulator - we have a broker side timeout but no client side timeout for now. My current proposal only address (3) but not (2). Honestly I do not have a very clear idea about what should we do with (2) right now. But I am with you that we should not
[jira] [Commented] (KAFKA-2125) Infinite loop after controlled shutdown succeeds
[ https://issues.apache.org/jira/browse/KAFKA-2125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14496951#comment-14496951 ] Joel Koshy commented on KAFKA-2125: --- Ah yes that would explain it. [~sriharsha] we were pretty sure we had fixed it but apparently there still seems to be a corner case. (KAFKA-1577) Infinite loop after controlled shutdown succeeds Key: KAFKA-2125 URL: https://issues.apache.org/jira/browse/KAFKA-2125 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2.1 Reporter: Jason Rosenberg Priority: Blocker Attachments: grep_shut_edited.log I have a 4 node cluster, running 0.8.2.1, that got into a bad state last night during a rolling restart. The first node to be restarted was the controller. Controlled Shutdown completed successfully, after about 800ms. But after that, the server failed to shut down, and got into what appears to be an infinite cycle, involving the 'Controller-45-to-broker-45-send-thread', and the 'kafka-scheduler-0' thread. Ultimately, the shutdown timed out (after 3 minutes) and it was terminated by the deployment system, and the server was restarted. As expected, when it came back up it took some time re-syncing its partitions, but eventually came back and all was well. However, I think there was something fundamentally wrong with the shutdown process. The controller didn't seem to give up controller status, for one thing, as part of the controlled shutdown (which I should think would be the first thing it should do?). Anyway, below are some log snippets. I do have full logs from each broker in the cluster, which I can provide (but would have to significantly anonymize the logs before forwarding along). Controlled shutdown starts and succeeds: {code} 2015-04-14 05:56:10,134 INFO [Thread-38] server.KafkaServer - [Kafka Server 45], shutting down 2015-04-14 05:56:10,136 INFO [Thread-38] server.KafkaServer - [Kafka Server 45], Starting controlled shutdown 2015-04-14 05:56:10,150 INFO [kafka-request-handler-0] controller.KafkaController - [Controller 45]: Shutting down broker 45 ... ... 2015-04-14 05:56:10,951 INFO [Thread-38] server.KafkaServer - [Kafka Server 45], Controlled shutdown succeeded {code} Subsequently, the 'Controller-45-to-broker-45-send-thread' starts repeatedly spamming the logs, like so: {code} 2015-04-14 05:56:11,281 WARN [Controller-45-to-broker-45-send-thread] controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], Controller 45 epoch 21 fails to send request Name:LeaderAndIsrRequest;Version:0;Controller:45;ControllerEpoch:21;CorrelationId:305175;ClientId:id_45-host_null-port_12345;Leaders:id:48,host:broker48.square,port:12345;PartitionState:(mytopic,1) - (LeaderAndIsrInfo:(Leader:48,ISR:48,LeaderEpoch:5,ControllerEpoch:21),ReplicationFactor:2),AllReplicas:45,48) to broker id:45,host:broker45.square,port:12345. Reconnecting to broker. 
java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132) at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) 2015-04-14 05:56:11,281 INFO [Controller-45-to-broker-45-send-thread] controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], Controller 45 connected to id:45,host:broker45.square,port:12345 for sending state change requests 2015-04-14 05:56:11,582 WARN [Controller-45-to-broker-45-send-thread] controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], Controller 45 epoch 21 fails to send request Name:LeaderAndIsrRequest;Version:0;Controller:45;ControllerEpoch:21;CorrelationId:305175;ClientId:id_45-host_null-port_12345;Leaders:id:48,host:broker48.square,port:12345;PartitionState:(mytopic,1) - (LeaderAndIsrInfo:(Leader:48,ISR:48,LeaderEpoch:5,ControllerEpoch:21),ReplicationFactor:2),AllReplicas:45,48) to broker id:45,host:broker45.square,port:12345. Reconnecting to broker. java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132) at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) 2015-04-14 05:56:11,582 INFO [Controller-45-to-broker-45-send-thread] controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], Controller 45 connected to id:45,host:broker45.square,port:12345 for sending
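To make the reported cycle easier to see, here is an illustrative sketch only (not the actual RequestSendThread/ControllerChannelManager code): a worker loop that reacts to every ClosedChannelException by reconnecting and retrying will spin indefinitely once the local broker's socket server has already shut down, unless something outside the loop tells it to stop.
{code}
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.atomic.AtomicBoolean;

public class RetryLoopSketch implements Runnable {

    // Hypothetical stand-in for a blocking channel; not a real Kafka type.
    interface ChannelLike {
        void send(byte[] request) throws ClosedChannelException;
        void reconnect(); // re-establishing the TCP connection may still succeed locally
    }

    private final AtomicBoolean running = new AtomicBoolean(true);
    private final ChannelLike channel;
    private final byte[] pendingRequest;

    RetryLoopSketch(ChannelLike channel, byte[] pendingRequest) {
        this.channel = channel;
        this.pendingRequest = pendingRequest;
    }

    @Override
    public void run() {
        while (running.get()) {                      // nothing in the failure path ever clears this flag
            try {
                channel.send(pendingRequest);        // throws once the broker socket is gone
                return;                              // never reached in the failure scenario
            } catch (ClosedChannelException e) {
                System.err.println("fails to send request ... Reconnecting to broker.");
                channel.reconnect();                 // "connected to id:45 ..." and then it fails again
            }
        }
    }

    public void shutdown() {
        running.set(false);                          // the missing escape hatch in the observed loop
    }
}
{code}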
Re: [DISCUSS] KIP-11- Authorization design for kafka security
I have added the following to the list of open questions based on the hangout discussion: * The owner field of a topic in the current proposal is set to the user who created the topic, and this user has all access to the topic. There was a suggestion to make this a list of users who can share ownership. Alternatively, we can keep the owner as a single entity, but the user creating the topic will have to ensure that the topic ACLs are configured to allow admin access to all the other users who want to assume co-ownership. It would be great if we can at least agree on the following things: * The newly proposed broker configs, their types and names * The Authorizer interface and the Acl structure * The command line options being added, their names and types * The new structure of the topic config which is being stored in zookeeper Thanks Parth On 4/15/15, 12:53 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: Yes, it can be turned off completely. We are proposing to add authorizer.class.name as a broker config. The value of this config can be null/unspecified (which is the default), in which case no authorization will be performed. It can be set to the FQCN of any class that implements Authorizer, so you can plug in custom authorizers. Authentication is a different beast and you should look at the other security related KIPs; here is the top level jira https://issues.apache.org/jira/browse/KAFKA-1682 and the top level document https://cwiki.apache.org/confluence/display/KAFKA/Security Thanks Parth On 4/15/15, 11:56 AM, Tong Li liton...@us.ibm.com wrote: Parth, If one wants to use his or her own access control, including the authentication system, with this design what will need to be done? Can one completely turn this off so that the system behaves exactly the same as it does today? Thanks. Tong Sent from my iPhone On Apr 15, 2015, at 1:51 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: Hi Michael, There is code in the Kafka codebase that reads and interprets the topic config JSON, which has acls, owner and logconfig properties. There are 3 use cases that we are supporting with the current proposal: * You use the out-of-box simpleAcl authorizer, which is tied to the acls stored in the topic config, and the format is locked down. * You have a custom authorizer and a custom ACL store. Ranger/Argus falls under this as they have their own acl store and UI that users use to configure acls on the cluster and cluster resources like topics. It is up to the custom authorizer to leverage the kafka acl configs or completely ignore them, as they have set a user expectation that only acls configured via their ui/system will be effective. * You have a custom authorizer but no custom acl store. You are completely tied to the Acl structure that we have provided in the out-of-box implementation. Thanks Parth On 4/15/15, 10:31 AM, Michael Herstine mherst...@linkedin.com.INVALID wrote: Hi Parth, One question that occurred to me at the end of today's hangout: how tied are we to a particular ACL representation under your proposal? I know that TopicConfigCache will just contain JSON - if a particular site decides they want to represent their ACLs differently, and swap out the authorizer implementation, will that work? I guess what I'm asking is whether there's any code in the Kafka codebase that will interpret that JSON, or does that logic live exclusively in the authorizer?
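As a concrete illustration of the plug-in point Parth describes, the proposed broker setting would look roughly like this in server.properties; the class name is a made-up example, and leaving the property unset keeps the broker behaving exactly as it does today (no authorization performed).
{code}
# Proposed broker config (KIP-11). Unset/empty (the default) means no authorization is performed.
# Set it to the fully qualified class name of any implementation of the Authorizer interface,
# e.g. the out-of-box implementation or a custom one such as this hypothetical class:
authorizer.class.name=com.example.kafka.security.MyCustomAuthorizer
{code}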
[jira] [Commented] (KAFKA-2125) Infinite loop after controlled shutdown succeeds
[ https://issues.apache.org/jira/browse/KAFKA-2125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14497015#comment-14497015 ] Sriharsha Chintalapani commented on KAFKA-2125: --- [~jjkoshy] [~jbrosenberg] Thanks for the logs. Can you give a bit more detail on the cluster, like how many topics and partitions you have? I won't be able to work on this for another couple of days but will try to get back to you asap with more details. Infinite loop after controlled shutdown succeeds Key: KAFKA-2125 URL: https://issues.apache.org/jira/browse/KAFKA-2125 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2.1 Reporter: Jason Rosenberg Priority: Blocker Attachments: grep_shut_edited.log
[jira] [Commented] (KAFKA-2125) Infinite loop after controlled shutdown succeeds
[ https://issues.apache.org/jira/browse/KAFKA-2125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14497035#comment-14497035 ] Jason Rosenberg commented on KAFKA-2125: This cluster has 4 nodes. Each node has 4 disks, mounted as JBOD, about 3.5 TB total per node. Almost all partitions have 2 replicas. There are 560 topics, 16 of which have more than 1 partition. The largest topic has 40 partitions. Total of 854 partitions. Log retention is 24 hours in most cases, disk about 40% full. Infinite loop after controlled shutdown succeeds Key: KAFKA-2125 URL: https://issues.apache.org/jira/browse/KAFKA-2125 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2.1 Reporter: Jason Rosenberg Priority: Blocker Attachments: grep_shut_edited.log
Re: Will it be possible to apply quotas based on a security principal?
If it is at all possible, I think we can introduce a request pipeline architecture for the producer, broker and consumer (it does not have to be done all at once); that way security, quota, encryption, serialization and compression can all be done as pluggable components. One can freely stack them up or tear them apart at the configuration level. Each component can have its own design and implementation. Anyone deploying can choose what interests him or her. Components have no dependencies on each other. New features can be easily added/tried/removed/abandoned. When I looked at our code, instrumenting this so-called pipeline could be very easy. I wonder if any of you are interested. Thanks. Sent from my iPhone On Apr 15, 2015, at 4:14 PM, Jay Kreps jay.kr...@gmail.com wrote: I think this should be a fairly minor follow-up item to have the quotas key off of the user rather than the client id. The advantage of starting with client.id is that it decouples the security work from the quota work in the short term and provides a mechanism for those using Kafka without authentication to still enforce quotas.
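To make the client.id-based interim approach concrete: until quotas can key off the authenticated principal, a multi-user deployment would have to give each user's (or tenant's) clients a distinct client.id and attach the quota to that identifier. A minimal sketch under that assumption, with a hypothetical broker address, tenant id and topic name:
{code}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PerTenantClientIdExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1.example:9092");   // hypothetical broker address
        // KIP-13 quotas are keyed on client.id, so each tenant's clients share one id
        // and the broker throttles that id as a group.
        props.put("client.id", "tenant-alice");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<>("example-topic", "key", "value"));
        producer.close();
    }
}
{code}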
[jira] [Updated] (KAFKA-2118) Cleaner cannot clean after shutdown during replaceSegments
[ https://issues.apache.org/jira/browse/KAFKA-2118?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-2118: -- Attachment: KAFKA-2118_2015-04-15_09:43:51.patch Cleaner cannot clean after shutdown during replaceSegments -- Key: KAFKA-2118 URL: https://issues.apache.org/jira/browse/KAFKA-2118 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2.0 Reporter: Gian Merlino Assignee: Rajini Sivaram Attachments: KAFKA-2118.patch, KAFKA-2118_2015-04-15_09:43:51.patch If a broker shuts down after the cleaner calls replaceSegments with more than one segment, the partition can be left in an uncleanable state. We saw this on a few brokers after doing a rolling update. The sequence of things we saw is: 1) Cleaner cleaned segments with base offsets 0, 1094621529, and 1094831997 into a new segment 0. 2) Cleaner logged Swapping in cleaned segment 0 for segment(s) 0,1094621529,1094831997 in log xxx-15. and called replaceSegments. 3) 0.cleaned was renamed to 0.swap. 4) Broker shut down before deleting segments 1094621529 and 1094831997. 5) Broker started up and logged Found log file /mnt/persistent/kafka-logs/xxx-15/.log.swap from interrupted swap operation, repairing. 6) Cleaner thread died with the exception kafka.common.InvalidOffsetException: Attempt to append an offset (1094911424) to position 1003 no larger than the last offset appended (1095045873) to /mnt/persistent/kafka-logs/xxx-15/.index.cleaned. I think what's happening in #6 is that when the broker started back up and repaired the log, segment 0 ended up with a bunch of messages that were also in segment 1094621529 and 1094831997 (because the new segment 0 was created from cleaning all 3). But segments 1094621529 and 1094831997 were still on disk, so offsets on disk were no longer monotonically increasing, violating the assumption of OffsetIndex. We ended up fixing this by deleting segments 1094621529 and 1094831997 manually, and then removing the line for this partition from the cleaner-offset-checkpoint file (otherwise it would reference the non-existent segment 1094621529). This can happen even on a clean shutdown (the async deletes in replaceSegments might not happen). Cleaner logs post-startup: 2015-04-12 15:07:56,533 INFO [kafka-log-cleaner-thread-0] kafka.log.LogCleaner - Cleaner 0: Beginning cleaning of log xxx-15. 2015-04-12 15:07:56,533 INFO [kafka-log-cleaner-thread-0] kafka.log.LogCleaner - Cleaner 0: Building offset map for xxx-15... 2015-04-12 15:07:56,595 INFO [kafka-log-cleaner-thread-0] kafka.log.LogCleaner - Cleaner 0: Building offset map for log xxx-15 for 6 segments in offset range [1094621529, 1095924157). 2015-04-12 15:08:01,443 INFO [kafka-log-cleaner-thread-0] kafka.log.LogCleaner - Cleaner 0: Offset map for log xxx-15 complete. 2015-04-12 15:08:01,443 INFO [kafka-log-cleaner-thread-0] kafka.log.LogCleaner - Cleaner 0: Cleaning log xxx-15 (discarding tombstones prior to Sun Apr 12 14:05:37 UTC 2015)... 2015-04-12 15:08:01,443 INFO [kafka-log-cleaner-thread-0] kafka.log.LogCleaner - Cleaner 0: Cleaning segment 0 in log xxx-15 (last modified Sun Apr 12 14:05:38 UTC 2015) into 0, retaining deletes. 2015-04-12 15:08:04,283 INFO [kafka-log-cleaner-thread-0] kafka.log.LogCleaner - Cleaner 0: Cleaning segment 1094621529 in log xxx-15 (last modified Sun Apr 12 13:49:27 UTC 2015) into 0, discarding deletes. 
2015-04-12 15:08:05,079 INFO [kafka-log-cleaner-thread-0] kafka.log.LogCleaner - Cleaner 0: Cleaning segment 1094831997 in log xxx-15 (last modified Sun Apr 12 14:04:28 UTC 2015) into 0, discarding deletes. 2015-04-12 15:08:05,157 ERROR [kafka-log-cleaner-thread-0] kafka.log.LogCleaner - [kafka-log-cleaner-thread-0], Error due to kafka.common.InvalidOffsetException: Attempt to append an offset (1094911424) to position 1003 no larger than the last offset appended (1095045873) to /mnt/persistent/kafka-logs/xxx-15/.index.cleaned. at kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207) at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197) at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197) at kafka.utils.Utils$.inLock(Utils.scala:535) at kafka.log.OffsetIndex.append(OffsetIndex.scala:197) at kafka.log.LogSegment.append(LogSegment.scala:81) at kafka.log.Cleaner.cleanInto(LogCleaner.scala:427) at kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:358) at kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:354) at scala.collection.immutable.List.foreach(List.scala:318) at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:354) at
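The InvalidOffsetException above comes from the index's requirement that appended offsets strictly increase within a segment. The simplified sketch below is not the real kafka.log.OffsetIndex; it only models that invariant, using the offsets from the log above, to show why re-cleaning the leftover old segments into a segment that already contains the repaired, swapped-in data goes backwards and aborts the cleaner.
{code}
// Simplified model of the strictly-increasing-offset invariant that the cleaner trips over.
// The real kafka.log.OffsetIndex is more involved; this only illustrates the failure above.
public class MonotonicIndexSketch {
    private long lastOffset = -1L;

    void append(long offset, int position) {
        if (offset <= lastOffset) {
            throw new IllegalStateException("Attempt to append an offset (" + offset
                    + ") to position " + position
                    + " no larger than the last offset appended (" + lastOffset + ")");
        }
        lastOffset = offset;
        // ... a real index would write the (offset, position) entry here ...
    }

    public static void main(String[] args) {
        MonotonicIndexSketch index = new MonotonicIndexSketch();
        index.append(1095045873L, 1002);  // entries already present after the repaired swap
        index.append(1094911424L, 1003);  // re-cleaning old segments goes backwards -> throws
    }
}
{code}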
[jira] [Commented] (KAFKA-2118) Cleaner cannot clean after shutdown during replaceSegments
[ https://issues.apache.org/jira/browse/KAFKA-2118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14495952#comment-14495952 ] Rajini Sivaram commented on KAFKA-2118: --- Have updated the patch with a unit test to check the recovery process after a broker crash at different stages of a clean and swap operation. The test fails with the current Kafka code and works with the updated code in the patch. Have also done manual testing by recreating the scenario described in this report by forcing termination during replaceSegments(). Have verified that the failure no longer occurs with the attached patch using the same manual test. Cleaner cannot clean after shutdown during replaceSegments -- Key: KAFKA-2118 URL: https://issues.apache.org/jira/browse/KAFKA-2118 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2.0 Reporter: Gian Merlino Assignee: Rajini Sivaram Attachments: KAFKA-2118.patch, KAFKA-2118_2015-04-15_09:43:51.patch
Re: Review Request 33168: Fix recovery of swap files after broker crash
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33168/ --- (Updated April 15, 2015, 9:44 a.m.) Review request for kafka. Bugs: KAFKA-2118 https://issues.apache.org/jira/browse/KAFKA-2118 Repository: kafka Description --- Patch for KAFKA-2118: Fix recovery of cleaned segments after broker crash Diffs (updated) - core/src/main/scala/kafka/log/Log.scala 5563f2de8113a0ece8929bec9c75dbf892abbb66 core/src/test/scala/unit/kafka/log/CleanerTest.scala 9792ed689033dbd4ad99809a4e566136d2b9fadf Diff: https://reviews.apache.org/r/33168/diff/ Testing --- Thanks, Rajini Sivaram
[jira] [Commented] (KAFKA-2118) Cleaner cannot clean after shutdown during replaceSegments
[ https://issues.apache.org/jira/browse/KAFKA-2118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14495943#comment-14495943 ] Rajini Sivaram commented on KAFKA-2118: --- Updated reviewboard https://reviews.apache.org/r/33168/diff/ against branch origin/trunk Cleaner cannot clean after shutdown during replaceSegments -- Key: KAFKA-2118 URL: https://issues.apache.org/jira/browse/KAFKA-2118 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2.0 Reporter: Gian Merlino Assignee: Rajini Sivaram Attachments: KAFKA-2118.patch, KAFKA-2118_2015-04-15_09:43:51.patch