[jira] [Resolved] (KAFKA-12257) Consumer mishandles topics deleted and recreated with the same name

2021-11-17 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-12257.
-
Resolution: Fixed

>  Consumer mishandles topics deleted and recreated with the same name
> 
>
> Key: KAFKA-12257
> URL: https://issues.apache.org/jira/browse/KAFKA-12257
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.2.2, 2.3.1, 2.4.1, 2.5.1, 2.6.1, 2.7.1, 2.8.1
>Reporter: Ryan Leslie
>Assignee: lqjacklee
>Priority: Blocker
> Fix For: 3.1.0, 2.8.2, 3.0.0
>
> Attachments: KAFKA-12257-1.patch, KAFKA-12257.patch
>
>
> In KAFKA-7738, caching of leader epochs (KIP-320) was added to 
> o.a.k.c.Metadata to ignore metadata responses with epochs smaller than the 
> last seen epoch.
> The current implementation can cause problems in cases where a consumer is 
> subscribed to a topic that has been deleted and then recreated with the same 
> name. This is something seen more often in consumers that subscribe to a 
> multitude of topics using a wildcard.
> Currently, when a topic is deleted and the Fetcher receives 
> UNKNOWN_TOPIC_OR_PARTITION, the leader epoch is not cleared. If, at a later 
> time while the consumer is still running, a topic is created with the same 
> name, the leader epochs are set to 0 for the new topic's partitions, and are 
> likely smaller than those for the previous topic. For example, if a broker 
> had restarted during the lifespan of the previous topic, the leader epoch 
> would be at least 1 or 2. In this case the metadata will be ignored since it 
> is incorrectly considered stale. Of course, the user will sometimes get 
> lucky, and if a topic was only recently created so that the epoch is still 0, 
> no problem will occur on recreation. The issue is also not seen when 
> consumers happen to have been restarted in between deletion and recreation.
> The most common side effect of the new metadata being disregarded is that the 
> new partitions end up assigned but the Fetcher is unable to fetch data 
> because it does not know the leaders. When recreating a topic with the same 
> name it is likely that the partition leaders are not the same as for the 
> previous topic, and the number of partitions may even be different. Besides 
> not being able to retrieve data for the new topic, there is a more sinister 
> side effect of the Fetcher triggering a metadata update after the fetch 
> fails. The subsequent update will again ignore the topic's metadata if the 
> leader epoch is still smaller than the cached value. This metadata refresh 
> loop can continue indefinitely and with a sufficient number of consumers may 
> even put a strain on a cluster since the requests are occurring in a tight 
> loop. This can also be hard for clients to identify since there is nothing 
> logged by default that would indicate what's happening. Both the Metadata 
> class's logging of "_Not replacing existing epoch_", and the Fetcher's 
> logging of "_Leader for partition  is unknown_" are at DEBUG level.
> A second possible side effect was observed where if the consumer is acting as 
> leader of the group and happens to not have any current data for the previous 
> topic, e.g. it was cleared due to a metadata error from a broker failure, 
> then the new topic's partitions may simply end up unassigned within the 
> group. This is because while the subscription list contains the recreated 
> topic the metadata for it was previously ignored due to the leader epochs. In 
> this case the user would see logs such as:
> {noformat}
> WARN o.a.k.c.c.i.ConsumerCoordinator [Consumer clientId=myClientId, groupId=myGroup] The following subscribed topics are not assigned to any members: [myTopic]
> {noformat}
> Interestingly, I believe the Producer is less affected by this problem since 
> o.a.k.c.p.i.ProducerMetadata explicitly clears knowledge of its topics in 
> retainTopics() after each metadata expiration. ConsumerMetadata does no such 
> thing.
> To reproduce this issue:
>  # Turn on DEBUG logging, e.g. for org.apache.kafka.clients.consumer and 
> org.apache.kafka.clients.Metadata
>  # Begin a consumer for a topic (or multiple topics)
>  # Restart a broker that happens to be a leader for one of the topic's 
> partitions
>  # Delete the topic
>  # Create another topic with the same name
>  # Publish data for the new topic
>  # The consumer will not receive data for the new topic, and there will be a 
> high rate of metadata requests.
>  # The issue can be corrected by restarting the consumer or restarting 
> brokers until leader epochs are large enough
> I believe KIP-516 (unique topic ids) will likely fix this problem, since 
> after those changes the leader epoch map should be keyed off of the topic id, rather than the name.
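
The following is a minimal, illustrative sketch of the epoch caching behaviour described above. It is not the actual o.a.k.c.Metadata code; the class and method names are invented for illustration, but it shows why metadata for a recreated topic whose partitions restart at leader epoch 0 keeps being discarded as "stale".

{code}
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the per-partition leader epoch cache described above.
public class LeaderEpochCacheSketch {
    private final Map<String, Integer> lastSeenEpoch = new HashMap<>();

    // Returns true if a metadata response for this partition should be applied,
    // false if it is ignored because its epoch is lower than the cached one.
    boolean maybeUpdate(String topicPartition, int newEpoch) {
        Integer cached = lastSeenEpoch.get(topicPartition);
        if (cached != null && newEpoch < cached) {
            // Treated as stale. For a recreated topic that legitimately restarts
            // at epoch 0 this is the wrong call, and every refresh is dropped.
            return false;
        }
        lastSeenEpoch.put(topicPartition, newEpoch);
        return true;
    }

    public static void main(String[] args) {
        LeaderEpochCacheSketch cache = new LeaderEpochCacheSketch();
        cache.maybeUpdate("myTopic-0", 2);                      // original topic; epoch bumped by a broker restart
        System.out.println(cache.maybeUpdate("myTopic-0", 0));  // recreated topic at epoch 0 -> false, ignored
    }
}
{code}

Because the cache is keyed only by topic-partition name, nothing distinguishes the recreated topic from stale metadata for the old one, which is why keying by topic id (KIP-516) resolves the ambiguity.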

[jira] [Resolved] (KAFKA-13071) Deprecate and remove --authorizer option in kafka-acls.sh

2021-11-16 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13071?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-13071.
-
Fix Version/s: 3.1.0
   Resolution: Fixed

> Deprecate and remove --authorizer option in kafka-acls.sh
> -
>
> Key: KAFKA-13071
> URL: https://issues.apache.org/jira/browse/KAFKA-13071
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>  Labels: needs-kip
> Fix For: 3.1.0
>
>
> Now that we have all of the ACL APIs implemented through the admin client, we 
> should consider deprecating and removing support for the --authorizer flag in 
> kafka-acls.sh.
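
For reference, the admin-client path that supersedes the flag can be exercised directly. Below is a hedged sketch (bootstrap address, principal, and topic name are placeholders) using the public Admin API to add a read ACL, roughly the programmatic equivalent of `kafka-acls.sh --bootstrap-server ... --add --allow-principal User:alice --operation Read --topic myTopic`.

{code}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

public class CreateAclExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker address

        try (Admin admin = Admin.create(props)) {
            // Allow User:alice to read from myTopic from any host.
            AclBinding binding = new AclBinding(
                new ResourcePattern(ResourceType.TOPIC, "myTopic", PatternType.LITERAL),
                new AccessControlEntry("User:alice", "*", AclOperation.READ, AclPermissionType.ALLOW));

            admin.createAcls(Collections.singleton(binding)).all().get();
        }
    }
}
{code}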





[jira] [Resolved] (KAFKA-10104) Remove deprecated --zookeeper flags as specified in KIP-604

2021-11-15 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-10104.
-
Resolution: Duplicate

> Remove deprecated --zookeeper flags as specified in KIP-604
> ---
>
> Key: KAFKA-10104
> URL: https://issues.apache.org/jira/browse/KAFKA-10104
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Priority: Major
>
> Remove deprecated --zookeeper flags as specified in KIP-604





[jira] [Updated] (KAFKA-12257) Consumer mishandles topics deleted and recreated with the same name

2021-11-15 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-12257:

Fix Version/s: 2.8.2


[jira] [Assigned] (KAFKA-13071) Deprecate and remove --authorizer option in kafka-acls.sh

2021-11-15 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13071?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson reassigned KAFKA-13071:
---

Assignee: Jason Gustafson  (was: HaiyuanZhao)






[jira] [Commented] (KAFKA-13071) Deprecate and remove --authorizer option in kafka-acls.sh

2021-11-15 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13071?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17444032#comment-17444032
 ] 

Jason Gustafson commented on KAFKA-13071:
-

[~zhaohaidao] I'm going to pick this up so that we can get it into 3.1. Please 
let me know if you were planning to submit something soon.






[jira] [Resolved] (KAFKA-13421) Fix ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup

2021-11-13 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-13421.
-
Resolution: Fixed

> Fix 
> ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
> -
>
> Key: KAFKA-13421
> URL: https://issues.apache.org/jira/browse/KAFKA-13421
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Assignee: Jason Gustafson
>Priority: Major
>
> ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
>  is failing with this error:
> {code}
> ConsumerBounceTest > testSubscribeWhenTopicUnavailable() PASSED
> kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup() failed, log available in /home/cmccabe/src/kafka9/core/build/reports/testOutput/kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup().test.stdout
>
> ConsumerBounceTest > testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup() FAILED
>     org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists
>         at org.apache.zookeeper.KeeperException.create(KeeperException.java:126)
>         at kafka.zk.KafkaZkClient$CheckedEphemeral.getAfterNodeExists(KafkaZkClient.scala:1904)
>         at kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:1842)
>         at kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:1809)
>         at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:96)
>         at kafka.server.KafkaServer.startup(KafkaServer.scala:320)
>         at kafka.integration.KafkaServerTestHarness.$anonfun$restartDeadBrokers$2(KafkaServerTestHarness.scala:212)
>         at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.scala:18)
>         at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
>         at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:919)
>         at scala.collection.IterableOps$WithFilter.foreach(Iterable.scala:889)
>         at kafka.integration.KafkaServerTestHarness.restartDeadBrokers(KafkaServerTestHarness.scala:203)
>         at kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1(ConsumerBounceTest.scala:327)
>         at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190)
>         at kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:319)
> {code}





[jira] [Comment Edited] (KAFKA-13071) Deprecate and remove --authorizer option in kafka-acls.sh

2021-11-10 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13071?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17442028#comment-17442028
 ] 

Jason Gustafson edited comment on KAFKA-13071 at 11/11/21, 1:30 AM:


I updated KIP-604 to include this: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-604%3A+Remove+ZooKeeper+Flags+from+the+Administrative+Tools.
 I also sent a message to the vote thread to see if everyone is ok with the 
change.

[~zhaohaidao] Are you planning to pick this up?


was (Author: hachikuji):
I updated the KIP-604 to include this: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-604%3A+Remove+ZooKeeper+Flags+from+the+Administrative+Tools.
 I also sent a message to the vote thread to see if everyone is ok with the 
change.

[~zhaohaidao] Are you planning to pick this up?






[jira] [Commented] (KAFKA-13071) Deprecate and remove --authorizer option in kafka-acls.sh

2021-11-10 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13071?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17442028#comment-17442028
 ] 

Jason Gustafson commented on KAFKA-13071:
-

I updated the KIP-604 to include this: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-604%3A+Remove+ZooKeeper+Flags+from+the+Administrative+Tools.
 I also sent a message to the vote thread to see if everyone is ok with the 
change.

[~zhaohaidao] Are you planning to pick this up?






[jira] [Resolved] (KAFKA-12975) Consider how Topic IDs can improve consume experience

2021-11-10 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-12975.
-
Resolution: Duplicate

> Consider how Topic IDs can improve consume experience
> -
>
> Key: KAFKA-12975
> URL: https://issues.apache.org/jira/browse/KAFKA-12975
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Priority: Major
>
> Currently, when a consumer subscribes to a topic, it will continue to consume 
> from this topic across topic deletions and recreations with the same name. By 
> adding topic IDs to the consumer, we will have more insight into these events. 
> We should figure out if we want to change consumer logic now that we have 
> this information.





[jira] [Reopened] (KAFKA-12975) Consider how Topic IDs can improve consume experience

2021-11-10 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson reopened KAFKA-12975:
-






[jira] [Resolved] (KAFKA-12975) Consider how Topic IDs can improve consume experience

2021-11-10 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-12975.
-
Resolution: Fixed

Closing in favor of https://issues.apache.org/jira/browse/KAFKA-13447






[jira] [Created] (KAFKA-13447) Consumer should not reuse committed offset after topic recreation

2021-11-10 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-13447:
---

 Summary: Consumer should not reuse committed offset after topic 
recreation
 Key: KAFKA-13447
 URL: https://issues.apache.org/jira/browse/KAFKA-13447
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson


KAFKA-12257 fixes an issue in which the consumer is unable to make progress 
after a topic has been recreated. The problem was that the client could not 
distinguish between stale metadata with a lower leader epoch and a recreated 
topic with a lower leader epoch. With TopicId support in KIP-516, the client is 
able to tell when a topic has been recreated since the new topic will have a 
different ID. 

However, what the patch did not fix is the potential reuse of the current 
offset position on the recreated topic. For example, say that the consumer is 
at offset N when the topic gets recreated. Currently, the consumer will 
continue fetching from offset N after detecting the recreation. The most likely 
result of this is either an offset out of range error or a log truncation 
error, but it is also possible for the offset position to remain valid on the 
recreated topic (say for a low-volume topic where the offset is already low, 
or a case where the consumer was down for a while). 

To fix this issue completely, we need to store the topicId along with the 
committed offset in __consumer_offsets. This would allow the consumer to detect 
when the offset is no longer relevant for the current topic. We also need to 
decide how to raise this case to the user. If the user has enabled automatic 
offset reset, we can probably use that. Otherwise, we might need a new 
exception type to signal the user that the position needs to be reset.
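
Until committed offsets carry a topic id, an application can at least make the out-of-range outcome explicit rather than relying on silent resets. The sketch below assumes auto.offset.reset=none and placeholder broker/group/topic names; note it only covers the error cases above, not the case where the stale position happens to still be valid on the recreated topic.

{code}
import java.time.Duration;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class RecreatedTopicWorkaround {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroup");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none"); // surface invalid offsets instead of resetting silently
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Pattern.compile("myTopic.*"));
            while (true) {
                try {
                    consumer.poll(Duration.ofSeconds(1))
                            .forEach(record -> System.out.println(record.value()));
                } catch (InvalidOffsetException e) {
                    // The committed/current position is not valid for these partitions
                    // (e.g. the topic was recreated); apply an explicit policy instead
                    // of fetching from the stale offset.
                    consumer.seekToBeginning(e.partitions());
                }
            }
        }
    }
}
{code}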







[jira] [Assigned] (KAFKA-13421) Fix ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup

2021-11-10 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson reassigned KAFKA-13421:
---

Assignee: Jason Gustafson






[jira] [Resolved] (KAFKA-13417) Dynamic thread pool re-configurations may not get processed

2021-11-09 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-13417.
-
Resolution: Fixed

> Dynamic thread pool re-configurations may not get processed
> ---
>
> Key: KAFKA-13417
> URL: https://issues.apache.org/jira/browse/KAFKA-13417
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> `DynamicBrokerConfig.updateCurrentConfig` includes the following logic to 
> update the current configuration and to let each `Reconfigurable` process the 
> update:
> {code}
> val oldConfig = currentConfig
> val (newConfig, brokerReconfigurablesToUpdate) = processReconfiguration(newProps, validateOnly = false)
> if (newConfig ne currentConfig) {
>   currentConfig = newConfig
>   kafkaConfig.updateCurrentConfig(newConfig)
>   // Process BrokerReconfigurable updates after current config is updated
>   brokerReconfigurablesToUpdate.foreach(_.reconfigure(oldConfig, newConfig))
> }
> {code}
> The problem here is that `currentConfig` gets initialized as `kafkaConfig` 
> which means that the first call to 
> `kafkaConfig.updateCurrentConfig(newConfig)` ends up mutating `currentConfig` 
> and consequently `oldConfig`. The problem with this is that some of the 
> `reconfigure` implementations will only apply a new configuration if the 
> value in `oldConfig` does not match the value in `newConfig`. For example, 
> here is the logic to update thread pools dynamically:
> {code}
>   override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
>     if (newConfig.numIoThreads != oldConfig.numIoThreads)
>       server.dataPlaneRequestHandlerPool.resizeThreadPool(newConfig.numIoThreads)
>     if (newConfig.numNetworkThreads != oldConfig.numNetworkThreads)
>       server.socketServer.resizeThreadPool(oldConfig.numNetworkThreads, newConfig.numNetworkThreads)
>     if (newConfig.numReplicaFetchers != oldConfig.numReplicaFetchers)
>       server.replicaManager.replicaFetcherManager.resizeThreadPool(newConfig.numReplicaFetchers)
>     if (newConfig.numRecoveryThreadsPerDataDir != oldConfig.numRecoveryThreadsPerDataDir)
>       server.logManager.resizeRecoveryThreadPool(newConfig.numRecoveryThreadsPerDataDir)
>     if (newConfig.backgroundThreads != oldConfig.backgroundThreads)
>       server.kafkaScheduler.resizeThreadPool(newConfig.backgroundThreads)
>   }
> {code}
> Because of this, the dynamic update will not get applied the first time it is 
> made. I believe subsequent updates would work correctly though because we 
> would have lost the indirect reference to `kafkaConfig`. Other than the 
> `DynamicThreadPool` configurations, it looks like the config to update 
> unclean leader election may also be affected by this bug.
> NOTE: This bug only affects kraft, which is missing the call to 
> `DynamicBrokerConfig.initialize()`. 
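
To make the failure mode concrete, here is a minimal standalone sketch (plain Java, not the actual broker code; names are invented) of the aliasing problem: once the "old" reference and the object that gets updated are the same instance, the change-detection comparison in reconfigure() can never observe a difference, so the first dynamic update is silently dropped.

{code}
public class ConfigAliasingDemo {
    static class MutableConfig {
        int numIoThreads;
    }

    // Analogue of the reconfigure() logic quoted above: only act when a value changed.
    static void reconfigure(MutableConfig oldConfig, MutableConfig newConfig) {
        if (newConfig.numIoThreads != oldConfig.numIoThreads) {
            System.out.println("resizing io thread pool to " + newConfig.numIoThreads);
        } else {
            System.out.println("no change detected, skipping resize");
        }
    }

    public static void main(String[] args) {
        MutableConfig current = new MutableConfig();
        current.numIoThreads = 8;

        MutableConfig old = current;   // alias: analogous to currentConfig starting out as kafkaConfig
        current.numIoThreads = 16;     // "apply" the update onto the shared object
        reconfigure(old, current);     // prints "no change detected" -- the resize never happens
    }
}
{code}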





[jira] [Updated] (KAFKA-13417) Dynamic thread pool re-configurations may not get processed

2021-11-09 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-13417:


[jira] [Updated] (KAFKA-12257) Consumer mishandles topics deleted and recreated with the same name

2021-11-09 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-12257:

Fix Version/s: 3.1.0


[jira] [Updated] (KAFKA-12257) Consumer mishandles topics deleted and recreated with the same name

2021-11-09 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-12257:

Affects Version/s: 2.3.1
   2.2.2


[jira] [Updated] (KAFKA-12257) Consumer mishandles topics deleted and recreated with the same name

2021-11-09 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-12257:

Affects Version/s: 2.6.1
   2.5.1
   2.4.1


[jira] [Updated] (KAFKA-12257) Consumer mishandles topics deleted and recreated with the same name

2021-11-09 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-12257:

Affects Version/s: 2.8.1
   2.7.1
   (was: 2.2.0)
   (was: 3.1.0)


[jira] [Updated] (KAFKA-12257) Consumer mishandles topics deleted and recreated with the same name

2021-11-09 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-12257:

Fix Version/s: 3.0.0


[jira] [Updated] (KAFKA-12257) Consumer mishandles topics deleted and recreated with the same name

2021-11-09 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-12257:

Fix Version/s: (was: 3.1.0)

>  Consumer mishandles topics deleted and recreated with the same name
> 
>
> Key: KAFKA-12257
> URL: https://issues.apache.org/jira/browse/KAFKA-12257
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.7.1, 2.8.1
>Reporter: Ryan Leslie
>Assignee: lqjacklee
>Priority: Blocker
> Fix For: 3.0.0
>
> Attachments: KAFKA-12257-1.patch, KAFKA-12257.patch
>
>
> In KAFKA-7738, caching of leader epochs (KIP-320) was added to 
> o.a.k.c.Metadata to ignore metadata responses with epochs smaller than the 
> last seen epoch.
> The current implementation can cause problems in cases where a consumer is 
> subscribed to a topic that has been deleted and then recreated with the same 
> name. This is something seen more often in consumers that subscribe to a 
> multitude of topics using a wildcard.
> Currently, when a topic is deleted and the Fetcher receives 
> UNKNOWN_TOPIC_OR_PARTITION, the leader epoch is not cleared. If at a later 
> time while the consumer is still running a topic is created with the same 
> name, the leader epochs are set to 0 for the new topic's partitions, and are 
> likely smaller than those for the previous topic. For example, if a broker 
> had restarted during the lifespan of the previous topic, the leader epoch 
> would be at least 1 or 2. In this case the metadata will be ignored since it 
> is incorrectly considered stale. Of course, the user will sometimes get 
> lucky, and if a topic was only recently created so that the epoch is still 0, 
> no problem will occur on recreation. The issue is also not seen when 
> consumers happen to have been restarted in between deletion and recreation.
> The most common side effect of the new metadata being disregarded is that the 
> new partitions end up assigned but the Fetcher is unable to fetch data 
> because it does not know the leaders. When recreating a topic with the same 
> name it is likely that the partition leaders are not the same as for the 
> previous topic, and the number of partitions may even be different. Besides 
> not being able to retrieve data for the new topic, there is a more sinister 
> side effect of the Fetcher triggering a metadata update after the fetch 
> fails. The subsequent update will again ignore the topic's metadata if the 
> leader epoch is still smaller than the cached value. This metadata refresh 
> loop can continue indefinitely and with a sufficient number of consumers may 
> even put a strain on a cluster since the requests are occurring in a tight 
> loop. This can also be hard for clients to identify since there is nothing 
> logged by default that would indicate what's happening. Both the Metadata 
> class's logging of "_Not replacing existing epoch_", and the Fetcher's 
> logging of "_Leader for partition  is unknown_" are at DEBUG level.
> A second possible side effect was observed where if the consumer is acting as 
> leader of the group and happens to not have any current data for the previous 
> topic, e.g. it was cleared due to a metadata error from a broker failure, 
> then the new topic's partitions may simply end up unassigned within the 
> group. This is because while the subscription list contains the recreated 
> topic the metadata for it was previously ignored due to the leader epochs. In 
> this case the user would see logs such as:
> {noformat}
> WARN o.a.k.c.c.i.ConsumerCoordinator [Consumer clientId=myClientId, 
> groupId=myGroup] The following subscribed topics are not assigned to any 
> members: [myTopic]{noformat}
> Interestingly, I believe the Producer is less affected by this problem since 
> o.a.k.c.p.i.ProducerMetadata explicitly clears knowledge of its topics in 
> retainTopics() after each metadata expiration. ConsumerMetadata does no such 
> thing.
> To reproduce this issue:
>  # Turn on DEBUG logging, e.g. for org.apache.kafka.clients.consumer and 
> org.apache.kafka.clients.Metadata
>  # Begin a consumer for a topic (or multiple topics)
>  # Restart a broker that happens to be a leader for one of the topic's 
> partitions
>  # Delete the topic
>  # Create another topic with the same name
>  # Publish data for the new topic
>  # The consumer will not receive data for the new topic, and there will be a 
> high rate of metadata requests.
>  # The issue can be corrected by restarting the consumer or restarting 
> brokers until leader epochs are large enough
> I believe KIP-516 (unique topic ids) will likely fix this problem, since 
> after those changes the leader epoch map should be keyed off of the topic id, 
> rather than the name.
>
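
To make the reproduction steps above concrete, here is a minimal sketch of the kind of wildcard-subscribed consumer the report describes; the bootstrap address, group id, and topic pattern are illustrative assumptions:

{code:java}
import java.time.Duration;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class WildcardConsumerRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "repro-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Wildcard subscription, as in the report. While this loop runs, restart a partition
            // leader, then delete and recreate a matching topic to hit the stale leader-epoch path.
            consumer.subscribe(Pattern.compile("my-topic.*"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                records.forEach(r -> System.out.printf("%s-%d@%d%n", r.topic(), r.partition(), r.offset()));
            }
        }
    }
}
{code}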

[jira] [Created] (KAFKA-13417) Dynamic thread pool re-configurations may not get processed

2021-10-28 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-13417:
---

 Summary: Dynamic thread pool re-configurations may not get 
processed
 Key: KAFKA-13417
 URL: https://issues.apache.org/jira/browse/KAFKA-13417
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jason Gustafson


`DynamicBrokerConfig.updateCurrentConfig` includes the following logic to 
update the current configuration and to let each `Reconfigurable` process the 
update:
{code}
val oldConfig = currentConfig
val (newConfig, brokerReconfigurablesToUpdate) = processReconfiguration(newProps, validateOnly = false)
if (newConfig ne currentConfig) {
  currentConfig = newConfig
  kafkaConfig.updateCurrentConfig(newConfig)

  // Process BrokerReconfigurable updates after current config is updated
  brokerReconfigurablesToUpdate.foreach(_.reconfigure(oldConfig, newConfig))
}
{code}

The problem here is that `currentConfig` gets initialized as `kafkaConfig` 
which means that the first call to `kafkaConfig.updateCurrentConfig(newConfig)` 
ends up mutating `currentConfig` and consequently `oldConfig`. This matters because 
some of the `reconfigure` implementations will only apply a new 
configuration if the value in `oldConfig` does not match the value in 
`newConfig`. For example, here is the logic to update thread pools dynamically:

{code}
  override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
    if (newConfig.numIoThreads != oldConfig.numIoThreads)
      server.dataPlaneRequestHandlerPool.resizeThreadPool(newConfig.numIoThreads)
    if (newConfig.numNetworkThreads != oldConfig.numNetworkThreads)
      server.socketServer.resizeThreadPool(oldConfig.numNetworkThreads, newConfig.numNetworkThreads)
    if (newConfig.numReplicaFetchers != oldConfig.numReplicaFetchers)
      server.replicaManager.replicaFetcherManager.resizeThreadPool(newConfig.numReplicaFetchers)
    if (newConfig.numRecoveryThreadsPerDataDir != oldConfig.numRecoveryThreadsPerDataDir)
      server.logManager.resizeRecoveryThreadPool(newConfig.numRecoveryThreadsPerDataDir)
    if (newConfig.backgroundThreads != oldConfig.backgroundThreads)
      server.kafkaScheduler.resizeThreadPool(newConfig.backgroundThreads)
  }
{code}

Because of this, the dynamic update will not get applied the first time it is 
made. I believe subsequent updates would work correctly though because we would 
have lost the indirect reference to `kafkaConfig`. Other than the 
`DynamicThreadPool` configurations, it looks like the config to update unclean 
leader election may also be affected by this bug.
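
The aliasing at the heart of the bug can be illustrated with a small, self-contained Java sketch; the classes and field below are hypothetical stand-ins, not the actual Kafka ones:

{code:java}
public class AliasedConfigSketch {
    // Hypothetical stand-in for a mutable, dynamically updatable config object.
    static class MutableConfig {
        int numIoThreads;
        MutableConfig(int numIoThreads) { this.numIoThreads = numIoThreads; }
        void updateCurrentConfig(MutableConfig other) { this.numIoThreads = other.numIoThreads; }
    }

    public static void main(String[] args) {
        MutableConfig kafkaConfig = new MutableConfig(8);
        MutableConfig currentConfig = kafkaConfig;   // initialized as the very same object

        // First dynamic update arrives.
        MutableConfig oldConfig = currentConfig;     // still aliases kafkaConfig
        MutableConfig newConfig = new MutableConfig(16);

        currentConfig = newConfig;
        kafkaConfig.updateCurrentConfig(newConfig);  // mutates kafkaConfig, and therefore oldConfig too

        // A diff-based reconfigure check now sees no change and skips the resize.
        System.out.println("resize applied? " + (newConfig.numIoThreads != oldConfig.numIoThreads)); // false
    }
}
{code}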



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13412) Retry of initTransactions after timeout may cause invalid transition

2021-10-27 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-13412:
---

 Summary: Retry of initTransactions after timeout may cause invalid 
transition
 Key: KAFKA-13412
 URL: https://issues.apache.org/jira/browse/KAFKA-13412
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jason Gustafson


If `initTransactions()` cannot be completed before the timeout defined by 
`max.block.ms`, then the call will raise a `TimeoutException`. The user is 
expected to retry this, which is what Kafka Streams does. However, the producer 
will keep retrying the `InitProducerId` request in the background and it is 
possible for it to return before the retry call to `initTransactions()`. This 
leads to the following exception:
{code}
org.apache.kafka.common.KafkaException: TransactionalId blah: Invalid 
transition attempted from state READY to state INITIALIZING

at 
org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:1077)
at 
org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:1070)
at 
org.apache.kafka.clients.producer.internals.TransactionManager.lambda$initializeTransactions$1(TransactionManager.java:336)
at 
org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1198)
at 
org.apache.kafka.clients.producer.internals.TransactionManager.initializeTransactions(TransactionManager.java:333)
at 
org.apache.kafka.clients.producer.internals.TransactionManager.initializeTransactions(TransactionManager.java:328)
at 
org.apache.kafka.clients.producer.KafkaProducer.initTransactions(KafkaProducer.java:597)
{code}
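
For reference, the retry pattern that hits this looks roughly like the sketch below (the bootstrap address, transactional id, and timeout are illustrative assumptions). The race is that the background `InitProducerId` request can complete between the `TimeoutException` and the retry:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class InitTransactionsRetrySketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "blah");
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            while (true) {
                try {
                    producer.initTransactions();  // may throw TimeoutException after max.block.ms
                    break;
                } catch (TimeoutException e) {
                    // Retry as documented. If the background InitProducerId request completed in the
                    // meantime, the producer is already READY and the retry triggers the
                    // "Invalid transition ... READY to INITIALIZING" error shown above.
                }
            }
        }
    }
}
{code}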





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10800) Validate the snapshot id when the state machine creates a snapshot

2021-10-26 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-10800.
-
Resolution: Fixed

> Validate the snapshot id when the state machine creates a snapshot
> --
>
> Key: KAFKA-10800
> URL: https://issues.apache.org/jira/browse/KAFKA-10800
> Project: Kafka
>  Issue Type: Sub-task
>  Components: replication
>Reporter: Jose Armando Garcia Sancio
>Assignee: Haoran Xuan
>Priority: Major
>
> When the state machine attempts to create a snapshot writer we should 
> validate that the following is true:
>  # The end offset of the snapshot is less than or equal to the high-watermark.
>  # The epoch of the snapshot is less than or equal to the quorum epoch.
>  # The end offset and epoch of the snapshot is valid based on the leader 
> epoch cache.
> Note that this validation should not be performed when the raft client 
> creates the snapshot writer because in that case the local log is out of date 
> and the follower should trust the snapshot id sent by the partition leader.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13319) Do not send AddOffsetsToTxn/TxnOffsetCommit if offsets map is empty

2021-09-22 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-13319:
---

 Summary: Do not send AddOffsetsToTxn/TxnOffsetCommit if offsets 
map is empty
 Key: KAFKA-13319
 URL: https://issues.apache.org/jira/browse/KAFKA-13319
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson


If a user calls `Producer.sendOffsetsToTransaction` with an empty map of 
offsets, we can shortcut return and skip the logic to add the offsets topic to 
the transaction. The main benefit is avoiding the unnecessary accumulation of 
markers in __consumer_offsets.
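
As a rough sketch of the call pattern that benefits from the shortcut (group id and the caller-side guard are illustrative; the proposed change would put an equivalent guard inside the producer):

{code:java}
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;

public class EmptyOffsetsSketch {
    // The offsets map can legitimately be empty, e.g. when the last poll returned no records.
    static void commitBatch(KafkaProducer<byte[], byte[]> producer,
                            Map<TopicPartition, OffsetAndMetadata> offsets) {
        producer.beginTransaction();
        // ... produce the (possibly empty) batch ...
        if (!offsets.isEmpty()) {
            // Skipping this call when there is nothing to commit avoids the
            // AddOffsetsToTxn/TxnOffsetCommit round trips and the extra markers
            // in __consumer_offsets that the issue describes.
            producer.sendOffsetsToTransaction(offsets, new ConsumerGroupMetadata("my-group"));
        }
        producer.commitTransaction();
    }

    public static void main(String[] args) {
        Map<TopicPartition, OffsetAndMetadata> offsets = Collections.emptyMap();
        System.out.println("would send offsets? " + !offsets.isEmpty());
    }
}
{code}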



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13254) Deadlock when expanding ISR

2021-09-20 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-13254.
-
Resolution: Fixed

> Deadlock when expanding ISR
> ---
>
> Key: KAFKA-13254
> URL: https://issues.apache.org/jira/browse/KAFKA-13254
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> Found this when debugging downgrade system test failures. The patch for 
> https://issues.apache.org/jira/browse/KAFKA-13091 introduced a deadlock. Here 
> are the jstack details:
> {code}
> "data-plane-kafka-request-handler-4": 
>   
>
>   waiting for ownable synchronizer 0xfcc00020, (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync),
>
>   which is held by "data-plane-kafka-request-handler-5"   
>   
>
> "data-plane-kafka-request-handler-5":
>   waiting for ownable synchronizer 0xc9161b20, (a 
> java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync),
>   which is held by "data-plane-kafka-request-handler-4"
> "data-plane-kafka-request-handler-4":
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0xfcc00020> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
> at 
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
> at 
> java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
> at 
> kafka.server.DelayedOperation.safeTryComplete(DelayedOperation.scala:121)
> at 
> kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:362)
> at 
> kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:264)
> at 
> kafka.cluster.DelayedOperations.checkAndCompleteAll(Partition.scala:59)
> at 
> kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:907)
> at 
> kafka.cluster.Partition.handleAlterIsrResponse(Partition.scala:1421)
> at 
> kafka.cluster.Partition.$anonfun$sendAlterIsrRequest$1(Partition.scala:1340)
> at 
> kafka.cluster.Partition.$anonfun$sendAlterIsrRequest$1$adapted(Partition.scala:1340)
> at kafka.cluster.Partition$$Lambda$1496/2055478409.apply(Unknown 
> Source)
> at kafka.server.ZkIsrManager.submit(ZkIsrManager.scala:74)
> at kafka.cluster.Partition.sendAlterIsrRequest(Partition.scala:1345)
> at kafka.cluster.Partition.expandIsr(Partition.scala:1312)
> at 
> kafka.cluster.Partition.$anonfun$maybeExpandIsr$2(Partition.scala:755)
> at kafka.cluster.Partition.maybeExpandIsr(Partition.scala:754)
> at 
> kafka.cluster.Partition.updateFollowerFetchState(Partition.scala:672)
> at 
> kafka.server.ReplicaManager.$anonfun$updateFollowerFetchState$1(ReplicaManager.scala:1806)
> at kafka.server.ReplicaManager$$Lambda$1075/1996432270.apply(Unknown 
> Source)
> at 
> scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:99)
> at 
> scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:86)
> at scala.collection.mutable.ArrayBuffer.map(ArrayBuffer.scala:42)
> at 
> kafka.server.ReplicaManager.updateFollowerFetchState(ReplicaManager.scala:1790)
> at 
> kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:1025)
> at 
> kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:1029)
> at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:970)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:173)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:75)
> at java.lang.Thread.run(Thread.java:748)
> "data-plane-kafka-request-handler-5":
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0xc9161b20> (a 
> java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSyn

[jira] [Commented] (KAFKA-13227) Cancel pending AlterIsr requests after receiving LeaderAndIsr

2021-09-16 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17416349#comment-17416349
 ] 

Jason Gustafson commented on KAFKA-13227:
-

[~christo_lolov] When we have updated the ISR state after receiving a 
LeaderAndIsr request, we have an opportunity to cancel any inflight AlterIsr 
requests since they would be doomed to fail. When we do this, we want to make sure 
that the callback could not be mistakenly applied. We protect against this 
currently by checking in the callback whether the ISR state matches the pending 
state we expected when the AlterIsr request was enqueued. However, the check is 
a little weak since it does not include the update version, so it is 
conceivable that we could enqueue the same change a second time. This is 
probably not the only way to do it. Maybe we just need a flag in the callback 
to indicate that the request was cancelled. 

> Cancel pending AlterIsr requests after receiving LeaderAndIsr
> -
>
> Key: KAFKA-13227
> URL: https://issues.apache.org/jira/browse/KAFKA-13227
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Christo Lolov
>Priority: Major
>
> Currently we do not cancel pending AlterIsr requests after the state has been 
> updated through a LeaderAndIsr request received from the controller. This 
> leads to log messages such as this 
> {code}
> [2021-08-23 18:12:47,317] WARN [Partition __transaction_state-32 broker=3] 
> Failed to enqueue ISR change state LeaderAndIsr(leader=3, leaderEpoch=3, 
> isUncleanLeader=false, isr=List(3, 1), zkVersion=3) for partition 
> __transaction_state-32 (kafka.cluster.Partition)
> {code}
> I think the only complication here is protecting against the AlterIsr 
> callback which is executed asynchronously. To address this, we can move the 
> `zkVersion` field into `IsrState`. When the callback is invoked, we can check the 
> existing state against the response state to decide whether to apply the 
> change. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13162) ElectLeader API must be forwarded to Controller

2021-09-15 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-13162.
-
Resolution: Fixed

> ElectLeader API must be forwarded to Controller
> ---
>
> Key: KAFKA-13162
> URL: https://issues.apache.org/jira/browse/KAFKA-13162
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Blocker
> Fix For: 3.1.0
>
>
> We're missing the logic to forward ElectLeaders requests to the controller. 
> This means that `kafka-leader-election.sh` does not work correctly.
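
For context, the broken path is the one exercised by a typical invocation of the tool, roughly like the following (the bootstrap address and election type are illustrative):

{code}
bin/kafka-leader-election.sh --bootstrap-server localhost:9092 \
  --election-type PREFERRED --all-topic-partitions
{code}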



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13162) ElectLeader API must be forwarded to Controller

2021-09-15 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-13162:

Fix Version/s: (was: 3.0.1)
   3.1.0

> ElectLeader API must be forwarded to Controller
> ---
>
> Key: KAFKA-13162
> URL: https://issues.apache.org/jira/browse/KAFKA-13162
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Blocker
> Fix For: 3.1.0
>
>
> We're missing the logic to forward ElectLeaders requests to the controller. 
> This means that `kafka-leader-election.sh` does not work correctly.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7408) Truncate to LSO on unclean leader election

2021-09-14 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17415051#comment-17415051
 ] 

Jason Gustafson commented on KAFKA-7408:


[~jagsancio] The basic idea was to truncate to the LSO and then rewrite markers 
for any transactions which were in progress. For example, suppose we have this 
in the log:

x1 x2 y1 y2 z1 z2 xC yA

So the transaction beginning at z1 is where the LSO would be. The idea is then 
to truncate to that point and rewrite the markers that came after:

x1 x2 y1 y2 xC yA

But I realize now that this approach was probably unnecessarily aggressive. I 
wasn't taking into account transactions which started and ended after the LSO. 
For example:

x1 x2 y1 y2 z1 z2 xC yA x1 x2 xC

Now we have another x transaction which begins after the LSO. We'd lose this if 
we have to truncate. So I'm now thinking your 2) suggestion is probably a 
better approach. Rather than truncating, we can abort. This is simpler to 
implement since it does not involve any rewriting of markers. The example above 
becomes this:

x1 x2 y1 y2 z1 z2 xC yA x1 x2 xC zA

I had previously thought that it was a stronger guarantee if we could say that 
no transaction outcomes were changed. But I realize now that there is probably 
not a practical difference between dropping the data from a transaction and 
just aborting it.

> Truncate to LSO on unclean leader election
> --
>
> Key: KAFKA-7408
> URL: https://issues.apache.org/jira/browse/KAFKA-7408
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Priority: Major
>
> If an unclean leader is elected, we may lose committed transaction data. That 
> alone is expected, but what is worse is that a transaction which was 
> previously completed (either committed or aborted) may lose its marker and 
> become dangling. The transaction coordinator will not know about the unclean 
> leader election, so will not know to resend the transaction markers. 
> Consumers with read_committed isolation will be stuck because the LSO cannot 
> advance.
> To keep this scenario from occurring, it would be better to have the unclean 
> leader truncate to the LSO so that there are no dangling transactions. 
> Truncating to the LSO is not alone sufficient because the markers which 
> allowed the LSO advancement may be at higher offsets. What we can do is let 
> the newly elected leader truncate to the LSO and then rewrite all the markers 
> that followed it using its own leader epoch (to avoid divergence from 
> followers).
> The interesting cases are when an unclean leader election occurs while a 
> transaction is ongoing. 
> 1. If a producer is in the middle of a transaction commit, then the 
> coordinator may still attempt to write transaction markers. This will either 
> succeed or fail depending on the producer epoch in the unclean leader. If the 
> epoch matches, then the WriteTxnMarker call will succeed, which will simply 
> be ignored by the consumer. If the epoch doesn't match, the WriteTxnMarker 
> call will fail and the transaction coordinator can potentially remove the 
> partition from the transaction.
> 2. If a producer is still writing the transaction, then what happens depends 
> on the producer state in the unclean leader. If no producer state has been 
> lost, then the transaction can continue without impact. Otherwise, the 
> producer will likely fail with an OUT_OF_ORDER_SEQUENCE error, which will 
> cause the transaction to be aborted by the coordinator. That takes us back to 
> the first case.
> By truncating the LSO, we ensure that transactions are either preserved in 
> whole or they are removed from the log in whole. For an unclean leader 
> election, that's probably as good as we can do. But we are ensured that 
> consumers will not be blocked by dangling transactions. The only remaining 
> situation where a dangling transaction might be left is if one of the 
> transaction state partitions has an unclean leader election.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13149) Null Pointer Exception for record==null when handling a produce request

2021-09-14 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-13149.
-
Fix Version/s: 3.0.1
   Resolution: Fixed

> Null Pointer Exception for record==null when handling a produce request
> ---
>
> Key: KAFKA-13149
> URL: https://issues.apache.org/jira/browse/KAFKA-13149
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Reporter: Cong Ding
>Priority: Major
> Fix For: 3.0.1
>
>
> In production, we have seen an exception
> {code:java}
> java.lang.NullPointerException: Cannot invoke 
> "org.apache.kafka.common.record.Record.hasMagic(byte)" because "record" is 
> null{code}
> which is triggered by
>  
> [https://github.com/apache/kafka/blob/bfc57aa4ddcd719fc4a646c2ac09d4979c076455/core/src/main/scala/kafka/log/LogValidator.scala#L191]
>  when handling a produce request.
> The reason is that 
> [https://github.com/apache/kafka/blob/bfc57aa4ddcd719fc4a646c2ac09d4979c076455/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java#L294-L296]
>  returns record==null, which is possibly caused by a bad client. However, we 
> have no idea about the client in our multi-tenant environment.
> We should let the broker throw an invalid record exception and notify clients.
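
As a rough, hypothetical sketch of the proposed validation (written in client-side Java for illustration; the real check would live in the broker's LogValidator path):

{code:java}
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.record.Record;

public class NullRecordCheckSketch {
    // Hypothetical helper: reject a null record up front instead of letting it NPE later.
    static void validateRecordNotNull(Record record) {
        if (record == null) {
            // Surfaces to the producer as an invalid-record error rather than a broker-side NPE.
            throw new InvalidRecordException("Record iterator returned null; the batch appears malformed");
        }
        // ... the existing per-record validation (magic byte, checksum, etc.) would follow ...
    }

    public static void main(String[] args) {
        try {
            validateRecordNotNull(null);
        } catch (InvalidRecordException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
{code}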



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13288) Transaction find-hanging command with --broker-id excludes internal topics

2021-09-10 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-13288.
-
Resolution: Fixed

> Transaction find-hanging command with --broker-id excludes internal topics
> --
>
> Key: KAFKA-13288
> URL: https://issues.apache.org/jira/browse/KAFKA-13288
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> We use the vanilla `Admin.listTopics()` in this command if `--broker-id` is 
> specified. By default, this excludes internal topics.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13288) Transaction find-hanging command with --broker-id excludes internal topics

2021-09-10 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17413278#comment-17413278
 ] 

Jason Gustafson commented on KAFKA-13288:
-

Note that it is still possible to use it with the --topic option instead. For 
example: 
{code}
kafka-transaction.sh --bootstrap-server localhost:9092 find-hanging --topic 
__consumer_offsets
{code}

> Transaction find-hanging command with --broker-id excludes internal topics
> --
>
> Key: KAFKA-13288
> URL: https://issues.apache.org/jira/browse/KAFKA-13288
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> We use the vanilla `Admin.listTopics()` in this command if `--broker-id` is 
> specified. By default, this excludes internal topics.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13288) Transaction find-hanging command with --broker-id excludes internal topics

2021-09-09 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-13288:
---

 Summary: Transaction find-hanging command with --broker-id 
excludes internal topics
 Key: KAFKA-13288
 URL: https://issues.apache.org/jira/browse/KAFKA-13288
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jason Gustafson


We use the vanilla `Admin.listTopics()` in this command if `--broker-id` is 
specified. By default, this excludes internal topics.
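
A likely direction for the fix, sketched against the public Admin API (error handling omitted; the bootstrap address is illustrative):

{code:java}
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListTopicsOptions;

public class ListInternalTopicsSketch {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // listInternal(true) includes topics such as __consumer_offsets and
            // __transaction_state, which the vanilla listTopics() call filters out.
            Set<String> topics = admin.listTopics(new ListTopicsOptions().listInternal(true))
                                      .names()
                                      .get();
            topics.forEach(System.out::println);
        }
    }
}
{code}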



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13234) Transaction system tests should check URPs between broker bounces

2021-09-01 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-13234.
-
Resolution: Fixed

> Transaction system tests should check URPs between broker bounces
> -
>
> Key: KAFKA-13234
> URL: https://issues.apache.org/jira/browse/KAFKA-13234
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> We've seen a number of failing transaction system tests. After investigating 
> a few of these, I found that this came down to transaction timeouts which 
> were due to partition unavailability. The test rolls the brokers in the 
> cluster, but it does not verify that URPs have cleared after restarting each 
> broker. This means that it is possible for partitions to go under min isr or 
> even offline. In the case of a hard bounce, the time to recover may be quite 
> long because of the default session timeout of 18s. It would be good to wait for 
> URPs to clear instead.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13091) Increment HW after shrinking ISR through AlterIsr

2021-08-30 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-13091:

Fix Version/s: (was: 3.0.1)

> Increment HW after shrinking ISR through AlterIsr
> -
>
> Key: KAFKA-13091
> URL: https://issues.apache.org/jira/browse/KAFKA-13091
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> After we have shrunk the ISR, we have an opportunity to advance the high 
> watermark. We do this currently in `maybeShrinkIsr` after the synchronous 
> update through ZK. For the AlterIsr path, however, we cannot rely on this 
> call since the request is sent asynchronously. Instead we should attempt to 
> advance the high watermark in the callback when the AlterIsr response returns 
> successfully.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13254) Deadlock when expanding ISR

2021-08-30 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-13254:

Issue Type: Bug  (was: Improvement)

> Deadlock when expanding ISR
> ---
>
> Key: KAFKA-13254
> URL: https://issues.apache.org/jira/browse/KAFKA-13254
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> Found this when debugging downgrade system test failures. The patch for 
> https://issues.apache.org/jira/browse/KAFKA-13091 introduced a deadlock. Here 
> are the jstack details:
> {code}
> "data-plane-kafka-request-handler-4": 
>   
>
>   waiting for ownable synchronizer 0xfcc00020, (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync),
>
>   which is held by "data-plane-kafka-request-handler-5"   
>   
>
> "data-plane-kafka-request-handler-5":
>   waiting for ownable synchronizer 0xc9161b20, (a 
> java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync),
>   which is held by "data-plane-kafka-request-handler-4"
> "data-plane-kafka-request-handler-4":
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0xfcc00020> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
> at 
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
> at 
> java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
> at 
> kafka.server.DelayedOperation.safeTryComplete(DelayedOperation.scala:121)
> at 
> kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:362)
> at 
> kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:264)
> at 
> kafka.cluster.DelayedOperations.checkAndCompleteAll(Partition.scala:59)
> at 
> kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:907)
> at 
> kafka.cluster.Partition.handleAlterIsrResponse(Partition.scala:1421)
> at 
> kafka.cluster.Partition.$anonfun$sendAlterIsrRequest$1(Partition.scala:1340)
> at 
> kafka.cluster.Partition.$anonfun$sendAlterIsrRequest$1$adapted(Partition.scala:1340)
> at kafka.cluster.Partition$$Lambda$1496/2055478409.apply(Unknown 
> Source)
> at kafka.server.ZkIsrManager.submit(ZkIsrManager.scala:74)
> at kafka.cluster.Partition.sendAlterIsrRequest(Partition.scala:1345)
> at kafka.cluster.Partition.expandIsr(Partition.scala:1312)
> at 
> kafka.cluster.Partition.$anonfun$maybeExpandIsr$2(Partition.scala:755)
> at kafka.cluster.Partition.maybeExpandIsr(Partition.scala:754)
> at 
> kafka.cluster.Partition.updateFollowerFetchState(Partition.scala:672)
> at 
> kafka.server.ReplicaManager.$anonfun$updateFollowerFetchState$1(ReplicaManager.scala:1806)
> at kafka.server.ReplicaManager$$Lambda$1075/1996432270.apply(Unknown 
> Source)
> at 
> scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:99)
> at 
> scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:86)
> at scala.collection.mutable.ArrayBuffer.map(ArrayBuffer.scala:42)
> at 
> kafka.server.ReplicaManager.updateFollowerFetchState(ReplicaManager.scala:1790)
> at 
> kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:1025)
> at 
> kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:1029)
> at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:970)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:173)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:75)
> at java.lang.Thread.run(Thread.java:748)
> "data-plane-kafka-request-handler-5":
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0xc9161b20> (a 
> java.util.concurrent.locks.ReentrantReadWrit

[jira] [Created] (KAFKA-13254) Deadlock when expanding ISR

2021-08-30 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-13254:
---

 Summary: Deadlock when expanding ISR
 Key: KAFKA-13254
 URL: https://issues.apache.org/jira/browse/KAFKA-13254
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson
Assignee: Jason Gustafson


Found this when debugging downgrade system test failures. The patch for 
https://issues.apache.org/jira/browse/KAFKA-13091 introduced a deadlock. Here 
are the jstack details:
{code}
"data-plane-kafka-request-handler-4":   

   
  waiting for ownable synchronizer 0xfcc00020, (a 
java.util.concurrent.locks.ReentrantLock$NonfairSync),  
 
  which is held by "data-plane-kafka-request-handler-5" 

   

"data-plane-kafka-request-handler-5":
  waiting for ownable synchronizer 0xc9161b20, (a 
java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync),
  which is held by "data-plane-kafka-request-handler-4"

"data-plane-kafka-request-handler-4":
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0xfcc00020> (a 
java.util.concurrent.locks.ReentrantLock$NonfairSync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
at 
java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
at 
kafka.server.DelayedOperation.safeTryComplete(DelayedOperation.scala:121)
at 
kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:362)
at 
kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:264)
at 
kafka.cluster.DelayedOperations.checkAndCompleteAll(Partition.scala:59)
at 
kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:907)
at kafka.cluster.Partition.handleAlterIsrResponse(Partition.scala:1421)
at 
kafka.cluster.Partition.$anonfun$sendAlterIsrRequest$1(Partition.scala:1340)
at 
kafka.cluster.Partition.$anonfun$sendAlterIsrRequest$1$adapted(Partition.scala:1340)
at kafka.cluster.Partition$$Lambda$1496/2055478409.apply(Unknown Source)
at kafka.server.ZkIsrManager.submit(ZkIsrManager.scala:74)
at kafka.cluster.Partition.sendAlterIsrRequest(Partition.scala:1345)
at kafka.cluster.Partition.expandIsr(Partition.scala:1312)
at 
kafka.cluster.Partition.$anonfun$maybeExpandIsr$2(Partition.scala:755)
at kafka.cluster.Partition.maybeExpandIsr(Partition.scala:754)
at kafka.cluster.Partition.updateFollowerFetchState(Partition.scala:672)
at 
kafka.server.ReplicaManager.$anonfun$updateFollowerFetchState$1(ReplicaManager.scala:1806)
at kafka.server.ReplicaManager$$Lambda$1075/1996432270.apply(Unknown 
Source)
at 
scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:99)
at 
scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:86)
at scala.collection.mutable.ArrayBuffer.map(ArrayBuffer.scala:42)
at 
kafka.server.ReplicaManager.updateFollowerFetchState(ReplicaManager.scala:1790)
at kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:1025)
at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:1029)
at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:970)
at kafka.server.KafkaApis.handle(KafkaApis.scala:173)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:75)
at java.lang.Thread.run(Thread.java:748)

"data-plane-kafka-request-handler-5":
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0xc9161b20> (a 
java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:967)
at 
java.util.concurren

[jira] [Commented] (KAFKA-13227) Cancel pending AlterIsr requests after receiving LeaderAndIsr

2021-08-26 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17405420#comment-17405420
 ] 

Jason Gustafson commented on KAFKA-13227:
-

[~christo_lolov] Thanks, feel free to pick it up. I would suggest looking at 
the test cases in `PartitionTest`. For example, `testRetryShrinkIsr` looks like 
it already has most of the machinery you will need.

> Cancel pending AlterIsr requests after receiving LeaderAndIsr
> -
>
> Key: KAFKA-13227
> URL: https://issues.apache.org/jira/browse/KAFKA-13227
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Priority: Major
>
> Currently we do not cancel pending AlterIsr requests after the state has been 
> updated through a LeaderAndIsr request received from the controller. This 
> leads to log messages such as this 
> {code}
> [2021-08-23 18:12:47,317] WARN [Partition __transaction_state-32 broker=3] 
> Failed to enqueue ISR change state LeaderAndIsr(leader=3, leaderEpoch=3, 
> isUncleanLeader=false, isr=List(3, 1), zkVersion=3) for partition 
> __transaction_state-32 (kafka.cluster.Partition)
> {code}
> I think the only complication here is protecting against the AlterIsr 
> callback which is executed asynchronously. To address this, we can move the 
> `zkVersion` field into `IsrState`. When the callback is invoked, we can check the 
> existing state against the response state to decide whether to apply the 
> change. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13235) Add support for static groups to transaction system tests

2021-08-26 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-13235:
---

 Summary: Add support for static groups to transaction system tests
 Key: KAFKA-13235
 URL: https://issues.apache.org/jira/browse/KAFKA-13235
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson


Currently `TransactionalMessageCopier` does not support static groups 
configured using `group.instance.id`. We should consider adding support and 
parameterizing the corresponding system tests which rely on it.
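
For context, a static member is configured purely through consumer configs; a minimal sketch of what the copier would need to expose (values are illustrative):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class StaticMemberConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "copier-group");
        // A stable per-instance id makes this a static member, so a quick bounce within the
        // session timeout does not force a rebalance. This is the knob the copier and the
        // parameterized system tests would need to pass through.
        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "copier-instance-0");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        System.out.println(props);
        // ... the copier would build its KafkaConsumer with these properties ...
    }
}
{code}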



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13234) Transaction system tests should check URPs between broker bounces

2021-08-26 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-13234:
---

 Summary: Transaction system tests should check URPs between broker 
bounces
 Key: KAFKA-13234
 URL: https://issues.apache.org/jira/browse/KAFKA-13234
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jason Gustafson


We've seen a number of failing transaction system tests. After investigating a 
few of these, I found that this came down to transaction timeouts which were 
due to partition unavailability. The test rolls the brokers in the cluster, but 
it does not verify that URPs have cleared after restarting each broker. This 
means that it is possible for partitions to go under min isr or even offline. 
In the case of a hard bounce, the time to recover may be quite long because of 
the default session timeout of 18s. It would be good to wait for URPs to clear 
instead.
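
For illustration, the check the test needs between bounces can be expressed against the Java Admin API roughly as follows (topic names and address are assumptions; the actual system test is written in Python/ducktape):

{code:java}
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class UrpCheckSketch {
    // Returns true only if every partition of the given topics has a full ISR.
    static boolean noUnderReplicatedPartitions(Admin admin, Collection<String> topics)
            throws ExecutionException, InterruptedException {
        return admin.describeTopics(topics).all().get().values().stream()
                .flatMap(description -> description.partitions().stream())
                .allMatch(p -> p.isr().size() == p.replicas().size());
    }

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            System.out.println(noUnderReplicatedPartitions(admin, List.of("input-topic", "output-topic")));
        }
    }
}
{code}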



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13227) Cancel pending AlterIsr requests after receiving LeaderAndIsr

2021-08-25 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-13227:
---

 Summary: Cancel pending AlterIsr requests after receiving 
LeaderAndIsr
 Key: KAFKA-13227
 URL: https://issues.apache.org/jira/browse/KAFKA-13227
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson


Currently we do not cancel pending AlterIsr requests after the state has been 
updated through a LeaderAndIsr request received from the controller. This leads 
to log messages such as this 
{code}
[2021-08-23 18:12:47,317] WARN [Partition __transaction_state-32 broker=3] 
Failed to enqueue ISR change state LeaderAndIsr(leader=3, leaderEpoch=3, 
isUncleanLeader=false, isr=List(3, 1), zkVersion=3) for partition 
__transaction_state-32 (kafka.cluster.Partition)
{code}
I think the only complication here is protecting against the AlterIsr callback 
which is executed asynchronously. To address this, we can move the `zkVersion` 
field into `IsrState`. When the callback is invoked, we can check the existing state 
against the response state to decide whether to apply the change. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13214) Consumer should not reset group state after disconnect

2021-08-24 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-13214.
-
Fix Version/s: 2.8.1
   2.7.2
   3.0.0
   Resolution: Fixed

> Consumer should not reset group state after disconnect
> --
>
> Key: KAFKA-13214
> URL: https://issues.apache.org/jira/browse/KAFKA-13214
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.7.0, 2.8.0
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Critical
> Fix For: 3.0.0, 2.7.2, 2.8.1
>
>
> When the consumer disconnects from the coordinator while a rebalance is in 
> progress, we currently reset the memberId and generation. The coordinator 
> then must await the session timeout in order to expire the old memberId. This 
> was apparently a regression from 
> https://github.com/apache/kafka/commit/7e7bb184d2abe34280a7f0eb0f0d9fc0e32389f2#diff-15efe9b844f78b686393b6c2e2ad61306c3473225742caed05c7edab9a138832R478.
>  It would be better to keep the memberId/generation.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13091) Increment HW after shrinking ISR through AlterIsr

2021-08-23 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-13091.
-
Fix Version/s: 3.0.1
   Resolution: Fixed

> Increment HW after shrinking ISR through AlterIsr
> -
>
> Key: KAFKA-13091
> URL: https://issues.apache.org/jira/browse/KAFKA-13091
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 3.0.1
>
>
> After we have shrunk the ISR, we have an opportunity to advance the high 
> watermark. We do this currently in `maybeShrinkIsr` after the synchronous 
> update through ZK. For the AlterIsr path, however, we cannot rely on this 
> call since the request is sent asynchronously. Instead we should attempt to 
> advance the high watermark in the callback when the AlterIsr response returns 
> successfully.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9938) Fix debug consumer read from follower for older protocol versions

2021-08-20 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9938?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-9938.

Resolution: Won't Fix

I am going to close this issue as "won't fix." Since the new `Fetch` protocol 
versions allows fetching from followers already, there seems little reason to 
add this logic just for older versions. The `ReplicaVerificationTool` will 
still work for example. Effectively, the behavior of the debug consumer is the 
same as the normal consumer.

> Fix debug consumer read from follower for older protocol versions
> -
>
> Key: KAFKA-9938
> URL: https://issues.apache.org/jira/browse/KAFKA-9938
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.5.0, 2.4.1
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> In fetch requests, a sentinel of -2 for the replicaId is treated as a "debug 
> consumer" and is allowed to fetch from followers. In KIP-392, we added the 
> general ability to read from followers, but we require a newer version of the 
> protocol. In the process of this change, we lost the ability for older 
> versions of the fetch protocol to use the "debug consumer" to read from 
> followers. As far as I know the only place this capability is used is in the 
> ReplicaVerificationTool. We don't expose this capability from the consumer. 
> However, it is still technically a regression and should be fixed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9886) Validate segment range before reading in `Log.read`

2021-08-20 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson reassigned KAFKA-9886:
--

Assignee: (was: Jason Gustafson)

> Validate segment range before reading in `Log.read`
> ---
>
> Key: KAFKA-9886
> URL: https://issues.apache.org/jira/browse/KAFKA-9886
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Priority: Major
>
> Log.read uses the following logic to set the upper limit on a segment read.
> {code}
> val maxPosition = {
>   // Use the max offset position if it is on this segment; otherwise, the segment size is the limit.
>   if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) {
>     maxOffsetMetadata.relativePositionInSegment
>   } else {
>     segment.size
>   }
> }
> {code}
> In the else branch, the expectation is that 
> `maxOffsetMetadata.segmentBaseOffset > segment.baseOffset`. In KAFKA-9838, we 
> found a bug where this assumption failed  which led to reads above the high 
> watermark. We should validate the expectation explicitly so that we don't 
> leave the door open for similar bugs in the future.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13221) Add metric for `PartitionsWithLateTransactionsCount`

2021-08-20 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-13221:
---

 Summary: Add metric for `PartitionsWithLateTransactionsCount`
 Key: KAFKA-13221
 URL: https://issues.apache.org/jira/browse/KAFKA-13221
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jason Gustafson
Assignee: Jason Gustafson


The metric `PartitionsWithLateTransactionsCount` was introduced in KIP-664: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-664%3A+Provide+tooling+to+detect+and+abort+hanging+transactions#KIP664:Providetoolingtodetectandaborthangingtransactions-Metrics.
 This metric will record the number of partitions which have open transactions 
with durations exceeding `transaction.max.timeout.ms`.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-13214) Consumer should not reset group state after disconnect

2021-08-18 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson reassigned KAFKA-13214:
---

Assignee: Jason Gustafson

> Consumer should not reset group state after disconnect
> --
>
> Key: KAFKA-13214
> URL: https://issues.apache.org/jira/browse/KAFKA-13214
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.7.0, 2.8.0
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Critical
>
> When the consumer disconnects from the coordinator while a rebalance is in 
> progress, we currently reset the memberId and generation. The coordinator 
> then must await the session timeout in order to expire the old memberId. This 
> was apparently a regression from 
> https://github.com/apache/kafka/commit/7e7bb184d2abe34280a7f0eb0f0d9fc0e32389f2#diff-15efe9b844f78b686393b6c2e2ad61306c3473225742caed05c7edab9a138832R478.
>  It would be better to keep the memberId/generation.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13214) Consumer should not reset group state after disconnect

2021-08-18 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-13214:
---

 Summary: Consumer should not reset group state after disconnect
 Key: KAFKA-13214
 URL: https://issues.apache.org/jira/browse/KAFKA-13214
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.8.0, 2.7.0
Reporter: Jason Gustafson


When the consumer disconnects from the coordinator while a rebalance is in 
progress, we currently reset the memberId and generation. The coordinator then 
must await the session timeout in order to expire the old memberId. This was 
apparently a regression from 
https://github.com/apache/kafka/commit/7e7bb184d2abe34280a7f0eb0f0d9fc0e32389f2#diff-15efe9b844f78b686393b6c2e2ad61306c3473225742caed05c7edab9a138832R478.
 It would be better to keep the memberId/generation.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13162) ElectLeader API must be forwarded to Controller

2021-08-12 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17398209#comment-17398209
 ] 

Jason Gustafson commented on KAFKA-13162:
-

This has turned out to be more work than expected. I am going to change the 
target version to 3.0.1. cc [~kkonstantine]

> ElectLeader API must be forwarded to Controller
> ---
>
> Key: KAFKA-13162
> URL: https://issues.apache.org/jira/browse/KAFKA-13162
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Blocker
> Fix For: 3.0.0
>
>
> We're missing the logic to forward ElectLeaders requests to the controller. 
> This means that `kafka-leader-election.sh` does not work correctly.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13162) ElectLeader API must be forwarded to Controller

2021-08-12 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-13162:

Fix Version/s: (was: 3.0.0)
   3.0.1

> ElectLeader API must be forwarded to Controller
> ---
>
> Key: KAFKA-13162
> URL: https://issues.apache.org/jira/browse/KAFKA-13162
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Blocker
> Fix For: 3.0.1
>
>
> We're missing the logic to forward ElectLeaders requests to the controller. 
> This means that `kafka-leader-election.sh` does not work correctly.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13161) Follower leader and ISR state not updated after partition change in KRaft

2021-08-10 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13161?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-13161.
-
Resolution: Fixed

> Follower leader and ISR state not updated after partition change in KRaft
> -
>
> Key: KAFKA-13161
> URL: https://issues.apache.org/jira/browse/KAFKA-13161
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jose Armando Garcia Sancio
>Priority: Blocker
> Fix For: 3.0.0
>
>
> In KRaft when we detect a partition change, we first verify whether any 
> leader or follower transitions are needed. Depending on the case, we call 
> either `applyLocalLeadersDelta` or `applyLocalFollowersDelta`. In the latter 
> case, we are missing a call to `Partition.makeFollower` which is responsible 
> for updating LeaderAndIsr state for the partitions. As a result of this, the 
> partition state may be left stale. 
> The specific consequences of this bug are 1) follower fetching fails since 
> the epoch is never updated, and 2) a stale leader may continue to accept 
> Produce requests. The latter is the bigger issue since it can lead to log 
> divergence if we are appending from both the client and from the fetcher 
> thread at the same time. I tested this locally and confirmed that it is 
> possible.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13155) ConcurrentModificationException in TransactionalMessageCopier

2021-08-10 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13155?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-13155.
-
Resolution: Fixed

> ConcurrentModificationException in TransactionalMessageCopier
> -
>
> Key: KAFKA-13155
> URL: https://issues.apache.org/jira/browse/KAFKA-13155
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> Caught this exception in a system test run:
> {code}
> [2021-08-02 07:51:54,528] ERROR Uncaught exception in thread 
> 'transactional-message-copier-shutdown-hook': 
> (org.apache.kafka.common.utils.KafkaThread)
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for 
> multi-threaded access
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2447)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2330)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2308)
> at 
> org.apache.kafka.tools.TransactionalMessageCopier.lambda$main$1(TransactionalMessageCopier.java:331)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> The pattern for closing the consumer is not safe.
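
For context, the conventional shutdown-safe pattern looks roughly like the sketch below: the shutdown hook only calls `wakeup()`, the one KafkaConsumer method that is safe to call from another thread, and `close()` stays on the thread that owns the consumer. The broker address, group id, and topic are placeholder assumptions, and this is not necessarily the exact fix applied to TransactionalMessageCopier.

{code:java}
// Sketch of a shutdown-safe consumer loop. Broker address, group id, and topic
// are placeholder assumptions for illustration.
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class SafeConsumerShutdown {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "copier-sketch");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // The hook only interrupts the poll; it never calls close() itself.
        Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));

        try {
            consumer.subscribe(Collections.singletonList("input-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                // process records ...
            }
        } catch (WakeupException e) {
            // Expected during shutdown; fall through to close().
        } finally {
            // close() runs on the owning thread, avoiding the
            // ConcurrentModificationException seen in the stack trace above.
            consumer.close();
        }
    }
}
{code}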



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13132) Upgrading to topic IDs in LISR requests has gaps introduced in 3.0

2021-08-05 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-13132.
-
Resolution: Fixed

> Upgrading to topic IDs in LISR requests has gaps introduced in 3.0
> --
>
> Key: KAFKA-13132
> URL: https://issues.apache.org/jira/browse/KAFKA-13132
> Project: Kafka
>  Issue Type: Bug
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Blocker
> Fix For: 3.0.0
>
>
> With the change in 3.0 to how topic IDs are assigned to logs, a bug was 
> inadvertently introduced. Now, topic IDs will only be assigned on the load of 
> the log to a partition in LISR requests. This means we will only assign topic 
> IDs for newly created topics/partitions, on broker startup, or potentially 
> when a partition is reassigned.
>  
> In the case of upgrading from an IBP before 2.8, we may have a scenario where 
> we upgrade the controller to IBP 3.0 (or even 2.8) last. (Ie, the controller 
> is IBP < 2.8 and all other brokers are on the newest IBP) Upon the last 
> broker upgrading, we will elect a new controller but its LISR request will 
> not result in topic IDs being assigned to logs of existing topics. They will 
> only be assigned in the cases mentioned above.
> *Keep in mind, in this scenario, topic IDs will still be assigned in the 
> controller/ZK to all new and pre-existing topics and will show up in 
> metadata.*  This means we are not ensured the same guarantees we had in 2.8. 
> *It is just the LISR/partition.metadata part of the code that is affected.* 
>  
> The problem is two-fold
>  1. We ignore LISR requests when the partition leader epoch has not increased 
> (previously we assigned the ID before this check)
>  2. We only assign the topic ID when we are associating the log with the 
> partition in replicamanager for the first time. Though in the scenario 
> described above, we have logs associated with partitions that need to be 
> upgraded.
>  
> We should check if the LISR request is resulting in a topic ID addition and 
> add logic to assign the ID to logs already associated with partitions in the 
> replica manager.
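
As a rough illustration of the second point, here is a sketch of the intended behavior. The `Log` type, the use of `UUID` in place of Kafka's `Uuid`, and the method names are hypothetical; the real change belongs in the Scala ReplicaManager/Log code.

{code:java}
// Illustrative sketch only; types and method names are hypothetical.
import java.util.Optional;
import java.util.UUID;

final class TopicIdUpgradeSketch {
    static final class Log {
        Optional<UUID> topicId = Optional.empty();

        void writePartitionMetadataFile(UUID id) {
            this.topicId = Optional.of(id);
        }
    }

    // Called while handling a LeaderAndIsr request for a partition whose log
    // already exists locally (e.g. a topic created before the upgrade).
    static void maybeAssignTopicId(Log existingLog, Optional<UUID> topicIdFromRequest) {
        if (topicIdFromRequest.isPresent() && existingLog.topicId.isEmpty()) {
            // Previously this assignment only happened when the log was first
            // associated with the partition, so pre-existing logs were skipped
            // and requests with an unchanged leader epoch were ignored entirely.
            existingLog.writePartitionMetadataFile(topicIdFromRequest.get());
        }
    }
}
{code}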



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13173) KRaft controller does not handle simultaneous broker expirations correctly

2021-08-05 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17394319#comment-17394319
 ] 

Jason Gustafson commented on KAFKA-13173:
-

I upgraded this bug to a blocker because I think it can result in data loss. 
In the example above, the second ISR change would be interpreted as an 
expansion, but there may have been committed writes to the log between the two 
ISR changes that were not reflected in the expansion.

> KRaft controller does not handle simultaneous broker expirations correctly
> --
>
> Key: KAFKA-13173
> URL: https://issues.apache.org/jira/browse/KAFKA-13173
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Niket Goel
>Priority: Blocker
> Fix For: 3.0.0
>
>
> In `ReplicationControlManager.fenceStaleBrokers`, we find all of the current 
> stale replicas and attempt to remove them from the ISR. However, when 
> multiple expirations occur at once, we do not properly accumulate the ISR 
> changes. For example, I ran a test where the ISR of a partition was 
> initialized to [1, 2, 3]. Then I triggered a timeout of replicas 2 and 3 at 
> the same time. The records that were generated by `fenceStaleBrokers` were 
> the following:
> {code}
> ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, 
> topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 3], leader=1, replicas=null, 
> removingReplicas=null, addingReplicas=null) at version 0), 
> ApiMessageAndVersion(FenceBrokerRecord(id=2, epoch=102) at version 0), 
> ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, 
> topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 2], leader=1, replicas=null, 
> removingReplicas=null, addingReplicas=null) at version 0), 
> ApiMessageAndVersion(FenceBrokerRecord(id=3, epoch=103) at version 0)]
> {code}
> First the ISR is shrunk to [1, 3] as broker 2 is fenced. We also see the 
> record to fence broker 2. Then the ISR is modified to [1, 2] as the fencing 
> of broker 3 is handled. So we did not account for the fact that we had 
> already fenced broker 2 in the request. 
> A simple solution for now is to change the logic to handle fencing only one 
> broker at a time. 
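
The accumulation point can be shown with a small, self-contained sketch (hypothetical types, not the real ReplicationControlManager): each shrink has to be applied to the ISR produced by the previous shrink in the same pass, not to the original snapshot.

{code:java}
// Illustrative sketch of accumulating ISR shrinks across multiple fenced brokers.
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

final class IsrShrinkSketch {
    static List<Integer> shrinkIsr(List<Integer> currentIsr, Set<Integer> fencedBrokers) {
        List<Integer> isr = new ArrayList<>(currentIsr);
        for (Integer fenced : fencedBrokers) {
            // Each removal starts from the already-shrunk ISR, so fencing
            // brokers 2 and 3 against [1, 2, 3] yields [1], never [1, 2].
            isr.remove(fenced);
        }
        return isr;
    }

    public static void main(String[] args) {
        System.out.println(shrinkIsr(List.of(1, 2, 3), Set.of(2, 3))); // prints [1]
    }
}
{code}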



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13173) KRaft controller does not handle simultaneous broker expirations correctly

2021-08-05 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-13173:

Description: 
In `ReplicationControlManager.fenceStaleBrokers`, we find all of the current 
stale replicas and attempt to remove them from the ISR. However, when multiple 
expirations occur at once, we do not properly accumulate the ISR changes. For 
example, I ran a test where the ISR of a partition was initialized to [1, 2, 
3]. Then I triggered a timeout of replicas 2 and 3 at the same time. The 
records that were generated by `fenceStaleBrokers` were the following:

{code}
ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, 
topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 3], leader=1, replicas=null, 
removingReplicas=null, addingReplicas=null) at version 0), 
ApiMessageAndVersion(FenceBrokerRecord(id=2, epoch=102) at version 0), 
ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, 
topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 2], leader=1, replicas=null, 
removingReplicas=null, addingReplicas=null) at version 0), 
ApiMessageAndVersion(FenceBrokerRecord(id=3, epoch=103) at version 0)]
{code}

First the ISR is shrunk to [1, 3] as broker 2 is fenced. We also see the record 
to fence broker 2. Then the ISR is modified to [1, 2] as the fencing of broker 
3 is handled. So we did not account for the fact that we had already fenced 
broker 2 in the request. 

A simple solution for now is to change the logic to handle fencing only one 
broker at a time. 

  was:
In `ReplicationControlManager.fenceStaleBrokers`, we find all of the current 
stale replicas and attempt to remove them from the ISR. However, when multiple 
expirations occur at once, we do not properly accumulate the ISR changes. For 
example, I ran a test where the ISR of a partition was initialized to [1, 2, 
3]. Then I triggered a timeout of replicas 2 and 3 at the same time. The 
records that were generated by `fenceStaleBrokers` were the following:

{code}
ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, 
topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 3], leader=1, replicas=null, 
removingReplicas=null, addingReplicas=null) at version 0), 
ApiMessageAndVersion(FenceBrokerRecord(id=2, epoch=102) at version 0), 
ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, 
topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 2], leader=1, replicas=null, 
removingReplicas=null, addingReplicas=null) at version 0), 
ApiMessageAndVersion(FenceBrokerRecord(id=3, epoch=103) at version 0)]
{code}

First the ISR is shrunk to [1, 3] as broker 2 is fenced. We also see the record 
to fence broker 2. Then the ISR is modified to [1, 2] as the fencing of broker 
3 is handled. So we did not account for the fact that we had already fenced 
broker 2 in the request. 

A simple solution for now is to change the logic to handle fencing only one 
broker at a time. 


> KRaft controller does not handle simultaneous broker expirations correctly
> --
>
> Key: KAFKA-13173
> URL: https://issues.apache.org/jira/browse/KAFKA-13173
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Niket Goel
>Priority: Blocker
> Fix For: 3.0.0
>
>
> In `ReplicationControlManager.fenceStaleBrokers`, we find all of the current 
> stale replicas and attempt to remove them from the ISR. However, when 
> multiple expirations occur at once, we do not properly accumulate the ISR 
> changes. For example, I ran a test where the ISR of a partition was 
> initialized to [1, 2, 3]. Then I triggered a timeout of replicas 2 and 3 at 
> the same time. The records that were generated by `fenceStaleBrokers` were 
> the following:
> {code}
> ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, 
> topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 3], leader=1, replicas=null, 
> removingReplicas=null, addingReplicas=null) at version 0), 
> ApiMessageAndVersion(FenceBrokerRecord(id=2, epoch=102) at version 0), 
> ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, 
> topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 2], leader=1, replicas=null, 
> removingReplicas=null, addingReplicas=null) at version 0), 
> ApiMessageAndVersion(FenceBrokerRecord(id=3, epoch=103) at version 0)]
> {code}
> First the ISR is shrunk to [1, 3] as broker 2 is fenced. We also see the 
> record to fence broker 2. Then the ISR is modified to [1, 2] as the fencing 
> of broker 3 is handled. So we did not account for the fact that we had 
> already fenced broker 2 in the request. 
> A simple solution for now is to change the logic to handle fencing only one 
> broker at a time. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13173) KRaft controller does not handle simultaneous broker expirations correctly

2021-08-05 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-13173:

Fix Version/s: 3.0.0

> KRaft controller does not handle simultaneous broker expirations correctly
> --
>
> Key: KAFKA-13173
> URL: https://issues.apache.org/jira/browse/KAFKA-13173
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Blocker
> Fix For: 3.0.0
>
>
> In `ReplicationControlManager.fenceStaleBrokers`, we find all of the current 
> stale replicas and attempt to remove them from the ISR. However, when 
> multiple expirations occur at once, we do not properly accumulate the ISR 
> changes. For example, I ran a test where the ISR of a partition was 
> initialized to [1, 2, 3]. Then I triggered a timeout of replicas 2 and 3 at 
> the same time. The records that were generated by `fenceStaleBrokers` were 
> the following:
> {code}
> ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, 
> topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 3], leader=1, replicas=null, 
> removingReplicas=null, addingReplicas=null) at version 0), 
> ApiMessageAndVersion(FenceBrokerRecord(id=2, epoch=102) at version 0), 
> ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, 
> topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 2], leader=1, replicas=null, 
> removingReplicas=null, addingReplicas=null) at version 0), 
> ApiMessageAndVersion(FenceBrokerRecord(id=3, epoch=103) at version 0)]
> {code}
> First the ISR is shrunk to [1, 3] as broker 2 is fenced. We also see the 
> record to fence broker 2. Then the ISR is modified to [1, 2] as the fencing 
> of broker 3 is handled. So we did not account for the fact that we had 
> already fenced broker 2 in the request. 
> A simple solution for now is to change the logic to handle fencing only one 
> broker at a time. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13173) KRaft controller does not handle simultaneous broker expirations correctly

2021-08-05 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-13173:

Priority: Blocker  (was: Major)

> KRaft controller does not handle simultaneous broker expirations correctly
> --
>
> Key: KAFKA-13173
> URL: https://issues.apache.org/jira/browse/KAFKA-13173
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Blocker
>
> In `ReplicationControlManager.fenceStaleBrokers`, we find all of the current 
> stale replicas and attempt to remove them from the ISR. However, when 
> multiple expirations occur at once, we do not properly accumulate the ISR 
> changes. For example, I ran a test where the ISR of a partition was 
> initialized to [1, 2, 3]. Then I triggered a timeout of replicas 2 and 3 at 
> the same time. The records that were generated by `fenceStaleBrokers` were 
> the following:
> {code}
> ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, 
> topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 3], leader=1, replicas=null, 
> removingReplicas=null, addingReplicas=null) at version 0), 
> ApiMessageAndVersion(FenceBrokerRecord(id=2, epoch=102) at version 0), 
> ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, 
> topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 2], leader=1, replicas=null, 
> removingReplicas=null, addingReplicas=null) at version 0), 
> ApiMessageAndVersion(FenceBrokerRecord(id=3, epoch=103) at version 0)]
> {code}
> First the ISR is shrunk to [1, 3] as broker 2 is fenced. We also see the 
> record to fence broker 2. Then the ISR is modified to [1, 2] as the fencing 
> of broker 3 is handled. So we did not account for the fact that we had 
> already fenced broker 2 in the request. 
> A simple solution for now is to change the logic to handle fencing only one 
> broker at a time. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13173) KRaft controller does not handle simultaneous broker expirations correctly

2021-08-05 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-13173:
---

 Summary: KRaft controller does not handle simultaneous broker 
expirations correctly
 Key: KAFKA-13173
 URL: https://issues.apache.org/jira/browse/KAFKA-13173
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson


In `ReplicationControlManager.fenceStaleBrokers`, we find all of the current 
stale replicas and attempt to remove them from the ISR. However, when multiple 
expirations occur at once, we do not properly accumulate the ISR changes. For 
example, I ran a test where the ISR of a partition was initialized to [1, 2, 
3]. Then I triggered a timeout of replicas 2 and 3 at the same time. The 
records that were generated by `fenceStaleBrokers` were the following:

{code}
ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, 
topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 3], leader=1, replicas=null, 
removingReplicas=null, addingReplicas=null) at version 0), 
ApiMessageAndVersion(FenceBrokerRecord(id=2, epoch=102) at version 0), 
ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, 
topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 2], leader=1, replicas=null, 
removingReplicas=null, addingReplicas=null) at version 0), 
ApiMessageAndVersion(FenceBrokerRecord(id=3, epoch=103) at version 0)]
{code}

First the ISR is shrunk to [1, 3] as broker 2 is fenced. We also see the record 
to fence broker 2. Then the ISR is modified to [1, 2] as the fencing of broker 
3 is handled. So we did not account for the fact that we had already fenced 
broker 2 in the request. 

A simple solution for now is to change the logic to handle fencing only one 
broker at a time. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13167) KRaft broker should heartbeat immediately during controlled shutdown

2021-08-05 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-13167.
-
Fix Version/s: 3.0.0
   Resolution: Fixed

> KRaft broker should heartbeat immediately during controlled shutdown
> 
>
> Key: KAFKA-13167
> URL: https://issues.apache.org/jira/browse/KAFKA-13167
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 3.0.0
>
>
> Controlled shutdown in KRaft is signaled through a heartbeat request with the 
> `shouldShutDown` flag set to true. When we begin controlled shutdown, we 
> should immediately schedule the next heartbeat instead of waiting for the 
> next periodic heartbeat so that we can shut down more quickly. Otherwise 
> controlled shutdown can be delayed by several seconds.
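
Conceptually the change amounts to something like the sketch below; it is a loose model built on `ScheduledExecutorService`, not the broker's actual heartbeat machinery.

{code:java}
// Loose illustrative model only; the real broker uses its own scheduler and
// broker heartbeat RPCs.
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class HeartbeatSchedulingSketch {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private volatile boolean shuttingDown = false;

    void scheduleNextHeartbeat(long periodicDelayMs) {
        // Once controlled shutdown has started, do not wait out the remaining
        // heartbeat interval; send the next heartbeat immediately.
        long delayMs = shuttingDown ? 0 : periodicDelayMs;
        scheduler.schedule(this::sendHeartbeat, delayMs, TimeUnit.MILLISECONDS);
    }

    void beginControlledShutdown() {
        shuttingDown = true;
        scheduleNextHeartbeat(0);
    }

    private void sendHeartbeat() {
        // Build and send a heartbeat request signalling shutdown intent ...
    }
}
{code}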



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13167) KRaft broker should heartbeat immediately during controlled shutdown

2021-08-04 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-13167:
---

 Summary: KRaft broker should heartbeat immediately during 
controlled shutdown
 Key: KAFKA-13167
 URL: https://issues.apache.org/jira/browse/KAFKA-13167
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jason Gustafson


Controlled shutdown in KRaft is signaled through a heartbeat request with the 
`shouldShutDown` flag set to true. When we begin controlled shutdown, we should 
immediately schedule the next heartbeat instead of waiting for the next 
periodic heartbeat so that we can shut down more quickly. Otherwise controlled 
shutdown can be delayed by several seconds.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13162) ElectLeader API must be forwarded to Controller

2021-08-03 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-13162:
---

 Summary: ElectLeader API must be forwarded to Controller
 Key: KAFKA-13162
 URL: https://issues.apache.org/jira/browse/KAFKA-13162
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jason Gustafson
 Fix For: 3.0.0


We're missing the logic to forward ElectLeaders requests to the controller. 
This means that `kafka-leader-election.sh` does not work correctly.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13161) Follower leader and ISR state not updated after partition change in KRaft

2021-08-03 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-13161:
---

 Summary: Follower leader and ISR state not updated after partition 
change in KRaft
 Key: KAFKA-13161
 URL: https://issues.apache.org/jira/browse/KAFKA-13161
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jose Armando Garcia Sancio
 Fix For: 3.0.0


In KRaft when we detect a partition change, we first verify whether any leader 
or follower transitions are needed. Depending on the case, we call either 
`applyLocalLeadersDelta` or `applyLocalFollowersDelta`. In the latter case, we 
are missing a call to `Partition.makeFollower` which is responsible for 
updating LeaderAndIsr state for the partitions. As a result of this, the 
partition state may be left stale. 

The specific consequences of this bug are 1) follower fetching fails since the 
epoch is never updated, and 2) a stale leader may continue to accept Produce 
requests. The latter is the bigger issue since it can lead to log divergence if 
we are appending from both the client and from the fetcher thread at the same 
time. I tested this locally and confirmed that it is possible.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13099) Message too large error when expiring transactionalIds

2021-08-03 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-13099:

Fix Version/s: 2.6.3
   2.5.2

> Message too large error when expiring transactionalIds
> --
>
> Key: KAFKA-13099
> URL: https://issues.apache.org/jira/browse/KAFKA-13099
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.0.1, 2.1.1, 2.2.2, 2.3.1, 2.4.1, 2.5.1, 2.6.1, 2.8.0, 
> 2.7.1
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 3.0.0, 2.5.2, 2.6.3, 2.7.2, 2.8.1
>
>
> We have seen a couple reports of MESSAGE_TOO_LARGE errors when writing 
> tombstones for expired transactionalIds. This is possible because we collect 
> all expired IDs into a single batch. We should ensure that the created 
> batches are smaller than the max message size. Any expired IDs that cannot 
> fit can be expired later.
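
A simple way to picture the fix is the batching sketch below (the size estimation and overhead are placeholder assumptions, not the actual TransactionStateManager change): tombstones are grouped into batches that stay under the max message size, and anything that does not fit is left for a later expiration pass.

{code:java}
// Illustrative sketch of size-bounded batching for expired transactionalIds.
import java.util.ArrayList;
import java.util.List;

final class TombstoneBatchingSketch {
    static List<List<String>> batchExpiredIds(List<String> expiredIds,
                                              int maxBatchBytes,
                                              int perRecordOverheadBytes) {
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        int currentBytes = 0;
        for (String id : expiredIds) {
            int recordBytes = id.length() + perRecordOverheadBytes; // rough estimate
            if (!current.isEmpty() && currentBytes + recordBytes > maxBatchBytes) {
                batches.add(current);          // this batch is full; start a new one
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(id);
            currentBytes += recordBytes;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
{code}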



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13142) KRaft brokers do not validate dynamic configs before forwarding them to controller

2021-08-03 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-13142:

Affects Version/s: (was: 3.0.0)

> KRaft brokers do not validate dynamic configs before forwarding them to 
> controller
> --
>
> Key: KAFKA-13142
> URL: https://issues.apache.org/jira/browse/KAFKA-13142
> Project: Kafka
>  Issue Type: Task
>  Components: kraft
>Reporter: Ryan Dielhenn
>Assignee: Ryan Dielhenn
>Priority: Blocker
> Fix For: 3.0.0
>
>
> The KRaft brokers are not currently validating dynamic configs before 
> forwarding them to the controller. To ensure that KRaft clusters are easily 
> upgradable it would be a good idea to validate dynamic configs in the first 
> release of KRaft so that invalid dynamic configs are never stored.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13148) Kraft Controller doesn't handle scheduleAppend returning Long.MAX_VALUE

2021-08-03 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-13148.
-
Resolution: Fixed

Closing this since it was fixed by KAFKA-12158.

> Kraft Controller doesn't handle scheduleAppend returning Long.MAX_VALUE
> ---
>
> Key: KAFKA-13148
> URL: https://issues.apache.org/jira/browse/KAFKA-13148
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, kraft
>Reporter: Jose Armando Garcia Sancio
>Assignee: Niket Goel
>Priority: Major
>  Labels: kip-500
>
> In some cases the RaftClient will return Long.MAX_VALUE:
> {code:java}
> /**
>  * Append a list of records to the log. The write will be scheduled for some time
>  * in the future. There is no guarantee that appended records will be written to
>  * the log and eventually committed. However, it is guaranteed that if any of the
>  * records become committed, then all of them will be.
>  *
>  * If the provided current leader epoch does not match the current epoch, which
>  * is possible when the state machine has yet to observe the epoch change, then
>  * this method will return {@link Long#MAX_VALUE} to indicate an offset which is
>  * not possible to become committed. The state machine is expected to discard all
>  * uncommitted entries after observing an epoch change.
>  *
>  * @param epoch the current leader epoch
>  * @param records the list of records to append
>  * @return the expected offset of the last record; {@link Long#MAX_VALUE} if the records could
>  *         be committed; null if no memory could be allocated for the batch at this time
>  * @throws org.apache.kafka.common.errors.RecordBatchTooLargeException if the size of the records
>  *         is greater than the maximum batch size; if this exception is thrown none of the
>  *         elements in records were committed
>  */
> Long scheduleAtomicAppend(int epoch, List<T> records);
>  {code}
> The controller doesn't handle this case:
> {code:java}
>   // If the operation returned a batch of records, those records need to be
>   // written before we can return our result to the user. Here, we hand off
>   // the batch of records to the raft client. They will be written out
>   // asynchronously.
>   final long offset;
>   if (result.isAtomic()) {
>       offset = raftClient.scheduleAtomicAppend(controllerEpoch, result.records());
>   } else {
>       offset = raftClient.scheduleAppend(controllerEpoch, result.records());
>   }
>   op.processBatchEndOffset(offset);
>   writeOffset = offset;
>   resultAndOffset = ControllerResultAndOffset.of(offset, result);
>   for (ApiMessageAndVersion message : result.records()) {
>       replay(message.message(), Optional.empty(), offset);
>   }
>   snapshotRegistry.getOrCreateSnapshot(offset);
>   log.debug("Read-write operation {} will be completed when the log " +
>       "reaches offset {}.", this, resultAndOffset.offset());
>  {code}
>  
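
The missing handling boils down to checking the sentinel return values before using the offset. The sketch below is a simplified illustration, not the actual controller fix (which, per the comment above, landed via KAFKA-12158).

{code:java}
// Illustrative sketch: validate the offset returned by scheduleAtomicAppend /
// scheduleAppend before treating it as a real log position.
final class AppendOffsetCheckSketch {
    static long checkedOffset(Long offset) {
        if (offset == null) {
            // No memory could be allocated for the batch right now.
            throw new IllegalStateException("Unable to buffer the record batch");
        }
        if (offset == Long.MAX_VALUE) {
            // The raft client saw a stale leader epoch; these records can never
            // become committed, so the controller must not wait on this offset.
            throw new IllegalStateException("Append rejected: leader epoch mismatch");
        }
        return offset;
    }
}
{code}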



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-13155) ConcurrentModificationException in TransactionalMessageCopier

2021-08-02 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13155?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson reassigned KAFKA-13155:
---

Assignee: Jason Gustafson

> ConcurrentModificationException in TransactionalMessageCopier
> -
>
> Key: KAFKA-13155
> URL: https://issues.apache.org/jira/browse/KAFKA-13155
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> Caught this exception in a system test run:
> {code}
> [2021-08-02 07:51:54,528] ERROR Uncaught exception in thread 
> 'transactional-message-copier-shutdown-hook': 
> (org.apache.kafka.common.utils.KafkaThread)
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for 
> multi-threaded access
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2447)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2330)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2308)
> at 
> org.apache.kafka.tools.TransactionalMessageCopier.lambda$main$1(TransactionalMessageCopier.java:331)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> The pattern for closing the consumer is not safe.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13155) ConcurrentModificationException in TransactionalMessageCopier

2021-08-02 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-13155:
---

 Summary: ConcurrentModificationException in 
TransactionalMessageCopier
 Key: KAFKA-13155
 URL: https://issues.apache.org/jira/browse/KAFKA-13155
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson


Caught this exception in a system test run:
```
[2021-08-02 07:51:54,528] ERROR Uncaught exception in thread 
'transactional-message-copier-shutdown-hook': 
(org.apache.kafka.common.utils.KafkaThread)
java.util.ConcurrentModificationException: KafkaConsumer is not safe for 
multi-threaded access
at 
org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2447)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2330)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2308)
at 
org.apache.kafka.tools.TransactionalMessageCopier.lambda$main$1(TransactionalMessageCopier.java:331)
at java.lang.Thread.run(Thread.java:748)
```
The pattern for closing the consumer is not safe.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13155) ConcurrentModificationException in TransactionalMessageCopier

2021-08-02 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13155?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-13155:

Description: 
Caught this exception in a system test run:
{code}
[2021-08-02 07:51:54,528] ERROR Uncaught exception in thread 
'transactional-message-copier-shutdown-hook': 
(org.apache.kafka.common.utils.KafkaThread)
java.util.ConcurrentModificationException: KafkaConsumer is not safe for 
multi-threaded access
at 
org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2447)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2330)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2308)
at 
org.apache.kafka.tools.TransactionalMessageCopier.lambda$main$1(TransactionalMessageCopier.java:331)
at java.lang.Thread.run(Thread.java:748)
{code}
The pattern for closing the consumer is not safe.

  was:
Caught this exception in a system test run:
```
[2021-08-02 07:51:54,528] ERROR Uncaught exception in thread 
'transactional-message-copier-shutdown-hook': 
(org.apache.kafka.common.utils.KafkaThread)
java.util.ConcurrentModificationException: KafkaConsumer is not safe for 
multi-threaded access
at 
org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2447)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2330)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2308)
at 
org.apache.kafka.tools.TransactionalMessageCopier.lambda$main$1(TransactionalMessageCopier.java:331)
at java.lang.Thread.run(Thread.java:748)
```
The pattern for closing the consumer is not safe.


> ConcurrentModificationException in TransactionalMessageCopier
> -
>
> Key: KAFKA-13155
> URL: https://issues.apache.org/jira/browse/KAFKA-13155
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Major
>
> Caught this exception in a system test run:
> {code}
> [2021-08-02 07:51:54,528] ERROR Uncaught exception in thread 
> 'transactional-message-copier-shutdown-hook': 
> (org.apache.kafka.common.utils.KafkaThread)
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for 
> multi-threaded access
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2447)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2330)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2308)
> at 
> org.apache.kafka.tools.TransactionalMessageCopier.lambda$main$1(TransactionalMessageCopier.java:331)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> The pattern for closing the consumer is not safe.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13095) TransactionsTest is failing in kraft mode

2021-08-02 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13095?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-13095.
-
Resolution: Cannot Reproduce

I am closing this since `TransactionsTest` has not yet been converted to KRaft. 
There was some instability when the topicId fetch changes were introduced, 
which I think caused some confusion.

> TransactionsTest is failing in kraft mode
> -
>
> Key: KAFKA-13095
> URL: https://issues.apache.org/jira/browse/KAFKA-13095
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.0
>Reporter: Colin McCabe
>Assignee: Jason Gustafson
>Priority: Blocker
> Fix For: 3.0.0
>
>
> TransactionsTest#testSendOffsetsToTransactionTimeout keeps flaking on Jenkins.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13114) Unregister listener during renounce when the in-memory snapshot is missing

2021-08-01 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-13114.
-
Resolution: Fixed

> Unregister listener during renounce when the in-memory snapshot is missing
> --
>
> Key: KAFKA-13114
> URL: https://issues.apache.org/jira/browse/KAFKA-13114
> Project: Kafka
>  Issue Type: Sub-task
>  Components: controller
>Reporter: Jose Armando Garcia Sancio
>Assignee: Jose Armando Garcia Sancio
>Priority: Blocker
>  Labels: kip-500
> Fix For: 3.0.0
>
>
> Need to improve the renounce logic to do the following when the in-memory 
> snapshot for the last committed offset is missing:
>  # Reset the snapshot registry
>  # Unregister the listener from the RaftClient
>  # Re-register the listener with the RaftClient



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13151) Disallow policy configs in KRaft

2021-07-30 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-13151.
-
Resolution: Fixed

> Disallow policy configs in KRaft
> 
>
> Key: KAFKA-13151
> URL: https://issues.apache.org/jira/browse/KAFKA-13151
> Project: Kafka
>  Issue Type: Task
>Affects Versions: 3.0.0
>Reporter: Ryan Dielhenn
>Assignee: Ryan Dielhenn
>Priority: Blocker
> Fix For: 3.0.0
>
>
> alter.config.policy.class.name and create.topic.policy.class.name are 
> unsupported by KRaft. KRaft servers should fail startup if any of these are 
> configured.
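
The validation itself is straightforward; a minimal sketch is below. The config keys come from the issue text, while the startup wiring and `Properties`-based check are assumptions for illustration.

{code:java}
// Illustrative startup check; not the actual KafkaConfig validation code.
import java.util.Properties;

final class KraftPolicyConfigCheckSketch {
    private static final String[] UNSUPPORTED_IN_KRAFT = {
            "alter.config.policy.class.name",
            "create.topic.policy.class.name"
    };

    static void validateForKraft(Properties serverConfig) {
        for (String key : UNSUPPORTED_IN_KRAFT) {
            if (serverConfig.containsKey(key)) {
                // Fail fast so an invalid policy config is never persisted.
                throw new IllegalArgumentException(
                        key + " is not supported when running in KRaft mode");
            }
        }
    }
}
{code}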



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13151) Disallow policy configs in KRaft

2021-07-30 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-13151:

Affects Version/s: (was: 3.0.0)

> Disallow policy configs in KRaft
> 
>
> Key: KAFKA-13151
> URL: https://issues.apache.org/jira/browse/KAFKA-13151
> Project: Kafka
>  Issue Type: Task
>Reporter: Ryan Dielhenn
>Assignee: Ryan Dielhenn
>Priority: Blocker
> Fix For: 3.0.0
>
>
> alter.config.policy.class.name and create.topic.policy.class.name are 
> unsupported by KRaft. KRaft servers should fail startup if any of these are 
> configured.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13132) Upgrading to topic IDs in LISR requests has gaps introduced in 3.0

2021-07-29 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-13132:

Affects Version/s: (was: 3.0.0)

> Upgrading to topic IDs in LISR requests has gaps introduced in 3.0
> --
>
> Key: KAFKA-13132
> URL: https://issues.apache.org/jira/browse/KAFKA-13132
> Project: Kafka
>  Issue Type: Bug
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Blocker
> Fix For: 3.0.0
>
>
> With the change in 3.0 to how topic IDs are assigned to logs, a bug was 
> inadvertently introduced. Now, topic IDs will only be assigned on the load of 
> the log to a partition in LISR requests. This means we will only assign topic 
> IDs for newly created topics/partitions, on broker startup, or potentially 
> when a partition is reassigned.
>  
> In the case of upgrading from an IBP before 2.8, we may have a scenario where 
> we upgrade the controller to IBP 3.0 (or even 2.8) last. (Ie, the controller 
> is IBP < 2.8 and all other brokers are on the newest IBP) Upon the last 
> broker upgrading, we will elect a new controller but its LISR request will 
> not result in topic IDs being assigned to logs of existing topics. They will 
> only be assigned in the cases mentioned above.
> *Keep in mind, in this scenario, topic IDs will still be assigned in the 
> controller/ZK to all new and pre-existing topics and will show up in 
> metadata.*  This means we are not ensured the same guarantees we had in 2.8. 
> *It is just the LISR/partition.metadata part of the code that is affected.* 
>  
> The problem is two-fold
>  1. We ignore LISR requests when the partition leader epoch has not increased 
> (previously we assigned the ID before this check)
>  2. We only assign the topic ID when we are associating the log with the 
> partition in replicamanager for the first time. Though in the scenario 
> described above, we have logs associated with partitions that need to be 
> upgraded.
>  
> We should check if the LISR request is resulting in a topic ID addition and 
> add logic to assign the ID to logs already associated with partitions in the 
> replica manager.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13132) Upgrading to topic IDs in LISR requests has gaps introduced in 3.0

2021-07-29 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-13132.
-
Resolution: Fixed

> Upgrading to topic IDs in LISR requests has gaps introduced in 3.0
> --
>
> Key: KAFKA-13132
> URL: https://issues.apache.org/jira/browse/KAFKA-13132
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.0
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Blocker
> Fix For: 3.0.0
>
>
> With the change in 3.0 to how topic IDs are assigned to logs, a bug was 
> inadvertently introduced. Now, topic IDs will only be assigned on the load of 
> the log to a partition in LISR requests. This means we will only assign topic 
> IDs for newly created topics/partitions, on broker startup, or potentially 
> when a partition is reassigned.
>  
> In the case of upgrading from an IBP before 2.8, we may have a scenario where 
> we upgrade the controller to IBP 3.0 (or even 2.8) last. (Ie, the controller 
> is IBP < 2.8 and all other brokers are on the newest IBP) Upon the last 
> broker upgrading, we will elect a new controller but its LISR request will 
> not result in topic IDs being assigned to logs of existing topics. They will 
> only be assigned in the cases mentioned above.
> *Keep in mind, in this scenario, topic IDs will still be assigned in the 
> controller/ZK to all new and pre-existing topics and will show up in 
> metadata.*  This means we are not ensured the same guarantees we had in 2.8. 
> *It is just the LISR/partition.metadata part of the code that is affected.* 
>  
> The problem is two-fold
>  1. We ignore LISR requests when the partition leader epoch has not increased 
> (previously we assigned the ID before this check)
>  2. We only assign the topic ID when we are associating the log with the 
> partition in replicamanager for the first time. Though in the scenario 
> described above, we have logs associated with partitions that need to be 
> upgraded.
>  
> We should check if the LISR request is resulting in a topic ID addition and 
> add logic to assign the ID to logs already associated with partitions in the 
> replica manager.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-13132) Upgrading to topic IDs in LISR requests has gaps introduced in 3.0

2021-07-29 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson reassigned KAFKA-13132:
---

Assignee: Justine Olshan  (was: Jose Armando Garcia Sancio)

> Upgrading to topic IDs in LISR requests has gaps introduced in 3.0
> --
>
> Key: KAFKA-13132
> URL: https://issues.apache.org/jira/browse/KAFKA-13132
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.0
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Blocker
> Fix For: 3.0.0
>
>
> With the change in 3.0 to how topic IDs are assigned to logs, a bug was 
> inadvertently introduced. Now, topic IDs will only be assigned on the load of 
> the log to a partition in LISR requests. This means we will only assign topic 
> IDs for newly created topics/partitions, on broker startup, or potentially 
> when a partition is reassigned.
>  
> In the case of upgrading from an IBP before 2.8, we may have a scenario where 
> we upgrade the controller to IBP 3.0 (or even 2.8) last. (Ie, the controller 
> is IBP < 2.8 and all other brokers are on the newest IBP) Upon the last 
> broker upgrading, we will elect a new controller but its LISR request will 
> not result in topic IDs being assigned to logs of existing topics. They will 
> only be assigned in the cases mentioned above.
> *Keep in mind, in this scenario, topic IDs will still be assigned in the 
> controller/ZK to all new and pre-existing topics and will show up in 
> metadata.*  This means we are not ensured the same guarantees we had in 2.8. 
> *It is just the LISR/partition.metadata part of the code that is affected.* 
>  
> The problem is two-fold
>  1. We ignore LISR requests when the partition leader epoch has not increased 
> (previously we assigned the ID before this check)
>  2. We only assign the topic ID when we are associating the log with the 
> partition in replicamanager for the first time. Though in the scenario 
> described above, we have logs associated with partitions that need to be 
> upgraded.
>  
> We should check if the LISR request is resulting in a topic ID addition and 
> add logic to assign the ID to logs already associated with partitions in the 
> replica manager.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13137) KRaft Controller Metric MBean names are incorrectly quoted

2021-07-29 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-13137:

Affects Version/s: (was: 3.0.0)

> KRaft Controller Metric MBean names are incorrectly quoted
> --
>
> Key: KAFKA-13137
> URL: https://issues.apache.org/jira/browse/KAFKA-13137
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 2.8.0
>Reporter: Ron Dagostino
>Assignee: Ron Dagostino
>Priority: Blocker
> Fix For: 3.0.0
>
>
> QuorumControllerMetrics is letting com.yammer.metrics.MetricName create the 
> MBean names for all of the controller metrics, and that adds quotes.  We have 
> typically used KafkaMetricsGroup to explicitly create the MBean name, and we 
> do not add quotes there.  The controller metric names that are in common 
> between the old and new controller must remain the same, but they are not.  
> For example, this non-KRaft MBean name:
> kafka.controller:type=KafkaController,name=OfflinePartitionsCount
> has morphed into this when using KRaft:
> "kafka.controller":type="KafkaController",name="OfflinePartitionsCount"



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13143) Disable Metadata endpoint for KRaft controller

2021-07-29 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-13143.
-
Resolution: Fixed

> Disable Metadata endpoint for KRaft controller
> --
>
> Key: KAFKA-13143
> URL: https://issues.apache.org/jira/browse/KAFKA-13143
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Niket Goel
>Priority: Blocker
> Fix For: 3.0.0
>
>
> The controller currently implements Metadata incompletely. Specifically, it 
> does not return the metadata for any topics in the cluster. This may tend to 
> cause confusion to users. For example, if someone used the controller 
> endpoint by mistake in `kafka-topics.sh --list`, then they would see no 
> topics in the cluster, which would be surprising. It would be better for 3.0 
> to disable Metadata on the controller since we currently expect clients to 
> connect through brokers anyway.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-13091) Increment HW after shrinking ISR through AlterIsr

2021-07-28 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson reassigned KAFKA-13091:
---

Assignee: Jason Gustafson  (was: David Arthur)

> Increment HW after shrinking ISR through AlterIsr
> -
>
> Key: KAFKA-13091
> URL: https://issues.apache.org/jira/browse/KAFKA-13091
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> After we have shrunk the ISR, we have an opportunity to advance the high 
> watermark. We do this currently in `maybeShrinkIsr` after the synchronous 
> update through ZK. For the AlterIsr path, however, we cannot rely on this 
> call since the request is sent asynchronously. Instead we should attempt to 
> advance the high watermark in the callback when the AlterIsr response returns 
> successfully.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13099) Message too large error when expiring transactionalIds

2021-07-28 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-13099:

Affects Version/s: 2.0.1
   2.4.1
   2.5.1
   2.6.1
   2.8.0
   2.7.1

> Message too large error when expiring transactionalIds
> --
>
> Key: KAFKA-13099
> URL: https://issues.apache.org/jira/browse/KAFKA-13099
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.0.1, 2.4.1, 2.5.1, 2.6.1, 2.8.0, 2.7.1
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 3.0.0, 2.7.2, 2.8.1
>
>
> We have seen a couple reports of MESSAGE_TOO_LARGE errors when writing 
> tombstones for expired transactionalIds. This is possible because we collect 
> all expired IDs into a single batch. We should ensure that the created 
> batches are smaller than the max message size. Any expired IDs that cannot 
> fit can be expired later.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13099) Message too large error when expiring transactionalIds

2021-07-28 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-13099:

Affects Version/s: 2.1.1
   2.2.2
   2.3.1

> Message too large error when expiring transactionalIds
> --
>
> Key: KAFKA-13099
> URL: https://issues.apache.org/jira/browse/KAFKA-13099
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.0.1, 2.1.1, 2.2.2, 2.3.1, 2.4.1, 2.5.1, 2.6.1, 2.8.0, 
> 2.7.1
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 3.0.0, 2.7.2, 2.8.1
>
>
> We have seen a couple reports of MESSAGE_TOO_LARGE errors when writing 
> tombstones for expired transactionalIds. This is possible because we collect 
> all expired IDs into a single batch. We should ensure that the created 
> batches are smaller than the max message size. Any expired IDs that cannot 
> fit can be expired later.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13099) Message too large error when expiring transactionalIds

2021-07-28 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-13099.
-
Fix Version/s: 2.8.1
   2.7.2
   3.0.0
   Resolution: Fixed

> Message too large error when expiring transactionalIds
> --
>
> Key: KAFKA-13099
> URL: https://issues.apache.org/jira/browse/KAFKA-13099
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 3.0.0, 2.7.2, 2.8.1
>
>
> We have seen a couple reports of MESSAGE_TOO_LARGE errors when writing 
> tombstones for expired transactionalIds. This is possible because we collect 
> all expired IDs into a single batch. We should ensure that the created 
> batches are smaller than the max message size. Any expired IDs that cannot 
> fit can be expired later.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13146) Consider client use cases for accessing controller endpoints

2021-07-28 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-13146:

Description: 
In KAFKA-13143, we dropped the Metadata from the controller APIs. We did this 
for two reasons. First, the implementation did not return any topic metadata. 
This was confusing for users who mistakenly tried to use the controller 
endpoint in order to describe or list topics since it would appear that no 
topics existed in the cluster. The second reason is that the implementation 
returned the controller endpoints. So even if we returned the topic metadata, 
clients would be unable to access the topics for reading or writing through the 
controller endpoint.

So for 3.0, we are effectively saying that clients should only access the 
broker endpoints. Long term, is that what we want? When running the controllers 
as separate nodes, it may be useful to initialize the controllers and cluster 
metadata before starting any of the brokers, for example. For this to work, we 
need to put some thought into how the Metadata API should work with 
controllers. For example, we can return a flag or some kind of error code in 
the response to indicate that topic metadata is not available. We have also 
considered whether the internal __cluster_metadata topic should be readable 
through the controller endpoints by consumers.




  was:
In KAFKA-13143, we dropped the Metadata from the controller APIs. We did this 
for two reasons. First, the implementation did not return any topic metadata. 
This was confusing for users who mistakenly tried to use the controller 
endpoint in order to describe or list topics since it would appear that no 
topics existed in the cluster. The second reason is that the implementation 
returned the controller endpoints. So even if we returned the topic metadata, 
clients would be unable to access the topics for reading or writing through the 
controller endpoint.

So for 3.0, we are effectively saying that clients should only access the 
broker endpoints. Long term, is that what we want? When running the controllers 
as separate nodes, it may be useful to initialize the controllers and cluster 
metadata before starting any of the brokers, for example. For this to work, we 
need to put some thought into how the Metadata API should work with 
controllers. For example, we can return a flag or some kind of error code in 
the response to indicate that topic metadata is not available. We have also 
considered whether the internal __cluster_metadata topic should be readable 
through the controller endpoints.





> Consider client use cases for accessing controller endpoints
> 
>
> Key: KAFKA-13146
> URL: https://issues.apache.org/jira/browse/KAFKA-13146
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Priority: Major
>  Labels: kip-500
>
> In KAFKA-13143, we dropped the Metadata from the controller APIs. We did this 
> for two reasons. First, the implementation did not return any topic metadata. 
> This was confusing for users who mistakenly tried to use the controller 
> endpoint in order to describe or list topics since it would appear that no 
> topics existed in the cluster. The second reason is that the implementation 
> returned the controller endpoints. So even if we returned the topic metadata, 
> clients would be unable to access the topics for reading or writing through 
> the controller endpoint.
> So for 3.0, we are effectively saying that clients should only access the 
> broker endpoints. Long term, is that what we want? When running the 
> controllers as separate nodes, it may be useful to initialize the controllers 
> and cluster metadata before starting any of the brokers, for example. For 
> this to work, we need to put some thought into how the Metadata API should 
> work with controllers. For example, we can return a flag or some kind of 
> error code in the response to indicate that topic metadata is not available. 
> We have also considered whether the internal __cluster_metadata topic should 
> be readable through the controller endpoints by consumers.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13146) Consider client use cases for accessing controller endpoints

2021-07-28 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-13146:
---

 Summary: Consider client use cases for accessing controller 
endpoints
 Key: KAFKA-13146
 URL: https://issues.apache.org/jira/browse/KAFKA-13146
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson


In KAFKA-13143, we dropped the Metadata from the controller APIs. We did this 
for two reasons. First, the implementation did not return any topic metadata. 
This was confusing for users who mistakenly tried to use the controller 
endpoint in order to describe or list topics since it would appear that no 
topics existed in the cluster. The second reason is that the implementation 
returned the controller endpoints. So even if we returned the topic metadata, 
clients would be unable to access the topics for reading or writing through the 
controller endpoint.

So for 3.0, we are effectively saying that clients should only access the 
broker endpoints. Long term, is that what we want? When running the controllers 
as separate nodes, it may be useful to initialize the controllers and cluster 
metadata before starting any of the brokers, for example. For this to work, we 
need to put some thought into how the Metadata API should work with 
controllers. For example, we can return a flag or some kind of error code in 
the response to indicate that topic metadata is not available. We have also 
considered whether the internal __cluster_metadata topic should be readable 
through the controller endpoints.






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12851) Flaky Test RaftEventSimulationTest.canMakeProgressIfMajorityIsReachable

2021-07-28 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-12851.
-
Resolution: Fixed

> Flaky Test RaftEventSimulationTest.canMakeProgressIfMajorityIsReachable
> ---
>
> Key: KAFKA-12851
> URL: https://issues.apache.org/jira/browse/KAFKA-12851
> Project: Kafka
>  Issue Type: Bug
>  Components: core, kraft
>Reporter: A. Sophie Blee-Goldman
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>  Labels: kip-500
> Fix For: 3.0.0
>
> Attachments: Capture.PNG
>
>
> Failed twice on a [PR 
> build|https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10755/6/testReport/]
> h3. Stacktrace
> org.opentest4j.AssertionFailedError: expected: <true> but was: <false> at 
> org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at 
> org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40) at 
> org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:35) at 
> org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:162) at 
> org.apache.kafka.raft.RaftEventSimulationTest.canMakeProgressIfMajorityIsReachable(RaftEventSimulationTest.java:263)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-13141) Leader should not update follower fetch offset if diverging epoch is present

2021-07-27 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson reassigned KAFKA-13141:
---

Assignee: Rajini Sivaram  (was: Jason Gustafson)

> Leader should not update follower fetch offset if diverging epoch is present
> 
>
> Key: KAFKA-13141
> URL: https://issues.apache.org/jira/browse/KAFKA-13141
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.0, 2.7.1
>Reporter: Jason Gustafson
>Assignee: Rajini Sivaram
>Priority: Blocker
> Fix For: 3.0.0, 2.7.2, 2.8.1
>
>
> In 2.7, we began doing fetcher truncation piggybacked on the Fetch protocol 
> instead of using the old OffsetsForLeaderEpoch API. When truncation is 
> detected, we return a `divergingEpoch` field in the Fetch response, but we do 
> not set an error code. The sender is expected to check if the diverging epoch 
> is present and truncate accordingly.
> All of this works correctly in the fetcher implementation, but the problem is 
> that the logic to update the follower fetch position on the leader does not 
> take into account the diverging epoch present in the response. This means the 
> fetch offsets can be updated incorrectly, which can lead to either log 
> divergence or the loss of committed data.
> For example, we hit the following case with 3 replicas. Leader 1 is elected 
> in epoch 1 with an end offset of 100. The followers are at offset 101.
> Broker 1: (Leader) Epoch 1 from offset 100
> Broker 2: (Follower) Epoch 1 from offset 101
> Broker 3: (Follower) Epoch 1 from offset 101
> Broker 1 receives fetches from 2 and 3 at offset 101. The leader detects the 
> divergence and returns a diverging epoch in the fetch state. Nevertheless, 
> the fetch positions for both followers are updated to 101 and the high 
> watermark is advanced.
> After brokers 2 and 3 had truncated to offset 100, broker 1 experienced a 
> network partition of some kind and was kicked from the ISR. This caused 
> broker 2 to get elected, which resulted in the following state at the start 
> of epoch 2.
> Broker 1: (Follower) Epoch 2 from offset 101
> Broker 2: (Leader) Epoch 2 from offset 100
> Broker 3: (Follower) Epoch 2 from offset 100
> Broker 2 was then able to write a new entry at offset 100 and the old record 
> which may have been exposed to consumers was deleted by broker 1.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-13143) Disable Metadata endpoint for KRaft controller

2021-07-27 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson reassigned KAFKA-13143:
---

Assignee: Niket Goel  (was: Jose Armando Garcia Sancio)

> Disable Metadata endpoint for KRaft controller
> --
>
> Key: KAFKA-13143
> URL: https://issues.apache.org/jira/browse/KAFKA-13143
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Niket Goel
>Priority: Blocker
> Fix For: 3.0.0
>
>
> The controller currently implements Metadata incompletely. Specifically, it 
> does not return the metadata for any topics in the cluster. This is likely to 
> confuse users. For example, if someone used the controller endpoint by mistake 
> in `kafka-topics.sh --list`, they would see no topics in the cluster, which 
> would be surprising. It would be better for 3.0 to disable Metadata on the 
> controller since we currently expect clients to connect through brokers anyway.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13143) Disable Metadata endpoint for KRaft controller

2021-07-27 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-13143:

Issue Type: Bug  (was: Improvement)

> Disable Metadata endpoint for KRaft controller
> --
>
> Key: KAFKA-13143
> URL: https://issues.apache.org/jira/browse/KAFKA-13143
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jose Armando Garcia Sancio
>Priority: Blocker
> Fix For: 3.0.0
>
>
> The controller currently implements Metadata incompletely. Specifically, it 
> does not return the metadata for any topics in the cluster. This is likely to 
> confuse users. For example, if someone used the controller endpoint by mistake 
> in `kafka-topics.sh --list`, they would see no topics in the cluster, which 
> would be surprising. It would be better for 3.0 to disable Metadata on the 
> controller since we currently expect clients to connect through brokers anyway.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13143) Disable Metadata endpoint for KRaft controller

2021-07-27 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-13143:
---

 Summary: Disable Metadata endpoint for KRaft controller
 Key: KAFKA-13143
 URL: https://issues.apache.org/jira/browse/KAFKA-13143
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson
Assignee: Jose Armando Garcia Sancio
 Fix For: 3.0.0


The controller currently implements Metadata incompletely. Specifically, it 
does not return the metadata for any topics in the cluster. This is likely to 
confuse users. For example, if someone used the controller endpoint by mistake 
in `kafka-topics.sh --list`, they would see no topics in the cluster, which 
would be surprising. It would be better for 3.0 to disable Metadata on the 
controller since we currently expect clients to connect through brokers anyway.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13141) Leader should not update follower fetch offset if diverging epoch is present

2021-07-27 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-13141:
---

 Summary: Leader should not update follower fetch offset if 
diverging epoch is present
 Key: KAFKA-13141
 URL: https://issues.apache.org/jira/browse/KAFKA-13141
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.7.1, 2.8.0
Reporter: Jason Gustafson
Assignee: Jason Gustafson
 Fix For: 3.0.0, 2.7.2, 2.8.1


In 2.7, we began doing fetcher truncation piggybacked on the Fetch protocol 
instead of using the old OffsetsForLeaderEpoch API. When truncation is 
detected, we return a `divergingEpoch` field in the Fetch response, but we do 
not set an error code. The sender is expected to check if the diverging epoch 
is present and truncate accordingly.

All of this works correctly in the fetcher implementation, but the problem is 
that the logic to update the follower fetch position on the leader does not 
take into account the diverging epoch present in the response. This means the 
fetch offsets can be updated incorrectly, which can lead to either log 
divergence or the loss of committed data.

For example, we hit the following case with 3 replicas. Leader 1 is elected in 
epoch 1 with an end offset of 100. The followers are at offset 101.

Broker 1: (Leader) Epoch 1 from offset 100
Broker 2: (Follower) Epoch 1 from offset 101
Broker 3: (Follower) Epoch 1 from offset 101

Broker 1 receives fetches from 2 and 3 at offset 101. The leader detects the 
divergence and returns a diverging epoch in the fetch state. Nevertheless, the 
fetch positions for both followers are updated to 101 and the high watermark is 
advanced.

After brokers 2 and 3 had truncated to offset 100, broker 1 experienced a 
network partition of some kind and was kicked from the ISR. This caused broker 
2 to get elected, which resulted in the following state at the start of epoch 2.

Broker 1: (Follower) Epoch 2 from offset 101
Broker 2: (Leader) Epoch 2 from offset 100
Broker 3: (Follower) Epoch 2 from offset 100

Broker 2 was then able to write a new entry at offset 100 and the old record 
which may have been exposed to consumers was deleted by broker 1.
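
A hedged sketch of the fix (the real logic lives in the Scala broker code and 
uses different names; everything below is an illustrative stand-in): the leader 
should treat a diverging epoch in the fetch response as a signal that the 
follower's reported offset is not yet a valid replication point.

{code:java}
import java.util.OptionalInt;

// Illustrative stand-in for the leader-side follower state update; not the actual broker code.
class FollowerFetchStateSketch {
    long followerLogEndOffset;

    void onFollowerFetch(long fetchOffset, OptionalInt divergingEpoch) {
        if (divergingEpoch.isPresent()) {
            // The follower still has to truncate, so skip the update entirely.
            // Otherwise the high watermark could advance past records that are
            // about to be thrown away, as in the scenario above.
            return;
        }
        followerLogEndOffset = fetchOffset;
        // ... high-watermark advancement based on all follower offsets goes here
    }
}
{code}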




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-13095) TransactionsTest is failing in kraft mode

2021-07-27 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13095?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson reassigned KAFKA-13095:
---

Assignee: Jason Gustafson  (was: David Arthur)

> TransactionsTest is failing in kraft mode
> -
>
> Key: KAFKA-13095
> URL: https://issues.apache.org/jira/browse/KAFKA-13095
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.0
>Reporter: Colin McCabe
>Assignee: Jason Gustafson
>Priority: Blocker
> Fix For: 3.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13113) Add unregister support to the RaftClient.

2021-07-23 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-13113.
-
Resolution: Fixed

> Add unregister support to the RaftClient.
> -
>
> Key: KAFKA-13113
> URL: https://issues.apache.org/jira/browse/KAFKA-13113
> Project: Kafka
>  Issue Type: Sub-task
>  Components: kraft
>Reporter: Jose Armando Garcia Sancio
>Assignee: Jose Armando Garcia Sancio
>Priority: Blocker
>  Labels: kip-500
> Fix For: 3.0.0
>
>
> Implement the following API:
> {code:java}
> interface RaftClient {
>   // Registering a listener hands back a context object that can later be
>   // passed to unregister() to detach it.
>   ListenerContext register(Listener listener);
>   void unregister(ListenerContext context);
> }
> // Opaque handle identifying a single listener registration.
> interface ListenerContext {
> }
> interface Listener {
>   void handleCommit(ListenerContext context, BatchReader reader);
>   void handleSnapshot(ListenerContext context, SnapshotReader reader);
>   void handleLeaderChange(ListenerContext context, LeaderAndEpoch leaderAndEpoch);
> }
> {code}
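
A hedged usage sketch of the register/unregister lifecycle proposed above; the 
listener bodies are placeholders rather than real state-machine logic, and the 
parameter names are illustrative.

{code:java}
// Assumes the RaftClient, Listener, ListenerContext, BatchReader, SnapshotReader
// and LeaderAndEpoch types from the snippet above are on the classpath.
class ListenerLifecycleSketch {
    private ListenerContext context;

    void start(RaftClient client) {
        context = client.register(new Listener() {
            public void handleCommit(ListenerContext context, BatchReader reader) {
                // apply committed batches to the local state machine
            }
            public void handleSnapshot(ListenerContext context, SnapshotReader reader) {
                // rebuild local state from a snapshot
            }
            public void handleLeaderChange(ListenerContext context, LeaderAndEpoch leaderAndEpoch) {
                // react to leadership changes
            }
        });
    }

    void shutdown(RaftClient client) {
        // The new unregister call lets a component detach cleanly on shutdown.
        client.unregister(context);
    }
}
{code}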



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-8529) Flakey test ConsumerBounceTest#testCloseDuringRebalance

2021-07-23 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-8529.

Resolution: Fixed

> Flakey test ConsumerBounceTest#testCloseDuringRebalance
> ---
>
> Key: KAFKA-8529
> URL: https://issues.apache.org/jira/browse/KAFKA-8529
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Boyang Chen
>Priority: Major
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/5450/consoleFull]
>  
> *16:16:10* kafka.api.ConsumerBounceTest > testCloseDuringRebalance STARTED
> *16:16:22* kafka.api.ConsumerBounceTest.testCloseDuringRebalance failed, log available in 
> /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.12/core/build/reports/testOutput/kafka.api.ConsumerBounceTest.testCloseDuringRebalance.test.stdout
> *16:16:22* kafka.api.ConsumerBounceTest > testCloseDuringRebalance FAILED
> *16:16:22* java.lang.AssertionError: Rebalance did not complete in time
> *16:16:22* at org.junit.Assert.fail(Assert.java:89)
> *16:16:22* at org.junit.Assert.assertTrue(Assert.java:42)
> *16:16:22* at kafka.api.ConsumerBounceTest.waitForRebalance$1(ConsumerBounceTest.scala:402)
> *16:16:22* at kafka.api.ConsumerBounceTest.checkCloseDuringRebalance(ConsumerBounceTest.scala:416)
> *16:16:22* at kafka.api.ConsumerBounceTest.testCloseDuringRebalance(ConsumerBounceTest.scala:379)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13127) Fix stray partition lookup logic

2021-07-22 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-13127:

Description: The result of `BrokerMetadataPublisher.findGhostReplicas` is 
inverted. It returns all of the non-stray replicas. This causes all of these 
partitions to get deleted on startup by mistake.  (was: The result of 
`BrokerMetadataPublisher.findGhostReplicas` is inverted. It returns all of the 
non-stray replicas. This causes all off these partitions to get deleted on 
startup by mistake.)

> Fix stray partition lookup logic
> 
>
> Key: KAFKA-13127
> URL: https://issues.apache.org/jira/browse/KAFKA-13127
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Blocker
> Fix For: 3.0.0
>
>
> The result of `BrokerMetadataPublisher.findGhostReplicas` is inverted. It 
> returns all of the non-stray replicas. This causes all of these partitions to 
> get deleted on startup by mistake.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13127) Fix stray partition lookup logic

2021-07-22 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-13127:
---

 Summary: Fix stray partition lookup logic
 Key: KAFKA-13127
 URL: https://issues.apache.org/jira/browse/KAFKA-13127
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jason Gustafson
 Fix For: 3.0.0


The result of `BrokerMetadataPublisher.findGhostReplicas` is inverted. It 
returns all of the non-stray replicas. This causes all of these partitions to 
get deleted on startup by mistake.
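
A hedged illustration of the inversion (hypothetical names and signature; the 
real method operates on broker log directories and the metadata image): the 
filter must keep the replicas that are absent from the current metadata, not 
the ones that are present.

{code:java}
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical simplification; not the real BrokerMetadataPublisher code.
class StrayPartitionSketch {
    static Set<String> findStrayReplicas(Set<String> replicasOnDisk,
                                         Set<String> partitionsInMetadata) {
        return replicasOnDisk.stream()
                // The inverted version effectively filtered on
                // partitionsInMetadata.contains(p), returning the non-strays.
                .filter(p -> !partitionsInMetadata.contains(p))
                .collect(Collectors.toSet());
    }
}
{code}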



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13111) Re-evaluate Fetch Sessions when using topic IDs

2021-07-20 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17384526#comment-17384526
 ] 

Jason Gustafson commented on KAFKA-13111:
-

Thanks, this makes sense to me. At a high level, we want to keep the protocol 
simple even if it costs a little more complexity in the implementation. I think 
it is indeed simpler if topic IDs are handled similarly to topic names. When a 
topic ID is unknown, we nevertheless store it in the session and keep sending 
the client UNKNOWN_TOPIC_ID until it gets removed through the forgotten topic 
list or the broker learns the topic name mapping from the controller. 
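
A hedged sketch of what such a session entry key could look like (class and 
field names are illustrative, not the final implementation): keying on topic ID 
plus partition, with the name attached only once it is known, lets an unresolved 
ID live in the session without breaking lookups.

{code:java}
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;

// Illustrative only; not the classes that ended up in the fetch session code.
final class SessionPartitionKey {
    final UUID topicId;
    final int partition;
    final Optional<String> topicName; // empty until the broker learns the mapping

    SessionPartitionKey(UUID topicId, int partition, Optional<String> topicName) {
        this.topicId = topicId;
        this.partition = partition;
        this.topicName = topicName;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SessionPartitionKey)) return false;
        SessionPartitionKey other = (SessionPartitionKey) o;
        // Equality ignores the optional name so that resolving it later does not
        // change where the partition lives in the session cache.
        return topicId.equals(other.topicId) && partition == other.partition;
    }

    @Override
    public int hashCode() {
        return Objects.hash(topicId, partition);
    }
}
{code}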

> Re-evaluate Fetch Sessions when using topic IDs
> ---
>
> Key: KAFKA-13111
> URL: https://issues.apache.org/jira/browse/KAFKA-13111
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.1.0
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
>
> For fetch request version 13 we have the current method of handling unknown 
> topic IDs.
>  * When the receiving broker sees an unknown topic ID in a request or 
> encounters an inconsistent (mismatched) ID in the logs, it sends a top-level 
> error back, delays *all* partitions (in fetcher thread), and closes the 
> session
> Ideally, we want to handle the same way as unknown topic names. We hold the 
> topic partition in the session and try to resolve on a future fetch request. 
> However, there are a few complications with this approach and this is why we 
> opted to simply close the session. One is that many objects in the fetch 
> session are keyed using topic name+partition. We also still need the topic 
> name for a few other checks in the fetch path.
> Still, upon further inspection, closing the session on any new topics (when 
> using topic names, we often see a few unknown topic or partition exceptions) 
> is not ideal.
> One way to see similar behavior in the topic ID case is to store unresolved 
> IDs in the session. For compatibility reasons, we will need to add and/or 
> generify a few classes. The general idea is that for sessions using version 
> 13+ we will use a structure that keys using topic ID and partition with an 
> optional name. That way, we can send partition level errors for unknown topic 
> IDs and handle them later in the same session. We can also remove partitions 
> using topic ID more easily if the unknown topic ID was due to stale metadata.
> Then the new method will be as follows:
>  * When the receiving broker sees an unknown topic ID in a request or 
> encounters an inconsistent (mismatched) ID in the logs, it sends an error on 
> the unknown partition, delays *only those* partitions (in fetcher thread), and 
> keeps the session open to try to resolve them in the future



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12644) Add Missing Class-Level Javadoc to Descendants of org.apache.kafka.common.errors.ApiException

2021-07-19 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17383650#comment-17383650
 ] 

Jason Gustafson commented on KAFKA-12644:
-

Downgrading priority since this is not a blocker. We can nevertheless aim for 
3.0.

> Add Missing Class-Level Javadoc to Descendants of 
> org.apache.kafka.common.errors.ApiException
> -
>
> Key: KAFKA-12644
> URL: https://issues.apache.org/jira/browse/KAFKA-12644
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, documentation
>Affects Versions: 3.0.0, 2.8.1
>Reporter: Israel Ekpo
>Assignee: Israel Ekpo
>Priority: Blocker
>  Labels: documentation
> Fix For: 3.0.0, 2.8.1
>
>
> I noticed that class-level Javadocs are missing from some classes in the 
> org.apache.kafka.common.errors package. This issue is for tracking the work 
> of adding the missing class-level javadocs for those Exception classes.
> https://kafka.apache.org/27/javadoc/org/apache/kafka/common/errors/package-summary.html
> https://github.com/apache/kafka/tree/trunk/clients/src/main/java/org/apache/kafka/common/errors
> Basic class-level documentation could be derived by mapping the error 
> conditions documented in the protocol
> https://kafka.apache.org/protocol#protocol_constants
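
As a purely illustrative example of the kind of class-level Javadoc being asked 
for (the wording is hypothetical, not the text that was committed, and the class 
skeleton is simplified), derived from the protocol's description of the matching 
error code:

{code:java}
import org.apache.kafka.common.errors.InvalidMetadataException;

/**
 * Illustrative skeleton only; the real class lives in org.apache.kafka.common.errors
 * and has additional constructors.
 *
 * Indicates that a request referred to a topic or partition that does not exist
 * on this broker. Maps to the UNKNOWN_TOPIC_OR_PARTITION error code in the Kafka
 * protocol and is retriable, since topic metadata may still be propagating
 * through the cluster.
 */
public class UnknownTopicOrPartitionException extends InvalidMetadataException {
    public UnknownTopicOrPartitionException(String message) {
        super(message);
    }
}
{code}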



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12644) Add Missing Class-Level Javadoc to Descendants of org.apache.kafka.common.errors.ApiException

2021-07-19 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-12644:

Priority: Major  (was: Blocker)

> Add Missing Class-Level Javadoc to Descendants of 
> org.apache.kafka.common.errors.ApiException
> -
>
> Key: KAFKA-12644
> URL: https://issues.apache.org/jira/browse/KAFKA-12644
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, documentation
>Affects Versions: 3.0.0, 2.8.1
>Reporter: Israel Ekpo
>Assignee: Israel Ekpo
>Priority: Major
>  Labels: documentation
> Fix For: 3.0.0, 2.8.1
>
>
> I noticed that class-level Javadocs are missing from some classes in the 
> org.apache.kafka.common.errors package. This issue is for tracking the work 
> of adding the missing class-level javadocs for those Exception classes.
> https://kafka.apache.org/27/javadoc/org/apache/kafka/common/errors/package-summary.html
> https://github.com/apache/kafka/tree/trunk/clients/src/main/java/org/apache/kafka/common/errors
> Basic class-level documentation could be derived by mapping the error 
> conditions documented in the protocol
> https://kafka.apache.org/protocol#protocol_constants



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13099) Message too large error when expiring transactionalIds

2021-07-16 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-13099:
---

 Summary: Message too large error when expiring transactionalIds
 Key: KAFKA-13099
 URL: https://issues.apache.org/jira/browse/KAFKA-13099
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jason Gustafson


We have seen a couple of reports of MESSAGE_TOO_LARGE errors when writing 
tombstones for expired transactionalIds. This is possible because we collect 
all expired IDs into a single batch. We should ensure that the created batches 
are smaller than the max message size. Any expired IDs that cannot fit can be 
expired later.
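
A hedged sketch of the batching rule (names and the per-record size estimate are 
made up; the real fix works on transaction-state tombstone records in the 
coordinator): cap each batch by an estimated size and leave whatever does not 
fit for a later expiration pass.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Illustrative only; not the coordinator code.
class TombstoneBatchingSketch {
    static List<List<String>> batchExpiredIds(List<String> expiredIds, int maxBatchBytes) {
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        int currentBytes = 0;
        for (String transactionalId : expiredIds) {
            int estimatedBytes = transactionalId.length() + 64; // rough per-record overhead
            if (!current.isEmpty() && currentBytes + estimatedBytes > maxBatchBytes) {
                batches.add(current);          // this batch stays under the limit
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(transactionalId);
            currentBytes += estimatedBytes;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
{code}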



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

