Will it be possible to apply quotas based on a security principal?

2015-04-15 Thread Adrian Preston
Hi,

I've been investigating using Kafka for a multi-user system that applies quotas 
at a per-user level.  Reading through KIP-13 and KAFKA-1682, I wondered: are 
there any plans to link together the security principal and client identifier 
in some way?  Currently it appears these are separate concepts - so I can't see 
any way to apply a quota based on the authenticated identity of a user.


Regards
- Adrian

Unless stated otherwise above:
IBM United Kingdom Limited - Registered in England and Wales with number 
741598. 
Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU



Re: [DISCUSS] KIP-11- Authorization design for kafka security

2015-04-15 Thread Parth Brahmbhatt
Currently the authorizer does not perform any DNS lookups and uses the
hostname it receives as part of request.session as-is. So in a way we are
allowing only IP addresses. The only match is equality based, so no IP
ranges yet, but that is easy to add.

However, I think it is OK to allow both IP addresses and hostnames, and
we should allow both. I am not sure why DNS lookups would be a security
concern, and host lookups only go out to the DNS server when the DNS
cache does not have the entry or the cache entry expires. This can be
controlled by setting the networkaddress.cache.ttl property in the JVM.
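
For reference, networkaddress.cache.ttl is a java.security property, not a
system property. A minimal sketch of setting it programmatically (the value 30
is an arbitrary example, and it should be set early, before any name lookups
are performed):

```java
import java.security.Security;

public class DnsCacheTtlExample {
    public static void main(String[] args) {
        // networkaddress.cache.ttl controls how long (in seconds) the JVM
        // caches successful host name lookups; -1 means cache forever.
        Security.setProperty("networkaddress.cache.ttl", "30");
        System.out.println(Security.getProperty("networkaddress.cache.ttl")); // prints 30
    }
}
```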


Thanks
Parth

On 4/14/15, 10:56 PM, Don Bosco Durai bo...@apache.org wrote:

I also feel, having just IP would be more appropriate. Host lookup will
unnecessarily slow things down and would be insecure, as you pointed out.

With IP, it will also be possible to set up policies (in future, if needed)
with ranges or netmasks, and it would be more scalable.

Bosco


On 4/14/15, 1:40 PM, Michael Herstine mherst...@linkedin.com.INVALID
wrote:

Hi Parth,

Sorry to chime in so late, but I’ve got a minor question on the KIP.

Several methods take a parameter named “host” of type String. Is that
intended to be a hostname, or an IP address? If the former, I’m curious as
to how that’s found (in my experience, when accepting an incoming socket
connection, you only know the IP address, and there isn’t a way to map
that to a hostname without a round trip to a DNS server, which is insecure
anyway).


On 3/25/15, 1:07 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com
wrote:

Hi all,

I have modified the KIP to reflect the recent change requests from the
reviewers. I have been working on the code and I have the server-side code
for authorization ready. I am now modifying the command line utilities. I
would really appreciate it if some of the committers can spend some time
reviewing the KIP so we can make progress on this.

Thanks
Parth

On 3/18/15, 2:20 PM, Michael Herstine mherst...@linkedin.com.INVALID
wrote:

Hi Parth,

Thanks! A few questions:

1. Do you want to permit rules in your ACLs that DENY access as well as
ALLOW? This can be handy for setting up rules that have exceptions. E.g.
“Allow principal P to READ resource R from all hosts” with “Deny principal
P READ access to resource R from host H1” in combination would allow P to
READ R from all hosts *except* H1.

2. When a topic is newly created, will there be an ACL created for it? If
not, would that not deny subsequent access to it?

(nit) Maybe use Principal instead of String to represent principals?

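To illustrate the deny-overrides semantics described in question 1
(hypothetical types for illustration only, not the KIP's actual Authorizer
interface), a minimal sketch:

```java
import java.util.List;

public class AclSketch {
    // Hypothetical types for illustration only; not the KIP's actual interface.
    enum Permission { ALLOW, DENY }
    record Acl(Permission permission, String principal, String host) {}

    // Deny-overrides evaluation: any matching DENY beats any matching ALLOW.
    static boolean authorized(List<Acl> acls, String principal, String host) {
        boolean allowed = false;
        for (Acl acl : acls) {
            boolean matches =
                (acl.principal().equals("*") || acl.principal().equals(principal))
                && (acl.host().equals("*") || acl.host().equals(host));
            if (matches) {
                if (acl.permission() == Permission.DENY) return false; // deny wins
                allowed = true;
            }
        }
        return allowed;
    }

    public static void main(String[] args) {
        List<Acl> acls = List.of(
            new Acl(Permission.ALLOW, "P", "*"),   // allow P from all hosts...
            new Acl(Permission.DENY,  "P", "H1")); // ...except H1
        System.out.println(authorized(acls, "P", "H2")); // true
        System.out.println(authorized(acls, "P", "H1")); // false
    }
}
```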

On 3/9/15, 11:48 AM, Don Bosco Durai bo...@apache.org wrote:

Parth

Overall it is looking good. A couple of questions:

- Can you give an example of how the policies will look in the default
implementation?
- In the operations, can we support "CONNECT" also? This can be used
during Session connection.
- Regarding access control for "Topic Creation", since we can't do it
on the server side, can we de-scope it for now and plan it as a future
feature request?

Thanks

Bosco

 

On 3/6/15, 8:10 AM, Harsha ka...@harsha.io wrote:

Hi Parth,
Thanks for putting this together. Overall it looks good to
me. Although AdminUtils is a concern, KIP-4 can probably fix
that part.
Thanks,
Harsha

On Thu, Mar 5, 2015, at 10:39 AM, Parth Brahmbhatt wrote:
 Forgot to add links to wiki and jira.
 
 Link to wiki:
 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface
 Link to Jira: https://issues.apache.org/jira/browse/KAFKA-1688
 
 Thanks
 Parth
 
 From: Parth Brahmbhatt
 pbrahmbh...@hortonworks.com
 Date: Thursday, March 5, 2015 at 10:33 AM
 To: dev@kafka.apache.org
 Subject: [DISCUSS] KIP-11- Authorization design for kafka security
 
 Hi,
 
 KIP-11 is open for discussion , I have updated the wiki with the
design
 and open questions.
 
 Thanks
 Parth










[jira] [Commented] (KAFKA-2035) Add a topic config cache.

2015-04-15 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14496673#comment-14496673
 ] 

Jay Kreps commented on KAFKA-2035:
--

Fair point. If we are going to model topics, though, let's make sure we do a
really good job of it, as that is a fairly fundamental change to the domain
model. E.g. should we have a Topic object which has the set of associated Log
instances as well as the topic-specific config...i.e. let's really try to make
them a first-class entity, maybe a TopicManager if that helps. I feel like
adding a TopicConfigCache to KafkaApis is sort of an unhappy midpoint between
not modeling topics and modeling them fully. Not sure if that makes sense?

 Add a topic config cache.
 -

 Key: KAFKA-2035
 URL: https://issues.apache.org/jira/browse/KAFKA-2035
 Project: Kafka
  Issue Type: Task
Reporter: Parth Brahmbhatt
Assignee: Parth Brahmbhatt
 Attachments: KAFKA-2035_2015-03-31_10:52:12.patch


 Currently the topic config is all about Log configuration so we have a 
 TopicConfigManager which takes in a Log instance and keeps updating that 
 instance's config instance as and when the topic config is updated. The topic 
 config update notifications are sent using zk watchers by Controller.
 I propose to introduce a TopicConfigCache which will be updated by 
 TopicConfigManager on any config changes. The log instance and any other 
 component (like the authorizer mentioned in KAFKA-1688) will have a reference 
 to TopicConfigCache using which they will access the topic configs.
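
A rough sketch of what such a TopicConfigCache could look like (a hypothetical
shape, not the attached patch's actual code): TopicConfigManager pushes config
updates in, and the Log instance or an authorizer reads the latest snapshot out.

```java
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

public class TopicConfigCache {
    private final Map<String, Properties> configs = new ConcurrentHashMap<>();

    // Called by TopicConfigManager when a zk config-change notification arrives.
    public void updateTopicConfig(String topic, Properties props) {
        configs.put(topic, props);
    }

    // Called by Log, the authorizer, or any other component needing topic config.
    public Properties getTopicConfig(String topic) {
        return configs.getOrDefault(topic, new Properties());
    }
}
```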



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-2126) New consumer does not correctly configure deserializers

2015-04-15 Thread Ewen Cheslack-Postava (JIRA)
Ewen Cheslack-Postava created KAFKA-2126:


 Summary: New consumer does not correctly configure deserializers
 Key: KAFKA-2126
 URL: https://issues.apache.org/jira/browse/KAFKA-2126
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava
 Fix For: 0.8.3


Since the new ser/de interfaces use a configure() method with an extra isKey 
parameter, they need to be manually configured after creation, since 
getConfiguredInstances can't invoke configure() for us. The new consumer is 
missing this step when the deserializer is instantiated automatically by the 
consumer.
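
In other words, after instantiating a deserializer the consumer must call
configure() by hand, passing the isKey flag. A simplified, self-contained
sketch of the pattern (the interface here is a reduced stand-in, not Kafka's
actual Deserializer):

```java
import java.util.Map;

public class DeserializerConfigSketch {
    // Reduced stand-in for Kafka's Deserializer interface: configure() takes
    // the configs plus an isKey flag, so a generic instantiation helper
    // cannot call it for us.
    interface Deserializer {
        void configure(Map<String, ?> configs, boolean isKey);
    }

    static class RecordingDeserializer implements Deserializer {
        Boolean isKey; // null until configure() runs
        public void configure(Map<String, ?> configs, boolean isKey) {
            this.isKey = isKey;
        }
    }

    // What the consumer must do after instantiating the deserializer.
    static Deserializer instantiate(Map<String, ?> configs, boolean isKey) {
        RecordingDeserializer d = new RecordingDeserializer(); // e.g. reflectively created
        d.configure(configs, isKey); // the step the new consumer was missing
        return d;
    }
}
```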





Re: Will it be possible to apply quotas based on a security principal?

2015-04-15 Thread Jay Kreps
I think this should be a fairly minor follow-up item to have the quotas key
off of user rather than client id. The advantage of starting with client.id
is that it decouples the security work from the quota work in the short
term and provides a mechanism for those using Kafka without authentication
to still enforce quotas.
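
A hedged sketch of what that follow-up could look like (hypothetical naming,
not KIP-13's actual design): derive the quota key from the authenticated
principal when one exists, falling back to client.id otherwise.

```java
import java.util.Optional;

public class QuotaKeySketch {
    // Hypothetical: prefer the authenticated principal as the quota key,
    // falling back to client.id for clusters without authentication.
    static String quotaKey(Optional<String> principal, String clientId) {
        return principal.map(p -> "user:" + p).orElse("client-id:" + clientId);
    }
}
```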

On Wed, Apr 15, 2015 at 6:15 AM, Adrian Preston prest...@uk.ibm.com wrote:

 Hi,

 I've been investigating using Kafka for a multi-user system that applies
 quotas at a per-user level.  Reading through KIP-13 and KAFKA-1682, I
 wondered: are there any plans to link together the security principal and
 client identifier in some way?  Currently it appears these are separate
 concepts - so I can't see any way to apply a quota based on the
 authenticated identity of a user.


 Regards
 - Adrian





Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations (Thread 2)

2015-04-15 Thread Andrii Biletskyi
Guys,

Thanks for the discussion!

Summary:

1. Q: How can KAFKA-1367 (isr is inconsistent in brokers' metadata cache)
affect the implementation?
A: We can fix this issue for the leading broker - the ReplicaManager (or
Partition) component should have an accurate isr list. Then, with the
leading broker having correct info, to do a describe-topic we will need
to determine the leading brokers for the partitions and ask those for a
correct isr list.
Also, we should consider adding lag information to TMR for each follower,
for partition reassignment, as Jun suggested above.

2. Q: What if a user adds different alter commands for the same topic in
the scope of one batch request?
A: Because of the async nature of AlterTopicRequest it will be very hard
then to assemble the expected result (in terms of checking whether the
request is complete) if we let users do this. Also it will be very
confusing. It was proposed not to let users do this (and probably to add
a new Error for such cases).

3. Q: AlterTopicRequest semantics: now that we have merged AlterTopic and
ReassignPartitions, in which order should AlterTopic fields be resolved?
A: This item is not clear. There was a proposal to let the user change
only one thing at a time, e.g. specify either new Replicas or
ReplicaAssignment.
This can be a simple solution, but it's a very strict rule. E.g.
currently with TopicCommand a user can increase the number of partitions
and define the replica assignment for the newly added partitions. Taking
into account item 2, it will be even harder for the user to achieve this.
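
The "one thing at a time" restriction from item 3 could be validated
server-side roughly as follows (hypothetical field names; a sketch only, not
the actual request schema):

```java
public class AlterTopicValidation {
    // Hypothetical request fields: exactly one should be set per request.
    static boolean exactlyOneChange(Integer partitions, Integer replicas,
                                    Object replicaAssignment, Object configChanges) {
        int set = 0;
        if (partitions != null) set++;
        if (replicas != null) set++;
        if (replicaAssignment != null) set++;
        if (configChanges != null) set++;
        return set == 1; // otherwise the server would answer with an error code
    }
}
```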

4. Q: Do we need such fine-grained errors returned from the server:
InvalidArgumentPartitions, InvalidArgumentReplicas etc.?
A: I started implementing the proposed error codes and now I think
InvalidArgumentError should probably be sufficient. We can do simple
validations on the client side (e.g. AdminClient can ensure the
number-of-partitions argument is positive); others - which can be covered
only on the server (probably an invalid topic config, a replica assignment
that includes a dead broker, etc.) - will be done on the server, and in
case of an invalid argument we will return InvalidArgumentError without
specifying the concrete field.

It'd be great if we could cover these remaining issues; they look minor,
at least related to specific messages rather than the overall protocol.
With that, I can update the Confluence page and update the patch to
reflect all the discussed items. This patch will probably include the
wire protocol messages and the server-side code to handle the new
requests. AdminClient and cli-tool implementation can be the next step.

Thanks,
Andrii Biletskyi

On Wed, Apr 15, 2015 at 7:26 PM, Jun Rao j...@confluent.io wrote:

 Andrii,

 500. I think what you suggested also sounds reasonable. Since ISR is only
 maintained accurately at the leader, TMR can return ISR if the broker is
 the leader of a partition. Otherwise, we can return an empty ISR. For
 partition reassignment, it would be useful to know the lag of each
 follower. Again, the leader knows this info. We can probably include that
 info in TMR as well.

 300. I think it's probably reasonable to restrict AlterTopicRequest to
 change only one thing at a time, i.e., either partitions, replicas, replica
 assignment or config.

 Thanks,

 Jun

 On Mon, Apr 13, 2015 at 10:56 AM, Andrii Biletskyi 
 andrii.bilets...@stealth.ly wrote:

  Jun,
 
  404. Great, thanks!
 
  500. If I understand correctly KAFKA-1367 says ISR part of TMR may
  be inconsistent. If so, then I believe all admin commands but
 describeTopic
  are not affected. Let me emphasize that it's about AdminClient
 operations,
  not about Wire Protocol requests. What I mean:
  To verify AdminClient.createTopic we will need (consistent) 'topics' set
  from TMR (we don't need isr)
  To verify alterTopic - again, probably 'topics' and 'assigned replicas' +
  configs
  To verify deleteTopic - only 'topics'
  To verify preferredReplica - 'leader', 'assigned replicas'
  To verify reassignPartitions - 'assigned replicas' ? (I'm not sure about
  this one)
  If everything above is correct, then AdminClient.describeTopic is the
 only
  command under risk. We can actually workaround it - find out the leader
  broker
  and ask TMR that leading broker to get up-to-date isr list.
  Bottom line: looks like 1367 is a separate issue, and is not a blocker
 for
  this
  KIP. I'm a bit concerned about adding new requests as a must-have part
  of this KIP when we don't know what we want to include to those requests.
 
  Also, I'd like to write down the new AlterTopicRequest semantics (if we
  decide
  to include replicas there and merge it with ReassignPartitionsRequest)
  300. AlterTopicRequest = [TopicName Partitions Replicas
 ReplicaAssignment
  [AddedConfigEntry] [DeletedConfig]]
  The fields are resolved in this sequence:
  1. Either partition or replicas 

[jira] [Updated] (KAFKA-2056) PartitionAssignorTest.testRangePartitionAssignor transient failure

2015-04-15 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-2056:
-
Resolution: Fixed
Status: Resolved  (was: Patch Available)

 PartitionAssignorTest.testRangePartitionAssignor transient failure
 --

 Key: KAFKA-2056
 URL: https://issues.apache.org/jira/browse/KAFKA-2056
 Project: Kafka
  Issue Type: Sub-task
Reporter: Guozhang Wang
Assignee: Fangmin Lv
  Labels: newbie
 Fix For: 0.8.3

 Attachments: KAFKA-2056.patch, KAFKA-2056.patch


 {code}
 unit.kafka.consumer.PartitionAssignorTest  testRangePartitionAssignor FAILED
 java.lang.NullPointerException
 at 
 unit.kafka.consumer.PartitionAssignorTest$$anonfun$unit$kafka$consumer$PartitionAssignorTest$$assignAndVerify$1.apply(PartitionAssignorTest.scala:173)
 at 
 unit.kafka.consumer.PartitionAssignorTest$$anonfun$unit$kafka$consumer$PartitionAssignorTest$$assignAndVerify$1.apply(PartitionAssignorTest.scala:172)
 at scala.collection.immutable.List.foreach(List.scala:318)
 at 
 unit.kafka.consumer.PartitionAssignorTest$.unit$kafka$consumer$PartitionAssignorTest$$assignAndVerify(PartitionAssignorTest.scala:172)
 at 
 unit.kafka.consumer.PartitionAssignorTest$$anonfun$testRangePartitionAssignor$1.apply$mcVI$sp(PartitionAssignorTest.scala:79)
 at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
 at 
 unit.kafka.consumer.PartitionAssignorTest.testRangePartitionAssignor(PartitionAssignorTest.scala:60)
 {code}





[jira] [Commented] (KAFKA-2056) PartitionAssignorTest.testRangePartitionAssignor transient failure

2015-04-15 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14496624#comment-14496624
 ] 

Guozhang Wang commented on KAFKA-2056:
--

Thanks for the updated patch Fangmin. +1 and pushed to trunk.

 PartitionAssignorTest.testRangePartitionAssignor transient failure
 --

 Key: KAFKA-2056
 URL: https://issues.apache.org/jira/browse/KAFKA-2056
 Project: Kafka
  Issue Type: Sub-task
Reporter: Guozhang Wang
Assignee: Fangmin Lv
  Labels: newbie
 Fix For: 0.8.3

 Attachments: KAFKA-2056.patch, KAFKA-2056.patch


 {code}
 unit.kafka.consumer.PartitionAssignorTest  testRangePartitionAssignor FAILED
 java.lang.NullPointerException
 at 
 unit.kafka.consumer.PartitionAssignorTest$$anonfun$unit$kafka$consumer$PartitionAssignorTest$$assignAndVerify$1.apply(PartitionAssignorTest.scala:173)
 at 
 unit.kafka.consumer.PartitionAssignorTest$$anonfun$unit$kafka$consumer$PartitionAssignorTest$$assignAndVerify$1.apply(PartitionAssignorTest.scala:172)
 at scala.collection.immutable.List.foreach(List.scala:318)
 at 
 unit.kafka.consumer.PartitionAssignorTest$.unit$kafka$consumer$PartitionAssignorTest$$assignAndVerify(PartitionAssignorTest.scala:172)
 at 
 unit.kafka.consumer.PartitionAssignorTest$$anonfun$testRangePartitionAssignor$1.apply$mcVI$sp(PartitionAssignorTest.scala:79)
 at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
 at 
 unit.kafka.consumer.PartitionAssignorTest.testRangePartitionAssignor(PartitionAssignorTest.scala:60)
 {code}





[jira] [Updated] (KAFKA-2056) PartitionAssignorTest.testRangePartitionAssignor transient failure

2015-04-15 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-2056:
-
Fix Version/s: 0.8.3

 PartitionAssignorTest.testRangePartitionAssignor transient failure
 --

 Key: KAFKA-2056
 URL: https://issues.apache.org/jira/browse/KAFKA-2056
 Project: Kafka
  Issue Type: Sub-task
Reporter: Guozhang Wang
Assignee: Fangmin Lv
  Labels: newbie
 Fix For: 0.8.3

 Attachments: KAFKA-2056.patch, KAFKA-2056.patch


 {code}
 unit.kafka.consumer.PartitionAssignorTest  testRangePartitionAssignor FAILED
 java.lang.NullPointerException
 at 
 unit.kafka.consumer.PartitionAssignorTest$$anonfun$unit$kafka$consumer$PartitionAssignorTest$$assignAndVerify$1.apply(PartitionAssignorTest.scala:173)
 at 
 unit.kafka.consumer.PartitionAssignorTest$$anonfun$unit$kafka$consumer$PartitionAssignorTest$$assignAndVerify$1.apply(PartitionAssignorTest.scala:172)
 at scala.collection.immutable.List.foreach(List.scala:318)
 at 
 unit.kafka.consumer.PartitionAssignorTest$.unit$kafka$consumer$PartitionAssignorTest$$assignAndVerify(PartitionAssignorTest.scala:172)
 at 
 unit.kafka.consumer.PartitionAssignorTest$$anonfun$testRangePartitionAssignor$1.apply$mcVI$sp(PartitionAssignorTest.scala:79)
 at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
 at 
 unit.kafka.consumer.PartitionAssignorTest.testRangePartitionAssignor(PartitionAssignorTest.scala:60)
 {code}





[jira] [Commented] (KAFKA-2125) Infinite loop after controlled shutdown succeeds

2015-04-15 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14496685#comment-14496685
 ] 

Joel Koshy commented on KAFKA-2125:
---

{noformat}
2015-04-14 05:56:10,951  INFO [Thread-38] server.KafkaServer - [Kafka Server 
45], Controlled shutdown succeeded
{noformat}

What is the output of {{grep -i shut}} _after_ the above line? Also, did you 
happen to take a thread-dump as well?

After controlled shutdown succeeds, the broker should proceed to shutdown all 
its components - socket server (which it appears to have done), replica 
manager, controller, etc. However, that sequence seems to have been aborted for 
some reason.


 Infinite loop after controlled shutdown succeeds
 

 Key: KAFKA-2125
 URL: https://issues.apache.org/jira/browse/KAFKA-2125
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2.1
Reporter: Jason Rosenberg
Priority: Blocker

 I have a 4 node cluster, running 0.8.2.1, that got into a bad state last 
 night during a rolling restart.  The first node to be restarted was the 
 controller.  Controlled Shutdown completed successfully, after about 800ms.  
 But after that, the server failed to shutdown, and got into what appears to 
 be an infinite cycle, involving the 'Controller-45-to-broker-45-send-thread', 
 and the 'kafka-scheduler-0' thread.  Ultimately, the shutdown timed out 
 (after 3 minutes) and it was terminated by the deployment system, and the 
 server was restarted.  As expected, when it came back up it took some time 
 re-syncing its partitions, but eventually came back and all was well.  
 However, I think there was something fundamentally wrong with the shutdown 
 process.  The controller didn't seem to give up controller status, for one 
 thing, as part of the controlled shutdown (which I should think would be the 
 first thing it should do?).
 Anyway, below are some log snippets. I do have full logs from each broker in 
 the cluster, which I can provide (but would have to significantly anonymize 
 the logs before forwarding along).
 Controlled shutdown starts and succeeds:
 {code}
 2015-04-14 05:56:10,134  INFO [Thread-38] server.KafkaServer - [Kafka Server 
 45], shutting down
 2015-04-14 05:56:10,136  INFO [Thread-38] server.KafkaServer - [Kafka Server 
 45], Starting controlled shutdown
 2015-04-14 05:56:10,150  INFO [kafka-request-handler-0] 
 controller.KafkaController - [Controller 45]: Shutting down broker 45
 ...
 ...
 2015-04-14 05:56:10,951  INFO [Thread-38] server.KafkaServer - [Kafka Server 
 45], Controlled shutdown succeeded
 {code}
 Subsequently, the 'Controller-45-to-broker-45-send-thread' starts repeatedly 
 spamming the logs, like so:
 {code}
 2015-04-14 05:56:11,281  WARN [Controller-45-to-broker-45-send-thread] 
 controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], 
 Controller 45 epoch 21 fails to send request 
 Name:LeaderAndIsrRequest;Version:0;Controller:45;ControllerEpoch:21;CorrelationId:305175;ClientId:id_45-host_null-port_12345;Leaders:id:48,host:broker48.square,port:12345;PartitionState:(mytopic,1)
  - 
 (LeaderAndIsrInfo:(Leader:48,ISR:48,LeaderEpoch:5,ControllerEpoch:21),ReplicationFactor:2),AllReplicas:45,48)
  to broker id:45,host:broker45.square,port:12345. Reconnecting to broker.
 java.nio.channels.ClosedChannelException
 at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
 at 
 kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
 at 
 kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
 2015-04-14 05:56:11,281  INFO [Controller-45-to-broker-45-send-thread] 
 controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], 
 Controller 45 connected to id:45,host:broker45.square,port:12345 for sending 
 state change requests
 2015-04-14 05:56:11,582  WARN [Controller-45-to-broker-45-send-thread] 
 controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], 
 Controller 45 epoch 21 fails to send request 
 Name:LeaderAndIsrRequest;Version:0;Controller:45;ControllerEpoch:21;CorrelationId:305175;ClientId:id_45-host_null-port_12345;Leaders:id:48,host:broker48.square,port:12345;PartitionState:(mytopic,1)
  - 
 (LeaderAndIsrInfo:(Leader:48,ISR:48,LeaderEpoch:5,ControllerEpoch:21),ReplicationFactor:2),AllReplicas:45,48)
  to broker id:45,host:broker45.square,port:12345. Reconnecting to broker.
 java.nio.channels.ClosedChannelException
 at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
 at 
 kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
 at 
 kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)

Re: [DISCUSS] KIP-11- Authorization design for kafka security

2015-04-15 Thread Parth Brahmbhatt
Hi Michael,

There is code in the Kafka codebase that reads and interprets the topic config
JSON, which has acls, owner and logconfig properties. There are 3 use cases
that we are supporting with the current proposal:

  *   You use the out-of-the-box SimpleAcl authorizer, which is tied to the acl
stored in topic config, and the format is locked down.
  *   You have a custom authorizer and a custom ACL store. Ranger/Argus falls
under this, as they have their own acl store and UI that users use to configure
acls on the cluster and cluster resources like topics. It is up to the custom
authorizer to leverage the kafka acl configs or completely ignore them, as they
have set a user expectation that only acls configured via their UI/system will
be effective.
  *   You have a custom authorizer but no custom acl store. You are completely
tied to the acl structure that we have provided in the out-of-the-box
implementation.

Thanks
Parth

On 4/15/15, 10:31 AM, Michael Herstine mherst...@linkedin.com.INVALID wrote:

Hi Parth,

One question that occurred to me at the end of today’s hangout: how tied
are we to a particular ACL representation under your proposal? I know that
TopicConfigCache will just contain JSON— if a particular site decides they
want to represent their ACLs differently, and swap out the authorizer
implementation, will that work?  I guess what I’m asking is whether
there’s any code in the Kafka codebase that will interpret that JSON, or
does that logic live exclusively in the authorizer?

On 4/14/15, 10:56 PM, Don Bosco Durai bo...@apache.org wrote:

I also feel, having just IP would be more appropriate. Host lookup will
unnecessarily slow things down and would be insecure, as you pointed out.

With IP, it will also be possible to set up policies (in future, if needed)
with ranges or netmasks, and it would be more scalable.

Bosco


On 4/14/15, 1:40 PM, Michael Herstine mherst...@linkedin.com.INVALID
wrote:

Hi Parth,

Sorry to chime in so late, but I’ve got a minor question on the KIP.

Several methods take a parameter named “host” of type String. Is that
intended to be a hostname, or an IP address? If the former, I’m curious as
to how that’s found (in my experience, when accepting an incoming socket
connection, you only know the IP address, and there isn’t a way to map
that to a hostname without a round trip to a DNS server, which is insecure
anyway).


On 3/25/15, 1:07 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com
wrote:

Hi all,

I have modified the KIP to reflect the recent change requests from the
reviewers. I have been working on the code and I have the server-side code
for authorization ready. I am now modifying the command line utilities. I
would really appreciate it if some of the committers can spend some time
reviewing the KIP so we can make progress on this.

Thanks
Parth

On 3/18/15, 2:20 PM, Michael Herstine mherst...@linkedin.com.INVALID
wrote:

Hi Parth,

Thanks! A few questions:

1. Do you want to permit rules in your ACLs that DENY access as well as
ALLOW? This can be handy for setting up rules that have exceptions. E.g.
“Allow principal P to READ resource R from all hosts” with “Deny principal
P READ access to resource R from host H1” in combination would allow P to
READ R from all hosts *except* H1.

2. When a topic is newly created, will there be an ACL created for it? If
not, would that not deny subsequent access to it?

(nit) Maybe use Principal instead of String to represent principals?


On 3/9/15, 11:48 AM, Don Bosco Durai bo...@apache.org wrote:

Parth

Overall it is looking good. A couple of questions:

- Can you give an example of how the policies will look in the default
implementation?
- In the operations, can we support "CONNECT" also? This can be used
during Session connection.
- Regarding access control for "Topic Creation", since we can't do it
on the server side, can we de-scope it for now and plan it as a future
feature request?

Thanks

Bosco


On 3/6/15, 8:10 AM, Harsha ka...@harsha.io wrote:

Hi Parth,
Thanks for putting this together. Overall it looks good to
me. Although AdminUtils is a concern, KIP-4 can probably fix
that part.
Thanks,
Harsha

On Thu, Mar 5, 2015, at 10:39 AM, Parth Brahmbhatt wrote:
Forgot to add links to wiki and jira.
Link to wiki:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface
Link to Jira: https://issues.apache.org/jira/browse/KAFKA-1688
Thanks
Parth
From: Parth Brahmbhatt pbrahmbh...@hortonworks.com
Date: Thursday, March 5, 2015 at 10:33 AM
To: dev@kafka.apache.org
Subject: [DISCUSS] KIP-11- 

[jira] [Commented] (KAFKA-2125) Infinite loop after controlled shutdown succeeds

2015-04-15 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14496865#comment-14496865
 ] 

Joel Koshy commented on KAFKA-2125:
---

For some reason, the socket server's processor shutdown got wedged, which 
prevented the controller, replica-manager, etc. from shutting down. This fully 
explains the repetitive logs. A full thread dump would help understand why the 
socket server shutdown wedged - do you have the full dump? Also, do you have 
the full stack for the line that repeats 500 times in your attached grep output?

 Infinite loop after controlled shutdown succeeds
 

 Key: KAFKA-2125
 URL: https://issues.apache.org/jira/browse/KAFKA-2125
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2.1
Reporter: Jason Rosenberg
Priority: Blocker
 Attachments: grep_shut_edited.log


 I have a 4 node cluster, running 0.8.2.1, that got into a bad state last 
 night during a rolling restart.  The first node to be restarted was the 
 controller.  Controlled Shutdown completed successfully, after about 800ms.  
 But after that, the server failed to shutdown, and got into what appears to 
 be an infinite cycle, involving the 'Controller-45-to-broker-45-send-thread', 
 and the 'kafka-scheduler-0' thread.  Ultimately, the shutdown timed out 
 (after 3 minutes) and it was terminated by the deployment system, and the 
 server was restarted.  As expected, when it came back up it took some time 
 re-syncing its partitions, but eventually came back and all was well.  
 However, I think there was something fundamentally wrong with the shutdown 
 process.  The controller didn't seem to give up controller status, for one 
 thing, as part of the controlled shutdown (which I should think would be the 
 first thing it should do?).
 Anyway, below are some log snippets. I do have full logs from each broker in 
 the cluster, which I can provide (but would have to significantly anonymize 
 the logs before forwarding along).
 Controlled shutdown starts and succeeds:
 {code}
 2015-04-14 05:56:10,134  INFO [Thread-38] server.KafkaServer - [Kafka Server 
 45], shutting down
 2015-04-14 05:56:10,136  INFO [Thread-38] server.KafkaServer - [Kafka Server 
 45], Starting controlled shutdown
 2015-04-14 05:56:10,150  INFO [kafka-request-handler-0] 
 controller.KafkaController - [Controller 45]: Shutting down broker 45
 ...
 ...
 2015-04-14 05:56:10,951  INFO [Thread-38] server.KafkaServer - [Kafka Server 
 45], Controlled shutdown succeeded
 {code}
 Subsequently, the 'Controller-45-to-broker-45-send-thread' starts repeatedly 
 spamming the logs, like so:
 {code}
 2015-04-14 05:56:11,281  WARN [Controller-45-to-broker-45-send-thread] 
 controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], 
 Controller 45 epoch 21 fails to send request 
 Name:LeaderAndIsrRequest;Version:0;Controller:45;ControllerEpoch:21;CorrelationId:305175;ClientId:id_45-host_null-port_12345;Leaders:id:48,host:broker48.square,port:12345;PartitionState:(mytopic,1)
  - 
 (LeaderAndIsrInfo:(Leader:48,ISR:48,LeaderEpoch:5,ControllerEpoch:21),ReplicationFactor:2),AllReplicas:45,48)
  to broker id:45,host:broker45.square,port:12345. Reconnecting to broker.
 java.nio.channels.ClosedChannelException
 at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
 at 
 kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
 at 
 kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
 2015-04-14 05:56:11,281  INFO [Controller-45-to-broker-45-send-thread] 
 controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], 
 Controller 45 connected to id:45,host:broker45.square,port:12345 for sending 
 state change requests
 2015-04-14 05:56:11,582  WARN [Controller-45-to-broker-45-send-thread] 
 controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], 
 Controller 45 epoch 21 fails to send request 
 Name:LeaderAndIsrRequest;Version:0;Controller:45;ControllerEpoch:21;CorrelationId:305175;ClientId:id_45-host_null-port_12345;Leaders:id:48,host:broker48.square,port:12345;PartitionState:(mytopic,1)
  -> 
 (LeaderAndIsrInfo:(Leader:48,ISR:48,LeaderEpoch:5,ControllerEpoch:21),ReplicationFactor:2),AllReplicas:45,48)
  to broker id:45,host:broker45.square,port:12345. Reconnecting to broker.
 java.nio.channels.ClosedChannelException
 at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
 at 
 kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
 at 
 kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
 at 

[jira] [Commented] (KAFKA-2125) Infinite loop after controlled shutdown succeeds

2015-04-15 Thread Jason Rosenberg (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14496883#comment-14496883
 ] 

Jason Rosenberg commented on KAFKA-2125:


Joel, unfortunately I have no stack trace, and the stack traces I listed above 
are the full stack traces that I have. Going forward, we've removed the 3 
minute automatic jvm shutdown, so next time, if it spins out of control, I can 
manually pull a thread dump.

 Infinite loop after controlled shutdown succeeds
 

 Key: KAFKA-2125
 URL: https://issues.apache.org/jira/browse/KAFKA-2125
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2.1
Reporter: Jason Rosenberg
Priority: Blocker
 Attachments: grep_shut_edited.log



[jira] [Commented] (KAFKA-2125) Infinite loop after controlled shutdown succeeds

2015-04-15 Thread Jason Rosenberg (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14496898#comment-14496898
 ] 

Jason Rosenberg commented on KAFKA-2125:


Ooh, it looks like before the 500 repeated exceptions, the first one was a little 
different, which I neglected to include previously:
{code}
2015-04-14 05:56:10,973  INFO [kafka-request-handler-6] log.Log - Truncating 
log mytopic-1 to offset 13860934.
2015-04-14 05:56:10,980  WARN [Controller-45-to-broker-45-send-thread] 
controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], 
Controller 45 epoch 21 fails to send request 
Name:LeaderAndIsrRequest;Version:0;Controller:45;ControllerEpoch:21;CorrelationId:305175;ClientId:id_45-host_null-port_12345;Leaders:id:48,host:broker48.square,port:12345;PartitionState:(mytopic,1)
 -> 
(LeaderAndIsrInfo:(Leader:48,ISR:48,LeaderEpoch:5,ControllerEpoch:21),ReplicationFactor:2),AllReplicas:45,48)
 to broker id:45,host:broker45.square,port:12345. Reconnecting to broker.
java.io.EOFException: Received -1 when reading from channel, socket has likely 
been closed.
at kafka.utils.Utils$.read(Utils.scala:381)
at 
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at 
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
at 
kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:133)
at 
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
2015-04-14 05:56:10,981  INFO [Controller-45-to-broker-45-send-thread] 
controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], 
Controller 45 connected to id:45,host:broker45.square,port:12345 for sending 
state change requests
2015-04-14 05:56:10,993 ERROR [kafka-network-thread-27330-2] utils.Utils$ - 
Uncaught exception in thread 'kafka-network-thread-12345-2':
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:347)
at scala.None$.get(Option.scala:345)
at kafka.network.ConnectionQuotas.dec(SocketServer.scala:524)
at kafka.network.AbstractServerThread.close(SocketServer.scala:165)
at kafka.network.AbstractServerThread.close(SocketServer.scala:157)
at kafka.network.Processor.close(SocketServer.scala:374)
at kafka.network.AbstractServerThread.closeAll(SocketServer.scala:180)
at kafka.network.Processor.run(SocketServer.scala:364)
at java.lang.Thread.run(Thread.java:745)
{code}
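The repeated WARN/reconnect cycle above matches a retry loop that never observes shutdown. Below is a minimal, hypothetical Java sketch of that pattern (not Kafka's actual RequestSendThread code; the names, the simulated failure, and the attempt cap are all illustrative):

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the retry pattern seen in the logs: doWork() runs in
// a loop until the thread is told to stop, and every send failure just
// triggers a reconnect. If nothing ever clears `running` after controlled
// shutdown, the thread spins forever, logging a warning on each iteration.
class RetryLoopSketch {
    static final AtomicBoolean running = new AtomicBoolean(true);
    static int attempts = 0;

    // Stand-in for BlockingChannel.send(); always fails, like a socket whose
    // remote end was already closed by the shutdown.
    static void send() throws Exception {
        throw new Exception("ClosedChannelException (simulated)");
    }

    public static void main(String[] args) {
        while (running.get()) {
            try {
                send();
            } catch (Exception e) {
                attempts++;
                // "Reconnecting to broker" -- but the broker side is gone,
                // so the next iteration fails identically. In the bug report
                // this stop condition is never reached; here we cap it so the
                // sketch terminates.
                if (attempts >= 5) {
                    running.set(false);
                }
            }
        }
        System.out.println("attempts=" + attempts);
    }
}
```

The fix direction implied by the report is for the send thread's shutdown to run (or its running flag to be cleared) as part of controlled shutdown, so the loop condition fails instead of reconnecting indefinitely.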

 Infinite loop after controlled shutdown succeeds
 

 Key: KAFKA-2125
 URL: https://issues.apache.org/jira/browse/KAFKA-2125
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2.1
Reporter: Jason Rosenberg
Priority: Blocker
 Attachments: grep_shut_edited.log



[jira] [Updated] (KAFKA-2125) Infinite loop after controlled shutdown succeeds

2015-04-15 Thread Jason Rosenberg (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Rosenberg updated KAFKA-2125:
---
Attachment: grep_shut.log

 Infinite loop after controlled shutdown succeeds
 

 Key: KAFKA-2125
 URL: https://issues.apache.org/jira/browse/KAFKA-2125
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2.1
Reporter: Jason Rosenberg
Priority: Blocker
 Attachments: grep_shut.log



[jira] [Commented] (KAFKA-2035) Add a topic config cache.

2015-04-15 Thread Parth Brahmbhatt (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14496810#comment-14496810
 ] 

Parth Brahmbhatt commented on KAFKA-2035:
-

It does, and if I understand correctly you are proposing to create something 
like the following:

class Topic(val config: TopicConfig, val logs: Set[Log], ... all other topic 
related stuff)

I am not sure why this does not exist right now, but I think moving in that 
direction is a good idea. At the same time, it will be a lot more work, and we 
can probably take an incremental approach without much (or probably any) 
throwaway work by modeling one piece at a time, so other jiras don't need to be 
blocked until we settle on a top-level Topic model. After all, the top-level 
Topic will just be a composition of all these small pieces that make up a 
topic. 

If you think it is essential to nail down the Topic structure before making 
progress on this jira and the related jiras, let me know and I can file a new 
jira for that.
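The cache-plus-reference design under discussion can be sketched as follows. This is a hypothetical simplification in Java (the real Kafka classes are Scala, and the names and config key here are illustrative): a config cache that the config manager updates on change notifications, which any component reads through on demand rather than holding its own copy:

```java
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: a shared cache of per-topic configs, updated by
// whatever watches for config changes (e.g. a ZK notification handler).
class TopicConfigCache {
    private final Map<String, Properties> configs = new ConcurrentHashMap<>();

    // Called by the config manager when a change notification arrives.
    void update(String topic, Properties props) { configs.put(topic, props); }

    Properties get(String topic) {
        return configs.getOrDefault(topic, new Properties());
    }
}

// Any component (a Log, an authorizer, or a future top-level Topic) holds a
// reference to the cache and always reads the latest config.
class Log {
    private final String topic;
    private final TopicConfigCache cache;

    Log(String topic, TopicConfigCache cache) {
        this.topic = topic;
        this.cache = cache;
    }

    String retentionMs() {
        return cache.get(topic).getProperty("retention.ms", "default");
    }
}
```

With this shape, a later top-level Topic can be composed out of the same cache reference plus the other per-topic pieces, without reworking the components built incrementally in the meantime.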

 Add a topic config cache.
 -

 Key: KAFKA-2035
 URL: https://issues.apache.org/jira/browse/KAFKA-2035
 Project: Kafka
  Issue Type: Task
Reporter: Parth Brahmbhatt
Assignee: Parth Brahmbhatt
 Attachments: KAFKA-2035_2015-03-31_10:52:12.patch


 Currently the topic config is all about Log configuration so we have a 
 TopicConfigManager which takes in a Log instance and keeps updating that 
 instance's config instance as and when the topic config is updated. The topic 
 config update notifications are sent using zk watchers by Controller.
 I propose to introduce a TopicConfigCache which will be updated by 
 TopicConfigManager on any config changes. The log instance and any other 
 component (like the authorizer mentioned in KAFKA-1688) will have a reference 
 to TopicConfigCache using which they will access the topic configs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2125) Infinite loop after controlled shutdown succeeds

2015-04-15 Thread Jason Rosenberg (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Rosenberg updated KAFKA-2125:
---
Attachment: (was: grep_shut.log)

 Infinite loop after controlled shutdown succeeds
 

 Key: KAFKA-2125
 URL: https://issues.apache.org/jira/browse/KAFKA-2125
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2.1
Reporter: Jason Rosenberg
Priority: Blocker
 Attachments: grep_shut_edited.log



[jira] [Updated] (KAFKA-2125) Infinite loop after controlled shutdown succeeds

2015-04-15 Thread Jason Rosenberg (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Rosenberg updated KAFKA-2125:
---
Attachment: grep_shut_edited.log

 Infinite loop after controlled shutdown succeeds
 

 Key: KAFKA-2125
 URL: https://issues.apache.org/jira/browse/KAFKA-2125
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2.1
Reporter: Jason Rosenberg
Priority: Blocker
 Attachments: grep_shut_edited.log



[jira] [Comment Edited] (KAFKA-2125) Infinite loop after controlled shutdown succeeds

2015-04-15 Thread Jason Rosenberg (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14496794#comment-14496794
 ] 

Jason Rosenberg edited comment on KAFKA-2125 at 4/15/15 8:04 PM:
-

I uploaded an edited result of doing 'grep -i shut' on the log file after the 
controlled shutdown succeeded. (see 'grep_shut_edited.log').

I added a few comments, on lines preceded with '//'
Also, I included an InterruptedException at the end, which seems to indicate 
where a thread was hung at the time the thread was shutdown by the jvm.


was (Author: jbrosenb...@gmail.com):
I uploaded an edited result of doing 'grep -i shut' on the log file after the 
controlled shutdown succeeded. (see 'grep_shut.log').

I added a few comments, on lines preceded with '//'
Also, I included an InterruptedException at the end, which seems to indicate 
where a thread was hung at the time the thread was shutdown by the jvm.

 Infinite loop after controlled shutdown succeeds
 

 Key: KAFKA-2125
 URL: https://issues.apache.org/jira/browse/KAFKA-2125
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2.1
Reporter: Jason Rosenberg
Priority: Blocker
 Attachments: grep_shut_edited.log



Review Request 33239: Patch for KAFKA-2126

2015-04-15 Thread Ewen Cheslack-Postava

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33239/
---

Review request for kafka.


Bugs: KAFKA-2126
https://issues.apache.org/jira/browse/KAFKA-2126


Repository: kafka


Description
---

KAFKA-2126: Configure automatically instantiated deserializers in new consumer.


Diffs
-

  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
21243345311a106f0802ce96c026ba6e815ccf99 

Diff: https://reviews.apache.org/r/33239/diff/


Testing
---


Thanks,

Ewen Cheslack-Postava



[jira] [Updated] (KAFKA-2126) New consumer does not correctly configure deserializers

2015-04-15 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-2126:
-
Status: Patch Available  (was: Open)

 New consumer does not correctly configure deserializers
 ---

 Key: KAFKA-2126
 URL: https://issues.apache.org/jira/browse/KAFKA-2126
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava
 Fix For: 0.8.3

 Attachments: KAFKA-2126.patch


 Since the new ser/de interfaces use a configure() method with an extra isKey 
 parameter, they need to be manually configured after creation since 
 getConfiguredInstances can't invoke configure() for us. The new consumer is 
 missing this step when the deserializer is instantiated automatically by the 
 consumer.
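The configure-after-instantiation pattern the fix needs can be illustrated with a minimal stand-in (the interfaces and class names below are simplified sketches for illustration, not the real org.apache.kafka.common.serialization types):

```java
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for Kafka's Deserializer interface (illustrative only).
interface Deser<T> {
    void configure(Map<String, ?> configs, boolean isKey);
    T deserialize(String topic, byte[] data);
}

class SketchStringDeser implements Deser<String> {
    String encoding; // stays null unless configure() runs

    public void configure(Map<String, ?> configs, boolean isKey) {
        // Real deserializers pick key- or value-specific settings via isKey.
        Object v = configs.get(isKey ? "key.deserializer.encoding"
                                     : "value.deserializer.encoding");
        encoding = v != null ? (String) v : "UTF-8";
    }

    public String deserialize(String topic, byte[] data) {
        if (encoding == null)
            throw new IllegalStateException("deserializer was never configured");
        return new String(data, Charset.forName(encoding));
    }
}

public class DeserializerConfigDemo {
    // Mimics getConfiguredInstance(): reflection can build the object, but it
    // cannot pass the extra isKey flag, so configure() must be called after.
    static <T> T instantiate(Class<T> cls) throws Exception {
        return cls.getDeclaredConstructor().newInstance();
    }

    public static void main(String[] args) throws Exception {
        SketchStringDeser d = instantiate(SketchStringDeser.class);
        d.configure(new HashMap<String, Object>(), false); // the missing step
        System.out.println(d.deserialize("t", "hello".getBytes("UTF-8"))); // prints "hello"
    }
}
```

Skipping the explicit configure() call leaves the deserializer in the half-initialized state the bug describes.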



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2126) New consumer does not correctly configure deserializers

2015-04-15 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-2126:
-
Attachment: KAFKA-2126.patch

 New consumer does not correctly configure deserializers
 ---

 Key: KAFKA-2126
 URL: https://issues.apache.org/jira/browse/KAFKA-2126
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava
 Fix For: 0.8.3

 Attachments: KAFKA-2126.patch


 Since the new ser/de interfaces use a configure() method with an extra isKey 
 parameter, they need to be manually configured after creation since 
 getConfiguredInstances can't invoke configure() for us. The new consumer is 
 missing this step when the deserializer is instantiated automatically by the 
 consumer.





[jira] [Commented] (KAFKA-2126) New consumer does not correctly configure deserializers

2015-04-15 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14496858#comment-14496858
 ] 

Ewen Cheslack-Postava commented on KAFKA-2126:
--

Created reviewboard https://reviews.apache.org/r/33239/diff/
 against branch origin/trunk

 New consumer does not correctly configure deserializers
 ---

 Key: KAFKA-2126
 URL: https://issues.apache.org/jira/browse/KAFKA-2126
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava
 Fix For: 0.8.3

 Attachments: KAFKA-2126.patch


 Since the new ser/de interfaces use a configure() method with an extra isKey 
 parameter, they need to be manually configured after creation since 
 getConfiguredInstances can't invoke configure() for us. The new consumer is 
 missing this step when the deserializer is instantiated automatically by the 
 consumer.





[jira] [Commented] (KAFKA-2125) Infinite loop after controlled shutdown succeeds

2015-04-15 Thread Jason Rosenberg (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14496794#comment-14496794
 ] 

Jason Rosenberg commented on KAFKA-2125:


I uploaded an edited result of doing 'grep -i shut' on the log file after the 
controlled shutdown succeeded. (see 'grep_shut.log').

I added a few comments, on lines preceded with '//'
Also, I included an InterruptedException at the end, which seems to indicate 
where a thread was hung at the time the thread was shutdown by the jvm.

 Infinite loop after controlled shutdown succeeds
 

 Key: KAFKA-2125
 URL: https://issues.apache.org/jira/browse/KAFKA-2125
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2.1
Reporter: Jason Rosenberg
Priority: Blocker
 Attachments: grep_shut.log


 I have a 4 node cluster, running 0.8.2.1, that got into a bad state last 
 night during a rolling restart.  The first node to be restarted was the 
 controller.  Controlled Shutdown completed successfully, after about 800ms.  
 But after that, the server failed to shutdown, and got into what appears to 
 be an infinite cycle, involving the 'Controller-45-to-broker-45-send-thread', 
 and the 'kafka-scheduler-0' thread.  Ultimately, the shutdown timed out 
 (after 3 minutes) and it was terminated by the deployment system, and the 
 server was restarted.  As expected, when it came back up it took some time 
 re-syncing it's partitions, but eventually came back and all was well.  
 However, I think there was something fundamentally wrong with the shutdown 
 process.  The controller didn't seem to give up controller status, for one 
 thing, as part of the controlled shutdown (which I should think would be the 
 first thing it should do?).
 Anyway, below are some log snippets. I do have full logs from each broker in 
 the cluster, which I can provide (but would have to significantly anonymize 
 the logs before forwarding along).
 Controlled shutdown starts and succeeds:
 {code}
 2015-04-14 05:56:10,134  INFO [Thread-38] server.KafkaServer - [Kafka Server 
 45], shutting down
 2015-04-14 05:56:10,136  INFO [Thread-38] server.KafkaServer - [Kafka Server 
 45], Starting controlled shutdown
 2015-04-14 05:56:10,150  INFO [kafka-request-handler-0] 
 controller.KafkaController - [Controller 45]: Shutting down broker 45
 ...
 ...
 2015-04-14 05:56:10,951  INFO [Thread-38] server.KafkaServer - [Kafka Server 
 45], Controlled shutdown succeeded
 {code}
 Subsequently, the 'Controller-45-to-broker-45-send-thread' starts repeatedly 
 spamming the logs, like so:
 {code}
 2015-04-14 05:56:11,281  WARN [Controller-45-to-broker-45-send-thread] 
 controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], 
 Controller 45 epoch 21 fails to send request 
 Name:LeaderAndIsrRequest;Version:0;Controller:45;ControllerEpoch:21;CorrelationId:305175;ClientId:id_45-host_null-port_12345;Leaders:id:48,host:broker48.square,port:12345;PartitionState:(mytopic,1)
  - 
 (LeaderAndIsrInfo:(Leader:48,ISR:48,LeaderEpoch:5,ControllerEpoch:21),ReplicationFactor:2),AllReplicas:45,48)
  to broker id:45,host:broker45.square,port:12345. Reconnecting to broker.
 java.nio.channels.ClosedChannelException
 at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
 at 
 kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
 at 
 kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
 2015-04-14 05:56:11,281  INFO [Controller-45-to-broker-45-send-thread] 
 controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], 
 Controller 45 connected to id:45,host:broker45.square,port:12345 for sending 
 state change requests
 2015-04-14 05:56:11,582  WARN [Controller-45-to-broker-45-send-thread] 
 controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], 
 Controller 45 epoch 21 fails to send request 
 Name:LeaderAndIsrRequest;Version:0;Controller:45;ControllerEpoch:21;CorrelationId:305175;ClientId:id_45-host_null-port_12345;Leaders:id:48,host:broker48.square,port:12345;PartitionState:(mytopic,1)
  - 
 (LeaderAndIsrInfo:(Leader:48,ISR:48,LeaderEpoch:5,ControllerEpoch:21),ReplicationFactor:2),AllReplicas:45,48)
  to broker id:45,host:broker45.square,port:12345. Reconnecting to broker.
 java.nio.channels.ClosedChannelException
 at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
 at 
 kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
 at 
 kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
 2015-04-14 05:56:11,582  INFO 

Re: [DISCUSS] KIP-11- Authorization design for kafka security

2015-04-15 Thread Tong Li

Parth,
 If one wants to use his or her own access control including
authentication system, with this design what will be needed to be done? Can
one completely turn this off so that the system behaves exactly same as it
is today?

Thanks.

Tong

Sent from my iPhone

 On Apr 15, 2015, at 1:51 PM, Parth Brahmbhatt
pbrahmbh...@hortonworks.com wrote:

 Hi Michael,

 There is code in the Kafka codebase that reads and interprets the topic
config JSON which has acls, owner and logconfig properties. There are 3 use
cases that we are supporting with current proposal:

   *   You use out of box simpleAcl authorizer which is tied to the acl
stored in topic config and the format is locked down.
   *   You have a custom authorizer and a custom ACL store.  Ranger/Argus
falls under this as they have their own acl store and ui that users use to
configure acls on the cluster and cluster resources like topic. It is up to
the custom authorizer to leverage the Kafka acl configs or completely
ignore them as they have set a user expectation that only acls configured
via their ui/system will be effective.
   *   You have a custom authorizer but no custom Acl store. You are
completely tied to Acl structure that we have provided in out of box
implementation.

 Thanks
 Parth

 On 4/15/15, 10:31 AM, Michael Herstine
mherst...@linkedin.com.INVALID
wrote:

 Hi Parth,

 One question that occurred to me at the end of today’s hangout: how tied
 are we to a particular ACL representation under your proposal? I know
that
 TopicConfigCache will just contain JSON— if a particular site decides
they
 want to represent their ACLs differently, and swap out the authorizer
 implementation, will that work?  I guess what I’m asking is whether
 there’s any code in the Kafka codebase that will interpret that JSON, or
 does that logic live exclusively in the authorizer?

 On 4/14/15, 10:56 PM, Don Bosco Durai
bo...@apache.org wrote:

 I also feel, having just IP would be more appropriate. Host lookup will
 unnecessarily slow things down and would be insecure as you pointed out.

 With IP, it will also be possible to set up policies (in future if needed)
with
 ranges or netmasks and it would be more scalable.

 Bosco


 On 4/14/15, 1:40 PM, Michael Herstine
mherst...@linkedin.com.INVALID
 wrote:

 Hi Parth,

 Sorry to chime in so late, but I’ve got a minor question on the KIP.

 Several methods take a parameter named “host” of type String. Is that
 intended to be a hostname, or an IP address? If the former, I’m curious
 as
 to how that’s found (in my experience, when accepting an incoming socket
 connection, you only know the IP address, and there isn’t a way to map
 that to a hostname without a round trip to a DNS server, which is
 insecure
 anyway).


 On 3/25/15, 1:07 PM, Parth Brahmbhatt
pbrahmbh...@hortonworks.com
 wrote:

 Hi all,

 I have modified the KIP to reflect the recent change request from the
 reviewers. I have been working on the code and I have the server side
 code
 for authorization ready. I am now modifying the command line utilities.
 I
 would really appreciate if some of the committers can spend sometime to
 review the KIP so we can make progress on this.

 Thanks
 Parth

 On 3/18/15, 2:20 PM, Michael Herstine
mherst...@linkedin.com.INVALID
 wrote:

 Hi Parth,

 Thanks! A few questions:

 1. Do you want to permit rules in your ACLs that DENY access as well as
 ALLOW? This can be handy setting up rules that have exceptions. E.g.
 “Allow principal P to READ resource R from all hosts” with “Deny
 principal
 P READ access to resource R from host H1” in combination would allow P
 to
 READ R from all hosts *except* H1.

 2. When a topic is newly created, will there be an ACL created for it?
 If
 not, would that not deny subsequent access to it?

 (nit) Maybe use Principal instead of String to represent principals?
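The ALLOW/DENY combination in question 1 can be sketched as a small evaluator in which DENY rules take precedence over ALLOW rules, giving the "all hosts except H1" effect. This is purely illustrative; the Acl type and method shapes here are invented, not the KIP-11 interfaces:

```java
import java.util.Arrays;
import java.util.List;

public class DenyOverridesAcl {
    enum Effect { ALLOW, DENY }

    // One ACL entry; host "*" matches any host.
    static class Acl {
        final Effect effect;
        final String principal, operation, resource, host;

        Acl(Effect e, String p, String op, String r, String h) {
            effect = e; principal = p; operation = op; resource = r; host = h;
        }

        boolean matches(String p, String op, String r, String h) {
            return principal.equals(p) && operation.equals(op) && resource.equals(r)
                   && (host.equals("*") || host.equals(h));
        }
    }

    // A single matching DENY rule vetoes access, no matter how many ALLOW
    // rules also match.
    static boolean authorized(List<Acl> acls, String p, String op, String r, String h) {
        boolean allowed = false;
        for (Acl a : acls) {
            if (!a.matches(p, op, r, h)) continue;
            if (a.effect == Effect.DENY) return false; // deny wins over any allow
            allowed = true;
        }
        return allowed;
    }

    public static void main(String[] args) {
        List<Acl> acls = Arrays.asList(
            new Acl(Effect.ALLOW, "P", "READ", "R", "*"),
            new Acl(Effect.DENY,  "P", "READ", "R", "H1"));
        // P may READ R from every host *except* H1.
        if (!authorized(acls, "P", "READ", "R", "H2")) throw new AssertionError();
        if (authorized(acls, "P", "READ", "R", "H1")) throw new AssertionError();
    }
}
```

The deny-takes-precedence rule is what makes exception-style policies expressible without enumerating every permitted host.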


 On 3/9/15, 11:48 AM, Don Bosco Durai
bo...@apache.org wrote:

 Parth

 Overall it is looking good. Couple of questions…

 - Can you give an example how the policies will look like in the
 default
 implementation?
 - In the operations, can we support "CONNECT" also? This can be used
 during Session connection
 - Regarding access control for "Topic Creation", since we can't do it on
 the server side, can we de-scope it for now? And plan it as a future
 feature request?

 Thanks

 Bosco


 On 3/6/15, 8:10 AM, Harsha ka...@harsha.io
wrote:

 Hi Parth,
 Thanks for putting this together. Overall it looks good
 to
 me. Although AdminUtils is a concern KIP-4  can probably
 fix
 that part.
 Thanks,
 Harsha

 On Thu, Mar 5, 2015, at 10:39 AM, Parth Brahmbhatt wrote:
 Forgot to add links to wiki and jira.
 Link to wiki:
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authoriza
 t
 i
 o
 n

Jenkins build is back to normal : KafkaPreCommit #75

2015-04-15 Thread Apache Jenkins Server
See https://builds.apache.org/job/KafkaPreCommit/75/changes



Re: [KIP-DISCUSSION] KIP-13 Quotas

2015-04-15 Thread Guozhang Wang
Thanks for the summary. A few comments below:

1. Say a produce request has replication timeout X, and upon finishing the
local append it is determined to be throttled for Y ms where Y > X, then after
it has timed out in the purgatory after Y ms we should still check if the
#.acks has been fulfilled in order to set the correct error codes in the
response.

2. I think it is actually common that the calculated throttle time Y is
less than the replication timeout X, which will be a tricky case since we
need to make sure 1) at least the request is held in the purgatory for Y
ms, 2) after Y ms elapsed, if the #.acks has been fulfilled within X ms then set
no-error-code and return immediately, 3) after X ms elapsed, set
timeout-error-code and return.
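Both cases collapse into one decision rule: hold the request for at least the throttle time Y, then complete on sufficient acks, and only report a timeout once the replication timeout X has also elapsed. A sketch of those semantics (illustrative only, not Kafka code):

```java
public class ThrottledProduceDecision {
    enum Outcome { KEEP_WAITING, SUCCESS, REQUEST_TIMED_OUT }

    static Outcome decide(long elapsedMs, boolean acksFulfilled,
                          long replicationTimeoutMs, long throttleMs) {
        if (elapsedMs < throttleMs)
            return Outcome.KEEP_WAITING;      // hold for at least the throttle time Y
        if (acksFulfilled)
            return Outcome.SUCCESS;           // past Y: complete once enough acks arrive
        if (elapsedMs < replicationTimeoutMs)
            return Outcome.KEEP_WAITING;      // still inside the replication timeout X
        return Outcome.REQUEST_TIMED_OUT;     // past both X and Y without enough acks
    }

    public static void main(String[] args) {
        // Case 1 (Y > X): even after the throttle expires, the ack count
        // decides between success and a timeout error code.
        check(decide(150, true, 100, 150) == Outcome.SUCCESS);
        check(decide(150, false, 100, 150) == Outcome.REQUEST_TIMED_OUT);
        // Case 2 (Y < X): held for at least Y, may then succeed before X.
        check(decide(30, true, 100, 50) == Outcome.KEEP_WAITING);
        check(decide(60, true, 100, 50) == Outcome.SUCCESS);
        check(decide(100, false, 100, 50) == Outcome.REQUEST_TIMED_OUT);
    }

    static void check(boolean ok) { if (!ok) throw new AssertionError(); }
}
```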

Guozhang

On Tue, Apr 14, 2015 at 5:01 PM, Aditya Auradkar 
aaurad...@linkedin.com.invalid wrote:

 This is an implementation proposal for delaying requests in quotas using
 the current purgatory. I'll discuss the usage for produce and fetch
 requests separately.

 1. Delayed Produce Requests - Here, the proposal is basically to reuse
 DelayedProduce objects and insert them into the purgatory with no watcher
 keys if the request is being throttled. The timeout used in the request
 should be the Max(quota_delay_time, replication_timeout).
 In most cases, the quota timeout should be greater than the existing
 timeout but in order to be safe, we can use the maximum of these values.
 Having no watch keys will allow the operation to be enqueued directly into
 the timer and will not add any overhead in terms of watching keys (which
 was a concern). In this case, having watch keys is not beneficial since the
 operation must be delayed for a fixed amount of time and there is no
 possibility for the operation to complete before the timeout i.e.
 tryComplete() can never return true before the timeout. On timeout, since
 the operation is a TimerTask, the timer will call run() which calls
 onComplete().
 In onComplete, the DelayedProduce can repeat the check in tryComplete()
 (only if acks=-1 whether all replicas fetched upto a certain offset) and
 return the response immediately.

 Code will be structured as follows in ReplicaManager:appendMessages()

 if(isThrottled) {
   // throttled: no watch keys, so only the purgatory's timer completes it
   val produce = new DelayedProduce(timeout)
   purgatory.tryCompleteElseWatch(produce, Seq())
 }
 else if(delayedRequestRequired()) {
  // Insert into purgatory with watched keys for unthrottled requests
 }

 In this proposal, we avoid adding unnecessary watches because there is no
 possibility of early completion and this avoids any potential performance
 penalties we were concerned about earlier.

 2. Delayed Fetch Requests - Similarly, the proposal here is to reuse the
 DelayedFetch objects and insert them into the purgatory with no watcher
 keys if the request is throttled. Timeout used is the Max(quota_delay_time,
 max_wait_timeout). Having no watch keys provides the same benefits as
 described above. Upon timeout, the onComplete() is called and the operation
 proceeds normally i.e. perform a readFromLocalLog and return a response.
 The caveat here is that if the request is throttled but the throttle time
 is less than the max_wait timeout on the fetch request, the request will be
 delayed to a Max(quota_delay_time, max_wait_timeout). This may be more than
 strictly necessary (since we are not watching for
 satisfaction on any keys).

 I added some testcases to DelayedOperationTest to verify that it is
 possible to schedule operations with no watcher keys. By inserting elements
 with no watch keys, the purgatory simply becomes a delay queue. It may also
 make sense to add a new API to the purgatory called
 delayFor() that basically accepts an operation without any watch keys
 (Thanks for the suggestion Joel).
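With no watch keys the purgatory degenerates into a plain delay queue, which can be sketched with java.util.concurrent.DelayQueue (illustrative only; DelayedOp and the delay values are hypothetical stand-ins, not Kafka's DelayedOperation classes):

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayOnlyPurgatory {
    static class DelayedOp implements Delayed {
        final String name;
        final long expiryNanos;

        // Timeout is Max(quota_delay_time, request_timeout), as proposed above.
        DelayedOp(String name, long quotaDelayMs, long requestTimeoutMs) {
            this.name = name;
            long delayMs = Math.max(quotaDelayMs, requestTimeoutMs);
            this.expiryNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMs);
        }

        public long getDelay(TimeUnit unit) {
            return unit.convert(expiryNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        public int compareTo(Delayed o) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                                o.getDelay(TimeUnit.NANOSECONDS));
        }

        void onComplete() { /* re-check acks or read from local log, then respond */ }
    }

    // take() blocks until the head's delay expires; with no watch keys this
    // is the only way an operation completes.
    static String takeName(DelayQueue<DelayedOp> q) {
        try {
            DelayedOp op = q.take();
            op.onComplete();
            return op.name;
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        DelayQueue<DelayedOp> purgatory = new DelayQueue<>();
        purgatory.put(new DelayedOp("produce-1", 80, 20)); // expires after 80 ms
        purgatory.put(new DelayedOp("fetch-1", 10, 40));   // expires after 40 ms
        // Operations come out in expiry order, regardless of insertion order.
        System.out.println(takeName(purgatory)); // fetch-1
        System.out.println(takeName(purgatory)); // produce-1
    }
}
```

Because there is no early completion, the queue's expiry order is the response order, which is exactly the delayFor() behavior suggested above.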

 Thoughts?

 Thanks,
 Aditya

 
 From: Guozhang Wang [wangg...@gmail.com]
 Sent: Monday, April 13, 2015 7:27 PM
 To: dev@kafka.apache.org
 Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

 I think KAFKA-2063 (bounding fetch response) is still under discussion, and
 may not land in time for KAFKA-1927.

 On Thu, Apr 9, 2015 at 4:49 PM, Aditya Auradkar 
 aaurad...@linkedin.com.invalid wrote:

  I think it's reasonable to batch the protocol changes together. In
  addition to the protocol changes, is someone actively driving the server
  side changes/KIP process for KAFKA-2063?
 
  Thanks,
  Aditya
 
  
  From: Jun Rao [j...@confluent.io]
  Sent: Thursday, April 09, 2015 8:59 AM
  To: dev@kafka.apache.org
  Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas
 
  Since we are also thinking about evolving the fetch request protocol in
  KAFKA-2063 (bound fetch response size), perhaps it's worth thinking
 through
  if we can just evolve the protocol once.
 
  Thanks,
 
  Jun
 
  On Wed, Apr 8, 2015 at 10:43 AM, Aditya Auradkar 
  aaurad...@linkedin.com.invalid wrote:
 
   Thanks for the detailed review. I've addressed your comments.
  
   For rejected 

[jira] [Assigned] (KAFKA-2098) Gradle Wrapper Jar gone missing in 0.8.2.1

2015-04-15 Thread Rekha Joshi (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rekha Joshi reassigned KAFKA-2098:
--

Assignee: Rekha Joshi

 Gradle Wrapper Jar gone missing in 0.8.2.1
 --

 Key: KAFKA-2098
 URL: https://issues.apache.org/jira/browse/KAFKA-2098
 Project: Kafka
  Issue Type: Bug
  Components: build
Affects Versions: 0.8.2.1
Reporter: Rekha Joshi
Assignee: Rekha Joshi

 ./gradlew idea
 Error: Could not find or load main class org.gradle.wrapper.GradleWrapperMain
 This was working in 0.8.2.Attaching patch.Thanks





[jira] [Updated] (KAFKA-2121) error handling in KafkaProducer constructor

2015-04-15 Thread Steven Zhen Wu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated KAFKA-2121:
--
Attachment: KAFKA-2121.patch

 error handling in KafkaProducer constructor
 ---

 Key: KAFKA-2121
 URL: https://issues.apache.org/jira/browse/KAFKA-2121
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.8.2.0
Reporter: Steven Zhen Wu
Assignee: Jun Rao
 Attachments: KAFKA-2121.patch, KAFKA-2121.patch.2


 On Mon, Apr 13, 2015 at 7:17 PM, Guozhang Wang wangg...@gmail.com wrote:
 It is a valid problem and we should correct it as soon as possible, I'm
 with Ewen regarding the solution.
 On Mon, Apr 13, 2015 at 5:05 PM, Ewen Cheslack-Postava e...@confluent.io
 wrote:
  Steven,
 
  Looks like there is even more that could potentially be leaked -- since key
  and value serializers are created and configured at the end, even the IO
  thread allocated by the producer could leak. Given that, I think 1 isn't a
  great option since, as you said, it doesn't really address the underlying
  issue.
 
  3 strikes me as bad from a user experience perspective. It's true we might
  want to introduce additional constructors to make testing easier, but the
  more components I need to allocate myself and inject into the producer's
  constructor, the worse the default experience is. And since you would have
  to inject the dependencies to get correct, non-leaking behavior, it will
  always be more code than previously (and a backwards incompatible change).
  Additionally, the code creating a the producer would have be more
  complicated since it would have to deal with the cleanup carefully whereas
  it previously just had to deal with the exception. Besides, for testing
  specifically, you can avoid exposing more constructors just for testing by
  using something like PowerMock that let you mock private methods. That
  requires a bit of code reorganization, but doesn't affect the public
  interface at all.
 
  So my take is that a variant of 2 is probably best. I'd probably do two
  things. First, make close() safe to call even if some fields haven't been
  initialized, which presumably just means checking for null fields. (You
  might also want to figure out if all the methods close() calls are
  idempotent and decide whether some fields should be marked non-final and
  cleared to null when close() is called). Second, add the try/catch as you
  suggested, but just use close().
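A minimal sketch of that two-part fix, under the assumption of invented names (Reporter, LeakFreeClient are illustrative stand-ins, not the actual KafkaProducer internals):

```java
import java.util.ArrayList;
import java.util.List;

public class LeakFreeClient {
    // Stand-in for a resource that owns a thread, e.g. a metrics reporter.
    static class Reporter {
        static final List<Reporter> created = new ArrayList<>(); // tracked for the demo
        volatile boolean closed = false;
        Reporter() { created.add(this); }
        void close() { closed = true; }
    }

    private Reporter reporter; // non-final so close() can observe partial init

    public LeakFreeClient(boolean failAfterReporter) {
        try {
            this.reporter = new Reporter();
            if (failAfterReporter) // e.g. bootstrap-server validation failing
                throw new IllegalArgumentException("invalid bootstrap servers");
            // ... allocate serializers, io thread, etc. ...
        } catch (RuntimeException e) {
            close(); // release whatever was constructed before the failure
            throw e;
        }
    }

    // Part one of the fix: safe to call even when construction only got
    // partway, which just means checking fields for null.
    public void close() {
        if (reporter != null)
            reporter.close();
    }

    public static void main(String[] args) {
        try {
            new LeakFreeClient(true);
        } catch (IllegalArgumentException expected) {
            // The reporter allocated before the failure was still closed.
            System.out.println(Reporter.created.get(0).closed); // prints "true"
        }
    }
}
```

The caller still sees the original exception; the only change is that partially constructed resources are released first.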
 
  -Ewen
 
 
  On Mon, Apr 13, 2015 at 3:53 PM, Steven Wu stevenz...@gmail.com wrote:
 
   Here is the resource leak problem that we have encountered when 0.8.2
  java
   KafkaProducer failed in constructor. here is the code snippet of
   KafkaProducer to illustrate the problem.
  
   ---
   public KafkaProducer(ProducerConfig config, SerializerK keySerializer,
   SerializerV valueSerializer) {
  
    // create metrics reporter via reflection
   ListMetricsReporter reporters =
  
  
  config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
   MetricsReporter.class);
  
   // validate bootstrap servers
   ListInetSocketAddress addresses =
  
  
  ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
  
   }
   ---
  
   let's say MyMetricsReporter creates a thread in constructor. if hostname
   validation threw an exception, constructor won't call the close method of
   MyMetricsReporter to clean up the resource. as a result, we created
  thread
   leak issue. this becomes worse when we try to auto recovery (i.e. keep
   creating KafkaProducer again - failing again - more thread leaks).
  
   there are multiple options of fixing this.
  
    1) just move the hostname validation to the beginning. but this only
    fixes one symptom. it didn't fix the fundamental problem. what if some other
  lines
   throw an exception.
  
   2) use try-catch. in the catch section, try to call close methods for any
   non-null objects constructed so far.
  
   3) explicitly declare the dependency in the constructor. this way, when
   KafkaProducer threw an exception, I can call close method of metrics
   reporters for releasing resources.
   KafkaProducer(..., ListMetricsReporter reporters)
   we don't have to dependency injection framework. but generally hiding
   dependency is a bad coding practice. it is also hard to plug in mocks for
   dependencies. this is probably the most intrusive change.
  
   I am willing to submit a patch. but like to hear your opinions on how we
   should fix the issue.
  
   Thanks,
   Steven
  
 
 
 
  --
  Thanks,
  Ewen
 
 --
 -- Guozhang





Review Request 33242: Patch for KAFKA-2121

2015-04-15 Thread Steven Wu

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33242/
---

Review request for kafka.


Bugs: KAFKA-2121
https://issues.apache.org/jira/browse/KAFKA-2121


Repository: kafka


Description
---

add a unit test file


Diffs
-

  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b91e2c52ed0acb1faa85915097d97bafa28c413a 
  
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java 
PRE-CREATION 

Diff: https://reviews.apache.org/r/33242/diff/


Testing
---


Thanks,

Steven Wu



Re: [DISCUSS] KIP-11- Authorization design for kafka security

2015-04-15 Thread Michael Herstine
Hi Parth,

I’m a little confused: why would Kafka need to interpret the JSON?  IIRC
KIP-11 even says that the TopicConfigData will just store the JSON. I’m
not really making a design recommendation here, just trying to understand
what you’re proposing.

On 4/15/15, 11:20 AM, Parth Brahmbhatt pbrahmbh...@hortonworks.com
wrote:

Hi Michael,

There is code in the Kafka codebase that reads and interprets the topic
config JSON which has acls, owner and logconfig properties. There are 3
use cases that we are supporting with current proposal:

  *   You use out of box simpleAcl authorizer which is tied to the acl
stored in topic config and the format is locked down.
  *   You have a custom authorizer and a custom ACL store.  Ranger/Argus
falls under this as they have their own acl store and ui that users use
to configure acls on the cluster and cluster resources like topic. It is
up to the custom authorizer to leverage the Kafka acl configs or
completely ignore them as they have set a user expectation that only acls
configured via their ui/system will be effective.
  *   You have a custom authorizer but no custom Acl store. You are
completely tied to Acl structure that we have provided in out of box
implementation.

Thanks
Parth

On 4/15/15, 10:31 AM, Michael Herstine
mherst...@linkedin.com.INVALID
wrote:

Hi Parth,

One question that occurred to me at the end of today’s hangout: how tied
are we to a particular ACL representation under your proposal? I know that
TopicConfigCache will just contain JSON— if a particular site decides they
want to represent their ACLs differently, and swap out the authorizer
implementation, will that work?  I guess what I’m asking is whether
there’s any code in the Kafka codebase that will interpret that JSON, or
does that logic live exclusively in the authorizer?

On 4/14/15, 10:56 PM, Don Bosco Durai
bo...@apache.org wrote:

I also feel, having just IP would be more appropriate. Host lookup will
unnecessarily slow things down and would be insecure as you pointed out.

With IP, it will also be possible to set up policies (in future if needed) with
ranges or netmasks and it would be more scalable.

Bosco


On 4/14/15, 1:40 PM, Michael Herstine
mherst...@linkedin.com.INVALID
wrote:

Hi Parth,

Sorry to chime in so late, but I’ve got a minor question on the KIP.

Several methods take a parameter named “host” of type String. Is that
intended to be a hostname, or an IP address? If the former, I’m curious
as
to how that’s found (in my experience, when accepting an incoming socket
connection, you only know the IP address, and there isn’t a way to map
that to a hostname without a round trip to a DNS server, which is
insecure
anyway).


On 3/25/15, 1:07 PM, Parth Brahmbhatt
pbrahmbh...@hortonworks.com
wrote:

Hi all,

I have modified the KIP to reflect the recent change request from the
reviewers. I have been working on the code and I have the server side
code
for authorization ready. I am now modifying the command line utilities.
I
would really appreciate if some of the committers can spend sometime to
review the KIP so we can make progress on this.

Thanks
Parth

On 3/18/15, 2:20 PM, Michael Herstine
mherst...@linkedin.com.INVALID
wrote:

Hi Parth,

Thanks! A few questions:

1. Do you want to permit rules in your ACLs that DENY access as well as
ALLOW? This can be handy setting up rules that have exceptions. E.g.
“Allow principal P to READ resource R from all hosts” with “Deny
principal
P READ access to resource R from host H1” in combination would allow P
to
READ R from all hosts *except* H1.

2. When a topic is newly created, will there be an ACL created for it?
If
not, would that not deny subsequent access to it?

(nit) Maybe use Principal instead of String to represent principals?


On 3/9/15, 11:48 AM, Don Bosco Durai
bo...@apache.org wrote:

Parth

Overall it is looking good. Couple of questions…

- Can you give an example how the policies will look like in the
default
implementation?
- In the operations, can we support "CONNECT" also? This can be used
during Session connection
- Regarding access control for "Topic Creation", since we can't do it on
the server side, can we de-scope it for now? And plan it as a future
feature request?

Thanks

Bosco


On 3/6/15, 8:10 AM, Harsha ka...@harsha.io
wrote:

Hi Parth,
Thanks for putting this together. Overall it looks good
to
me. Although AdminUtils is a concern KIP-4  can probably
fix
that part.
Thanks,
Harsha

On Thu, Mar 5, 2015, at 10:39 AM, Parth Brahmbhatt wrote:
Forgot to add links to wiki and jira.
Link to wiki:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authoriza
t
i
o
n
+
Interface
Link to Jira: https://issues.apache.org/jira/browse/KAFKA-1688
Thanks
Parth
From: Parth Brahmbhatt

Re: Will it be possible to apply quotas based on a security principal?

2015-04-15 Thread Adrian Preston
Jay,

Thanks for the explanation - makes sense to me.  I'd certainly be happy
to work on a patch that implement this option.

Regards
- Adrian


-Jay Kreps jay.kr...@gmail.com wrote: -
To: dev@kafka.apache.org dev@kafka.apache.org
From: Jay Kreps jay.kr...@gmail.com
Date: 04/15/2015 07:42PM
Subject: Re: Will it be possible to apply quotas based on a security principal?

I think this should be a fairly minor follow-up item to have the quotas key
off of user rather than client id. The advantage of starting with client.id
is that it decouples the security work from the quota work in the short
term and provides a mechanism for those using Kafka without authentication
to still enforce quotas.
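That follow-up could be as small as changing how the quota entity key is chosen (a hypothetical sketch; quotaKey and its prefixes are invented for illustration, not part of the KIP-13 design):

```java
public class QuotaKey {
    // Prefer the authenticated principal when one exists; fall back to
    // client.id so unauthenticated clusters can still enforce quotas.
    static String quotaKey(String principal, String clientId) {
        if (principal != null && !principal.isEmpty())
            return "user:" + principal;
        return "client-id:" + clientId;
    }

    public static void main(String[] args) {
        // Authenticated session: quota follows the user identity.
        if (!quotaKey("alice", "app-7").equals("user:alice")) throw new AssertionError();
        // No authentication: quota follows the client identifier.
        if (!quotaKey(null, "app-7").equals("client-id:app-7")) throw new AssertionError();
    }
}
```

Keeping both key forms is what decouples the quota work from the security work in the short term.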

On Wed, Apr 15, 2015 at 6:15 AM, Adrian Preston prest...@uk.ibm.com wrote:

 Hi,

 I've been investigating using Kafka for a multi-user system that applies
 quotas at a per-user level.  Reading through KIP-13 and KAFKA-1682, I
 wondered: are there any plans to link together the security principal and
 client identifier in some way?  Currently it appears these are separate
 concepts - so I can't see any way to apply a quota based on the
 authenticated identity of a user.


 Regards
 - Adrian

 Unless stated otherwise above:
 IBM United Kingdom Limited - Registered in England and Wales with number
 741598.
 Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU




Unless stated otherwise above:
IBM United Kingdom Limited - Registered in England and Wales with number 
741598. 
Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU



[jira] [Commented] (KAFKA-2098) Gradle Wrapper Jar gone missing in 0.8.2.1

2015-04-15 Thread alexcb (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14497159#comment-14497159
 ] 

alexcb commented on KAFKA-2098:
---

The comment about not committing binaries goes directly against the Gradle 
user guide:

from http://gradle.org/docs/current/userguide/gradle_wrapper.html

{quote}
All of these files should be submitted to your version control system. This 
only needs to be done once. After these files have been added to the project, 
the project should then be built with the added gradlew command. The gradlew 
command can be used exactly the same way as the gradle command. 
{quote}

The kafka README states that one must first run gradle, then run ./gradlew 
jar. The gradle command generates the gradlew script. If we don't want to 
add binaries to the source revision (a noble cause), then perhaps one should 
also avoid adding generated files such as the gradlew file.

I personally lost an hour on this issue already.

 Gradle Wrapper Jar gone missing in 0.8.2.1
 --

 Key: KAFKA-2098
 URL: https://issues.apache.org/jira/browse/KAFKA-2098
 Project: Kafka
  Issue Type: Bug
  Components: build
Affects Versions: 0.8.2.1
Reporter: Rekha Joshi

 ./gradlew idea
 Error: Could not find or load main class org.gradle.wrapper.GradleWrapperMain
 This was working in 0.8.2.Attaching patch.Thanks



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


RE: [KIP-DISCUSSION] KIP-13 Quotas

2015-04-15 Thread Aditya Auradkar
Thanks for the review Guozhang.

1. Agreed. 

2. This proposal actually waits for the maximum of the 2 timeouts. This reduces 
implementation complexity at the cost of waiting longer than strictly needed 
for quotas. Note that this is only for the case where acks=-1. 

However we can solve this if it is a significant concern by adding watcher keys 
for all partitions (only if acks=-1). These are the keys we would normally add 
while waiting for acknowledgements. We can change the tryComplete() function to 
return false until 'quota_timeout' time has elapsed AND all the 
acknowledgements have been received.
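
The completion rule described above can be modeled as follows. This is an illustrative sketch with hypothetical class and method names, not the actual purgatory code: the request is held for at most max(quota_delay_time, replication_timeout), and tryComplete() stays false until the quota delay has elapsed and (for acks=-1) all acknowledgements have been received:

```java
// Hypothetical model of the delayed-produce completion rule discussed above.
public class ThrottledProduceModel {
    private final long quotaDelayMs;
    private final long replicationTimeoutMs;

    public ThrottledProduceModel(long quotaDelayMs, long replicationTimeoutMs) {
        this.quotaDelayMs = quotaDelayMs;
        this.replicationTimeoutMs = replicationTimeoutMs;
    }

    // The request sits in the purgatory for at most the larger of the two timeouts.
    public long maxHoldMs() {
        return Math.max(quotaDelayMs, replicationTimeoutMs);
    }

    // tryComplete() must stay false until the quota delay has elapsed, and
    // (for acks=-1) until all acknowledgements have been received.
    public boolean tryComplete(long elapsedMs, boolean acksReceived) {
        return elapsedMs >= quotaDelayMs && acksReceived;
    }
}
```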

Thanks,
Aditya

From: Guozhang Wang [wangg...@gmail.com]
Sent: Wednesday, April 15, 2015 3:42 PM
To: dev@kafka.apache.org
Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

Thanks for the summary. A few comments below:

1. Say a produce request has replication timeout X, and upon finishing the
local append it is determined to be throttled Y ms where Y > X; then after
it has timed out in the purgatory after Y ms we should still check whether
the #.acks requirement has been fulfilled, in order to set the correct
error code in the response.

2. I think it is actually common for the calculated throttle time Y to be
less than the replication timeout X, which will be a tricky case since we
need to make sure that 1) the request is held in the purgatory for at least
Y ms, 2) after Y ms have elapsed, if the #.acks requirement is fulfilled
within X ms, we set no error code and return immediately, and 3) after X ms
have elapsed, we set the timeout error code and return.

Guozhang

On Tue, Apr 14, 2015 at 5:01 PM, Aditya Auradkar 
aaurad...@linkedin.com.invalid wrote:

 This is an implementation proposal for delaying requests in quotas using
 the current purgatory. I'll discuss the usage for produce and fetch
 requests separately.

 1. Delayed Produce Requests - Here, the proposal is basically to reuse
 DelayedProduce objects and insert them into the purgatory with no watcher
 keys if the request is being throttled. The timeout used in the request
 should be the Max(quota_delay_time, replication_timeout).
 In most cases, the quota timeout should be greater than the existing
 timeout but in order to be safe, we can use the maximum of these values.
 Having no watch keys will allow the operation to be enqueued directly into
 the timer and will not add any overhead in terms of watching keys (which
 was a concern). In this case, having watch keys is not beneficial since the
 operation must be delayed for a fixed amount of time and there is no
 possibility for the operation to complete before the timeout i.e.
 tryComplete() can never return true before the timeout. On timeout, since
 the operation is a TimerTask, the timer will call run() which calls
 onComplete().
 In onComplete, the DelayedProduce can repeat the check in tryComplete()
 (only if acks=-1 whether all replicas fetched upto a certain offset) and
 return the response immediately.

 Code will be structured as follows in ReplicaManager:appendMessages()

 if (isThrottled) {
   produce = new DelayedProduce(timeout)
   purgatory.tryCompleteElseWatch(produce, Seq())
 }
 else if (delayedRequestRequired()) {
   // Insert into purgatory with watched keys for unthrottled requests
 }

 In this proposal, we avoid adding unnecessary watches because there is no
 possibility of early completion and this avoids any potential performance
 penalties we were concerned about earlier.

 2. Delayed Fetch Requests - Similarly, the proposal here is to reuse the
 DelayedFetch objects and insert them into the purgatory with no watcher
 keys if the request is throttled. Timeout used is the Max(quota_delay_time,
 max_wait_timeout). Having no watch keys provides the same benefits as
 described above. Upon timeout, the onComplete() is called and the operation
 proceeds normally i.e. perform a readFromLocalLog and return a response.
 The caveat here is that if the request is throttled but the throttle time
 is less than the max_wait timeout on the fetch request, the request will be
 delayed to a Max(quota_delay_time, max_wait_timeout). This may be more than
 strictly necessary (since we are not watching for
 satisfaction on any keys).

 I added some testcases to DelayedOperationTest to verify that it is
 possible to schedule operations with no watcher keys. By inserting elements
 with no watch keys, the purgatory simply becomes a delay queue. It may also
 make sense to add a new API to the purgatory called
 delayFor() that basically accepts an operation without any watch keys
 (Thanks for the suggestion Joel).

 Thoughts?

 Thanks,
 Aditya

 
 From: Guozhang Wang [wangg...@gmail.com]
 Sent: Monday, April 13, 2015 7:27 PM
 To: dev@kafka.apache.org
 Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

 I think KAFKA-2063 (bounding fetch response) is still under discussion, and
 may not be got it in time with KAFKA-1927.

 On Thu, Apr 9, 2015 at 4:49 PM, Aditya Auradkar 
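
The observation in this thread — that with no watcher keys the purgatory degenerates into a pure delay queue — can be modeled with java.util.concurrent.DelayQueue. This is an illustrative sketch, not the actual purgatory code:

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// With no watch keys, an operation can only complete when its timer fires,
// exactly like an element in a DelayQueue.
public class DelayForSketch {
    static class DelayedOp implements Delayed {
        final String name;
        final long deadlineNanos;

        DelayedOp(String name, long delayMs) {
            this.name = name;
            this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMs);
        }

        public long getDelay(TimeUnit unit) {
            return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                                other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Enqueue a single throttled operation and block until its delay elapses.
    public static String runOne(long delayMs) {
        DelayQueue<DelayedOp> queue = new DelayQueue<>();
        queue.put(new DelayedOp("throttled-produce", delayMs));
        try {
            return queue.take().name;  // blocks until the delay has elapsed
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}
```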
 

[jira] [Commented] (KAFKA-2121) error handling in KafkaProducer constructor

2015-04-15 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14497057#comment-14497057
 ] 

Steven Zhen Wu commented on KAFKA-2121:
---

finally, I was able to post to the review board. I had some issues with my 
python installation, but virtualenv made it work. I haven't addressed 
[~ewencp]'s comments yet; will update the reviewboard.

 error handling in KafkaProducer constructor
 ---

 Key: KAFKA-2121
 URL: https://issues.apache.org/jira/browse/KAFKA-2121
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.8.2.0
Reporter: Steven Zhen Wu
Assignee: Jun Rao
 Attachments: KAFKA-2121.patch, KAFKA-2121.patch.2



[jira] [Commented] (KAFKA-2121) error handling in KafkaProducer constructor

2015-04-15 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14497051#comment-14497051
 ] 

Steven Zhen Wu commented on KAFKA-2121:
---

Created reviewboard https://reviews.apache.org/r/33242/diff/
 against branch apache/trunk

 error handling in KafkaProducer constructor
 ---

 Key: KAFKA-2121
 URL: https://issues.apache.org/jira/browse/KAFKA-2121
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.8.2.0
Reporter: Steven Zhen Wu
Assignee: Jun Rao
 Attachments: KAFKA-2121.patch, KAFKA-2121.patch.2


 On Mon, Apr 13, 2015 at 7:17 PM, Guozhang Wang wangg...@gmail.com wrote:
 It is a valid problem and we should correct it as soon as possible, I'm
 with Ewen regarding the solution.
 On Mon, Apr 13, 2015 at 5:05 PM, Ewen Cheslack-Postava e...@confluent.io
 wrote:
  Steven,
 
  Looks like there is even more that could potentially be leaked -- since key
  and value serializers are created and configured at the end, even the IO
  thread allocated by the producer could leak. Given that, I think 1 isn't a
  great option since, as you said, it doesn't really address the underlying
  issue.
 
  3 strikes me as bad from a user experience perspective. It's true we might
  want to introduce additional constructors to make testing easier, but the
  more components I need to allocate myself and inject into the producer's
  constructor, the worse the default experience is. And since you would have
  to inject the dependencies to get correct, non-leaking behavior, it will
  always be more code than previously (and a backwards incompatible change).
  Additionally, the code creating a the producer would have be more
  complicated since it would have to deal with the cleanup carefully whereas
  it previously just had to deal with the exception. Besides, for testing
  specifically, you can avoid exposing more constructors just for testing by
  using something like PowerMock that let you mock private methods. That
  requires a bit of code reorganization, but doesn't affect the public
  interface at all.
 
  So my take is that a variant of 2 is probably best. I'd probably do two
  things. First, make close() safe to call even if some fields haven't been
  initialized, which presumably just means checking for null fields. (You
  might also want to figure out if all the methods close() calls are
  idempotent and decide whether some fields should be marked non-final and
  cleared to null when close() is called). Second, add the try/catch as you
  suggested, but just use close().
 
  -Ewen
 
 
  On Mon, Apr 13, 2015 at 3:53 PM, Steven Wu stevenz...@gmail.com wrote:
 
   Here is the resource leak problem that we have encountered when 0.8.2
  java
   KafkaProducer failed in constructor. here is the code snippet of
   KafkaProducer to illustrate the problem.
  
   ---
    public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer,
                         Serializer<V> valueSerializer) {

        // create metrics reporters via reflection
        List<MetricsReporter> reporters =
            config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                MetricsReporter.class);

        // validate bootstrap servers
        List<InetSocketAddress> addresses =
            ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));

    }
   ---
  
    let's say MyMetricsReporter creates a thread in its constructor. If
    hostname validation throws an exception, the constructor won't call the
    close method of MyMetricsReporter to clean up the resource. As a result,
    we create a thread leak. This becomes worse when we try to auto-recover
    (i.e. keep creating KafkaProducer again -> failing again -> more thread
    leaks).
  
   there are multiple options of fixing this.
  
    1) just move the hostname validation to the beginning. But this only
    fixes one symptom; it doesn't fix the fundamental problem: what if some
    other line throws an exception?
  
   2) use try-catch. in the catch section, try to call close methods for any
   non-null objects constructed so far.
  
   3) explicitly declare the dependency in the constructor. this way, when
   KafkaProducer threw an exception, I can call close method of metrics
   reporters for releasing resources.
    KafkaProducer(..., List<MetricsReporter> reporters)
    we don't need a dependency injection framework. But generally, hiding
    dependencies is a bad coding practice. It is also hard to plug in mocks for
   dependencies. this is probably the most intrusive change.
  
   I am willing to submit a patch. but like to hear your opinions on how we
   should fix the issue.
  
   Thanks,
   Steven
  
 
 
 
  --
  Thanks,
  Ewen
 
 --
 -- 
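
The close-on-failure approach (option 2) discussed in this thread can be sketched as follows. All class names here are stand-ins for illustration, not the real producer code: FakeReporter models a MetricsReporter that owns a resource, and the constructor catches its own failure, runs a null-safe close(), and rethrows:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of option 2: wrap the constructor body in try/catch and close any
// partially-constructed resources before rethrowing.
public class SafeProducerSketch {
    static class FakeReporter {
        boolean closed = false;
        void close() { closed = true; }
    }

    // Visible for the example: every reporter ever constructed.
    static final List<FakeReporter> created = new ArrayList<>();

    private final List<FakeReporter> reporters = new ArrayList<>();

    public SafeProducerSketch(boolean failAfterReporters) {
        try {
            FakeReporter r = new FakeReporter();   // resource allocated first
            created.add(r);
            reporters.add(r);
            if (failAfterReporters) {              // e.g. bootstrap-server validation fails
                throw new IllegalArgumentException("invalid bootstrap servers");
            }
        } catch (RuntimeException e) {
            close();   // close() must tolerate a half-initialized instance
            throw e;
        }
    }

    // Null-safe, idempotent close, as suggested in the thread.
    public void close() {
        for (FakeReporter r : reporters) {
            if (r != null) r.close();
        }
    }
}
```

With this pattern, a failed construction no longer leaks the reporter (or its thread), even under repeated retry.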

Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient

2015-04-15 Thread Jiangjie Qin
Checked the code again. It seems that the disconnected channel is not
detected by the selector as expected.

Currently we are depending on the
o.a.k.common.network.Selector.disconnected set to see if we need to do
something for a disconnected channel.
However Selector.disconnected set is only updated when:
1. A write/read/connect to channel failed.
2. A Key is canceled
However, when a broker goes down before it sends back the response, the
client does not seem able to detect this failure.

I did a simple test below:
1. Run a selector on one machine and an echo server on another machine.
Connect the selector to the echo server.
2. Send a message to the echo server using the selector, then let the
selector poll() every 10 seconds.
3. After the server received the message, unplug the cable on the echo
server machine.
4. Even after waiting for 45 min, the selector still did not detect the
network failure.
Lsof on selector machine shows that the TCP connection is still considered
ESTABLISHED.

I’m not sure what we should expect from the
java.nio.channels.Selector in this case. According to the documentation, the
selector does not verify the status of the associated channel. In my test
case it looks even worse: the OS did not consider the socket disconnected.

Anyway. It seems adding the client side request timeout is necessary. I’ve
updated the KIP page to clarify the problem we want to solve according to
Ewen’s comments.

Thanks.

Jiangjie (Becket) Qin
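
A minimal sketch of the kind of client-side request timeout being proposed: track the send time of each in-flight request and expire any that outlive the timeout, independent of TCP-level connection state. InFlightRequest and the field names here are hypothetical, not the actual NetworkClient types:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Illustrative client-side request timeout: requests are kept in send
// order, so expired ones are always at the head of the deque.
public class RequestTimeoutSketch {
    static class InFlightRequest {
        final long sendTimeMs;
        InFlightRequest(long sendTimeMs) { this.sendTimeMs = sendTimeMs; }
    }

    private final long requestTimeoutMs;
    private final Deque<InFlightRequest> inFlight = new ArrayDeque<>();

    public RequestTimeoutSketch(long requestTimeoutMs) {
        this.requestTimeoutMs = requestTimeoutMs;
    }

    public void send(long nowMs) {
        inFlight.addLast(new InFlightRequest(nowMs));
    }

    // Called from poll(): collect requests that have been outstanding too
    // long, even though the OS still reports the connection as ESTABLISHED.
    public List<InFlightRequest> expired(long nowMs) {
        List<InFlightRequest> result = new ArrayList<>();
        while (!inFlight.isEmpty()
                && nowMs - inFlight.peekFirst().sendTimeMs > requestTimeoutMs) {
            result.add(inFlight.pollFirst());
        }
        return result;
    }
}
```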

On 4/14/15, 3:38 PM, Ewen Cheslack-Postava e...@confluent.io wrote:

On Tue, Apr 14, 2015 at 1:57 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 Hi Ewen, thanks for the comments. Very good points! Please see replies
 inline.


 On 4/13/15, 11:19 PM, Ewen Cheslack-Postava e...@confluent.io wrote:

 Jiangjie,
 
 Great start. I have a couple of comments.
 
 Under the motivation section, is it really true that the request will
 never
 be completed? Presumably if the broker goes down the connection will be
 severed, at worst by a TCP timeout, which should clean up the
connection
 and any outstanding requests, right? I think the real reason we need a
 different timeout is that the default TCP timeouts are ridiculously
long
 in
 this context.
 Yes, when broker is completely down the request should be cleared as you
 said. The case we encountered looks like the broker was just not
 responding but TCP connection was still alive though.


Ok, that makes sense.



 
 My second question is about whether this is the right level to tackle
the
 issue/what user-facing changes need to be made. A related problem came
up
 in https://issues.apache.org/jira/browse/KAFKA-1788 where producer
 records
 get stuck indefinitely because there's no client-side timeout. This KIP
 wouldn't fix that problem or any problems caused by lack of
connectivity
 since this would only apply to in flight requests, which by definition
 must
 have been sent on an active connection.
 
 I suspect both types of problems probably need to be addressed
separately
 by introducing explicit timeouts. However, because the settings
introduced
 here are very much about the internal implementations of the clients,
I'm
 wondering if this even needs to be a user-facing setting, especially
if we
 have to add other timeouts anyway. For example, would a fixed, generous
 value that's still much shorter than a TCP timeout, say 15s, be good
 enough? If other timeouts would allow, for example, the clients to
 properly
 exit even if requests have not hit their timeout, then what's the
benefit
 of being able to configure the request-level timeout?
 That is a very good point. We have three places that we might be able to
 enforce timeout for a message send:
 1. Before append to accumulator - handled by metadata timeout on per
 message level.
 2. Batch of messages inside accumulator - no timeout mechanism now.
 3. Request of batches after messages leave the accumulator - we have a
 broker side timeout but no client side timeout for now.
 My current proposal only address (3) but not (2).
 Honestly I do not have a very clear idea about what should we do with
(2)
 right now. But I am with you that we should not expose too many
 configurations to users. What I am thinking now to handle (2) is when
user
 call send, if we know that a partition is offline, we should throw
 exception immediately instead of putting it into accumulator. This would
 protect further memory consumption. We might also want to fail all the
 batches in the dequeue once we found a partition is offline.  That
said, I
 feel timeout might not be quite applicable to (2).
 Do you have any suggestion on this?


Right, I didn't actually mean to solve 2 here, but was trying to figure
out
if a solution to 2 would reduce what we needed to do to address 3. (And
depending on how they are implemented, fixing 1 might also address 2). It
sounds like you hit hang that I wasn't really expecting. This probably
just
means the KIP motivation needs to be a bit clearer about what type of
situation this 

[jira] [Updated] (KAFKA-2102) Remove unnecessary synchronization when managing metadata

2015-04-15 Thread Tim Brooks (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tim Brooks updated KAFKA-2102:
--
Attachment: KAFKA-2102_2015-04-15_19:55:45.patch

 Remove unnecessary synchronization when managing metadata
 -

 Key: KAFKA-2102
 URL: https://issues.apache.org/jira/browse/KAFKA-2102
 Project: Kafka
  Issue Type: Improvement
Reporter: Tim Brooks
Assignee: Tim Brooks
 Attachments: KAFKA-2102.patch, KAFKA-2102_2015-04-08_00:20:33.patch, 
 KAFKA-2102_2015-04-15_19:55:45.patch, eight-threads-patch.txt, 
 eight-threads-trunk.txt, five-threads-patch.txt, five-threads-trunk.txt


 Usage of the org.apache.kafka.clients.Metadata class is synchronized. It 
 seems like the current functionality could be maintained without 
 synchronizing the whole class.
 I have been working on improving this by moving to finer-grained locks and 
 using atomic operations. My initial benchmarking of the producer (using 
 HDRHistogram) suggests that this will improve latency when submitting messages.
 I have produced an initial patch. I do not necessarily believe it is 
 complete, and I definitely want to produce some more benchmarks. However, I 
 wanted to get early feedback because this change could be deceptively tricky.
 I am interested in knowing if this is:
 1. Something that is of interest to the maintainers/community.
 2. Along the right track
 3. If there are any gotchas that make my current approach naive.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
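
A small illustration of the finer-grained approach described in the ticket: instead of synchronizing every accessor, readers take an immutable snapshot through an AtomicReference and writers publish a new one. Class names are stand-ins, not the real Metadata/Cluster code:

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Readers see a consistent snapshot without acquiring a lock; the writer
// publishes a fresh immutable snapshot on each metadata update.
public class MetadataSketch {
    static class ClusterSnapshot {
        final long version;
        ClusterSnapshot(long version) { this.version = version; }
    }

    private final AtomicLong version = new AtomicLong(0);
    private final AtomicReference<ClusterSnapshot> current =
            new AtomicReference<>(new ClusterSnapshot(0));

    // Reader path: a single volatile read, no synchronization.
    public ClusterSnapshot fetch() {
        return current.get();
    }

    // Writer path (metadata update): bump the version and publish.
    public long update() {
        long v = version.incrementAndGet();
        current.set(new ClusterSnapshot(v));
        return v;
    }
}
```

The trade-off, as the ticket notes, is that this is deceptively tricky: readers may briefly see the previous snapshot, which is acceptable for metadata but must be reasoned about carefully.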


[jira] [Updated] (KAFKA-2102) Remove unnecessary synchronization when managing metadata

2015-04-15 Thread Tim Brooks (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tim Brooks updated KAFKA-2102:
--
Status: Patch Available  (was: In Progress)

 Remove unnecessary synchronization when managing metadata
 -

 Key: KAFKA-2102
 URL: https://issues.apache.org/jira/browse/KAFKA-2102
 Project: Kafka
  Issue Type: Improvement
Reporter: Tim Brooks
Assignee: Tim Brooks
 Attachments: KAFKA-2102.patch, KAFKA-2102_2015-04-08_00:20:33.patch, 
 KAFKA-2102_2015-04-15_19:55:45.patch, eight-threads-patch.txt, 
 eight-threads-trunk.txt, five-threads-patch.txt, five-threads-trunk.txt





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2102) Remove unnecessary synchronization when managing metadata

2015-04-15 Thread Tim Brooks (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2102?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14497396#comment-14497396
 ] 

Tim Brooks commented on KAFKA-2102:
---

Updated reviewboard https://reviews.apache.org/r/32937/diff/
 against branch origin/trunk

 Remove unnecessary synchronization when managing metadata
 -

 Key: KAFKA-2102
 URL: https://issues.apache.org/jira/browse/KAFKA-2102
 Project: Kafka
  Issue Type: Improvement
Reporter: Tim Brooks
Assignee: Tim Brooks
 Attachments: KAFKA-2102.patch, KAFKA-2102_2015-04-08_00:20:33.patch, 
 KAFKA-2102_2015-04-15_19:55:45.patch, eight-threads-patch.txt, 
 eight-threads-trunk.txt, five-threads-patch.txt, five-threads-trunk.txt





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 32937: Patch for KAFKA-2102

2015-04-15 Thread Tim Brooks

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32937/
---

(Updated April 16, 2015, 12:56 a.m.)


Review request for kafka.


Bugs: KAFKA-2102
https://issues.apache.org/jira/browse/KAFKA-2102


Repository: kafka


Description (updated)
---

Method does not need to be synchronized


Do not synchronize contains topic method


Continue removing the need to synchronize the metadata object


Store both last refresh and need to refresh in same variable


Fix synchronize issue


Version needs to be volatile


rework how signally happens


remove unnecessary creation of new set


initialize 0 at the field level


Fix the build


Start moving synchronization of metadata to different class


Start moving synchronization work to new class


Remove unused code


Functionality works. Not threadsafe


move version into metadata synchronizer


Make version volatile


Rename classes


move to finergrained locking


Use locks in bookkeeper


Only use atomic variabled


use successful metadata in metrics


Change these things back to trunk


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/Metadata.java 
07f1cdb1fe920b0c7a5f2d101ddc40c689e1b247 
  clients/src/main/java/org/apache/kafka/clients/MetadataBookkeeper.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
b7ae595f2cc46e5dfe728bc3ce6082e9cd0b6d36 
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b91e2c52ed0acb1faa85915097d97bafa28c413a 

Diff: https://reviews.apache.org/r/32937/diff/


Testing
---


Thanks,

Tim Brooks



[jira] [Updated] (KAFKA-2127) Running TopicCommand --alter causes connection close/reset errors in kafka logs

2015-04-15 Thread Jason Rosenberg (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Rosenberg updated KAFKA-2127:
---
Description: 
I am using 0.8.2.1.  I've been noticing that any time I use TopicCommand to 
alter a topic (e.g. add partitions) or delete a topic, the broker logs show a 
bunch of closed connections, and usually 2 or 3 Connection reset exceptions.  
It logs these with ERROR status.

I recently used the kafka.admin.TopicCommand tool to increase the partitions 
for a topic from 1 to 4.  So I ran:

{code}
 java -cp kafka.jar kafka.admin.TopicCommand --zookeeper myzkserver:12345 
--topic mytopic --alter --partitions 4
{code}

This resulted in the following sequence in the broker log (repeated pretty much 
in the logs of each broker):

{code}
2015-04-16 03:51:26,156  INFO [kafka-network-thread-27330-1] network.Processor 
- Closing socket connection to /1.2.3.12.
2015-04-16 03:51:26,169  INFO [kafka-network-thread-27330-0] network.Processor 
- Closing socket connection to /1.2.3.89.
2015-04-16 03:51:26,169  INFO [kafka-network-thread-27330-0] network.Processor 
- Closing socket connection to /1.2.3.95.
2015-04-16 03:51:26,176 ERROR [kafka-network-thread-27330-2] network.Processor 
- Closing socket for /1.2.4.34 because of error
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
at kafka.utils.Utils$.read(Utils.scala:380)
at 
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Processor.read(SocketServer.scala:444)
at kafka.network.Processor.run(SocketServer.scala:340)
at java.lang.Thread.run(Thread.java:745)
2015-04-16 03:51:26,178 ERROR [kafka-network-thread-27330-1] network.Processor 
- Closing socket for /1.2.4.59 because of error
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
at kafka.utils.Utils$.read(Utils.scala:380)
at 
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Processor.read(SocketServer.scala:444)
at kafka.network.Processor.run(SocketServer.scala:340)
at java.lang.Thread.run(Thread.java:745)
2015-04-16 03:51:26,192 ERROR [kafka-network-thread-27330-1] network.Processor 
- Closing socket for /1.2.3.11 because of error
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
at kafka.utils.Utils$.read(Utils.scala:380)
at 
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Processor.read(SocketServer.scala:444)
at kafka.network.Processor.run(SocketServer.scala:340)
at java.lang.Thread.run(Thread.java:745)
2015-04-16 03:51:26,451  INFO [kafka-request-handler-3] 
server.ReplicaFetcherManager - [ReplicaFetcherManager on broker 45] Removed 
fetcher for partitions [mytopic,2]
2015-04-16 03:51:26,453  INFO [kafka-request-handler-3] log.Log - Completed 
load of log mytopic-2 with log end offset 0
2015-04-16 03:51:26,454  INFO [kafka-request-handler-3] log.LogManager - 
Created log for partition [mytopic,2] in /data/kafka_logs with properties 
{segment.index.bytes -> 10485760, file.delete.delay.ms -> 6, segment.bytes 
-> 1073741824, flush.ms -> 9223372036854775807, delete.retention.ms -> 
8640, index.interval.bytes -> 4096, retention.bytes -> 500, 
min.insync.replicas -> 1, cleanup.policy -> delete, 
unclean.leader.election.enable -> true, segment.ms -> 60480, 
max.message.bytes -> 112, flush.messages -> 9223372036854775807, 
min.cleanable.dirty.ratio -> 0.5, retention.ms -> 8640, segment.jitter.ms 
-> 0}.
2015-04-16 03:51:26,454  WARN [kafka-request-handler-3] cluster.Partition - 
Partition [mytopic,2] on broker 45: No checkpointed highwatermark is found for 
partition [mytopic,2]
2015-04-16 03:51:26,558  INFO [kafka-network-thread-27330-0] network.Processor 
- Closing socket connection to /1.2.4.34.
2015-04-16 03:51:26,658  INFO [kafka-network-thread-27330-0] network.Processor 
- Closing socket 

[jira] [Comment Edited] (KAFKA-2127) Running TopicCommand --alter causes connection close/reset errors in kafka logs

2015-04-15 Thread Jason Rosenberg (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14497570#comment-14497570
 ] 

Jason Rosenberg edited comment on KAFKA-2127 at 4/16/15 5:19 AM:
-

the ip's that show in the 'Closing socket' and 'Connection reset' log lines 
appear to be from hosts which have kafka consumers running, so maybe that's the 
explanation.  Consumers consuming the topic had their connections closed by the 
broker (except in some cases the consumer app gets a rebalance event first, 
which causes the connection to be closed remotely first)?

I'm wondering if we can get rid of the ERROR logging for the connection resets 
however.  Since it is essentially expected behavior when adding/deleting 
partitions, that connections will be closed, it seems we don't need to log the 
stack trace for the connection reset and log it as ERROR.


was (Author: jbrosenb...@gmail.com):
the ip's that show in the 'Closing socket' and 'Connection reset' log lines 
appear to be random hosts in our deployment, hosts which produce messages to 
kafka, but are otherwise not part of the kafka cluster or zookeeper, etc.

 Running TopicCommand --alter causes connection close/reset errors in kafka 
 logs
 ---

 Key: KAFKA-2127
 URL: https://issues.apache.org/jira/browse/KAFKA-2127
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Rosenberg
Priority: Minor

 I am using 0.8.2.1.  I've been noticing that any time I use TopicCommand to 
 alter a topic (e.g. add partitions) or delete a topic, the broker logs show a 
 bunch of closed connections, and usually 2 or 3 Connection reset exceptions.  
 It logs these with ERROR status.
 I recently used the kafka.admin.TopicCommand tool to increase the partitions 
 for a topic from 1 to 4.  So I ran:
 {code}
  java -cp kafka.jar kafka.admin.TopicCommand --zookeeper myzkserver:12345 
 --topic mytopic --alter --partitions 4
 {code}
 This resulted in the following sequence in the broker log (repeated pretty 
 much in the logs of each broker):
 {code}
 2015-04-16 03:51:26,156  INFO [kafka-network-thread-27330-1] 
 network.Processor - Closing socket connection to /1.2.3.12.
 2015-04-16 03:51:26,169  INFO [kafka-network-thread-27330-0] 
 network.Processor - Closing socket connection to /1.2.3.89.
 2015-04-16 03:51:26,169  INFO [kafka-network-thread-27330-0] 
 network.Processor - Closing socket connection to /1.2.3.95.
 2015-04-16 03:51:26,176 ERROR [kafka-network-thread-27330-2] 
 network.Processor - Closing socket for /1.2.4.34 because of error
 java.io.IOException: Connection reset by peer
 at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
 at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
 at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
 at sun.nio.ch.IOUtil.read(IOUtil.java:197)
 at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
 at kafka.utils.Utils$.read(Utils.scala:380)
 at 
 kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
 at kafka.network.Processor.read(SocketServer.scala:444)
 at kafka.network.Processor.run(SocketServer.scala:340)
 at java.lang.Thread.run(Thread.java:745)
 2015-04-16 03:51:26,178 ERROR [kafka-network-thread-27330-1] 
 network.Processor - Closing socket for /1.2.4.59 because of error
 java.io.IOException: Connection reset by peer
 at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
 at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
 at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
 at sun.nio.ch.IOUtil.read(IOUtil.java:197)
 at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
 at kafka.utils.Utils$.read(Utils.scala:380)
 at 
 kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
 at kafka.network.Processor.read(SocketServer.scala:444)
 at kafka.network.Processor.run(SocketServer.scala:340)
 at java.lang.Thread.run(Thread.java:745)
 2015-04-16 03:51:26,192 ERROR [kafka-network-thread-27330-1] 
 network.Processor - Closing socket for /1.2.3.11 because of error
 java.io.IOException: Connection reset by peer
 at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
 at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
 at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
 at sun.nio.ch.IOUtil.read(IOUtil.java:197)
 at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
 at kafka.utils.Utils$.read(Utils.scala:380)
 at 
 kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
 at 

[jira] [Created] (KAFKA-2127) Running TopicCommand --alter causes connection close/reset errors in kafka logs

2015-04-15 Thread Jason Rosenberg (JIRA)
Jason Rosenberg created KAFKA-2127:
--

 Summary: Running TopicCommand --alter causes connection 
close/reset errors in kafka logs
 Key: KAFKA-2127
 URL: https://issues.apache.org/jira/browse/KAFKA-2127
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Rosenberg
Priority: Minor


I am using 0.8.2.1.  I've been noticing that any time I use TopicCommand to 
alter a topic (e.g. add partitions) or delete a topic, the broker logs show a 
bunch of closed connections, and usually 2 or 3 Connection reset exceptions.  
It logs these with ERROR status.

I recently used the kafka.admin.TopicCommand tool to increase the partitions 
for a topic from 1 to 4.  So I ran:

{code}
 java -cp kafka.jar kafka.admin.TopicCommand --zookeeper myzkserver:12345 
--topic mytopic --alter --partitions 4
{code}

This resulted in the following sequence in the broker log (repeated pretty much 
in the logs of each broker):

{code}
2015-04-16 03:51:26,156  INFO [kafka-network-thread-27330-1] network.Processor 
- Closing socket connection to /1.2.3.12.
2015-04-16 03:51:26,169  INFO [kafka-network-thread-27330-0] network.Processor 
- Closing socket connection to /1.2.3.89.
2015-04-16 03:51:26,169  INFO [kafka-network-thread-27330-0] network.Processor 
- Closing socket connection to /1.2.3.95.
2015-04-16 03:51:26,176 ERROR [kafka-network-thread-27330-2] network.Processor 
- Closing socket for /1.2.4.34 because of error
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
at kafka.utils.Utils$.read(Utils.scala:380)
at 
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Processor.read(SocketServer.scala:444)
at kafka.network.Processor.run(SocketServer.scala:340)
at java.lang.Thread.run(Thread.java:745)
2015-04-16 03:51:26,178 ERROR [kafka-network-thread-27330-1] network.Processor 
- Closing socket for /1.2.4.59 because of error
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
at kafka.utils.Utils$.read(Utils.scala:380)
at 
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Processor.read(SocketServer.scala:444)
at kafka.network.Processor.run(SocketServer.scala:340)
at java.lang.Thread.run(Thread.java:745)
2015-04-16 03:51:26,192 ERROR [kafka-network-thread-27330-1] network.Processor 
- Closing socket for /1.2.3.11 because of error
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
at kafka.utils.Utils$.read(Utils.scala:380)
at 
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Processor.read(SocketServer.scala:444)
at kafka.network.Processor.run(SocketServer.scala:340)
at java.lang.Thread.run(Thread.java:745)
2015-04-16 03:51:26,451  INFO [kafka-request-handler-3] 
server.ReplicaFetcherManager - [ReplicaFetcherManager on broker 45] Removed 
fetcher for partitions [mytopic,2]
2015-04-16 03:51:26,453  INFO [kafka-request-handler-3] log.Log - Completed 
load of log mytopic-2 with log end offset 0
2015-04-16 03:51:26,454  INFO [kafka-request-handler-3] log.LogManager - 
Created log for partition [mytopic,2] in 
/data_sdd/app/samsa-kafkaserver-ng/kafka_logs_ng with properties 
{segment.index.bytes -> 10485760, file.delete.delay.ms -> 6, segment.bytes 
-> 1073741824, flush.ms -> 9223372036854775807, delete.retention.ms -> 
8640, index.interval.bytes -> 4096, retention.bytes -> 500, 
min.insync.replicas -> 1, cleanup.policy -> delete, 
unclean.leader.election.enable -> true, segment.ms -> 60480, 
max.message.bytes -> 112, flush.messages -> 9223372036854775807, 
min.cleanable.dirty.ratio -> 0.5, retention.ms -> 8640, segment.jitter.ms 
-> 0}.
2015-04-16 03:51:26,454  WARN [kafka-request-handler-3] cluster.Partition - 
Partition [mytopic,2] on broker 45: No checkpointed highwatermark is found for 
partition [mytopic,2]
2015-04-16 03:51:26,558  INFO [kafka-network-thread-27330-0] network.Processor 
- Closing socket connection to 

[jira] [Commented] (KAFKA-2127) Running TopicCommand --alter causes connection close/reset errors in kafka logs

2015-04-15 Thread Jason Rosenberg (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14497576#comment-14497576
 ] 

Jason Rosenberg commented on KAFKA-2127:


Also, unrelated to the socket errors, there's this line:
{code}
2015-04-16 03:51:26,451  INFO [kafka-request-handler-3] 
server.ReplicaFetcherManager - [ReplicaFetcherManager on broker 45] Removed 
fetcher for partitions [mytopic,2]
{code}

Since the new partition 'mytopic,2' was just created, I'm not sure why we have 
this log line (as the fetcher should not even have been created yet for that 
partition).

 Running TopicCommand --alter causes connection close/reset errors in kafka 
 logs
 ---

 Key: KAFKA-2127
 URL: https://issues.apache.org/jira/browse/KAFKA-2127
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Rosenberg
Priority: Minor

 I am using 0.8.2.1.  I've been noticing that any time I use TopicCommand to 
 alter a topic (e.g. add partitions) or delete a topic, the broker logs show a 
 bunch of closed connections, and usually 2 or 3 Connection reset exceptions.  
 It logs these with ERROR status.
 I recently used the kafka.admin.TopicCommand tool to increase the partitions 
 for a topic from 1 to 4.  So I ran:
 {code}
  java -cp kafka.jar kafka.admin.TopicCommand --zookeeper myzkserver:12345 
 --topic mytopic --alter --partitions 4
 {code}
 This resulted in the following sequence in the broker log (repeated pretty 
 much in the logs of each broker):
 {code}
 2015-04-16 03:51:26,156  INFO [kafka-network-thread-27330-1] 
 network.Processor - Closing socket connection to /1.2.3.12.
 2015-04-16 03:51:26,169  INFO [kafka-network-thread-27330-0] 
 network.Processor - Closing socket connection to /1.2.3.89.
 2015-04-16 03:51:26,169  INFO [kafka-network-thread-27330-0] 
 network.Processor - Closing socket connection to /1.2.3.95.
 2015-04-16 03:51:26,176 ERROR [kafka-network-thread-27330-2] 
 network.Processor - Closing socket for /1.2.4.34 because of error
 java.io.IOException: Connection reset by peer
 at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
 at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
 at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
 at sun.nio.ch.IOUtil.read(IOUtil.java:197)
 at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
 at kafka.utils.Utils$.read(Utils.scala:380)
 at 
 kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
 at kafka.network.Processor.read(SocketServer.scala:444)
 at kafka.network.Processor.run(SocketServer.scala:340)
 at java.lang.Thread.run(Thread.java:745)
 2015-04-16 03:51:26,178 ERROR [kafka-network-thread-27330-1] 
 network.Processor - Closing socket for /1.2.4.59 because of error
 java.io.IOException: Connection reset by peer
 at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
 at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
 at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
 at sun.nio.ch.IOUtil.read(IOUtil.java:197)
 at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
 at kafka.utils.Utils$.read(Utils.scala:380)
 at 
 kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
 at kafka.network.Processor.read(SocketServer.scala:444)
 at kafka.network.Processor.run(SocketServer.scala:340)
 at java.lang.Thread.run(Thread.java:745)
 2015-04-16 03:51:26,192 ERROR [kafka-network-thread-27330-1] 
 network.Processor - Closing socket for /1.2.3.11 because of error
 java.io.IOException: Connection reset by peer
 at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
 at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
 at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
 at sun.nio.ch.IOUtil.read(IOUtil.java:197)
 at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
 at kafka.utils.Utils$.read(Utils.scala:380)
 at 
 kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
 at kafka.network.Processor.read(SocketServer.scala:444)
 at kafka.network.Processor.run(SocketServer.scala:340)
 at java.lang.Thread.run(Thread.java:745)
 2015-04-16 03:51:26,451  INFO [kafka-request-handler-3] 
 server.ReplicaFetcherManager - [ReplicaFetcherManager on broker 45] Removed 
 fetcher for partitions [mytopic,2]
 2015-04-16 03:51:26,453  INFO [kafka-request-handler-3] log.Log - Completed 
 load of log mytopic-2 with log end offset 0
 2015-04-16 03:51:26,454  INFO [kafka-request-handler-3] log.LogManager - 
 Created log for partition [mytopic,2] in 

[jira] [Commented] (KAFKA-2098) Gradle Wrapper Jar gone missing in 0.8.2.1

2015-04-15 Thread Rekha Joshi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14497350#comment-14497350
 ] 

Rekha Joshi commented on KAFKA-2098:


Hi. I agree [~alexcb], people all over the world are losing time on it :), though 
I also understand where [~gwenshap] is coming from.

From a user's point of view it is better to have it in, especially as many 
standard open-source projects based on Gradle include it; one that comes to mind 
immediately is Samza - 
https://github.com/apache/samza/tree/master/gradle/wrapper

Anyhow I will let the Kafka committers and [~jkreps] decide what works best.

Thanks
Rekha

 Gradle Wrapper Jar gone missing in 0.8.2.1
 --

 Key: KAFKA-2098
 URL: https://issues.apache.org/jira/browse/KAFKA-2098
 Project: Kafka
  Issue Type: Bug
  Components: build
Affects Versions: 0.8.2.1
Reporter: Rekha Joshi
Assignee: Rekha Joshi

 ./gradlew idea
 Error: Could not find or load main class org.gradle.wrapper.GradleWrapperMain
 This was working in 0.8.2.Attaching patch.Thanks



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2102) Remove unnecessary synchronization when managing metadata

2015-04-15 Thread Tim Brooks (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2102?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14497392#comment-14497392
 ] 

Tim Brooks commented on KAFKA-2102:
---

I added an updated patch. This patch includes a few things:

1. I moved to a finer-grained locking strategy, as opposed to attempting to use 
all atomic instructions. None of the methods are synchronized.
2. I delegated the synchronization code, and the data about when the last update 
happened, etc., to a new MetadataBookkeeper. When I was first reading the old 
code I had some trouble parsing the mixture of cluster state, topic state, state 
about when to do the next update, and state about when the last update had been 
completed. Maybe my changes make this easier to parse. Maybe they don't.
3. I moved lastNoNodeAvailableMs from the NetworkClient state into the 
MetadataBookkeeper. Since this variable essentially tracked a failed attempt to 
update metadata, and it was not accessed in any different way for distinct 
metrics, it seemed nicer to keep all state about when the next metadata update 
should happen together.
4. No one has responded to KAFKA-2101, but it is highly relevant to what I was 
working on, so it is affected by this patch. I created a distinction between a 
successful metadata update and a metadata update attempt. The metadata-age 
metric only uses the last successful update in its reports. This seemed like 
the correct approach based on the name of that metric, since a failed update 
does not make the metadata any younger.

The performance improvements are primarily in the 90+ percentile. I ran a 
producer test with both five and eight threads pushing 10,000 messages to 
Kafka, and I repeated it ten times. I recorded the times with HDRHistogram.

The improvements were somewhere between 4-30% reduced latency in the 90+%. For 
example, at the 0.99062500 percentile on the five-thread test the latency 
was reduced from 14.223 microseconds to 9.775 (31%). At the 0.9000 
percentile the latency was reduced from 2.947 to 2.837 (3.9%). So certainly not 
a lot, but pretty consistently across the higher percentiles, the latency is 
improved.

In the five-thread test the mean decreased 4.8%. In the eight-thread test the 
mean decreased 7.8%.

The code for the latency test can be found here:

https://github.com/tbrooks8/kafka-latency-test
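The methodology above (record per-operation latency, compare percentiles across 
runs) can be sketched in a self-contained way. The actual test uses 
HDRHistogram; for illustration, a sorted array of samples gives the same 
percentile readout for a small, fixed sample count. The sendOnce workload here 
is a placeholder, not the real producer send call.

```java
import java.util.Arrays;

public class LatencySketch {
    static long sendOnce() {
        // Placeholder for the operation under test (e.g. producer.send()).
        long t0 = System.nanoTime();
        Math.sqrt(42.0); // stand-in work
        return System.nanoTime() - t0;
    }

    // Nearest-rank percentile over a pre-sorted array of samples.
    static long percentile(long[] sorted, double p) {
        int idx = (int) Math.ceil(p * sorted.length) - 1;
        return sorted[Math.max(0, Math.min(idx, sorted.length - 1))];
    }

    public static void main(String[] args) {
        int n = 10_000;
        long[] samples = new long[n];
        for (int i = 0; i < n; i++) samples[i] = sendOnce();
        Arrays.sort(samples);
        System.out.println("p50 = " + percentile(samples, 0.50) + " ns");
        System.out.println("p99 = " + percentile(samples, 0.99) + " ns");
    }
}
```

Comparing the p99/p99.9 values of two such runs (trunk vs. patch) is the shape 
of the comparison reported in the numbers above.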

 Remove unnecessary synchronization when managing metadata
 -

 Key: KAFKA-2102
 URL: https://issues.apache.org/jira/browse/KAFKA-2102
 Project: Kafka
  Issue Type: Improvement
Reporter: Tim Brooks
Assignee: Tim Brooks
 Attachments: KAFKA-2102.patch, KAFKA-2102_2015-04-08_00:20:33.patch


 Usage of the org.apache.kafka.clients.Metadata class is synchronized. It 
 seems like the current functionality could be maintained without 
 synchronizing the whole class.
 I have been working on improving this by moving to finer grained locks and 
 using atomic operations. My initial benchmarking of the producer is that this 
 will improve latency (using HDRHistogram) on submitting messages.
 I have produced an initial patch. I do not necessarily believe this is 
 complete. And I want to definitely produce some more benchmarks. However, I 
 wanted to get early feedback because this change could be deceptively tricky.
 I am interested in knowing if this is:
 1. Something that is of interest to the maintainers/community.
 2. Along the right track
 3. If there are any gotchas that make my current approach naive.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2102) Remove unnecessary synchronization when managing metadata

2015-04-15 Thread Tim Brooks (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tim Brooks updated KAFKA-2102:
--
Attachment: five-threads-trunk.txt
five-threads-patch.txt
eight-threads-trunk.txt
eight-threads-patch.txt

HDRHistogram results

 Remove unnecessary synchronization when managing metadata
 -

 Key: KAFKA-2102
 URL: https://issues.apache.org/jira/browse/KAFKA-2102
 Project: Kafka
  Issue Type: Improvement
Reporter: Tim Brooks
Assignee: Tim Brooks
 Attachments: KAFKA-2102.patch, KAFKA-2102_2015-04-08_00:20:33.patch, 
 eight-threads-patch.txt, eight-threads-trunk.txt, five-threads-patch.txt, 
 five-threads-trunk.txt


 Usage of the org.apache.kafka.clients.Metadata class is synchronized. It 
 seems like the current functionality could be maintained without 
 synchronizing the whole class.
 I have been working on improving this by moving to finer grained locks and 
 using atomic operations. My initial benchmarking of the producer is that this 
 will improve latency (using HDRHistogram) on submitting messages.
 I have produced an initial patch. I do not necessarily believe this is 
 complete. And I want to definitely produce some more benchmarks. However, I 
 wanted to get early feedback because this change could be deceptively tricky.
 I am interested in knowing if this is:
 1. Something that is of interest to the maintainers/community.
 2. Along the right track
 3. If there are any gotchas that make my current approach naive.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2127) Running TopicCommand --alter causes connection close/reset errors in kafka logs

2015-04-15 Thread Jason Rosenberg (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Rosenberg updated KAFKA-2127:
---
Description: 
I am using 0.8.2.1.  I've been noticing that any time I use TopicCommand to 
alter a topic (e.g. add partitions) or delete a topic, the broker logs show a 
bunch of closed connections, and usually 2 or 3 Connection reset exceptions.  
It logs these with ERROR status.

I recently used the kafka.admin.TopicCommand tool to increase the partitions 
for a topic from 1 to 4.  So I ran:

{code}
 java -cp kafka.jar kafka.admin.TopicCommand --zookeeper myzkserver:12345 
--topic mytopic --alter --partitions 4
{code}

This resulted in the following sequence in the broker log (repeated pretty much 
in the logs of each broker):

{code}
2015-04-16 03:51:26,156  INFO [kafka-network-thread-27330-1] network.Processor 
- Closing socket connection to /1.2.3.12.
2015-04-16 03:51:26,169  INFO [kafka-network-thread-27330-0] network.Processor 
- Closing socket connection to /1.2.3.89.
2015-04-16 03:51:26,169  INFO [kafka-network-thread-27330-0] network.Processor 
- Closing socket connection to /1.2.3.95.
2015-04-16 03:51:26,176 ERROR [kafka-network-thread-27330-2] network.Processor 
- Closing socket for /1.2.4.34 because of error
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
at kafka.utils.Utils$.read(Utils.scala:380)
at 
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Processor.read(SocketServer.scala:444)
at kafka.network.Processor.run(SocketServer.scala:340)
at java.lang.Thread.run(Thread.java:745)
2015-04-16 03:51:26,178 ERROR [kafka-network-thread-27330-1] network.Processor 
- Closing socket for /1.2.4.59 because of error
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
at kafka.utils.Utils$.read(Utils.scala:380)
at 
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Processor.read(SocketServer.scala:444)
at kafka.network.Processor.run(SocketServer.scala:340)
at java.lang.Thread.run(Thread.java:745)
2015-04-16 03:51:26,192 ERROR [kafka-network-thread-27330-1] network.Processor 
- Closing socket for /1.2.3.11 because of error
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
at kafka.utils.Utils$.read(Utils.scala:380)
at 
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Processor.read(SocketServer.scala:444)
at kafka.network.Processor.run(SocketServer.scala:340)
at java.lang.Thread.run(Thread.java:745)
2015-04-16 03:51:26,451  INFO [kafka-request-handler-3] 
server.ReplicaFetcherManager - [ReplicaFetcherManager on broker 45] Removed 
fetcher for partitions [mytopic,2]
2015-04-16 03:51:26,453  INFO [kafka-request-handler-3] log.Log - Completed 
load of log mytopic-2 with log end offset 0
2015-04-16 03:51:26,454  INFO [kafka-request-handler-3] log.LogManager - 
Created log for partition [mytopic,2] in 
/data_sdd/app/samsa-kafkaserver-ng/kafka_logs_ng with properties 
{segment.index.bytes -> 10485760, file.delete.delay.ms -> 6, segment.bytes 
-> 1073741824, flush.ms -> 9223372036854775807, delete.retention.ms -> 
8640, index.interval.bytes -> 4096, retention.bytes -> 500, 
min.insync.replicas -> 1, cleanup.policy -> delete, 
unclean.leader.election.enable -> true, segment.ms -> 60480, 
max.message.bytes -> 112, flush.messages -> 9223372036854775807, 
min.cleanable.dirty.ratio -> 0.5, retention.ms -> 8640, segment.jitter.ms 
-> 0}.
2015-04-16 03:51:26,454  WARN [kafka-request-handler-3] cluster.Partition - 
Partition [mytopic,2] on broker 45: No checkpointed highwatermark is found for 
partition [mytopic,2]
2015-04-16 03:51:26,558  INFO [kafka-network-thread-27330-0] network.Processor 
- Closing socket connection to /1.2.4.34.
2015-04-16 03:51:26,658  INFO [kafka-network-thread-27330-0] 

Re: [DISCUSS] KIP-11- Authorization design for kafka security

2015-04-15 Thread Parth Brahmbhatt
Kafka currently stores logConfig overrides specified during topic creation
in ZooKeeper; it's just an instance of java.util.Properties converted to
JSON. I am proposing that, in addition, we store ACLs and the owner as part
of the same Properties map.
There is some infrastructure around reading this config, converting it
back to a Properties map and, most importantly, propagating any changes
efficiently, which we will be able to leverage. As this infrastructure is
common to the cluster, the reading (not interpreting) of config happens
outside of any authorization code.

If the TopicConfigCache just kept the JSON representation and left it to the
authorizer to parse it, the authorizer would have to either parse the JSON
for each request (not acceptable) or keep one more layer of
parsed ACL instance cache. Assuming the authorizer keeps an additional
caching layer, we would now have to implement some way to invalidate that
cache, which means the TopicConfigCache would have to be an observable which
the Authorizer observes, invalidating its cache entries when the
TopicConfigCache gets updated. That seemed like unnecessary complexity with
not a lot to gain, so I went with the TopicConfigCache interpreting the JSON
and caching a higher-level modeled object.
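
The parse-once design described here -- interpret the topic-config JSON a 
single time and cache a modeled object, replacing the entry when a config 
change propagates -- can be sketched roughly as below. All names in this 
sketch (TopicAcls, parseAcls, onConfigUpdate) are hypothetical, not Kafka's 
actual classes or API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative modeled object cached per topic; a real one would hold
// parsed ACL entries, not just the owner.
class TopicAcls {
    final String owner;
    TopicAcls(String owner) { this.owner = owner; }
}

class TopicConfigCacheSketch {
    private final Map<String, TopicAcls> cache = new ConcurrentHashMap<>();

    // Called on each request path: a cheap map lookup, no JSON parsing.
    TopicAcls aclsFor(String topic) { return cache.get(topic); }

    // Called when a topic's config change propagates (e.g. via ZooKeeper):
    // parse once, replace the cached entry.
    void onConfigUpdate(String topic, String configJson) {
        cache.put(topic, parseAcls(configJson));
    }

    private TopicAcls parseAcls(String json) {
        // Stand-in for real parsing of the versioned ACL JSON format.
        return new TopicAcls(json.trim());
    }
}
```

The key point is that invalidation and parsing happen together on the update 
path, so no separate observer/invalidation protocol between the cache and the 
authorizer is needed.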

In summary, the interpretation is done for both optimization and
simplicity. If you think it is important to allow custom ACL format
support, we can add one more pluggable config (acl.parser) and
interface (AclParser), or it could just be another method in Authorizer.
One thing to note: the current ACL JSON is versioned, so it is easy to make
changes to it; however, it won't be possible to support custom ACL formats
with the current design.

Thanks
Parth

On 4/15/15, 4:29 PM, Michael Herstine mherst...@linkedin.com.INVALID
wrote:

Hi Parth,

I’m a little confused: why would Kafka need to interpret the JSON?  IIRC
KIP-11 even says that the TopicConfigData will just store the JSON. I’m
not really making a design recommendation here, just trying to understand
what you’re proposing.

On 4/15/15, 11:20 AM, Parth Brahmbhatt pbrahmbh...@hortonworks.com
wrote:

Hi Michael,

There is code in the Kafka codebase that reads and interprets the topic
config JSON, which has ACLs, owner and logConfig properties. There are 3
use cases that we are supporting with the current proposal:

  *   You use the out-of-the-box simpleAcl authorizer, which is tied to the acl
stored in topic config, and the format is locked down.
  *   You have a custom authorizer and a custom ACL store. Ranger/Argus
falls under this, as they have their own ACL store and UI that users use
to configure ACLs on the cluster and cluster resources like topics. It is
up to the custom authorizer to leverage the Kafka ACL configs or
completely ignore them, as they have set a user expectation that only ACLs
configured via their UI/system will be effective.
  *   You have a custom authorizer but no custom ACL store. You are
completely tied to the ACL structure that we have provided in the out-of-box
implementation.

Thanks
Parth

On 4/15/15, 10:31 AM, Michael Herstine
mherst...@linkedin.com.INVALID
wrote:

Hi Parth,

One question that occurred to me at the end of today’s hangout: how tied
are we to a particular ACL representation under your proposal? I know
that
TopicConfigCache will just contain JSON— if a particular site decides
they
want to represent their ACLs differently, and swap out the authorizer
implementation, will that work?  I guess what I’m asking is whether
there’s any code in the Kafka codebase that will interpret that JSON, or
does that logic live exclusively in the authorizer?

On 4/14/15, 10:56 PM, Don Bosco Durai
bo...@apache.org wrote:

I also feel, having just IP would be more appropriate. Host lookup will
unnecessary slow things down and would be insecure as you pointed out.

With IP, it will be also able to setup policies (in future if needed)
with
ranges or netmasks and it would be more scalable.

Bosco


On 4/14/15, 1:40 PM, Michael Herstine
mherst...@linkedin.com.INVALID
wrote:

Hi Parth,

Sorry to chime in so late, but I’ve got a minor question on the KIP.

Several methods take a parameter named “host” of type String. Is that
intended to be a hostname, or an IP address? If the former, I’m curious
as
to how that’s found (in my experience, when accepting an incoming socket
connection, you only know the IP address, and there isn’t a way to map
that to a hostname without a round trip to a DNS server, which is
insecure
anyway).


On 3/25/15, 1:07 PM, Parth Brahmbhatt
pbrahmbh...@hortonworks.com
wrote:

Hi all,

I have modified the KIP to reflect the recent change request from the
reviewers. I have been working on the code and I have the server side
code
for authorization ready. I am now modifying the command line utilities.
I
would really appreciate if some of the committers can spend sometime to
review the KIP so 

Re: Review Request 32937: Patch for KAFKA-2102

2015-04-15 Thread Ewen Cheslack-Postava

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32937/#review80298
---


This version looks a lot better than the original patch -- it's a lot clearer 
what's going on, and some of the error-prone tricks have been removed. I also 
like the separation of the management of the Cluster object from the 
request/versioning stuff, as long as we can convince ourselves that it's safe to 
do so; I also found the combination confusing to reason about.

I filed a couple of issues. I'd probably also want to double-check the last 
update/timeout code -- it was very difficult to get that working properly under 
all conditions, so I'd want to be certain we didn't lose anything in moving 
that around and changing how successes/failures are handled.


clients/src/main/java/org/apache/kafka/clients/MetadataBookkeeper.java
https://reviews.apache.org/r/32937/#comment130108

This patch will definitely need a comment somewhere explaining the locking 
strategy and the reasoning behind it. It won't be obvious even to someone 
familiar with other client code why this all works the way it does.



clients/src/main/java/org/apache/kafka/clients/MetadataBookkeeper.java
https://reviews.apache.org/r/32937/#comment130115

Is there any benefit to using a Lock + Condition instead of the monitor for 
the object? Seems like it would make the code a bit simpler - you'd get rid of 
all the try/finally blocks.
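A minimal sketch of the monitor-based alternative this comment is suggesting; the class and method names are illustrative, not the actual code under review. The intrinsic monitor removes the explicit lock()/unlock() calls and the try/finally blocks that a ReentrantLock + Condition would require:

```java
public class VersionedMetadata {
    private long version = 0;

    // synchronized takes the object's intrinsic monitor; no try/finally needed.
    public synchronized void update() {
        version++;
        notifyAll();  // wake any thread waiting for a newer version
    }

    // Block until the version exceeds lastVersion or the timeout elapses,
    // then return the current version.
    public synchronized long awaitUpdate(long lastVersion, long timeoutMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (version <= lastVersion) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) break;
            wait(remaining);
        }
        return version;
    }
}
```

Either way, the wait loop must re-check its condition after every wake-up, since notifyAll and spurious wake-ups make no guarantee about which waiter observes which version.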



clients/src/main/java/org/apache/kafka/clients/MetadataBookkeeper.java
https://reviews.apache.org/r/32937/#comment130111

Are the lazySets accomplishing anything performance-wise? I doubt most 
people even know the method exists, let alone the implications.

Some seem like they're probably ok, e.g. none of the code that uses the 
version number relies on seeing the updated value immediately. But others I'm 
less sure about (and the lack of documentation on lazySet makes it hard to be 
certain). For example, using updateRequested.lazySet() might work, but could 
give incorrect results if the old value is returned to a different thread. I 
think this only affects the network thread in the producer, but the consumer 
can potentially be called from different threads. Are we sure the lazySet works 
as expected in that case?
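For reference, the difference between the two write modes can be demonstrated with a plain AtomicLong; this is a generic illustration of the semantics, not the patch's code. Within the writing thread, program order still applies, so a lazySet is immediately visible to a subsequent get on the same thread; the weaker guarantee only shows up across threads:

```java
import java.util.concurrent.atomic.AtomicLong;

public class LazySetDemo {
    public static void main(String[] args) {
        AtomicLong version = new AtomicLong(0);

        // set() is a volatile write: once it returns, every other thread
        // that reads the variable is guaranteed to see the new value.
        version.set(1);

        // lazySet() is only an ordered (store-store) write: it cannot be
        // reordered with earlier writes by this thread, but other threads
        // may observe the old value for a short window afterwards.
        version.lazySet(2);

        // Same-thread reads still follow program order:
        System.out.println(version.get());  // prints 2
    }
}
```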


- Ewen Cheslack-Postava


On April 16, 2015, 12:56 a.m., Tim Brooks wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/32937/
 ---
 
 (Updated April 16, 2015, 12:56 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2102
 https://issues.apache.org/jira/browse/KAFKA-2102
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Method does not need to be synchronized
 
 
 Do not synchronize contains topic method
 
 
 Continue removing the need to synchronize the metadata object
 
 
 Store both last refresh and need to refresh in same variable
 
 
 Fix synchronize issue
 
 
 Version needs to be volatile
 
 
rework how signalling happens
 
 
 remove unnecessary creation of new set
 
 
 initialize 0 at the field level
 
 
 Fix the build
 
 
 Start moving synchronization of metadata to different class
 
 
 Start moving synchronization work to new class
 
 
 Remove unused code
 
 
 Functionality works. Not threadsafe
 
 
 move version into metadata synchronizer
 
 
 Make version volatile
 
 
 Rename classes
 
 
 move to finergrained locking
 
 
 Use locks in bookkeeper
 
 
Only use atomic variables
 
 
 use successful metadata in metrics
 
 
 Change these things back to trunk
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/Metadata.java 
 07f1cdb1fe920b0c7a5f2d101ddc40c689e1b247 
   clients/src/main/java/org/apache/kafka/clients/MetadataBookkeeper.java 
 PRE-CREATION 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 b7ae595f2cc46e5dfe728bc3ce6082e9cd0b6d36 
   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
 b91e2c52ed0acb1faa85915097d97bafa28c413a 
 
 Diff: https://reviews.apache.org/r/32937/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Tim Brooks
 




[jira] [Comment Edited] (KAFKA-2127) Running TopicCommand --alter causes connection close/reset errors in kafka logs

2015-04-15 Thread Jason Rosenberg (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14497570#comment-14497570
 ] 

Jason Rosenberg edited comment on KAFKA-2127 at 4/16/15 5:13 AM:
-

the ip's that show in the 'Closing socket' and 'Connection reset' log lines 
appear to be random hosts in our deployment, hosts which produce messages to 
kafka, but are otherwise not part of the kafka cluster or zookeeper, etc.


was (Author: jbrosenb...@gmail.com):
the ip's that show in the 'Closing socket' and 'Connection reset' log lines 
appear to be random hosts in our deployment, hosts which produce messages to 
kafka, but are otherwise not part of the kafka deployment.

 Running TopicCommand --alter causes connection close/reset errors in kafka 
 logs
 ---

 Key: KAFKA-2127
 URL: https://issues.apache.org/jira/browse/KAFKA-2127
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Rosenberg
Priority: Minor

 I am using 0.8.2.1.  I've been noticing that any time I use TopicCommand to 
 alter a topic (e.g. add partitions) or delete a topic, the broker logs show a 
 bunch of closed connections, and usually 2 or 3 Connection reset exceptions.  
 It logs these with ERROR status.
 I recently used the kafka.admin.TopicCommand tool to increase the partitions 
 for a topic from 1 to 4.  So I ran:
 {code}
  java -cp kafka.jar kafka.admin.TopicCommand --zookeeper myzkserver:12345 
 --topic mytopic --alter --partitions 4
 {code}
 This resulted in the following sequence in the broker log (repeated pretty 
 much in the logs of each broker):
 {code}
 2015-04-16 03:51:26,156  INFO [kafka-network-thread-27330-1] 
 network.Processor - Closing socket connection to /1.2.3.12.
 2015-04-16 03:51:26,169  INFO [kafka-network-thread-27330-0] 
 network.Processor - Closing socket connection to /1.2.3.89.
 2015-04-16 03:51:26,169  INFO [kafka-network-thread-27330-0] 
 network.Processor - Closing socket connection to /1.2.3.95.
 2015-04-16 03:51:26,176 ERROR [kafka-network-thread-27330-2] 
 network.Processor - Closing socket for /1.2.4.34 because of error
 java.io.IOException: Connection reset by peer
 at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
 at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
 at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
 at sun.nio.ch.IOUtil.read(IOUtil.java:197)
 at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
 at kafka.utils.Utils$.read(Utils.scala:380)
 at 
 kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
 at kafka.network.Processor.read(SocketServer.scala:444)
 at kafka.network.Processor.run(SocketServer.scala:340)
 at java.lang.Thread.run(Thread.java:745)
 2015-04-16 03:51:26,178 ERROR [kafka-network-thread-27330-1] 
 network.Processor - Closing socket for /1.2.4.59 because of error
 java.io.IOException: Connection reset by peer
 at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
 at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
 at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
 at sun.nio.ch.IOUtil.read(IOUtil.java:197)
 at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
 at kafka.utils.Utils$.read(Utils.scala:380)
 at 
 kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
 at kafka.network.Processor.read(SocketServer.scala:444)
 at kafka.network.Processor.run(SocketServer.scala:340)
 at java.lang.Thread.run(Thread.java:745)
 2015-04-16 03:51:26,192 ERROR [kafka-network-thread-27330-1] 
 network.Processor - Closing socket for /1.2.3.11 because of error
 java.io.IOException: Connection reset by peer
 at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
 at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
 at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
 at sun.nio.ch.IOUtil.read(IOUtil.java:197)
 at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
 at kafka.utils.Utils$.read(Utils.scala:380)
 at 
 kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
 at kafka.network.Processor.read(SocketServer.scala:444)
 at kafka.network.Processor.run(SocketServer.scala:340)
 at java.lang.Thread.run(Thread.java:745)
 2015-04-16 03:51:26,451  INFO [kafka-request-handler-3] 
 server.ReplicaFetcherManager - [ReplicaFetcherManager on broker 45] Removed 
 fetcher for partitions [mytopic,2]
 2015-04-16 03:51:26,453  INFO [kafka-request-handler-3] log.Log - Completed 
 load of log mytopic-2 with log end offset 0
 

[jira] [Commented] (KAFKA-2127) Running TopicCommand --alter causes connection close/reset errors in kafka logs

2015-04-15 Thread Jason Rosenberg (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14497570#comment-14497570
 ] 

Jason Rosenberg commented on KAFKA-2127:


the ip's that show in the 'Closing socket' and 'Connection reset' log lines 
appear to be random hosts in our deployment, hosts which produce messages to 
kafka, but are otherwise not part of the kafka deployment.

 Running TopicCommand --alter causes connection close/reset errors in kafka 
 logs
 ---

 Key: KAFKA-2127
 URL: https://issues.apache.org/jira/browse/KAFKA-2127
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Rosenberg
Priority: Minor

 I am using 0.8.2.1.  I've been noticing that any time I use TopicCommand to 
 alter a topic (e.g. add partitions) or delete a topic, the broker logs show a 
 bunch of closed connections, and usually 2 or 3 Connection reset exceptions.  
 It logs these with ERROR status.
 I recently used the kafka.admin.TopicCommand tool to increase the partitions 
 for a topic from 1 to 4.  So I ran:
 {code}
  java -cp kafka.jar kafka.admin.TopicCommand --zookeeper myzkserver:12345 
 --topic mytopic --alter --partitions 4
 {code}
 This resulted in the following sequence in the broker log (repeated pretty 
 much in the logs of each broker):
 {code}
 2015-04-16 03:51:26,156  INFO [kafka-network-thread-27330-1] 
 network.Processor - Closing socket connection to /1.2.3.12.
 2015-04-16 03:51:26,169  INFO [kafka-network-thread-27330-0] 
 network.Processor - Closing socket connection to /1.2.3.89.
 2015-04-16 03:51:26,169  INFO [kafka-network-thread-27330-0] 
 network.Processor - Closing socket connection to /1.2.3.95.
 2015-04-16 03:51:26,176 ERROR [kafka-network-thread-27330-2] 
 network.Processor - Closing socket for /1.2.4.34 because of error
 java.io.IOException: Connection reset by peer
 at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
 at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
 at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
 at sun.nio.ch.IOUtil.read(IOUtil.java:197)
 at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
 at kafka.utils.Utils$.read(Utils.scala:380)
 at 
 kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
 at kafka.network.Processor.read(SocketServer.scala:444)
 at kafka.network.Processor.run(SocketServer.scala:340)
 at java.lang.Thread.run(Thread.java:745)
 2015-04-16 03:51:26,178 ERROR [kafka-network-thread-27330-1] 
 network.Processor - Closing socket for /1.2.4.59 because of error
 java.io.IOException: Connection reset by peer
 at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
 at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
 at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
 at sun.nio.ch.IOUtil.read(IOUtil.java:197)
 at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
 at kafka.utils.Utils$.read(Utils.scala:380)
 at 
 kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
 at kafka.network.Processor.read(SocketServer.scala:444)
 at kafka.network.Processor.run(SocketServer.scala:340)
 at java.lang.Thread.run(Thread.java:745)
 2015-04-16 03:51:26,192 ERROR [kafka-network-thread-27330-1] 
 network.Processor - Closing socket for /1.2.3.11 because of error
 java.io.IOException: Connection reset by peer
 at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
 at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
 at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
 at sun.nio.ch.IOUtil.read(IOUtil.java:197)
 at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
 at kafka.utils.Utils$.read(Utils.scala:380)
 at 
 kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
 at kafka.network.Processor.read(SocketServer.scala:444)
 at kafka.network.Processor.run(SocketServer.scala:340)
 at java.lang.Thread.run(Thread.java:745)
 2015-04-16 03:51:26,451  INFO [kafka-request-handler-3] 
 server.ReplicaFetcherManager - [ReplicaFetcherManager on broker 45] Removed 
 fetcher for partitions [mytopic,2]
 2015-04-16 03:51:26,453  INFO [kafka-request-handler-3] log.Log - Completed 
 load of log mytopic-2 with log end offset 0
 2015-04-16 03:51:26,454  INFO [kafka-request-handler-3] log.LogManager - 
 Created log for partition [mytopic,2] in 
 /data_sdd/app/samsa-kafkaserver-ng/kafka_logs_ng with properties 
 {segment.index.bytes - 10485760, file.delete.delay.ms - 6, 
 segment.bytes - 1073741824, flush.ms - 9223372036854775807, 
 

Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient

2015-04-15 Thread Guozhang Wang
Thanks for the update Jiangjie,

I think it is actually NOT expected that hardware disconnection will be
detected by the selector, but rather that it will only be revealed upon TCP
timeout, which could take hours.

A couple of comments on the wiki:

1. For KafkaProducer.close() and KafkaProducer.flush() we need the request
timeout as implicit timeout. I am not very clear on what this means.

2. Currently the producer already has a TIMEOUT_CONFIG which should
really be REPLICATION_TIMEOUT_CONFIG. So if we decide to add 
REQUEST_TIMEOUT_CONFIG, I suggest we also make this renaming: admittedly
it will change the config names but will reduce confusion moving forward.


Guozhang


On Wed, Apr 15, 2015 at 6:48 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 Checked the code again. It seems that the disconnected channel is not
 detected by the selector as expected.

 Currently we are depending on the
 o.a.k.common.network.Selector.disconnected set to see if we need to do
 something for a disconnected channel.
 However, the Selector.disconnected set is only updated when:
 1. A write/read/connect to channel failed.
 2. A Key is canceled
 However, when a broker is down before it sends back the response, the
 client seems unable to detect this failure.

 I did a simple test below:
 1. Run a selector on one machine and an echo server on another machine.
 Connect the selector to the echo server.
 2. Send a message to echo server using selector, then let the selector
 poll() every 10 seconds.
 3. After the server received the message, unplug the cable on the echo server.
 4. After waiting for 45 min, the selector still had not detected the
 network failure.
 Lsof on selector machine shows that the TCP connection is still considered
 ESTABLISHED.
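The same symptom can be reproduced on a single machine with a loopback stand-in for the silent broker (no unplugged cable needed). This sketch is illustrative and is not the test harness described above; the class and method names are made up:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class SilentPeerDemo {
    // Connect to a local server that accepts and then stays silent, and
    // report how many channels the selector flags ready within timeoutMs.
    public static int readyOnSilentPeer(long timeoutMs) throws IOException {
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress("127.0.0.1", 0));
        int port = ((InetSocketAddress) server.getLocalAddress()).getPort();

        SocketChannel client = SocketChannel.open(new InetSocketAddress("127.0.0.1", port));
        client.configureBlocking(false);
        SocketChannel accepted = server.accept();  // accept but never read or write

        Selector selector = Selector.open();
        client.register(selector, SelectionKey.OP_READ);

        // With no traffic and no FIN/RST from the peer, select() simply
        // times out: a hung-but-connected peer looks healthy to the client.
        int ready = selector.select(timeoutMs);

        selector.close(); client.close(); accepted.close(); server.close();
        return ready;
    }

    public static void main(String[] args) throws IOException {
        System.out.println("ready channels: " + readyOnSilentPeer(500));  // 0
    }
}
```

Since the selector only reports readiness (data, EOF, or error) and never probes the connection itself, nothing short of an application-level timeout distinguishes this silent peer from a healthy idle one.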

 I’m not sure in this case what should we expect from the
 java.nio.channels.Selector. According to the document, the selector does
 not verify the status of the associated channel. In my test case it looks
 even worse that OS did not think of the socket has been disconnected.

 Anyway. It seems adding the client side request timeout is necessary. I’ve
 updated the KIP page to clarify the problem we want to solve according to
 Ewen’s comments.

 Thanks.

 Jiangjie (Becket) Qin

 On 4/14/15, 3:38 PM, Ewen Cheslack-Postava e...@confluent.io wrote:

 On Tue, Apr 14, 2015 at 1:57 PM, Jiangjie Qin j...@linkedin.com.invalid
 wrote:
 
  Hi Ewen, thanks for the comments. Very good points! Please see replies
  inline.
 
 
  On 4/13/15, 11:19 PM, Ewen Cheslack-Postava e...@confluent.io
 wrote:
 
  Jiangjie,
  
  Great start. I have a couple of comments.
  
  Under the motivation section, is it really true that the request will
  never
  be completed? Presumably if the broker goes down the connection will be
  severed, at worst by a TCP timeout, which should clean up the
 connection
  and any outstanding requests, right? I think the real reason we need a
  different timeout is that the default TCP timeouts are ridiculously
 long
  in
  this context.
  Yes, when broker is completely down the request should be cleared as you
  said. The case we encountered looks like the broker was just not
  responding but TCP connection was still alive though.
 
 
 Ok, that makes sense.
 
 
 
  
  My second question is about whether this is the right level to tackle
 the
  issue/what user-facing changes need to be made. A related problem came
 up
  in https://issues.apache.org/jira/browse/KAFKA-1788 where producer
  records
  get stuck indefinitely because there's no client-side timeout. This KIP
  wouldn't fix that problem or any problems caused by lack of
 connectivity
  since this would only apply to in flight requests, which by definition
  must
  have been sent on an active connection.
  
  I suspect both types of problems probably need to be addressed
 separately
  by introducing explicit timeouts. However, because the settings
 introduced
  here are very much about the internal implementations of the clients,
 I'm
  wondering if this even needs to be a user-facing setting, especially
 if we
  have to add other timeouts anyway. For example, would a fixed, generous
  value that's still much shorter than a TCP timeout, say 15s, be good
  enough? If other timeouts would allow, for example, the clients to
  properly
  exit even if requests have not hit their timeout, then what's the
 benefit
  of being able to configure the request-level timeout?
  That is a very good point. We have three places that we might be able to
  enforce timeout for a message send:
  1. Before append to accumulator - handled by metadata timeout on per
  message level.
  2. Batch of messages inside accumulator - no timeout mechanism now.
  3. Request of batches after messages leave the accumulator - we have a
  broker side timeout but no client side timeout for now.
  My current proposal only address (3) but not (2).
  Honestly I do not have a very clear idea about what we should do with
 (2)
  right now. But I am with you that we should not 

[jira] [Commented] (KAFKA-2125) Infinite loop after controlled shutdown succeeds

2015-04-15 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14496951#comment-14496951
 ] 

Joel Koshy commented on KAFKA-2125:
---

Ah yes that would explain it. [~sriharsha] we were pretty sure we had fixed it 
but apparently there still seems to be a corner case. (KAFKA-1577)

 Infinite loop after controlled shutdown succeeds
 

 Key: KAFKA-2125
 URL: https://issues.apache.org/jira/browse/KAFKA-2125
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2.1
Reporter: Jason Rosenberg
Priority: Blocker
 Attachments: grep_shut_edited.log


 I have a 4 node cluster, running 0.8.2.1, that got into a bad state last 
 night during a rolling restart.  The first node to be restarted was the 
 controller.  Controlled Shutdown completed successfully, after about 800ms.  
 But after that, the server failed to shutdown, and got into what appears to 
 be an infinite cycle, involving the 'Controller-45-to-broker-45-send-thread', 
 and the 'kafka-scheduler-0' thread.  Ultimately, the shutdown timed out 
 (after 3 minutes) and it was terminated by the deployment system, and the 
 server was restarted.  As expected, when it came back up it took some time 
 re-syncing its partitions, but eventually came back and all was well.  
 However, I think there was something fundamentally wrong with the shutdown 
 process.  The controller didn't seem to give up controller status, for one 
 thing, as part of the controlled shutdown (which I should think would be the 
 first thing it should do?).
 Anyway, below are some log snippets. I do have full logs from each broker in 
 the cluster, which I can provide (but would have to significantly anonymize 
 the logs before forwarding along).
 Controlled shutdown starts and succeeds:
 {code}
 2015-04-14 05:56:10,134  INFO [Thread-38] server.KafkaServer - [Kafka Server 
 45], shutting down
 2015-04-14 05:56:10,136  INFO [Thread-38] server.KafkaServer - [Kafka Server 
 45], Starting controlled shutdown
 2015-04-14 05:56:10,150  INFO [kafka-request-handler-0] 
 controller.KafkaController - [Controller 45]: Shutting down broker 45
 ...
 ...
 2015-04-14 05:56:10,951  INFO [Thread-38] server.KafkaServer - [Kafka Server 
 45], Controlled shutdown succeeded
 {code}
 Subsequently, the 'Controller-45-to-broker-45-send-thread' starts repeatedly 
 spamming the logs, like so:
 {code}
 2015-04-14 05:56:11,281  WARN [Controller-45-to-broker-45-send-thread] 
 controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], 
 Controller 45 epoch 21 fails to send request 
 Name:LeaderAndIsrRequest;Version:0;Controller:45;ControllerEpoch:21;CorrelationId:305175;ClientId:id_45-host_null-port_12345;Leaders:id:48,host:broker48.square,port:12345;PartitionState:(mytopic,1)
  - 
 (LeaderAndIsrInfo:(Leader:48,ISR:48,LeaderEpoch:5,ControllerEpoch:21),ReplicationFactor:2),AllReplicas:45,48)
  to broker id:45,host:broker45.square,port:12345. Reconnecting to broker.
 java.nio.channels.ClosedChannelException
 at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
 at 
 kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
 at 
 kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
 2015-04-14 05:56:11,281  INFO [Controller-45-to-broker-45-send-thread] 
 controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], 
 Controller 45 connected to id:45,host:broker45.square,port:12345 for sending 
 state change requests
 2015-04-14 05:56:11,582  WARN [Controller-45-to-broker-45-send-thread] 
 controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], 
 Controller 45 epoch 21 fails to send request 
 Name:LeaderAndIsrRequest;Version:0;Controller:45;ControllerEpoch:21;CorrelationId:305175;ClientId:id_45-host_null-port_12345;Leaders:id:48,host:broker48.square,port:12345;PartitionState:(mytopic,1)
  - 
 (LeaderAndIsrInfo:(Leader:48,ISR:48,LeaderEpoch:5,ControllerEpoch:21),ReplicationFactor:2),AllReplicas:45,48)
  to broker id:45,host:broker45.square,port:12345. Reconnecting to broker.
 java.nio.channels.ClosedChannelException
 at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
 at 
 kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
 at 
 kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
 2015-04-14 05:56:11,582  INFO [Controller-45-to-broker-45-send-thread] 
 controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], 
 Controller 45 connected to id:45,host:broker45.square,port:12345 for sending 
 

Re: [DISCUSS] KIP-11- Authorization design for kafka security

2015-04-15 Thread Parth Brahmbhatt
I have added the following to the list of open questions based on the hangout
discussion:

* The owner field of a topic in the current proposal is set to the user who
created the topic, and this user has all access to the topic. There was a
suggestion to make this a list of users who share ownership.
Alternatively, we can keep the owner as a single entity, but the user
creating the topic will have to ensure that the topic ACLs are configured
to allow admin access to all the other users that want to assume
co-ownership.


It will be great if we can at least agree on the following things:
* The newly proposed broker configs, their types and names
* The Authorizer interface and the Acl structure
* The command line options being added, their name and types
* The new structure of topic config which is being stored in zookeeper

Thanks
Parth

On 4/15/15, 12:53 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com
wrote:

Yes, it can be turned off completely. We are proposing to add
authorizer.class.name as a broker config. The value of this config can be
null/unspecified (which is the default) in which case no authorization
will be performed. It can be set to the FQCN of any class that implements
Authorizer, so you can plug in custom authorizers.
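As a sketch of the default behavior: leaving authorizer.class.name unset is equivalent to plugging in an authorizer that allows everything. The interface shape below is a placeholder for illustration only; the actual KIP-11 Authorizer interface has different method signatures and types.

```java
import java.util.Map;

// Hypothetical shape of the pluggable interface; the real KIP-11
// Authorizer interface differs in its method signatures and types.
interface Authorizer {
    void configure(Map<String, ?> configs);
    boolean authorize(String principal, String operation, String resource);
}

// The behavior you get when authorizer.class.name is null/unspecified:
// no check is performed, i.e. every request is permitted.
class AllowAllAuthorizer implements Authorizer {
    public void configure(Map<String, ?> configs) { }

    public boolean authorize(String principal, String operation, String resource) {
        return true;  // matches today's behavior with authorization turned off
    }
}

public class AuthorizerDemo {
    public static void main(String[] args) {
        Authorizer auth = new AllowAllAuthorizer();
        System.out.println(auth.authorize("alice", "Read", "topic-a"));  // true
    }
}
```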

Authentication is a different beast and you should look at other security
related KIPs, here is the top level jira
https://issues.apache.org/jira/browse/KAFKA-1682 and top level document
https://cwiki.apache.org/confluence/display/KAFKA/Security

Thanks
Parth

On 4/15/15, 11:56 AM, Tong Li liton...@us.ibm.com wrote:


Parth,
 If one wants to use his or her own access control system, including
authentication, what will need to be done under this design? Can
one completely turn this off so that the system behaves exactly the same as
it does today?

Thanks.

Tong

Sent from my iPhone

 On Apr 15, 2015, at 1:51 PM, Parth Brahmbhatt
pbrahmbh...@hortonworks.com wrote:

 Hi Michael,

 There is code in the Kafka codebase that reads and interprets the topic
config JSON, which has acls, owner and logconfig properties. There are 3
use
cases that we are supporting with current proposal:

   *   You use the out-of-box simpleAcl authorizer, which is tied to the acl
stored in topic config, and the format is locked down.
   *   You have a custom authorizer and a custom ACL store. Ranger/Argus
falls under this, as they have their own acl store and UI that users use to
configure acls on the cluster and cluster resources like topics. It is up to
the custom authorizer to leverage the kafka acl configs or completely
ignore them, as they have set a user expectation that only acls configured
via their ui/system will be effective.
   *   You have a custom authorizer but no custom ACL store. You are
completely tied to the ACL structure that we have provided in the
out-of-box implementation.

 Thanks
 Parth

 On 4/15/15, 10:31 AM, Michael Herstine
mherst...@linkedin.com.INVALID
wrote:

 Hi Parth,

 One question that occurred to me at the end of today’s hangout: how
tied
 are we to a particular ACL representation under your proposal? I know
that
 TopicConfigCache will just contain JSON— if a particular site decides
they
 want to represent their ACLs differently, and swap out the authorizer
 implementation, will that work?  I guess what I’m asking is whether
 there’s any code in the Kafka codebase that will interpret that JSON,
or
 does that logic live exclusively in the authorizer?

 On 4/14/15, 10:56 PM, Don Bosco Durai
bo...@apache.org wrote:

 I also feel, having just IP would be more appropriate. Host lookup will
 unnecessarily slow things down and would be insecure, as you pointed out.

 With IP, it will also be possible to set up policies (in future, if needed)
with
 ranges or netmasks, and it would be more scalable.

 Bosco


 On 4/14/15, 1:40 PM, Michael Herstine
mherst...@linkedin.com.INVALID
 wrote:

 Hi Parth,

 Sorry to chime in so late, but I’ve got a minor question on the KIP.

 Several methods take a parameter named “host” of type String. Is that
 intended to be a hostname, or an IP address? If the former, I’m curious
 as
 to how that’s found (in my experience, when accepting an incoming
socket
 connection, you only know the IP address, and there isn’t a way to map
 that to a hostname without a round trip to a DNS server, which is
 insecure
 anyway).


 On 3/25/15, 1:07 PM, Parth Brahmbhatt
pbrahmbh...@hortonworks.com
 wrote:

 Hi all,

 I have modified the KIP to reflect the recent change request from the
 reviewers. I have been working on the code and I have the server side
 code
 for authorization ready. I am now modifying the command line utilities.
 I
 would really appreciate if some of the committers can spend some time to
 review the KIP so we can make progress on this.

 Thanks
 Parth

 On 3/18/15, 2:20 PM, Michael Herstine

[jira] [Commented] (KAFKA-2125) Infinite loop after controlled shutdown succeeds

2015-04-15 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14497015#comment-14497015
 ] 

Sriharsha Chintalapani commented on KAFKA-2125:
---

[~jjkoshy] [~jbrosenberg] Thanks for the logs. Can you give a bit more detail on the 
cluster, like how many topics and partitions you have? I won't be able to work on 
this for another couple of days but will try to get back to you asap with more 
details.

 Infinite loop after controlled shutdown succeeds
 

 Key: KAFKA-2125
 URL: https://issues.apache.org/jira/browse/KAFKA-2125
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2.1
Reporter: Jason Rosenberg
Priority: Blocker
 Attachments: grep_shut_edited.log


 I have a 4 node cluster, running 0.8.2.1, that got into a bad state last 
 night during a rolling restart.  The first node to be restarted was the 
 controller.  Controlled Shutdown completed successfully, after about 800ms.  
 But after that, the server failed to shutdown, and got into what appears to 
 be an infinite cycle, involving the 'Controller-45-to-broker-45-send-thread', 
 and the 'kafka-scheduler-0' thread.  Ultimately, the shutdown timed out 
 (after 3 minutes) and it was terminated by the deployment system, and the 
 server was restarted.  As expected, when it came back up it took some time 
 re-syncing its partitions, but eventually came back and all was well.  
 However, I think there was something fundamentally wrong with the shutdown 
 process.  The controller didn't seem to give up controller status, for one 
 thing, as part of the controlled shutdown (which I should think would be the 

[jira] [Commented] (KAFKA-2125) Infinite loop after controlled shutdown succeeds

2015-04-15 Thread Jason Rosenberg (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14497035#comment-14497035
 ] 

Jason Rosenberg commented on KAFKA-2125:


This cluster has 4 nodes. Each node has 4 disks, mounted as JBOD, about 3.5TB 
total per node.
Almost all partitions have 2 replicas. There are 560 topics, 16 of which have 
more than 1 partition. The largest topic has 40 partitions; there are 854 
partitions in total. Log retention is 24 hours in most cases, and disks are 
about 40% full.

 Infinite loop after controlled shutdown succeeds
 

 Key: KAFKA-2125
 URL: https://issues.apache.org/jira/browse/KAFKA-2125
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2.1
Reporter: Jason Rosenberg
Priority: Blocker
 Attachments: grep_shut_edited.log


 I have a 4 node cluster, running 0.8.2.1, that got into a bad state last 
 night during a rolling restart.  The first node to be restarted was the 
 controller.  Controlled Shutdown completed successfully, after about 800ms.  
 But after that, the server failed to shutdown, and got into what appears to 
 be an infinite cycle, involving the 'Controller-45-to-broker-45-send-thread', 
 and the 'kafka-scheduler-0' thread.  Ultimately, the shutdown timed out 
 (after 3 minutes) and it was terminated by the deployment system, and the 
 server was restarted.  As expected, when it came back up it took some time 
 re-syncing its partitions, but eventually came back and all was well.  
 However, I think there was something fundamentally wrong with the shutdown 
 process.  The controller didn't seem to give up controller status, for one 
 thing, as part of the controlled shutdown (which I should think would be the 
 first thing it should do?).
 Anyway, below are some log snippets. I do have full logs from each broker in 
 the cluster, which I can provide (but would have to significantly anonymize 
 the logs before forwarding along).
 Controlled shutdown starts and succeeds:
 {code}
 2015-04-14 05:56:10,134  INFO [Thread-38] server.KafkaServer - [Kafka Server 
 45], shutting down
 2015-04-14 05:56:10,136  INFO [Thread-38] server.KafkaServer - [Kafka Server 
 45], Starting controlled shutdown
 2015-04-14 05:56:10,150  INFO [kafka-request-handler-0] 
 controller.KafkaController - [Controller 45]: Shutting down broker 45
 ...
 ...
 2015-04-14 05:56:10,951  INFO [Thread-38] server.KafkaServer - [Kafka Server 
 45], Controlled shutdown succeeded
 {code}
 Subsequently, the 'Controller-45-to-broker-45-send-thread' starts repeatedly 
 spamming the logs, like so:
 {code}
 2015-04-14 05:56:11,281  WARN [Controller-45-to-broker-45-send-thread] 
 controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], 
 Controller 45 epoch 21 fails to send request 
 Name:LeaderAndIsrRequest;Version:0;Controller:45;ControllerEpoch:21;CorrelationId:305175;ClientId:id_45-host_null-port_12345;Leaders:id:48,host:broker48.square,port:12345;PartitionState:(mytopic,1)
 -> 
 (LeaderAndIsrInfo:(Leader:48,ISR:48,LeaderEpoch:5,ControllerEpoch:21),ReplicationFactor:2),AllReplicas:45,48)
  to broker id:45,host:broker45.square,port:12345. Reconnecting to broker.
 java.nio.channels.ClosedChannelException
 at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
 at 
 kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
 at 
 kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
 2015-04-14 05:56:11,281  INFO [Controller-45-to-broker-45-send-thread] 
 controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], 
 Controller 45 connected to id:45,host:broker45.square,port:12345 for sending 
 state change requests
 2015-04-14 05:56:11,582  WARN [Controller-45-to-broker-45-send-thread] 
 controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], 
 Controller 45 epoch 21 fails to send request 
 Name:LeaderAndIsrRequest;Version:0;Controller:45;ControllerEpoch:21;CorrelationId:305175;ClientId:id_45-host_null-port_12345;Leaders:id:48,host:broker48.square,port:12345;PartitionState:(mytopic,1)
 -> 
 (LeaderAndIsrInfo:(Leader:48,ISR:48,LeaderEpoch:5,ControllerEpoch:21),ReplicationFactor:2),AllReplicas:45,48)
  to broker id:45,host:broker45.square,port:12345. Reconnecting to broker.
 java.nio.channels.ClosedChannelException
 at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
 at 
 kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
 at 
 kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
 2015-04-14 05:56:11,582  INFO 
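The retry pattern visible in the log spam above can be sketched as follows. This is a hypothetical illustration (the class, method, and field names are invented, not Kafka's actual RequestSendThread): the worker reconnects and retries forever on a send failure, and the loop only ends when something outside the thread clears the running flag. During this shutdown, nothing ever cleared it.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class SendThreadSketch extends Thread {
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final AtomicInteger attempts = new AtomicInteger();

    @Override
    public void run() {
        while (running.get()) {
            try {
                sendRequest();          // would send LeaderAndIsrRequest etc.
            } catch (RuntimeException e) {
                attempts.incrementAndGet();
                // "Reconnecting to broker" -- then immediately retrying a
                // request that can never succeed while the broker itself is
                // shutting down. Nothing here re-examines whether retrying
                // still makes sense.
            }
        }
    }

    private void sendRequest() {
        // In this sketch the send always fails, mimicking the closed channel.
        throw new RuntimeException("java.nio.channels.ClosedChannelException");
    }

    public void shutdown() {
        running.set(false);             // without this call, run() spins forever
        try { join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }

    public int attempts() { return attempts.get(); }
}
```

The point of the sketch is that the thread's liveness depends entirely on an external shutdown signal; if the controller never resigns and never tears the thread down, the reconnect/retry cycle repeats indefinitely, which matches the behaviour reported here.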

Re: Will it be possible to apply quotas based on a security principal?

2015-04-15 Thread Tong Li


If it is at all possible, I think we can introduce a request pipeline
architecture for the producer, broker and consumer (it does not have to be done
all at once). That way security, quota, encryption, serialization and
compression can all be done as pluggable components. One can freely stack
them up or tear them apart at the configuration level. Each component can
have its own design and implementation, and anyone deploying can choose what
interests him or her. Components have no dependencies among each other, so new
features can be easily added/tried/removed/abandoned. When I looked at our
code, instrumenting this so-called pipeline looked very easy. I wonder if any of
you are interested.
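The pluggable stacking described above might look like this minimal sketch (all names are hypothetical, not an actual Kafka API): each concern is a stage, and a deployment stacks up only the stages it configures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

public class PipelineSketch {
    /** A stage transforms a request payload and passes it on. */
    public interface Stage extends UnaryOperator<String> {}

    public static String runPipeline(List<Stage> stages, String request) {
        String r = request;
        for (Stage s : stages) r = s.apply(r);  // stages are independent and ordered
        return r;
    }

    public static void main(String[] args) {
        // Stages are chosen at "configuration" time; removing one line
        // removes that feature with no impact on the others.
        List<Stage> stages = new ArrayList<>();
        stages.add(r -> "quota-checked(" + r + ")");
        stages.add(r -> "compressed(" + r + ")");
        System.out.println(runPipeline(stages, "produce"));
        // prints: compressed(quota-checked(produce))
    }
}
```

Because each stage only depends on the payload it receives, a quota stage, an encryption stage, or a compression stage can be developed and deployed independently, which is the decoupling the paragraph above argues for.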

Thanks.

Sent from my iPhone

 On Apr 15, 2015, at 4:14 PM, Jay Kreps jay.kr...@gmail.com wrote:

 I think this should be a fairly minor follow-up item to have the quotas key
 off of user rather than client id. The advantage of starting with client.id
 is that it decouples the security work from the quota work in the short
 term and provides a mechanism for those using Kafka without authentication
 to still enforce quotas.
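The follow-up described above could be as small as how the quota key is derived. This is a hypothetical sketch (none of these names come from Kafka itself): use the authenticated principal when one exists, and fall back to client.id for unauthenticated connections.

```java
public class QuotaKeySketch {
    /**
     * Derive the key that quota accounting is bucketed under.
     * Authenticated sessions are throttled per user; anonymous ones
     * keep the existing per-client.id behaviour.
     */
    public static String quotaKey(String principal, String clientId) {
        return (principal != null && !principal.isEmpty())
                ? "user:" + principal
                : "client:" + clientId;
    }
}
```

With a rule like this, clusters running without authentication keep working exactly as in KIP-13, while secured clusters get per-principal quotas for free.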

 On Wed, Apr 15, 2015 at 6:15 AM, Adrian Preston prest...@uk.ibm.com
wrote:

  Hi,
 
  I've been investigating using Kafka for a multi-user system that
applies
  quotas at a per-user level.  Reading through KIP-13 and KAFKA-1682, I
  wondered: are there any plans to link together the security principal
and
  client identifier in some way?  Currently it appears these are separate
  concepts - so I can't see any way to apply a quota based on the
  authenticated identity of a user.
 
 
  Regards
  - Adrian
 
  Unless stated otherwise above:
  IBM United Kingdom Limited - Registered in England and Wales with
number
  741598.
  Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6
3AU
 
 

[jira] [Updated] (KAFKA-2118) Cleaner cannot clean after shutdown during replaceSegments

2015-04-15 Thread Rajini Sivaram (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2118?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-2118:
--
Attachment: KAFKA-2118_2015-04-15_09:43:51.patch

 Cleaner cannot clean after shutdown during replaceSegments
 --

 Key: KAFKA-2118
 URL: https://issues.apache.org/jira/browse/KAFKA-2118
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2.0
Reporter: Gian Merlino
Assignee: Rajini Sivaram
 Attachments: KAFKA-2118.patch, KAFKA-2118_2015-04-15_09:43:51.patch


 If a broker shuts down after the cleaner calls replaceSegments with more than 
 one segment, the partition can be left in an uncleanable state. We saw this 
 on a few brokers after doing a rolling update. The sequence of things we saw 
 is:
 1) Cleaner cleaned segments with base offsets 0, 1094621529, and 1094831997 
 into a new segment 0.
 2) Cleaner logged "Swapping in cleaned segment 0 for segment(s) 
 0,1094621529,1094831997 in log xxx-15." and called replaceSegments.
 3) 0.cleaned was renamed to 0.swap.
 4) Broker shut down before deleting segments 1094621529 and 1094831997.
 5) Broker started up and logged "Found log file 
 /mnt/persistent/kafka-logs/xxx-15/.log.swap from 
 interrupted swap operation, repairing."
 6) Cleaner thread died with the exception 
 kafka.common.InvalidOffsetException: Attempt to append an offset 
 (1094911424) to position 1003 no larger than the last offset appended 
 (1095045873) to 
 /mnt/persistent/kafka-logs/xxx-15/.index.cleaned.
 I think what's happening in #6 is that when the broker started back up and 
 repaired the log, segment 0 ended up with a bunch of messages that were also 
 in segment 1094621529 and 1094831997 (because the new segment 0 was created 
 from cleaning all 3). But segments 1094621529 and 1094831997 were still on 
 disk, so offsets on disk were no longer monotonically increasing, violating 
 the assumption of OffsetIndex. We ended up fixing this by deleting segments 
 1094621529 and 1094831997 manually, and then removing the line for this 
 partition from the cleaner-offset-checkpoint file (otherwise it would 
 reference the non-existent segment 1094621529).
 This can happen even on a clean shutdown (the async deletes in 
 replaceSegments might not happen).
 Cleaner logs post-startup:
 2015-04-12 15:07:56,533 INFO [kafka-log-cleaner-thread-0] 
 kafka.log.LogCleaner - Cleaner 0: Beginning cleaning of log xxx-15.
 2015-04-12 15:07:56,533 INFO [kafka-log-cleaner-thread-0] 
 kafka.log.LogCleaner - Cleaner 0: Building offset map for xxx-15...
 2015-04-12 15:07:56,595 INFO [kafka-log-cleaner-thread-0] 
 kafka.log.LogCleaner - Cleaner 0: Building offset map for log xxx-15 for 6 
 segments in offset range [1094621529, 1095924157).
 2015-04-12 15:08:01,443 INFO [kafka-log-cleaner-thread-0] 
 kafka.log.LogCleaner - Cleaner 0: Offset map for log xxx-15 complete.
 2015-04-12 15:08:01,443 INFO [kafka-log-cleaner-thread-0] 
 kafka.log.LogCleaner - Cleaner 0: Cleaning log xxx-15 (discarding tombstones 
 prior to Sun Apr 12 14:05:37 UTC 2015)...
 2015-04-12 15:08:01,443 INFO [kafka-log-cleaner-thread-0] 
 kafka.log.LogCleaner - Cleaner 0: Cleaning segment 0 in log xxx-15 (last 
 modified Sun Apr 12 14:05:38 UTC 2015) into 0, retaining deletes.
 2015-04-12 15:08:04,283 INFO [kafka-log-cleaner-thread-0] 
 kafka.log.LogCleaner - Cleaner 0: Cleaning segment 1094621529 in log xxx-15 
 (last modified Sun Apr 12 13:49:27 UTC 2015) into 0, discarding deletes.
 2015-04-12 15:08:05,079 INFO [kafka-log-cleaner-thread-0] 
 kafka.log.LogCleaner - Cleaner 0: Cleaning segment 1094831997 in log xxx-15 
 (last modified Sun Apr 12 14:04:28 UTC 2015) into 0, discarding deletes.
 2015-04-12 15:08:05,157 ERROR [kafka-log-cleaner-thread-0] 
 kafka.log.LogCleaner - [kafka-log-cleaner-thread-0], Error due to
 kafka.common.InvalidOffsetException: Attempt to append an offset (1094911424) 
 to position 1003 no larger than the last offset appended (1095045873) to 
 /mnt/persistent/kafka-logs/xxx-15/.index.
 cleaned.
 at kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
 at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
 at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
 at kafka.utils.Utils$.inLock(Utils.scala:535)
 at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
 at kafka.log.LogSegment.append(LogSegment.scala:81)
 at kafka.log.Cleaner.cleanInto(LogCleaner.scala:427)
 at kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:358)
 at kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:354)
 at scala.collection.immutable.List.foreach(List.scala:318)
 at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:354)
 at 
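The invariant that breaks in step 6 above can be sketched as follows. This is an illustrative, hypothetical class (not Kafka's actual OffsetIndex): an offset index only accepts strictly increasing offsets, so replaying a recovered segment 0 that already contains offsets from segments 1094621529 and 1094831997 trips the check.

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetIndexSketch {
    private final List<Long> offsets = new ArrayList<>();

    /** Append an offset; offsets must be strictly increasing. */
    public void append(long offset) {
        long last = offsets.isEmpty() ? -1L : offsets.get(offsets.size() - 1);
        if (offset <= last)
            throw new IllegalStateException(
                "Attempt to append an offset (" + offset +
                ") no larger than the last offset appended (" + last + ")");
        offsets.add(offset);
    }
}
```

Once the on-disk segments overlap, every cleaning pass replays the same non-monotonic offsets and dies the same way, which is why the partition stays uncleanable until the stale segments and the cleaner-offset-checkpoint entry are removed by hand.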

[jira] [Commented] (KAFKA-2118) Cleaner cannot clean after shutdown during replaceSegments

2015-04-15 Thread Rajini Sivaram (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14495952#comment-14495952
 ] 

Rajini Sivaram commented on KAFKA-2118:
---

Have updated the patch with a unit test that checks the recovery process after a 
broker crash at different stages of a clean-and-swap operation. The test fails 
with the current Kafka code and passes with the updated code in the patch. 

Have also done manual testing, recreating the scenario described in this 
report by forcing termination during replaceSegments(). With the same manual 
test, verified that the failure no longer occurs with the attached patch.


Re: Review Request 33168: Fix recovery of swap files after broker crash

2015-04-15 Thread Rajini Sivaram

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33168/
---

(Updated April 15, 2015, 9:44 a.m.)


Review request for kafka.


Bugs: KAFKA-2118
https://issues.apache.org/jira/browse/KAFKA-2118


Repository: kafka


Description
---

Patch for KAFKA-2118: Fix recovery of cleaned segments after broker crash


Diffs (updated)
-

  core/src/main/scala/kafka/log/Log.scala 
5563f2de8113a0ece8929bec9c75dbf892abbb66 
  core/src/test/scala/unit/kafka/log/CleanerTest.scala 
9792ed689033dbd4ad99809a4e566136d2b9fadf 

Diff: https://reviews.apache.org/r/33168/diff/


Testing
---


Thanks,

Rajini Sivaram



[jira] [Commented] (KAFKA-2118) Cleaner cannot clean after shutdown during replaceSegments

2015-04-15 Thread Rajini Sivaram (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14495943#comment-14495943
 ] 

Rajini Sivaram commented on KAFKA-2118:
---

Updated reviewboard https://reviews.apache.org/r/33168/diff/
 against branch origin/trunk
