[jira] [Created] (KAFKA-7548) KafkaConsumer should not throw away already fetched data for paused partitions.
Mayuresh Gharat created KAFKA-7548: -- Summary: KafkaConsumer should not throw away already fetched data for paused partitions. Key: KAFKA-7548 URL: https://issues.apache.org/jira/browse/KAFKA-7548 Project: Kafka Issue Type: Improvement Components: clients Reporter: Mayuresh Gharat Assignee: Mayuresh Gharat In KafkaConsumer, when we call poll(), we fetch data asynchronously from the Kafka brokers and buffer it in the completedFetches queue. If we then pause a few partitions, the next call to poll() removes the completedFetches for those paused partitions. Normally, if an application pauses topicPartitions, it is likely to return to those topicPartitions in the near future, and when it does, the current design forces it to re-fetch that data. At LinkedIn, we made a hotfix to see if NOT throwing away the prefetched data would improve performance for stream applications like Samza. We ran a benchmark where we compared throughput for different values of maxPollRecords. We had a consumer subscribed to 10 partitions of a high-volume topic and paused a different number of partitions on every poll call.
Here are the results (records consumed, with elapsed seconds in parentheses; rows are the number of partitions paused, columns are maxPollRecords):

*Before fix (records consumed)*
|Partitions paused \ maxPollRecords|10|5|1|
|0|8605320 (60.022276059 sec)|8337690 (60.026690095 sec)|6424753 (60.67003 sec)|
|2|101910 (60.006989628 sec)|49350 (60.022598668 sec)|10495 (60.020077555 sec)|
|4|48420 (60.022096537 sec)|24850 (60.007451162 sec)|5004 (60.009773507 sec)|
|6|30420 (60.018380086 sec)|15385 (60.011912135 sec)|3152 (60.013573487 sec)|
|8|23390 (60.043122495 sec)|11390 (60.013297496 sec)|2237 (60.038921333 sec)|
|9|20230 (60.026183204 sec)|10355 (60.015584914 sec)|2087 (60.00319069 sec)|

*After fix (records consumed)*
|Partitions paused \ maxPollRecords|10|5|1|
|0|8662740 (60.011527576 sec)|8203445 (60.022204036 sec)|5846512 (60.0168916 sec)|
|2|8257390 (60.011121061 sec)|7776150 (60.01620875 sec)|5269557 (60.022581248 sec)|
|4|7938510 (60.011829002 sec)|7510140 (60.017571391 sec)|5213496 (60.000230139 sec)|
|6|7100970 (60.007220465 sec)|6382845 (60.038580526 sec)|4519645 (60.48034 sec)|
|8|6799956 (60.001850171 sec)|6482421 (60.001997219 sec)|4383300 (60.4836 sec)|
|9|7045177 (60.035366096 sec)|6465839 (60.41961 sec)|4884693 (60.42054 sec)|
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
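The behavior change can be illustrated with a small model of the fetch buffer. All names here (FetchBuffer, buffer(), retainPausedFetches) are hypothetical; this is a sketch of the idea, not Kafka's actual Fetcher internals:

```java
import java.util.*;

// Toy model of the consumer's prefetch buffer. Before the fix, poll()
// discarded buffered data for paused partitions; after the fix it is kept
// and served once the partition is resumed.
class FetchBuffer {
    private final Deque<Map.Entry<String, List<Integer>>> completedFetches = new ArrayDeque<>();
    private final Set<String> paused = new HashSet<>();
    private final boolean retainPausedFetches; // false = old behavior, true = fixed behavior

    FetchBuffer(boolean retainPausedFetches) { this.retainPausedFetches = retainPausedFetches; }

    void buffer(String partition, List<Integer> records) {
        completedFetches.add(Map.entry(partition, records));
    }

    void pause(String partition) { paused.add(partition); }
    void resume(String partition) { paused.remove(partition); }

    // Returns records for non-paused partitions; buffered entries for paused
    // partitions are either re-queued (fixed) or dropped, forcing a re-fetch
    // over the network after resume() (old behavior).
    List<Integer> poll() {
        List<Integer> out = new ArrayList<>();
        int n = completedFetches.size();
        for (int i = 0; i < n; i++) {
            Map.Entry<String, List<Integer>> fetch = completedFetches.poll();
            if (paused.contains(fetch.getKey())) {
                if (retainPausedFetches) completedFetches.add(fetch); // keep for later
                // else: silently dropped
            } else {
                out.addAll(fetch.getValue());
            }
        }
        return out;
    }

    int buffered() { return completedFetches.size(); }
}
```

With the old behavior the buffered data for a paused partition is gone after one poll; with the fix, a resume() followed by poll() returns it without another network round trip, which is what drives the throughput difference in the tables above.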
[jira] [Created] (KAFKA-7096) Consumer should drop the data for unassigned topic partitions
Mayuresh Gharat created KAFKA-7096: -- Summary: Consumer should drop the data for unassigned topic partitions Key: KAFKA-7096 URL: https://issues.apache.org/jira/browse/KAFKA-7096 Project: Kafka Issue Type: Improvement Reporter: Mayuresh Gharat Assignee: Mayuresh Gharat Currently, if a client is assigned topics T1, T2, T3 and calls poll(), the poll might fetch data for partitions of all three topics. If the client then unassigns some topics (for example T3) and calls poll(), we still hold the data for T3 in the completedFetches queue until subsequent poll() calls actually reach the buffered data for the unassigned topics (T3 in our example), at which point we drop it. Holding on to that data is unnecessary. When a client creates a topic, it takes time for the broker to fetch the ACLs for the topic. During this window, the client will issue FetchRequests for the topic and get responses for the partitions of this topic, and the response consists of a TopicAuthorizationException for each of the partitions. Each per-partition response is wrapped in a CompletedFetch and added to the completedFetches queue. When the client calls the next poll(), it sees the TopicAuthorizationException from the first buffered CompletedFetch. At this point the client chooses to sleep for 1.5 min as a backoff (as per the design), hoping that the broker fetches the ACL from the ACL store in the meantime. In practice, the broker has already fetched the ACL by this time. When the client calls poll() after the sleep, it sees the TopicAuthorizationException from the second CompletedFetch and sleeps again. So it takes (1.5 * 60 * partitions) seconds before the client can see any data. With this patch, when the client sees the first TopicAuthorizationException, it can call assign(emptySet), which gets rid of the buffered completedFetches (those with the TopicAuthorizationException), and then call assign(topicPartitions) again before calling poll().
With this patch we found that the client was able to get the records as soon as the broker fetched the ACLs from the ACL store. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
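The workaround amounts to replacing the assignment so that buffered data for dropped partitions is discarded eagerly instead of lingering until a later poll() walks past it. A minimal sketch of that idea (illustrative names such as AssignmentBuffer; not the actual client code):

```java
import java.util.*;

// Toy model: assign() replaces the current assignment and eagerly drops
// buffered completedFetches for partitions that are no longer assigned,
// rather than carrying them until a later poll() reaches them.
class AssignmentBuffer {
    private final Set<String> assigned = new HashSet<>();
    private final List<Map.Entry<String, String>> completedFetches = new ArrayList<>(); // (partition, payload)

    void assign(Set<String> partitions) {
        assigned.clear();
        assigned.addAll(partitions);
        // Drop buffered data (including buffered error responses such as a
        // TopicAuthorizationException) for partitions we no longer own.
        completedFetches.removeIf(f -> !assigned.contains(f.getKey()));
    }

    void buffer(String partition, String payload) {
        if (assigned.contains(partition)) completedFetches.add(Map.entry(partition, payload));
    }

    int buffered() { return completedFetches.size(); }
}
```

In this model, calling assign with an empty set clears everything that was buffered, and a subsequent assign with the real topicPartitions restores the subscription — mirroring the assign(emptySet) / assign(topicPartitions) sequence described in the issue.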
[jira] [Comment Edited] (KAFKA-4808) send of null key to a compacted topic should throw error back to user
[ https://issues.apache.org/jira/browse/KAFKA-4808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15935327#comment-15935327 ] Mayuresh Gharat edited comment on KAFKA-4808 at 3/21/17 9:00 PM: - [~ijuma] Please find the KIP here : https://cwiki.apache.org/confluence/display/KAFKA/KIP-135+%3A+Send+of+null+key+to+a+compacted+topic+should+throw+non-retriable+error+back+to+user. was (Author: mgharat): [~ijuma] Please find the KIP here : https://cwiki.apache.org/confluence/display/KAFKA/Send+of+null+key+to+a+compacted+topic+should+throw+non-retriable+error+back+to+user. > send of null key to a compacted topic should throw error back to user > - > > Key: KAFKA-4808 > URL: https://issues.apache.org/jira/browse/KAFKA-4808 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 0.10.2.0 >Reporter: Ismael Juma >Assignee: Mayuresh Gharat > Fix For: 0.11.0.0 > > > If a message with a null key is produced to a compacted topic, the broker > returns `CorruptRecordException`, which is a retriable exception. As such, > the producer keeps retrying until retries are exhausted or request.timeout.ms > expires and eventually throws a TimeoutException. This is confusing and not > user-friendly. > We should throw a meaningful error back to the user. From an implementation > perspective, we would have to use a non retriable error code to avoid this > issue. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
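The confusing behavior described in the issue — a retriable CorruptRecordException surfacing only as a generic TimeoutException — comes from the producer's retry loop. A self-contained sketch of the retriable vs. non-retriable distinction (toy exception types, not the actual kafka-clients classes):

```java
// Toy retry loop: retriable errors are retried until exhaustion (ending in a
// generic timeout that hides the real cause), while a non-retriable error
// surfaces immediately with a meaningful message -- which is what the issue
// asks for when a null key is sent to a compacted topic.
class RetrySketch {
    static class RetriableError extends RuntimeException { RetriableError(String m) { super(m); } }
    static class NonRetriableError extends RuntimeException { NonRetriableError(String m) { super(m); } }

    interface Send { void attempt(); }

    // Returns the message of the error (or success) the caller finally sees.
    static String send(Send s, int retries) {
        for (int i = 0; i <= retries; i++) {
            try { s.attempt(); return "ok"; }
            catch (RetriableError e) { /* retry until exhausted */ }
            catch (NonRetriableError e) { return e.getMessage(); }
        }
        return "TimeoutException"; // retries exhausted: original cause is hidden
    }
}
```

With a retriable error the user only ever sees the timeout; with a non-retriable error the real cause reaches them on the first attempt.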
[jira] [Commented] (KAFKA-4808) send of null key to a compacted topic should throw error back to user
[ https://issues.apache.org/jira/browse/KAFKA-4808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15935327#comment-15935327 ] Mayuresh Gharat commented on KAFKA-4808: [~ijuma] Please find the KIP here : https://cwiki.apache.org/confluence/display/KAFKA/Send+of+null+key+to+a+compacted+topic+should+throw+non-retriable+error+back+to+user. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-4808) send of null key to a compacted topic should throw error back to user
[ https://issues.apache.org/jira/browse/KAFKA-4808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15886365#comment-15886365 ] Mayuresh Gharat commented on KAFKA-4808: [~ijuma] Sure, we can provide a better error message if we have a separate error code. It looks like a subclass of InvalidRequestException, as it is indeed an invalid produce request for that topic. I will work on the KIP. Thanks, Mayuresh > send of null key to a compacted topic should throw error back to user > - > > Key: KAFKA-4808 > URL: https://issues.apache.org/jira/browse/KAFKA-4808 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 0.10.2.0 >Reporter: Ismael Juma >Assignee: Mayuresh Gharat > Fix For: 0.10.3.0 > > > If a message with a null key is produced to a compacted topic, the broker > returns `CorruptRecordException`, which is a retriable exception. As such, > the producer keeps retrying until retries are exhausted or request.timeout.ms > expires and eventually throws a TimeoutException. This is confusing and not > user-friendly. > We should throw a meaningful error back to the user. From an implementation > perspective, we would have to use a non retriable error code to avoid this > issue. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (KAFKA-4808) send of null key to a compacted topic should throw error back to user
[ https://issues.apache.org/jira/browse/KAFKA-4808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mayuresh Gharat reassigned KAFKA-4808: -- Assignee: Mayuresh Gharat -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-4808) send of null key to a compacted topic should throw error back to user
[ https://issues.apache.org/jira/browse/KAFKA-4808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15886288#comment-15886288 ] Mayuresh Gharat commented on KAFKA-4808: [~ijuma] I was wondering whether throwing an "INVALID_REQUEST" exception would work here, or whether we need to add a new exception type (which would require a KIP)? > send of null key to a compacted topic should throw error back to user > - > > Key: KAFKA-4808 > URL: https://issues.apache.org/jira/browse/KAFKA-4808 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 0.10.2.0 >Reporter: Ismael Juma > Fix For: 0.10.3.0 > > > If a message with a null key is produced to a compacted topic, the broker > returns `CorruptRecordException`, which is a retriable exception. As such, > the producer keeps retrying until retries are exhausted or request.timeout.ms > expires and eventually throws a TimeoutException. This is confusing and not > user-friendly. > We should throw a meaningful error back to the user. From an implementation > perspective, we would have to use a non retriable error code to avoid this > issue. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] (KAFKA-1610) Local modifications to collections generated from mapValues will be lost
[ https://issues.apache.org/jira/browse/KAFKA-1610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15847355#comment-15847355 ] Mayuresh Gharat commented on KAFKA-1610: [~jozi-k] sure. The patch has been available but we somehow missed this. > Local modifications to collections generated from mapValues will be lost > > > Key: KAFKA-1610 > URL: https://issues.apache.org/jira/browse/KAFKA-1610 > Project: Kafka > Issue Type: Bug >Reporter: Guozhang Wang >Assignee: Mayuresh Gharat > Labels: newbie > Attachments: KAFKA-1610_2014-08-29_09:51:51.patch, > KAFKA-1610_2014-08-29_10:03:55.patch, KAFKA-1610_2014-09-03_11:27:50.patch, > KAFKA-1610_2014-09-16_13:08:17.patch, KAFKA-1610_2014-09-16_15:23:27.patch, > KAFKA-1610_2014-09-30_23:21:46.patch, KAFKA-1610_2014-10-02_12:07:01.patch, > KAFKA-1610_2014-10-02_12:09:46.patch, KAFKA-1610.patch > > > In our current Scala code base we have 40+ usages of mapValues, however it > has an important semantic difference with map, which is that "map" creates a > new map collection instance, while "mapValues" just create a map view of the > original map, and hence any further value changes to the view will be > effectively lost. 
> Example code: > {code} > scala> case class Test(i: Int, var j: Int) {} > defined class Test > scala> val a = collection.mutable.Map(1 -> 1) > a: scala.collection.mutable.Map[Int,Int] = Map(1 -> 1) > scala> val b = a.mapValues(v => Test(v, v)) > b: scala.collection.Map[Int,Test] = Map(1 -> Test(1,1)) > scala> val c = a.map(v => v._1 -> Test(v._2, v._2)) > c: scala.collection.mutable.Map[Int,Test] = Map(1 -> Test(1,1)) > scala> b.foreach(kv => kv._2.j = kv._2.j + 1) > scala> b > res1: scala.collection.Map[Int,Test] = Map(1 -> Test(1,1)) > scala> c.foreach(kv => kv._2.j = kv._2.j + 1) > scala> c > res3: scala.collection.mutable.Map[Int,Test] = Map(1 -> Test(1,2)) > scala> a.put(1,3) > res4: Option[Int] = Some(1) > scala> b > res5: scala.collection.Map[Int,Test] = Map(1 -> Test(3,3)) > scala> c > res6: scala.collection.mutable.Map[Int,Test] = Map(1 -> Test(1,2)) > {code} > We need to go through all these mapValues usages to see if they should be changed to > map -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-4454) Authorizer should also include the Principal generated by the PrincipalBuilder.
[ https://issues.apache.org/jira/browse/KAFKA-4454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15730371#comment-15730371 ] Mayuresh Gharat commented on KAFKA-4454: [~junrao], thanks a lot for the review. The thought of adding an equality check for channelPrincipal (if it exists) did cross my mind, but I left it out on purpose. The reason was that I thought Kafka internally mainly cares about the principal name and principal type, and the principal name actually comes from the channelPrincipal. But now that I think more about it, the channelPrincipal might be a different custom type, such as a servicePrincipal or a userPrincipal, with the same name, so it does make sense to add a proper equality check there. I agree that there might be a better way of doing this. I will write up a KIP for this and submit it for review soon. > Authorizer should also include the Principal generated by the > PrincipalBuilder. > --- > > Key: KAFKA-4454 > URL: https://issues.apache.org/jira/browse/KAFKA-4454 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.10.0.1 >Reporter: Mayuresh Gharat >Assignee: Mayuresh Gharat > Fix For: 0.10.2.0 > > > Currently Kafka allows users to plug in a custom PrincipalBuilder and a custom > Authorizer. > The Authorizer.authorize() method takes in a Session object that wraps a > KafkaPrincipal and an InetAddress. > The KafkaPrincipal currently has a PrincipalType and a Principal name, which is > the name of the Principal generated by the PrincipalBuilder.
> This Principal, generated by the plugged-in PrincipalBuilder, might have other > fields that are required by the plugged-in Authorizer, but currently we > lose this information since we only extract the name of the Principal while > creating the KafkaPrincipal in SocketServer. > It would be great if KafkaPrincipal had an additional field > "channelPrincipal" used to store the Principal generated by the > plugged-in PrincipalBuilder. > The plugged-in Authorizer can then use this "channelPrincipal" to do > authorization. > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (KAFKA-4454) Authorizer should also include the Principal generated by the PrincipalBuilder.
[ https://issues.apache.org/jira/browse/KAFKA-4454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15716114#comment-15716114 ] Mayuresh Gharat edited comment on KAFKA-4454 at 12/2/16 8:13 PM: - [~ijuma] To make progress on this let me upload a PR for this and then if we want to include SimplePrincipal, I will be happy to rebase the patch you mentioned above. Does that work ? :) was (Author: mgharat): [~ijuma] I might not be understanding "accentuates the differences between authentication and authorization." completely. Let me upload a PR for this and then if we want to include SimplePrincipal, I will be happy to rebase the patch you mentioned here. Does that work ? :) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (KAFKA-4454) Authorizer should also include the Principal generated by the PrincipalBuilder.
[ https://issues.apache.org/jira/browse/KAFKA-4454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15716114#comment-15716114 ] Mayuresh Gharat edited comment on KAFKA-4454 at 12/2/16 8:00 PM: - [~ijuma] I might not be understanding "accentuates the differences between authentication and authorization." completely. Let me upload a PR for this and then if we want to include SimplePrincipal, I will be happy to rebase the patch you mentioned here. Does that work ? :) was (Author: mgharat): [~ijuma] I might not be understanding "accentuates the differences between authentication and authorization." completely. Let me upload a PR for this and then if we want to include SimplePrincipal, I will be happy to rebase the patch you mentioned here. Does that work ? :) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-4454) Authorizer should also include the Principal generated by the PrincipalBuilder.
[ https://issues.apache.org/jira/browse/KAFKA-4454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15716114#comment-15716114 ] Mayuresh Gharat commented on KAFKA-4454: [~ijuma] I might not be understanding "accentuates the differences between authentication and authorization." completely. Let me upload a PR for this and then if we want to include SimplePrincipal, I will be happy to rebase the patch you mentioned here. Does that work ? :) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-4454) Authorizer should also include the Principal generated by the PrincipalBuilder.
[ https://issues.apache.org/jira/browse/KAFKA-4454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15713402#comment-15713402 ] Mayuresh Gharat commented on KAFKA-4454: [~ijuma] Thanks for pointing me to the patch. The idea of using SimplePrincipal looks good, OR we can add another constructor that takes an additional parameter "channelPrincipal" of type java.security.Principal. The main change will be required in the SocketServer line: val session = RequestChannel.Session(KafkaPrincipal.fromPrincipal(channel.principal), channel.socketAddress) which would change to: val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName, channel.principal), channel.socketAddress) where channel.principal returns the Principal generated by the PrincipalBuilder. Regarding "Do you have some examples of fields that you would want your principal to pass?" ---> Our Authorizer implementation delegates to LinkedIn's security infra team's library, which creates a java.security.Principal with some additional information from the provided client cert. This information is required by their ACL service to ALLOW or DENY operations. This is likely to be a common use case for companies that have a custom ACL service of their own. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
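The extra field and the equality check discussed above could look like the following sketch. This is a simplified stand-in (class name PrincipalWithChannel is hypothetical), not the actual KafkaPrincipal patch:

```java
import java.security.Principal;
import java.util.Objects;

// Sketch: a principal that keeps the original Principal produced by the
// plugged-in PrincipalBuilder, so a custom Authorizer can reach its extra
// fields. Equality includes channelPrincipal, since two principals with the
// same name may come from different custom Principal types.
class PrincipalWithChannel implements Principal {
    private final String principalType;
    private final String name;
    private final Principal channelPrincipal; // may be null for the default builder

    PrincipalWithChannel(String principalType, String name, Principal channelPrincipal) {
        this.principalType = principalType;
        this.name = name;
        this.channelPrincipal = channelPrincipal;
    }

    @Override public String getName() { return name; }

    @Override public boolean equals(Object o) {
        if (!(o instanceof PrincipalWithChannel)) return false;
        PrincipalWithChannel other = (PrincipalWithChannel) o;
        return principalType.equals(other.principalType)
                && name.equals(other.name)
                && Objects.equals(channelPrincipal, other.channelPrincipal);
    }

    @Override public int hashCode() { return Objects.hash(principalType, name, channelPrincipal); }
}
```

An Authorizer could then downcast channelPrincipal to whatever type its PrincipalBuilder produced and read the extra fields from it.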
[jira] [Commented] (KAFKA-4454) Authorizer should also include the Principal generated by the PrincipalBuilder.
[ https://issues.apache.org/jira/browse/KAFKA-4454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15709335#comment-15709335 ] Mayuresh Gharat commented on KAFKA-4454: [~jjkoshy] [~ashishsinghdev] [~parth.brahmbhatt] [~ijuma] ping :) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-4454) Authorizer should also include the Principal generated by the PrincipalBuilder.
[ https://issues.apache.org/jira/browse/KAFKA-4454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15703758#comment-15703758 ] Mayuresh Gharat commented on KAFKA-4454: [~ijuma] [~jjkoshy] [~ashishsinghdev] [~parth.brahmbhatt] would you mind taking a look at this? I will be happy to submit a PR for this. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-4454) Authorizer should also include the Principal generated by the PrincipalBuilder.
Mayuresh Gharat created KAFKA-4454: -- Summary: Authorizer should also include the Principal generated by the PrincipalBuilder. Key: KAFKA-4454 URL: https://issues.apache.org/jira/browse/KAFKA-4454 Project: Kafka Issue Type: Bug Affects Versions: 0.10.0.1 Reporter: Mayuresh Gharat Assignee: Mayuresh Gharat Fix For: 0.10.2.0 Currently Kafka allows users to plug in a custom PrincipalBuilder and a custom Authorizer. The Authorizer.authorize() method takes in a Session object that wraps a KafkaPrincipal and an InetAddress. The KafkaPrincipal currently has a PrincipalType and a Principal name, which is the name of the Principal generated by the PrincipalBuilder. This Principal, generated by the plugged-in PrincipalBuilder, might have other fields that are required by the plugged-in Authorizer, but currently we lose this information since we only extract the name of the Principal while creating the KafkaPrincipal in SocketServer. It would be great if KafkaPrincipal had an additional field "channelPrincipal" used to store the Principal generated by the plugged-in PrincipalBuilder. The plugged-in Authorizer can then use this "channelPrincipal" to do authorization. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-4433) Kafka Controller Does not send a LeaderAndIsr to old leader of a topicPartition during reassignment, if the old leader is not a part of the new assigned replicas
Mayuresh Gharat created KAFKA-4433: -- Summary: Kafka Controller Does not send a LeaderAndIsr to old leader of a topicPartition during reassignment, if the old leader is not a part of the new assigned replicas Key: KAFKA-4433 URL: https://issues.apache.org/jira/browse/KAFKA-4433 Project: Kafka Issue Type: Bug Components: controller Reporter: Mayuresh Gharat Assignee: Mayuresh Gharat Fix For: 0.10.2.0 Consider the following scenario: old replicas {[1,2,3], Leader = 1} are reassigned to new replicas {[2,3,4], Leader = 2}. In this case broker 1 never receives a LeaderAndIsr request to become a follower. This happens because in val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topicAndPartition, currentLeaderAndIsr) in PartitionStateMachine.electLeaderForPartition(...), the replicas returned by ReassignedPartitionLeaderSelector.selectLeader() contain only the new replicas, which are then sent the LeaderAndIsrRequest. So the old replica never receives this LeaderAndIsr request. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
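The gap can be seen as a set difference: the selector returns only the new assignment, so any old replica outside it never gets a request. A small sketch of that calculation (illustrative, not the controller code):

```java
import java.util.*;

// Sketch: which replicas are skipped when the LeaderAndIsrRequest is sent
// only to the new assignment. Any old replica not in the new set (broker 1
// in the scenario above) is never told it is no longer a leader/follower.
class ReassignmentSketch {
    static Set<Integer> missedReplicas(Set<Integer> oldReplicas, Set<Integer> newReplicas) {
        Set<Integer> missed = new HashSet<>(oldReplicas);
        missed.removeAll(newReplicas); // these receive no LeaderAndIsr request
        return missed;
    }
}
```

For the scenario in the issue, old replicas {1,2,3} and new replicas {2,3,4} leave broker 1 in the missed set.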
[jira] [Commented] (KAFKA-4362) Consumer can fail after reassignment of the offsets topic partition
[ https://issues.apache.org/jira/browse/KAFKA-4362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15649485#comment-15649485 ] Mayuresh Gharat commented on KAFKA-4362: I did some more testing while reproducing these error scenarios. There are two bugs:
1) OFFSET COMMITS: When we check the MessageFormatVersion, we first check whether the replica of the __consumer_offsets topic is local, and if it is not, we return an IllegalArgumentException. This results in an unknown exception on the client side, and the commitOffset operation fails with an exception. As a side effect, if you start another consumer in the same group consuming from the same topic after the rebalance is done and start producing to the topic, you will see that both consumers consume the same data. This is because the second consumer is talking to the right coordinator, while the first consumer is completely unaware of the presence of the second consumer.
2) CONSUMER REBALANCE: While doing topic reassignment, for example moving replicas from (1,2,3 [Leader: 1]) to (4,2,3 [Leader: 4]) for the __consumer_offsets topic, the controller sends a StopReplicaRequest to broker 1 for the __consumer_offsets topic. While handling this request on the server side, we never remove that partition of the __consumer_offsets topic from the coordinator's (broker 1's) cache. When a JoinGroupRequest comes in during rebalance, the coordinator (broker 1) checks whether the group is local. But since we did not remove the group from its cache on the earlier StopReplicaRequest from the controller, it does not return NotCoordinatorForGroupException and proceeds as if successful. So the consumer thinks it is talking to the right coordinator (which is not the case, since we moved the coordinator from broker 1 to broker 4).
On the consumer side, in the handleJoinGroupResponseHandler callback, we send a SyncGroupRequest to broker 1, which in turn calls the code for checking the MessageFormatVersion on the server. At this point it throws an IllegalArgumentException for the same reason explained in point 1) above. This causes the SyncGroupRequest to fail with an unknown exception in the SyncGroupResponseHandler callback. The exact steps for reproducing these scenarios are as follows : For 1) a) Start 4 Kafka brokers and create a topic testtopicA with 1 partition. b) Start a producer producing to topic testtopicA. c) Start a console consumer with groupId = testGroupA consuming from testtopicA. d) Produce and consume some data. e) Find the __consumer_offsets partition that stores the offsets for testGroupA. f) Reassign the partitions for the partition from e) such that you remove the leader out of the replica list. g) You should see frequent exceptions on the consumer side, something like this : [2016-11-08 10:04:03,192] ERROR Group testGroupA failed to commit partition testtopicA-0 at offset 14: The server experienced an unexpected error when processing the request (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) h) If you still produce to the topic, you should be able to see data in the console of this consumer, but it is not able to commit offsets. i) Now if you start another console consumer with the same groupId = testGroupA consuming from the same topic testtopicA and produce more data, you should be able to see the data in both consumer consoles. For 2) a) Start 4 Kafka brokers and create a topic testtopicA with 1 partition. b) Start a producer producing to topic testtopicA. c) Start a console consumer with groupId = testGroupA consuming from testtopicA. d) Start another console consumer with groupId = testGroupA consuming from testtopicA. e) Produce and consume some data. Exactly one of them should be consuming the data. 
f) Find the __consumer_offsets partition that stores the offsets for testGroupA. g) Reassign the partitions for the partition from f) such that you remove the leader out of the replica list. h) You should see frequent exceptions on the consumer that is actually fetching data, something like this : [2016-11-08 10:04:03,192] ERROR Group testGroupA failed to commit partition testtopicA-0 at offset 14: The server experienced an unexpected error when processing the request (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) i) If you kill this consumer, you should immediately see an exception on the other consumer console, something like this : [2016-11-08 10:04:20,705] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$) org.apache.kafka.common.KafkaException: Unexpected error from SyncGroup: The server experienced an unexpected error when processing the request at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:518) at org.apache.kafk
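A minimal sketch of the server-side fix implied by the two bugs above: when the coordinator no longer hosts the group's __consumer_offsets partition, answer with NotCoordinatorForGroup instead of throwing an IllegalArgumentException that surfaces as an unknown error. Class and method names here are hypothetical, not Kafka's actual GroupMetadataManager code:

```java
import java.util.HashSet;
import java.util.Set;

public class CoordinatorCheck {
    enum Error { NONE, NOT_COORDINATOR_FOR_GROUP }

    // Offsets-topic partitions this broker currently hosts.
    private final Set<Integer> localOffsetsPartitions = new HashSet<>();

    CoordinatorCheck(int... partitions) {
        for (int p : partitions) localOffsetsPartitions.add(p);
    }

    // Instead of assuming the replica is local (and throwing when it is not),
    // surface an error code the consumer already knows how to handle:
    // rediscover the coordinator and retry.
    Error validateGroupPartition(int offsetsPartition) {
        if (!localOffsetsPartitions.contains(offsetsPartition))
            return Error.NOT_COORDINATOR_FOR_GROUP;
        return Error.NONE;
    }

    public static void main(String[] args) {
        CoordinatorCheck broker1 = new CoordinatorCheck(7); // hosts partition 7 only
        // Partition 42 moved away: a well-defined error, not an unknown exception.
        System.out.println(broker1.validateGroupPartition(42));
    }
}
```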
[jira] [Commented] (KAFKA-4362) Consumer can fail after reassignment of the offsets topic partition
[ https://issues.apache.org/jira/browse/KAFKA-4362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15644945#comment-15644945 ] Mayuresh Gharat commented on KAFKA-4362: [~jasong35] from the details that Joel has listed, I think there are 2 issues : 1) Offset commits fail when the offsets topic partition is moved. This happens because the old coordinator incorrectly throws an IllegalArgumentException when checking the message format version, when it is in fact first checking if the replica is local. So the correct fix here would be to return "NotCoordinatorForGroupException" from the server side. 2) On the client side, right now, because the IllegalArgumentException thrown by the server is bubbled up as an UnknownException, the consumer is not able to handle it correctly. I think once we return the correct exception (NotCoordinatorForGroupException), the consumer should be able to handle it and proceed. > Consumer can fail after reassignment of the offsets topic partition > --- > > Key: KAFKA-4362 > URL: https://issues.apache.org/jira/browse/KAFKA-4362 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.10.1.0 >Reporter: Joel Koshy >Assignee: Mayuresh Gharat > > When a consumer offsets topic partition reassignment completes, an offset > commit shows this: > {code} > java.lang.IllegalArgumentException: Message format version for partition 100 > not found > at > kafka.coordinator.GroupMetadataManager$$anonfun$14.apply(GroupMetadataManager.scala:633) > ~[kafka_2.10.jar:?] > at > kafka.coordinator.GroupMetadataManager$$anonfun$14.apply(GroupMetadataManager.scala:633) > ~[kafka_2.10.jar:?] > at scala.Option.getOrElse(Option.scala:120) ~[scala-library-2.10.4.jar:?] > at > kafka.coordinator.GroupMetadataManager.kafka$coordinator$GroupMetadataManager$$getMessageFormatVersionAndTimestamp(GroupMetadataManager.scala:632) > ~[kafka_2.10.jar:?] > at > ... 
> {code} > The issue is that the replica has been deleted so the > {{GroupMetadataManager.getMessageFormatVersionAndTimestamp}} throws this > exception instead which propagates as an unknown error. > Unfortunately consumers don't respond to this and will fail their offset > commits. > One workaround in the above situation is to bounce the cluster - the consumer > will be forced to rediscover the group coordinator. > (Incidentally, the message incorrectly prints the number of partitions > instead of the actual partition.) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3995) Add a new configuration "enable.comrpession.ratio.estimation" to the producer config
[ https://issues.apache.org/jira/browse/KAFKA-3995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15589938#comment-15589938 ] Mayuresh Gharat commented on KAFKA-3995: If we disable compression, we would not have this issue, right? Of course that's not recommended. The other way would be to reduce linger.ms to a very low value. On thinking more about this, I plan to reopen and work on it. Since this is a new config, we would probably require a KIP for this, right? If yes, I can write up a KIP and submit it for review. > Add a new configuration "enable.comrpession.ratio.estimation" to the producer > config > > > Key: KAFKA-3995 > URL: https://issues.apache.org/jira/browse/KAFKA-3995 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 0.10.0.0 >Reporter: Jiangjie Qin >Assignee: Mayuresh Gharat > Fix For: 0.10.2.0 > > > We recently saw a few cases where RecordTooLargeException is thrown because > the compressed message sent by KafkaProducer exceeded the max message size. > The root cause of this issue is that the compressor estimates the > batch size using an estimated compression ratio based on heuristic > compression ratio statistics. This does not quite work for traffic with > highly variable compression ratios. > For example, say the batch size is set to 1MB and the max message size is 1MB. > Initially the producer is sending messages (each message is 1MB) to topic_1 > whose data can be compressed to 1/10 of the original size. After a while the > estimated compression ratio in the compressor will be trained to 1/10 and the > producer would put 10 messages into one batch. Now the producer starts to > send messages (each message is also 1MB) to topic_2 whose messages can only be > compressed to 1/5 of the original size. The producer would still use 1/10 as > the estimated compression ratio and put 10 messages into a batch. 
That batch > would be 2 MB after compression, which exceeds the maximum message size. In > this case the user does not have many options other than resending everything or > closing the producer if they care about ordering. > This is especially an issue for services like MirrorMaker whose producer is > shared by many different topics. > To solve this issue, we can probably add a configuration > "enable.compression.ratio.estimation" to the producer. So when this > configuration is set to false, we stop estimating the compressed size but > will close the batch once the uncompressed bytes in the batch reach the > batch size. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
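The arithmetic in the description can be checked with a small sketch. The values mirror the 1MB/1MB scenario above; this is an illustration of the estimation failure mode, not the producer's actual compressor code:

```java
public class RatioEstimate {
    public static void main(String[] args) {
        long messageSize = 1_000_000;        // ~1 MB per message
        long maxMessageSize = 1_000_000;     // ~1 MB broker-side limit
        double trainedRatio = 0.10;          // learned on topic_1 (compresses 10x)
        double actualRatio = 0.20;           // topic_2 only compresses 5x

        // With the trained ratio, 10 messages look like ~1 MB compressed:
        int messagesPerBatch = (int) (maxMessageSize / (messageSize * trainedRatio));
        // But at the actual ratio the batch compresses to ~2 MB:
        long actualCompressed = (long) (messagesPerBatch * messageSize * actualRatio);

        System.out.println(messagesPerBatch);                  // 10
        System.out.println(actualCompressed > maxMessageSize); // true: RecordTooLargeException territory
    }
}
```

Closing the batch on uncompressed bytes, as the ticket proposes, trades some batching efficiency for a hard upper bound that cannot be fooled by a stale ratio.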
[jira] [Reopened] (KAFKA-3995) Add a new configuration "enable.comrpession.ratio.estimation" to the producer config
[ https://issues.apache.org/jira/browse/KAFKA-3995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mayuresh Gharat reopened KAFKA-3995: > Add a new configuration "enable.comrpession.ratio.estimation" to the producer > config > > > Key: KAFKA-3995 > URL: https://issues.apache.org/jira/browse/KAFKA-3995 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 0.10.0.0 >Reporter: Jiangjie Qin >Assignee: Mayuresh Gharat > Fix For: 0.10.2.0 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-3995) Add a new configuration "enable.comrpession.ratio.estimation" to the producer config
[ https://issues.apache.org/jira/browse/KAFKA-3995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mayuresh Gharat resolved KAFKA-3995. Resolution: Workaround > Add a new configuration "enable.comrpession.ratio.estimation" to the producer > config > > > Key: KAFKA-3995 > URL: https://issues.apache.org/jira/browse/KAFKA-3995 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 0.10.0.0 >Reporter: Jiangjie Qin >Assignee: Mayuresh Gharat > Fix For: 0.10.2.0 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-4074) Deleting a topic can make it unavailable even if delete.topic.enable is false
[ https://issues.apache.org/jira/browse/KAFKA-4074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15491390#comment-15491390 ] Mayuresh Gharat commented on KAFKA-4074: [~omkreddy] merging your PR with that in https://issues.apache.org/jira/browse/KAFKA-3175 would be great. > Deleting a topic can make it unavailable even if delete.topic.enable is false > - > > Key: KAFKA-4074 > URL: https://issues.apache.org/jira/browse/KAFKA-4074 > Project: Kafka > Issue Type: Bug > Components: controller >Reporter: Joel Koshy >Assignee: Manikumar Reddy > Fix For: 0.10.1.0 > > > The {{delete.topic.enable}} configuration does not completely block the > effects of delete topic since the controller may (indirectly) query the list > of topics under the delete-topic znode. > To reproduce: > * Delete topic X > * Force a controller move (either by bouncing or removing the controller > znode) > * The new controller will send out UpdateMetadataRequests with leader=-2 for > the partitions of X > * Producers eventually stop producing to that topic > The reason for this is that when ControllerChannelManager adds > UpdateMetadataRequests for brokers, we directly use the partitionsToBeDeleted > field of the DeleteTopicManager (which is set to the partitions of the topics > under the delete-topic znode on controller startup). > In order to get out of the situation you have to remove X from the znode and > then force another controller move. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3083) a soft failure in controller may leave a topic partition in an inconsistent state
[ https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15437288#comment-15437288 ] Mayuresh Gharat commented on KAFKA-3083: [~fpj] do we have an umbrella jira where this issue is being tracked, along with the changes that are required to be made as mentioned in this patch? > a soft failure in controller may leave a topic partition in an inconsistent > state > - > > Key: KAFKA-3083 > URL: https://issues.apache.org/jira/browse/KAFKA-3083 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.9.0.0 >Reporter: Jun Rao >Assignee: Mayuresh Gharat > > The following sequence can happen. > 1. Broker A is the controller and is in the middle of processing a broker > change event. As part of this process, let's say it's about to shrink the isr > of a partition. > 2. Then broker A's session expires and broker B takes over as the new > controller. Broker B sends the initial leaderAndIsr request to all brokers. > 3. Broker A continues by shrinking the isr of the partition in ZK and sends > the new leaderAndIsr request to the broker (say C) that leads the partition. > Broker C will reject this leaderAndIsr since the request comes from a > controller with an older epoch. Now we could be in a situation where Broker C > thinks the isr has all replicas, but the isr stored in ZK is different. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
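The controller-epoch fencing in step 3 can be sketched as follows. This is a hedged illustration with invented names, not the broker's actual request handling: a broker accepts a leaderAndIsr request only if its controller epoch is at least the latest epoch seen, which is why broker C rejects broker A's request even though A already wrote the shrunken isr to ZK:

```java
public class EpochFence {
    private int currentControllerEpoch = 0;

    // Returns true if the request is accepted. A request stamped with a stale
    // controller epoch is rejected, so a deposed controller (broker A) cannot
    // mutate the broker's state; but note it may already have written to ZK.
    boolean maybeAcceptLeaderAndIsr(int requestControllerEpoch) {
        if (requestControllerEpoch < currentControllerEpoch) return false;
        currentControllerEpoch = requestControllerEpoch;
        return true;
    }

    public static void main(String[] args) {
        EpochFence brokerC = new EpochFence();
        System.out.println(brokerC.maybeAcceptLeaderAndIsr(2)); // new controller B: accepted
        System.out.println(brokerC.maybeAcceptLeaderAndIsr(1)); // old controller A: rejected,
        // yet A's isr shrink already landed in ZK, hence the inconsistency.
    }
}
```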
[jira] [Comment Edited] (KAFKA-2014) Chaos Monkey / Failure Inducer for Kafka
[ https://issues.apache.org/jira/browse/KAFKA-2014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15433359#comment-15433359 ] Mayuresh Gharat edited comment on KAFKA-2014 at 8/23/16 6:37 PM: - This is the failure inducer that was developed for Kafka : https://github.com/linkedin/simoorg was (Author: mgharat): https://github.com/linkedin/simoorg > Chaos Monkey / Failure Inducer for Kafka > > > Key: KAFKA-2014 > URL: https://issues.apache.org/jira/browse/KAFKA-2014 > Project: Kafka > Issue Type: Task > Components: system tests >Reporter: Mayuresh Gharat >Assignee: Mayuresh Gharat > > Implement a Chaos Monkey for Kafka that will help us catch any shortcomings > in the test environment before going to production. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2014) Chaos Monkey / Failure Inducer for Kafka
[ https://issues.apache.org/jira/browse/KAFKA-2014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15433359#comment-15433359 ] Mayuresh Gharat commented on KAFKA-2014: https://github.com/linkedin/simoorg > Chaos Monkey / Failure Inducer for Kafka > > > Key: KAFKA-2014 > URL: https://issues.apache.org/jira/browse/KAFKA-2014 > Project: Kafka > Issue Type: Task > Components: system tests >Reporter: Mayuresh Gharat >Assignee: Mayuresh Gharat > > Implement a Chaos Monkey for kafka, that will help us catch any shortcomings > in the test environment before going to production. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-2014) Chaos Monkey / Failure Inducer for Kafka
[ https://issues.apache.org/jira/browse/KAFKA-2014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mayuresh Gharat resolved KAFKA-2014. Resolution: Fixed > Chaos Monkey / Failure Inducer for Kafka > > > Key: KAFKA-2014 > URL: https://issues.apache.org/jira/browse/KAFKA-2014 > Project: Kafka > Issue Type: Task > Components: system tests >Reporter: Mayuresh Gharat >Assignee: Mayuresh Gharat > > Implement a Chaos Monkey for kafka, that will help us catch any shortcomings > in the test environment before going to production. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-2976) Mirror maker dies if we delete a topic from destination cluster
[ https://issues.apache.org/jira/browse/KAFKA-2976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mayuresh Gharat resolved KAFKA-2976. Resolution: Won't Fix > Mirror maker dies if we delete a topic from destination cluster > --- > > Key: KAFKA-2976 > URL: https://issues.apache.org/jira/browse/KAFKA-2976 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Mayuresh Gharat >Assignee: Mayuresh Gharat > > In the data pipeline, > 1) Suppose the MirrorMaker is producing to a cluster with topic T, which has > 128 partitions (Partition 0 to Partition 127). The default setting on > creation of a new topic on that cluster is 8 partitions. > 2) After we delete the topic, the topic gets recreated with 8 partitions > (Partition 0 to Partition 7). > 3) The RecordAccumulator has batches for partitions 8 to 127. Those > batches get expired and the mirror makers will die to avoid data loss. > We need a way to reassign those batches (batches for Partition 8 to > Partition 127) in the RecordAccumulator to the newly created topic T with 8 > partitions (Partition 0 to Partition 7). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-3722) PlaintextChannelBuilder should not use ChannelBuilders.createPrincipalBuilder(configs) for creating instance of PrincipalBuilder
[ https://issues.apache.org/jira/browse/KAFKA-3722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mayuresh Gharat updated KAFKA-3722: --- Status: Patch Available (was: In Progress) > PlaintextChannelBuilder should not use > ChannelBuilders.createPrincipalBuilder(configs) for creating instance of > PrincipalBuilder > > > Key: KAFKA-3722 > URL: https://issues.apache.org/jira/browse/KAFKA-3722 > Project: Kafka > Issue Type: Bug >Reporter: Mayuresh Gharat >Assignee: Mayuresh Gharat > > Consider this scenario : > 1) We have a Kafka Broker running on a PlainText and an SSL port simultaneously. > 2) We try to plug in a custom principal builder using the config > "principal.builder.class" for requests coming over the SSL port. > 3) ChannelBuilders.createPrincipalBuilder(configs) first checks if the > config "principal.builder.class" is specified in the passed-in configs and > tries to use it even when it is building the instance of PrincipalBuilder > for the PlainText port, when that custom principal class is only meant for the > SSL port. > IMO, having a DefaultPrincipalBuilder for the PlainText port should be fine. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Work started] (KAFKA-3722) PlaintextChannelBuilder should not use ChannelBuilders.createPrincipalBuilder(configs) for creating instance of PrincipalBuilder
[ https://issues.apache.org/jira/browse/KAFKA-3722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on KAFKA-3722 started by Mayuresh Gharat. -- > PlaintextChannelBuilder should not use > ChannelBuilders.createPrincipalBuilder(configs) for creating instance of > PrincipalBuilder > > > Key: KAFKA-3722 > URL: https://issues.apache.org/jira/browse/KAFKA-3722 > Project: Kafka > Issue Type: Bug >Reporter: Mayuresh Gharat >Assignee: Mayuresh Gharat -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-4050) Allow configuration of the PRNG used for SSL
[ https://issues.apache.org/jira/browse/KAFKA-4050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15424982#comment-15424982 ] Mayuresh Gharat commented on KAFKA-4050: Just a heads up, this has also been seen as an issue in other systems, for example : https://issues.jenkins-ci.org/browse/JENKINS-20108 > Allow configuration of the PRNG used for SSL > > > Key: KAFKA-4050 > URL: https://issues.apache.org/jira/browse/KAFKA-4050 > Project: Kafka > Issue Type: Improvement > Components: security >Affects Versions: 0.10.0.1 >Reporter: Todd Palino >Assignee: Todd Palino > Labels: security, ssl > > This change will make the pseudo-random number generator (PRNG) > implementation used by the SSLContext configurable. The configuration is not > required, and the default is to use whatever the default PRNG for the JDK/JRE > is. Providing a string, such as "SHA1PRNG", will cause that specific > SecureRandom implementation to get passed to the SSLContext. > When enabling inter-broker SSL in our certification cluster, we observed > severe performance issues. For reference, this cluster can take up to 600 > MB/sec of inbound produce traffic over SSL, with RF=2, before it gets close > to saturation, and the mirror maker normally produces about 400 MB/sec > (unless it is lagging). When we enabled inter-broker SSL, we saw persistent > replication problems in the cluster at any inbound rate of more than about 6 > or 7 MB/sec per-broker. This was narrowed down to all the network threads > blocking on a single lock in the SecureRandom code. > It turns out that the default PRNG implementation on Linux is NativePRNG. > This uses randomness from /dev/urandom (which, by itself, is a non-blocking > read) and mixes it with randomness from SHA1. The problem is that the entire > application shares a single SecureRandom instance, and NativePRNG has a > global lock within the implNextBytes method. 
Switching to another > implementation (SHA1PRNG, which has better performance characteristics and is > still considered secure) completely eliminated the bottleneck and allowed the > cluster to work properly at saturation. > The SSLContext initialization has an optional argument to provide a > SecureRandom instance, which the code currently sets to null. This change > creates a new config to specify an implementation, and instantiates that and > passes it to SSLContext if provided. This will also let someone select a > stronger source of randomness (obviously at a performance cost) if desired. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
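A sketch of what the proposed configuration enables, using the standard JDK APIs discussed above. The config key shown in the comment is illustrative; the mechanism is simply constructing a named SecureRandom implementation and passing it to SSLContext.init() instead of null:

```java
import java.security.SecureRandom;
import javax.net.ssl.SSLContext;

public class ConfigurablePrng {
    public static void main(String[] args) throws Exception {
        // Value that would come from a producer/broker config, e.g. something
        // like "ssl.secure.random.implementation" (name assumed here).
        // "SHA1PRNG" is the implementation the ticket found to avoid the
        // global lock inside NativePRNG's implNextBytes.
        String impl = "SHA1PRNG";
        SecureRandom random = (impl == null) ? null : SecureRandom.getInstance(impl);

        SSLContext ctx = SSLContext.getInstance("TLS");
        // Passing null for the SecureRandom argument means "JDK default",
        // which on Linux is NativePRNG; here we pass the configured one.
        ctx.init(null, null, random);
        System.out.println(random.getAlgorithm());
    }
}
```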
[jira] [Assigned] (KAFKA-3995) Add a new configuration "enable.comrpession.ratio.estimation" to the producer config
[ https://issues.apache.org/jira/browse/KAFKA-3995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mayuresh Gharat reassigned KAFKA-3995: -- Assignee: Mayuresh Gharat > Add a new configuration "enable.comrpession.ratio.estimation" to the producer > config > > > Key: KAFKA-3995 > URL: https://issues.apache.org/jira/browse/KAFKA-3995 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 0.10.0.0 >Reporter: Jiangjie Qin >Assignee: Mayuresh Gharat > Fix For: 0.10.1.0 > > > We recently saw a few cases where RecordTooLargeException is thrown because > the compressed message sent by KafkaProducer exceeded the max message size. > The root cause of this issue is that the compressor estimates the > batch size using an estimated compression ratio based on heuristic > compression ratio statistics. This does not quite work for traffic with > highly variable compression ratios. > For example, say the batch size is set to 100KB and the max message size is > 1MB. Initially the producer is sending messages (each message is 100KB) to > topic_1 whose data can be compressed to 1/10 of the original size. After a > while the estimated compression ratio in the compressor will be trained to > 1/10 and the producer would put 10 messages into one batch. Now the producer > starts to send messages (each message is also 100KB) to topic_2 whose messages > can only be compressed to 1/5 of the original size. The producer would still > use 1/10 as the estimated compression ratio and put 10 messages into a batch. > That batch would be 2 MB after compression which exceeds the maximum message > size. In this case the user does not have many options other than resending > everything or closing the producer if they care about ordering. > This is especially an issue for services like MirrorMaker whose producer is > shared by many different topics. 
> To solve this issue, we can probably add a configuration > "enable.compression.ratio.estimation" to the producer. So when this > configuration is set to false, we stop estimating the compressed size but > will close the batch once the uncompressed bytes in the batch reaches the > batch size. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-3722) PlaintextChannelBuilder should not use ChannelBuilders.createPrincipalBuilder(configs) for creating instance of PrincipalBuilder
Mayuresh Gharat created KAFKA-3722: -- Summary: PlaintextChannelBuilder should not use ChannelBuilders.createPrincipalBuilder(configs) for creating instance of PrincipalBuilder Key: KAFKA-3722 URL: https://issues.apache.org/jira/browse/KAFKA-3722 Project: Kafka Issue Type: Bug Reporter: Mayuresh Gharat Assignee: Mayuresh Gharat Consider this scenario : 1) We have a Kafka Broker running on a PlainText and an SSL port simultaneously. 2) We try to plug in a custom principal builder using the config "principal.builder.class" for requests coming over the SSL port. 3) ChannelBuilders.createPrincipalBuilder(configs) first checks if the config "principal.builder.class" is specified in the passed-in configs and tries to use it even when it is building the instance of PrincipalBuilder for the PlainText port, when that custom principal class is only meant for the SSL port. IMO, having a DefaultPrincipalBuilder for the PlainText port should be fine. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
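The proposed behavior reduces to a simple selection rule, sketched below. The class names are illustrative, not Kafka's actual ones: the PlainText channel always gets the default principal builder, and only the SSL channel consults "principal.builder.class":

```java
public class PrincipalBuilderChoice {
    // Hypothetical selection rule: a custom principal builder configured via
    // "principal.builder.class" only applies to the SSL channel; the PlainText
    // channel always falls back to the default builder.
    static String principalBuilderFor(String securityProtocol, String configuredClass) {
        if ("PLAINTEXT".equals(securityProtocol))
            return "DefaultPrincipalBuilder";            // ignore the custom class here
        return configuredClass != null ? configuredClass : "DefaultPrincipalBuilder";
    }

    public static void main(String[] args) {
        String custom = "com.example.SslOnlyPrincipalBuilder"; // hypothetical custom builder
        System.out.println(principalBuilderFor("PLAINTEXT", custom)); // DefaultPrincipalBuilder
        System.out.println(principalBuilderFor("SSL", custom));       // the custom builder
    }
}
```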
[jira] [Created] (KAFKA-3720) Remove BufferExhaustException from doSend() in KafkaProducer
Mayuresh Gharat created KAFKA-3720: -- Summary: Remove BufferExhaustException from doSend() in KafkaProducer Key: KAFKA-3720 URL: https://issues.apache.org/jira/browse/KAFKA-3720 Project: Kafka Issue Type: Bug Reporter: Mayuresh Gharat Assignee: Mayuresh Gharat KafkaProducer no longer throws BufferExhaustException. We should remove it from the catch clause. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3651) Whenever the BufferPool throws a "Failed to allocate memory within the configured max blocking time" exception, it should also remove the condition object from the wait
[ https://issues.apache.org/jira/browse/KAFKA-3651?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15269937#comment-15269937 ] Mayuresh Gharat commented on KAFKA-3651: [~amandazhu19620...@gmail.com] did you intend to open a PR against this jira? > Whenever the BufferPool throws a "Failed to allocate memory within the > configured max blocking time" exception, it should also remove the condition > object from the waiters deque > - > > Key: KAFKA-3651 > URL: https://issues.apache.org/jira/browse/KAFKA-3651 > Project: Kafka > Issue Type: Bug >Reporter: Mayuresh Gharat >Assignee: Mayuresh Gharat > Fix For: 0.10.0.0 > > > "this.waiters.remove(moreMemory);" should happen before the exception > is thrown. Otherwise the waiting thread count will never get to 0 after the exception > and batching will not occur. This is because in the RecordAccumulator.ready > method exhausted is set as > boolean exhausted = this.free.queued() > 0, where free.queued() returns > waiters.size(). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3651) Whenever the BufferPool throws a "Failed to allocate memory within the configured max blocking time" exception, it should also remove the condition object from the wait
[ https://issues.apache.org/jira/browse/KAFKA-3651?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15269837#comment-15269837 ] Mayuresh Gharat commented on KAFKA-3651: [~ijuma] Yes, just working on the unit test. > Whenever the BufferPool throws a "Failed to allocate memory within the > configured max blocking time" exception, it should also remove the condition > object from the waiters deque > - > > Key: KAFKA-3651 > URL: https://issues.apache.org/jira/browse/KAFKA-3651 > Project: Kafka > Issue Type: Bug >Reporter: Mayuresh Gharat >Assignee: Mayuresh Gharat -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-3651) Whenever the BufferPool throws a "Failed to allocate memory within the configured max blocking time" exception, it should also remove the condition object from the waiters
Mayuresh Gharat created KAFKA-3651: -- Summary: Whenever the BufferPool throws a "Failed to allocate memory within the configured max blocking time" exception, it should also remove the condition object from the waiters deque Key: KAFKA-3651 URL: https://issues.apache.org/jira/browse/KAFKA-3651 Project: Kafka Issue Type: Bug Reporter: Mayuresh Gharat Assignee: Mayuresh Gharat "this.waiters.remove(moreMemory);" should happen before the exception is thrown. Otherwise the waiting thread count will never get to 0 after the exception and batching will not occur. This is because in the RecordAccumulator.ready method exhausted is set as boolean exhausted = this.free.queued() > 0, where free.queued() returns waiters.size(). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
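A self-contained sketch of the fix, simplified from BufferPool using java.util.concurrent primitives: remove the waiter's condition object from the deque before throwing the timeout exception, so waiters.size() (and hence free.queued()) returns to 0 and batching can resume:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class BufferPoolSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Deque<Condition> waiters = new ArrayDeque<>();

    void allocate(long maxTimeToBlockMs) throws InterruptedException {
        lock.lock();
        try {
            Condition moreMemory = lock.newCondition();
            waiters.addLast(moreMemory);
            boolean signalled = moreMemory.await(maxTimeToBlockMs, TimeUnit.MILLISECONDS);
            if (!signalled) {
                // The fix: clean up the waiter BEFORE throwing, otherwise
                // queued() stays > 0 forever and ready() sees exhausted=true.
                waiters.remove(moreMemory);
                throw new RuntimeException(
                    "Failed to allocate memory within the configured max blocking time");
            }
        } finally {
            lock.unlock();
        }
    }

    int queued() { return waiters.size(); } // what free.queued() exposes

    public static void main(String[] args) throws Exception {
        BufferPoolSketch pool = new BufferPoolSketch();
        try {
            pool.allocate(10); // nobody signals, so this times out
        } catch (RuntimeException e) {
            System.out.println(pool.queued()); // 0: the waiter count recovered
        }
    }
}
```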
[jira] [Commented] (KAFKA-3294) Kafka REST API
[ https://issues.apache.org/jira/browse/KAFKA-3294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15258891#comment-15258891 ] Mayuresh Gharat commented on KAFKA-3294: [~sriharsha.sm] so this will be integrated with the Kafka brokers, right? I am trying to understand whether this will be something like MirrorMaker? > Kafka REST API > -- > > Key: KAFKA-3294 > URL: https://issues.apache.org/jira/browse/KAFKA-3294 > Project: Kafka > Issue Type: New Feature >Reporter: Sriharsha Chintalapani >Assignee: Parth Brahmbhatt > > This JIRA is to build a Kafka REST API for the producer, consumer and also any > administrative tasks such as create topic, delete topic. We do have a lot of > Kafka client API support in different languages, but having a REST API for the > producer and consumer will make it easier for users to read from or write to Kafka. > Also, having an administrative API will help in managing a cluster or building > administrative dashboards. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3294) Kafka REST API
[ https://issues.apache.org/jira/browse/KAFKA-3294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15258438#comment-15258438 ] Mayuresh Gharat commented on KAFKA-3294: Are you thinking of exposing a REST endpoint on the broker itself? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3390) ReplicaManager may infinitely try-fail to shrink ISR set of deleted partition
[ https://issues.apache.org/jira/browse/KAFKA-3390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15195593#comment-15195593 ] Mayuresh Gharat commented on KAFKA-3390: Do you mean that even after the topic got completely removed, the ReplicaManager on the leader broker kept on trying to shrink the ISR? Am I understanding it correctly? -Mayuresh > ReplicaManager may infinitely try-fail to shrink ISR set of deleted partition > - > > Key: KAFKA-3390 > URL: https://issues.apache.org/jira/browse/KAFKA-3390 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.9.0.1 >Reporter: Stevo Slavic >Assignee: Mayuresh Gharat > > For a topic whose deletion has been requested, the Kafka replica manager may end > up infinitely trying and failing to shrink the ISR. > Here is a fragment from server.log where this recurring and never-ending > condition has been noticed: > {noformat} > [2016-03-04 09:42:13,894] INFO Partition [foo,0] on broker 1: Shrinking ISR > for partition [foo,0] from 1,3,2 to 1 (kafka.cluster.Partition) > [2016-03-04 09:42:13,897] WARN Conditional update of path > /brokers/topics/foo/partitions/0/state with data > {"controller_epoch":53,"leader":1,"version":1,"leader_epoch":34,"isr":[1]} > and expected version 68 failed due to > org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = > NoNode for /brokers/topics/foo/partitions/0/state (kafka.utils.ZkUtils) > [2016-03-04 09:42:13,898] INFO Partition [foo,0] on broker 1: Cached > zkVersion [68] not equal to that in zookeeper, skip updating ISR > (kafka.cluster.Partition) > [2016-03-04 09:42:23,894] INFO Partition [foo,0] on broker 1: Shrinking ISR > for partition [foo,0] from 1,3,2 to 1 (kafka.cluster.Partition) > [2016-03-04 09:42:23,897] WARN Conditional update of path > /brokers/topics/foo/partitions/0/state with data > {"controller_epoch":53,"leader":1,"version":1,"leader_epoch":34,"isr":[1]} > and expected version 68 failed due to > 
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = > NoNode for /brokers/topics/foo/partitions/0/state (kafka.utils.ZkUtils) > [2016-03-04 09:42:23,897] INFO Partition [foo,0] on broker 1: Cached > zkVersion [68] not equal to that in zookeeper, skip updating ISR > (kafka.cluster.Partition) > [2016-03-04 09:42:33,894] INFO Partition [foo,0] on broker 1: Shrinking ISR > for partition [foo,0] from 1,3,2 to 1 (kafka.cluster.Partition) > [2016-03-04 09:42:33,897] WARN Conditional update of path > /brokers/topics/foo/partitions/0/state with data > {"controller_epoch":53,"leader":1,"version":1,"leader_epoch":34,"isr":[1]} > and expected version 68 failed due to > org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = > NoNode for /brokers/topics/foo/partitions/0/state (kafka.utils.ZkUtils) > [2016-03-04 09:42:33,897] INFO Partition [foo,0] on broker 1: Cached > zkVersion [68] not equal to that in zookeeper, skip updating ISR > (kafka.cluster.Partition) > ... > {noformat} > Before topic deletion was requested, this was state in ZK of its sole > partition: > {noformat} > Zxid: 0x181045 > Cxid: 0xc92 > Client id:0x3532dd88fd2 > Time: Mon Feb 29 16:46:23 CET 2016 > Operation:setData > Path: /brokers/topics/foo/partitions/0/state > Data: > {"controller_epoch":53,"leader":1,"version":1,"leader_epoch":34,"isr":[1,3,2]} > Version: 68 > {noformat} > Topic (sole partition) had no data ever published to it. 
I guess at some > point after topic deletion has been requested, partition state first got > updated and this was updated state: > {noformat} > Zxid: 0x18b0be > Cxid: 0x141e4 > Client id:0x3532dd88fd2 > Time: Fri Mar 04 9:41:52 CET 2016 > Operation:setData > Path: /brokers/topics/foo/partitions/0/state > Data: > {"controller_epoch":54,"leader":1,"version":1,"leader_epoch":35,"isr":[1,3]} > Version: 69 > {noformat} > For whatever reason replica manager (some cache it uses, I guess > ReplicaManager.allPartitions) never sees this update, nor does it see that > the partition state, partition, partitions node and finally topic node got > deleted: > {noformat} > Zxid: 0x18b0bf > Cxid: 0x40fb > Client id:0x3532dd88fd2000a > Time: Fri Mar 04 9:41:52 CET 2016 > Operation:delete > Path: /brokers/topics/foo/partitions/0/state > --- > Zxid: 0x18b0c0 > Cxid: 0x40fe > Client id:0x3532dd88fd2000a > Time: Fri Mar 04 9:41:52 CET 2016 > Operation:delete > Path: /brokers/topics/foo/partitions/0 > --- > Zxid: 0x18b0c1 > Cxid: 0x4100 > Client id:0x3532dd88fd2000a > Time: Fri Mar 04 9:41:52 CET 2016
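The retry loop in the log fragments above can be reproduced with a small self-contained simulation (simplified, assumed names; real ZooKeeper semantics are richer): a conditional update only succeeds when the caller's expected version matches the live znode, so once the znode has been deleted (or its version has moved on), a broker retrying with a stale cached zkVersion of 68 fails identically forever.

```java
import java.util.Optional;

// Simulation of ZooKeeper's conditional setData with the versions from the log.
public class ZkCasSim {
    static final class Node {
        int version;
        String data;
        Node(int v, String d) { version = v; data = d; }
    }

    // znode starts at version 68 with the full ISR, as in the JIRA description
    private Optional<Node> state = Optional.of(new Node(68, "{\"isr\":[1,3,2]}"));

    public void deleteNode() { state = Optional.empty(); }

    /** Returns the new version on success, or -1 for NoNode / version mismatch. */
    public int conditionalUpdate(String data, int expectedVersion) {
        if (!state.isPresent() || state.get().version != expectedVersion) {
            return -1; // NoNodeException or "Cached zkVersion not equal" case
        }
        Node n = state.get();
        n.data = data;
        return ++n.version;
    }
}
```

After the delete, every retry with the cached expected version 68 returns the same failure, which is exactly the repeating WARN/INFO pair in the server.log above.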
[jira] [Assigned] (KAFKA-3390) ReplicaManager may infinitely try-fail to shrink ISR set of deleted partition
[ https://issues.apache.org/jira/browse/KAFKA-3390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mayuresh Gharat reassigned KAFKA-3390: -- Assignee: Mayuresh Gharat
[jira] [Commented] (KAFKA-3388) Producer should only timeout a batch in the accumulator when metadata is missing.
[ https://issues.apache.org/jira/browse/KAFKA-3388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15193448#comment-15193448 ] Mayuresh Gharat commented on KAFKA-3388: If the batch is retried and re-enqueued, the last append time gets updated, and that is what is used to time out the batch. So it should not expire immediately. > Producer should only timeout a batch in the accumulator when metadata is > missing. > - > > Key: KAFKA-3388 > URL: https://issues.apache.org/jira/browse/KAFKA-3388 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.9.0.1 >Reporter: Jiangjie Qin >Assignee: Jiangjie Qin >Priority: Blocker > Fix For: 0.10.0.0 > > > In KIP-19 we are reusing request.timeout.ms to time out the batches in the > accumulator. We intended to avoid the case where batches sit in the > accumulator forever when topic metadata is missing. > Currently we are not checking whether metadata is available when we time out > the batches in the accumulator (although the comment says we will check the > metadata). This causes a problem where, once a previous batch hits a request > timeout and gets retried, all the subsequent batches will fail with a timeout > exception. We should only time out the batches in the accumulator when the > metadata of the partition is missing. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
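The rule proposed in the description, combined with the retry behaviour noted in the comment, condenses into a single predicate (illustrative names, not the producer's actual fields): a batch is expired only when its partition's leader metadata is missing, and the expiry clock restarts from the last append time, which is refreshed when a retried batch is re-enqueued.

```java
// Sketch of the proposed accumulator expiry rule.
public class BatchExpiryRule {
    public static boolean shouldExpire(long nowMs, long lastAppendMs,
                                       long requestTimeoutMs, boolean leaderKnown) {
        boolean timedOut = nowMs - lastAppendMs > requestTimeoutMs;
        // never expire a batch whose partition still has known leader metadata;
        // the network-level request timeout handles those batches instead
        return timedOut && !leaderKnown;
    }
}
```

Under this rule, a retried batch whose lastAppendMs was just refreshed on re-enqueue is not timed out immediately, and batches for partitions with a known leader are never expired out of the accumulator at all.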
[jira] [Commented] (KAFKA-3367) Delete topic doesn't delete the complete log from Kafka
[ https://issues.apache.org/jira/browse/KAFKA-3367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15190206#comment-15190206 ] Mayuresh Gharat commented on KAFKA-3367: The clients that are producing to the Kafka cluster should be stopped before you delete the topic. This is because when the topic gets deleted, the producer is going to issue a TopicMetadata request, and since automatic topic creation is turned ON, it will recreate the topic. > Delete topic doesn't delete the complete log from Kafka > > > Key: KAFKA-3367 > URL: https://issues.apache.org/jira/browse/KAFKA-3367 > Project: Kafka > Issue Type: Improvement >Reporter: Akshath Patkar > > Delete topic just marks the topic as deleted, but the data still remains in the logs. > How can we delete the topic completely without doing a manual delete of the logs > from Kafka and ZooKeeper? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3367) Delete topic doesn't delete the complete log from Kafka
[ https://issues.apache.org/jira/browse/KAFKA-3367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15189933#comment-15189933 ] Mayuresh Gharat commented on KAFKA-3367: Is automatic topic creation turned ON in your Kafka cluster? If yes, did you stop the clients that are consuming from / producing to this Kafka cluster before you marked the topic for delete in ZooKeeper? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3367) Delete topic doesn't delete the complete log from Kafka
[ https://issues.apache.org/jira/browse/KAFKA-3367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15189676#comment-15189676 ] Mayuresh Gharat commented on KAFKA-3367: It takes time to delete the logs, depending on the amount of data the topic has. Also, when the topic is marked for deletion, the controller has a listener that fires and starts deleting the topic. We have tested this in our environment at LinkedIn and it does delete the logs. What version of Kafka are you running? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
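Two broker-side settings govern the behaviour discussed in these comments. A minimal server.properties fragment (the setting names are real Kafka broker configs; the values are illustrative): deletion must be enabled for the marked topic's logs to ever be removed, and auto-creation should be off (or the clients stopped) so a producer's metadata request cannot re-create the topic mid-delete.

```properties
# server.properties (broker side)
# allow the controller's deletion listener to actually delete marked topics
delete.topic.enable=true
# stop a still-running producer's TopicMetadata request from re-creating
# the topic immediately after deletion
auto.create.topics.enable=false
```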
[jira] [Work started] (KAFKA-3175) topic not accessible after deletion even when delete.topic.enable is disabled
[ https://issues.apache.org/jira/browse/KAFKA-3175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on KAFKA-3175 started by Mayuresh Gharat. -- > topic not accessible after deletion even when delete.topic.enable is disabled > - > > Key: KAFKA-3175 > URL: https://issues.apache.org/jira/browse/KAFKA-3175 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.9.0.0 >Reporter: Jun Rao >Assignee: Mayuresh Gharat > Fix For: 0.9.0.1 > > > This can be reproduced with the following steps. > 1. start ZK and 1 broker (with default delete.topic.enable=false) > 2. create a topic test > bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test > --partition 1 --replication-factor 1 > 3. delete topic test > bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test > 4. restart the broker > Now topic test still shows up during topic description. > bin/kafka-topics.sh --zookeeper localhost:2181 --describe > Topic:test PartitionCount:1 ReplicationFactor:1 Configs: > Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0 > However, one can't produce to this topic any more. > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test > [2016-01-29 17:55:24,527] WARN Error while fetching metadata with correlation > id 0 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient) > [2016-01-29 17:55:24,725] WARN Error while fetching metadata with correlation > id 1 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient) > [2016-01-29 17:55:24,828] WARN Error while fetching metadata with correlation > id 2 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-3175) topic not accessible after deletion even when delete.topic.enable is disabled
[ https://issues.apache.org/jira/browse/KAFKA-3175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mayuresh Gharat updated KAFKA-3175: --- Status: Patch Available (was: In Progress) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3175) topic not accessible after deletion even when delete.topic.enable is disabled
[ https://issues.apache.org/jira/browse/KAFKA-3175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15126711#comment-15126711 ] Mayuresh Gharat commented on KAFKA-3175: I am thinking that once we detect that topic deletion is disabled, on controller restart/failover we can remove the entries under /admin/delete_topics/. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
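The proposal in this comment can be sketched as follows (hypothetical class and names, not the actual controller code): when the controller comes up with topic deletion disabled, the pending set mirroring /admin/delete_topics is simply cleared, so the half-deleted topics become usable again instead of staying in limbo across broker restarts.

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of the proposed controller-startup cleanup.
public class DeletionManagerSketch {
    final Set<String> pendingDeletes = new HashSet<>();
    private final boolean deleteTopicEnabled;

    public DeletionManagerSketch(boolean deleteTopicEnabled, Set<String> pending) {
        this.deleteTopicEnabled = deleteTopicEnabled;
        this.pendingDeletes.addAll(pending); // read from /admin/delete_topics
    }

    public void onControllerStartup() {
        if (!deleteTopicEnabled) {
            // proposal: discard stale deletion requests rather than keeping
            // the topics marked-for-delete forever
            pendingDeletes.clear();
        }
    }
}
```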
[jira] [Updated] (KAFKA-3181) Check occurrences of Runtime.halt() in the codebase and try to handle it at a single place
[ https://issues.apache.org/jira/browse/KAFKA-3181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mayuresh Gharat updated KAFKA-3181: --- Description: Currently Runtime.halt() is called at multiple places in the code. In an ideal case, we would throw an appropriate exception from classes and halt the runtime in a centralised place somewhere (so that we could override the behaviour during tests). For example: right now the test suite would just die halfway due to a halt call in ReplicaManager which, in turn, was due to an exception being thrown from this class (https://issues.apache.org/jira/browse/KAFKA-3063). > Check occurrences of Runtime.halt() in the codebase and try to handle it at a > single place > -- > > Key: KAFKA-3181 > URL: https://issues.apache.org/jira/browse/KAFKA-3181 > Project: Kafka > Issue Type: Bug >Reporter: Mayuresh Gharat > > Currently Runtime.halt() is called at multiple places in the code. In an > ideal case, we would throw an appropriate exception from classes and halt the > runtime in a centralised place somewhere (so that we could override the > behaviour during tests). For example: right now the test suite would just die > halfway due to a halt call in ReplicaManager which, in turn, was due to an > exception being thrown from this class > (https://issues.apache.org/jira/browse/KAFKA-3063). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
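The centralisation idea in the description could look roughly like this (an assumed design sketch, not Kafka's actual code): every halt goes through one class whose behaviour a test can override, so the test suite observes the exit code instead of having the JVM killed mid-run.

```java
// Sketch of a single, overridable halt entry point.
public class HaltSketch {
    public interface HaltProcedure {
        void halt(int statusCode);
    }

    // production default: really halt the JVM
    private static volatile HaltProcedure procedure =
        statusCode -> Runtime.getRuntime().halt(statusCode);

    // tests swap in a recording procedure instead of killing the JVM
    public static void setHaltProcedure(HaltProcedure p) { procedure = p; }

    public static void halt(int statusCode) { procedure.halt(statusCode); }
}
```

Call sites such as ReplicaManager would then invoke HaltSketch.halt(code) rather than Runtime.getRuntime().halt(code), which is what makes the behaviour overridable in one place.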
[jira] [Created] (KAFKA-3181) Check occurrences of Runtime.halt() in the codebase and try to handle it at a single place
Mayuresh Gharat created KAFKA-3181: -- Summary: Check occurrences of Runtime.halt() in the codebase and try to handle it at a single place Key: KAFKA-3181 URL: https://issues.apache.org/jira/browse/KAFKA-3181 Project: Kafka Issue Type: Bug Reporter: Mayuresh Gharat -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (KAFKA-3175) topic not accessible after deletion even when delete.topic.enable is disabled
[ https://issues.apache.org/jira/browse/KAFKA-3175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15124681#comment-15124681 ] Mayuresh Gharat edited comment on KAFKA-3175 at 1/30/16 3:05 AM: - I am just thinking we could have that check in TopicDeletionManager.start(), and if delete.topic.enable is set to false, the DeleteTopicThread should not start. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3175) topic not accessible after deletion even when delete.topic.enable is disabled
[ https://issues.apache.org/jira/browse/KAFKA-3175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15124681#comment-15124681 ] Mayuresh Gharat commented on KAFKA-3175: I am just thinking we could have that check in TopicDeletionManager.start(), and if delete.topic.enable is set to false, the DeleteTopicThread should not start. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (KAFKA-3175) topic not accessible after deletion even when delete.topic.enable is disabled
[ https://issues.apache.org/jira/browse/KAFKA-3175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mayuresh Gharat reassigned KAFKA-3175: -- Assignee: Mayuresh Gharat -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3147) Memory records is not writable in MirrorMaker
[ https://issues.apache.org/jira/browse/KAFKA-3147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15120014#comment-15120014 ] Mayuresh Gharat commented on KAFKA-3147: [~becket_qin] sure. I will take a look and upload a PR. > Memory records is not writable in MirrorMaker > - > > Key: KAFKA-3147 > URL: https://issues.apache.org/jira/browse/KAFKA-3147 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.9.0.0 >Reporter: Meghana Narasimhan >Assignee: Mayuresh Gharat > > Hi, > We are running a 3 node cluster (kafka version 0.9) and Node 0 also has a few > mirror makers running. > When we do a rolling restart of the cluster, the mirror maker shuts down with > the following errors. > [2016-01-11 20:16:00,348] WARN Got error produce response with correlation id > 12491674 on topic-partition test-99, retrying (2147483646 attempts left). > Error: NOT_LEADER_FOR_PARTITION > (org.apache.kafka.clients.producer.internals.Sender) > [2016-01-11 20:16:00,853] FATAL [mirrormaker-thread-0] Mirror maker thread > failure due to (kafka.tools.MirrorMaker$MirrorMakerThread) > java.lang.IllegalStateException: Memory records is not writable > at > org.apache.kafka.common.record.MemoryRecords.append(MemoryRecords.java:93) > at > org.apache.kafka.clients.producer.internals.RecordBatch.tryAppend(RecordBatch.java:69) > at > org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:168) > at > org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:435) > at > kafka.tools.MirrorMaker$MirrorMakerProducer.send(MirrorMaker.scala:593) > at > kafka.tools.MirrorMaker$MirrorMakerThread$$anonfun$run$3.apply(MirrorMaker.scala:398) > at > kafka.tools.MirrorMaker$MirrorMakerThread$$anonfun$run$3.apply(MirrorMaker.scala:398) > at scala.collection.Iterator$class.foreach(Iterator.scala:742) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1194) > at 
scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > kafka.tools.MirrorMaker$MirrorMakerThread.run(MirrorMaker.scala:398) > [2016-01-11 20:16:01,072] WARN Got error produce response with correlation id > 12491679 on topic-partition test-75, retrying (2147483646 attempts left). > Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender) > [2016-01-11 20:16:01,073] WARN Got error produce response with correlation id > 12491679 on topic-partition test-93, retrying (2147483646 attempts left). > Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender) > [2016-01-11 20:16:01,073] WARN Got error produce response with correlation id > 12491679 on topic-partition test-24, retrying (2147483646 attempts left). > Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender) > [2016-01-11 20:16:20,479] FATAL [mirrormaker-thread-0] Mirror maker thread > exited abnormally, stopping the whole mirror maker. > (kafka.tools.MirrorMaker$MirrorMakerThread) > Curious if the NOT_LEADER_FOR_PARTITION is because of a potential bug hinted > at in the thread , > http://mail-archives.apache.org/mod_mbox/kafka-users/201505.mbox/%3ccajs3ho_u8s1xou_kudnfjamypjtmrjlw10qvkngn2yqkdan...@mail.gmail.com%3E > > And I think the mirror maker shuts down because of the > "abort.on.send.failure" which is set to true in our case. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
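The invariant behind the IllegalStateException in the stack trace above can be shown with a tiny sketch (an illustrative class, not the real MemoryRecords): once a record buffer is closed for sending it becomes read-only, and any later append fails with exactly this message, which is what MirrorMaker hit after the failed/retried produce.

```java
// Sketch of a write-once record buffer's writability guard.
public class WritableRecordsSketch {
    private boolean writable = true;

    // called when the batch is frozen for the network send
    public void close() { writable = false; }

    public void append(byte[] record) {
        if (!writable) {
            throw new IllegalStateException("Memory records is not writable");
        }
        // real code would copy the record into the underlying buffer here
    }
}
```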
[jira] [Assigned] (KAFKA-3147) Memory records is not writable in MirrorMaker
[ https://issues.apache.org/jira/browse/KAFKA-3147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mayuresh Gharat reassigned KAFKA-3147: -- Assignee: Mayuresh Gharat
[jira] [Comment Edited] (KAFKA-3147) Memory records is not writable in MirrorMaker
[ https://issues.apache.org/jira/browse/KAFKA-3147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15118482#comment-15118482 ] Mayuresh Gharat edited comment on KAFKA-3147 at 1/27/16 2:45 AM: - [~junrao] I think abortIncompleteBatches was not part of the KIP-19 patch. I would certainly like to look into this, though. was (Author: mgharat): yep, will take a look at this.
[jira] [Issue Comment Deleted] (KAFKA-3147) Memory records is not writable in MirrorMaker
[ https://issues.apache.org/jira/browse/KAFKA-3147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mayuresh Gharat updated KAFKA-3147: --- Comment: was deleted (was: [~junrao] the abortExpiredBatches does get rid of the RecordBatch from the queue:
if (batch.maybeExpire(requestTimeout, now, this.lingerMs)) {
    expiredBatches.add(batch);
    count++;
    batchIterator.remove();
    deallocate(batch);
})
[jira] [Commented] (KAFKA-3147) Memory records is not writable in MirrorMaker
[ https://issues.apache.org/jira/browse/KAFKA-3147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15118496#comment-15118496 ] Mayuresh Gharat commented on KAFKA-3147: [~junrao] the abortExpiredBatches does get rid of the RecordBatch from the queue:
if (batch.maybeExpire(requestTimeout, now, this.lingerMs)) {
    expiredBatches.add(batch);
    count++;
    batchIterator.remove();
    deallocate(batch);
}
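The iterator-based removal pattern quoted in the comment above can be sketched in isolation. The `Batch` class and `abortExpired` method below are simplified, hypothetical stand-ins for Kafka's `RecordBatch` and `RecordAccumulator.abortExpiredBatches` (buffer deallocation is omitted); the point is that `Iterator.remove()` is what actually drops the expired batch from the queue:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

public class ExpiredBatchSweep {

    // Simplified stand-in for RecordBatch: only tracks creation time.
    public static class Batch {
        final long createdMs;
        public Batch(long createdMs) { this.createdMs = createdMs; }
        // A batch expires once it has waited longer than the timeout.
        boolean maybeExpire(long timeoutMs, long nowMs) {
            return nowMs - createdMs > timeoutMs;
        }
    }

    // Walk the queue with an explicit iterator so expired batches can be
    // removed in place while iterating, mirroring the quoted pattern.
    public static List<Batch> abortExpired(Deque<Batch> queue, long timeoutMs, long nowMs) {
        List<Batch> expired = new ArrayList<>();
        Iterator<Batch> it = queue.iterator();
        while (it.hasNext()) {
            Batch b = it.next();
            if (b.maybeExpire(timeoutMs, nowMs)) {
                expired.add(b);
                it.remove(); // removes the batch from the queue itself
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        Deque<Batch> queue = new ArrayDeque<>();
        queue.add(new Batch(0L));   // created long ago -> expired
        queue.add(new Batch(900L)); // created recently -> kept
        List<Batch> expired = abortExpired(queue, 500L, 1000L);
        System.out.println(expired.size() + " expired, " + queue.size() + " left");
    }
}
```

Removing via the iterator (rather than `queue.remove(b)` inside the loop) avoids a ConcurrentModificationException, which is why the original code holds on to `batchIterator`.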
[jira] [Commented] (KAFKA-3147) Memory records is not writable in MirrorMaker
[ https://issues.apache.org/jira/browse/KAFKA-3147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15118482#comment-15118482 ] Mayuresh Gharat commented on KAFKA-3147: yep, will take a look at this.
[jira] [Commented] (KAFKA-3126) Weird behavior in kafkaController on Controlled shutdowns. The leaderAndIsr in zookeeper is not updated during controlled shutdown.
[ https://issues.apache.org/jira/browse/KAFKA-3126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15116436#comment-15116436 ] Mayuresh Gharat commented on KAFKA-3126: It does look like a problem with consistency. It might not be fatal though. I came across this behavior while debugging some other issue. > Weird behavior in kafkaController on Controlled shutdowns. The leaderAndIsr > in zookeeper is not updated during controlled shutdown. > --- > > Key: KAFKA-3126 > URL: https://issues.apache.org/jira/browse/KAFKA-3126 > Project: Kafka > Issue Type: Bug > Components: core >Reporter: Mayuresh Gharat > > Consider Broker B is controller, broker A is undergoing shutdown. > 2016/01/14 19:49:22.884 [KafkaController] [Controller B]: Shutting down > broker A > 2016/01/14 19:49:22.918 [ReplicaStateMachine] [Replica state machine on > controller B]: Invoking state change to OfflineReplica for replicas > [Topic=testTopic1,Partition=1,Replica=A] ---> (1) > 2016/01/14 19:49:22.930 [KafkaController] [Controller B]: New leader and ISR > for partition [testTopic1,1] is {"leader":D,"leader_epoch":1,"isr":[D]} > --> (2) > 2016/01/14 19:49:23.028 [ReplicaStateMachine] [Replica state machine on > controller B]: Invoking state change to OfflineReplica for replicas > [Topic=testTopic2,Partition=1,Replica=A] ---> (3) > 2016/01/14 19:49:23.032 [KafkaController] [Controller B]: New leader and ISR > for partition [testTopic2,1] is {"leader":C,"leader_epoch":10,"isr":[C]} > -> (4) > 2016/01/14 19:49:23.996 [KafkaController] [Controller B]: Broker failure > callback for A > 2016/01/14 19:49:23.997 [PartitionStateMachine] [Partition state machine on > Controller B]: Invoking state change to OfflinePartition for partitions > 2016/01/14 19:49:23.998 [ReplicaStateMachine] [Replica state machine on > controller B]: Invoking state change to OfflineReplica for replicas > [Topic=testTopic2,Partition=0,Replica=A], > 
[Topic=__consumer_offsets,Partition=5,Replica=A], > [Topic=testTopic1,Partition=2,Replica=A], > [Topic=__consumer_offsets,Partition=96,Replica=A], > [Topic=testTopic2,Partition=1,Replica=A], > [Topic=__consumer_offsets,Partition=36,Replica=A], > [Topic=testTopic1,Partition=4,Replica=A], > [Topic=__consumer_offsets,Partition=85,Replica=A], > [Topic=testTopic1,Partition=6,Replica=A], > [Topic=testTopic1,Partition=1,Replica=A] > 2016/01/14 19:49:24.029 [KafkaController] [Controller B]: New leader and ISR > for partition [testTopic2,1] is {"leader":C,"leader_epoch":11,"isr":[C]} > --> (5) > 2016/01/14 19:49:24.212 [KafkaController] [Controller B]: Cannot remove > replica A from ISR of partition [testTopic1,1] since it is not in the ISR. > Leader = D ; ISR = List(D) --> (6) > If after (1) and (2) controller gets rid of the replica A from the ISR in > zookeeper for [testTopic1-1] as displayed in 6), why doesn't it do the same > for [testTopic2-1] as per (5) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-3126) Weird behavior in kafkaController on Controlled shutdowns. The leaderAndIsr in zookeeper is not updated during controlled shutdown.
[ https://issues.apache.org/jira/browse/KAFKA-3126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mayuresh Gharat updated KAFKA-3126: --- Assignee: (was: Mayuresh Gharat)
[jira] [Resolved] (KAFKA-1796) Sanity check partition command line tools
[ https://issues.apache.org/jira/browse/KAFKA-1796?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mayuresh Gharat resolved KAFKA-1796. Resolution: Not A Problem > Sanity check partition command line tools > - > > Key: KAFKA-1796 > URL: https://issues.apache.org/jira/browse/KAFKA-1796 > Project: Kafka > Issue Type: Bug >Reporter: Guozhang Wang >Assignee: Mayuresh Gharat > Labels: newbie > > We need to sanity check that the input json has valid values before triggering > the admin process. For example, we have seen a scenario where the json input > for the partition reassignment tool had partition replica info as {broker-1, > broker-1, broker-2}, and it was still accepted in ZK and eventually led to an > under-replicated count, etc. This is partially because we use a Map rather > than a Set when reading the json input for this case; but in general we need to > make sure that input parameters like the json are valid before writing them > to ZK. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
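The Map-vs-Set point in the issue above can be illustrated with a minimal sketch. This is not the actual reassignment tool code; `isValidReplicaList` is a hypothetical helper showing how a Set detects the duplicate broker that a Map-keyed parse silently collapses:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ReassignmentValidator {

    // A replica list is valid only if no broker id appears twice:
    // copying into a Set drops duplicates, so a size mismatch means
    // the input contained a repeated broker.
    public static boolean isValidReplicaList(List<Integer> replicas) {
        Set<Integer> unique = new HashSet<>(replicas);
        return unique.size() == replicas.size();
    }

    public static void main(String[] args) {
        // The scenario from the ticket: broker-1 listed twice.
        System.out.println(isValidReplicaList(Arrays.asList(1, 1, 2))); // false
        System.out.println(isValidReplicaList(Arrays.asList(1, 2, 3))); // true
    }
}
```

A check like this, run before the assignment is written to ZK, would reject the {broker-1, broker-1, broker-2} input instead of letting it cause under-replicated partitions later.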
[jira] [Commented] (KAFKA-1860) File system errors are not detected unless Kafka tries to write
[ https://issues.apache.org/jira/browse/KAFKA-1860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15113128#comment-15113128 ] Mayuresh Gharat commented on KAFKA-1860: [~guozhang] can you take another look at the PR? I have addressed most of the comments on the PR. > File system errors are not detected unless Kafka tries to write > --- > > Key: KAFKA-1860 > URL: https://issues.apache.org/jira/browse/KAFKA-1860 > Project: Kafka > Issue Type: Bug >Reporter: Guozhang Wang >Assignee: Mayuresh Gharat > Fix For: 0.10.0.0 > > Attachments: KAFKA-1860.patch > > > When the disk (raid with caches dir) dies on a Kafka broker, typically the > filesystem gets mounted into read-only mode, and hence when Kafka tries to > write to the disk, it will get a FileNotFoundException with the read-only errno > set (EROFS). > However, as long as there is no produce request received, and hence no writes > attempted on the disks, Kafka will not exit on such a FATAL error (when the > disk starts working again, Kafka might think some files are gone while they > will reappear later as the raid comes back online). Instead it keeps spilling > exceptions like: > {code} > 2015/01/07 09:47:41.543 ERROR [KafkaScheduler] [kafka-scheduler-1] > [kafka-server] [] Uncaught exception in scheduled task > 'kafka-recovery-point-checkpoint' > java.io.FileNotFoundException: > /export/content/kafka/i001_caches/recovery-point-offset-checkpoint.tmp > (Read-only file system) > at java.io.FileOutputStream.open(Native Method) > at java.io.FileOutputStream.<init>(FileOutputStream.java:206) > at java.io.FileOutputStream.<init>(FileOutputStream.java:156) > at kafka.server.OffsetCheckpoint.write(OffsetCheckpoint.scala:37) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
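One way to detect a read-only remount before a produce request arrives is to probe the log directory with a small test write. The sketch below is illustrative only, not the patch attached to the ticket; `DiskProbe` and its file name are hypothetical:

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;

public class DiskProbe {

    // Attempt a tiny write in the given directory. On a filesystem that has
    // been remounted read-only, opening the stream fails with an IOException
    // (the EROFS case described in the ticket), so we report the directory
    // as not writable instead of waiting for a real produce request to fail.
    public static boolean isWritable(File dir) {
        File probe = new File(dir, ".kafka-writability-probe");
        try (FileOutputStream out = new FileOutputStream(probe)) {
            out.write(0);
            return true;
        } catch (IOException e) {
            return false;
        } finally {
            probe.delete(); // best-effort cleanup; a no-op if the open failed
        }
    }

    public static void main(String[] args) {
        File tmp = new File(System.getProperty("java.io.tmpdir"));
        System.out.println(tmp + " writable: " + isWritable(tmp));
    }
}
```

A scheduled task running such a probe per log directory would let the broker fail fast on EROFS, rather than spilling FileNotFoundException from the checkpoint writer indefinitely.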
[jira] [Commented] (KAFKA-3126) Weird behavior in kafkaController on Controlled shutdowns. The leaderAndIsr in zookeeper is not updated during controlled shutdown.
[ https://issues.apache.org/jira/browse/KAFKA-3126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15112795#comment-15112795 ] Mayuresh Gharat commented on KAFKA-3126: My confusion, as described in this jira, was: (2) and (3) follow the same code path, and similarly (5) and (6). So ideally (5) and (6) should say the same thing, i.e. "Cannot remove replica A from ISR of partition..."
[jira] [Commented] (KAFKA-3126) Weird behavior in kafkaController on Controlled shutdowns. The leaderAndIsr in zookeeper is not updated during controlled shutdown.
[ https://issues.apache.org/jira/browse/KAFKA-3126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15111611#comment-15111611 ] Mayuresh Gharat commented on KAFKA-3126: Can you explain the steps in further detail?
2. Controller B updates the ISR of partition p from [A, C] to [C]. -> Do you mean an update in Zookeeper?
3. Before the LeaderAndIsrRequest reflecting the change in (2) reaches broker C, broker C expands the leader and ISR from [A] to [A, C]. -> Does broker C expand the ISR in Zookeeper?
4. The ISR change in (3) was propagated to controller B. -> Do you mean the ISR {A, C} is propagated to B?
5. When broker A actually shuts down, controller B will see A in the ISR. -> If {A, C} is propagated to B, why would it only see {A}?
[jira] [Comment Edited] (KAFKA-3126) Weird behavior in kafkaController on Controlled shutdowns. The leaderAndIsr in zookeeper is not updated during controlled shutdown.
[ https://issues.apache.org/jira/browse/KAFKA-3126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15111611#comment-15111611 ] Mayuresh Gharat edited comment on KAFKA-3126 at 1/21/16 11:36 PM: -- I had some questions : 2. Controller B update ISR of partition p from [A, C] to [C] -> Update in Zookeeper you mean ? 3. Before the LeaderAndIsrRequest reflecting the change in (2) reaches broker C, broker C expands leader and ISR from [A] to [A, C]. ---> Broker C expands ISR in zookeeper ? 4. The ISR change in 3 was propagated to controller B. --> Do you mean ISR {A,C} propagated to B ? 5. When Broker A actually shuts down, Controller B will see A in the ISR. ---> If {A,C} is propagated to B, why would it only see {A}
[jira] [Commented] (KAFKA-3126) Weird behavior in kafkaController on Controlled shutdowns. The leaderAndIsr in zookeeper is not updated during controlled shutdown.
[ https://issues.apache.org/jira/browse/KAFKA-3126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15111353#comment-15111353 ] Mayuresh Gharat commented on KAFKA-3126: I don't see why Controller B will see A in the ISR in the final step.
[jira] [Assigned] (KAFKA-3126) Weird behavior in kafkaController on Controlled shutdowns. The leaderAndIsr in zookeeper is not updated during controlled shutdown.
[ https://issues.apache.org/jira/browse/KAFKA-3126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mayuresh Gharat reassigned KAFKA-3126: -- Assignee: Mayuresh Gharat
[jira] [Updated] (KAFKA-3126) Weird behavior in kafkaController on Controlled shutdowns. The leaderAndIsr in zookeeper is not updated during controlled shutdown.
[ https://issues.apache.org/jira/browse/KAFKA-3126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mayuresh Gharat updated KAFKA-3126: --- Description:
[jira] [Created] (KAFKA-3126) Weird behavior in kafkaController on Controlled shutdowns. The leaderAndIsr in zookeeper is not updated during controlled shutdown.
Mayuresh Gharat created KAFKA-3126: -- Summary: Weird behavior in kafkaController on Controlled shutdowns. The leaderAndIsr in zookeeper is not updated during controlled shutdown. Key: KAFKA-3126 URL: https://issues.apache.org/jira/browse/KAFKA-3126 Project: Kafka Issue Type: Bug Components: core Reporter: Mayuresh Gharat Consider Broker B is controller, broker A is undergoing shutdown. 2016/01/14 19:49:22.884 [KafkaController] [Controller B]: Shutting down broker A 2016/01/14 19:49:22.918 [ReplicaStateMachine] [Replica state machine on controller B]: Invoking state change to OfflineReplica for replicas [Topic=testTopic1,Partition=1,Replica=A] ---> (1) 2016/01/14 19:49:22.930 [KafkaController] [Controller B]: New leader and ISR for partition [testTopic1,1] is {"leader":D,"leader_epoch":1,"isr":[D]} --> (2) 2016/01/14 19:49:23.028 [ReplicaStateMachine] [Replica state machine on controller B]: Invoking state change to OfflineReplica for replicas [Topic=testTopic2,Partition=1,Replica=A] ---> (3) 2016/01/14 19:49:23.032 [KafkaController] [Controller B]: New leader and ISR for partition [testTopic2,1] is {"leader":C,"leader_epoch":10,"isr":[C]} -> (4) 2016/01/14 19:49:23.996 [KafkaController] [Controller B]: Broker failure callback for A 2016/01/14 19:49:23.997 [PartitionStateMachine] [Partition state machine on Controller B]: Invoking state change to OfflinePartition for partitions 2016/01/14 19:49:23.998 [ReplicaStateMachine] [Replica state machine on controller B]: Invoking state change to OfflineReplica for replicas 
[Topic=__consumer_offsets,Partition=59,Replica=A],[Topic=__consumer_offsets,Partition=81,Replica=A],[Topic=__consumer_offsets,Partition=61,Replica=A],[Topic=testTopic2,Partition=0,Replica=A],[Topic=__consumer_offsets,Partition=5,Replica=A],[Topic=__consumer_offsets,Partition=89,Replica=A],[Topic=__consumer_offsets,Partition=1,Replica=A],[Topic=__consumer_offsets,Partition=22,Replica=A],[Topic=__consumer_offsets,Partition=60,Replica=A],[Topic=testTopic1,Partition=2,Replica=A],[Topic=__consumer_offsets,Partition=96,Replica=A],[Topic=testTopic2,Partition=1,Replica=A],[Topic=__consumer_offsets,Partition=36,Replica=A],[Topic=__consumer_offsets,Partition=10,Replica=A],[Topic=__consumer_offsets,Partition=69,Replica=A],[Topic=__consumer_offsets,Partition=43,Replica=A],[Topic=__consumer_offsets,Partition=57,Replica=A],[Topic=__consumer_offsets,Partition=21,Replica=A],[Topic=__consumer_offsets,Partition=46,Replica=A],[Topic=__consumer_offsets,Partition=83,Replica=A],[Topic=__consumer_offsets,Partition=17,Replica=A],[Topic=__consumer_offsets,Partition=73,Replica=A],[Topic=__consumer_offsets,Partition=58,Replica=A],[Topic=__consumer_offsets,Partition=26,Replica=A],[Topic=__consumer_offsets,Partition=2,Replica=A],[Topic=__consumer_offsets,Partition=70,Replica=A],[Topic=__consumer_offsets,Partition=13,Replica=A],[Topic=__consumer_offsets,Partition=62,Replica=A],[Topic=__consumer_offsets,Partition=25,Replica=A],[Topic=__consumer_offsets,Partition=9,Replica=A],[Topic=__consumer_offsets,Partition=29,Replica=A],[Topic=__consumer_offsets,Partition=97,Replica=A],[Topic=__consumer_offsets,Partition=53,Replica=A],[Topic=__consumer_offsets,Partition=77,Replica=A],[Topic=__consumer_offsets,Partition=40,Replica=A],[Topic=__consumer_offsets,Partition=52,Replica=A],[Topic=__consumer_offsets,Partition=50,Replica=A],[Topic=__consumer_offsets,Partition=64,Replica=A],[Topic=__consumer_offsets,Partition=23,Replica=A],[Topic=__consumer_offsets,Partition=55,Replica=A],[Topic=__consumer_offsets,Parti
tion=93,Replica=A],[Topic=__consumer_offsets,Partition=12,Replica=A],[Topic=__consumer_offsets,Partition=16,Replica=A],[Topic=testTopic1,Partition=4,Replica=A],[Topic=__consumer_offsets,Partition=38,Replica=A],[Topic=__consumer_offsets,Partition=65,Replica=A],[Topic=__consumer_offsets,Partition=95,Replica=A],[Topic=__consumer_offsets,Partition=31,Replica=A],[Topic=__consumer_offsets,Partition=88,Replica=A],[Topic=__consumer_offsets,Partition=19,Replica=A],[Topic=__consumer_offsets,Partition=98,Replica=A],[Topic=__consumer_offsets,Partition=45,Replica=A],[Topic=__consumer_offsets,Partition=79,Replica=A],[Topic=__consumer_offsets,Partition=94,Replica=A],[Topic=__consumer_offsets,Partition=4,Replica=A],[Topic=__consumer_offsets,Partition=91,Replica=A],[Topic=__consumer_offsets,Partition=86,Replica=A],[Topic=__consumer_offsets,Partition=0,Replica=A],[Topic=__consumer_offsets,Partition=85,Replica=A],[Topic=testTopic1,Partition=6,Replica=A],[Topic=__consumer_offsets,Partition=24,Replica=A],[Topic=__consumer_offsets,Partition=72,Replica=A],[Topic=__consumer_offsets,Partition=37,Replica=A],[Topic=__consumer_offsets,Partition=82,Replica=A],[Topic=__consumer_offsets,Partition=76,Replica=A],[Topic=__consumer_offsets,Partition=7,Replica=A],[Topic=__consumer_offsets,Partition=74,R
[jira] [Commented] (KAFKA-1397) delete topic is not working
[ https://issues.apache.org/jira/browse/KAFKA-1397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15109040#comment-15109040 ] Mayuresh Gharat commented on KAFKA-1397: Can you paste the server side logs? > delete topic is not working > > > Key: KAFKA-1397 > URL: https://issues.apache.org/jira/browse/KAFKA-1397 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8.2.0 >Reporter: Jun Rao >Assignee: Timothy Chen > Fix For: 0.8.2.0 > > Attachments: KAFKA-1397.patch, KAFKA-1397_2014-04-28_14:48:32.patch, > KAFKA-1397_2014-04-28_17:08:49.patch, KAFKA-1397_2014-04-30_14:55:28.patch, > KAFKA-1397_2014-05-01_15:53:57.patch, KAFKA-1397_2014-05-01_18:12:24.patch, > KAFKA-1397_2014-05-02_13:38:02.patch, KAFKA-1397_2014-05-05_11:17:59.patch, > KAFKA-1397_2014-05-05_14:00:29.patch > > > All unit tests are disabled since they hang transiently (see details in > KAFKA-1391). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3083) a soft failure in controller may leave a topic partition in an inconsistent state
[ https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15107770#comment-15107770 ] Mayuresh Gharat commented on KAFKA-3083: However, the replica would be in the cache of the controller B, so would it be elected in this case? Would it be an actual problem if the B is demoted and another controller comes up? ---> That seems right, it should be in the cache of controller B and should be elected as leader. If however B goes down before this and a new controller is elected, it will read the data from zookeeper and might not be able to elect a new leader if uncleanLeaderElection is turned OFF. > a soft failure in controller may leave a topic partition in an inconsistent > state > - > > Key: KAFKA-3083 > URL: https://issues.apache.org/jira/browse/KAFKA-3083 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.9.0.0 >Reporter: Jun Rao >Assignee: Mayuresh Gharat > > The following sequence can happen. > 1. Broker A is the controller and is in the middle of processing a broker > change event. As part of this process, let's say it's about to shrink the isr > of a partition. > 2. Then broker A's session expires and broker B takes over as the new > controller. Broker B sends the initial leaderAndIsr request to all brokers. > 3. Broker A continues by shrinking the isr of the partition in ZK and sends > the new leaderAndIsr request to the broker (say C) that leads the partition. > Broker C will reject this leaderAndIsr since the request comes from a > controller with an older epoch. Now we could be in a situation that Broker C > thinks the isr has all replicas, but the isr stored in ZK is different. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
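The unclean-election behavior discussed in the comment above is governed by a real Kafka broker (and per-topic) setting; as a sketch, keeping it off trades availability for consistency in exactly the scenario described:

```properties
# server.properties excerpt (illustrative): with this off, a newly elected
# controller that reads ZK and finds no live in-sync replica will leave the
# partition offline rather than elect an out-of-sync replica as leader.
unclean.leader.election.enable=false
```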
[jira] [Commented] (KAFKA-3083) a soft failure in controller may leave a topic partition in an inconsistent state
[ https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15107417#comment-15107417 ] Mayuresh Gharat commented on KAFKA-3083: Hi [~fpj], Correct me if I am wrong : 1) We need to use a multi-op that combines the update to the ISR and a znode check. The znode check verifies that the version of the controller leadership znode is still the same and, if it passes, the ISR data is updated. 2) The race condition that [~junrao] mentioned still exists in 1). 3) To overcome this we somehow need to detect that broker A, who was the controller, got a session expiration, and it should immediately drop all the zk work it is doing. 4) To do step 3), as [~junrao] suggested, we have to detect the connection loss event. Now 2 things might happen : i) Broker A has a connection loss and reconnects immediately, in which case it gets a SyncConnected event. The session MIGHT NOT have expired since the reconnect happened immediately. Broker A is expected to continue, since it is still the controller and the session has not expired. ii) Broker A has a connection loss and reconnects later, in which case it gets a SyncConnected event. The session MIGHT have expired. Broker A is expected to stop all zk operations. The only difference between i) and ii) is the SessionExpiration check.
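The multi-op idea in 1) can be sketched with a toy in-memory store (all names here are hypothetical; real code would use ZooKeeper's `multi()` with an `Op.check` on the controller-epoch znode, so the ISR write commits only if the epoch znode's version is unchanged):

```python
# Toy simulation of a conditional multi-op: the ISR update is applied
# atomically only if the controller-epoch znode version still matches.
class Znode:
    def __init__(self, data, version=0):
        self.data = data
        self.version = version

class FakeZk:
    """In-memory stand-in for ZooKeeper supporting a check-and-set multi-op."""
    def __init__(self):
        self.nodes = {}

    def create(self, path, data):
        self.nodes[path] = Znode(data)

    def set_data(self, path, data):
        node = self.nodes[path]
        node.data = data
        node.version += 1

    def multi_check_and_set(self, check_path, expected_version, set_path, data):
        # Atomic: the write happens only if the version check passes.
        if self.nodes[check_path].version != expected_version:
            return False  # another controller took over; abort the ISR shrink
        self.set_data(set_path, data)
        return True

zk = FakeZk()
zk.create("/controller_epoch", "epoch=5")   # version 0
zk.create("/isr/testTopic2-1", "[A, C]")

# Controller A caches the epoch-znode version before shrinking the ISR.
cached = zk.nodes["/controller_epoch"].version

# Broker B becomes controller in between (epoch znode bumped to version 1).
zk.set_data("/controller_epoch", "epoch=6")

# A's stale ISR shrink is now rejected instead of silently applied.
ok = zk.multi_check_and_set("/controller_epoch", cached,
                            "/isr/testTopic2-1", "[C]")
```

As the comment notes, this closes the stale-write window but does not by itself stop the expired controller from attempting the write in the first place.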
[jira] [Commented] (KAFKA-3083) a soft failure in controller may leader a topic partition in an inconsistent state
[ https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15102912#comment-15102912 ] Mayuresh Gharat commented on KAFKA-3083: Hi Jun, I was talking about the SessionExpirationListener in KafkaController that implements IZkStateListener and has the handleStateChanged() api. I was thinking we could handle the state change to Disconnected in that callback to do the cleanup.
[jira] [Commented] (KAFKA-3083) a soft failure in controller may leader a topic partition in an inconsistent state
[ https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15102887#comment-15102887 ] Mayuresh Gharat commented on KAFKA-3083: That's right. I am thinking there may be a way to check for ConnectionLoss using the handleStateChanged() api on the SessionExpirationListener, which can be used to drop all the current zk tasks that controller A was about to do.
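The proposal to react to Disconnected might look roughly like this (a toy sketch, not the real KafkaController code; the actual IZkStateListener callback receives a ZooKeeper KeeperState, and the class and field names here are hypothetical):

```python
# Toy sketch: drop pending controller ZK work on a Disconnected event,
# instead of waiting for the (possibly much later) session-expiration callback.
class ControllerStateListener:
    def __init__(self):
        self.pending_zk_ops = ["shrink-isr [testTopic2,1]"]
        self.suspended = False

    def handle_state_changed(self, state):
        if state == "Disconnected":
            # Connection lost: we may no longer be the controller, so drop
            # queued ZK writes until the connection outcome is known.
            self.pending_zk_ops.clear()
            self.suspended = True
        elif state == "SyncConnected":
            # Reconnected on the same session: safe to resume controller work.
            self.suspended = False

listener = ControllerStateListener()
listener.handle_state_changed("Disconnected")
```

As the preceding comments point out, the hard part is the SyncConnected branch: a reconnect alone does not tell you whether the session survived, which is exactly the SessionExpiration check discussed above.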
[jira] [Commented] (KAFKA-3083) a soft failure in controller may leader a topic partition in an inconsistent state
[ https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15102798#comment-15102798 ] Mayuresh Gharat commented on KAFKA-3083: On a side note, I noticed a new issue in controlled shutdown that is somewhat similar. I will open a new Jira for that.
[jira] [Commented] (KAFKA-3083) a soft failure in controller may leader a topic partition in an inconsistent state
[ https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15102791#comment-15102791 ] Mayuresh Gharat commented on KAFKA-3083: [~junrao], [~fpj] that makes it clearer. I was just thinking we could modify the controller code to always check whether it is still the controller before it makes such changes to zookeeper. Again there is a race condition: broker A's session times out at time T, broker B becomes the controller at T+2, and broker A can still proceed with the changes to ZK between T and T+2. I had some questions on zookeeper session expiry: 1) Suppose a broker establishes a connection with zookeeper and has a ZookeeperSessionTimeout set to 10 min. The broker goes down or is stuck, comes back up, and reconnects to zookeeper after 5 min; will it connect on the same session? 2) Session expiry is only invoked on SessionTimeout and nothing else. Am I right?
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
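The fencing discussed in this thread can be sketched in plain Java. This is a hypothetical in-memory stand-in (the names EpochFencedStore, becomeController, and shrinkIsr are illustrative, not Kafka APIs): a write is accepted only if the caller still holds the latest controller epoch, so broker A's late ISR shrink is rejected once broker B has taken over. The atomic check-and-write here is what ZooKeeper's versioned conditional setData provides in the real system.

```java
import java.util.concurrent.atomic.AtomicInteger;

class EpochFencedStore {
    private final AtomicInteger controllerEpoch = new AtomicInteger(0);
    private volatile String isr = "1,2,3";

    /** A broker that wins the election bumps the epoch; older epochs are fenced. */
    int becomeController() {
        return controllerEpoch.incrementAndGet();
    }

    /**
     * The write succeeds only if the caller still holds the latest epoch.
     * The check and the write are atomic here, mirroring a ZK conditional update.
     */
    synchronized boolean shrinkIsr(int callerEpoch, String newIsr) {
        if (callerEpoch != controllerEpoch.get()) {
            return false; // e.g. broker A writing after its session expired
        }
        isr = newIsr;
        return true;
    }

    String isr() {
        return isr;
    }
}
```

Note that the remaining race described above is exactly the window between reading the epoch and writing; making the two a single conditional operation (as ZK version checks do) closes it.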
[jira] [Commented] (KAFKA-3083) a soft failure in controller may leave a topic partition in an inconsistent state
[ https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15102259#comment-15102259 ] Mayuresh Gharat commented on KAFKA-3083: For your question regarding: if broker A shrinks the partition ISR in ZK before step 2 but notifies C after step 2 as in the description, does C eventually learn the ISR stored in ZK? C will not learn the ISR stored in ZK until the next LeaderAndIsr request from the new controller. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (KAFKA-3083) a soft failure in controller may leave a topic partition in an inconsistent state
[ https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15098607#comment-15098607 ] Mayuresh Gharat edited comment on KAFKA-3083 at 1/14/16 6:34 PM: - That's a very good point; I will verify whether this can happen. Moreover, I think the behavior should be: 1) Broker A was the controller. 2) Broker A hits a session expiration, invokes controllerResignation, clears all its caches, and stops all ongoing controller work. 3) Broker B becomes the controller and proceeds. What do you think?
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3083) a soft failure in controller may leave a topic partition in an inconsistent state
[ https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15098607#comment-15098607 ] Mayuresh Gharat commented on KAFKA-3083: That's a very good point; I will verify whether this can happen. Moreover, I think the behavior should be: 1) Broker A was the controller. 2) Broker A hits a session expiration, invokes controllerResignation, clears all its caches, and stops all ongoing controller work. 3) Broker B becomes the controller and proceeds. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (KAFKA-3083) a soft failure in controller may leave a topic partition in an inconsistent state
[ https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mayuresh Gharat reassigned KAFKA-3083: -- Assignee: Mayuresh Gharat -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (KAFKA-2410) Implement "Auto Topic Creation" client side and remove support from Broker side
[ https://issues.apache.org/jira/browse/KAFKA-2410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15089686#comment-15089686 ] Mayuresh Gharat edited comment on KAFKA-2410 at 1/8/16 6:48 PM: I would like to bump up the discussion on this. The way I understand it, we have two options: 1) Add a config on the producer and consumer. a) TMR-driven auto-creation is disabled on the broker side. b) Producer and consumer have this config set to TRUE. c) When the producer does a send(), it issues a TMR and finds that the topic does not exist. It checks this config and issues a createTopicRequest; when the createTopicRequest returns success, it starts producing the actual data. d) The same goes for the consumer: if the topic does not exist, the consumer creates it if the config is enabled. This is not a hard requirement on the consumer side. 2) No config; topic creation is left to the user explicitly. The problem with 2) is that if we have mirror makers running, we would have to do the topic creation in the producer, perhaps in some sort of callback, so it would be better to go with 1) in this scenario. + Joel [~jjkoshy] > Implement "Auto Topic Creation" client side and remove support from Broker > side > --- > > Key: KAFKA-2410 > URL: https://issues.apache.org/jira/browse/KAFKA-2410 > Project: Kafka > Issue Type: Improvement > Components: clients, core >Affects Versions: 0.8.2.1 >Reporter: Grant Henke >Assignee: Grant Henke > > Auto topic creation on the broker has caused pain in the past, and today it > still causes unusual error handling requirements on the client side, added > complexity in the broker, mixed responsibility of the TopicMetadataRequest, > and limits configuration of the option to be cluster wide. In the future > having it broker side will also make features such as authorization very > difficult. > There have been discussions in the past of implementing this feature client > side. > [example|https://issues.apache.org/jira/browse/KAFKA-689?focusedCommentId=13548746&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13548746] > This Jira is to track that discussion and implementation once the necessary > protocol support exists: KAFKA-2229 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
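Option 1) above can be sketched in Java. In this sketch, topicExists() and createTopic() are hypothetical stand-ins for the TopicMetadataRequest and createTopicRequest round trips, and an in-memory set stands in for cluster metadata:

```java
import java.util.HashSet;
import java.util.Set;

class AutoCreatingProducer {
    private final boolean autoCreateTopics;                  // the proposed client-side config
    private final Set<String> knownTopics = new HashSet<>(); // stands in for cluster metadata

    AutoCreatingProducer(boolean autoCreateTopics) {
        this.autoCreateTopics = autoCreateTopics;
    }

    /** Stand-in for a TopicMetadataRequest round trip. */
    boolean topicExists(String topic) {
        return knownTopics.contains(topic);
    }

    /** Stand-in for a createTopicRequest round trip. */
    void createTopic(String topic) {
        knownTopics.add(topic);
    }

    /** Step c) above: create the missing topic first, then produce. */
    boolean send(String topic, String record) {
        if (!topicExists(topic)) {
            if (!autoCreateTopics) {
                return false; // surface "unknown topic" to the caller instead
            }
            createTopic(topic);
        }
        // ... hand the record to the record accumulator here ...
        return true;
    }
}
```

With the config set to FALSE, the client behaves like option 2) and the mirror-maker case would need explicit creation by the user.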
[jira] [Commented] (KAFKA-2410) Implement "Auto Topic Creation" client side and remove support from Broker side
[ https://issues.apache.org/jira/browse/KAFKA-2410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15089686#comment-15089686 ] Mayuresh Gharat commented on KAFKA-2410: I would like to bump up the discussion on this. The way I understand it, we have two options: 1) Add a config on the producer and consumer. a) TMR-driven auto-creation is disabled on the broker side. b) Producer and consumer have this config set to TRUE. c) When the producer does a send(), it issues a TMR and finds that the topic does not exist. It checks this config and issues a createTopicRequest; when the createTopicRequest returns success, it starts producing the actual data. d) The same goes for the consumer: if the topic does not exist, the consumer creates it if the config is enabled. This is not a hard requirement on the consumer side. 2) No config; topic creation is left to the user explicitly. The problem with 2) is that if we have mirror makers running, we would have to do the topic creation in the producer, perhaps in some sort of callback, so it would be better to go with 1) in this scenario. + Joel [~jjkoshy] -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3083) a soft failure in controller may leave a topic partition in an inconsistent state
[ https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15089586#comment-15089586 ] Mayuresh Gharat commented on KAFKA-3083: [~fpj] can you shed some light on what you meant by misuse? [~junrao] if this is actually an issue, I would like to take a shot at it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2887) TopicMetadataRequest creates topic if it does not exist
[ https://issues.apache.org/jira/browse/KAFKA-2887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15086470#comment-15086470 ] Mayuresh Gharat commented on KAFKA-2887: [~hachikuji] just wanted to know if we have a ticket for which the above patch is available. > TopicMetadataRequest creates topic if it does not exist > --- > > Key: KAFKA-2887 > URL: https://issues.apache.org/jira/browse/KAFKA-2887 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 0.8.2.0 > Environment: Centos6, Java 1.7.0_75 >Reporter: Andrew Winterman >Priority: Minor > > We wired up a probe http endpoint to make TopicMetadataRequests with a > possible topic name. If no topic was found, we expected an empty response. > However if we asked for the same topic twice, it would exist the second time! > I think this is a bug because the purpose of the TopicMetadataRequest is to > provide information about the cluster, not mutate it. I can provide example > code if needed. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1911) Log deletion on stopping replicas should be async
[ https://issues.apache.org/jira/browse/KAFKA-1911?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mayuresh Gharat updated KAFKA-1911: --- Status: Patch Available (was: In Progress) > Log deletion on stopping replicas should be async > - > > Key: KAFKA-1911 > URL: https://issues.apache.org/jira/browse/KAFKA-1911 > Project: Kafka > Issue Type: Bug > Components: log, replication >Reporter: Joel Koshy >Assignee: Mayuresh Gharat > Labels: newbie++, newbiee > > If a StopReplicaRequest sets delete=true then we do a file.delete on the file > message sets. I was under the impression that this is fast but it does not > seem to be the case. > On a partition reassignment in our cluster the local time for stop replica > took nearly 30 seconds. > {noformat} > Completed request:Name: StopReplicaRequest; Version: 0; CorrelationId: 467; > ClientId: ;DeletePartitions: true; ControllerId: 1212; ControllerEpoch: > 53 from > client/...:45964;totalTime:29191,requestQueueTime:1,localTime:29190,remoteTime:0,responseQueueTime:0,sendTime:0 > {noformat} > This ties up one API thread for the duration of the request. > Specifically in our case, the queue times for other requests also went up and > producers to the partition that was just deleted on the old leader took a > while to refresh their metadata (see KAFKA-1303) and eventually ran out of > retries on some messages leading to data loss. > I think the log deletion in this case should be fully asynchronous although > we need to handle the case when a broker may respond immediately to the > stop-replica-request but then go down after deleting only some of the log > segments. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
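The direction this issue suggests (acknowledge the StopReplicaRequest quickly, delete on a background thread) can be sketched with stdlib Java. AsyncLogDeleter and the ".deleted" rename suffix are illustrative choices, not the actual Kafka implementation: the synchronous part is a cheap rename that makes the log invisible immediately, while the expensive per-segment deletion runs off the API thread.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Stream;

class AsyncLogDeleter {
    private final ExecutorService deleter = Executors.newSingleThreadExecutor();

    /** Cheap synchronous rename, then the slow recursive delete off-thread. */
    Future<?> scheduleDelete(Path logDir) throws IOException {
        Path doomed = logDir.resolveSibling(logDir.getFileName() + ".deleted");
        Files.move(logDir, doomed); // the API thread can respond right after this
        return deleter.submit(() -> {
            try (Stream<Path> paths = Files.walk(doomed)) {
                // delete children before parents
                paths.sorted(Comparator.reverseOrder())
                     .forEach(p -> p.toFile().delete());
            } catch (IOException ignored) {
                // a broker crash mid-delete leaves a *.deleted dir to retry on restart
            }
        });
    }
}
```

The rename also gives a crash-recovery hook for the case raised at the end of the description: leftover *.deleted directories found on startup are deletions that never finished.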
[jira] [Work started] (KAFKA-1911) Log deletion on stopping replicas should be async
[ https://issues.apache.org/jira/browse/KAFKA-1911?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on KAFKA-1911 started by Mayuresh Gharat. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-3013) Display the topic-partition in the exception message for expired batches in recordAccumulator
[ https://issues.apache.org/jira/browse/KAFKA-3013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mayuresh Gharat updated KAFKA-3013: --- Status: Patch Available (was: In Progress) > Display the topic-partition in the exception message for expired batches in > recordAccumulator > -- > > Key: KAFKA-3013 > URL: https://issues.apache.org/jira/browse/KAFKA-3013 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Mayuresh Gharat >Assignee: Mayuresh Gharat >Priority: Trivial > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Work started] (KAFKA-3013) Display the topic-partition in the exception message for expired batches in recordAccumulator
[ https://issues.apache.org/jira/browse/KAFKA-3013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on KAFKA-3013 started by Mayuresh Gharat. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3062) Read from kafka replication to get data likely Version based
[ https://issues.apache.org/jira/browse/KAFKA-3062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15081987#comment-15081987 ] Mayuresh Gharat commented on KAFKA-3062: Hi [~itismewxg], can you explain the use case here with an example? Thanks, Mayuresh > Read from kafka replication to get data likely Version based > > > Key: KAFKA-3062 > URL: https://issues.apache.org/jira/browse/KAFKA-3062 > Project: Kafka > Issue Type: Improvement >Reporter: xingang > > Kafka requires all reads to go through the leader for consistency. > If reads could be served from replicas instead, then for topics with many > consumers, the consumers that are not latency-sensitive but are data-loss > sensitive could fetch from replicas; routing them through the leader today > pollutes the page cache for the other consumers that are latency-sensitive. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2937) Topics marked for delete in Zookeeper may become undeletable
[ https://issues.apache.org/jira/browse/KAFKA-2937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15081910#comment-15081910 ] Mayuresh Gharat commented on KAFKA-2937: Yes, that might be possible, but it should be rare. The AdminUtils.topicExists(topicName) call checks only whether the entry /brokers/topics/topicName exists in ZooKeeper. createTopic() should write to the path /brokers/topics/topicName and also update the replica assignment for the topic in ZooKeeper. If the second part somehow got messed up, the above error can occur. > Topics marked for delete in Zookeeper may become undeletable > > > Key: KAFKA-2937 > URL: https://issues.apache.org/jira/browse/KAFKA-2937 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.9.0.0 >Reporter: Rajini Sivaram >Assignee: Mayuresh Gharat > > In our clusters, we occasionally see topics marked for delete, but never > actually deleted. It may be due to brokers being restarted while tests were > running, but further restarts of Kafka don't fix the problem. The topics > remain marked for delete in Zookeeper.
> Topic describe shows: > {quote} > Topic:testtopic PartitionCount:1ReplicationFactor:3 Configs: > Topic: testtopicPartition: 0Leader: noneReplicas: 3,4,0 > Isr: > {quote} > Kafka logs show: > {quote} > 2015-12-02 15:53:30,152] ERROR Controller 2 epoch 213 initiated state change > of replica 3 for partition [testtopic,0] from OnlineReplica to OfflineReplica > failed (state.change.logger) > kafka.common.StateChangeFailedException: Failed to change state of replica 3 > for partition [testtopic,0] since the leader and isr path in zookeeper is > empty > at > kafka.controller.ReplicaStateMachine.handleStateChange(ReplicaStateMachine.scala:269) > at > kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:114) > at > kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:114) > at > scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:322) > at > scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:978) > at > kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:114) > at > kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2.apply(TopicDeletionManager.scala:342) > at > kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2.apply(TopicDeletionManager.scala:334) > at scala.collection.immutable.Map$Map1.foreach(Map.scala:116) > at > kafka.controller.TopicDeletionManager.startReplicaDeletion(TopicDeletionManager.scala:334) > at > kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$onPartitionDeletion(TopicDeletionManager.scala:367) > at > kafka.controller.TopicDeletionManager$$anonfun$kafka$controller$TopicDeletionManager$$onTopicDeletion$2.apply(TopicDeletionManager.scala:313) > at > kafka.controller.TopicDeletionManager$$anonfun$kafka$controller$TopicDeletionManager$$onTopicDeletion$2.apply(TopicDeletionManager.scala:312) > at 
scala.collection.immutable.Set$Set1.foreach(Set.scala:79) > at > kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$onTopicDeletion(TopicDeletionManager.scala:312) > at > kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:431) > at > kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:403) > at scala.collection.immutable.Set$Set2.foreach(Set.scala:111) > at > kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply$mcV$sp(TopicDeletionManager.scala:403) > at > kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:397) > at > kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:397) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262) > at > kafka.controller.TopicDeletionManager$DeleteTopicsThread.doWork(TopicDeletionManager.scala:397) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) > {quote} > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
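The failure mode in this trace (topic znode present but the leader-and-isr path empty) suggests a pre-delete consistency check. Below is a minimal sketch under stated assumptions: a HashMap stands in for the ZooKeeper tree, and TopicZkChecker/deletable are purely hypothetical names, not Kafka code.

```java
import java.util.HashMap;
import java.util.Map;

class TopicZkChecker {
    private final Map<String, String> zk = new HashMap<>(); // path -> znode data

    void put(String path, String data) {
        zk.put(path, data);
    }

    /** Deletion is safe to start only if all of the topic's metadata is present. */
    boolean deletable(String topic) {
        String assignment = zk.get("/brokers/topics/" + topic);
        String leaderIsr = zk.get("/brokers/topics/" + topic + "/partitions/0/state");
        return assignment != null && !assignment.isEmpty()
            && leaderIsr != null && !leaderIsr.isEmpty();
    }
}
```

A real check would iterate over every partition of the topic rather than hard-coding partition 0; the point is that the exception above fires precisely when the second lookup comes back empty.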
[jira] [Commented] (KAFKA-3040) Broker didn't report new data after change in leader
[ https://issues.apache.org/jira/browse/KAFKA-3040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15081519#comment-15081519 ] Mayuresh Gharat commented on KAFKA-3040: At LinkedIn, we do have a separate controller log file on the broker that is the controller for the cluster. Can you see something like "Broker HOST-NAME starting become controller state transition" on the broker that is the controller for the cluster? > Broker didn't report new data after change in leader > > > Key: KAFKA-3040 > URL: https://issues.apache.org/jira/browse/KAFKA-3040 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.9.0.0 > Environment: Debian 3.2.54-2 x86_64 GNU/Linux >Reporter: Imran Patel >Priority: Critical > > Recently we had an event that caused large Kafka backlogs to develop > suddenly. This happened across multiple partitions. We noticed that after a > brief connection loss to Zookeeper, Kafka brokers were reporting no new > data to our (SimpleConsumer) consumer although the producers were enqueueing > fine. This went on until another zk blip led to a reconfiguration which > suddenly caused the consumers to "see" the data. Our consumers and our > monitoring tools did not see the offsets move during the outage window. Here > is the sequence of events for a single partition (with logs attached below). > The brokers are running 0.9, the producer is using library version > kafka_2.10:0.8.2.1 and consumer is using kafka_2.10:0.8.0 (both are Java > programs). Our monitoring tool uses kafka-python-9.0 > Can you tell us if this could be due to a consumer bug (the libraries being > too "old" to operate with the 0.9 broker, for e.g.)? Or does it look like a Kafka > core issue? Please note that we recently upgraded the brokers to 0.9 and hadn't > seen a similar issue prior to that.
> - after a brief connection loss to zookeeper, the partition leader (broker 9 > for partition 29 in logs below) came back and shrunk the ISR to itself. > - producers kept on successfully sending data to Kafka and the remaining > replicas (brokers 3 and 4) recorded this data. AFAICT, 3 was the new leader. > Broker 9 did NOT replicate this data. It did repeatedly print the ISR > shrinking message over and over again. > - consumer on the other hand reported no new data presumably because it was > talking to 9 and that broker was doing nothing. > - 6 hours later, another zookeeper blip causes the brokers to reconfigure and > now consumers started seeing new data. > Broker 9: > [2015-12-16 19:46:01,523] INFO Partition [messages,29] on broker 9: Expanding > ISR for partition [messages,29] from 9,4 to 9,4,3 (kafka.cluster.Partition > [2015-12-18 00:59:25,511] INFO New leader is 9 > (kafka.server.ZookeeperLeaderElector$LeaderChangeListener) > [2015-12-18 01:00:18,451] INFO Partition [messages,29] on broker 9: Shrinking > ISR for partition [messages,29] from 9,4,3 to 9 (kafka.cluster.Partition) > [2015-12-18 01:00:18,458] INFO Partition [messages,29] on broker 9: Cached > zkVersion [472] not equal to that in zookeeper, skip updating ISR > (kafka.cluster.Partition) > [2015-12-18 07:04:44,552] INFO Truncating log messages-29 to offset > 14169556269. 
(kafka.log.Log) > [2015-12-18 07:04:44,649] INFO [ReplicaFetcherManager on broker 9] Added > fetcher for partitions List([[messages,61], initOffset 14178575900 to broker > BrokerEndPoint(6,kafka006-prod.c.foo.internal,9092)] , [[messages,13], > initOffset 14156091271 to broker > BrokerEndPoint(2,kafka002-prod.c.foo.internal,9092)] , [[messages,45], > initOffset 14135826155 to broker > BrokerEndPoint(4,kafka004-prod.c.foo.internal,9092)] , [[messages,41], > initOffset 14157926400 to broker > BrokerEndPoint(1,kafka001-prod.c.foo.internal,9092)] , [[messages,29], > initOffset 14169556269 to broker > BrokerEndPoint(3,kafka003-prod.c.foo.internal,9092)] , [[messages,57], > initOffset 14175218230 to broker > BrokerEndPoint(1,kafka001-prod.c.foo.internal,9092)] ) > (kafka.server.ReplicaFetcherManager) > Broker 3: > [2015-12-18 01:00:01,763] INFO [ReplicaFetcherManager on broker 3] Removed > fetcher for partitions [messages,29] (kafka.server.ReplicaFetcherManager) > [2015-12-18 07:09:04,631] INFO Partition [messages,29] on broker 3: Expanding > ISR for partition [messages,29] from 4,3 to 4,3,9 (kafka.cluster.Partition) > [2015-12-18 07:09:49,693] INFO [ReplicaFetcherManager on broker 3] Removed > fetcher for partitions [messages,29] (kafka.server.ReplicaFetcherManager) > Broker 4: > [2015-12-18 01:00:01,783] INFO [ReplicaFetcherManager on broker 4] Removed > fetcher for partitions [messages,29] (kafka.server.ReplicaFetcherManager) > [2015-12-18 01:00:01,866] INFO [ReplicaFetcherManager on broker 4] Added > fetcher
[jira] [Commented] (KAFKA-2937) Topics marked for delete in Zookeeper may become undeletable
[ https://issues.apache.org/jira/browse/KAFKA-2937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15081352#comment-15081352 ] Mayuresh Gharat commented on KAFKA-2937: [~djh] does your integration test check if the topic is created successfully before attempting a delete? > Topics marked for delete in Zookeeper may become undeletable > > > Key: KAFKA-2937 > URL: https://issues.apache.org/jira/browse/KAFKA-2937 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.9.0.0 >Reporter: Rajini Sivaram >Assignee: Mayuresh Gharat > > In our clusters, we occasionally see topics marked for delete, but never > actually deleted. It may be due to brokers being restarted while tests were > running, but further restarts of Kafka dont fix the problem. The topics > remain marked for delete in Zookeeper. > Topic describe shows: > {quote} > Topic:testtopic PartitionCount:1ReplicationFactor:3 Configs: > Topic: testtopicPartition: 0Leader: noneReplicas: 3,4,0 > Isr: > {quote} > Kafka logs show: > {quote} > 2015-12-02 15:53:30,152] ERROR Controller 2 epoch 213 initiated state change > of replica 3 for partition [testtopic,0] from OnlineReplica to OfflineReplica > failed (state.change.logger) > kafka.common.StateChangeFailedException: Failed to change state of replica 3 > for partition [testtopic,0] since the leader and isr path in zookeeper is > empty > at > kafka.controller.ReplicaStateMachine.handleStateChange(ReplicaStateMachine.scala:269) > at > kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:114) > at > kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:114) > at > scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:322) > at > scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:978) > at > kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:114) > at > 
kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2.apply(TopicDeletionManager.scala:342) > at > kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2.apply(TopicDeletionManager.scala:334) > at scala.collection.immutable.Map$Map1.foreach(Map.scala:116) > at > kafka.controller.TopicDeletionManager.startReplicaDeletion(TopicDeletionManager.scala:334) > at > kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$onPartitionDeletion(TopicDeletionManager.scala:367) > at > kafka.controller.TopicDeletionManager$$anonfun$kafka$controller$TopicDeletionManager$$onTopicDeletion$2.apply(TopicDeletionManager.scala:313) > at > kafka.controller.TopicDeletionManager$$anonfun$kafka$controller$TopicDeletionManager$$onTopicDeletion$2.apply(TopicDeletionManager.scala:312) > at scala.collection.immutable.Set$Set1.foreach(Set.scala:79) > at > kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$onTopicDeletion(TopicDeletionManager.scala:312) > at > kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:431) > at > kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:403) > at scala.collection.immutable.Set$Set2.foreach(Set.scala:111) > at > kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply$mcV$sp(TopicDeletionManager.scala:403) > at > kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:397) > at > kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:397) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262) > at > kafka.controller.TopicDeletionManager$DeleteTopicsThread.doWork(TopicDeletionManager.scala:397) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) > {quote} > -- This message was sent 
by Atlassian JIRA (v6.3.4#6332)
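One way an integration test could avoid the race the comment above asks about (deleting a topic before its creation has completed) is to poll until the topic is visible before issuing the delete. This is a hypothetical helper sketch, not part of the Kafka test suite; the class and method names are invented, and the "topic exists" check is stubbed as a `BooleanSupplier`.

```java
import java.util.function.BooleanSupplier;

// Hypothetical test helper: wait until a "topic exists" check succeeds (or a
// deadline passes) before the test proceeds to delete the topic.
public class AwaitTopic {

    public static boolean awaitTopicCreated(BooleanSupplier topicExists,
                                            long timeoutMs,
                                            long pollIntervalMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (topicExists.getAsBoolean()) {
                return true; // topic visible: safe to issue the delete now
            }
            try {
                Thread.sleep(pollIntervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return topicExists.getAsBoolean(); // one last check at the deadline
    }

    public static void main(String[] args) {
        // Stub check that succeeds immediately; a real test would query zookeeper.
        System.out.println(awaitTopicCreated(() -> true, 1000, 10));
    }
}
```

In a real test the supplier would query the cluster (e.g. check the topic's zookeeper path); here it is stubbed so the sketch stays self-contained.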
[jira] [Comment Edited] (KAFKA-3040) Broker didn't report new data after change in leader
[ https://issues.apache.org/jira/browse/KAFKA-3040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15071192#comment-15071192 ] Mayuresh Gharat edited comment on KAFKA-3040 at 12/24/15 7:09 PM: -- Do you have the controller logs for the time period? Also you might want to check: https://issues.apache.org/jira/browse/KAFKA-3042 was (Author: mgharat): Do you have the controller logs for the time period? > Broker didn't report new data after change in leader > > > Key: KAFKA-3040 > URL: https://issues.apache.org/jira/browse/KAFKA-3040 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.9.0.0 > Environment: Debian 3.2.54-2 x86_64 GNU/Linux >Reporter: Imran Patel >Priority: Critical > > Recently we had an event that caused large Kafka backlogs to develop > suddenly. This happened across multiple partitions. We noticed that after a > brief connection loss to Zookeeper, Kafka brokers were not reporting any new > data to our (SimpleConsumer) consumer although the producers were enqueueing > fine. This went on until another zk blip led to a reconfiguration which > suddenly caused the consumers to "see" the data. Our consumers and our > monitoring tools did not see the offsets move during the outage window. Here > is the sequence of events for a single partition (with logs attached below). > The brokers are running 0.9, the producer is using library version > kafka_2.10:0.8.2.1 and the consumer is using kafka_2.10:0.8.0 (both are Java > programs). Our monitoring tool uses kafka-python-9.0 > Can you tell us if this could be due to a consumer bug (the libraries being > too "old" to operate with a 0.9 broker, for e.g.)? Or does it look like a Kafka core > issue? Please note that we recently upgraded the brokers to 0.9 and hadn't > seen a similar issue prior to that. > - after a brief connection loss to zookeeper, the partition leader (broker 9 > for partition 29 in logs below) came back and shrunk the ISR to itself. 
> - producers kept on successfully sending data to Kafka and the remaining > replicas (brokers 3 and 4) recorded this data. AFAICT, 3 was the new leader. > Broker 9 did NOT replicate this data. It did repeatedly print the ISR > shrinking message over and over again. > - consumer on the other hand reported no new data presumably because it was > talking to 9 and that broker was doing nothing. > - 6 hours later, another zookeeper blip causes the brokers to reconfigure and > now consumers started seeing new data. > Broker 9: > [2015-12-16 19:46:01,523] INFO Partition [messages,29] on broker 9: Expanding > ISR for partition [messages,29] from 9,4 to 9,4,3 (kafka.cluster.Partition > [2015-12-18 00:59:25,511] INFO New leader is 9 > (kafka.server.ZookeeperLeaderElector$LeaderChangeListener) > [2015-12-18 01:00:18,451] INFO Partition [messages,29] on broker 9: Shrinking > ISR for partition [messages,29] from 9,4,3 to 9 (kafka.cluster.Partition) > [2015-12-18 01:00:18,458] INFO Partition [messages,29] on broker 9: Cached > zkVersion [472] not equal to that in zookeeper, skip updating ISR > (kafka.cluster.Partition) > [2015-12-18 07:04:44,552] INFO Truncating log messages-29 to offset > 14169556269. 
(kafka.log.Log) > [2015-12-18 07:04:44,649] INFO [ReplicaFetcherManager on broker 9] Added > fetcher for partitions List([[messages,61], initOffset 14178575900 to broker > BrokerEndPoint(6,kafka006-prod.c.foo.internal,9092)] , [[messages,13], > initOffset 14156091271 to broker > BrokerEndPoint(2,kafka002-prod.c.foo.internal,9092)] , [[messages,45], > initOffset 14135826155 to broker > BrokerEndPoint(4,kafka004-prod.c.foo.internal,9092)] , [[messages,41], > initOffset 14157926400 to broker > BrokerEndPoint(1,kafka001-prod.c.foo.internal,9092)] , [[messages,29], > initOffset 14169556269 to broker > BrokerEndPoint(3,kafka003-prod.c.foo.internal,9092)] , [[messages,57], > initOffset 14175218230 to broker > BrokerEndPoint(1,kafka001-prod.c.foo.internal,9092)] ) > (kafka.server.ReplicaFetcherManager) > Broker 3: > [2015-12-18 01:00:01,763] INFO [ReplicaFetcherManager on broker 3] Removed > fetcher for partitions [messages,29] (kafka.server.ReplicaFetcherManager) > [2015-12-18 07:09:04,631] INFO Partition [messages,29] on broker 3: Expanding > ISR for partition [messages,29] from 4,3 to 4,3,9 (kafka.cluster.Partition) > [2015-12-18 07:09:49,693] INFO [ReplicaFetcherManager on broker 3] Removed > fetcher for partitions [messages,29] (kafka.server.ReplicaFetcherManager) > Broker 4: > [2015-12-18 01:00:01,783] INFO [ReplicaFetcherManager on broker 4] Removed > fetcher for partitions [messages,29] (kafka.server.ReplicaFetcherManager) > [2015-12-18 01:00:01,866] INFO [ReplicaFetcherManager on broker 4] Added > fetche
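The repeated "Cached zkVersion [472] not equal to that in zookeeper, skip updating ISR" line in the logs above reflects a conditional update: the broker only writes a new ISR if its cached version matches the version currently stored in zookeeper, so a stale cache makes every retry a no-op. The following is a toy model of that behavior with invented names, not the actual broker code.

```java
import java.util.concurrent.atomic.AtomicReference;

// Toy model of a version-checked ISR update: a broker holding a stale cached
// zkVersion can never win the conditional write, so its shrink/expand attempts
// are skipped forever -- matching the repeated log line in broker 9's output.
public class IsrUpdateModel {

    static final class Versioned {
        final String isr;
        final int version;
        Versioned(String isr, int version) { this.isr = isr; this.version = version; }
    }

    // Stand-in for the zookeeper node holding the partition's ISR state.
    final AtomicReference<Versioned> zkState = new AtomicReference<>(new Versioned("9,4,3", 473));

    // Succeeds only when the caller's cached version matches the stored version.
    boolean tryShrinkIsr(String newIsr, int cachedVersion) {
        Versioned current = zkState.get();
        if (current.version != cachedVersion) {
            // "Cached zkVersion not equal to that in zookeeper, skip updating ISR"
            return false;
        }
        return zkState.compareAndSet(current, new Versioned(newIsr, current.version + 1));
    }
}
```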
[jira] [Commented] (KAFKA-3040) Broker didn't report new data after change in leader
[ https://issues.apache.org/jira/browse/KAFKA-3040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15071192#comment-15071192 ] Mayuresh Gharat commented on KAFKA-3040: Do you have the controller logs for the time period? > Broker didn't report new data after change in leader > > > Key: KAFKA-3040 > URL: https://issues.apache.org/jira/browse/KAFKA-3040 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.9.0.0 > Environment: Debian 3.2.54-2 x86_64 GNU/Linux >Reporter: Imran Patel >Priority: Critical > > Recently we had an event that caused large Kafka backlogs to develop > suddenly. This happened across multiple partitions. We noticed that after a > brief connection loss to Zookeeper, Kafka brokers were not reporting any new > data to our (SimpleConsumer) consumer although the producers were enqueueing > fine. This went on until another zk blip led to a reconfiguration which > suddenly caused the consumers to "see" the data. Our consumers and our > monitoring tools did not see the offsets move during the outage window. Here > is the sequence of events for a single partition (with logs attached below). > The brokers are running 0.9, the producer is using library version > kafka_2.10:0.8.2.1 and the consumer is using kafka_2.10:0.8.0 (both are Java > programs). Our monitoring tool uses kafka-python-9.0 > Can you tell us if this could be due to a consumer bug (the libraries being > too "old" to operate with a 0.9 broker, for e.g.)? Or does it look like a Kafka core > issue? Please note that we recently upgraded the brokers to 0.9 and hadn't > seen a similar issue prior to that. > - after a brief connection loss to zookeeper, the partition leader (broker 9 > for partition 29 in logs below) came back and shrunk the ISR to itself. > - producers kept on successfully sending data to Kafka and the remaining > replicas (brokers 3 and 4) recorded this data. AFAICT, 3 was the new leader. > Broker 9 did NOT replicate this data. 
It did repeatedly print the ISR > shrinking message over and over again. > - consumer on the other hand reported no new data presumably because it was > talking to 9 and that broker was doing nothing. > - 6 hours later, another zookeeper blip causes the brokers to reconfigure and > now consumers started seeing new data. > Broker 9: > [2015-12-16 19:46:01,523] INFO Partition [messages,29] on broker 9: Expanding > ISR for partition [messages,29] from 9,4 to 9,4,3 (kafka.cluster.Partition > [2015-12-18 00:59:25,511] INFO New leader is 9 > (kafka.server.ZookeeperLeaderElector$LeaderChangeListener) > [2015-12-18 01:00:18,451] INFO Partition [messages,29] on broker 9: Shrinking > ISR for partition [messages,29] from 9,4,3 to 9 (kafka.cluster.Partition) > [2015-12-18 01:00:18,458] INFO Partition [messages,29] on broker 9: Cached > zkVersion [472] not equal to that in zookeeper, skip updating ISR > (kafka.cluster.Partition) > [2015-12-18 07:04:44,552] INFO Truncating log messages-29 to offset > 14169556269. 
(kafka.log.Log) > [2015-12-18 07:04:44,649] INFO [ReplicaFetcherManager on broker 9] Added > fetcher for partitions List([[messages,61], initOffset 14178575900 to broker > BrokerEndPoint(6,kafka006-prod.c.foo.internal,9092)] , [[messages,13], > initOffset 14156091271 to broker > BrokerEndPoint(2,kafka002-prod.c.foo.internal,9092)] , [[messages,45], > initOffset 14135826155 to broker > BrokerEndPoint(4,kafka004-prod.c.foo.internal,9092)] , [[messages,41], > initOffset 14157926400 to broker > BrokerEndPoint(1,kafka001-prod.c.foo.internal,9092)] , [[messages,29], > initOffset 14169556269 to broker > BrokerEndPoint(3,kafka003-prod.c.foo.internal,9092)] , [[messages,57], > initOffset 14175218230 to broker > BrokerEndPoint(1,kafka001-prod.c.foo.internal,9092)] ) > (kafka.server.ReplicaFetcherManager) > Broker 3: > [2015-12-18 01:00:01,763] INFO [ReplicaFetcherManager on broker 3] Removed > fetcher for partitions [messages,29] (kafka.server.ReplicaFetcherManager) > [2015-12-18 07:09:04,631] INFO Partition [messages,29] on broker 3: Expanding > ISR for partition [messages,29] from 4,3 to 4,3,9 (kafka.cluster.Partition) > [2015-12-18 07:09:49,693] INFO [ReplicaFetcherManager on broker 3] Removed > fetcher for partitions [messages,29] (kafka.server.ReplicaFetcherManager) > Broker 4: > [2015-12-18 01:00:01,783] INFO [ReplicaFetcherManager on broker 4] Removed > fetcher for partitions [messages,29] (kafka.server.ReplicaFetcherManager) > [2015-12-18 01:00:01,866] INFO [ReplicaFetcherManager on broker 4] Added > fetcher for partitions List([[messages,29], initOffset 14169556262 to broker > BrokerEndPoint(3,kafka003-prod.c.foo.internal,9092)] ) > (kafka.server.ReplicaFetcherManager) > [2015-12-18 07:09:50,191] ERROR [Repli
[jira] [Comment Edited] (KAFKA-2937) Topics marked for delete in Zookeeper may become undeletable
[ https://issues.apache.org/jira/browse/KAFKA-2937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15070294#comment-15070294 ] Mayuresh Gharat edited comment on KAFKA-2937 at 12/23/15 11:21 PM: --- [~rsivaram] This can occur if, during topic deletion, the controller writes the updated LeaderISR to zookeeper and then dies. A new controller is elected, and since the /admin/delete_topics/ path still contains the topic, the topic deletion is retried. When the new controller tries to send the StopReplicaRequest with deletePartition = false as part of removing the replica from the ISR, there is no LeaderISR in zookeeper, so the above exception is thrown. I am wondering whether, while deleting a topic, this check is necessary: if (leaderAndIsrIsEmpty) throw new StateChangeFailedException( "Failed to change state of replica %d for partition %s since the leader and isr path in zookeeper is empty" .format(replicaId, topicAndPartition)) I am planning to check whether the topic is being deleted and, if so, not throw the exception when the LeaderISR info in zookeeper is empty. On a side note, did you see a log line like "Retrying delete topic for topic since replicas were not successfully deleted"? was (Author: mgharat): [~rsivaram] This can occur if, during topic deletion, the controller writes the updated LeaderISR to zookeeper and then dies. A new controller is elected, and since the /admin/delete_topics/ path still contains the topic, the topic deletion is retried. When the new controller tries to send the StopReplicaRequest with deletePartition = false as part of removing the replica from the ISR, there is no LeaderISR in zookeeper, so the above exception is thrown. 
I am wondering whether, while deleting a topic, this check is necessary: if (leaderAndIsrIsEmpty) throw new StateChangeFailedException( "Failed to change state of replica %d for partition %s since the leader and isr path in zookeeper is empty" .format(replicaId, topicAndPartition)) On a side note, did you see a log line like "Retrying delete topic for topic since replicas were not successfully deleted"?
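The change proposed in the comment above can be sketched as a small guard: tolerate an empty leader/ISR path when the topic is queued for deletion, instead of unconditionally throwing. This is a simplified, hypothetical model with invented names; in Kafka the real check lives inside `ReplicaStateMachine.handleStateChange`.

```java
// Simplified model of the proposed guard: only fail the
// OnlineReplica -> OfflineReplica transition on an empty leader/ISR path
// when the topic is NOT already queued for deletion.
public class ReplicaDeletionGuard {

    public static class StateChangeFailedException extends RuntimeException {
        public StateChangeFailedException(String msg) { super(msg); }
    }

    public static boolean canTransitionToOffline(boolean leaderAndIsrIsEmpty,
                                                 boolean topicQueuedForDeletion) {
        if (leaderAndIsrIsEmpty && !topicQueuedForDeletion) {
            // Same failure mode as today: no leader/ISR info and no deletion in progress.
            throw new StateChangeFailedException(
                "leader and isr path in zookeeper is empty");
        }
        return true;
    }

    public static void main(String[] args) {
        // Normal case: leader/ISR present, transition allowed.
        System.out.println(canTransitionToOffline(false, false));
        // Proposed fix: empty leader/ISR tolerated while the topic is being deleted.
        System.out.println(canTransitionToOffline(true, true));
    }
}
```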
[jira] [Commented] (KAFKA-2937) Topics marked for delete in Zookeeper may become undeletable
[ https://issues.apache.org/jira/browse/KAFKA-2937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15070294#comment-15070294 ] Mayuresh Gharat commented on KAFKA-2937: [~rsivaram] This can occur if, during topic deletion, the controller writes the updated LeaderISR to zookeeper and then dies. A new controller is elected, and since the /admin/delete_topics/ path still contains the topic, the topic deletion is retried. When the new controller tries to send the StopReplicaRequest with deletePartition = false as part of removing the replica from the ISR, there is no LeaderISR in zookeeper, so the above exception is thrown. I am wondering whether, while deleting a topic, this check is necessary: if (leaderAndIsrIsEmpty) throw new StateChangeFailedException( "Failed to change state of replica %d for partition %s since the leader and isr path in zookeeper is empty" .format(replicaId, topicAndPartition)) On a side note, did you see a log line like "Retrying delete topic for topic since replicas were not successfully deleted"? > Topics marked for delete in Zookeeper may become undeletable > > > Key: KAFKA-2937 > URL: https://issues.apache.org/jira/browse/KAFKA-2937 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.9.0.0 >Reporter: Rajini Sivaram >Assignee: Mayuresh Gharat > > In our clusters, we occasionally see topics marked for delete, but never > actually deleted. It may be due to brokers being restarted while tests were > running, but further restarts of Kafka don't fix the problem. The topics > remain marked for delete in Zookeeper. 
[jira] [Created] (KAFKA-3013) Display the topic-partition in the exception message for expired batches in recordAccumulator
Mayuresh Gharat created KAFKA-3013: -- Summary: Display the topic-partition in the exception message for expired batches in recordAccumulator Key: KAFKA-3013 URL: https://issues.apache.org/jira/browse/KAFKA-3013 Project: Kafka Issue Type: Improvement Components: clients Reporter: Mayuresh Gharat Assignee: Mayuresh Gharat Priority: Trivial -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (KAFKA-2937) Topics marked for delete in Zookeeper may become undeletable
[ https://issues.apache.org/jira/browse/KAFKA-2937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15064365#comment-15064365 ] Mayuresh Gharat edited comment on KAFKA-2937 at 12/18/15 6:21 PM: -- I ran a test with a few commits behind trunk HEAD: 1) Started the kafka cluster. 2) Produced data to topic T for quite some time so that enough data accumulated on the kafka brokers. 3) Deleted the topic and immediately shut down the cluster. 4) I could see the topic T under /admin/delete_topics. 5) After I restarted the cluster, the topic T was deleted. There might be a race condition here, where the controller might have gone down after it wrote to zookeeper but before removing the topic under /admin/delete_topics. I will have to investigate more on this. [~rsivaram] What is the version of kafka that you are running? was (Author: mgharat): I ran a test with a few commits behind trunk HEAD: 1) Started the kafka cluster. 2) Produced data to topic T for quite some time so that enough data accumulated on the kafka brokers. 3) Deleted the topic and immediately shut down the cluster. 4) I could see the topic T under /admin/delete_topics. 5) After I restarted the cluster, the topic T was deleted. > Topics marked for delete in Zookeeper may become undeletable > > > Key: KAFKA-2937 > URL: https://issues.apache.org/jira/browse/KAFKA-2937 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.9.0.0 >Reporter: Rajini Sivaram >Assignee: Mayuresh Gharat > > In our clusters, we occasionally see topics marked for delete, but never > actually deleted. It may be due to brokers being restarted while tests were > running, but further restarts of Kafka don't fix the problem. The topics > remain marked for delete in Zookeeper. 
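The suspected race in the comment above can be shown with a toy model (invented names and simplified zookeeper paths, not controller code): clearing the leader/ISR state and removing the /admin/delete_topics marker are two separate zookeeper writes, so a controller crash between them leaves the topic stranded in the marked-for-delete state.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model: topic deletion performs two separate zookeeper writes, so a
// controller crash between them strands the delete marker. The new controller
// then retries deletion against an empty leader/ISR path -- the
// StateChangeFailedException scenario from KAFKA-2937.
public class DeleteTopicRaceModel {

    public final Set<String> zkNodes = new HashSet<>();

    public DeleteTopicRaceModel() {
        zkNodes.add("/admin/delete_topics/T");         // topic T marked for delete
        zkNodes.add("/brokers/topics/T/leaderAndIsr"); // leader/ISR state (simplified path)
    }

    // Step 1 of deletion: the controller clears the leader/ISR state.
    public void clearLeaderAndIsr() {
        zkNodes.remove("/brokers/topics/T/leaderAndIsr");
    }

    // Step 2: the controller removes the delete marker.
    // A crash before this step leaves the marker in place.
    public void removeDeleteMarker() {
        zkNodes.remove("/admin/delete_topics/T");
    }
}
```

If the controller survives both steps (as in the test run above, after a restart), the topic is deleted cleanly; the pathological case is a crash between step 1 and step 2.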
[jira] [Commented] (KAFKA-2937) Topics marked for delete in Zookeeper may become undeletable
[ https://issues.apache.org/jira/browse/KAFKA-2937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15064365#comment-15064365 ] Mayuresh Gharat commented on KAFKA-2937: I ran a test with a few commits behind trunk HEAD: 1) Started the kafka cluster. 2) Produced data to topic T for quite some time so that enough data accumulated on the kafka brokers. 3) Deleted the topic and immediately shut down the cluster. 4) I could see the topic T under /admin/delete_topics. 5) After I restarted the cluster, the topic T was deleted. > Topics marked for delete in Zookeeper may become undeletable > > > Key: KAFKA-2937 > URL: https://issues.apache.org/jira/browse/KAFKA-2937 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.9.0.0 >Reporter: Rajini Sivaram >Assignee: Mayuresh Gharat > > In our clusters, we occasionally see topics marked for delete, but never > actually deleted. It may be due to brokers being restarted while tests were > running, but further restarts of Kafka don't fix the problem. The topics > remain marked for delete in Zookeeper. 