[jira] [Created] (KAFKA-7548) KafkaConsumer should not throw away already fetched data for paused partitions.

2018-10-24 Thread Mayuresh Gharat (JIRA)
Mayuresh Gharat created KAFKA-7548:
--

 Summary: KafkaConsumer should not throw away already fetched data 
for paused partitions.
 Key: KAFKA-7548
 URL: https://issues.apache.org/jira/browse/KAFKA-7548
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Reporter: Mayuresh Gharat
Assignee: Mayuresh Gharat


In KafkaConsumer, when we call poll(), data is fetched asynchronously from the Kafka 
brokers and buffered in the completedFetches queue. If we then pause a few 
partitions, the next call to poll() removes the completedFetches for those paused 
partitions. Normally, an application that calls pause on topicPartitions is likely 
to resume those topicPartitions in the near future, and when it does, with the 
current design we would have to re-fetch that data.
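A minimal sketch of the access pattern in question (broker address, topic name and the back-pressure check are hypothetical), showing how an application pauses and later resumes partitions between poll() calls:

{code}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PauseResumeSketch {
    // Hypothetical, application-specific back-pressure check.
    static boolean downstreamBacklogged() {
        return false;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // hypothetical broker
        props.put("group.id", "pause-resume-demo");          // hypothetical group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("high-volume-topic")); // hypothetical topic
            Set<TopicPartition> paused = Collections.emptySet();
            while (true) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
                // ... hand records to the application (e.g. a Samza task) ...

                if (downstreamBacklogged() && paused.isEmpty()) {
                    // Back-pressure: stop fetching for all currently assigned partitions.
                    paused = consumer.assignment();
                    consumer.pause(paused);
                    // With the current fetcher, the next poll() discards any completedFetches
                    // already buffered for these partitions, so that data is re-fetched later.
                } else if (!downstreamBacklogged() && !paused.isEmpty()) {
                    consumer.resume(paused);   // the previously dropped data must be fetched again
                    paused = Collections.emptySet();
                }
            }
        }
    }
}
{code}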

At LinkedIn, we made a hotfix to see if NOT throwing away the prefetched data 
would improve performance for stream applications like Samza. We ran a 
benchmark in which we compared the throughput for different values of 
maxPollRecords.

We had a consumer subscribed to 10 partitions of a high-volume topic and paused 
a different number of partitions before each poll() call. Here are the results:

*Before fix (records consumed)*
||Number of partitions paused / maxPollRecords||10||5||1||
|0|8605320 (60.022276059 sec)|8337690 (60.026690095 sec)|6424753 (60.67003 sec)|
|2|101910 (60.006989628 sec)|49350 (60.022598668 sec)|10495 (60.020077555 sec)|
|4|48420 (60.022096537 sec)|24850 (60.007451162 sec)|5004 (60.009773507 sec)|
|6|30420 (60.018380086 sec)|15385 (60.011912135 sec)|3152 (60.013573487 sec)|
|8|23390 (60.043122495 sec)|11390 (60.013297496 sec)|2237 (60.038921333 sec)|
|9|20230 (60.026183204 sec)|10355 (60.015584914 sec)|2087 (60.00319069 sec)|

 

*After fix (records consumed)*
||Number of partitions paused / maxPollRecords||10||5||1||
|0|8662740 (60.011527576 sec)|8203445 (60.022204036 sec)|5846512 (60.0168916 sec)|
|2|8257390 (60.011121061 sec)|7776150 (60.01620875 sec)|5269557 (60.022581248 sec)|
|4|7938510 (60.011829002 sec)|7510140 (60.017571391 sec)|5213496 (60.000230139 sec)|
|6|7100970 (60.007220465 sec)|6382845 (60.038580526 sec)|4519645 (60.48034 sec)|
|8|6799956 (60.001850171 sec)|6482421 (60.001997219 sec)|4383300 (60.4836 sec)|
|9|7045177 (60.035366096 sec)|6465839 (60.41961 sec)|4884693 (60.42054 sec)|



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7096) Consumer should drop the data for unassigned topic partitions

2018-06-25 Thread Mayuresh Gharat (JIRA)
Mayuresh Gharat created KAFKA-7096:
--

 Summary: Consumer should drop the data for unassigned topic 
partitions
 Key: KAFKA-7096
 URL: https://issues.apache.org/jira/browse/KAFKA-7096
 Project: Kafka
  Issue Type: Improvement
Reporter: Mayuresh Gharat
Assignee: Mayuresh Gharat


Currently, if a client is assigned topics T1, T2 and T3 and calls poll(), the 
poll might fetch data for partitions of all three topics. If the client then 
unassigns some topics (for example T3) and calls poll() again, we still hold 
the data for T3 in the completedFetches queue until we actually reach that 
buffered data on a subsequent poll() call, at which point we drop it. Holding 
on to this data is unnecessary.
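A minimal sketch of the call pattern described above (broker address and topic names are illustrative):

{code}
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class UnassignSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // hypothetical broker
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            TopicPartition t1 = new TopicPartition("T1", 0);
            TopicPartition t2 = new TopicPartition("T2", 0);
            TopicPartition t3 = new TopicPartition("T3", 0);

            consumer.assign(Arrays.asList(t1, t2, t3));
            consumer.poll(Duration.ofMillis(100));   // may buffer completedFetches for T1, T2 and T3

            consumer.assign(Arrays.asList(t1, t2));  // T3 is no longer assigned
            consumer.poll(Duration.ofMillis(100));   // buffered T3 data is only dropped lazily, when
                                                     // the fetcher eventually reaches it in the queue
        }
    }
}
{code}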

When a client creates a topic, it takes time for the broker to fetch the ACLs for 
that topic. During this time, if the client issues FetchRequests for the topic, it 
gets a response for each of the topic's partitions consisting of a 
TopicAuthorizationException. Each such per-partition response is wrapped in a 
completedFetch and added to the completedFetches queue. When the client calls the 
next poll(), it sees the TopicAuthorizationException from the first buffered 
completedFetch and, as per the design, backs off for 1.5 minutes, hoping that the 
broker fetches the ACL from the ACL store in the meantime. In fact, the broker has 
usually already fetched the ACL by then, but when the client calls poll() after the 
sleep it sees the TopicAuthorizationException from the second buffered 
completedFetch and sleeps again. So it takes (1.5 * 60 * partitions) seconds before 
the client can see any data. With this patch, when the client sees the first 
TopicAuthorizationException it can call assign(emptySet), which gets rid of the 
buffered completedFetches (those carrying the TopicAuthorizationException), and it 
can then call assign(topicPartitions) again before the next poll(). With this patch 
we found that the client was able to get the records as soon as the broker had 
fetched the ACLs from the ACL store.
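A hedged sketch of that workaround (helper name and partition list are hypothetical); clearing the assignment drops the buffered completedFetches that still carry the stale error:

{code}
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TopicAuthorizationException;

public class AuthErrorWorkaround {
    // Hypothetical helper: poll once, and on a (possibly stale) authorization error clear
    // and restore the assignment so the buffered error responses are thrown away.
    static ConsumerRecords<byte[], byte[]> pollWithAclRetry(KafkaConsumer<byte[], byte[]> consumer,
                                                            List<TopicPartition> partitions) {
        try {
            return consumer.poll(Duration.ofMillis(100));
        } catch (TopicAuthorizationException e) {
            // Drop the buffered completedFetches that still carry the stale error ...
            consumer.assign(Collections.<TopicPartition>emptySet());
            // ... and re-assign so the next poll() fetches fresh data from the brokers.
            consumer.assign(partitions);
            return consumer.poll(Duration.ofMillis(100));
        }
    }
}
{code}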



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-4808) send of null key to a compacted topic should throw error back to user

2017-03-21 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15935327#comment-15935327
 ] 

Mayuresh Gharat edited comment on KAFKA-4808 at 3/21/17 9:00 PM:
-

[~ijuma] Please find the KIP  here : 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-135+%3A+Send+of+null+key+to+a+compacted+topic+should+throw+non-retriable+error+back+to+user.



was (Author: mgharat):
[~ijuma] Please find the KIP  here : 
https://cwiki.apache.org/confluence/display/KAFKA/Send+of+null+key+to+a+compacted+topic+should+throw+non-retriable+error+back+to+user.


> send of null key to a compacted topic should throw error back to user
> -
>
> Key: KAFKA-4808
> URL: https://issues.apache.org/jira/browse/KAFKA-4808
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.10.2.0
>Reporter: Ismael Juma
>Assignee: Mayuresh Gharat
> Fix For: 0.11.0.0
>
>
> If a message with a null key is produced to a compacted topic, the broker 
> returns `CorruptRecordException`, which is a retriable exception. As such, 
> the producer keeps retrying until retries are exhausted or request.timeout.ms 
> expires and eventually throws a TimeoutException. This is confusing and not 
> user-friendly.
> We should throw a meaningful error back to the user. From an implementation 
> perspective, we would have to use a non retriable error code to avoid this 
> issue.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4808) send of null key to a compacted topic should throw error back to user

2017-03-21 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15935327#comment-15935327
 ] 

Mayuresh Gharat commented on KAFKA-4808:


[~ijuma] Please find the KIP  here : 
https://cwiki.apache.org/confluence/display/KAFKA/Send+of+null+key+to+a+compacted+topic+should+throw+non-retriable+error+back+to+user.


> send of null key to a compacted topic should throw error back to user
> -
>
> Key: KAFKA-4808
> URL: https://issues.apache.org/jira/browse/KAFKA-4808
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.10.2.0
>Reporter: Ismael Juma
>Assignee: Mayuresh Gharat
> Fix For: 0.11.0.0
>
>
> If a message with a null key is produced to a compacted topic, the broker 
> returns `CorruptRecordException`, which is a retriable exception. As such, 
> the producer keeps retrying until retries are exhausted or request.timeout.ms 
> expires and eventually throws a TimeoutException. This is confusing and not 
> user-friendly.
> We should throw a meaningful error back to the user. From an implementation 
> perspective, we would have to use a non retriable error code to avoid this 
> issue.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4808) send of null key to a compacted topic should throw error back to user

2017-02-27 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15886365#comment-15886365
 ] 

Mayuresh Gharat commented on KAFKA-4808:


[~ijuma] Sure, we can provide a better error message if we have a separate 
error code. It could be a subclass of InvalidRequestException, as it is indeed 
an invalid produce request for that topic.
I will work on the KIP.

Thanks,

Mayuresh
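For reference, a minimal sketch of the scenario (hypothetical broker address and topic name; assumes the topic was created with cleanup.policy=compact):

{code}
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class NullKeySketch {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");          // hypothetical broker
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Null key on a compacted topic: the broker rejects the record, but with a
            // retriable error code, so the producer keeps retrying until it times out.
            ProducerRecord<String, String> record =
                new ProducerRecord<>("compacted-topic", null, "value");   // hypothetical topic
            try {
                producer.send(record).get();
            } catch (ExecutionException e) {
                // With the current behaviour this eventually surfaces as a TimeoutException;
                // the proposal is to fail fast with a non-retriable, descriptive error instead.
                System.err.println("send failed: " + e.getCause());
            }
        }
    }
}
{code}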

> send of null key to a compacted topic should throw error back to user
> -
>
> Key: KAFKA-4808
> URL: https://issues.apache.org/jira/browse/KAFKA-4808
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.10.2.0
>Reporter: Ismael Juma
>Assignee: Mayuresh Gharat
> Fix For: 0.10.3.0
>
>
> If a message with a null key is produced to a compacted topic, the broker 
> returns `CorruptRecordException`, which is a retriable exception. As such, 
> the producer keeps retrying until retries are exhausted or request.timeout.ms 
> expires and eventually throws a TimeoutException. This is confusing and not 
> user-friendly.
> We should throw a meaningful error back to the user. From an implementation 
> perspective, we would have to use a non retriable error code to avoid this 
> issue.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (KAFKA-4808) send of null key to a compacted topic should throw error back to user

2017-02-27 Thread Mayuresh Gharat (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayuresh Gharat reassigned KAFKA-4808:
--

Assignee: Mayuresh Gharat

> send of null key to a compacted topic should throw error back to user
> -
>
> Key: KAFKA-4808
> URL: https://issues.apache.org/jira/browse/KAFKA-4808
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.10.2.0
>Reporter: Ismael Juma
>Assignee: Mayuresh Gharat
> Fix For: 0.10.3.0
>
>
> If a message with a null key is produced to a compacted topic, the broker 
> returns `CorruptRecordException`, which is a retriable exception. As such, 
> the producer keeps retrying until retries are exhausted or request.timeout.ms 
> expires and eventually throws a TimeoutException. This is confusing and not 
> user-friendly.
> We should throw a meaningful error back to the user. From an implementation 
> perspective, we would have to use a non retriable error code to avoid this 
> issue.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4808) send of null key to a compacted topic should throw error back to user

2017-02-27 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15886288#comment-15886288
 ] 

Mayuresh Gharat commented on KAFKA-4808:


[~ijuma] I was wondering whether throwing an "INVALID_REQUEST" exception would work 
here, or whether we need to add a new exception type (which would require a KIP)?

> send of null key to a compacted topic should throw error back to user
> -
>
> Key: KAFKA-4808
> URL: https://issues.apache.org/jira/browse/KAFKA-4808
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.10.2.0
>Reporter: Ismael Juma
> Fix For: 0.10.3.0
>
>
> If a message with a null key is produced to a compacted topic, the broker 
> returns `CorruptRecordException`, which is a retriable exception. As such, 
> the producer keeps retrying until retries are exhausted or request.timeout.ms 
> expires and eventually throws a TimeoutException. This is confusing and not 
> user-friendly.
> We should throw a meaningful error back to the user. From an implementation 
> perspective, we would have to use a non retriable error code to avoid this 
> issue.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] (KAFKA-1610) Local modifications to collections generated from mapValues will be lost

2017-01-31 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15847355#comment-15847355
 ] 

Mayuresh Gharat commented on KAFKA-1610:


[~jozi-k] sure. The patch has been available but we somehow missed this.

> Local modifications to collections generated from mapValues will be lost
> 
>
> Key: KAFKA-1610
> URL: https://issues.apache.org/jira/browse/KAFKA-1610
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Mayuresh Gharat
>  Labels: newbie
> Attachments: KAFKA-1610_2014-08-29_09:51:51.patch, 
> KAFKA-1610_2014-08-29_10:03:55.patch, KAFKA-1610_2014-09-03_11:27:50.patch, 
> KAFKA-1610_2014-09-16_13:08:17.patch, KAFKA-1610_2014-09-16_15:23:27.patch, 
> KAFKA-1610_2014-09-30_23:21:46.patch, KAFKA-1610_2014-10-02_12:07:01.patch, 
> KAFKA-1610_2014-10-02_12:09:46.patch, KAFKA-1610.patch
>
>
> In our current Scala code base we have 40+ usages of mapValues; however, it 
> has an important semantic difference from map: "map" creates a 
> new map collection instance, while "mapValues" just creates a view of the 
> original map, and hence any further value changes made through the view will be 
> effectively lost.
> Example code:
> {code}
> scala> case class Test(i: Int, var j: Int) {}
> defined class Test
> scala> val a = collection.mutable.Map(1 -> 1)
> a: scala.collection.mutable.Map[Int,Int] = Map(1 -> 1)
> scala> val b = a.mapValues(v => Test(v, v))
> b: scala.collection.Map[Int,Test] = Map(1 -> Test(1,1))
> scala> val c = a.map(v => v._1 -> Test(v._2, v._2))
> c: scala.collection.mutable.Map[Int,Test] = Map(1 -> Test(1,1))
> scala> b.foreach(kv => kv._2.j = kv._2.j + 1)
> scala> b
> res1: scala.collection.Map[Int,Test] = Map(1 -> Test(1,1))
> scala> c.foreach(kv => kv._2.j = kv._2.j + 1)
> scala> c
> res3: scala.collection.mutable.Map[Int,Test] = Map(1 -> Test(1,2))
> scala> a.put(1,3)
> res4: Option[Int] = Some(1)
> scala> b
> res5: scala.collection.Map[Int,Test] = Map(1 -> Test(3,3))
> scala> c
> res6: scala.collection.mutable.Map[Int,Test] = Map(1 -> Test(1,2))
> {code}
> We need to go through all these mapValues usages to see if they should be 
> changed to map.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4454) Authorizer should also include the Principal generated by the PrincipalBuilder.

2016-12-07 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15730371#comment-15730371
 ] 

Mayuresh Gharat commented on KAFKA-4454:


[~junrao], thanks a lot for the review.
The thought of adding an equality check for channelPrincipal (if it exists) did 
cross my mind, but I left it out on purpose. The reason was that Kafka internally 
mainly cares about the principal name and principal type, and the principal name 
actually comes from the channelPrincipal. But now that I think more about it, the 
channelPrincipal might be one of several custom types, such as a servicePrincipal 
or a userPrincipal, with the same name, so it does make sense to add a proper 
equality check there.

I agree that there might be a better way of doing this. I will write up a KIP 
for this and submit it for review soon. 

> Authorizer should also include the Principal generated by the 
> PrincipalBuilder.
> ---
>
> Key: KAFKA-4454
> URL: https://issues.apache.org/jira/browse/KAFKA-4454
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.1
>Reporter: Mayuresh Gharat
>Assignee: Mayuresh Gharat
> Fix For: 0.10.2.0
>
>
> Currently Kafka allows users to plug in a custom PrincipalBuilder and a custom 
> Authorizer.
> The Authorizer.authorize() method takes in a Session object that wraps a 
> KafkaPrincipal and an InetAddress.
> The KafkaPrincipal currently has a PrincipalType and a Principal name, which is 
> the name of the Principal generated by the PrincipalBuilder.
> This Principal, generated by the plugged-in PrincipalBuilder, might have other 
> fields that are required by the plugged-in Authorizer, but currently we 
> lose this information since we only extract the name of the Principal while 
> creating the KafkaPrincipal in SocketServer.
> It would be great if KafkaPrincipal had an additional field 
> "channelPrincipal" used to store the Principal generated by the 
> plugged-in PrincipalBuilder.
> The plugged-in Authorizer can then use this "channelPrincipal" to do 
> authorization.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-4454) Authorizer should also include the Principal generated by the PrincipalBuilder.

2016-12-02 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15716114#comment-15716114
 ] 

Mayuresh Gharat edited comment on KAFKA-4454 at 12/2/16 8:13 PM:
-

[~ijuma] To make progress on this let me upload a PR for this and then if we 
want to include SimplePrincipal, I will be happy to rebase the patch you 
mentioned above. Does that work ? :)


was (Author: mgharat):
[~ijuma] I might not be understanding "accentuates the differences between 
authentication and authorization." completely.
Let me upload a PR for this and then if we want to include SimplePrincipal, I 
will be happy to rebase the patch you mentioned here. Does that work ? :)

> Authorizer should also include the Principal generated by the 
> PrincipalBuilder.
> ---
>
> Key: KAFKA-4454
> URL: https://issues.apache.org/jira/browse/KAFKA-4454
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.1
>Reporter: Mayuresh Gharat
>Assignee: Mayuresh Gharat
> Fix For: 0.10.2.0
>
>
> Currently Kafka allows users to plug in a custom PrincipalBuilder and a custom 
> Authorizer.
> The Authorizer.authorize() method takes in a Session object that wraps a 
> KafkaPrincipal and an InetAddress.
> The KafkaPrincipal currently has a PrincipalType and a Principal name, which is 
> the name of the Principal generated by the PrincipalBuilder.
> This Principal, generated by the plugged-in PrincipalBuilder, might have other 
> fields that are required by the plugged-in Authorizer, but currently we 
> lose this information since we only extract the name of the Principal while 
> creating the KafkaPrincipal in SocketServer.
> It would be great if KafkaPrincipal had an additional field 
> "channelPrincipal" used to store the Principal generated by the 
> plugged-in PrincipalBuilder.
> The plugged-in Authorizer can then use this "channelPrincipal" to do 
> authorization.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-4454) Authorizer should also include the Principal generated by the PrincipalBuilder.

2016-12-02 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15716114#comment-15716114
 ] 

Mayuresh Gharat edited comment on KAFKA-4454 at 12/2/16 8:00 PM:
-

[~ijuma] I might not be understanding "accentuates the differences between 
authentication and authorization." completely.
Let me upload a PR for this and then if we want to include SimplePrincipal, I 
will be happy to rebase the patch you mentioned here. Does that work ? :)


was (Author: mgharat):
[~ijuma] I might not be understanding "accentuates the differences between 
authentication and authorization." completely.
Let me upload a PR for this and then if we want to include SimplePrincipal, I 
will be happy to rebase the patch you mentioned here. Does that work ? :)

> Authorizer should also include the Principal generated by the 
> PrincipalBuilder.
> ---
>
> Key: KAFKA-4454
> URL: https://issues.apache.org/jira/browse/KAFKA-4454
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.1
>Reporter: Mayuresh Gharat
>Assignee: Mayuresh Gharat
> Fix For: 0.10.2.0
>
>
> Currently Kafka allows users to plug in a custom PrincipalBuilder and a custom 
> Authorizer.
> The Authorizer.authorize() method takes in a Session object that wraps a 
> KafkaPrincipal and an InetAddress.
> The KafkaPrincipal currently has a PrincipalType and a Principal name, which is 
> the name of the Principal generated by the PrincipalBuilder.
> This Principal, generated by the plugged-in PrincipalBuilder, might have other 
> fields that are required by the plugged-in Authorizer, but currently we 
> lose this information since we only extract the name of the Principal while 
> creating the KafkaPrincipal in SocketServer.
> It would be great if KafkaPrincipal had an additional field 
> "channelPrincipal" used to store the Principal generated by the 
> plugged-in PrincipalBuilder.
> The plugged-in Authorizer can then use this "channelPrincipal" to do 
> authorization.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4454) Authorizer should also include the Principal generated by the PrincipalBuilder.

2016-12-02 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15716114#comment-15716114
 ] 

Mayuresh Gharat commented on KAFKA-4454:


[~ijuma] I might not be understanding "accentuates the differences between 
authentication and authorization." completely.
Let me upload a PR for this and then if we want to include SimplePrincipal, I 
will be happy to rebase the patch you mentioned here. Does that work ? :)

> Authorizer should also include the Principal generated by the 
> PrincipalBuilder.
> ---
>
> Key: KAFKA-4454
> URL: https://issues.apache.org/jira/browse/KAFKA-4454
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.1
>Reporter: Mayuresh Gharat
>Assignee: Mayuresh Gharat
> Fix For: 0.10.2.0
>
>
> Currently Kafka allows users to plug in a custom PrincipalBuilder and a custom 
> Authorizer.
> The Authorizer.authorize() method takes in a Session object that wraps a 
> KafkaPrincipal and an InetAddress.
> The KafkaPrincipal currently has a PrincipalType and a Principal name, which is 
> the name of the Principal generated by the PrincipalBuilder.
> This Principal, generated by the plugged-in PrincipalBuilder, might have other 
> fields that are required by the plugged-in Authorizer, but currently we 
> lose this information since we only extract the name of the Principal while 
> creating the KafkaPrincipal in SocketServer.
> It would be great if KafkaPrincipal had an additional field 
> "channelPrincipal" used to store the Principal generated by the 
> plugged-in PrincipalBuilder.
> The plugged-in Authorizer can then use this "channelPrincipal" to do 
> authorization.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4454) Authorizer should also include the Principal generated by the PrincipalBuilder.

2016-12-01 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15713402#comment-15713402
 ] 

Mayuresh Gharat commented on KAFKA-4454:


[~ijuma] Thanks for pointing me to the patch. The idea of using SimplePrincipal 
looks good, or we can add another constructor that takes an additional parameter 
"channelPrincipal" of type java.security.Principal.
 
The main change would be in SocketServer, where
{code}
val session =
  RequestChannel.Session(KafkaPrincipal.fromPrincipal(channel.principal), channel.socketAddress)
{code}
would change to:
{code}
val session =
  RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName,
    channel.principal), channel.socketAddress)
{code}
where channel.principal is the Principal generated by the PrincipalBuilder.

Regarding "Do you have some examples of fields that you would want your 
principal to pass?"
---> Our Authorizer implementation delegates to LinkedIn's security infra 
team's library, which creates a java.security.Principal with additional 
information from the provided client cert. This information is required by their 
ACL service to ALLOW or DENY operations. 
This is likely to be a common use case for most companies that have a 
custom ACL service of their own.


> Authorizer should also include the Principal generated by the 
> PrincipalBuilder.
> ---
>
> Key: KAFKA-4454
> URL: https://issues.apache.org/jira/browse/KAFKA-4454
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.1
>Reporter: Mayuresh Gharat
>Assignee: Mayuresh Gharat
> Fix For: 0.10.2.0
>
>
> Currently Kafka allows users to plug in a custom PrincipalBuilder and a custom 
> Authorizer.
> The Authorizer.authorize() method takes in a Session object that wraps a 
> KafkaPrincipal and an InetAddress.
> The KafkaPrincipal currently has a PrincipalType and a Principal name, which is 
> the name of the Principal generated by the PrincipalBuilder.
> This Principal, generated by the plugged-in PrincipalBuilder, might have other 
> fields that are required by the plugged-in Authorizer, but currently we 
> lose this information since we only extract the name of the Principal while 
> creating the KafkaPrincipal in SocketServer.
> It would be great if KafkaPrincipal had an additional field 
> "channelPrincipal" used to store the Principal generated by the 
> plugged-in PrincipalBuilder.
> The plugged-in Authorizer can then use this "channelPrincipal" to do 
> authorization.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4454) Authorizer should also include the Principal generated by the PrincipalBuilder.

2016-11-30 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15709335#comment-15709335
 ] 

Mayuresh Gharat commented on KAFKA-4454:


[~jjkoshy] [~ashishsinghdev] [~parth.brahmbhatt] [~ijuma] ping :)

> Authorizer should also include the Principal generated by the 
> PrincipalBuilder.
> ---
>
> Key: KAFKA-4454
> URL: https://issues.apache.org/jira/browse/KAFKA-4454
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.1
>Reporter: Mayuresh Gharat
>Assignee: Mayuresh Gharat
> Fix For: 0.10.2.0
>
>
> Currently Kafka allows users to plug in a custom PrincipalBuilder and a custom 
> Authorizer.
> The Authorizer.authorize() method takes in a Session object that wraps a 
> KafkaPrincipal and an InetAddress.
> The KafkaPrincipal currently has a PrincipalType and a Principal name, which is 
> the name of the Principal generated by the PrincipalBuilder.
> This Principal, generated by the plugged-in PrincipalBuilder, might have other 
> fields that are required by the plugged-in Authorizer, but currently we 
> lose this information since we only extract the name of the Principal while 
> creating the KafkaPrincipal in SocketServer.
> It would be great if KafkaPrincipal had an additional field 
> "channelPrincipal" used to store the Principal generated by the 
> plugged-in PrincipalBuilder.
> The plugged-in Authorizer can then use this "channelPrincipal" to do 
> authorization.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4454) Authorizer should also include the Principal generated by the PrincipalBuilder.

2016-11-28 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15703758#comment-15703758
 ] 

Mayuresh Gharat commented on KAFKA-4454:


[~ijuma] [~jjkoshy] [~ashishsinghdev] [~parth.brahmbhatt] would you mind taking 
a look at this? 
I will be happy to submit a PR for this.

> Authorizer should also include the Principal generated by the 
> PrincipalBuilder.
> ---
>
> Key: KAFKA-4454
> URL: https://issues.apache.org/jira/browse/KAFKA-4454
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.1
>Reporter: Mayuresh Gharat
>Assignee: Mayuresh Gharat
> Fix For: 0.10.2.0
>
>
> Currently Kafka allows users to plug in a custom PrincipalBuilder and a custom 
> Authorizer.
> The Authorizer.authorize() method takes in a Session object that wraps a 
> KafkaPrincipal and an InetAddress.
> The KafkaPrincipal currently has a PrincipalType and a Principal name, which is 
> the name of the Principal generated by the PrincipalBuilder.
> This Principal, generated by the plugged-in PrincipalBuilder, might have other 
> fields that are required by the plugged-in Authorizer, but currently we 
> lose this information since we only extract the name of the Principal while 
> creating the KafkaPrincipal in SocketServer.
> It would be great if KafkaPrincipal had an additional field 
> "channelPrincipal" used to store the Principal generated by the 
> plugged-in PrincipalBuilder.
> The plugged-in Authorizer can then use this "channelPrincipal" to do 
> authorization.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4454) Authorizer should also include the Principal generated by the PrincipalBuilder.

2016-11-28 Thread Mayuresh Gharat (JIRA)
Mayuresh Gharat created KAFKA-4454:
--

 Summary: Authorizer should also include the Principal generated by 
the PrincipalBuilder.
 Key: KAFKA-4454
 URL: https://issues.apache.org/jira/browse/KAFKA-4454
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.10.0.1
Reporter: Mayuresh Gharat
Assignee: Mayuresh Gharat
 Fix For: 0.10.2.0


Currently Kafka allows users to plug in a custom PrincipalBuilder and a custom 
Authorizer.
The Authorizer.authorize() method takes in a Session object that wraps a 
KafkaPrincipal and an InetAddress.
The KafkaPrincipal currently has a PrincipalType and a Principal name, which is 
the name of the Principal generated by the PrincipalBuilder.
This Principal, generated by the plugged-in PrincipalBuilder, might have other 
fields that are required by the plugged-in Authorizer, but currently we lose 
this information since we only extract the name of the Principal while creating 
the KafkaPrincipal in SocketServer.

It would be great if KafkaPrincipal had an additional field "channelPrincipal" 
used to store the Principal generated by the plugged-in PrincipalBuilder.

The plugged-in Authorizer can then use this "channelPrincipal" to do 
authorization.
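A rough illustration of the proposed shape (names and types here are illustrative only, not the actual KafkaPrincipal API):

{code}
import java.security.Principal;

// Illustrative only -- not the real KafkaPrincipal. The idea is to carry the Principal
// built by the plugged-in PrincipalBuilder alongside the existing type/name pair.
class PrincipalWithChannel {
    final String principalType;
    final String name;
    final Principal channelPrincipal;   // whatever the custom PrincipalBuilder produced

    PrincipalWithChannel(String principalType, String name, Principal channelPrincipal) {
        this.principalType = principalType;
        this.name = name;
        this.channelPrincipal = channelPrincipal;
    }
}

// A custom Authorizer could then downcast channelPrincipal to its own type and use the
// extra fields (e.g. attributes extracted from the client certificate) for its ACL checks.
{code}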
 




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4433) Kafka Controller Does not send a LeaderAndIsr to old leader of a topicPartition during reassignment, if the old leader is not a part of the new assigned replicas

2016-11-22 Thread Mayuresh Gharat (JIRA)
Mayuresh Gharat created KAFKA-4433:
--

 Summary: Kafka Controller Does not send a LeaderAndIsr to old 
leader of a topicPartition during reassignment, if the old leader is not a part 
of the new assigned replicas
 Key: KAFKA-4433
 URL: https://issues.apache.org/jira/browse/KAFKA-4433
 Project: Kafka
  Issue Type: Bug
  Components: controller
Reporter: Mayuresh Gharat
Assignee: Mayuresh Gharat
 Fix For: 0.10.2.0


Consider the following scenario:
the old replicas {[1,2,3], Leader = 1} are reassigned to the new replicas {[2,3,4], 
Leader = 2}.
In this case broker 1 does not receive a LeaderAndIsr request to become a 
follower.

This happens because of the following: in
{code}
val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topicAndPartition, currentLeaderAndIsr)
{code}
in PartitionStateMachine.electLeaderForPartition(...), the replicas returned by 
ReassignedPartitionLeaderSelector.selectLeader() are only the new replicas, and 
only those are sent the LeaderAndIsrRequest. So the old replica never receives 
this LeaderAndIsr request.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4362) Consumer can fail after reassignment of the offsets topic partition

2016-11-08 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15649485#comment-15649485
 ] 

Mayuresh Gharat commented on KAFKA-4362:


I did some more testing while reproducing these error scenarios.
There are 2 bugs:
1) OFFSET COMMITS:
When we check for the MessageFormatVersion, we actually first check whether the 
replica of the __consumer_offsets topic is local, and if it is not, we return an 
IllegalArgumentException. This surfaces as an unknown exception on the client 
side, and the commitOffset operation fails with an exception.
As a side effect, if you start another consumer in the same group consuming from 
the same topic after the rebalance is done and then produce to the topic, you 
will see that both consumers consume the same data. This is because the second 
consumer is talking to the right coordinator, while the first consumer is 
completely unaware of the presence of the second consumer.

2) CONSUMER REBALANCE:
While doing a partition reassignment for the __consumer_offsets topic, for 
example moving replicas from (1,2,3 [Leader: 1]) to (4,2,3 [Leader: 4]), the 
controller sends a StopReplicaRequest to broker 1 for the __consumer_offsets 
partition. While handling this request on the server side, we never remove that 
partition of the __consumer_offsets topic from the coordinator's (broker 1's) 
cache. When a JoinGroupRequest comes in during the rebalance, the coordinator 
(broker 1) does check whether the group is local, but since we did not remove the 
group from its cache on the earlier StopReplicaRequest from the controller, it 
does not return NotCoordinatorForGroupException and instead proceeds 
successfully. So the consumer thinks it is talking to the right coordinator 
(which is not the case, since the coordinator moved from broker 1 to broker 4). 
On the consumer side, in the JoinGroup response handler callback, we send a 
SyncGroupRequest to broker 1, which in turn runs the code that checks the 
MessageFormatVersion on the server. At this point it throws an 
IllegalArgumentException for the same reason explained in point 1) above, which 
causes the SyncGroupRequest to fail with an unknown exception in the SyncGroup 
response handler callback.

The exact steps for reproducing these scenarios are as follows:
For 1)
a) Start 4 Kafka brokers and create a topic testtopicA with 1 partition.
b) Start a producer producing to testtopicA.
c) Start a console consumer with groupId = testGroupA consuming from 
testtopicA.
d) Produce and consume some data.
e) Find the __consumer_offsets partition that stores the offsets for testGroupA 
(see the helper sketch at the end of this comment).
f) Reassign the replicas of the partition from e) such that the current leader 
is removed from the replica list.
g) You should see frequent exceptions on the consumer side, something like this:

[2016-11-08 10:04:03,192] ERROR Group testGroupA failed to commit partition 
testtopicA-0 at offset 14: The server experienced an unexpected error when 
processing the request 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

h) If you keep producing to the topic, you should still see data in this 
consumer's console, but it is not able to commit offsets.
i) Now, if you start another console consumer with the same groupId = testGroupA 
consuming from the same topic testtopicA and produce more data, you should see 
the data in both consumer consoles.


For 2)
a) Start 4 Kafka brokers and create a topic testtopicA with 1 partition.
b) Start a producer producing to testtopicA.
c) Start a console consumer with groupId = testGroupA consuming from 
testtopicA.
d) Start another console consumer with the same groupId = testGroupA consuming 
from testtopicA.
e) Produce and consume some data. Exactly one of the consumers should be 
consuming the data.
f) Find the __consumer_offsets partition that stores the offsets for testGroupA.
g) Reassign the replicas of the partition from f) such that the current leader 
is removed from the replica list.
h) You should see frequent exceptions on the consumer that is actually fetching 
data, something like this:

[2016-11-08 10:04:03,192] ERROR Group testGroupA failed to commit partition 
testtopicA-0 at offset 14: The server experienced an unexpected error when 
processing the request 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

i) If you kill this consumer, you should immediately see an exception on the 
other consumer's console, something like this:

[2016-11-08 10:04:20,705] ERROR Error processing message, terminating consumer 
process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.KafkaException: Unexpected error from SyncGroup: The 
server experienced an unexpected error when processing the request
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:518)
at 
org.apache.kafk
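For step e) above (and step f) of the second scenario), a small helper, assuming the default of 50 partitions for __consumer_offsets, for locating the partition that holds a group's offsets:

{code}
public class OffsetsPartition {
    // Kafka maps a consumer group to a __consumer_offsets partition as
    // abs(groupId.hashCode()) % offsets.topic.num.partitions (50 by default);
    // the leader of that partition is the group coordinator.
    public static int offsetsPartitionFor(String groupId, int offsetsTopicPartitionCount) {
        return (groupId.hashCode() & 0x7fffffff) % offsetsTopicPartitionCount;
    }

    public static void main(String[] args) {
        // e.g. which partition of __consumer_offsets to reassign for testGroupA
        System.out.println(offsetsPartitionFor("testGroupA", 50));
    }
}
{code}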

[jira] [Commented] (KAFKA-4362) Consumer can fail after reassignment of the offsets topic partition

2016-11-07 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15644945#comment-15644945
 ] 

Mayuresh Gharat commented on KAFKA-4362:


[~jasong35] From the details that Joel has listed, I think there are 2 issues:
1) Offset commits fail when the offsets topic partition is moved. This happens 
because the old coordinator incorrectly returns an IllegalArgumentException when 
checking the message format version, when it is in fact first checking whether 
the replica is local. The correct behaviour here would be to return a 
"NotCoordinatorForGroupException" from the server side.
2) On the client side, because the IllegalArgumentException thrown by the server 
is currently bubbled up as an UnknownException, the consumer is not able to 
handle it correctly.

I think once we return the correct exception (NotCoordinatorForGroupException), 
the consumer should be able to handle it and proceed.

> Consumer can fail after reassignment of the offsets topic partition
> ---
>
> Key: KAFKA-4362
> URL: https://issues.apache.org/jira/browse/KAFKA-4362
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>Reporter: Joel Koshy
>Assignee: Mayuresh Gharat
>
> When a consumer offsets topic partition reassignment completes, an offset 
> commit shows this:
> {code}
> java.lang.IllegalArgumentException: Message format version for partition 100 
> not found
> at 
> kafka.coordinator.GroupMetadataManager$$anonfun$14.apply(GroupMetadataManager.scala:633)
>  ~[kafka_2.10.jar:?]
> at 
> kafka.coordinator.GroupMetadataManager$$anonfun$14.apply(GroupMetadataManager.scala:633)
>  ~[kafka_2.10.jar:?]
> at scala.Option.getOrElse(Option.scala:120) ~[scala-library-2.10.4.jar:?]
> at 
> kafka.coordinator.GroupMetadataManager.kafka$coordinator$GroupMetadataManager$$getMessageFormatVersionAndTimestamp(GroupMetadataManager.scala:632)
>  ~[kafka_2.10.jar:?]
> at 
> ...
> {code}
> The issue is that the replica has been deleted so the 
> {{GroupMetadataManager.getMessageFormatVersionAndTimestamp}} throws this 
> exception instead which propagates as an unknown error.
> Unfortunately consumers don't respond to this and will fail their offset 
> commits.
> One workaround in the above situation is to bounce the cluster - the consumer 
> will be forced to rediscover the group coordinator.
> (Incidentally, the message incorrectly prints the number of partitions 
> instead of the actual partition.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3995) Add a new configuration "enable.comrpession.ratio.estimation" to the producer config

2016-10-19 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15589938#comment-15589938
 ] 

Mayuresh Gharat commented on KAFKA-3995:


If we disable compression, we would not have this issue, right? Of course, that 
is not recommended.
The other workaround would be to reduce linger.ms to a very low value.

Thinking more about this, I plan to reopen and work on it. Since this is a new 
config, we would probably require a KIP, right? If yes, I can write up a KIP and 
submit it for review.
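For reference, the two workarounds above expressed as producer configs (values are illustrative):

{code}
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class WorkaroundConfigs {
    public static Properties workaroundProps() {
        Properties props = new Properties();
        // Workaround 1: disable compression entirely (avoids mis-sized compressed batches,
        // but not recommended for high-volume traffic).
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");
        // Workaround 2: keep compression but use a very low linger.ms so batches stay small
        // and a stale compression-ratio estimate has less data to mis-size.
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        return props;
    }
}
{code}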

> Add a new configuration "enable.comrpession.ratio.estimation" to the producer 
> config
> 
>
> Key: KAFKA-3995
> URL: https://issues.apache.org/jira/browse/KAFKA-3995
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 0.10.0.0
>Reporter: Jiangjie Qin
>Assignee: Mayuresh Gharat
> Fix For: 0.10.2.0
>
>
> We have recently seen a few cases where a RecordTooLargeException is thrown because 
> the compressed message sent by the KafkaProducer exceeded the max message size.
> The root cause is that the compressor estimates the batch size using an 
> estimated compression ratio based on heuristic compression ratio statistics. 
> This does not work well for traffic with highly variable compression ratios.
> For example, suppose the batch size is set to 1MB and the max message size is 1MB. 
> Initially the producer is sending messages (each message is 1MB) to topic_1, 
> whose data can be compressed to 1/10 of the original size. After a while the 
> estimated compression ratio in the compressor will be trained to 1/10 and the 
> producer will put 10 messages into one batch. Now the producer starts to 
> send messages (each message is also 1MB) to topic_2, whose messages can only be 
> compressed to 1/5 of the original size. The producer will still use 1/10 as 
> the estimated compression ratio and put 10 messages into a batch. That batch 
> will be 2MB after compression, which exceeds the maximum message size. In 
> this case the user does not have many options other than resending everything or 
> closing the producer if they care about ordering.
> This is especially an issue for services like MirrorMaker, whose producer is 
> shared by many different topics.
> To solve this issue, we can probably add a configuration 
> "enable.compression.ratio.estimation" to the producer. When this 
> configuration is set to false, we stop estimating the compressed size and 
> instead close the batch once the uncompressed bytes in the batch reach the 
> batch size.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Reopened] (KAFKA-3995) Add a new configuration "enable.comrpession.ratio.estimation" to the producer config

2016-10-19 Thread Mayuresh Gharat (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayuresh Gharat reopened KAFKA-3995:


> Add a new configuration "enable.comrpession.ratio.estimation" to the producer 
> config
> 
>
> Key: KAFKA-3995
> URL: https://issues.apache.org/jira/browse/KAFKA-3995
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 0.10.0.0
>Reporter: Jiangjie Qin
>Assignee: Mayuresh Gharat
> Fix For: 0.10.2.0
>
>
> We have recently seen a few cases where a RecordTooLargeException is thrown because 
> the compressed message sent by the KafkaProducer exceeded the max message size.
> The root cause is that the compressor estimates the batch size using an 
> estimated compression ratio based on heuristic compression ratio statistics. 
> This does not work well for traffic with highly variable compression ratios.
> For example, suppose the batch size is set to 1MB and the max message size is 1MB. 
> Initially the producer is sending messages (each message is 1MB) to topic_1, 
> whose data can be compressed to 1/10 of the original size. After a while the 
> estimated compression ratio in the compressor will be trained to 1/10 and the 
> producer will put 10 messages into one batch. Now the producer starts to 
> send messages (each message is also 1MB) to topic_2, whose messages can only be 
> compressed to 1/5 of the original size. The producer will still use 1/10 as 
> the estimated compression ratio and put 10 messages into a batch. That batch 
> will be 2MB after compression, which exceeds the maximum message size. In 
> this case the user does not have many options other than resending everything or 
> closing the producer if they care about ordering.
> This is especially an issue for services like MirrorMaker, whose producer is 
> shared by many different topics.
> To solve this issue, we can probably add a configuration 
> "enable.compression.ratio.estimation" to the producer. When this 
> configuration is set to false, we stop estimating the compressed size and 
> instead close the batch once the uncompressed bytes in the batch reach the 
> batch size.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-3995) Add a new configuration "enable.comrpession.ratio.estimation" to the producer config

2016-10-18 Thread Mayuresh Gharat (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayuresh Gharat resolved KAFKA-3995.

Resolution: Workaround

> Add a new configuration "enable.comrpession.ratio.estimation" to the producer 
> config
> 
>
> Key: KAFKA-3995
> URL: https://issues.apache.org/jira/browse/KAFKA-3995
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 0.10.0.0
>Reporter: Jiangjie Qin
>Assignee: Mayuresh Gharat
> Fix For: 0.10.2.0
>
>
> We have recently seen a few cases where a RecordTooLargeException is thrown because 
> the compressed message sent by the KafkaProducer exceeded the max message size.
> The root cause is that the compressor estimates the batch size using an 
> estimated compression ratio based on heuristic compression ratio statistics. 
> This does not work well for traffic with highly variable compression ratios.
> For example, suppose the batch size is set to 1MB and the max message size is 1MB. 
> Initially the producer is sending messages (each message is 1MB) to topic_1, 
> whose data can be compressed to 1/10 of the original size. After a while the 
> estimated compression ratio in the compressor will be trained to 1/10 and the 
> producer will put 10 messages into one batch. Now the producer starts to 
> send messages (each message is also 1MB) to topic_2, whose messages can only be 
> compressed to 1/5 of the original size. The producer will still use 1/10 as 
> the estimated compression ratio and put 10 messages into a batch. That batch 
> will be 2MB after compression, which exceeds the maximum message size. In 
> this case the user does not have many options other than resending everything or 
> closing the producer if they care about ordering.
> This is especially an issue for services like MirrorMaker, whose producer is 
> shared by many different topics.
> To solve this issue, we can probably add a configuration 
> "enable.compression.ratio.estimation" to the producer. When this 
> configuration is set to false, we stop estimating the compressed size and 
> instead close the batch once the uncompressed bytes in the batch reach the 
> batch size.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4074) Deleting a topic can make it unavailable even if delete.topic.enable is false

2016-09-14 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15491390#comment-15491390
 ] 

Mayuresh Gharat commented on KAFKA-4074:


[~omkreddy] merging your PR with that in 
https://issues.apache.org/jira/browse/KAFKA-3175 would be great. 


> Deleting a topic can make it unavailable even if delete.topic.enable is false
> -
>
> Key: KAFKA-4074
> URL: https://issues.apache.org/jira/browse/KAFKA-4074
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Reporter: Joel Koshy
>Assignee: Manikumar Reddy
> Fix For: 0.10.1.0
>
>
> The {{delete.topic.enable}} configuration does not completely block the 
> effects of delete topic since the controller may (indirectly) query the list 
> of topics under the delete-topic znode.
> To reproduce:
> * Delete topic X
> * Force a controller move (either by bouncing or removing the controller 
> znode)
> * The new controller will send out UpdateMetadataRequests with leader=-2 for 
> the partitions of X
> * Producers eventually stop producing to that topic
> The reason for this is that when ControllerChannelManager adds 
> UpdateMetadataRequests for brokers, we directly use the partitionsToBeDeleted 
> field of the DeleteTopicManager (which is set to the partitions of the topics 
> under the delete-topic znode on controller startup).
> In order to get out of the situation you have to remove X from the znode and 
> then force another controller move.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3083) a soft failure in controller may leave a topic partition in an inconsistent state

2016-08-25 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15437288#comment-15437288
 ] 

Mayuresh Gharat commented on KAFKA-3083:


[~fpj] Do we have an umbrella JIRA where this issue is being tracked, along with 
the changes required that are mentioned in this patch?

> a soft failure in controller may leave a topic partition in an inconsistent 
> state
> -
>
> Key: KAFKA-3083
> URL: https://issues.apache.org/jira/browse/KAFKA-3083
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
>Reporter: Jun Rao
>Assignee: Mayuresh Gharat
>
> The following sequence can happen.
> 1. Broker A is the controller and is in the middle of processing a broker 
> change event. As part of this process, let's say it's about to shrink the isr 
> of a partition.
> 2. Then broker A's session expires and broker B takes over as the new 
> controller. Broker B sends the initial leaderAndIsr request to all brokers.
> 3. Broker A continues by shrinking the isr of the partition in ZK and sends 
> the new leaderAndIsr request to the broker (say C) that leads the partition. 
> Broker C will reject this leaderAndIsr since the request comes from a 
> controller with an older epoch. Now we could be in a situation that Broker C 
> thinks the isr has all replicas, but the isr stored in ZK is different.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-2014) Chaos Monkey / Failure Inducer for Kafka

2016-08-23 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15433359#comment-15433359
 ] 

Mayuresh Gharat edited comment on KAFKA-2014 at 8/23/16 6:37 PM:
-

This is the failure inducer that was developed for kafka :
https://github.com/linkedin/simoorg


was (Author: mgharat):
https://github.com/linkedin/simoorg

> Chaos Monkey / Failure Inducer for Kafka
> 
>
> Key: KAFKA-2014
> URL: https://issues.apache.org/jira/browse/KAFKA-2014
> Project: Kafka
>  Issue Type: Task
>  Components: system tests
>Reporter: Mayuresh Gharat
>Assignee: Mayuresh Gharat
>
> Implement a Chaos Monkey for kafka, that will help us catch any shortcomings 
> in the test environment before going to production. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2014) Chaos Monkey / Failure Inducer for Kafka

2016-08-23 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15433359#comment-15433359
 ] 

Mayuresh Gharat commented on KAFKA-2014:


https://github.com/linkedin/simoorg

> Chaos Monkey / Failure Inducer for Kafka
> 
>
> Key: KAFKA-2014
> URL: https://issues.apache.org/jira/browse/KAFKA-2014
> Project: Kafka
>  Issue Type: Task
>  Components: system tests
>Reporter: Mayuresh Gharat
>Assignee: Mayuresh Gharat
>
> Implement a Chaos Monkey for kafka, that will help us catch any shortcomings 
> in the test environment before going to production. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-2014) Chaos Monkey / Failure Inducer for Kafka

2016-08-23 Thread Mayuresh Gharat (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayuresh Gharat resolved KAFKA-2014.

Resolution: Fixed

> Chaos Monkey / Failure Inducer for Kafka
> 
>
> Key: KAFKA-2014
> URL: https://issues.apache.org/jira/browse/KAFKA-2014
> Project: Kafka
>  Issue Type: Task
>  Components: system tests
>Reporter: Mayuresh Gharat
>Assignee: Mayuresh Gharat
>
> Implement a Chaos Monkey for kafka, that will help us catch any shortcomings 
> in the test environment before going to production. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-2976) Mirror maker dies if we delete a topic from destination cluster

2016-08-23 Thread Mayuresh Gharat (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayuresh Gharat resolved KAFKA-2976.

Resolution: Won't Fix

> Mirror maker dies if we delete a topic from destination cluster
> ---
>
> Key: KAFKA-2976
> URL: https://issues.apache.org/jira/browse/KAFKA-2976
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Mayuresh Gharat
>Assignee: Mayuresh Gharat
>
> In the data pipeline:
> 1) Suppose the Mirror Maker is producing to a cluster with topic T that has 
> 128 partitions (Partition 0 to Partition 127). The default setting on 
> creation of a new topic on that cluster is 8 partitions.
> 2) After we delete the topic, the topic gets recreated with 8 partitions 
> (Partition 0 to Partition 7).
> 3) The RecordAccumulator has batches for partitions from 9 to 127. Those 
> batches expire and the mirror makers die to avoid data loss.
> We need a way to reassign those batches (batches for Partition 9 to 
> Partition 127) in the RecordAccumulator to the newly created topic T with 8 
> partitions (Partition 0 to Partition 7).  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3722) PlaintextChannelBuilder should not use ChannelBuilders.createPrincipalBuilder(configs) for creating instance of PrincipalBuilder

2016-08-23 Thread Mayuresh Gharat (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayuresh Gharat updated KAFKA-3722:
---
Status: Patch Available  (was: In Progress)

> PlaintextChannelBuilder should not use 
> ChannelBuilders.createPrincipalBuilder(configs) for creating instance of 
> PrincipalBuilder
> 
>
> Key: KAFKA-3722
> URL: https://issues.apache.org/jira/browse/KAFKA-3722
> Project: Kafka
>  Issue Type: Bug
>Reporter: Mayuresh Gharat
>Assignee: Mayuresh Gharat
>
> Consider this scenario:
> 1) We have a Kafka broker running on a plaintext and an SSL port simultaneously.
> 2) We try to plug in a custom principal builder using the config 
> "principal.builder.class" for requests coming over the SSL port.
> 3) ChannelBuilders.createPrincipalBuilder(configs) first checks whether the 
> config "principal.builder.class" is specified in the passed-in configs and 
> tries to use it even when it is building the PrincipalBuilder instance 
> for the plaintext port, although that custom principal class is only meant for 
> the SSL port.
> IMO, having a DefaultPrincipalBuilder for the plaintext port should be fine.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Work started] (KAFKA-3722) PlaintextChannelBuilder should not use ChannelBuilders.createPrincipalBuilder(configs) for creating instance of PrincipalBuilder

2016-08-23 Thread Mayuresh Gharat (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-3722 started by Mayuresh Gharat.
--
> PlaintextChannelBuilder should not use 
> ChannelBuilders.createPrincipalBuilder(configs) for creating instance of 
> PrincipalBuilder
> 
>
> Key: KAFKA-3722
> URL: https://issues.apache.org/jira/browse/KAFKA-3722
> Project: Kafka
>  Issue Type: Bug
>Reporter: Mayuresh Gharat
>Assignee: Mayuresh Gharat
>
> Consider this scenario:
> 1) We have a Kafka broker running on a plaintext and an SSL port simultaneously.
> 2) We try to plug in a custom principal builder using the config 
> "principal.builder.class" for requests coming over the SSL port.
> 3) ChannelBuilders.createPrincipalBuilder(configs) first checks whether the 
> config "principal.builder.class" is specified in the passed-in configs and 
> tries to use it even when it is building the PrincipalBuilder instance 
> for the plaintext port, although that custom principal class is only meant for 
> the SSL port.
> IMO, having a DefaultPrincipalBuilder for the plaintext port should be fine.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4050) Allow configuration of the PRNG used for SSL

2016-08-17 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15424982#comment-15424982
 ] 

Mayuresh Gharat commented on KAFKA-4050:


Just a heads up, this has also been seen as an issue in other systems, for 
example :
https://issues.jenkins-ci.org/browse/JENKINS-20108



> Allow configuration of the PRNG used for SSL
> 
>
> Key: KAFKA-4050
> URL: https://issues.apache.org/jira/browse/KAFKA-4050
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Affects Versions: 0.10.0.1
>Reporter: Todd Palino
>Assignee: Todd Palino
>  Labels: security, ssl
>
> This change will make the pseudo-random number generator (PRNG) 
> implementation used by the SSLContext configurable. The configuration is not 
> required, and the default is to use whatever the default PRNG for the JDK/JRE 
> is. Providing a string, such as "SHA1PRNG", will cause that specific 
> SecureRandom implementation to get passed to the SSLContext.
> When enabling inter-broker SSL in our certification cluster, we observed 
> severe performance issues. For reference, this cluster can take up to 600 
> MB/sec of inbound produce traffic over SSL, with RF=2, before it gets close 
> to saturation, and the mirror maker normally produces about 400 MB/sec 
> (unless it is lagging). When we enabled inter-broker SSL, we saw persistent 
> replication problems in the cluster at any inbound rate of more than about 6 
> or 7 MB/sec per-broker. This was narrowed down to all the network threads 
> blocking on a single lock in the SecureRandom code.
> It turns out that the default PRNG implementation on Linux is NativePRNG. 
> This uses randomness from /dev/urandom (which, by itself, is a non-blocking 
> read) and mixes it with randomness from SHA1. The problem is that the entire 
> application shares a single SecureRandom instance, and NativePRNG has a 
> global lock within the implNextBytes method. Switching to another 
> implementation (SHA1PRNG, which has better performance characteristics and is 
> still considered secure) completely eliminated the bottleneck and allowed the 
> cluster to work properly at saturation.
> The SSLContext initialization has an optional argument to provide a 
> SecureRandom instance, which the code currently sets to null. This change 
> creates a new config to specify an implementation, and instantiates that and 
> passes it to SSLContext if provided. This will also let someone select a 
> stronger source of randomness (obviously at a performance cost) if desired.
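For reference, a hedged sketch of what such a config would boil down to with the standard JSSE API (this is not the Kafka patch itself; the class and the way the config value is passed in are illustrative):

{noformat}
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import javax.net.ssl.SSLContext;

public class SslPrngSketch {
    public static SSLContext buildContext(String prngImplementation)
            throws NoSuchAlgorithmException, KeyManagementException {
        SSLContext sslContext = SSLContext.getInstance("TLS");
        // If no implementation is configured, keep the JDK/JRE default (null).
        SecureRandom secureRandom =
                prngImplementation == null ? null : SecureRandom.getInstance(prngImplementation);
        // Key and trust managers are omitted here; the point is the third argument.
        sslContext.init(null, null, secureRandom);
        return sslContext;
    }
}
{noformat}

For example, buildContext("SHA1PRNG") would hand that SecureRandom implementation to the SSLContext instead of the locking NativePRNG default described above.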



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-3995) Add a new configuration "enable.compression.ratio.estimation" to the producer config

2016-07-26 Thread Mayuresh Gharat (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayuresh Gharat reassigned KAFKA-3995:
--

Assignee: Mayuresh Gharat

> Add a new configuration "enable.compression.ratio.estimation" to the producer 
> config
> 
>
> Key: KAFKA-3995
> URL: https://issues.apache.org/jira/browse/KAFKA-3995
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 0.10.0.0
>Reporter: Jiangjie Qin
>Assignee: Mayuresh Gharat
> Fix For: 0.10.1.0
>
>
> We recently saw a few cases where RecordTooLargeException is thrown because
> the compressed message sent by KafkaProducer exceeded the max message size.
> The root cause of this issue is that the compressor estimates the
> batch size using an estimated compression ratio based on heuristic
> compression ratio statistics. This does not quite work for traffic with
> highly variable compression ratios.
> For example, suppose the batch size is set to 100KB and the max message size is
> 1MB. Initially the producer is sending messages (each message is 100KB) to
> topic_1, whose data can be compressed to 1/10 of the original size. After a
> while the estimated compression ratio in the compressor will be trained to
> 1/10 and the producer would put 10 messages into one batch. Now the producer
> starts to send messages (each message is also 100KB) to topic_2, whose messages
> can only be compressed to 1/5 of the original size. The producer would still
> use 1/10 as the estimated compression ratio and put 10 messages into a batch.
> That batch would be 2 MB after compression, which exceeds the maximum message
> size. In this case the user does not have many options other than resending
> everything or closing the producer if they care about ordering.
> This is especially an issue for services like MirrorMaker, whose producer is
> shared by many different topics.
> To solve this issue, we can probably add a configuration
> "enable.compression.ratio.estimation" to the producer. So when this
> configuration is set to false, we stop estimating the compressed size but
> will close the batch once the uncompressed bytes in the batch reach the
> batch size.
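As a rough, hedged illustration of the failure mode (the numbers below are illustrative and the class is a stand-in, not producer internals): the compressor keeps appending until uncompressedBytes * estimatedRatio reaches the batch size, so the real compressed size lands near batchSize * actualRatio / estimatedRatio, which can exceed the max message size whenever the actual ratio is much worse than the estimate.

{noformat}
public class CompressionEstimateSketch {
    public static void main(String[] args) {
        long batchSize = 1024 * 1024;         // illustrative batch size of 1 MB
        long maxMessageSize = 1024 * 1024;    // illustrative max message size of 1 MB
        double estimatedRatio = 1.0 / 10;     // ratio learned from topic_1
        double actualRatio = 1.0 / 5;         // ratio topic_2 actually achieves

        // Bytes appended before the estimator believes the batch is "full".
        long uncompressedBytes = (long) (batchSize / estimatedRatio);
        // What actually goes on the wire after compression.
        long compressedBytes = (long) (uncompressedBytes * actualRatio);

        System.out.printf("compressed batch = %d KB, limit = %d KB, too large: %b%n",
                compressedBytes / 1024, maxMessageSize / 1024, compressedBytes > maxMessageSize);
    }
}
{noformat}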



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3722) PlaintextChannelBuilder should not use ChannelBuilders.createPrincipalBuilder(configs) for creating instance of PrincipalBuilder

2016-05-17 Thread Mayuresh Gharat (JIRA)
Mayuresh Gharat created KAFKA-3722:
--

 Summary: PlaintextChannelBuilder should not use 
ChannelBuilders.createPrincipalBuilder(configs) for creating instance of 
PrincipalBuilder
 Key: KAFKA-3722
 URL: https://issues.apache.org/jira/browse/KAFKA-3722
 Project: Kafka
  Issue Type: Bug
Reporter: Mayuresh Gharat
Assignee: Mayuresh Gharat


Consider this scenario:
1) We have a Kafka broker running on a PlainText and an SSL port simultaneously.

2) We try to plug in a custom principal builder using the config
"principal.builder.class" for requests coming over the SSL port.

3) ChannelBuilders.createPrincipalBuilder(configs) first checks whether
"principal.builder.class" is specified in the passed-in configs and tries to use
it even when it is building the PrincipalBuilder instance for the PlainText
port, although that custom principal class is only meant for the SSL port.

IMO, using a DefaultPrincipalBuilder for the PlainText port should be fine.




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3720) Remove BufferExhaustException from doSend() in KafkaProducer

2016-05-17 Thread Mayuresh Gharat (JIRA)
Mayuresh Gharat created KAFKA-3720:
--

 Summary: Remove BufferExhaustException from doSend() in 
KafkaProducer
 Key: KAFKA-3720
 URL: https://issues.apache.org/jira/browse/KAFKA-3720
 Project: Kafka
  Issue Type: Bug
Reporter: Mayuresh Gharat
Assignee: Mayuresh Gharat


KafkaProducer no longer throws BufferExhaustException. We should remove it from 
the catch clause. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3651) Whenever the BufferPool throws a "Failed to allocate memory within the configured max blocking time" exception, it should also remove the condition object from the wait

2016-05-03 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3651?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15269937#comment-15269937
 ] 

Mayuresh Gharat commented on KAFKA-3651:


[~amandazhu19620...@gmail.com] did you intend to open a PR against this jira?

> Whenever the BufferPool throws a "Failed to allocate memory within the 
> configured max blocking time" exception, it should also remove the condition 
> object from the waiters deque
> -
>
> Key: KAFKA-3651
> URL: https://issues.apache.org/jira/browse/KAFKA-3651
> Project: Kafka
>  Issue Type: Bug
>Reporter: Mayuresh Gharat
>Assignee: Mayuresh Gharat
> Fix For: 0.10.0.0
>
>
> "this.waiters.remove(moreMemory);" should happen before the exception
> is thrown.
> Otherwise the waiting thread count will never get to 0 after the exception
> and batching will not occur. This is because in the RecordAccumulator.ready
> method, exhausted is set as
> boolean exhausted = this.free.queued() > 0, where free.queued() returns
> waiters.size().



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3651) Whenever the BufferPool throws a "Failed to allocate memory within the configured max blocking time" exception, it should also remove the condition object from the wait

2016-05-03 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3651?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15269837#comment-15269837
 ] 

Mayuresh Gharat commented on KAFKA-3651:


[~ijuma] Yes just working on the unit test.

> Whenever the BufferPool throws a "Failed to allocate memory within the 
> configured max blocking time" exception, it should also remove the condition 
> object from the waiters deque
> -
>
> Key: KAFKA-3651
> URL: https://issues.apache.org/jira/browse/KAFKA-3651
> Project: Kafka
>  Issue Type: Bug
>Reporter: Mayuresh Gharat
>Assignee: Mayuresh Gharat
>
> "this.waiters.remove(moreMemory);" should happen before the exception
> is thrown.
> Otherwise the waiting thread count will never get to 0 after the exception
> and batching will not occur. This is because in the RecordAccumulator.ready
> method, exhausted is set as
> boolean exhausted = this.free.queued() > 0, where free.queued() returns
> waiters.size().



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3651) Whenever the BufferPool throws a "Failed to allocate memory within the configured max blocking time" exception, it should also remove the condition object from the waiters

2016-05-03 Thread Mayuresh Gharat (JIRA)
Mayuresh Gharat created KAFKA-3651:
--

 Summary: Whenever the BufferPool throws a "Failed to allocate 
memory within the configured max blocking time" exception, it should also remove 
the condition object from the waiters deque
 Key: KAFKA-3651
 URL: https://issues.apache.org/jira/browse/KAFKA-3651
 Project: Kafka
  Issue Type: Bug
Reporter: Mayuresh Gharat
Assignee: Mayuresh Gharat


"this.waiters.remove(moreMemory);" should happen before the exception
is thrown.

Otherwise the waiting thread count will never get to 0 after the exception
and batching will not occur. This is because in the RecordAccumulator.ready
method, exhausted is set as

boolean exhausted = this.free.queued() > 0, where free.queued() returns
waiters.size().
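A minimal, self-contained sketch of the fix described above; the BufferPool internals are heavily simplified and the names below are stand-ins, not the real code.

{noformat}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class BufferPoolSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Deque<Condition> waiters = new ArrayDeque<>();

    // Mirrors free.queued(), which RecordAccumulator.ready() uses to decide "exhausted".
    int queued() {
        return waiters.size();
    }

    void allocate(long maxBlockMs) throws InterruptedException {
        lock.lock();
        try {
            Condition moreMemory = lock.newCondition();
            waiters.addLast(moreMemory);
            boolean signalled = moreMemory.await(maxBlockMs, TimeUnit.MILLISECONDS);
            if (!signalled) {
                // Remove the condition BEFORE throwing, otherwise waiters.size()
                // never drops back to 0 and batching stays disabled forever.
                waiters.remove(moreMemory);
                throw new IllegalStateException(
                        "Failed to allocate memory within the configured max blocking time");
            }
            waiters.remove(moreMemory);
        } finally {
            lock.unlock();
        }
    }
}
{noformat}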



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3294) Kafka REST API

2016-04-26 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15258891#comment-15258891
 ] 

Mayuresh Gharat commented on KAFKA-3294:


[~sriharsha.sm] So this will be integrated with the kafka brokers, right? I am
trying to understand whether this will be something like mirror maker.

> Kafka REST API
> --
>
> Key: KAFKA-3294
> URL: https://issues.apache.org/jira/browse/KAFKA-3294
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Sriharsha Chintalapani
>Assignee: Parth Brahmbhatt
>
> This JIRA is to build Kafka REST API for producer, consumer and also any 
> administrative tasks such as create topic, delete topic. We do have lot of 
> kafka client api support in different languages but having REST API for 
> producer and consumer will make it easier for users to read or write Kafka. 
> Also having administrative API will help in managing a cluster or building 
> administrative dashboards.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3294) Kafka REST API

2016-04-26 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15258438#comment-15258438
 ] 

Mayuresh Gharat commented on KAFKA-3294:


Are you thinking of exposing a REST endpoint on the broker itself?


> Kafka REST API
> --
>
> Key: KAFKA-3294
> URL: https://issues.apache.org/jira/browse/KAFKA-3294
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Sriharsha Chintalapani
>Assignee: Parth Brahmbhatt
>
> This JIRA is to build Kafka REST API for producer, consumer and also any 
> administrative tasks such as create topic, delete topic. We do have lot of 
> kafka client api support in different languages but having REST API for 
> producer and consumer will make it easier for users to read or write Kafka. 
> Also having administrative API will help in managing a cluster or building 
> administrative dashboards.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3390) ReplicaManager may infinitely try-fail to shrink ISR set of deleted partition

2016-03-15 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15195593#comment-15195593
 ] 

Mayuresh Gharat commented on KAFKA-3390:


Do you mean that even after the topic got completely removed, the ReplicaManager
on the leader broker kept trying to shrink the ISR? Am I understanding it
correctly?

-Mayuresh

> ReplicaManager may infinitely try-fail to shrink ISR set of deleted partition
> -
>
> Key: KAFKA-3390
> URL: https://issues.apache.org/jira/browse/KAFKA-3390
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
>Reporter: Stevo Slavic
>Assignee: Mayuresh Gharat
>
> For a topic whose deletion has been requested, Kafka replica manager may end 
> up infinitely trying and failing to shrink ISR.
> Here is fragment from server.log where this recurring and never ending 
> condition has been noticed:
> {noformat}
> [2016-03-04 09:42:13,894] INFO Partition [foo,0] on broker 1: Shrinking ISR 
> for partition [foo,0] from 1,3,2 to 1 (kafka.cluster.Partition)
> [2016-03-04 09:42:13,897] WARN Conditional update of path 
> /brokers/topics/foo/partitions/0/state with data 
> {"controller_epoch":53,"leader":1,"version":1,"leader_epoch":34,"isr":[1]} 
> and expected version 68 failed due to 
> org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = 
> NoNode for /brokers/topics/foo/partitions/0/state (kafka.utils.ZkUtils)
> [2016-03-04 09:42:13,898] INFO Partition [foo,0] on broker 1: Cached 
> zkVersion [68] not equal to that in zookeeper, skip updating ISR 
> (kafka.cluster.Partition)
> [2016-03-04 09:42:23,894] INFO Partition [foo,0] on broker 1: Shrinking ISR 
> for partition [foo,0] from 1,3,2 to 1 (kafka.cluster.Partition)
> [2016-03-04 09:42:23,897] WARN Conditional update of path 
> /brokers/topics/foo/partitions/0/state with data 
> {"controller_epoch":53,"leader":1,"version":1,"leader_epoch":34,"isr":[1]} 
> and expected version 68 failed due to 
> org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = 
> NoNode for /brokers/topics/foo/partitions/0/state (kafka.utils.ZkUtils)
> [2016-03-04 09:42:23,897] INFO Partition [foo,0] on broker 1: Cached 
> zkVersion [68] not equal to that in zookeeper, skip updating ISR 
> (kafka.cluster.Partition)
> [2016-03-04 09:42:33,894] INFO Partition [foo,0] on broker 1: Shrinking ISR 
> for partition [foo,0] from 1,3,2 to 1 (kafka.cluster.Partition)
> [2016-03-04 09:42:33,897] WARN Conditional update of path 
> /brokers/topics/foo/partitions/0/state with data 
> {"controller_epoch":53,"leader":1,"version":1,"leader_epoch":34,"isr":[1]} 
> and expected version 68 failed due to 
> org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = 
> NoNode for /brokers/topics/foo/partitions/0/state (kafka.utils.ZkUtils)
> [2016-03-04 09:42:33,897] INFO Partition [foo,0] on broker 1: Cached 
> zkVersion [68] not equal to that in zookeeper, skip updating ISR 
> (kafka.cluster.Partition)
> ...
> {noformat}
> Before topic deletion was requested, this was state in ZK of its sole 
> partition:
> {noformat}
> Zxid: 0x181045
> Cxid: 0xc92
> Client id:0x3532dd88fd2
> Time: Mon Feb 29 16:46:23 CET 2016
> Operation:setData
> Path: /brokers/topics/foo/partitions/0/state
> Data: 
> {"controller_epoch":53,"leader":1,"version":1,"leader_epoch":34,"isr":[1,3,2]}
> Version:  68
> {noformat}
> Topic (sole partition) had no data ever published to it. I guess at some 
> point after topic deletion has been requested, partition state first got 
> updated and this was the updated state:
> {noformat}
> Zxid: 0x18b0be
> Cxid: 0x141e4
> Client id:0x3532dd88fd2
> Time: Fri Mar 04 9:41:52 CET 2016
> Operation:setData
> Path: /brokers/topics/foo/partitions/0/state
> Data: 
> {"controller_epoch":54,"leader":1,"version":1,"leader_epoch":35,"isr":[1,3]}
> Version:  69
> {noformat}
> For whatever reason replica manager (some cache it uses, I guess 
> ReplicaManager.allPartitions) never sees this update, nor does it see that 
> the partition state, partition, partitions node and finally topic node got 
> deleted:
> {noformat}
> Zxid: 0x18b0bf
> Cxid: 0x40fb
> Client id:0x3532dd88fd2000a
> Time: Fri Mar 04 9:41:52 CET 2016
> Operation:delete
> Path: /brokers/topics/foo/partitions/0/state
> ---
> Zxid: 0x18b0c0
> Cxid: 0x40fe
> Client id:0x3532dd88fd2000a
> Time: Fri Mar 04 9:41:52 CET 2016
> Operation:delete
> Path: /brokers/topics/foo/partitions/0
> ---
> Zxid: 0x18b0c1
> Cxid: 0x4100
> Client id:0x3532dd88fd2000a
> Time: Fri Mar 04 9:41:52 CET 2016

[jira] [Assigned] (KAFKA-3390) ReplicaManager may infinitely try-fail to shrink ISR set of deleted partition

2016-03-14 Thread Mayuresh Gharat (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayuresh Gharat reassigned KAFKA-3390:
--

Assignee: Mayuresh Gharat

> ReplicaManager may infinitely try-fail to shrink ISR set of deleted partition
> -
>
> Key: KAFKA-3390
> URL: https://issues.apache.org/jira/browse/KAFKA-3390
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
>Reporter: Stevo Slavic
>Assignee: Mayuresh Gharat
>
> For a topic whose deletion has been requested, Kafka replica manager may end 
> up infinitely trying and failing to shrink ISR.
> Here is fragment from server.log where this recurring and never ending 
> condition has been noticed:
> {noformat}
> [2016-03-04 09:42:13,894] INFO Partition [foo,0] on broker 1: Shrinking ISR 
> for partition [foo,0] from 1,3,2 to 1 (kafka.cluster.Partition)
> [2016-03-04 09:42:13,897] WARN Conditional update of path 
> /brokers/topics/foo/partitions/0/state with data 
> {"controller_epoch":53,"leader":1,"version":1,"leader_epoch":34,"isr":[1]} 
> and expected version 68 failed due to 
> org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = 
> NoNode for /brokers/topics/foo/partitions/0/state (kafka.utils.ZkUtils)
> [2016-03-04 09:42:13,898] INFO Partition [foo,0] on broker 1: Cached 
> zkVersion [68] not equal to that in zookeeper, skip updating ISR 
> (kafka.cluster.Partition)
> [2016-03-04 09:42:23,894] INFO Partition [foo,0] on broker 1: Shrinking ISR 
> for partition [foo,0] from 1,3,2 to 1 (kafka.cluster.Partition)
> [2016-03-04 09:42:23,897] WARN Conditional update of path 
> /brokers/topics/foo/partitions/0/state with data 
> {"controller_epoch":53,"leader":1,"version":1,"leader_epoch":34,"isr":[1]} 
> and expected version 68 failed due to 
> org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = 
> NoNode for /brokers/topics/foo/partitions/0/state (kafka.utils.ZkUtils)
> [2016-03-04 09:42:23,897] INFO Partition [foo,0] on broker 1: Cached 
> zkVersion [68] not equal to that in zookeeper, skip updating ISR 
> (kafka.cluster.Partition)
> [2016-03-04 09:42:33,894] INFO Partition [foo,0] on broker 1: Shrinking ISR 
> for partition [foo,0] from 1,3,2 to 1 (kafka.cluster.Partition)
> [2016-03-04 09:42:33,897] WARN Conditional update of path 
> /brokers/topics/foo/partitions/0/state with data 
> {"controller_epoch":53,"leader":1,"version":1,"leader_epoch":34,"isr":[1]} 
> and expected version 68 failed due to 
> org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = 
> NoNode for /brokers/topics/foo/partitions/0/state (kafka.utils.ZkUtils)
> [2016-03-04 09:42:33,897] INFO Partition [foo,0] on broker 1: Cached 
> zkVersion [68] not equal to that in zookeeper, skip updating ISR 
> (kafka.cluster.Partition)
> ...
> {noformat}
> Before topic deletion was requested, this was state in ZK of its sole 
> partition:
> {noformat}
> Zxid: 0x181045
> Cxid: 0xc92
> Client id:0x3532dd88fd2
> Time: Mon Feb 29 16:46:23 CET 2016
> Operation:setData
> Path: /brokers/topics/foo/partitions/0/state
> Data: 
> {"controller_epoch":53,"leader":1,"version":1,"leader_epoch":34,"isr":[1,3,2]}
> Version:  68
> {noformat}
> Topic (sole partition) had no data ever published to it. I guess at some 
> point after topic deletion has been requested, partition state first got 
> updated and this was the updated state:
> {noformat}
> Zxid: 0x18b0be
> Cxid: 0x141e4
> Client id:0x3532dd88fd2
> Time: Fri Mar 04 9:41:52 CET 2016
> Operation:setData
> Path: /brokers/topics/foo/partitions/0/state
> Data: 
> {"controller_epoch":54,"leader":1,"version":1,"leader_epoch":35,"isr":[1,3]}
> Version:  69
> {noformat}
> For whatever reason replica manager (some cache it uses, I guess 
> ReplicaManager.allPartitions) never sees this update, nor does it see that 
> the partition state, partition, partitions node and finally topic node got 
> deleted:
> {noformat}
> Zxid: 0x18b0bf
> Cxid: 0x40fb
> Client id:0x3532dd88fd2000a
> Time: Fri Mar 04 9:41:52 CET 2016
> Operation:delete
> Path: /brokers/topics/foo/partitions/0/state
> ---
> Zxid: 0x18b0c0
> Cxid: 0x40fe
> Client id:0x3532dd88fd2000a
> Time: Fri Mar 04 9:41:52 CET 2016
> Operation:delete
> Path: /brokers/topics/foo/partitions/0
> ---
> Zxid: 0x18b0c1
> Cxid: 0x4100
> Client id:0x3532dd88fd2000a
> Time: Fri Mar 04 9:41:52 CET 2016
> Operation:delete
> Path: /brokers/topics/foo/partitions
> ---
> Zxid: 0x18b0c2
> Cxid: 0x4102
> Client id:0x3532dd88fd2000a
> Time: Fri Mar 04 9:41:52 CET 2

[jira] [Commented] (KAFKA-3388) Producer should only timeout a batch in the accumulator when metadata is missing.

2016-03-14 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15193448#comment-15193448
 ] 

Mayuresh Gharat commented on KAFKA-3388:


If the batch is retried and re-enqueued, the last append time gets updated and 
that is used to time out the batch. So it should not expire immediately.

> Producer should only timeout a batch in the accumulator when metadata is 
> missing.
> -
>
> Key: KAFKA-3388
> URL: https://issues.apache.org/jira/browse/KAFKA-3388
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Blocker
> Fix For: 0.10.0.0
>
>
> In KIP-19 we are reusing the request.timeout.ms to timeout the batches in the 
> accumulator. We were intended to avoid the case that the batches sitting in 
> the accumulator forever when topic metadata is missing.
> Currently we are not checking if metadata is available or not when we timeout 
> the batches in the accumulator (although the comments says we will check the 
> metadata). This causes problem that once the previous batch hit a request 
> timeout and got retried, all the subsequent batches will fail with timeout 
> exception. We should only timeout the batches in the accumulator when the 
> metadata of the partition is missing.
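A tiny sketch of the intended check (stand-in names, not the actual RecordAccumulator code): a buffered batch should only be timed out when its partition's metadata is missing, not merely because time has passed while a retry is in flight.

{noformat}
class BatchExpirySketch {
    static boolean shouldExpire(boolean partitionMetadataAvailable,
                                long waitedTimeMs,
                                long requestTimeoutMs) {
        // Expire only when metadata is missing AND the batch has waited past the timeout.
        return !partitionMetadataAvailable && waitedTimeMs > requestTimeoutMs;
    }
}
{noformat}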



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3367) Delete topic doesn't delete the complete log from kafka

2016-03-10 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15190206#comment-15190206
 ] 

Mayuresh Gharat commented on KAFKA-3367:


The clients that are producing to the kafka cluster should be stopped before
you delete the topic. This is because when the topic gets deleted, the producer
is going to issue a TopicMetadata request, and since automatic topic creation
is turned ON, that will recreate the topic.

> Delete topic doesn't delete the complete log from kafka
> 
>
> Key: KAFKA-3367
> URL: https://issues.apache.org/jira/browse/KAFKA-3367
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Akshath Patkar
>
> Delete topic just marks the topic as deleted, but data still remains in the logs.
> How can we delete the topic completely without manually deleting the logs
> from kafka and zookeeper?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3367) Delete topic doesn't delete the complete log from kafka

2016-03-10 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15189933#comment-15189933
 ] 

Mayuresh Gharat commented on KAFKA-3367:


Is automatic topic creation turned ON in your kafka cluster?
If yes, did you stop the clients that are consuming from/producing to this kafka
cluster before you marked the topic for deletion in zookeeper?

> Delete topic doesn't delete the complete log from kafka
> 
>
> Key: KAFKA-3367
> URL: https://issues.apache.org/jira/browse/KAFKA-3367
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Akshath Patkar
>
> Delete topic just marks the topic as deleted, but data still remains in the logs.
> How can we delete the topic completely without manually deleting the logs
> from kafka and zookeeper?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3367) Delete topic doesn't delete the complete log from kafka

2016-03-10 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15189676#comment-15189676
 ] 

Mayuresh Gharat commented on KAFKA-3367:


It takes time to delete the logs, depending on the amount of data the topic 
has. Also when the topic is marked for deletion, the controller has a listener 
that fires and starts deleting the topic. We have tested this in our 
environment at Linkedin and it does delete the logs. 
What version of Kafka are you running?

> Delete topic doesn't delete the complete log from kafka
> 
>
> Key: KAFKA-3367
> URL: https://issues.apache.org/jira/browse/KAFKA-3367
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Akshath Patkar
>
> Delete topic just marks the topic as deleted, but data still remains in the logs.
> How can we delete the topic completely without manually deleting the logs
> from kafka and zookeeper?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Work started] (KAFKA-3175) topic not accessible after deletion even when delete.topic.enable is disabled

2016-02-01 Thread Mayuresh Gharat (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-3175 started by Mayuresh Gharat.
--
> topic not accessible after deletion even when delete.topic.enable is disabled
> -
>
> Key: KAFKA-3175
> URL: https://issues.apache.org/jira/browse/KAFKA-3175
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
>Reporter: Jun Rao
>Assignee: Mayuresh Gharat
> Fix For: 0.9.0.1
>
>
> This can be reproduced with the following steps.
> 1. start ZK and 1 broker (with default delete.topic.enable=false)
> 2. create a topic test
> bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test 
> --partition 1 --replication-factor 1
> 3. delete topic test
> bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
> 4. restart the broker
> Now topic test still shows up during topic description.
> bin/kafka-topics.sh --zookeeper localhost:2181 --describe
> Topic:test  PartitionCount:1  ReplicationFactor:1  Configs:
>   Topic: test  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
> However, one can't produce to this topic any more.
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> [2016-01-29 17:55:24,527] WARN Error while fetching metadata with correlation 
> id 0 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
> [2016-01-29 17:55:24,725] WARN Error while fetching metadata with correlation 
> id 1 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
> [2016-01-29 17:55:24,828] WARN Error while fetching metadata with correlation 
> id 2 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3175) topic not accessible after deletion even when delete.topic.enable is disabled

2016-02-01 Thread Mayuresh Gharat (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayuresh Gharat updated KAFKA-3175:
---
Status: Patch Available  (was: In Progress)

> topic not accessible after deletion even when delete.topic.enable is disabled
> -
>
> Key: KAFKA-3175
> URL: https://issues.apache.org/jira/browse/KAFKA-3175
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
>Reporter: Jun Rao
>Assignee: Mayuresh Gharat
> Fix For: 0.9.0.1
>
>
> This can be reproduced with the following steps.
> 1. start ZK and 1 broker (with default delete.topic.enable=false)
> 2. create a topic test
> bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test 
> --partition 1 --replication-factor 1
> 3. delete topic test
> bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
> 4. restart the broker
> Now topic test still shows up during topic description.
> bin/kafka-topics.sh --zookeeper localhost:2181 --describe
> Topic:test  PartitionCount:1  ReplicationFactor:1  Configs:
>   Topic: test  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
> However, one can't produce to this topic any more.
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> [2016-01-29 17:55:24,527] WARN Error while fetching metadata with correlation 
> id 0 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
> [2016-01-29 17:55:24,725] WARN Error while fetching metadata with correlation 
> id 1 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
> [2016-01-29 17:55:24,828] WARN Error while fetching metadata with correlation 
> id 2 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3175) topic not accessible after deletion even when delete.topic.enable is disabled

2016-02-01 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15126711#comment-15126711
 ] 

Mayuresh Gharat commented on KAFKA-3175:


I am thinking that once we detect that delete topic is disabled, then on
controller restart/shift we can remove the entries under /admin/delete_topic/.
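A minimal sketch of that idea; ZkClientSketch is a stand-in interface (not Kafka's ZkUtils), and the path simply follows the one named above.

{noformat}
import java.util.List;

interface ZkClientSketch {
    List<String> getChildren(String path);
    void deletePathRecursive(String path);
}

class DeleteTopicCleanupSketch {
    // On controller restart/shift, if topic deletion is disabled, drop any
    // pending delete markers so topics are not left half-deleted.
    static void cleanupIfDeletionDisabled(ZkClientSketch zk, boolean deleteTopicEnable) {
        if (deleteTopicEnable) {
            return;
        }
        for (String topic : zk.getChildren("/admin/delete_topic")) {
            zk.deletePathRecursive("/admin/delete_topic/" + topic);
        }
    }
}
{noformat}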

> topic not accessible after deletion even when delete.topic.enable is disabled
> -
>
> Key: KAFKA-3175
> URL: https://issues.apache.org/jira/browse/KAFKA-3175
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
>Reporter: Jun Rao
>Assignee: Mayuresh Gharat
> Fix For: 0.9.0.1
>
>
> This can be reproduced with the following steps.
> 1. start ZK and 1 broker (with default delete.topic.enable=false)
> 2. create a topic test
> bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test 
> --partition 1 --replication-factor 1
> 3. delete topic test
> bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
> 4. restart the broker
> Now topic test still shows up during topic description.
> bin/kafka-topics.sh --zookeeper localhost:2181 --describe
> Topic:test  PartitionCount:1  ReplicationFactor:1  Configs:
>   Topic: test  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
> However, one can't produce to this topic any more.
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> [2016-01-29 17:55:24,527] WARN Error while fetching metadata with correlation 
> id 0 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
> [2016-01-29 17:55:24,725] WARN Error while fetching metadata with correlation 
> id 1 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
> [2016-01-29 17:55:24,828] WARN Error while fetching metadata with correlation 
> id 2 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3181) Check occurrences of Runtime.halt() in the codebase and try to handle it at a single place

2016-02-01 Thread Mayuresh Gharat (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayuresh Gharat updated KAFKA-3181:
---
Description: Currently Runtime.halt() is called at multiple places in the
code. Ideally, we would throw an appropriate exception from those classes and
halt the runtime in a single, centralised place (so that we could override the
behaviour during tests). For example: right now the test suite would just die
halfway due to a halt call in ReplicaManager which, in turn, was due to an
exception being thrown from this class
(https://issues.apache.org/jira/browse/KAFKA-3063).

> Check occurrences of Runtime.halt() in the codebase and try to handle it at a 
> single place
> --
>
> Key: KAFKA-3181
> URL: https://issues.apache.org/jira/browse/KAFKA-3181
> Project: Kafka
>  Issue Type: Bug
>Reporter: Mayuresh Gharat
>
> Currently Runtime.halt() is called at multiple places in the code. Ideally,
> we would throw an appropriate exception from those classes and halt the
> runtime in a single, centralised place (so that we could override the
> behaviour during tests). For example: right now the test suite would just die
> halfway due to a halt call in ReplicaManager which, in turn, was due to an
> exception being thrown from this class
> (https://issues.apache.org/jira/browse/KAFKA-3063).
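A hedged sketch of what such a centralised place could look like (the names are illustrative, not an actual Kafka class): route every halt through one helper whose behaviour tests can override.

{noformat}
public final class ExitSketch {

    public interface Procedure {
        void apply(int statusCode);
    }

    private static volatile Procedure haltProcedure =
            statusCode -> Runtime.getRuntime().halt(statusCode);

    private ExitSketch() {
    }

    public static void halt(int statusCode) {
        haltProcedure.apply(statusCode);
    }

    // Tests can install a procedure that throws instead of killing the JVM.
    public static void setHaltProcedure(Procedure procedure) {
        haltProcedure = procedure;
    }
}
{noformat}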



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3181) Check occurrences of Runtime.halt() in the codebase and try to handle it at a single place

2016-02-01 Thread Mayuresh Gharat (JIRA)
Mayuresh Gharat created KAFKA-3181:
--

 Summary: Check occurrences of Runtime.halt() in the codebase and 
try to handle it at a single place
 Key: KAFKA-3181
 URL: https://issues.apache.org/jira/browse/KAFKA-3181
 Project: Kafka
  Issue Type: Bug
Reporter: Mayuresh Gharat






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-3175) topic not accessible after deletion even when delete.topic.enable is disabled

2016-01-29 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15124681#comment-15124681
 ] 

Mayuresh Gharat edited comment on KAFKA-3175 at 1/30/16 3:05 AM:
-

I am just thinking we could have that check in TopicDeletionManager.start(): if
delete topic enable is set to false, the DeleteTopicThread should not start.


was (Author: mgharat):
I am just thinking we could have that check in TopicDeletionManager.start(): if
delete topic enable is set to false, the DeleteTopicThread should not start.

> topic not accessible after deletion even when delete.topic.enable is disabled
> -
>
> Key: KAFKA-3175
> URL: https://issues.apache.org/jira/browse/KAFKA-3175
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
>Reporter: Jun Rao
>Assignee: Mayuresh Gharat
>
> This can be reproduced with the following steps.
> 1. start ZK and 1 broker (with default delete.topic.enable=false)
> 2. create a topic test
> bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test 
> --partition 1 --replication-factor 1
> 3. delete topic test
> bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
> 4. restart the broker
> Now topic test still shows up during topic description.
> bin/kafka-topics.sh --zookeeper localhost:2181 --describe
> Topic:test  PartitionCount:1  ReplicationFactor:1  Configs:
>   Topic: test  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
> However, one can't produce to this topic any more.
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> [2016-01-29 17:55:24,527] WARN Error while fetching metadata with correlation 
> id 0 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
> [2016-01-29 17:55:24,725] WARN Error while fetching metadata with correlation 
> id 1 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
> [2016-01-29 17:55:24,828] WARN Error while fetching metadata with correlation 
> id 2 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3175) topic not accessible after deletion even when delete.topic.enable is disabled

2016-01-29 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15124681#comment-15124681
 ] 

Mayuresh Gharat commented on KAFKA-3175:


I am just thinking we could have that check in TopicDeletionManager.start(): if
delete topic enable is set to false, the DeleteTopicThread should not start.

> topic not accessible after deletion even when delete.topic.enable is disabled
> -
>
> Key: KAFKA-3175
> URL: https://issues.apache.org/jira/browse/KAFKA-3175
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
>Reporter: Jun Rao
>Assignee: Mayuresh Gharat
>
> This can be reproduced with the following steps.
> 1. start ZK and 1 broker (with default delete.topic.enable=false)
> 2. create a topic test
> bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test 
> --partition 1 --replication-factor 1
> 3. delete topic test
> bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
> 4. restart the broker
> Now topic test still shows up during topic description.
> bin/kafka-topics.sh --zookeeper localhost:2181 --describe
> Topic:test  PartitionCount:1  ReplicationFactor:1  Configs:
>   Topic: test  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
> However, one can't produce to this topic any more.
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> [2016-01-29 17:55:24,527] WARN Error while fetching metadata with correlation 
> id 0 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
> [2016-01-29 17:55:24,725] WARN Error while fetching metadata with correlation 
> id 1 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
> [2016-01-29 17:55:24,828] WARN Error while fetching metadata with correlation 
> id 2 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-3175) topic not accessible after deletion even when delete.topic.enable is disabled

2016-01-29 Thread Mayuresh Gharat (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayuresh Gharat reassigned KAFKA-3175:
--

Assignee: Mayuresh Gharat

> topic not accessible after deletion even when delete.topic.enable is disabled
> -
>
> Key: KAFKA-3175
> URL: https://issues.apache.org/jira/browse/KAFKA-3175
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
>Reporter: Jun Rao
>Assignee: Mayuresh Gharat
>
> This can be reproduced with the following steps.
> 1. start ZK and 1 broker (with default delete.topic.enable=false)
> 2. create a topic test
> bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test 
> --partition 1 --replication-factor 1
> 3. delete topic test
> bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
> 4. restart the broker
> Now topic test still shows up during topic description.
> bin/kafka-topics.sh --zookeeper localhost:2181 --describe
> Topic:test  PartitionCount:1  ReplicationFactor:1  Configs:
>   Topic: test  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
> However, one can't produce to this topic any more.
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> [2016-01-29 17:55:24,527] WARN Error while fetching metadata with correlation 
> id 0 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
> [2016-01-29 17:55:24,725] WARN Error while fetching metadata with correlation 
> id 1 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
> [2016-01-29 17:55:24,828] WARN Error while fetching metadata with correlation 
> id 2 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3147) Memory records is not writable in MirrorMaker

2016-01-27 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15120014#comment-15120014
 ] 

Mayuresh Gharat commented on KAFKA-3147:


[~becket_qin] Sure, I will take a look and upload a PR.

> Memory records is not writable in MirrorMaker
> -
>
> Key: KAFKA-3147
> URL: https://issues.apache.org/jira/browse/KAFKA-3147
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0
>Reporter: Meghana Narasimhan
>Assignee: Mayuresh Gharat
>
> Hi,
> We are running a 3 node cluster (kafka version 0.9) and Node 0 also has a few 
> mirror makers running. 
> When we do a rolling restart of the cluster, the mirror maker shuts down with 
> the following errors.
> [2016-01-11 20:16:00,348] WARN Got error produce response with correlation id 
> 12491674 on topic-partition test-99, retrying (2147483646 attempts left). 
> Error: NOT_LEADER_FOR_PARTITION 
> (org.apache.kafka.clients.producer.internals.Sender)
> [2016-01-11 20:16:00,853] FATAL [mirrormaker-thread-0] Mirror maker thread 
> failure due to  (kafka.tools.MirrorMaker$MirrorMakerThread)
> java.lang.IllegalStateException: Memory records is not writable
> at 
> org.apache.kafka.common.record.MemoryRecords.append(MemoryRecords.java:93)
> at 
> org.apache.kafka.clients.producer.internals.RecordBatch.tryAppend(RecordBatch.java:69)
> at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:168)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:435)
> at 
> kafka.tools.MirrorMaker$MirrorMakerProducer.send(MirrorMaker.scala:593)
> at 
> kafka.tools.MirrorMaker$MirrorMakerThread$$anonfun$run$3.apply(MirrorMaker.scala:398)
> at 
> kafka.tools.MirrorMaker$MirrorMakerThread$$anonfun$run$3.apply(MirrorMaker.scala:398)
> at scala.collection.Iterator$class.foreach(Iterator.scala:742)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at 
> kafka.tools.MirrorMaker$MirrorMakerThread.run(MirrorMaker.scala:398)
> [2016-01-11 20:16:01,072] WARN Got error produce response with correlation id 
> 12491679 on topic-partition test-75, retrying (2147483646 attempts left). 
> Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
> [2016-01-11 20:16:01,073] WARN Got error produce response with correlation id 
> 12491679 on topic-partition test-93, retrying (2147483646 attempts left). 
> Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
> [2016-01-11 20:16:01,073] WARN Got error produce response with correlation id 
> 12491679 on topic-partition test-24, retrying (2147483646 attempts left). 
> Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
> [2016-01-11 20:16:20,479] FATAL [mirrormaker-thread-0] Mirror maker thread 
> exited abnormally, stopping the whole mirror maker. 
> (kafka.tools.MirrorMaker$MirrorMakerThread)
> Curious if the NOT_LEADER_FOR_PARTITION is because of a potential bug hinted 
> at in the thread , 
> http://mail-archives.apache.org/mod_mbox/kafka-users/201505.mbox/%3ccajs3ho_u8s1xou_kudnfjamypjtmrjlw10qvkngn2yqkdan...@mail.gmail.com%3E
>
> And I think the mirror maker shuts down because of the 
> "abort.on.send.failure" which is set to true in our case. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-3147) Memory records is not writable in MirrorMaker

2016-01-27 Thread Mayuresh Gharat (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayuresh Gharat reassigned KAFKA-3147:
--

Assignee: Mayuresh Gharat

> Memory records is not writable in MirrorMaker
> -
>
> Key: KAFKA-3147
> URL: https://issues.apache.org/jira/browse/KAFKA-3147
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0
>Reporter: Meghana Narasimhan
>Assignee: Mayuresh Gharat
>
> Hi,
> We are running a 3 node cluster (kafka version 0.9) and Node 0 also has a few 
> mirror makers running. 
> When we do a rolling restart of the cluster, the mirror maker shuts down with 
> the following errors.
> [2016-01-11 20:16:00,348] WARN Got error produce response with correlation id 
> 12491674 on topic-partition test-99, retrying (2147483646 attempts left). 
> Error: NOT_LEADER_FOR_PARTITION 
> (org.apache.kafka.clients.producer.internals.Sender)
> [2016-01-11 20:16:00,853] FATAL [mirrormaker-thread-0] Mirror maker thread 
> failure due to  (kafka.tools.MirrorMaker$MirrorMakerThread)
> java.lang.IllegalStateException: Memory records is not writable
> at 
> org.apache.kafka.common.record.MemoryRecords.append(MemoryRecords.java:93)
> at 
> org.apache.kafka.clients.producer.internals.RecordBatch.tryAppend(RecordBatch.java:69)
> at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:168)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:435)
> at 
> kafka.tools.MirrorMaker$MirrorMakerProducer.send(MirrorMaker.scala:593)
> at 
> kafka.tools.MirrorMaker$MirrorMakerThread$$anonfun$run$3.apply(MirrorMaker.scala:398)
> at 
> kafka.tools.MirrorMaker$MirrorMakerThread$$anonfun$run$3.apply(MirrorMaker.scala:398)
> at scala.collection.Iterator$class.foreach(Iterator.scala:742)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at 
> kafka.tools.MirrorMaker$MirrorMakerThread.run(MirrorMaker.scala:398)
> [2016-01-11 20:16:01,072] WARN Got error produce response with correlation id 
> 12491679 on topic-partition test-75, retrying (2147483646 attempts left). 
> Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
> [2016-01-11 20:16:01,073] WARN Got error produce response with correlation id 
> 12491679 on topic-partition test-93, retrying (2147483646 attempts left). 
> Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
> [2016-01-11 20:16:01,073] WARN Got error produce response with correlation id 
> 12491679 on topic-partition test-24, retrying (2147483646 attempts left). 
> Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
> [2016-01-11 20:16:20,479] FATAL [mirrormaker-thread-0] Mirror maker thread 
> exited abnormally, stopping the whole mirror maker. 
> (kafka.tools.MirrorMaker$MirrorMakerThread)
> Curious if the NOT_LEADER_FOR_PARTITION is because of a potential bug hinted 
> at in the thread , 
> http://mail-archives.apache.org/mod_mbox/kafka-users/201505.mbox/%3ccajs3ho_u8s1xou_kudnfjamypjtmrjlw10qvkngn2yqkdan...@mail.gmail.com%3E
>
> And I think the mirror maker shuts down because of the 
> "abort.on.send.failure" which is set to true in our case. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-3147) Memory records is not writable in MirrorMaker

2016-01-26 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15118482#comment-15118482
 ] 

Mayuresh Gharat edited comment on KAFKA-3147 at 1/27/16 2:45 AM:
-

[~junrao] I think abortIncompleteBatches was not a part of the KIP-19 patch. I
would surely like to look into this, though.


was (Author: mgharat):
yep, will take a look at this.

> Memory records is not writable in MirrorMaker
> -
>
> Key: KAFKA-3147
> URL: https://issues.apache.org/jira/browse/KAFKA-3147
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0
>Reporter: Meghana Narasimhan
>
> Hi,
> We are running a 3 node cluster (kafka version 0.9) and Node 0 also has a few 
> mirror makers running. 
> When we do a rolling restart of the cluster, the mirror maker shuts down with 
> the following errors.
> [2016-01-11 20:16:00,348] WARN Got error produce response with correlation id 
> 12491674 on topic-partition test-99, retrying (2147483646 attempts left). 
> Error: NOT_LEADER_FOR_PARTITION 
> (org.apache.kafka.clients.producer.internals.Sender)
> [2016-01-11 20:16:00,853] FATAL [mirrormaker-thread-0] Mirror maker thread 
> failure due to  (kafka.tools.MirrorMaker$MirrorMakerThread)
> java.lang.IllegalStateException: Memory records is not writable
> at 
> org.apache.kafka.common.record.MemoryRecords.append(MemoryRecords.java:93)
> at 
> org.apache.kafka.clients.producer.internals.RecordBatch.tryAppend(RecordBatch.java:69)
> at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:168)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:435)
> at 
> kafka.tools.MirrorMaker$MirrorMakerProducer.send(MirrorMaker.scala:593)
> at 
> kafka.tools.MirrorMaker$MirrorMakerThread$$anonfun$run$3.apply(MirrorMaker.scala:398)
> at 
> kafka.tools.MirrorMaker$MirrorMakerThread$$anonfun$run$3.apply(MirrorMaker.scala:398)
> at scala.collection.Iterator$class.foreach(Iterator.scala:742)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at 
> kafka.tools.MirrorMaker$MirrorMakerThread.run(MirrorMaker.scala:398)
> [2016-01-11 20:16:01,072] WARN Got error produce response with correlation id 
> 12491679 on topic-partition test-75, retrying (2147483646 attempts left). 
> Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
> [2016-01-11 20:16:01,073] WARN Got error produce response with correlation id 
> 12491679 on topic-partition test-93, retrying (2147483646 attempts left). 
> Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
> [2016-01-11 20:16:01,073] WARN Got error produce response with correlation id 
> 12491679 on topic-partition test-24, retrying (2147483646 attempts left). 
> Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
> [2016-01-11 20:16:20,479] FATAL [mirrormaker-thread-0] Mirror maker thread 
> exited abnormally, stopping the whole mirror maker. 
> (kafka.tools.MirrorMaker$MirrorMakerThread)
> Curious if the NOT_LEADER_FOR_PARTITION is because of a potential bug hinted 
> at in the thread , 
> http://mail-archives.apache.org/mod_mbox/kafka-users/201505.mbox/%3ccajs3ho_u8s1xou_kudnfjamypjtmrjlw10qvkngn2yqkdan...@mail.gmail.com%3E
>
> And I think the mirror maker shuts down because of the 
> "abort.on.send.failure" which is set to true in our case. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Issue Comment Deleted] (KAFKA-3147) Memory records is not writable in MirrorMaker

2016-01-26 Thread Mayuresh Gharat (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayuresh Gharat updated KAFKA-3147:
---
Comment: was deleted

(was: [~junrao] the abortExpiredBatches does get rid of the RecordBatch
from the queue:

    if (batch.maybeExpire(requestTimeout, now, this.lingerMs)) {
        expiredBatches.add(batch);
        count++;
        batchIterator.remove();
        deallocate(batch);
    })

> Memory records is not writable in MirrorMaker
> -
>
> Key: KAFKA-3147
> URL: https://issues.apache.org/jira/browse/KAFKA-3147
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0
>Reporter: Meghana Narasimhan
>
> Hi,
> We are running a 3 node cluster (kafka version 0.9) and Node 0 also has a few 
> mirror makers running. 
> When we do a rolling restart of the cluster, the mirror maker shuts down with 
> the following errors.
> [2016-01-11 20:16:00,348] WARN Got error produce response with correlation id 
> 12491674 on topic-partition test-99, retrying (2147483646 attempts left). 
> Error: NOT_LEADER_FOR_PARTITION 
> (org.apache.kafka.clients.producer.internals.Sender)
> [2016-01-11 20:16:00,853] FATAL [mirrormaker-thread-0] Mirror maker thread 
> failure due to  (kafka.tools.MirrorMaker$MirrorMakerThread)
> java.lang.IllegalStateException: Memory records is not writable
> at 
> org.apache.kafka.common.record.MemoryRecords.append(MemoryRecords.java:93)
> at 
> org.apache.kafka.clients.producer.internals.RecordBatch.tryAppend(RecordBatch.java:69)
> at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:168)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:435)
> at 
> kafka.tools.MirrorMaker$MirrorMakerProducer.send(MirrorMaker.scala:593)
> at 
> kafka.tools.MirrorMaker$MirrorMakerThread$$anonfun$run$3.apply(MirrorMaker.scala:398)
> at 
> kafka.tools.MirrorMaker$MirrorMakerThread$$anonfun$run$3.apply(MirrorMaker.scala:398)
> at scala.collection.Iterator$class.foreach(Iterator.scala:742)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at 
> kafka.tools.MirrorMaker$MirrorMakerThread.run(MirrorMaker.scala:398)
> [2016-01-11 20:16:01,072] WARN Got error produce response with correlation id 
> 12491679 on topic-partition test-75, retrying (2147483646 attempts left). 
> Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
> [2016-01-11 20:16:01,073] WARN Got error produce response with correlation id 
> 12491679 on topic-partition test-93, retrying (2147483646 attempts left). 
> Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
> [2016-01-11 20:16:01,073] WARN Got error produce response with correlation id 
> 12491679 on topic-partition test-24, retrying (2147483646 attempts left). 
> Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
> [2016-01-11 20:16:20,479] FATAL [mirrormaker-thread-0] Mirror maker thread 
> exited abnormally, stopping the whole mirror maker. 
> (kafka.tools.MirrorMaker$MirrorMakerThread)
> Curious if the NOT_LEADER_FOR_PARTITION is because of a potential bug hinted 
> at in the thread , 
> http://mail-archives.apache.org/mod_mbox/kafka-users/201505.mbox/%3ccajs3ho_u8s1xou_kudnfjamypjtmrjlw10qvkngn2yqkdan...@mail.gmail.com%3E
>
> And I think the mirror maker shuts down because of the 
> "abort.on.send.failure" which is set to true in our case. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3147) Memory records is not writable in MirrorMaker

2016-01-26 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15118496#comment-15118496
 ] 

Mayuresh Gharat commented on KAFKA-3147:


[~junrao] the abortExpiredBatches does get rid of the RecordBatch from the
queue:

    if (batch.maybeExpire(requestTimeout, now, this.lingerMs)) {
        expiredBatches.add(batch);
        count++;
        batchIterator.remove();
        deallocate(batch);
    }

> Memory records is not writable in MirrorMaker
> -
>
> Key: KAFKA-3147
> URL: https://issues.apache.org/jira/browse/KAFKA-3147
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0
>Reporter: Meghana Narasimhan
>
> Hi,
> We are running a 3 node cluster (kafka version 0.9) and Node 0 also has a few 
> mirror makers running. 
> When we do a rolling restart of the cluster, the mirror maker shuts down with 
> the following errors.
> [2016-01-11 20:16:00,348] WARN Got error produce response with correlation id 
> 12491674 on topic-partition test-99, retrying (2147483646 attempts left). 
> Error: NOT_LEADER_FOR_PARTITION 
> (org.apache.kafka.clients.producer.internals.Sender)
> [2016-01-11 20:16:00,853] FATAL [mirrormaker-thread-0] Mirror maker thread 
> failure due to  (kafka.tools.MirrorMaker$MirrorMakerThread)
> java.lang.IllegalStateException: Memory records is not writable
> at 
> org.apache.kafka.common.record.MemoryRecords.append(MemoryRecords.java:93)
> at 
> org.apache.kafka.clients.producer.internals.RecordBatch.tryAppend(RecordBatch.java:69)
> at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:168)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:435)
> at 
> kafka.tools.MirrorMaker$MirrorMakerProducer.send(MirrorMaker.scala:593)
> at 
> kafka.tools.MirrorMaker$MirrorMakerThread$$anonfun$run$3.apply(MirrorMaker.scala:398)
> at 
> kafka.tools.MirrorMaker$MirrorMakerThread$$anonfun$run$3.apply(MirrorMaker.scala:398)
> at scala.collection.Iterator$class.foreach(Iterator.scala:742)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at 
> kafka.tools.MirrorMaker$MirrorMakerThread.run(MirrorMaker.scala:398)
> [2016-01-11 20:16:01,072] WARN Got error produce response with correlation id 
> 12491679 on topic-partition test-75, retrying (2147483646 attempts left). 
> Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
> [2016-01-11 20:16:01,073] WARN Got error produce response with correlation id 
> 12491679 on topic-partition test-93, retrying (2147483646 attempts left). 
> Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
> [2016-01-11 20:16:01,073] WARN Got error produce response with correlation id 
> 12491679 on topic-partition test-24, retrying (2147483646 attempts left). 
> Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
> [2016-01-11 20:16:20,479] FATAL [mirrormaker-thread-0] Mirror maker thread 
> exited abnormally, stopping the whole mirror maker. 
> (kafka.tools.MirrorMaker$MirrorMakerThread)
> Curious if the NOT_LEADER_FOR_PARTITION is because of a potential bug hinted 
> at in the thread , 
> http://mail-archives.apache.org/mod_mbox/kafka-users/201505.mbox/%3ccajs3ho_u8s1xou_kudnfjamypjtmrjlw10qvkngn2yqkdan...@mail.gmail.com%3E
>
> And I think the mirror maker shuts down because of the 
> "abort.on.send.failure" which is set to true in our case. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3147) Memory records is not writable in MirrorMaker

2016-01-26 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15118482#comment-15118482
 ] 

Mayuresh Gharat commented on KAFKA-3147:


yep, will take a look at this.

> Memory records is not writable in MirrorMaker
> -
>
> Key: KAFKA-3147
> URL: https://issues.apache.org/jira/browse/KAFKA-3147
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0
>Reporter: Meghana Narasimhan
>
> Hi,
> We are running a 3 node cluster (kafka version 0.9) and Node 0 also has a few 
> mirror makers running. 
> When we do a rolling restart of the cluster, the mirror maker shuts down with 
> the following errors.
> [2016-01-11 20:16:00,348] WARN Got error produce response with correlation id 
> 12491674 on topic-partition test-99, retrying (2147483646 attempts left). 
> Error: NOT_LEADER_FOR_PARTITION 
> (org.apache.kafka.clients.producer.internals.Sender)
> [2016-01-11 20:16:00,853] FATAL [mirrormaker-thread-0] Mirror maker thread 
> failure due to  (kafka.tools.MirrorMaker$MirrorMakerThread)
> java.lang.IllegalStateException: Memory records is not writable
> at 
> org.apache.kafka.common.record.MemoryRecords.append(MemoryRecords.java:93)
> at 
> org.apache.kafka.clients.producer.internals.RecordBatch.tryAppend(RecordBatch.java:69)
> at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:168)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:435)
> at 
> kafka.tools.MirrorMaker$MirrorMakerProducer.send(MirrorMaker.scala:593)
> at 
> kafka.tools.MirrorMaker$MirrorMakerThread$$anonfun$run$3.apply(MirrorMaker.scala:398)
> at 
> kafka.tools.MirrorMaker$MirrorMakerThread$$anonfun$run$3.apply(MirrorMaker.scala:398)
> at scala.collection.Iterator$class.foreach(Iterator.scala:742)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at 
> kafka.tools.MirrorMaker$MirrorMakerThread.run(MirrorMaker.scala:398)
> [2016-01-11 20:16:01,072] WARN Got error produce response with correlation id 
> 12491679 on topic-partition test-75, retrying (2147483646 attempts left). 
> Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
> [2016-01-11 20:16:01,073] WARN Got error produce response with correlation id 
> 12491679 on topic-partition test-93, retrying (2147483646 attempts left). 
> Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
> [2016-01-11 20:16:01,073] WARN Got error produce response with correlation id 
> 12491679 on topic-partition test-24, retrying (2147483646 attempts left). 
> Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
> [2016-01-11 20:16:20,479] FATAL [mirrormaker-thread-0] Mirror maker thread 
> exited abnormally, stopping the whole mirror maker. 
> (kafka.tools.MirrorMaker$MirrorMakerThread)
> Curious if the NOT_LEADER_FOR_PARTITION is because of a potential bug hinted 
> at in the thread , 
> http://mail-archives.apache.org/mod_mbox/kafka-users/201505.mbox/%3ccajs3ho_u8s1xou_kudnfjamypjtmrjlw10qvkngn2yqkdan...@mail.gmail.com%3E
>
> And I think the mirror maker shuts down because of the 
> "abort.on.send.failure" which is set to true in our case. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3126) Weird behavior in kafkaController on Controlled shutdowns. The leaderAndIsr in zookeeper is not updated during controlled shutdown.

2016-01-25 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15116436#comment-15116436
 ] 

Mayuresh Gharat commented on KAFKA-3126:


It does look like a problem with consistency. It might not be fatal though. I 
came across this behavior while debugging some other issue.

> Weird behavior in kafkaController on Controlled shutdowns. The leaderAndIsr 
> in zookeeper is not updated during controlled shutdown.
> ---
>
> Key: KAFKA-3126
> URL: https://issues.apache.org/jira/browse/KAFKA-3126
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Mayuresh Gharat
>
> Consider Broker B is controller, broker A is undergoing shutdown. 
> 2016/01/14 19:49:22.884 [KafkaController] [Controller B]: Shutting down 
> broker A
> 2016/01/14 19:49:22.918 [ReplicaStateMachine] [Replica state machine on 
> controller B]: Invoking state change to OfflineReplica for replicas 
> [Topic=testTopic1,Partition=1,Replica=A] ---> (1)
> 2016/01/14 19:49:22.930 [KafkaController] [Controller B]: New leader and ISR 
> for partition [testTopic1,1] is {"leader":D,"leader_epoch":1,"isr":[D]} 
> --> (2)
> 2016/01/14 19:49:23.028 [ReplicaStateMachine] [Replica state machine on 
> controller B]: Invoking state change to OfflineReplica for replicas 
> [Topic=testTopic2,Partition=1,Replica=A] ---> (3)
> 2016/01/14 19:49:23.032 [KafkaController] [Controller B]: New leader and ISR 
> for partition [testTopic2,1] is {"leader":C,"leader_epoch":10,"isr":[C]} 
> -> (4)
> 2016/01/14 19:49:23.996 [KafkaController] [Controller B]: Broker failure 
> callback for A
> 2016/01/14 19:49:23.997 [PartitionStateMachine] [Partition state machine on 
> Controller B]: Invoking state change to OfflinePartition for partitions 
> 2016/01/14 19:49:23.998 [ReplicaStateMachine] [Replica state machine on 
> controller B]: Invoking state change to OfflineReplica for replicas 
> [Topic=testTopic2,Partition=0,Replica=A],
> [Topic=__consumer_offsets,Partition=5,Replica=A],
> [Topic=testTopic1,Partition=2,Replica=A],
> [Topic=__consumer_offsets,Partition=96,Replica=A],
> [Topic=testTopic2,Partition=1,Replica=A],
> [Topic=__consumer_offsets,Partition=36,Replica=A],
> [Topic=testTopic1,Partition=4,Replica=A],
> [Topic=__consumer_offsets,Partition=85,Replica=A],
> [Topic=testTopic1,Partition=6,Replica=A],
> [Topic=testTopic1,Partition=1,Replica=A]
> 2016/01/14 19:49:24.029 [KafkaController] [Controller B]: New leader and ISR 
> for partition [testTopic2,1] is {"leader":C,"leader_epoch":11,"isr":[C]} 
> --> (5)
> 2016/01/14 19:49:24.212 [KafkaController] [Controller B]: Cannot remove 
> replica A from ISR of partition [testTopic1,1] since it is not in the ISR. 
> Leader = D ; ISR = List(D) --> (6)
> If after (1) and (2) controller gets rid of the replica A from the ISR in 
> zookeeper for [testTopic1-1] as displayed in 6), why doesn't it do the  same 
> for [testTopic2-1] as per (5)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3126) Weird behavior in kafkaController on Controlled shutdowns. The leaderAndIsr in zookeeper is not updated during controlled shutdown.

2016-01-25 Thread Mayuresh Gharat (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayuresh Gharat updated KAFKA-3126:
---
Assignee: (was: Mayuresh Gharat)

> Weird behavior in kafkaController on Controlled shutdowns. The leaderAndIsr 
> in zookeeper is not updated during controlled shutdown.
> ---
>
> Key: KAFKA-3126
> URL: https://issues.apache.org/jira/browse/KAFKA-3126
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Mayuresh Gharat
>
> Consider Broker B is controller, broker A is undergoing shutdown. 
> 2016/01/14 19:49:22.884 [KafkaController] [Controller B]: Shutting down 
> broker A
> 2016/01/14 19:49:22.918 [ReplicaStateMachine] [Replica state machine on 
> controller B]: Invoking state change to OfflineReplica for replicas 
> [Topic=testTopic1,Partition=1,Replica=A] ---> (1)
> 2016/01/14 19:49:22.930 [KafkaController] [Controller B]: New leader and ISR 
> for partition [testTopic1,1] is {"leader":D,"leader_epoch":1,"isr":[D]} 
> --> (2)
> 2016/01/14 19:49:23.028 [ReplicaStateMachine] [Replica state machine on 
> controller B]: Invoking state change to OfflineReplica for replicas 
> [Topic=testTopic2,Partition=1,Replica=A] ---> (3)
> 2016/01/14 19:49:23.032 [KafkaController] [Controller B]: New leader and ISR 
> for partition [testTopic2,1] is {"leader":C,"leader_epoch":10,"isr":[C]} 
> -> (4)
> 2016/01/14 19:49:23.996 [KafkaController] [Controller B]: Broker failure 
> callback for A
> 2016/01/14 19:49:23.997 [PartitionStateMachine] [Partition state machine on 
> Controller B]: Invoking state change to OfflinePartition for partitions 
> 2016/01/14 19:49:23.998 [ReplicaStateMachine] [Replica state machine on 
> controller B]: Invoking state change to OfflineReplica for replicas 
> [Topic=testTopic2,Partition=0,Replica=A],
> [Topic=__consumer_offsets,Partition=5,Replica=A],
> [Topic=testTopic1,Partition=2,Replica=A],
> [Topic=__consumer_offsets,Partition=96,Replica=A],
> [Topic=testTopic2,Partition=1,Replica=A],
> [Topic=__consumer_offsets,Partition=36,Replica=A],
> [Topic=testTopic1,Partition=4,Replica=A],
> [Topic=__consumer_offsets,Partition=85,Replica=A],
> [Topic=testTopic1,Partition=6,Replica=A],
> [Topic=testTopic1,Partition=1,Replica=A]
> 2016/01/14 19:49:24.029 [KafkaController] [Controller B]: New leader and ISR 
> for partition [testTopic2,1] is {"leader":C,"leader_epoch":11,"isr":[C]} 
> --> (5)
> 2016/01/14 19:49:24.212 [KafkaController] [Controller B]: Cannot remove 
> replica A from ISR of partition [testTopic1,1] since it is not in the ISR. 
> Leader = D ; ISR = List(D) --> (6)
> If after (1) and (2) controller gets rid of the replica A from the ISR in 
> zookeeper for [testTopic1-1] as displayed in 6), why doesn't it do the  same 
> for [testTopic2-1] as per (5)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-1796) Sanity check partition command line tools

2016-01-22 Thread Mayuresh Gharat (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1796?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayuresh Gharat resolved KAFKA-1796.

Resolution: Not A Problem

> Sanity check partition command line tools
> -
>
> Key: KAFKA-1796
> URL: https://issues.apache.org/jira/browse/KAFKA-1796
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Mayuresh Gharat
>  Labels: newbie
>
> We need to sanity check the input json has the valid values before triggering 
> the admin process. For example, we have seen a scenario where the json input 
> for partition reassignment tools have partition replica info as {broker-1, 
> broker-1, broker-2} and it is still accepted in ZK and eventually lead to 
> under replicated count, etc. This is partially because we use a Map rather 
> than a Set reading the json input for this case; but in general we need to 
> make sure the input parameters like Json  needs to be valid before writing it 
> to ZK.
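
A duplicate-replica entry like {broker-1, broker-1, broker-2} can be rejected
before anything is written to ZK by validating the parsed replica list against
a Set. A minimal sketch of such a check (the class and method names here are
illustrative, not the actual reassignment tool code):

{code}
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ReassignmentSanityCheck {
    // Rejects an empty replica list or one containing duplicate broker ids.
    // Assumes the reassignment JSON has already been parsed; illustrative only.
    static void validateReplicas(String topic, int partition, List<Integer> replicas) {
        if (replicas.isEmpty())
            throw new IllegalArgumentException(
                "Empty replica list for " + topic + "-" + partition);
        Set<Integer> unique = new HashSet<>(replicas);
        if (unique.size() != replicas.size())
            throw new IllegalArgumentException(
                "Duplicate replicas " + replicas + " for " + topic + "-" + partition);
    }

    public static void main(String[] args) {
        // The case from the description: should fail fast instead of reaching ZK.
        validateReplicas("testTopic", 0, Arrays.asList(1, 1, 2));
    }
}
{code}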



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1860) File system errors are not detected unless Kafka tries to write

2016-01-22 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15113128#comment-15113128
 ] 

Mayuresh Gharat commented on KAFKA-1860:


[~guozhang] can you take another look at the PR? I have incorporated most of 
the comments on the PR.

> File system errors are not detected unless Kafka tries to write
> ---
>
> Key: KAFKA-1860
> URL: https://issues.apache.org/jira/browse/KAFKA-1860
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Mayuresh Gharat
> Fix For: 0.10.0.0
>
> Attachments: KAFKA-1860.patch
>
>
> When the disk (raid with caches dir) dies on a Kafka broker, typically the 
> filesystem gets mounted into read-only mode, and hence when Kafka tries to 
> read the disk, they'll get a FileNotFoundException with the read-only errno 
> set (EROFS).
> However, as long as there is no produce request received, hence no writes 
> attempted on the disks, Kafka will not exit on such FATAL error (when the 
> disk starts working again, Kafka might think some files are gone while they 
> will reappear later as raid comes back online). Instead it keeps spilling 
> exceptions like:
> {code}
> 2015/01/07 09:47:41.543 ERROR [KafkaScheduler] [kafka-scheduler-1] 
> [kafka-server] [] Uncaught exception in scheduled task 
> 'kafka-recovery-point-checkpoint'
> java.io.FileNotFoundException: 
> /export/content/kafka/i001_caches/recovery-point-offset-checkpoint.tmp 
> (Read-only file system)
>   at java.io.FileOutputStream.open(Native Method)
>   at java.io.FileOutputStream.(FileOutputStream.java:206)
>   at java.io.FileOutputStream.(FileOutputStream.java:156)
>   at kafka.server.OffsetCheckpoint.write(OffsetCheckpoint.scala:37)
> {code}
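
One way the write path could escalate such a failure instead of just logging it
is sketched below; this is only an illustration of the idea (detect the
read-only file system on a checkpoint write and treat it as fatal), not the
actual KAFKA-1860 patch.

{code}
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;

public class CheckpointWriteSketch {
    // Writes a checkpoint file and treats an IOException that indicates a
    // read-only file system as fatal rather than letting the scheduler keep
    // retrying forever. Illustrative only.
    static void writeCheckpoint(File file, String contents) {
        try (FileOutputStream out = new FileOutputStream(file)) {
            out.write(contents.getBytes("UTF-8"));
            out.getFD().sync();
        } catch (IOException e) {
            if (String.valueOf(e.getMessage()).contains("Read-only file system")) {
                System.err.println("FATAL: file system is read-only, halting: " + e);
                Runtime.getRuntime().halt(1);
            }
            throw new RuntimeException("Checkpoint write failed", e);
        }
    }
}
{code}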



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3126) Weird behavior in kafkaController on Controlled shutdowns. The leaderAndIsr in zookeeper is not updated during controlled shutdown.

2016-01-22 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15112795#comment-15112795
 ] 

Mayuresh Gharat commented on KAFKA-3126:


My confusion, as described in this JIRA, was:
(2) and (3) follow the same code path, and similarly (5) and (6). So ideally 
(5) and (6) should say the same thing, i.e. "Cannot remove replica A from ISR 
of partition ..."

> Weird behavior in kafkaController on Controlled shutdowns. The leaderAndIsr 
> in zookeeper is not updated during controlled shutdown.
> ---
>
> Key: KAFKA-3126
> URL: https://issues.apache.org/jira/browse/KAFKA-3126
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Mayuresh Gharat
>Assignee: Mayuresh Gharat
>
> Consider Broker B is controller, broker A is undergoing shutdown. 
> 2016/01/14 19:49:22.884 [KafkaController] [Controller B]: Shutting down 
> broker A
> 2016/01/14 19:49:22.918 [ReplicaStateMachine] [Replica state machine on 
> controller B]: Invoking state change to OfflineReplica for replicas 
> [Topic=testTopic1,Partition=1,Replica=A] ---> (1)
> 2016/01/14 19:49:22.930 [KafkaController] [Controller B]: New leader and ISR 
> for partition [testTopic1,1] is {"leader":D,"leader_epoch":1,"isr":[D]} 
> --> (2)
> 2016/01/14 19:49:23.028 [ReplicaStateMachine] [Replica state machine on 
> controller B]: Invoking state change to OfflineReplica for replicas 
> [Topic=testTopic2,Partition=1,Replica=A] ---> (3)
> 2016/01/14 19:49:23.032 [KafkaController] [Controller B]: New leader and ISR 
> for partition [testTopic2,1] is {"leader":C,"leader_epoch":10,"isr":[C]} 
> -> (4)
> 2016/01/14 19:49:23.996 [KafkaController] [Controller B]: Broker failure 
> callback for A
> 2016/01/14 19:49:23.997 [PartitionStateMachine] [Partition state machine on 
> Controller B]: Invoking state change to OfflinePartition for partitions 
> 2016/01/14 19:49:23.998 [ReplicaStateMachine] [Replica state machine on 
> controller B]: Invoking state change to OfflineReplica for replicas 
> [Topic=testTopic2,Partition=0,Replica=A],
> [Topic=__consumer_offsets,Partition=5,Replica=A],
> [Topic=testTopic1,Partition=2,Replica=A],
> [Topic=__consumer_offsets,Partition=96,Replica=A],
> [Topic=testTopic2,Partition=1,Replica=A],
> [Topic=__consumer_offsets,Partition=36,Replica=A],
> [Topic=testTopic1,Partition=4,Replica=A],
> [Topic=__consumer_offsets,Partition=85,Replica=A],
> [Topic=testTopic1,Partition=6,Replica=A],
> [Topic=testTopic1,Partition=1,Replica=A]
> 2016/01/14 19:49:24.029 [KafkaController] [Controller B]: New leader and ISR 
> for partition [testTopic2,1] is {"leader":C,"leader_epoch":11,"isr":[C]} 
> --> (5)
> 2016/01/14 19:49:24.212 [KafkaController] [Controller B]: Cannot remove 
> replica A from ISR of partition [testTopic1,1] since it is not in the ISR. 
> Leader = D ; ISR = List(D) --> (6)
> If after (1) and (2) controller gets rid of the replica A from the ISR in 
> zookeeper for [testTopic1-1] as displayed in 6), why doesn't it do the  same 
> for [testTopic2-1] as per (5)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3126) Weird behavior in kafkaController on Controlled shutdowns. The leaderAndIsr in zookeeper is not updated during controlled shutdown.

2016-01-21 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15111611#comment-15111611
 ] 

Mayuresh Gharat commented on KAFKA-3126:


Can you explain the steps in further detail :

2. Controller B update ISR of partition p from [A, C] to [C] -> Update in 
Zookeeper you mean ?
3. Before the LeaderAndIsrRequest reflecting the change in (2) reaches broker 
C, broker C expands leader and ISR from [A] to [A, C]. ---> Broker C 
expands ISR in zookeeper ?
4. The ISR change in 3 was propagated to controller B. --> Do you mean ISR 
{A,C} propagated to B ?
5. When Broker A actually shuts down, Controller B will see A in the ISR. 
---> If {A,C} is propagated to B, why would it only see {A}

> Weird behavior in kafkaController on Controlled shutdowns. The leaderAndIsr 
> in zookeeper is not updated during controlled shutdown.
> ---
>
> Key: KAFKA-3126
> URL: https://issues.apache.org/jira/browse/KAFKA-3126
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Mayuresh Gharat
>Assignee: Mayuresh Gharat
>
> Consider Broker B is controller, broker A is undergoing shutdown. 
> 2016/01/14 19:49:22.884 [KafkaController] [Controller B]: Shutting down 
> broker A
> 2016/01/14 19:49:22.918 [ReplicaStateMachine] [Replica state machine on 
> controller B]: Invoking state change to OfflineReplica for replicas 
> [Topic=testTopic1,Partition=1,Replica=A] ---> (1)
> 2016/01/14 19:49:22.930 [KafkaController] [Controller B]: New leader and ISR 
> for partition [testTopic1,1] is {"leader":D,"leader_epoch":1,"isr":[D]} 
> --> (2)
> 2016/01/14 19:49:23.028 [ReplicaStateMachine] [Replica state machine on 
> controller B]: Invoking state change to OfflineReplica for replicas 
> [Topic=testTopic2,Partition=1,Replica=A] ---> (3)
> 2016/01/14 19:49:23.032 [KafkaController] [Controller B]: New leader and ISR 
> for partition [testTopic2,1] is {"leader":C,"leader_epoch":10,"isr":[C]} 
> -> (4)
> 2016/01/14 19:49:23.996 [KafkaController] [Controller B]: Broker failure 
> callback for A
> 2016/01/14 19:49:23.997 [PartitionStateMachine] [Partition state machine on 
> Controller B]: Invoking state change to OfflinePartition for partitions 
> 2016/01/14 19:49:23.998 [ReplicaStateMachine] [Replica state machine on 
> controller B]: Invoking state change to OfflineReplica for replicas 
> [Topic=testTopic2,Partition=0,Replica=A],
> [Topic=__consumer_offsets,Partition=5,Replica=A],
> [Topic=testTopic1,Partition=2,Replica=A],
> [Topic=__consumer_offsets,Partition=96,Replica=A],
> [Topic=testTopic2,Partition=1,Replica=A],
> [Topic=__consumer_offsets,Partition=36,Replica=A],
> [Topic=testTopic1,Partition=4,Replica=A],
> [Topic=__consumer_offsets,Partition=85,Replica=A],
> [Topic=testTopic1,Partition=6,Replica=A],
> [Topic=testTopic1,Partition=1,Replica=A]
> 2016/01/14 19:49:24.029 [KafkaController] [Controller B]: New leader and ISR 
> for partition [testTopic2,1] is {"leader":C,"leader_epoch":11,"isr":[C]} 
> --> (5)
> 2016/01/14 19:49:24.212 [KafkaController] [Controller B]: Cannot remove 
> replica A from ISR of partition [testTopic1,1] since it is not in the ISR. 
> Leader = D ; ISR = List(D) --> (6)
> If after (1) and (2) controller gets rid of the replica A from the ISR in 
> zookeeper for [testTopic1-1] as displayed in 6), why doesn't it do the  same 
> for [testTopic2-1] as per (5)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-3126) Weird behavior in kafkaController on Controlled shutdowns. The leaderAndIsr in zookeeper is not updated during controlled shutdown.

2016-01-21 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15111611#comment-15111611
 ] 

Mayuresh Gharat edited comment on KAFKA-3126 at 1/21/16 11:36 PM:
--

I had some questions :

2. Controller B update ISR of partition p from [A, C] to [C] -> Update in 
Zookeeper you mean ?
3. Before the LeaderAndIsrRequest reflecting the change in (2) reaches broker 
C, broker C expands leader and ISR from [A] to [A, C]. ---> Broker C 
expands ISR in zookeeper ?
4. The ISR change in 3 was propagated to controller B. --> Do you mean ISR 
{A,C} propagated to B ?
5. When Broker A actually shuts down, Controller B will see A in the ISR. 
---> If {A,C} is propagated to B, why would it only see {A}


was (Author: mgharat):
Can you explain the steps in further detail :

2. Controller B update ISR of partition p from [A, C] to [C] -> Update in 
Zookeeper you mean ?
3. Before the LeaderAndIsrRequest reflecting the change in (2) reaches broker 
C, broker C expands leader and ISR from [A] to [A, C]. ---> Broker C 
expands ISR in zookeeper ?
4. The ISR change in 3 was propagated to controller B. --> Do you mean ISR 
{A,C} propagated to B ?
5. When Broker A actually shuts down, Controller B will see A in the ISR. 
---> If {A,C} is propagated to B, why would it only see {A}

> Weird behavior in kafkaController on Controlled shutdowns. The leaderAndIsr 
> in zookeeper is not updated during controlled shutdown.
> ---
>
> Key: KAFKA-3126
> URL: https://issues.apache.org/jira/browse/KAFKA-3126
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Mayuresh Gharat
>Assignee: Mayuresh Gharat
>
> Consider Broker B is controller, broker A is undergoing shutdown. 
> 2016/01/14 19:49:22.884 [KafkaController] [Controller B]: Shutting down 
> broker A
> 2016/01/14 19:49:22.918 [ReplicaStateMachine] [Replica state machine on 
> controller B]: Invoking state change to OfflineReplica for replicas 
> [Topic=testTopic1,Partition=1,Replica=A] ---> (1)
> 2016/01/14 19:49:22.930 [KafkaController] [Controller B]: New leader and ISR 
> for partition [testTopic1,1] is {"leader":D,"leader_epoch":1,"isr":[D]} 
> --> (2)
> 2016/01/14 19:49:23.028 [ReplicaStateMachine] [Replica state machine on 
> controller B]: Invoking state change to OfflineReplica for replicas 
> [Topic=testTopic2,Partition=1,Replica=A] ---> (3)
> 2016/01/14 19:49:23.032 [KafkaController] [Controller B]: New leader and ISR 
> for partition [testTopic2,1] is {"leader":C,"leader_epoch":10,"isr":[C]} 
> -> (4)
> 2016/01/14 19:49:23.996 [KafkaController] [Controller B]: Broker failure 
> callback for A
> 2016/01/14 19:49:23.997 [PartitionStateMachine] [Partition state machine on 
> Controller B]: Invoking state change to OfflinePartition for partitions 
> 2016/01/14 19:49:23.998 [ReplicaStateMachine] [Replica state machine on 
> controller B]: Invoking state change to OfflineReplica for replicas 
> [Topic=testTopic2,Partition=0,Replica=A],
> [Topic=__consumer_offsets,Partition=5,Replica=A],
> [Topic=testTopic1,Partition=2,Replica=A],
> [Topic=__consumer_offsets,Partition=96,Replica=A],
> [Topic=testTopic2,Partition=1,Replica=A],
> [Topic=__consumer_offsets,Partition=36,Replica=A],
> [Topic=testTopic1,Partition=4,Replica=A],
> [Topic=__consumer_offsets,Partition=85,Replica=A],
> [Topic=testTopic1,Partition=6,Replica=A],
> [Topic=testTopic1,Partition=1,Replica=A]
> 2016/01/14 19:49:24.029 [KafkaController] [Controller B]: New leader and ISR 
> for partition [testTopic2,1] is {"leader":C,"leader_epoch":11,"isr":[C]} 
> --> (5)
> 2016/01/14 19:49:24.212 [KafkaController] [Controller B]: Cannot remove 
> replica A from ISR of partition [testTopic1,1] since it is not in the ISR. 
> Leader = D ; ISR = List(D) --> (6)
> If after (1) and (2) controller gets rid of the replica A from the ISR in 
> zookeeper for [testTopic1-1] as displayed in 6), why doesn't it do the  same 
> for [testTopic2-1] as per (5)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3126) Weird behavior in kafkaController on Controlled shutdowns. The leaderAndIsr in zookeeper is not updated during controlled shutdown.

2016-01-21 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15111353#comment-15111353
 ] 

Mayuresh Gharat commented on KAFKA-3126:


I don't see why Controller B will see A in the ISR in the final step.

> Weird behavior in kafkaController on Controlled shutdowns. The leaderAndIsr 
> in zookeeper is not updated during controlled shutdown.
> ---
>
> Key: KAFKA-3126
> URL: https://issues.apache.org/jira/browse/KAFKA-3126
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Mayuresh Gharat
>Assignee: Mayuresh Gharat
>
> Consider Broker B is controller, broker A is undergoing shutdown. 
> 2016/01/14 19:49:22.884 [KafkaController] [Controller B]: Shutting down 
> broker A
> 2016/01/14 19:49:22.918 [ReplicaStateMachine] [Replica state machine on 
> controller B]: Invoking state change to OfflineReplica for replicas 
> [Topic=testTopic1,Partition=1,Replica=A] ---> (1)
> 2016/01/14 19:49:22.930 [KafkaController] [Controller B]: New leader and ISR 
> for partition [testTopic1,1] is {"leader":D,"leader_epoch":1,"isr":[D]} 
> --> (2)
> 2016/01/14 19:49:23.028 [ReplicaStateMachine] [Replica state machine on 
> controller B]: Invoking state change to OfflineReplica for replicas 
> [Topic=testTopic2,Partition=1,Replica=A] ---> (3)
> 2016/01/14 19:49:23.032 [KafkaController] [Controller B]: New leader and ISR 
> for partition [testTopic2,1] is {"leader":C,"leader_epoch":10,"isr":[C]} 
> -> (4)
> 2016/01/14 19:49:23.996 [KafkaController] [Controller B]: Broker failure 
> callback for A
> 2016/01/14 19:49:23.997 [PartitionStateMachine] [Partition state machine on 
> Controller B]: Invoking state change to OfflinePartition for partitions 
> 2016/01/14 19:49:23.998 [ReplicaStateMachine] [Replica state machine on 
> controller B]: Invoking state change to OfflineReplica for replicas 
> [Topic=testTopic2,Partition=0,Replica=A],
> [Topic=__consumer_offsets,Partition=5,Replica=A],
> [Topic=testTopic1,Partition=2,Replica=A],
> [Topic=__consumer_offsets,Partition=96,Replica=A],
> [Topic=testTopic2,Partition=1,Replica=A],
> [Topic=__consumer_offsets,Partition=36,Replica=A],
> [Topic=testTopic1,Partition=4,Replica=A],
> [Topic=__consumer_offsets,Partition=85,Replica=A],
> [Topic=testTopic1,Partition=6,Replica=A],
> [Topic=testTopic1,Partition=1,Replica=A]
> 2016/01/14 19:49:24.029 [KafkaController] [Controller B]: New leader and ISR 
> for partition [testTopic2,1] is {"leader":C,"leader_epoch":11,"isr":[C]} 
> --> (5)
> 2016/01/14 19:49:24.212 [KafkaController] [Controller B]: Cannot remove 
> replica A from ISR of partition [testTopic1,1] since it is not in the ISR. 
> Leader = D ; ISR = List(D) --> (6)
> If after (1) and (2) controller gets rid of the replica A from the ISR in 
> zookeeper for [testTopic1-1] as displayed in 6), why doesn't it do the  same 
> for [testTopic2-1] as per (5)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-3126) Weird behavior in kafkaController on Controlled shutdowns. The leaderAndIsr in zookeeper is not updated during controlled shutdown.

2016-01-21 Thread Mayuresh Gharat (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayuresh Gharat reassigned KAFKA-3126:
--

Assignee: Mayuresh Gharat

> Weird behavior in kafkaController on Controlled shutdowns. The leaderAndIsr 
> in zookeeper is not updated during controlled shutdown.
> ---
>
> Key: KAFKA-3126
> URL: https://issues.apache.org/jira/browse/KAFKA-3126
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Mayuresh Gharat
>Assignee: Mayuresh Gharat
>
> Consider Broker B is controller, broker A is undergoing shutdown. 
> 2016/01/14 19:49:22.884 [KafkaController] [Controller B]: Shutting down 
> broker A
> 2016/01/14 19:49:22.918 [ReplicaStateMachine] [Replica state machine on 
> controller B]: Invoking state change to OfflineReplica for replicas 
> [Topic=testTopic1,Partition=1,Replica=A] ---> (1)
> 2016/01/14 19:49:22.930 [KafkaController] [Controller B]: New leader and ISR 
> for partition [testTopic1,1] is {"leader":D,"leader_epoch":1,"isr":[D]} 
> --> (2)
> 2016/01/14 19:49:23.028 [ReplicaStateMachine] [Replica state machine on 
> controller B]: Invoking state change to OfflineReplica for replicas 
> [Topic=testTopic2,Partition=1,Replica=A] ---> (3)
> 2016/01/14 19:49:23.032 [KafkaController] [Controller B]: New leader and ISR 
> for partition [testTopic2,1] is {"leader":C,"leader_epoch":10,"isr":[C]} 
> -> (4)
> 2016/01/14 19:49:23.996 [KafkaController] [Controller B]: Broker failure 
> callback for A
> 2016/01/14 19:49:23.997 [PartitionStateMachine] [Partition state machine on 
> Controller B]: Invoking state change to OfflinePartition for partitions 
> 2016/01/14 19:49:23.998 [ReplicaStateMachine] [Replica state machine on 
> controller B]: Invoking state change to OfflineReplica for replicas 
> [Topic=testTopic2,Partition=0,Replica=A],
> [Topic=__consumer_offsets,Partition=5,Replica=A],
> [Topic=testTopic1,Partition=2,Replica=A],
> [Topic=__consumer_offsets,Partition=96,Replica=A],
> [Topic=testTopic2,Partition=1,Replica=A],
> [Topic=__consumer_offsets,Partition=36,Replica=A],
> [Topic=testTopic1,Partition=4,Replica=A],
> [Topic=__consumer_offsets,Partition=85,Replica=A],
> [Topic=testTopic1,Partition=6,Replica=A],
> [Topic=testTopic1,Partition=1,Replica=A]
> 2016/01/14 19:49:24.029 [KafkaController] [Controller B]: New leader and ISR 
> for partition [testTopic2,1] is {"leader":C,"leader_epoch":11,"isr":[C]} 
> --> (5)
> 2016/01/14 19:49:24.212 [KafkaController] [Controller B]: Cannot remove 
> replica A from ISR of partition [testTopic1,1] since it is not in the ISR. 
> Leader = D ; ISR = List(D) --> (6)
> If after (1) and (2) controller gets rid of the replica A from the ISR in 
> zookeeper for [testTopic1-1] as displayed in 6), why doesn't it do the  same 
> for [testTopic2-1] as per (5)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3126) Weird behavior in kafkaController on Controlled shutdowns. The leaderAndIsr in zookeeper is not updated during controlled shutdown.

2016-01-20 Thread Mayuresh Gharat (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayuresh Gharat updated KAFKA-3126:
---
Description: 
Consider Broker B is controller, broker A is undergoing shutdown. 

2016/01/14 19:49:22.884 [KafkaController] [Controller B]: Shutting down broker A

2016/01/14 19:49:22.918 [ReplicaStateMachine] [Replica state machine on 
controller B]: Invoking state change to OfflineReplica for replicas 
[Topic=testTopic1,Partition=1,Replica=A] ---> (1)

2016/01/14 19:49:22.930 [KafkaController] [Controller B]: New leader and ISR 
for partition [testTopic1,1] is {"leader":D,"leader_epoch":1,"isr":[D]} --> 
(2)

2016/01/14 19:49:23.028 [ReplicaStateMachine] [Replica state machine on 
controller B]: Invoking state change to OfflineReplica for replicas 
[Topic=testTopic2,Partition=1,Replica=A] ---> (3)

2016/01/14 19:49:23.032 [KafkaController] [Controller B]: New leader and ISR 
for partition [testTopic2,1] is {"leader":C,"leader_epoch":10,"isr":[C]} -> 
(4)

2016/01/14 19:49:23.996 [KafkaController] [Controller B]: Broker failure 
callback for A
2016/01/14 19:49:23.997 [PartitionStateMachine] [Partition state machine on 
Controller B]: Invoking state change to OfflinePartition for partitions 

2016/01/14 19:49:23.998 [ReplicaStateMachine] [Replica state machine on 
controller B]: Invoking state change to OfflineReplica for replicas 
[Topic=testTopic2,Partition=0,Replica=A],
[Topic=__consumer_offsets,Partition=5,Replica=A],
[Topic=testTopic1,Partition=2,Replica=A],
[Topic=__consumer_offsets,Partition=96,Replica=A],
[Topic=testTopic2,Partition=1,Replica=A],
[Topic=__consumer_offsets,Partition=36,Replica=A],
[Topic=testTopic1,Partition=4,Replica=A],
[Topic=__consumer_offsets,Partition=85,Replica=A],
[Topic=testTopic1,Partition=6,Replica=A],
[Topic=testTopic1,Partition=1,Replica=A]

2016/01/14 19:49:24.029 [KafkaController] [Controller B]: New leader and ISR 
for partition [testTopic2,1] is {"leader":C,"leader_epoch":11,"isr":[C]} 
--> (5)

2016/01/14 19:49:24.212 [KafkaController] [Controller B]: Cannot remove replica 
A from ISR of partition [testTopic1,1] since it is not in the ISR. Leader = D ; 
ISR = List(D) --> (6)

If after (1) and (2) controller gets rid of the replica A from the ISR in 
zookeeper for [testTopic1-1] as displayed in 6), why doesn't it do the  same 
for [testTopic2-1] as per (5)






  was:
Consider Broker B is controller, broker A is undergoing shutdown. 

2016/01/14 19:49:22.884 [KafkaController] [Controller B]: Shutting down broker A

2016/01/14 19:49:22.918 [ReplicaStateMachine] [Replica state machine on 
controller B]: Invoking state change to OfflineReplica for replicas 
[Topic=testTopic1,Partition=1,Replica=A] ---> (1)

2016/01/14 19:49:22.930 [KafkaController] [Controller B]: New leader and ISR 
for partition [testTopic1,1] is {"leader":D,"leader_epoch":1,"isr":[D]} --> 
(2)

2016/01/14 19:49:23.028 [ReplicaStateMachine] [Replica state machine on 
controller B]: Invoking state change to OfflineReplica for replicas 
[Topic=testTopic2,Partition=1,Replica=A] ---> (3)

2016/01/14 19:49:23.032 [KafkaController] [Controller B]: New leader and ISR 
for partition [testTopic2,1] is {"leader":C,"leader_epoch":10,"isr":[C]} -> 
(4)

2016/01/14 19:49:23.996 [KafkaController] [Controller B]: Broker failure 
callback for A
2016/01/14 19:49:23.997 [PartitionStateMachine] [Partition state machine on 
Controller B]: Invoking state change to OfflinePartition for partitions 

2016/01/14 19:49:23.998 [ReplicaStateMachine] [Replica state machine on 
controller B]: Invoking state change to OfflineReplica for replicas 
[Topic=__consumer_offsets,Partition=59,Replica=A],[Topic=__consumer_offsets,Partition=81,Replica=A],[Topic=__consumer_offsets,Partition=61,Replica=A],[Topic=testTopic2,Partition=0,Replica=A],[Topic=__consumer_offsets,Partition=5,Replica=A],[Topic=__consumer_offsets,Partition=89,Replica=A],[Topic=__consumer_offsets,Partition=1,Replica=A],[Topic=__consumer_offsets,Partition=22,Replica=A],[Topic=__consumer_offsets,Partition=60,Replica=A],[Topic=testTopic1,Partition=2,Replica=A],[Topic=__consumer_offsets,Partition=96,Replica=A],[Topic=testTopic2,Partition=1,Replica=A],[Topic=__consumer_offsets,Partition=36,Replica=A],[Topic=__consumer_offsets,Partition=10,Replica=A],[Topic=__consumer_offsets,Partition=69,Replica=A],[Topic=__consumer_offsets,Partition=43,Replica=A],[Topic=__consumer_offsets,Partition=57,Replica=A],[Topic=__consumer_offsets,Partition=21,Replica=A],[Topic=__consumer_offsets,Partition=46,Replica=A],[Topic=__consumer_offsets,Partition=83,Replica=A],[Topic=__consumer_offsets,Partition=17,Replica=A],[Topic=__consumer_offsets,Partition=73,Replica=A],[Topic=__consumer_offsets,Partition=58,Replica=A],[Topic=__consumer_offsets,Partition=26,Replica=A],[Topic=__consumer_offsets,Partition=2,Replica=A],[Topic=__consumer_offsets,Partition=70,Repli

[jira] [Created] (KAFKA-3126) Weird behavior in kafkaController on Controlled shutdowns. The leaderAndIsr in zookeeper is not updated during controlled shutdown.

2016-01-20 Thread Mayuresh Gharat (JIRA)
Mayuresh Gharat created KAFKA-3126:
--

 Summary: Weird behavior in kafkaController on Controlled 
shutdowns. The leaderAndIsr in zookeeper is not updated during controlled 
shutdown.
 Key: KAFKA-3126
 URL: https://issues.apache.org/jira/browse/KAFKA-3126
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Mayuresh Gharat


Consider Broker B is controller, broker A is undergoing shutdown. 

2016/01/14 19:49:22.884 [KafkaController] [Controller B]: Shutting down broker A

2016/01/14 19:49:22.918 [ReplicaStateMachine] [Replica state machine on 
controller B]: Invoking state change to OfflineReplica for replicas 
[Topic=testTopic1,Partition=1,Replica=A] ---> (1)

2016/01/14 19:49:22.930 [KafkaController] [Controller B]: New leader and ISR 
for partition [testTopic1,1] is {"leader":D,"leader_epoch":1,"isr":[D]} --> 
(2)

2016/01/14 19:49:23.028 [ReplicaStateMachine] [Replica state machine on 
controller B]: Invoking state change to OfflineReplica for replicas 
[Topic=testTopic2,Partition=1,Replica=A] ---> (3)

2016/01/14 19:49:23.032 [KafkaController] [Controller B]: New leader and ISR 
for partition [testTopic2,1] is {"leader":C,"leader_epoch":10,"isr":[C]} -> 
(4)

2016/01/14 19:49:23.996 [KafkaController] [Controller B]: Broker failure 
callback for A
2016/01/14 19:49:23.997 [PartitionStateMachine] [Partition state machine on 
Controller B]: Invoking state change to OfflinePartition for partitions 

2016/01/14 19:49:23.998 [ReplicaStateMachine] [Replica state machine on 
controller B]: Invoking state change to OfflineReplica for replicas 
[Topic=__consumer_offsets,Partition=59,Replica=A],[Topic=__consumer_offsets,Partition=81,Replica=A],[Topic=__consumer_offsets,Partition=61,Replica=A],[Topic=testTopic2,Partition=0,Replica=A],[Topic=__consumer_offsets,Partition=5,Replica=A],[Topic=__consumer_offsets,Partition=89,Replica=A],[Topic=__consumer_offsets,Partition=1,Replica=A],[Topic=__consumer_offsets,Partition=22,Replica=A],[Topic=__consumer_offsets,Partition=60,Replica=A],[Topic=testTopic1,Partition=2,Replica=A],[Topic=__consumer_offsets,Partition=96,Replica=A],[Topic=testTopic2,Partition=1,Replica=A],[Topic=__consumer_offsets,Partition=36,Replica=A],[Topic=__consumer_offsets,Partition=10,Replica=A],[Topic=__consumer_offsets,Partition=69,Replica=A],[Topic=__consumer_offsets,Partition=43,Replica=A],[Topic=__consumer_offsets,Partition=57,Replica=A],[Topic=__consumer_offsets,Partition=21,Replica=A],[Topic=__consumer_offsets,Partition=46,Replica=A],[Topic=__consumer_offsets,Partition=83,Replica=A],[Topic=__consumer_offsets,Partition=17,Replica=A],[Topic=__consumer_offsets,Partition=73,Replica=A],[Topic=__consumer_offsets,Partition=58,Replica=A],[Topic=__consumer_offsets,Partition=26,Replica=A],[Topic=__consumer_offsets,Partition=2,Replica=A],[Topic=__consumer_offsets,Partition=70,Replica=A],[Topic=__consumer_offsets,Partition=13,Replica=A],[Topic=__consumer_offsets,Partition=62,Replica=A],[Topic=__consumer_offsets,Partition=25,Replica=A],[Topic=__consumer_offsets,Partition=9,Replica=A],[Topic=__consumer_offsets,Partition=29,Replica=A],[Topic=__consumer_offsets,Partition=97,Replica=A],[Topic=__consumer_offsets,Partition=53,Replica=A],[Topic=__consumer_offsets,Partition=77,Replica=A],[Topic=__consumer_offsets,Partition=40,Replica=A],[Topic=__consumer_offsets,Partition=52,Replica=A],[Topic=__consumer_offsets,Partition=50,Replica=A],[Topic=__consumer_offsets,Partition=64,Replica=A],[Topic=__consumer_offsets,Partition=23,Replica=A],[Topic=__consumer_offsets,Partition=55,Replica=A],[Topic=__consumer_offsets,Partition=93,Replica=A],[Topic=__consumer_offsets,Partition=12,Replica=A],[Topic=__consumer_offsets,Partition=16,Replica=A],[Topic=testTopic1,Partition=4,Replica=A],[Topic=__consumer_offsets,Partition=38,Replica=A],[Topic=__consumer_offsets,Partition=65,Replica=A],[Topic=__consumer_offsets,Partition=95,Replica=A],[Topic=__consumer_offsets,Partition=31,Replica=A],[Topic=__consumer_offsets,Partition=88,Replica=A],[Topic=__consumer_offsets,Partition=19,Replica=A],[Topic=__consumer_offsets,Partition=98,Replica=A],[Topic=__consumer_offsets,Partition=45,Replica=A],[Topic=__consumer_offsets,Partition=79,Replica=A],[Topic=__consumer_offsets,Partition=94,Replica=A],[Topic=__consumer_offsets,Partition=4,Replica=A],[Topic=__consumer_offsets,Partition=91,Replica=A],[Topic=__consumer_offsets,Partition=86,Replica=A],[Topic=__consumer_offsets,Partition=0,Replica=A],[Topic=__consumer_offsets,Partition=85,Replica=A],[Topic=testTopic1,Partition=6,Replica=A],[Topic=__consumer_offsets,Partition=24,Replica=A],[Topic=__consumer_offsets,Partition=72,Replica=A],[Topic=__consumer_offsets,Partition=37,Replica=A],[Topic=__consumer_offsets,Partition=82,Replica=A],[Topic=__consumer_offsets,Partition=76,Replica=A],[Topic=__consumer_offsets,Partition=7,Replica=A],[Topic=__consumer_offsets,Partition=74,R

[jira] [Commented] (KAFKA-1397) delete topic is not working

2016-01-20 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15109040#comment-15109040
 ] 

Mayuresh Gharat commented on KAFKA-1397:


Can you paste the server side logs?

> delete topic is not working 
> 
>
> Key: KAFKA-1397
> URL: https://issues.apache.org/jira/browse/KAFKA-1397
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.0
>Reporter: Jun Rao
>Assignee: Timothy Chen
> Fix For: 0.8.2.0
>
> Attachments: KAFKA-1397.patch, KAFKA-1397_2014-04-28_14:48:32.patch, 
> KAFKA-1397_2014-04-28_17:08:49.patch, KAFKA-1397_2014-04-30_14:55:28.patch, 
> KAFKA-1397_2014-05-01_15:53:57.patch, KAFKA-1397_2014-05-01_18:12:24.patch, 
> KAFKA-1397_2014-05-02_13:38:02.patch, KAFKA-1397_2014-05-05_11:17:59.patch, 
> KAFKA-1397_2014-05-05_14:00:29.patch
>
>
> All unit tests are disabled since they hang transiently (see details in 
> KAFKA-1391).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3083) a soft failure in controller may leave a topic partition in an inconsistent state

2016-01-19 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15107770#comment-15107770
 ] 

Mayuresh Gharat commented on KAFKA-3083:


 However, the replica would be in the cache of the controller B, so would it 
be elected in this case? Would it be an actual problem if B is demoted and 
another controller comes up?

---> That seems right: it should be in the cache of controller B and should be 
elected as leader. If, however, B goes down before this and a new controller 
is elected, the new controller will read the data from ZooKeeper and might not 
be able to elect a new leader if unclean leader election is turned OFF.
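
The decision the newly elected controller faces in that last case is roughly
the following (a simplified sketch of offline-partition leader selection, not
the actual controller code):

{code}
import java.util.List;
import java.util.Set;

public class OfflineLeaderElectionSketch {
    // Picks a new leader from the ISR read back from ZooKeeper; if no ISR
    // member is alive and unclean leader election is disabled, the partition
    // stays offline (returns null). Illustrative only.
    static Integer electLeader(List<Integer> assignedReplicas,
                               List<Integer> isrFromZk,
                               Set<Integer> liveBrokers,
                               boolean uncleanLeaderElectionEnabled) {
        for (int replica : isrFromZk)
            if (liveBrokers.contains(replica))
                return replica;        // live in-sync replica: clean election
        if (!uncleanLeaderElectionEnabled)
            return null;               // no live ISR member: partition stays offline
        for (int replica : assignedReplicas)
            if (liveBrokers.contains(replica))
                return replica;        // unclean: any live assigned replica
        return null;                   // no live replica at all
    }
}
{code}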

> a soft failure in controller may leave a topic partition in an inconsistent 
> state
> -
>
> Key: KAFKA-3083
> URL: https://issues.apache.org/jira/browse/KAFKA-3083
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
>Reporter: Jun Rao
>Assignee: Mayuresh Gharat
>
> The following sequence can happen.
> 1. Broker A is the controller and is in the middle of processing a broker 
> change event. As part of this process, let's say it's about to shrink the isr 
> of a partition.
> 2. Then broker A's session expires and broker B takes over as the new 
> controller. Broker B sends the initial leaderAndIsr request to all brokers.
> 3. Broker A continues by shrinking the isr of the partition in ZK and sends 
> the new leaderAndIsr request to the broker (say C) that leads the partition. 
> Broker C will reject this leaderAndIsr since the request comes from a 
> controller with an older epoch. Now we could be in a situation that Broker C 
> thinks the isr has all replicas, but the isr stored in ZK is different.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3083) a soft failure in controller may leave a topic partition in an inconsistent state

2016-01-19 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15107417#comment-15107417
 ] 

Mayuresh Gharat commented on KAFKA-3083:


Hi [~fpj],

Correct me if I am wrong:
1) We need to use a multi-op that combines the ISR update with a znode check. 
The znode check verifies that the version of the controller leadership znode 
is still the same, and only if it passes is the ISR data updated (see the 
sketch after this list).
2) The race condition that [~junrao] mentioned still exists even with 1).
3) To overcome this, we somehow need to detect that broker A, who was the 
controller, got a session expiration and should drop all the ZK work it is 
doing immediately.
4) To do step 3), as [~junrao] suggested, we have to detect the connection 
loss event. Two things might then happen:
 i) Broker A loses the connection and reconnects immediately, in which case it 
gets a SyncConnected event. The session MIGHT NOT have expired, since the 
reconnection happened immediately. Broker A is expected to continue, since it 
is still the controller and the session has not expired.
ii) Broker A loses the connection and reconnects later, in which case it also 
gets a SyncConnected event, but the session MIGHT have expired. Broker A is 
expected to stop all ZK operations.
The only difference between i) and ii) is the session expiration check.
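
To make 1) concrete, here is a minimal sketch using ZooKeeper's multi() API;
the znode paths, versions and payload are placeholders, not the actual Kafka
znode layout:

{code}
import java.util.Arrays;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.ZooKeeper;

public class IsrMultiOpSketch {
    // Updates the leaderAndIsr znode only if the controller leadership znode
    // still has the version this controller observed when it was elected.
    // If another controller has taken over, the check fails and the whole
    // transaction is rejected. Paths and versions are placeholders.
    static void updateIsrIfStillController(ZooKeeper zk,
                                           String controllerZnodePath,
                                           int expectedControllerZnodeVersion,
                                           String leaderAndIsrPath,
                                           byte[] newLeaderAndIsr,
                                           int expectedLeaderAndIsrVersion)
            throws KeeperException, InterruptedException {
        zk.multi(Arrays.asList(
                Op.check(controllerZnodePath, expectedControllerZnodeVersion),
                Op.setData(leaderAndIsrPath, newLeaderAndIsr, expectedLeaderAndIsrVersion)));
    }
}
{code}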

> a soft failure in controller may leave a topic partition in an inconsistent 
> state
> -
>
> Key: KAFKA-3083
> URL: https://issues.apache.org/jira/browse/KAFKA-3083
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
>Reporter: Jun Rao
>Assignee: Mayuresh Gharat
>
> The following sequence can happen.
> 1. Broker A is the controller and is in the middle of processing a broker 
> change event. As part of this process, let's say it's about to shrink the isr 
> of a partition.
> 2. Then broker A's session expires and broker B takes over as the new 
> controller. Broker B sends the initial leaderAndIsr request to all brokers.
> 3. Broker A continues by shrinking the isr of the partition in ZK and sends 
> the new leaderAndIsr request to the broker (say C) that leads the partition. 
> Broker C will reject this leaderAndIsr since the request comes from a 
> controller with an older epoch. Now we could be in a situation that Broker C 
> thinks the isr has all replicas, but the isr stored in ZK is different.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3083) a soft failure in controller may leader a topic partition in an inconsistent state

2016-01-15 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15102912#comment-15102912
 ] 

Mayuresh Gharat commented on KAFKA-3083:


Hi Jun,

I was talking about the SessionExpirationListener in KafkaController, which 
implements IZkStateListener and therefore has the handleStateChanged() API. I 
was wondering if we can handle the state change to Disconnected in that 
callback to do the cleanup.
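
Roughly what handling the Disconnected state in that callback could look like
(a sketch; the abort hook is an assumed callback, not an existing
KafkaController method, and this class only shows the body that would sit
behind IZkStateListener.handleStateChanged()):

{code}
import org.apache.zookeeper.Watcher.Event.KeeperState;

public class ControllerZkStateHandlerSketch {
    private final Runnable abortOngoingControllerWork;  // assumed hook, not real Kafka code
    private volatile boolean zkWritesSuspended = false;

    ControllerZkStateHandlerSketch(Runnable abortOngoingControllerWork) {
        this.abortOngoingControllerWork = abortOngoingControllerWork;
    }

    // Would be invoked from IZkStateListener.handleStateChanged(): suspend
    // controller ZK writes on Disconnected, resume only on SyncConnected
    // (same session); on Expired the session is gone and handleNewSession()
    // would make the controller resign. Illustrative only.
    public void handleStateChanged(KeeperState state) {
        switch (state) {
            case Disconnected:
                zkWritesSuspended = true;
                abortOngoingControllerWork.run();
                break;
            case SyncConnected:
                zkWritesSuspended = false;   // the same session was re-established
                break;
            case Expired:
                zkWritesSuspended = true;    // a resignation path would follow
                break;
            default:
                break;
        }
    }

    public boolean writesSuspended() {
        return zkWritesSuspended;
    }
}
{code}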

> a soft failure in controller may leader a topic partition in an inconsistent 
> state
> --
>
> Key: KAFKA-3083
> URL: https://issues.apache.org/jira/browse/KAFKA-3083
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
>Reporter: Jun Rao
>Assignee: Mayuresh Gharat
>
> The following sequence can happen.
> 1. Broker A is the controller and is in the middle of processing a broker 
> change event. As part of this process, let's say it's about to shrink the isr 
> of a partition.
> 2. Then broker A's session expires and broker B takes over as the new 
> controller. Broker B sends the initial leaderAndIsr request to all brokers.
> 3. Broker A continues by shrinking the isr of the partition in ZK and sends 
> the new leaderAndIsr request to the broker (say C) that leads the partition. 
> Broker C will reject this leaderAndIsr since the request comes from a 
> controller with an older epoch. Now we could be in a situation that Broker C 
> thinks the isr has all replicas, but the isr stored in ZK is different.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3083) a soft failure in controller may leader a topic partition in an inconsistent state

2016-01-15 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15102887#comment-15102887
 ] 

Mayuresh Gharat commented on KAFKA-3083:


That's right. 
I am wondering if there is a way to detect the ConnectionLoss via the 
handleStateChanged() API on the SessionExpirationListener, which could be used 
to drop all the current ZK tasks that controller A was about to do.

> a soft failure in controller may leader a topic partition in an inconsistent 
> state
> --
>
> Key: KAFKA-3083
> URL: https://issues.apache.org/jira/browse/KAFKA-3083
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
>Reporter: Jun Rao
>Assignee: Mayuresh Gharat
>
> The following sequence can happen.
> 1. Broker A is the controller and is in the middle of processing a broker 
> change event. As part of this process, let's say it's about to shrink the isr 
> of a partition.
> 2. Then broker A's session expires and broker B takes over as the new 
> controller. Broker B sends the initial leaderAndIsr request to all brokers.
> 3. Broker A continues by shrinking the isr of the partition in ZK and sends 
> the new leaderAndIsr request to the broker (say C) that leads the partition. 
> Broker C will reject this leaderAndIsr since the request comes from a 
> controller with an older epoch. Now we could be in a situation that Broker C 
> thinks the isr has all replicas, but the isr stored in ZK is different.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3083) a soft failure in controller may leader a topic partition in an inconsistent state

2016-01-15 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15102798#comment-15102798
 ] 

Mayuresh Gharat commented on KAFKA-3083:


On a side note, I noticed a new issue in controlled shutdown that is somewhat 
similar. I will open a new JIRA for that.

> a soft failure in controller may leader a topic partition in an inconsistent 
> state
> --
>
> Key: KAFKA-3083
> URL: https://issues.apache.org/jira/browse/KAFKA-3083
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
>Reporter: Jun Rao
>Assignee: Mayuresh Gharat
>
> The following sequence can happen.
> 1. Broker A is the controller and is in the middle of processing a broker 
> change event. As part of this process, let's say it's about to shrink the isr 
> of a partition.
> 2. Then broker A's session expires and broker B takes over as the new 
> controller. Broker B sends the initial leaderAndIsr request to all brokers.
> 3. Broker A continues by shrinking the isr of the partition in ZK and sends 
> the new leaderAndIsr request to the broker (say C) that leads the partition. 
> Broker C will reject this leaderAndIsr since the request comes from a 
> controller with an older epoch. Now we could be in a situation that Broker C 
> thinks the isr has all replicas, but the isr stored in ZK is different.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3083) a soft failure in controller may leader a topic partition in an inconsistent state

2016-01-15 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15102791#comment-15102791
 ] 

Mayuresh Gharat commented on KAFKA-3083:


[~junrao], [~fpj] that makes it clearer. I was just thinking we could modify 
the controller code to always check that it is still the controller before it 
makes such changes to ZooKeeper. 
Even then there is a race condition: broker A's session times out at time T, 
broker B becomes the controller at T+2, and broker A can still proceed with 
the changes to ZK between T and T+2.

I had some questions on the zookeeper session expiry:

1) Suppose a broker establishes a connection with ZooKeeper and has a 
ZooKeeper session timeout of 10 min. If the broker goes down (or is stuck) and 
then comes back up, connecting to ZooKeeper after 5 min, will it reconnect on 
the same session?

2) Session expiry is only triggered by a session timeout and nothing else. Am 
I right?



> a soft failure in controller may leader a topic partition in an inconsistent 
> state
> --
>
> Key: KAFKA-3083
> URL: https://issues.apache.org/jira/browse/KAFKA-3083
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
>Reporter: Jun Rao
>Assignee: Mayuresh Gharat
>
> The following sequence can happen.
> 1. Broker A is the controller and is in the middle of processing a broker 
> change event. As part of this process, let's say it's about to shrink the isr 
> of a partition.
> 2. Then broker A's session expires and broker B takes over as the new 
> controller. Broker B sends the initial leaderAndIsr request to all brokers.
> 3. Broker A continues by shrinking the isr of the partition in ZK and sends 
> the new leaderAndIsr request to the broker (say C) that leads the partition. 
> Broker C will reject this leaderAndIsr since the request comes from a 
> controller with an older epoch. Now we could be in a situation that Broker C 
> thinks the isr has all replicas, but the isr stored in ZK is different.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3083) a soft failure in controller may leader a topic partition in an inconsistent state

2016-01-15 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15102259#comment-15102259
 ] 

Mayuresh Gharat commented on KAFKA-3083:


For your question regarding:
If broker A shrinks the partition ISR in ZK before step 2 but notifies C after 
step 2 as in the description, does C eventually learn the ISR stored in ZK?

C will not know about the ISR stored in ZK until the next LeaderAndIsr request 
from the new controller.

> a soft failure in controller may leader a topic partition in an inconsistent 
> state
> --
>
> Key: KAFKA-3083
> URL: https://issues.apache.org/jira/browse/KAFKA-3083
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
>Reporter: Jun Rao
>Assignee: Mayuresh Gharat
>
> The following sequence can happen.
> 1. Broker A is the controller and is in the middle of processing a broker 
> change event. As part of this process, let's say it's about to shrink the isr 
> of a partition.
> 2. Then broker A's session expires and broker B takes over as the new 
> controller. Broker B sends the initial leaderAndIsr request to all brokers.
> 3. Broker A continues by shrinking the isr of the partition in ZK and sends 
> the new leaderAndIsr request to the broker (say C) that leads the partition. 
> Broker C will reject this leaderAndIsr since the request comes from a 
> controller with an older epoch. Now we could be in a situation that Broker C 
> thinks the isr has all replicas, but the isr stored in ZK is different.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-3083) a soft failure in controller may leader a topic partition in an inconsistent state

2016-01-14 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15098607#comment-15098607
 ] 

Mayuresh Gharat edited comment on KAFKA-3083 at 1/14/16 6:34 PM:
-

That's a very good point; I will verify whether this can happen. 
Moreover, I think the behavior should be:
1) Broker A was the controller.
2) Broker A's session expires; it invokes controllerResignation, clears all its 
caches and stops all ongoing controller work.
3) Broker B becomes the controller and proceeds. 

What do you think?


was (Author: mgharat):
That's a very good point, I will verify if this can happen. 
Moreover, I think the behavior should be :
1) Broker A was the controller.
2) Broker A faces a session expiration, invokes the controllerResignation and 
clears all its caches and also stops all the ongoing controller work.
3) Broker B becomes the controller and proceeds. 

> a soft failure in controller may leader a topic partition in an inconsistent 
> state
> --
>
> Key: KAFKA-3083
> URL: https://issues.apache.org/jira/browse/KAFKA-3083
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
>Reporter: Jun Rao
>Assignee: Mayuresh Gharat
>
> The following sequence can happen.
> 1. Broker A is the controller and is in the middle of processing a broker 
> change event. As part of this process, let's say it's about to shrink the isr 
> of a partition.
> 2. Then broker A's session expires and broker B takes over as the new 
> controller. Broker B sends the initial leaderAndIsr request to all brokers.
> 3. Broker A continues by shrinking the isr of the partition in ZK and sends 
> the new leaderAndIsr request to the broker (say C) that leads the partition. 
> Broker C will reject this leaderAndIsr since the request comes from a 
> controller with an older epoch. Now we could be in a situation that Broker C 
> thinks the isr has all replicas, but the isr stored in ZK is different.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3083) a soft failure in controller may leader a topic partition in an inconsistent state

2016-01-14 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15098607#comment-15098607
 ] 

Mayuresh Gharat commented on KAFKA-3083:


That's a very good point; I will verify whether this can happen. 
Moreover, I think the behavior should be:
1) Broker A was the controller.
2) Broker A's session expires; it invokes controllerResignation, clears all its 
caches and stops all ongoing controller work.
3) Broker B becomes the controller and proceeds. 
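A minimal sketch of that proposed ordering (illustrative Java with hypothetical names; the real controller code is Scala): on session expiration the broker resigns first, stopping in-flight controller work and clearing its caches, so it cannot race the new controller with stale writes.

{code:java}
// Hypothetical interface standing in for the controller's internal state.
interface ControllerContext {
    void stopOngoingControllerWork();   // e.g. state machines, delete-topic thread
    void clearCaches();
}

final class SessionExpirationHandler {

    private final ControllerContext context;

    SessionExpirationHandler(ControllerContext context) {
        this.context = context;
    }

    // Called when ZooKeeper reports KeeperState.Expired for this broker's session.
    void onSessionExpired() {
        // Step 2 of the proposed behavior: resign cleanly before anything else.
        context.stopOngoingControllerWork();
        context.clearCaches();
        // Step 3 happens elsewhere: broker B wins the election and proceeds.
        // Broker A would only re-enter the election once it has a new session.
    }
}
{code}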

> a soft failure in controller may leader a topic partition in an inconsistent 
> state
> --
>
> Key: KAFKA-3083
> URL: https://issues.apache.org/jira/browse/KAFKA-3083
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
>Reporter: Jun Rao
>Assignee: Mayuresh Gharat
>
> The following sequence can happen.
> 1. Broker A is the controller and is in the middle of processing a broker 
> change event. As part of this process, let's say it's about to shrink the isr 
> of a partition.
> 2. Then broker A's session expires and broker B takes over as the new 
> controller. Broker B sends the initial leaderAndIsr request to all brokers.
> 3. Broker A continues by shrinking the isr of the partition in ZK and sends 
> the new leaderAndIsr request to the broker (say C) that leads the partition. 
> Broker C will reject this leaderAndIsr since the request comes from a 
> controller with an older epoch. Now we could be in a situation that Broker C 
> thinks the isr has all replicas, but the isr stored in ZK is different.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-3083) a soft failure in controller may leader a topic partition in an inconsistent state

2016-01-12 Thread Mayuresh Gharat (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayuresh Gharat reassigned KAFKA-3083:
--

Assignee: Mayuresh Gharat

> a soft failure in controller may leader a topic partition in an inconsistent 
> state
> --
>
> Key: KAFKA-3083
> URL: https://issues.apache.org/jira/browse/KAFKA-3083
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
>Reporter: Jun Rao
>Assignee: Mayuresh Gharat
>
> The following sequence can happen.
> 1. Broker A is the controller and is in the middle of processing a broker 
> change event. As part of this process, let's say it's about to shrink the isr 
> of a partition.
> 2. Then broker A's session expires and broker B takes over as the new 
> controller. Broker B sends the initial leaderAndIsr request to all brokers.
> 3. Broker A continues by shrinking the isr of the partition in ZK and sends 
> the new leaderAndIsr request to the broker (say C) that leads the partition. 
> Broker C will reject this leaderAndIsr since the request comes from a 
> controller with an older epoch. Now we could be in a situation that Broker C 
> thinks the isr has all replicas, but the isr stored in ZK is different.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-2410) Implement "Auto Topic Creation" client side and remove support from Broker side

2016-01-08 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15089686#comment-15089686
 ] 

Mayuresh Gharat edited comment on KAFKA-2410 at 1/8/16 6:48 PM:


I would like to bump up the discussion on this. 
The way I understand it, there are 2 options:

1) Have a config on the producer and consumer.
   -> TMR is disabled on the broker side.
   -> Producer and consumer have this config set to TRUE.
   -> The producer does the send(), issues a TMR and finds that the topic does 
not exist. It then checks this config and issues a createTopicRequest. When the 
createTopicRequest returns success, it starts producing the actual data.
   -> The same goes for the consumer: if the topic does not exist, the consumer 
will create it if the config is enabled. This is not a hard requirement on the 
consumer side.

2) No config; topic creation is left to the user explicitly.

The problem with 2) is that, if we have mirror makers running, we would have to 
do the topic creation in the producer, maybe in some sort of callback, so it 
would be better to go with 1) in this scenario.

+ Joel [~jjkoshy] 




was (Author: mgharat):
I would like to bump up discussion on this. 
The way I understand this is that we have 2 points :

1) Have a config on the producer and consumer.
   a) TMR is disabled on the broker side.
   b) Producer and consumer have this config set to TRUE.
  c) The producer does the send(), it issues a TMR and finds that the topic 
does not exist. It will check this config and issue a createTopicRequest. When 
the createTopicRequest returns  success, it will start producing the actual 
data.
  d) Same goes for Consumer, if the topic does not exist, consumer will create 
it if the config is enabled. This is not a hard requirement on the consumer 
side.

2) No config and the topic creation is left to user explicitly.

The problem with 2) is that, if we have mirror makers running, we have to do 
the topic creation in producer, in some sort of callback may be, but it would 
be better to go with 1) in this scenario.

+ Joel [~jjkoshy] 



> Implement "Auto Topic Creation" client side and remove support from Broker 
> side
> ---
>
> Key: KAFKA-2410
> URL: https://issues.apache.org/jira/browse/KAFKA-2410
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, core
>Affects Versions: 0.8.2.1
>Reporter: Grant Henke
>Assignee: Grant Henke
>
> Auto topic creation on the broker has caused pain in the past; And today it 
> still causes unusual error handling requirements on the client side, added 
> complexity in the broker, mixed responsibility of the TopicMetadataRequest, 
> and limits configuration of the option to be cluster wide. In the future 
> having it broker side will also make features such as authorization very 
> difficult. 
> There have been discussions in the past of implementing this feature client 
> side. 
> [example|https://issues.apache.org/jira/browse/KAFKA-689?focusedCommentId=13548746&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13548746]
> This Jira is to track that discussion and implementation once the necessary 
> protocol support exists: KAFKA-2229



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2410) Implement "Auto Topic Creation" client side and remove support from Broker side

2016-01-08 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15089686#comment-15089686
 ] 

Mayuresh Gharat commented on KAFKA-2410:


I would like to bump up the discussion on this. 
The way I understand it, there are 2 options:

1) Have a config on the producer and consumer.
   a) TMR is disabled on the broker side.
   b) Producer and consumer have this config set to TRUE.
   c) The producer does the send(), issues a TMR and finds that the topic does 
not exist. It then checks this config and issues a createTopicRequest. When the 
createTopicRequest returns success, it starts producing the actual data.
   d) The same goes for the consumer: if the topic does not exist, the consumer 
will create it if the config is enabled. This is not a hard requirement on the 
consumer side.

2) No config; topic creation is left to the user explicitly.

The problem with 2) is that, if we have mirror makers running, we would have to 
do the topic creation in the producer, maybe in some sort of callback, so it 
would be better to go with 1) in this scenario.

+ Joel [~jjkoshy] 
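A sketch of option 1) as seen from the producer side, written against the Admin/createTopics API that shipped later; the client-side "create if missing" flag below is the proposed config and is purely hypothetical.

{code:java}
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TopicExistsException;

public final class ClientSideTopicCreation {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        boolean clientSideCreationEnabled = true;   // the proposed client config, set to TRUE
        String topic = "my-topic";

        try (Admin admin = Admin.create(props)) {
            boolean exists = admin.listTopics().names().get().contains(topic);
            if (!exists && clientSideCreationEnabled) {
                try {
                    admin.createTopics(Collections.singleton(new NewTopic(topic, 1, (short) 1)))
                         .all().get();
                } catch (ExecutionException e) {
                    // Another client won the creation race; that is fine for our purposes.
                    if (!(e.getCause() instanceof TopicExistsException)) {
                        throw e;
                    }
                }
            }
        }

        // Only once creation has succeeded do we start producing the actual data.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>(topic, "key", "value")).get();
        }
    }
}
{code}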



> Implement "Auto Topic Creation" client side and remove support from Broker 
> side
> ---
>
> Key: KAFKA-2410
> URL: https://issues.apache.org/jira/browse/KAFKA-2410
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, core
>Affects Versions: 0.8.2.1
>Reporter: Grant Henke
>Assignee: Grant Henke
>
> Auto topic creation on the broker has caused pain in the past; And today it 
> still causes unusual error handling requirements on the client side, added 
> complexity in the broker, mixed responsibility of the TopicMetadataRequest, 
> and limits configuration of the option to be cluster wide. In the future 
> having it broker side will also make features such as authorization very 
> difficult. 
> There have been discussions in the past of implementing this feature client 
> side. 
> [example|https://issues.apache.org/jira/browse/KAFKA-689?focusedCommentId=13548746&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13548746]
> This Jira is to track that discussion and implementation once the necessary 
> protocol support exists: KAFKA-2229



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3083) a soft failure in controller may leader a topic partition in an inconsistent state

2016-01-08 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15089586#comment-15089586
 ] 

Mayuresh Gharat commented on KAFKA-3083:


[~fpj] can you shed some light on what you meant by misuse?
[~junrao] if this is actually an issue, I would like to give a shot at it.

> a soft failure in controller may leader a topic partition in an inconsistent 
> state
> --
>
> Key: KAFKA-3083
> URL: https://issues.apache.org/jira/browse/KAFKA-3083
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
>Reporter: Jun Rao
>
> The following sequence can happen.
> 1. Broker A is the controller and is in the middle of processing a broker 
> change event. As part of this process, let's say it's about to shrink the isr 
> of a partition.
> 2. Then broker A's session expires and broker B takes over as the new 
> controller. Broker B sends the initial leaderAndIsr request to all brokers.
> 3. Broker A continues by shrinking the isr of the partition in ZK and sends 
> the new leaderAndIsr request to the broker (say C) that leads the partition. 
> Broker C will reject this leaderAndIsr since the request comes from a 
> controller with an older epoch. Now we could be in a situation that Broker C 
> thinks the isr has all replicas, but the isr stored in ZK is different.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2887) TopicMetadataRequest creates topic if it does not exist

2016-01-06 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15086470#comment-15086470
 ] 

Mayuresh Gharat commented on KAFKA-2887:


[~hachikuji] I just wanted to know whether we have a ticket for which the above 
patch is available. 

> TopicMetadataRequest creates topic if it does not exist
> ---
>
> Key: KAFKA-2887
> URL: https://issues.apache.org/jira/browse/KAFKA-2887
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.8.2.0
> Environment: Centos6, Java 1.7.0_75
>Reporter: Andrew Winterman
>Priority: Minor
>
> We wired up a probe http endpoint to make TopicMetadataRequests with a 
> possible topic name. If no topic was found, we expected an empty response. 
> However, if we asked for the same topic twice, it would exist the second time!
> I think this is a bug because the purpose of the TopicMetadataRequest is to 
> provide information about the cluster, not mutate it. I can provide example 
> code if needed.
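For what it is worth, a probe that only reads cluster metadata and never creates the topic is straightforward with the Admin API that was added in later releases (sketch below, assuming a reachable broker); on 0.8.x brokers the usual workaround is setting auto.create.topics.enable=false on the broker side.

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;

public final class TopicProbe {

    // Returns true only if the topic already exists; listTopics never creates anything.
    static boolean topicExists(String bootstrapServers, String topic) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        try (Admin admin = Admin.create(props)) {
            return admin.listTopics().names().get().contains(topic);
        }
    }
}
{code}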



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1911) Log deletion on stopping replicas should be async

2016-01-05 Thread Mayuresh Gharat (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1911?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayuresh Gharat updated KAFKA-1911:
---
Status: Patch Available  (was: In Progress)

> Log deletion on stopping replicas should be async
> -
>
> Key: KAFKA-1911
> URL: https://issues.apache.org/jira/browse/KAFKA-1911
> Project: Kafka
>  Issue Type: Bug
>  Components: log, replication
>Reporter: Joel Koshy
>Assignee: Mayuresh Gharat
>  Labels: newbie++, newbiee
>
> If a StopReplicaRequest sets delete=true then we do a file.delete on the file 
> message sets. I was under the impression that this is fast but it does not 
> seem to be the case.
> On a partition reassignment in our cluster the local time for stop replica 
> took nearly 30 seconds.
> {noformat}
> Completed request:Name: StopReplicaRequest; Version: 0; CorrelationId: 467; 
> ClientId: ;DeletePartitions: true; ControllerId: 1212; ControllerEpoch: 
> 53 from 
> client/...:45964;totalTime:29191,requestQueueTime:1,localTime:29190,remoteTime:0,responseQueueTime:0,sendTime:0
> {noformat}
> This ties up one API thread for the duration of the request.
> Specifically in our case, the queue times for other requests also went up and 
> producers to the partition that was just deleted on the old leader took a 
> while to refresh their metadata (see KAFKA-1303) and eventually ran out of 
> retries on some messages leading to data loss.
> I think the log deletion in this case should be fully asynchronous although 
> we need to handle the case when a broker may respond immediately to the 
> stop-replica-request but then go down after deleting only some of the log 
> segments.
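A sketch of the async-deletion idea described above (not the actual Kafka code): the request-handler path only renames the partition directory and returns, and a background thread does the slow file deletion. The rename also makes a half-finished delete detectable after a restart, since any leftover "*.deleted" directory can simply be deleted again.

{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Stream;

public final class AsyncLogDeletion {

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    // Called from the StopReplicaRequest handling path; returns quickly.
    public void deleteLogAsync(Path partitionDir) throws IOException {
        Path tombstone = partitionDir.resolveSibling(partitionDir.getFileName() + ".deleted");
        Files.move(partitionDir, tombstone);                       // cheap metadata-only operation
        scheduler.execute(() -> deleteRecursively(tombstone));     // slow part happens off the API thread
    }

    private static void deleteRecursively(Path dir) {
        try (Stream<Path> walk = Files.walk(dir)) {
            // Delete children before parents.
            walk.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException ignored) {
                    // A retry pass after restart can pick up anything left behind.
                }
            });
        } catch (IOException ignored) {
        }
    }
}
{code}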



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Work started] (KAFKA-1911) Log deletion on stopping replicas should be async

2016-01-05 Thread Mayuresh Gharat (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1911?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-1911 started by Mayuresh Gharat.
--
> Log deletion on stopping replicas should be async
> -
>
> Key: KAFKA-1911
> URL: https://issues.apache.org/jira/browse/KAFKA-1911
> Project: Kafka
>  Issue Type: Bug
>  Components: log, replication
>Reporter: Joel Koshy
>Assignee: Mayuresh Gharat
>  Labels: newbie++, newbiee
>
> If a StopReplicaRequest sets delete=true then we do a file.delete on the file 
> message sets. I was under the impression that this is fast but it does not 
> seem to be the case.
> On a partition reassignment in our cluster the local time for stop replica 
> took nearly 30 seconds.
> {noformat}
> Completed request:Name: StopReplicaRequest; Version: 0; CorrelationId: 467; 
> ClientId: ;DeletePartitions: true; ControllerId: 1212; ControllerEpoch: 
> 53 from 
> client/...:45964;totalTime:29191,requestQueueTime:1,localTime:29190,remoteTime:0,responseQueueTime:0,sendTime:0
> {noformat}
> This ties up one API thread for the duration of the request.
> Specifically in our case, the queue times for other requests also went up and 
> producers to the partition that was just deleted on the old leader took a 
> while to refresh their metadata (see KAFKA-1303) and eventually ran out of 
> retries on some messages leading to data loss.
> I think the log deletion in this case should be fully asynchronous although 
> we need to handle the case when a broker may respond immediately to the 
> stop-replica-request but then go down after deleting only some of the log 
> segments.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3013) Display the topic-partition in the exception message for expired batches in recordAccumulator

2016-01-05 Thread Mayuresh Gharat (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayuresh Gharat updated KAFKA-3013:
---
Status: Patch Available  (was: In Progress)

> Display the topic-partition in the exception message for expired batches in 
> recordAccumulator 
> --
>
> Key: KAFKA-3013
> URL: https://issues.apache.org/jira/browse/KAFKA-3013
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Mayuresh Gharat
>Assignee: Mayuresh Gharat
>Priority: Trivial
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Work started] (KAFKA-3013) Display the topic-partition in the exception message for expired batches in recordAccumulator

2016-01-05 Thread Mayuresh Gharat (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-3013 started by Mayuresh Gharat.
--
> Display the topic-partition in the exception message for expired batches in 
> recordAccumulator 
> --
>
> Key: KAFKA-3013
> URL: https://issues.apache.org/jira/browse/KAFKA-3013
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Mayuresh Gharat
>Assignee: Mayuresh Gharat
>Priority: Trivial
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3062) Read from kafka replication to get data likely Version based

2016-01-04 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15081987#comment-15081987
 ] 

Mayuresh Gharat commented on KAFKA-3062:


Hi [~itismewxg], can you explain the use case here with an example?

Thanks,

Mayuresh

> Read from kafka replication to get data likely Version based
> 
>
> Key: KAFKA-3062
> URL: https://issues.apache.org/jira/browse/KAFKA-3062
> Project: Kafka
>  Issue Type: Improvement
>Reporter: xingang
>
> Since Kafka requires that all reads happen on the leader for consistency:
> if reads could also happen on replicas, then for data that has a number of 
> consumers, the consumers that are not latency-sensitive but are data-loss 
> sensitive could fetch their data from a replica; as it stands, they pollute 
> the page cache for the other consumers which are latency-sensitive.
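For reference, much later Kafka releases added exactly this kind of replica fetching (KIP-392): the broker is configured with replica.selector.class = org.apache.kafka.common.replica.RackAwareReplicaSelector and a consumer opts in by declaring its rack. A minimal consumer-side sketch (all values are placeholders):

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public final class FollowerFetchingConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "loss-sensitive-not-latency-sensitive");
        // Declare this consumer's rack; the broker's replica selector can then serve
        // its fetches from an in-rack follower instead of the leader.
        props.put(ConsumerConfig.CLIENT_RACK_CONFIG, "rack-a");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // subscribe() and poll() as usual; records may now come from a follower replica.
        }
    }
}
{code}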



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2937) Topics marked for delete in Zookeeper may become undeletable

2016-01-04 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15081910#comment-15081910
 ] 

Mayuresh Gharat commented on KAFKA-2937:


Yes, that might be possible but it should be rare. The 
AdminUtils.topicExists(topicName) call checks only whether the entry 
/brokers/topics/topicName exists in zookeeper. createTopic() should write to the 
path /brokers/topics/topicName and also update the replica assignment for the 
topic in Zookeeper.

If somehow the second part got messed up, the above error can occur. 
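A small sketch of that distinction using the ZooKeeper Java client (illustrative, not the AdminUtils code): the mere existence of /brokers/topics/topicName is not enough, the znode's data carrying the replica assignment has to be there as well.

{code:java}
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public final class TopicCreationCheck {

    static boolean topicFullyRegistered(ZooKeeper zk, String topic) throws Exception {
        String path = "/brokers/topics/" + topic;
        Stat stat = zk.exists(path, false);
        if (stat == null) {
            return false;                          // topic znode missing entirely
        }
        byte[] data = zk.getData(path, false, stat);
        // An empty payload means the replica assignment was never written.
        return data != null && data.length > 0;
    }
}
{code}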

> Topics marked for delete in Zookeeper may become undeletable
> 
>
> Key: KAFKA-2937
> URL: https://issues.apache.org/jira/browse/KAFKA-2937
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
>Reporter: Rajini Sivaram
>Assignee: Mayuresh Gharat
>
> In our clusters, we occasionally see topics marked for delete, but never 
> actually deleted. It may be due to brokers being restarted while tests were 
> running, but further restarts of Kafka don't fix the problem. The topics 
> remain marked for delete in Zookeeper.
> Topic describe shows:
> {quote}
> Topic:testtopic   PartitionCount:1ReplicationFactor:3 Configs:
>   Topic: testtopicPartition: 0Leader: noneReplicas: 3,4,0 
> Isr: 
> {quote}
> Kafka logs show:
> {quote}
> 2015-12-02 15:53:30,152] ERROR Controller 2 epoch 213 initiated state change 
> of replica 3 for partition [testtopic,0] from OnlineReplica to OfflineReplica 
> failed (state.change.logger)
> kafka.common.StateChangeFailedException: Failed to change state of replica 3 
> for partition [testtopic,0] since the leader and isr path in zookeeper is 
> empty
> at 
> kafka.controller.ReplicaStateMachine.handleStateChange(ReplicaStateMachine.scala:269)
> at 
> kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:114)
> at 
> kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:114)
> at 
> scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:322)
> at 
> scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:978)
> at 
> kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:114)
> at 
> kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2.apply(TopicDeletionManager.scala:342)
> at 
> kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2.apply(TopicDeletionManager.scala:334)
> at scala.collection.immutable.Map$Map1.foreach(Map.scala:116)
> at 
> kafka.controller.TopicDeletionManager.startReplicaDeletion(TopicDeletionManager.scala:334)
> at 
> kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$onPartitionDeletion(TopicDeletionManager.scala:367)
> at 
> kafka.controller.TopicDeletionManager$$anonfun$kafka$controller$TopicDeletionManager$$onTopicDeletion$2.apply(TopicDeletionManager.scala:313)
> at 
> kafka.controller.TopicDeletionManager$$anonfun$kafka$controller$TopicDeletionManager$$onTopicDeletion$2.apply(TopicDeletionManager.scala:312)
> at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
> at 
> kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$onTopicDeletion(TopicDeletionManager.scala:312)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:431)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:403)
> at scala.collection.immutable.Set$Set2.foreach(Set.scala:111)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply$mcV$sp(TopicDeletionManager.scala:403)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:397)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:397)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread.doWork(TopicDeletionManager.scala:397)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> {quote}  
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3040) Broker didn't report new data after change in leader

2016-01-04 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15081519#comment-15081519
 ] 

Mayuresh Gharat commented on KAFKA-3040:


At Linkedin, we do have a separate controller log file on the broker that is 
the controller for the cluster. Do you see something like "Broker 
HOST-NAME starting become controller state transition" in the logs on the broker 
that is the controller for your cluster?

> Broker didn't report new data after change in leader
> 
>
> Key: KAFKA-3040
> URL: https://issues.apache.org/jira/browse/KAFKA-3040
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
> Environment: Debian 3.2.54-2 x86_64 GNU/Linux
>Reporter: Imran Patel
>Priority: Critical
>
> Recently we had an event that caused large Kafka backlogs to develop 
> suddenly. This happened across multiple partitions. We noticed that after a 
> brief connection loss to Zookeeper, Kafka brokers were not reporting any new 
> data to our (SimpleConsumer) consumer although the producers were enqueueing 
> fine. This went on until another zk blip led to a reconfiguration which 
> suddenly caused the consumers to "see" the data. Our consumers and our 
> monitoring tools did not see the offsets move during the outage window. Here 
> is the sequence of events for a single partition (with logs attached below). 
> The brokers are running 0.9, the producer is using library version 
> kafka_2.10:0.8.2.1 and consumer is using kafka_2.10:0.8.0 (both are Java 
> programs). Our monitoring tool uses kafka-python-9.0
> Can you tell us if this could be due to a consumer bug (the libraries being 
> too "old" to operate with a 0.9 broker, e.g.)? Or does it look like a Kafka core 
> issue? Please note that we recently upgraded the brokers to 0.9 and hadn't 
> seen a similar issue prior to that.
> - after a brief connection loss to zookeeper, the partition leader (broker 9 
> for partition 29 in logs below) came back and shrunk the ISR to itself. 
> - producers kept on successfully sending data to Kafka and the remaining 
> replicas (brokers 3 and 4) recorded this data. AFAICT, 3 was the new leader. 
> Broker 9 did NOT replicate this data. It did repeatedly print the ISR 
> shrinking message over and over again.
> - consumer on the other hand reported no new data presumably because it was 
> talking to 9 and that broker was doing nothing.
> - 6 hours later, another zookeeper blip causes the brokers to reconfigure and 
> now consumers started seeing new data. 
> Broker 9:
> [2015-12-16 19:46:01,523] INFO Partition [messages,29] on broker 9: Expanding 
> ISR for partition [messages,29] from 9,4 to 9,4,3 (kafka.cluster.Partition
> [2015-12-18 00:59:25,511] INFO New leader is 9 
> (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
> [2015-12-18 01:00:18,451] INFO Partition [messages,29] on broker 9: Shrinking 
> ISR for partition [messages,29] from 9,4,3 to 9 (kafka.cluster.Partition)
> [2015-12-18 01:00:18,458] INFO Partition [messages,29] on broker 9: Cached 
> zkVersion [472] not equal to that in zookeeper, skip updating ISR 
> (kafka.cluster.Partition)
> [2015-12-18 07:04:44,552] INFO Truncating log messages-29 to offset 
> 14169556269. (kafka.log.Log)
> [2015-12-18 07:04:44,649] INFO [ReplicaFetcherManager on broker 9] Added 
> fetcher for partitions List([[messages,61], initOffset 14178575900 to broker 
> BrokerEndPoint(6,kafka006-prod.c.foo.internal,9092)] , [[messages,13], 
> initOffset 14156091271 to broker 
> BrokerEndPoint(2,kafka002-prod.c.foo.internal,9092)] , [[messages,45], 
> initOffset 14135826155 to broker 
> BrokerEndPoint(4,kafka004-prod.c.foo.internal,9092)] , [[messages,41], 
> initOffset 14157926400 to broker 
> BrokerEndPoint(1,kafka001-prod.c.foo.internal,9092)] , [[messages,29], 
> initOffset 14169556269 to broker 
> BrokerEndPoint(3,kafka003-prod.c.foo.internal,9092)] , [[messages,57], 
> initOffset 14175218230 to broker 
> BrokerEndPoint(1,kafka001-prod.c.foo.internal,9092)] ) 
> (kafka.server.ReplicaFetcherManager)
> Broker 3:
> [2015-12-18 01:00:01,763] INFO [ReplicaFetcherManager on broker 3] Removed 
> fetcher for partitions [messages,29] (kafka.server.ReplicaFetcherManager)
> [2015-12-18 07:09:04,631] INFO Partition [messages,29] on broker 3: Expanding 
> ISR for partition [messages,29] from 4,3 to 4,3,9 (kafka.cluster.Partition)
> [2015-12-18 07:09:49,693] INFO [ReplicaFetcherManager on broker 3] Removed 
> fetcher for partitions [messages,29] (kafka.server.ReplicaFetcherManager)
> Broker 4:
> [2015-12-18 01:00:01,783] INFO [ReplicaFetcherManager on broker 4] Removed 
> fetcher for partitions [messages,29] (kafka.server.ReplicaFetcherManager)
> [2015-12-18 01:00:01,866] INFO [ReplicaFetcherManager on broker 4] Added 
> fetcher

[jira] [Commented] (KAFKA-2937) Topics marked for delete in Zookeeper may become undeletable

2016-01-04 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15081352#comment-15081352
 ] 

Mayuresh Gharat commented on KAFKA-2937:


[~djh] does your integration test check if the topic is created successfully 
before attempting a delete?
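A sketch of the check implied by that question, against the Admin API from later releases (names and timeouts are arbitrary): the test waits until the topic is actually visible before issuing the delete.

{code:java}
import java.util.Collections;

import org.apache.kafka.clients.admin.Admin;

public final class DeleteTopicTestHelper {

    static void deleteOnceCreated(Admin admin, String topic) throws Exception {
        long deadline = System.currentTimeMillis() + 30_000;
        // Topic creation is asynchronous on the controller side, so poll for it.
        while (!admin.listTopics().names().get().contains(topic)) {
            if (System.currentTimeMillis() > deadline) {
                throw new AssertionError("topic " + topic + " was never created");
            }
            Thread.sleep(200);
        }
        admin.deleteTopics(Collections.singleton(topic)).all().get();
    }
}
{code}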

> Topics marked for delete in Zookeeper may become undeletable
> 
>
> Key: KAFKA-2937
> URL: https://issues.apache.org/jira/browse/KAFKA-2937
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
>Reporter: Rajini Sivaram
>Assignee: Mayuresh Gharat
>
> In our clusters, we occasionally see topics marked for delete, but never 
> actually deleted. It may be due to brokers being restarted while tests were 
> running, but further restarts of Kafka don't fix the problem. The topics 
> remain marked for delete in Zookeeper.
> Topic describe shows:
> {quote}
> Topic:testtopic   PartitionCount:1ReplicationFactor:3 Configs:
>   Topic: testtopicPartition: 0Leader: noneReplicas: 3,4,0 
> Isr: 
> {quote}
> Kafka logs show:
> {quote}
> 2015-12-02 15:53:30,152] ERROR Controller 2 epoch 213 initiated state change 
> of replica 3 for partition [testtopic,0] from OnlineReplica to OfflineReplica 
> failed (state.change.logger)
> kafka.common.StateChangeFailedException: Failed to change state of replica 3 
> for partition [testtopic,0] since the leader and isr path in zookeeper is 
> empty
> at 
> kafka.controller.ReplicaStateMachine.handleStateChange(ReplicaStateMachine.scala:269)
> at 
> kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:114)
> at 
> kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:114)
> at 
> scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:322)
> at 
> scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:978)
> at 
> kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:114)
> at 
> kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2.apply(TopicDeletionManager.scala:342)
> at 
> kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2.apply(TopicDeletionManager.scala:334)
> at scala.collection.immutable.Map$Map1.foreach(Map.scala:116)
> at 
> kafka.controller.TopicDeletionManager.startReplicaDeletion(TopicDeletionManager.scala:334)
> at 
> kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$onPartitionDeletion(TopicDeletionManager.scala:367)
> at 
> kafka.controller.TopicDeletionManager$$anonfun$kafka$controller$TopicDeletionManager$$onTopicDeletion$2.apply(TopicDeletionManager.scala:313)
> at 
> kafka.controller.TopicDeletionManager$$anonfun$kafka$controller$TopicDeletionManager$$onTopicDeletion$2.apply(TopicDeletionManager.scala:312)
> at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
> at 
> kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$onTopicDeletion(TopicDeletionManager.scala:312)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:431)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:403)
> at scala.collection.immutable.Set$Set2.foreach(Set.scala:111)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply$mcV$sp(TopicDeletionManager.scala:403)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:397)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:397)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread.doWork(TopicDeletionManager.scala:397)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> {quote}  
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-3040) Broker didn't report new data after change in leader

2015-12-24 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15071192#comment-15071192
 ] 

Mayuresh Gharat edited comment on KAFKA-3040 at 12/24/15 7:09 PM:
--

Do you have the controller logs for the time period?
Also you might want to check :
https://issues.apache.org/jira/browse/KAFKA-3042


was (Author: mgharat):
Do you have the controller logs for the time period?

> Broker didn't report new data after change in leader
> 
>
> Key: KAFKA-3040
> URL: https://issues.apache.org/jira/browse/KAFKA-3040
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
> Environment: Debian 3.2.54-2 x86_64 GNU/Linux
>Reporter: Imran Patel
>Priority: Critical
>
> Recently we had an event that caused large Kafka backlogs to develop 
> suddenly. This happened across multiple partitions. We noticed that after a 
> brief connection loss to Zookeeper, Kafka brokers were not reporting any new 
> data to our (SimpleConsumer) consumer although the producers were enqueueing 
> fine. This went on until another zk blip led to a reconfiguration which 
> suddenly caused the consumers to "see" the data. Our consumers and our 
> monitoring tools did not see the offsets move during the outage window. Here 
> is the sequence of events for a single partition (with logs attached below). 
> The brokers are running 0.9, the producer is using library version 
> kafka_2.10:0.8.2.1 and consumer is using kafka_2.10:0.8.0 (both are Java 
> programs). Our monitoring tool uses kafka-python-9.0
> Can you tell us if this could be due to a consumer bug (the libraries being 
> too "old" to operate with a 0.9 broker, e.g.)? Or does it look like a Kafka core 
> issue? Please note that we recently upgraded the brokers to 0.9 and hadn't 
> seen a similar issue prior to that.
> - after a brief connection loss to zookeeper, the partition leader (broker 9 
> for partition 29 in logs below) came back and shrunk the ISR to itself. 
> - producers kept on successfully sending data to Kafka and the remaining 
> replicas (brokers 3 and 4) recorded this data. AFAICT, 3 was the new leader. 
> Broker 9 did NOT replicate this data. It did repeatedly print the ISR 
> shrinking message over and over again.
> - consumer on the other hand reported no new data presumably because it was 
> talking to 9 and that broker was doing nothing.
> - 6 hours later, another zookeeper blip causes the brokers to reconfigure and 
> now consumers started seeing new data. 
> Broker 9:
> [2015-12-16 19:46:01,523] INFO Partition [messages,29] on broker 9: Expanding 
> ISR for partition [messages,29] from 9,4 to 9,4,3 (kafka.cluster.Partition
> [2015-12-18 00:59:25,511] INFO New leader is 9 
> (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
> [2015-12-18 01:00:18,451] INFO Partition [messages,29] on broker 9: Shrinking 
> ISR for partition [messages,29] from 9,4,3 to 9 (kafka.cluster.Partition)
> [2015-12-18 01:00:18,458] INFO Partition [messages,29] on broker 9: Cached 
> zkVersion [472] not equal to that in zookeeper, skip updating ISR 
> (kafka.cluster.Partition)
> [2015-12-18 07:04:44,552] INFO Truncating log messages-29 to offset 
> 14169556269. (kafka.log.Log)
> [2015-12-18 07:04:44,649] INFO [ReplicaFetcherManager on broker 9] Added 
> fetcher for partitions List([[messages,61], initOffset 14178575900 to broker 
> BrokerEndPoint(6,kafka006-prod.c.foo.internal,9092)] , [[messages,13], 
> initOffset 14156091271 to broker 
> BrokerEndPoint(2,kafka002-prod.c.foo.internal,9092)] , [[messages,45], 
> initOffset 14135826155 to broker 
> BrokerEndPoint(4,kafka004-prod.c.foo.internal,9092)] , [[messages,41], 
> initOffset 14157926400 to broker 
> BrokerEndPoint(1,kafka001-prod.c.foo.internal,9092)] , [[messages,29], 
> initOffset 14169556269 to broker 
> BrokerEndPoint(3,kafka003-prod.c.foo.internal,9092)] , [[messages,57], 
> initOffset 14175218230 to broker 
> BrokerEndPoint(1,kafka001-prod.c.foo.internal,9092)] ) 
> (kafka.server.ReplicaFetcherManager)
> Broker 3:
> [2015-12-18 01:00:01,763] INFO [ReplicaFetcherManager on broker 3] Removed 
> fetcher for partitions [messages,29] (kafka.server.ReplicaFetcherManager)
> [2015-12-18 07:09:04,631] INFO Partition [messages,29] on broker 3: Expanding 
> ISR for partition [messages,29] from 4,3 to 4,3,9 (kafka.cluster.Partition)
> [2015-12-18 07:09:49,693] INFO [ReplicaFetcherManager on broker 3] Removed 
> fetcher for partitions [messages,29] (kafka.server.ReplicaFetcherManager)
> Broker 4:
> [2015-12-18 01:00:01,783] INFO [ReplicaFetcherManager on broker 4] Removed 
> fetcher for partitions [messages,29] (kafka.server.ReplicaFetcherManager)
> [2015-12-18 01:00:01,866] INFO [ReplicaFetcherManager on broker 4] Added 
> fetche

[jira] [Commented] (KAFKA-3040) Broker didn't report new data after change in leader

2015-12-24 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15071192#comment-15071192
 ] 

Mayuresh Gharat commented on KAFKA-3040:


Do you have the controller logs for the time period?

> Broker didn't report new data after change in leader
> 
>
> Key: KAFKA-3040
> URL: https://issues.apache.org/jira/browse/KAFKA-3040
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
> Environment: Debian 3.2.54-2 x86_64 GNU/Linux
>Reporter: Imran Patel
>Priority: Critical
>
> Recently we had an event that caused large Kafka backlogs to develop 
> suddenly. This happened across multiple partitions. We noticed that after a 
> brief connection loss to Zookeeper, Kafka brokers were not reporting any new 
> data to our (SimpleConsumer) consumer although the producers were enqueueing 
> fine. This went on until another zk blip led to a reconfiguration which 
> suddenly caused the consumers to "see" the data. Our consumers and our 
> monitoring tools did not see the offsets move during the outage window. Here 
> is the sequence of events for a single partition (with logs attached below). 
> The brokers are running 0.9, the producer is using library version 
> kafka_2.10:0.8.2.1 and consumer is using kafka_2.10:0.8.0 (both are Java 
> programs). Our monitoring tool uses kafka-python-9.0
> Can you tell us if this could be due to a consumer bug (the libraries being 
> too "old" to operate with a 0.9 broker, e.g.)? Or does it look like a Kafka core 
> issue? Please note that we recently upgraded the brokers to 0.9 and hadn't 
> seen a similar issue prior to that.
> - after a brief connection loss to zookeeper, the partition leader (broker 9 
> for partition 29 in logs below) came back and shrunk the ISR to itself. 
> - producers kept on successfully sending data to Kafka and the remaining 
> replicas (brokers 3 and 4) recorded this data. AFAICT, 3 was the new leader. 
> Broker 9 did NOT replicate this data. It did repeatedly print the ISR 
> shrinking message over and over again.
> - consumer on the other hand reported no new data presumably because it was 
> talking to 9 and that broker was doing nothing.
> - 6 hours later, another zookeeper blip causes the brokers to reconfigure and 
> now consumers started seeing new data. 
> Broker 9:
> [2015-12-16 19:46:01,523] INFO Partition [messages,29] on broker 9: Expanding 
> ISR for partition [messages,29] from 9,4 to 9,4,3 (kafka.cluster.Partition
> [2015-12-18 00:59:25,511] INFO New leader is 9 
> (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
> [2015-12-18 01:00:18,451] INFO Partition [messages,29] on broker 9: Shrinking 
> ISR for partition [messages,29] from 9,4,3 to 9 (kafka.cluster.Partition)
> [2015-12-18 01:00:18,458] INFO Partition [messages,29] on broker 9: Cached 
> zkVersion [472] not equal to that in zookeeper, skip updating ISR 
> (kafka.cluster.Partition)
> [2015-12-18 07:04:44,552] INFO Truncating log messages-29 to offset 
> 14169556269. (kafka.log.Log)
> [2015-12-18 07:04:44,649] INFO [ReplicaFetcherManager on broker 9] Added 
> fetcher for partitions List([[messages,61], initOffset 14178575900 to broker 
> BrokerEndPoint(6,kafka006-prod.c.foo.internal,9092)] , [[messages,13], 
> initOffset 14156091271 to broker 
> BrokerEndPoint(2,kafka002-prod.c.foo.internal,9092)] , [[messages,45], 
> initOffset 14135826155 to broker 
> BrokerEndPoint(4,kafka004-prod.c.foo.internal,9092)] , [[messages,41], 
> initOffset 14157926400 to broker 
> BrokerEndPoint(1,kafka001-prod.c.foo.internal,9092)] , [[messages,29], 
> initOffset 14169556269 to broker 
> BrokerEndPoint(3,kafka003-prod.c.foo.internal,9092)] , [[messages,57], 
> initOffset 14175218230 to broker 
> BrokerEndPoint(1,kafka001-prod.c.foo.internal,9092)] ) 
> (kafka.server.ReplicaFetcherManager)
> Broker 3:
> [2015-12-18 01:00:01,763] INFO [ReplicaFetcherManager on broker 3] Removed 
> fetcher for partitions [messages,29] (kafka.server.ReplicaFetcherManager)
> [2015-12-18 07:09:04,631] INFO Partition [messages,29] on broker 3: Expanding 
> ISR for partition [messages,29] from 4,3 to 4,3,9 (kafka.cluster.Partition)
> [2015-12-18 07:09:49,693] INFO [ReplicaFetcherManager on broker 3] Removed 
> fetcher for partitions [messages,29] (kafka.server.ReplicaFetcherManager)
> Broker 4:
> [2015-12-18 01:00:01,783] INFO [ReplicaFetcherManager on broker 4] Removed 
> fetcher for partitions [messages,29] (kafka.server.ReplicaFetcherManager)
> [2015-12-18 01:00:01,866] INFO [ReplicaFetcherManager on broker 4] Added 
> fetcher for partitions List([[messages,29], initOffset 14169556262 to broker 
> BrokerEndPoint(3,kafka003-prod.c.foo.internal,9092)] ) 
> (kafka.server.ReplicaFetcherManager)
> [2015-12-18 07:09:50,191] ERROR [Repli

[jira] [Comment Edited] (KAFKA-2937) Topics marked for delete in Zookeeper may become undeletable

2015-12-23 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15070294#comment-15070294
 ] 

Mayuresh Gharat edited comment on KAFKA-2937 at 12/23/15 11:21 PM:
---

[~rsivaram] This can occur if, during a topic deletion, the controller writes the 
updated LeaderISR to zookeeper and then dies. A new controller is elected, and 
since the /admin/delete_topics/ path still contains the topic, the deleteTopic is 
retried. When the new controller sends the stopReplicaRequest with 
deletePartition = false and tries to remove the replica from the ISR, there is no 
LeaderISR in zookeeper, so the above exception is thrown. 

I am wondering whether, while deleting a topic, the following check is necessary:

if (leaderAndIsrIsEmpty)
  throw new StateChangeFailedException(
    "Failed to change state of replica %d for partition %s since the leader and isr path in zookeeper is empty"
      .format(replicaId, topicAndPartition))

I am planning to check whether the topic is being deleted and, if YES, not throw 
the exception when the LeaderISR info in zookeeper is empty.

On a side note, did you see a log line "Retrying delete topic for topic 
since replicas were not successfully deleted"?




was (Author: mgharat):
[~rsivaram] This can occur if during delete topic, the controller writes the 
updated LeaderISR to zookeeper and dies and a new controller is elected and 
since the  /admin/delete_topics/ path contains the topic, the deleteTopic is 
retried and when the controller tries to send the stopReplicaRequest with 
deletePartition = false when it tries to remove ReplicafromISR and since there 
is no LeaderISR, the above exception is thrown. 

I am thinking if while deleting a topic, is it necessary for the check :
if (leaderAndIsrIsEmpty)
throw new StateChangeFailedException(
  "Failed to change state of replica %d for partition %s since the 
leader and isr path in zookeeper is empty"
  .format(replicaId, topicAndPartition))


On a side note did you see a log line "Retrying delete topic for topic 
since replicaswere not successfully deleted".



> Topics marked for delete in Zookeeper may become undeletable
> 
>
> Key: KAFKA-2937
> URL: https://issues.apache.org/jira/browse/KAFKA-2937
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
>Reporter: Rajini Sivaram
>Assignee: Mayuresh Gharat
>
> In our clusters, we occasionally see topics marked for delete, but never 
> actually deleted. It may be due to brokers being restarted while tests were 
> running, but further restarts of Kafka don't fix the problem. The topics 
> remain marked for delete in Zookeeper.
> Topic describe shows:
> {quote}
> Topic:testtopic   PartitionCount:1ReplicationFactor:3 Configs:
>   Topic: testtopicPartition: 0Leader: noneReplicas: 3,4,0 
> Isr: 
> {quote}
> Kafka logs show:
> {quote}
> 2015-12-02 15:53:30,152] ERROR Controller 2 epoch 213 initiated state change 
> of replica 3 for partition [testtopic,0] from OnlineReplica to OfflineReplica 
> failed (state.change.logger)
> kafka.common.StateChangeFailedException: Failed to change state of replica 3 
> for partition [testtopic,0] since the leader and isr path in zookeeper is 
> empty
> at 
> kafka.controller.ReplicaStateMachine.handleStateChange(ReplicaStateMachine.scala:269)
> at 
> kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:114)
> at 
> kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:114)
> at 
> scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:322)
> at 
> scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:978)
> at 
> kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:114)
> at 
> kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2.apply(TopicDeletionManager.scala:342)
> at 
> kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2.apply(TopicDeletionManager.scala:334)
> at scala.collection.immutable.Map$Map1.foreach(Map.scala:116)
> at 
> kafka.controller.TopicDeletionManager.startReplicaDeletion(TopicDeletionManager.scala:334)
> at 
> kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$onPartitionDeletion(TopicDeletionManager.scala:367)
> at 
> kafka.controller.TopicDeletionManager$$anonfun$kafka$controller$TopicDeletionManager$$onTopicDeletion$2.apply(TopicDeletionManager.scala:313)
> at 
> kafka.controller.TopicDeletionManager$$anonfun$kafka$controller$TopicD

[jira] [Commented] (KAFKA-2937) Topics marked for delete in Zookeeper may become undeletable

2015-12-23 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15070294#comment-15070294
 ] 

Mayuresh Gharat commented on KAFKA-2937:


[~rsivaram] This can occur if, during a topic deletion, the controller writes the 
updated LeaderISR to zookeeper and then dies. A new controller is elected, and 
since the /admin/delete_topics/ path still contains the topic, the deleteTopic is 
retried. When the new controller sends the stopReplicaRequest with 
deletePartition = false and tries to remove the replica from the ISR, there is no 
LeaderISR in zookeeper, so the above exception is thrown. 

I am wondering whether, while deleting a topic, the following check is necessary:

if (leaderAndIsrIsEmpty)
  throw new StateChangeFailedException(
    "Failed to change state of replica %d for partition %s since the leader and isr path in zookeeper is empty"
      .format(replicaId, topicAndPartition))

On a side note, did you see a log line "Retrying delete topic for topic 
since replicas were not successfully deleted"?



> Topics marked for delete in Zookeeper may become undeletable
> 
>
> Key: KAFKA-2937
> URL: https://issues.apache.org/jira/browse/KAFKA-2937
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
>Reporter: Rajini Sivaram
>Assignee: Mayuresh Gharat
>
> In our clusters, we occasionally see topics marked for delete, but never 
> actually deleted. It may be due to brokers being restarted while tests were 
> running, but further restarts of Kafka don't fix the problem. The topics 
> remain marked for delete in Zookeeper.
> Topic describe shows:
> {quote}
> Topic:testtopic   PartitionCount:1ReplicationFactor:3 Configs:
>   Topic: testtopicPartition: 0Leader: noneReplicas: 3,4,0 
> Isr: 
> {quote}
> Kafka logs show:
> {quote}
> 2015-12-02 15:53:30,152] ERROR Controller 2 epoch 213 initiated state change 
> of replica 3 for partition [testtopic,0] from OnlineReplica to OfflineReplica 
> failed (state.change.logger)
> kafka.common.StateChangeFailedException: Failed to change state of replica 3 
> for partition [testtopic,0] since the leader and isr path in zookeeper is 
> empty
> at 
> kafka.controller.ReplicaStateMachine.handleStateChange(ReplicaStateMachine.scala:269)
> at 
> kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:114)
> at 
> kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:114)
> at 
> scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:322)
> at 
> scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:978)
> at 
> kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:114)
> at 
> kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2.apply(TopicDeletionManager.scala:342)
> at 
> kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2.apply(TopicDeletionManager.scala:334)
> at scala.collection.immutable.Map$Map1.foreach(Map.scala:116)
> at 
> kafka.controller.TopicDeletionManager.startReplicaDeletion(TopicDeletionManager.scala:334)
> at 
> kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$onPartitionDeletion(TopicDeletionManager.scala:367)
> at 
> kafka.controller.TopicDeletionManager$$anonfun$kafka$controller$TopicDeletionManager$$onTopicDeletion$2.apply(TopicDeletionManager.scala:313)
> at 
> kafka.controller.TopicDeletionManager$$anonfun$kafka$controller$TopicDeletionManager$$onTopicDeletion$2.apply(TopicDeletionManager.scala:312)
> at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
> at 
> kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$onTopicDeletion(TopicDeletionManager.scala:312)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:431)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:403)
> at scala.collection.immutable.Set$Set2.foreach(Set.scala:111)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply$mcV$sp(TopicDeletionManager.scala:403)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:397)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:397)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.sc

[jira] [Created] (KAFKA-3013) Display the topic-partition in the exception message for expired batches in recordAccumulator

2015-12-18 Thread Mayuresh Gharat (JIRA)
Mayuresh Gharat created KAFKA-3013:
--

 Summary: Display the topic-partition in the exception message for 
expired batches in recordAccumulator 
 Key: KAFKA-3013
 URL: https://issues.apache.org/jira/browse/KAFKA-3013
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Reporter: Mayuresh Gharat
Assignee: Mayuresh Gharat
Priority: Trivial






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-2937) Topics marked for delete in Zookeeper may become undeletable

2015-12-18 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15064365#comment-15064365
 ] 

Mayuresh Gharat edited comment on KAFKA-2937 at 12/18/15 6:21 PM:
--

I ran a test a few commits behind trunk HEAD: 

1) Started the kafka cluster.
2) Produced data to topic T for quite some time so that enough data accumulated 
on the kafka brokers.
3) Deleted the topic and immediately shut down the cluster.
4) I could see topic T under /admin/delete_topics.
5) After I restarted the cluster, topic T was deleted.


There might be a race condition here, where the controller might have gone down 
after it wrote to zookeeper but before it removed the topic under 
/admin/delete_topics. I will have to investigate more on this.
[~rsivaram] What is the version of kafka that you are running?


was (Author: mgharat):
I ran a test with a build a few commits behind trunk HEAD:

1) Started the Kafka cluster.
2) Produced data to topic T long enough for a fair amount of data to accumulate on the brokers.
3) Deleted the topic and immediately shut down the cluster.
4) Topic T was still listed under /admin/delete_topics.
5) After I restarted the cluster, topic T was deleted.

> Topics marked for delete in Zookeeper may become undeletable
> 
>
> Key: KAFKA-2937
> URL: https://issues.apache.org/jira/browse/KAFKA-2937
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
>Reporter: Rajini Sivaram
>Assignee: Mayuresh Gharat
>
> In our clusters, we occasionally see topics marked for delete, but never 
> actually deleted. It may be due to brokers being restarted while tests were 
> running, but further restarts of Kafka don't fix the problem. The topics 
> remain marked for delete in Zookeeper.
> Topic describe shows:
> {quote}
> Topic:testtopic   PartitionCount:1   ReplicationFactor:3   Configs:
>   Topic: testtopic   Partition: 0   Leader: none   Replicas: 3,4,0   Isr:
> {quote}
> Kafka logs show:
> {quote}
> 2015-12-02 15:53:30,152] ERROR Controller 2 epoch 213 initiated state change 
> of replica 3 for partition [testtopic,0] from OnlineReplica to OfflineReplica 
> failed (state.change.logger)
> kafka.common.StateChangeFailedException: Failed to change state of replica 3 
> for partition [testtopic,0] since the leader and isr path in zookeeper is 
> empty
> at 
> kafka.controller.ReplicaStateMachine.handleStateChange(ReplicaStateMachine.scala:269)
> at 
> kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:114)
> at 
> kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:114)
> at 
> scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:322)
> at 
> scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:978)
> at 
> kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:114)
> at 
> kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2.apply(TopicDeletionManager.scala:342)
> at 
> kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2.apply(TopicDeletionManager.scala:334)
> at scala.collection.immutable.Map$Map1.foreach(Map.scala:116)
> at 
> kafka.controller.TopicDeletionManager.startReplicaDeletion(TopicDeletionManager.scala:334)
> at 
> kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$onPartitionDeletion(TopicDeletionManager.scala:367)
> at 
> kafka.controller.TopicDeletionManager$$anonfun$kafka$controller$TopicDeletionManager$$onTopicDeletion$2.apply(TopicDeletionManager.scala:313)
> at 
> kafka.controller.TopicDeletionManager$$anonfun$kafka$controller$TopicDeletionManager$$onTopicDeletion$2.apply(TopicDeletionManager.scala:312)
> at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
> at 
> kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$onTopicDeletion(TopicDeletionManager.scala:312)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:431)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:403)
> at scala.collection.immutable.Set$Set2.foreach(Set.scala:111)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply$mcV$sp(TopicDeletionManager.scala:403)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:397)
> at 
> kafka.con

[jira] [Commented] (KAFKA-2937) Topics marked for delete in Zookeeper may become undeletable

2015-12-18 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15064365#comment-15064365
 ] 

Mayuresh Gharat commented on KAFKA-2937:


I ran a test with a build a few commits behind trunk HEAD:

1) Started the Kafka cluster.
2) Produced data to topic T long enough for a fair amount of data to accumulate on the brokers.
3) Deleted the topic and immediately shut down the cluster.
4) Topic T was still listed under /admin/delete_topics.
5) After I restarted the cluster, topic T was deleted.

> Topics marked for delete in Zookeeper may become undeletable
> 
>
> Key: KAFKA-2937
> URL: https://issues.apache.org/jira/browse/KAFKA-2937
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
>Reporter: Rajini Sivaram
>Assignee: Mayuresh Gharat
>
> In our clusters, we occasionally see topics marked for delete, but never 
> actually deleted. It may be due to brokers being restarted while tests were 
> running, but further restarts of Kafka don't fix the problem. The topics 
> remain marked for delete in Zookeeper.
> Topic describe shows:
> {quote}
> Topic:testtopic   PartitionCount:1   ReplicationFactor:3   Configs:
>   Topic: testtopic   Partition: 0   Leader: none   Replicas: 3,4,0   Isr:
> {quote}
> Kafka logs show:
> {quote}
> 2015-12-02 15:53:30,152] ERROR Controller 2 epoch 213 initiated state change 
> of replica 3 for partition [testtopic,0] from OnlineReplica to OfflineReplica 
> failed (state.change.logger)
> kafka.common.StateChangeFailedException: Failed to change state of replica 3 
> for partition [testtopic,0] since the leader and isr path in zookeeper is 
> empty
> at 
> kafka.controller.ReplicaStateMachine.handleStateChange(ReplicaStateMachine.scala:269)
> at 
> kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:114)
> at 
> kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:114)
> at 
> scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:322)
> at 
> scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:978)
> at 
> kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:114)
> at 
> kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2.apply(TopicDeletionManager.scala:342)
> at 
> kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2.apply(TopicDeletionManager.scala:334)
> at scala.collection.immutable.Map$Map1.foreach(Map.scala:116)
> at 
> kafka.controller.TopicDeletionManager.startReplicaDeletion(TopicDeletionManager.scala:334)
> at 
> kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$onPartitionDeletion(TopicDeletionManager.scala:367)
> at 
> kafka.controller.TopicDeletionManager$$anonfun$kafka$controller$TopicDeletionManager$$onTopicDeletion$2.apply(TopicDeletionManager.scala:313)
> at 
> kafka.controller.TopicDeletionManager$$anonfun$kafka$controller$TopicDeletionManager$$onTopicDeletion$2.apply(TopicDeletionManager.scala:312)
> at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
> at 
> kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$onTopicDeletion(TopicDeletionManager.scala:312)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:431)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:403)
> at scala.collection.immutable.Set$Set2.foreach(Set.scala:111)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply$mcV$sp(TopicDeletionManager.scala:403)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:397)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:397)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread.doWork(TopicDeletionManager.scala:397)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> {quote}  
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

