[jira] [Resolved] (KAFKA-12257) Consumer mishandles topics deleted and recreated with the same name
[ https://issues.apache.org/jira/browse/KAFKA-12257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson resolved KAFKA-12257.
-------------------------------------
    Resolution: Fixed

> Consumer mishandles topics deleted and recreated with the same name
> -------------------------------------------------------------------
>
>                  Key: KAFKA-12257
>                  URL: https://issues.apache.org/jira/browse/KAFKA-12257
>              Project: Kafka
>           Issue Type: Bug
>           Components: consumer
>     Affects Versions: 2.2.2, 2.3.1, 2.4.1, 2.5.1, 2.6.1, 2.7.1, 2.8.1
>             Reporter: Ryan Leslie
>             Assignee: lqjacklee
>             Priority: Blocker
>              Fix For: 3.1.0, 2.8.2, 3.0.0
>
>          Attachments: KAFKA-12257-1.patch, KAFKA-12257.patch
>
>
> In KAFKA-7738, caching of leader epochs (KIP-320) was added to o.a.k.c.Metadata to ignore metadata responses with epochs smaller than the last seen epoch.
>
> The current implementation can cause problems when a consumer is subscribed to a topic that has been deleted and then recreated with the same name. This is seen most often in consumers that subscribe to many topics using a wildcard.
>
> Currently, when a topic is deleted and the Fetcher receives UNKNOWN_TOPIC_OR_PARTITION, the leader epoch is not cleared. If, while the consumer is still running, a topic is later created with the same name, the leader epochs are set to 0 for the new topic's partitions and are likely smaller than those of the previous topic. For example, if a broker had restarted during the lifespan of the previous topic, the leader epoch would be at least 1 or 2. In that case the metadata is ignored because it is incorrectly considered stale. The user will sometimes get lucky: if the previous topic was created recently enough that its epoch is still 0, no problem occurs on recreation. The issue is also not seen when consumers happen to have been restarted between deletion and recreation.
>
> The most common side effect of the new metadata being disregarded is that the new partitions end up assigned but the Fetcher is unable to fetch data because it does not know the leaders. When a topic is recreated with the same name, the partition leaders are likely not the same as for the previous topic, and the number of partitions may even differ. Besides not being able to retrieve data for the new topic, there is a more sinister side effect: the Fetcher triggers a metadata update after the fetch fails, and the subsequent update again ignores the topic's metadata while the leader epoch is still smaller than the cached value. This metadata refresh loop can continue indefinitely, and with a sufficient number of consumers it may even put a strain on the cluster, since the requests occur in a tight loop. It can also be hard for clients to identify, since nothing is logged by default that would indicate what's happening: both the Metadata class's logging of "_Not replacing existing epoch_" and the Fetcher's logging of "_Leader for partition is unknown_" are at DEBUG level.
>
> A second possible side effect was observed: if the consumer is acting as leader of the group and happens to have no current metadata for the previous topic (e.g. it was cleared due to a metadata error from a broker failure), the new topic's partitions may simply end up unassigned within the group. This is because the subscription list contains the recreated topic, but the metadata for it was previously ignored due to the leader epochs.
>
> In this case the user would see logs such as:
> {noformat}
> WARN o.a.k.c.c.i.ConsumerCoordinator [Consumer clientId=myClientId, groupId=myGroup] The following subscribed topics are not assigned to any members: [myTopic]
> {noformat}
>
> Interestingly, I believe the Producer is less affected by this problem, since o.a.k.c.p.i.ProducerMetadata explicitly clears knowledge of its topics in retainTopics() after each metadata expiration. ConsumerMetadata does no such thing.
>
> To reproduce this issue:
> # Turn on DEBUG logging, e.g. for org.apache.kafka.clients.consumer and org.apache.kafka.clients.Metadata
> # Begin a consumer for a topic (or multiple topics)
> # Restart a broker that happens to be a leader for one of the topic's partitions
> # Delete the topic
> # Create another topic with the same name
> # Publish data for the new topic
> # The consumer will not receive data for the new topic, and there will be a high rate of metadata requests
> # The issue can be corrected by restarting the consumer, or by restarting brokers until the leader epochs are large enough
>
> I believe KIP-516 (unique topic ids) will likely fix this problem, since after those changes the leader epoch map should be keyed off of the topic ID.
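The failure mode described above boils down to a monotonic epoch check that is never reset on topic deletion. The following is a hypothetical, heavily simplified sketch of that check (the real logic lives in o.a.k.c.Metadata; the class and method names here are illustrative, not Kafka APIs):

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of the KIP-320 epoch check: a metadata response whose
// leader epoch is lower than the cached one is treated as stale and ignored.
// Because the cached epoch is not cleared when the topic is deleted, a
// recreated topic (whose partitions start again at epoch 0) looks "stale"
// on every refresh, producing the tight metadata-request loop.
public class EpochCheckSketch {
    private final Map<String, Integer> lastSeenEpoch = new HashMap<>();

    /** Returns true if the update is accepted, false if considered stale. */
    public boolean maybeUpdate(String topicPartition, int newEpoch) {
        Integer cached = lastSeenEpoch.get(topicPartition);
        if (cached != null && newEpoch < cached) {
            return false; // "Not replacing existing epoch" -- only logged at DEBUG
        }
        lastSeenEpoch.put(topicPartition, newEpoch);
        return true;
    }

    public static void main(String[] args) {
        EpochCheckSketch metadata = new EpochCheckSketch();
        metadata.maybeUpdate("myTopic-0", 2); // a broker restart bumped the epoch to 2
        // Topic deleted and recreated: the new topic's partitions start at epoch 0,
        // so the fresh metadata is wrongly rejected as stale.
        boolean accepted = metadata.maybeUpdate("myTopic-0", 0);
        System.out.println("recreated topic metadata accepted: " + accepted);
    }
}
```

This also shows why a consumer restart "fixes" the problem: the in-memory epoch map is lost, so the recreated topic's epoch 0 is accepted again.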
[jira] [Resolved] (KAFKA-13071) Deprecate and remove --authorizer option in kafka-acls.sh
[ https://issues.apache.org/jira/browse/KAFKA-13071?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson resolved KAFKA-13071.
-------------------------------------
    Fix Version/s: 3.1.0
       Resolution: Fixed

> Deprecate and remove --authorizer option in kafka-acls.sh
> ---------------------------------------------------------
>
>           Key: KAFKA-13071
>           URL: https://issues.apache.org/jira/browse/KAFKA-13071
>       Project: Kafka
>    Issue Type: Improvement
>      Reporter: Jason Gustafson
>      Assignee: Jason Gustafson
>      Priority: Major
>        Labels: needs-kip
>       Fix For: 3.1.0
>
>
> Now that we have all of the ACL APIs implemented through the admin client, we should consider deprecating and removing support for the --authorizer flag in kafka-acls.sh.

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
[jira] [Resolved] (KAFKA-10104) Remove deprecated --zookeeper flags as specified in KIP-604
[ https://issues.apache.org/jira/browse/KAFKA-10104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson resolved KAFKA-10104.
-------------------------------------
    Resolution: Duplicate

> Remove deprecated --zookeeper flags as specified in KIP-604
> -----------------------------------------------------------
>
>           Key: KAFKA-10104
>           URL: https://issues.apache.org/jira/browse/KAFKA-10104
>       Project: Kafka
>    Issue Type: Improvement
>      Reporter: Colin McCabe
>      Priority: Major
>
>
> Remove deprecated --zookeeper flags as specified in KIP-604
[jira] [Updated] (KAFKA-12257) Consumer mishandles topics deleted and recreated with the same name
[ https://issues.apache.org/jira/browse/KAFKA-12257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson updated KAFKA-12257:
------------------------------------
    Fix Version/s: 2.8.2
[jira] [Assigned] (KAFKA-13071) Deprecate and remove --authorizer option in kafka-acls.sh
[ https://issues.apache.org/jira/browse/KAFKA-13071?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson reassigned KAFKA-13071:
---------------------------------------
    Assignee: Jason Gustafson  (was: HaiyuanZhao)
[jira] [Commented] (KAFKA-13071) Deprecate and remove --authorizer option in kafka-acls.sh
[ https://issues.apache.org/jira/browse/KAFKA-13071?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17444032#comment-17444032 ]

Jason Gustafson commented on KAFKA-13071:
-----------------------------------------

[~zhaohaidao] I'm going to pick this up so that we can get it into 3.1. Please let me know if you were planning to submit something soon.
[jira] [Resolved] (KAFKA-13421) Fix ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
[ https://issues.apache.org/jira/browse/KAFKA-13421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson resolved KAFKA-13421.
-------------------------------------
    Resolution: Fixed

> Fix ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
> ---------------------------------------------------------------------------------------------
>
>           Key: KAFKA-13421
>           URL: https://issues.apache.org/jira/browse/KAFKA-13421
>       Project: Kafka
>    Issue Type: Bug
>      Reporter: Colin McCabe
>      Assignee: Jason Gustafson
>      Priority: Major
>
>
> ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup is failing with this error:
> {code}
> ConsumerBounceTest > testSubscribeWhenTopicUnavailable() PASSED
> kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup() failed, log available in /home/cmccabe/src/kafka9/core/build/reports/testOutput/kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup().test.stdout
>
> ConsumerBounceTest > testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup() FAILED
>     org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists
>         at org.apache.zookeeper.KeeperException.create(KeeperException.java:126)
>         at kafka.zk.KafkaZkClient$CheckedEphemeral.getAfterNodeExists(KafkaZkClient.scala:1904)
>         at kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:1842)
>         at kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:1809)
>         at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:96)
>         at kafka.server.KafkaServer.startup(KafkaServer.scala:320)
>         at kafka.integration.KafkaServerTestHarness.$anonfun$restartDeadBrokers$2(KafkaServerTestHarness.scala:212)
>         at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.scala:18)
>         at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
>         at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:919)
>         at scala.collection.IterableOps$WithFilter.foreach(Iterable.scala:889)
>         at kafka.integration.KafkaServerTestHarness.restartDeadBrokers(KafkaServerTestHarness.scala:203)
>         at kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1(ConsumerBounceTest.scala:327)
>         at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190)
>         at kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:319)
> {code}
[jira] [Comment Edited] (KAFKA-13071) Deprecate and remove --authorizer option in kafka-acls.sh
[ https://issues.apache.org/jira/browse/KAFKA-13071?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17442028#comment-17442028 ]

Jason Gustafson edited comment on KAFKA-13071 at 11/11/21, 1:30 AM:
--------------------------------------------------------------------

I updated KIP-604 to include this: https://cwiki.apache.org/confluence/display/KAFKA/KIP-604%3A+Remove+ZooKeeper+Flags+from+the+Administrative+Tools. I also sent a message to the vote thread to see if everyone is ok with the change.

[~zhaohaidao] Are you planning to pick this up?

was (Author: hachikuji):
I updated the KIP-604 to include this: https://cwiki.apache.org/confluence/display/KAFKA/KIP-604%3A+Remove+ZooKeeper+Flags+from+the+Administrative+Tools. I also sent a message to the vote thread to see if everyone is ok with the change.

[~zhaohaidao] Are you planning to pick this up?
[jira] [Commented] (KAFKA-13071) Deprecate and remove --authorizer option in kafka-acls.sh
[ https://issues.apache.org/jira/browse/KAFKA-13071?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17442028#comment-17442028 ]

Jason Gustafson commented on KAFKA-13071:
-----------------------------------------

I updated the KIP-604 to include this: https://cwiki.apache.org/confluence/display/KAFKA/KIP-604%3A+Remove+ZooKeeper+Flags+from+the+Administrative+Tools. I also sent a message to the vote thread to see if everyone is ok with the change.

[~zhaohaidao] Are you planning to pick this up?
[jira] [Resolved] (KAFKA-12975) Consider how Topic IDs can improve consume experience
[ https://issues.apache.org/jira/browse/KAFKA-12975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson resolved KAFKA-12975.
-------------------------------------
    Resolution: Duplicate

> Consider how Topic IDs can improve consume experience
> -----------------------------------------------------
>
>           Key: KAFKA-12975
>           URL: https://issues.apache.org/jira/browse/KAFKA-12975
>       Project: Kafka
>    Issue Type: Sub-task
>      Reporter: Justine Olshan
>      Priority: Major
>
>
> Currently when a consumer subscribes to a topic, it will continue to consume from this topic across topic deletions and recreations with the same name. By adding topic IDs to the consumer, we will have more insight for these events. We should figure out if we want to change consumer logic now that we have this information.
[jira] [Reopened] (KAFKA-12975) Consider how Topic IDs can improve consume experience
[ https://issues.apache.org/jira/browse/KAFKA-12975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson reopened KAFKA-12975:
-------------------------------------
[jira] [Resolved] (KAFKA-12975) Consider how Topic IDs can improve consume experience
[ https://issues.apache.org/jira/browse/KAFKA-12975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson resolved KAFKA-12975.
-------------------------------------
    Resolution: Fixed

Closing in favor of https://issues.apache.org/jira/browse/KAFKA-13447
[jira] [Created] (KAFKA-13447) Consumer should not reuse committed offset after topic recreation
Jason Gustafson created KAFKA-13447:
------------------------------------

             Summary: Consumer should not reuse committed offset after topic recreation
                 Key: KAFKA-13447
                 URL: https://issues.apache.org/jira/browse/KAFKA-13447
             Project: Kafka
          Issue Type: Bug
            Reporter: Jason Gustafson


KAFKA-12257 fixes an issue in which the consumer is unable to make progress after a topic has been recreated. The problem was that the client could not distinguish between stale metadata with a lower leader epoch and a recreated topic with a lower leader epoch. With TopicId support in KIP-516, the client is able to tell when a topic has been recreated, since the new topic will have a different ID.

However, what the patch did not fix is the potential reuse of the current offset position on the recreated topic. For example, say that the consumer is at offset N when the topic gets recreated. Currently, the consumer will continue fetching from offset N after detecting the recreation. The most likely result of this is either an offset out of range error or a log truncation error, but it is also possible for the offset position to remain valid on the recreated topic (say for a low-volume topic where the offset is still low, or a case where the consumer was down for a while).

To fix this issue completely, we need to store the topic ID along with the committed offset in __consumer_offsets. This would allow the consumer to detect when the offset is no longer relevant for the current topic. We also need to decide how to raise this case to the user. If the user has enabled automatic offset reset, we can probably use that. Otherwise, we might need a new exception type to signal the user that the position needs to be reset.
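The proposed fix direction above (storing the topic ID with the committed offset) can be sketched as follows. This is a hypothetical simplification: the names `CommittedOffset` and `isReusable` are illustrative only, not Kafka APIs, and the real change would live in the __consumer_offsets schema and the consumer's offset-reset path:

```java
import java.util.Objects;
import java.util.UUID;

// Sketch: persist the topic ID next to the committed offset, so a consumer
// can tell that an offset committed against a deleted topic does not apply
// to a recreation of that topic under the same name.
public class OffsetRecreationSketch {
    record CommittedOffset(UUID topicId, long offset) {}

    /** The committed offset is only reusable while the topic ID still matches. */
    static boolean isReusable(CommittedOffset committed, UUID currentTopicId) {
        return Objects.equals(committed.topicId(), currentTopicId);
    }

    public static void main(String[] args) {
        UUID originalId = UUID.randomUUID();
        CommittedOffset committed = new CommittedOffset(originalId, 42L);

        // Same topic instance: safe to resume from the committed offset.
        System.out.println("same topic reusable: " + isReusable(committed, originalId));

        // Topic deleted and recreated under the same name: a new UUID is
        // assigned, so the position must be reset rather than reused.
        UUID recreatedId = UUID.randomUUID();
        System.out.println("recreated topic reusable: " + isReusable(committed, recreatedId));
    }
}
```

When the check fails, the issue suggests either falling back to the configured auto.offset.reset policy or surfacing a new exception type so the application can reset its position explicitly.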
[jira] [Assigned] (KAFKA-13421) Fix ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
[ https://issues.apache.org/jira/browse/KAFKA-13421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson reassigned KAFKA-13421:
---------------------------------------
    Assignee: Jason Gustafson
[jira] [Resolved] (KAFKA-13417) Dynamic thread pool re-configurations may not get processed
[ https://issues.apache.org/jira/browse/KAFKA-13417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson resolved KAFKA-13417.
-------------------------------------
    Resolution: Fixed

> Dynamic thread pool re-configurations may not get processed
> -----------------------------------------------------------
>
>           Key: KAFKA-13417
>           URL: https://issues.apache.org/jira/browse/KAFKA-13417
>       Project: Kafka
>    Issue Type: Bug
>      Reporter: Jason Gustafson
>      Assignee: Jason Gustafson
>      Priority: Major
>
>
> `DynamicBrokerConfig.updateCurrentConfig` includes the following logic to update the current configuration and to let each `Reconfigurable` process the update:
> {code}
> val oldConfig = currentConfig
> val (newConfig, brokerReconfigurablesToUpdate) = processReconfiguration(newProps, validateOnly = false)
> if (newConfig ne currentConfig) {
>   currentConfig = newConfig
>   kafkaConfig.updateCurrentConfig(newConfig)
>   // Process BrokerReconfigurable updates after current config is updated
>   brokerReconfigurablesToUpdate.foreach(_.reconfigure(oldConfig, newConfig))
> }
> {code}
> The problem here is that `currentConfig` gets initialized as `kafkaConfig`, which means that the first call to `kafkaConfig.updateCurrentConfig(newConfig)` ends up mutating `currentConfig` and consequently `oldConfig`. This matters because some of the `reconfigure` implementations will only apply a new configuration if the value in `oldConfig` does not match the value in `newConfig`.
>
> For example, here is the logic to update thread pools dynamically:
> {code}
> override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
>   if (newConfig.numIoThreads != oldConfig.numIoThreads)
>     server.dataPlaneRequestHandlerPool.resizeThreadPool(newConfig.numIoThreads)
>   if (newConfig.numNetworkThreads != oldConfig.numNetworkThreads)
>     server.socketServer.resizeThreadPool(oldConfig.numNetworkThreads, newConfig.numNetworkThreads)
>   if (newConfig.numReplicaFetchers != oldConfig.numReplicaFetchers)
>     server.replicaManager.replicaFetcherManager.resizeThreadPool(newConfig.numReplicaFetchers)
>   if (newConfig.numRecoveryThreadsPerDataDir != oldConfig.numRecoveryThreadsPerDataDir)
>     server.logManager.resizeRecoveryThreadPool(newConfig.numRecoveryThreadsPerDataDir)
>   if (newConfig.backgroundThreads != oldConfig.backgroundThreads)
>     server.kafkaScheduler.resizeThreadPool(newConfig.backgroundThreads)
> }
> {code}
> Because of this, the dynamic update will not get applied the first time it is made. I believe subsequent updates would work correctly, though, because we would have lost the indirect reference to `kafkaConfig`. Other than the `DynamicThreadPool` configurations, it looks like the config to update unclean leader election may also be affected by this bug.
>
> NOTE: This bug only affects KRaft, which is missing the call to `DynamicBrokerConfig.initialize()`.
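The aliasing problem described above is easy to reproduce outside of Kafka. The sketch below is not Kafka code; it is a minimal, hypothetical illustration of how a mutable "old config" reference that aliases the current config defeats a diff-based reconfigure check:

```java
// Minimal illustration of the aliasing bug: when the "old" snapshot and the
// current config refer to the same mutable object, updating the current
// config also updates the snapshot, so the old-vs-new comparison sees no
// difference and the reconfiguration is skipped.
public class AliasingBugSketch {
    static class Config {
        int numIoThreads;
        Config(int n) { numIoThreads = n; }
        void updateFrom(Config other) { this.numIoThreads = other.numIoThreads; }
    }

    public static void main(String[] args) {
        Config kafkaConfig = new Config(8);
        Config currentConfig = kafkaConfig; // aliased, as in the reported bug

        Config oldConfig = currentConfig;   // intended to be an immutable snapshot
        Config newConfig = new Config(16);

        // Mirrors kafkaConfig.updateCurrentConfig(newConfig): this mutates
        // kafkaConfig, and through the alias chain, oldConfig as well.
        kafkaConfig.updateFrom(newConfig);

        // The diff check that guards the thread-pool resize now sees 16 != 16.
        boolean applied = newConfig.numIoThreads != oldConfig.numIoThreads;
        System.out.println("thread pool resize applied: " + applied);
    }
}
```

Taking a defensive copy for the snapshot (or initializing `currentConfig` independently of `kafkaConfig`, as the non-KRaft path does via `DynamicBrokerConfig.initialize()`) avoids the skipped first update.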
[jira] [Updated] (KAFKA-13417) Dynamic thread pool re-configurations may not get processed
[ https://issues.apache.org/jira/browse/KAFKA-13417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson updated KAFKA-13417:
------------------------------------
[jira] [Updated] (KAFKA-12257) Consumer mishandles topics deleted and recreated with the same name
[ https://issues.apache.org/jira/browse/KAFKA-12257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-12257: Fix Version/s: 3.1.0 > Consumer mishandles topics deleted and recreated with the same name > > > Key: KAFKA-12257 > URL: https://issues.apache.org/jira/browse/KAFKA-12257 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 2.2.2, 2.3.1, 2.4.1, 2.5.1, 2.6.1, 2.7.1, 2.8.1 >Reporter: Ryan Leslie >Assignee: lqjacklee >Priority: Blocker > Fix For: 3.1.0, 3.0.0 > > Attachments: KAFKA-12257-1.patch, KAFKA-12257.patch > > > In KAFKA-7738, caching of leader epochs (KIP-320) was added to > o.a.k.c.Metadata to ignore metadata responses with epochs smaller than the > last seen epoch. > The current implementation can cause problems in cases where a consumer is > subscribed to a topic that has been deleted and then recreated with the same > name. This is something seen more often in consumers that subscribe to a > multitude of topics using a wildcard. > Currently, when a topic is deleted and the Fetcher receives > UNKNOWN_TOPIC_OR_PARTITION, the leader epoch is not cleared. If at a later > time while the consumer is still running a topic is created with the same > name, the leader epochs are set to 0 for the new topics partitions, and are > likely smaller than those for the previous topic. For example, if a broker > had restarted during the lifespan of the previous topic, the leader epoch > would be at least 1 or 2. In this case the metadata will be ignored since it > is incorrectly considered stale. Of course, the user will sometimes get > lucky, and if a topic was only recently created so that the epoch is still 0, > no problem will occur on recreation. The issue is also not seen when > consumers happen to have been restarted in between deletion and recreation. 
> The most common side effect of the new metadata being disregarded is that the > new partitions end up assigned but the Fetcher is unable to fetch data > because it does not know the leaders. When recreating a topic with the same > name it is likely that the partition leaders are not the same as for the > previous topic, and the number of partitions may even be different. Besides > not being able to retrieve data for the new topic, there is a more sinister > side effect of the Fetcher triggering a metadata update after the fetch > fails. The subsequent update will again ignore the topic's metadata if the > leader epoch is still smaller than the cached value. This metadata refresh > loop can continue indefinitely and with a sufficient number of consumers may > even put a strain on a cluster since the requests are occurring in a tight > loop. This can also be hard for clients to identify since there is nothing > logged by default that would indicate what's happening. Both the Metadata > class's logging of "_Not replacing existing epoch_", and the Fetcher's > logging of "_Leader for partition is unknown_" are at DEBUG level. > A second possible side effect was observed where if the consumer is acting as > leader of the group and happens to not have any current data for the previous > topic, e.g. it was cleared due to a metadata error from a broker failure, > then the new topic's partitions may simply end up unassigned within the > group. This is because while the subscription list contains the recreated > topic the metadata for it was previously ignored due to the leader epochs. 
In > this case the user would see logs such as: > {noformat} > WARN o.a.k.c.c.i.ConsumerCoordinator [Consumer clientId=myClientId, > groupId=myGroup] The following subscribed topics are not assigned to any > members: [myTopic]{noformat} > Interestingly, I believe the Producer is less affected by this problem since > o.a.k.c.p.i.ProducerMetadata explicitly clears knowledge of its topics in > retainTopics() after each metadata expiration. ConsumerMetadata does no such > thing. > To reproduce this issue: > # Turn on DEBUG logging, e.g. for org.apache.kafka.clients.consumer and > org.apache.kafka.clients.Metadata > # Begin a consumer for a topic (or multiple topics) > # Restart a broker that happens to be a leader for one of the topic's > partitions > # Delete the topic > # Create another topic with the same name > # Publish data for the new topic > # The consumer will not receive data for the new topic, and there will be a > high rate of metadata requests. > # The issue can be corrected by restarting the consumer or restarting > brokers until leader epochs are large enough > I believe KIP-516 (unique topic ids) will likely fix this problem, since > after those changes the leader epoch map should be keyed off of the topic
[jira] [Updated] (KAFKA-12257) Consumer mishandles topics deleted and recreated with the same name
[ https://issues.apache.org/jira/browse/KAFKA-12257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-12257: Affects Version/s: 2.3.1 2.2.2 > Consumer mishandles topics deleted and recreated with the same name > > > Key: KAFKA-12257 > URL: https://issues.apache.org/jira/browse/KAFKA-12257 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 2.2.2, 2.3.1, 2.4.1, 2.5.1, 2.6.1, 2.7.1, 2.8.1 >Reporter: Ryan Leslie >Assignee: lqjacklee >Priority: Blocker > Fix For: 3.0.0 > > Attachments: KAFKA-12257-1.patch, KAFKA-12257.patch > > > In KAFKA-7738, caching of leader epochs (KIP-320) was added to > o.a.k.c.Metadata to ignore metadata responses with epochs smaller than the > last seen epoch. > The current implementation can cause problems in cases where a consumer is > subscribed to a topic that has been deleted and then recreated with the same > name. This is something seen more often in consumers that subscribe to a > multitude of topics using a wildcard. > Currently, when a topic is deleted and the Fetcher receives > UNKNOWN_TOPIC_OR_PARTITION, the leader epoch is not cleared. If at a later > time while the consumer is still running a topic is created with the same > name, the leader epochs are set to 0 for the new topics partitions, and are > likely smaller than those for the previous topic. For example, if a broker > had restarted during the lifespan of the previous topic, the leader epoch > would be at least 1 or 2. In this case the metadata will be ignored since it > is incorrectly considered stale. Of course, the user will sometimes get > lucky, and if a topic was only recently created so that the epoch is still 0, > no problem will occur on recreation. The issue is also not seen when > consumers happen to have been restarted in between deletion and recreation. 
> The most common side effect of the new metadata being disregarded is that the > new partitions end up assigned but the Fetcher is unable to fetch data > because it does not know the leaders. When recreating a topic with the same > name it is likely that the partition leaders are not the same as for the > previous topic, and the number of partitions may even be different. Besides > not being able to retrieve data for the new topic, there is a more sinister > side effect of the Fetcher triggering a metadata update after the fetch > fails. The subsequent update will again ignore the topic's metadata if the > leader epoch is still smaller than the cached value. This metadata refresh > loop can continue indefinitely and with a sufficient number of consumers may > even put a strain on a cluster since the requests are occurring in a tight > loop. This can also be hard for clients to identify since there is nothing > logged by default that would indicate what's happening. Both the Metadata > class's logging of "_Not replacing existing epoch_", and the Fetcher's > logging of "_Leader for partition is unknown_" are at DEBUG level. > A second possible side effect was observed where if the consumer is acting as > leader of the group and happens to not have any current data for the previous > topic, e.g. it was cleared due to a metadata error from a broker failure, > then the new topic's partitions may simply end up unassigned within the > group. This is because while the subscription list contains the recreated > topic the metadata for it was previously ignored due to the leader epochs. 
In > this case the user would see logs such as: > {noformat} > WARN o.a.k.c.c.i.ConsumerCoordinator [Consumer clientId=myClientId, > groupId=myGroup] The following subscribed topics are not assigned to any > members: [myTopic]{noformat} > Interestingly, I believe the Producer is less affected by this problem since > o.a.k.c.p.i.ProducerMetadata explicitly clears knowledge of its topics in > retainTopics() after each metadata expiration. ConsumerMetadata does no such > thing. > To reproduce this issue: > # Turn on DEBUG logging, e.g. for org.apache.kafka.clients.consumer and > org.apache.kafka.clients.Metadata > # Begin a consumer for a topic (or multiple topics) > # Restart a broker that happens to be a leader for one of the topic's > partitions > # Delete the topic > # Create another topic with the same name > # Publish data for the new topic > # The consumer will not receive data for the new topic, and there will be a > high rate of metadata requests. > # The issue can be corrected by restarting the consumer or restarting > brokers until leader epochs are large enough > I believe KIP-516 (unique topic ids) will likely fix this problem, since > after those changes the leader epoch map should
[jira] [Updated] (KAFKA-12257) Consumer mishandles topics deleted and recreated with the same name
[ https://issues.apache.org/jira/browse/KAFKA-12257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-12257: Affects Version/s: 2.6.1 2.5.1 2.4.1 > Consumer mishandles topics deleted and recreated with the same name > > > Key: KAFKA-12257 > URL: https://issues.apache.org/jira/browse/KAFKA-12257 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 2.4.1, 2.5.1, 2.6.1, 2.7.1, 2.8.1 >Reporter: Ryan Leslie >Assignee: lqjacklee >Priority: Blocker > Fix For: 3.0.0 > > Attachments: KAFKA-12257-1.patch, KAFKA-12257.patch > > > In KAFKA-7738, caching of leader epochs (KIP-320) was added to > o.a.k.c.Metadata to ignore metadata responses with epochs smaller than the > last seen epoch. > The current implementation can cause problems in cases where a consumer is > subscribed to a topic that has been deleted and then recreated with the same > name. This is something seen more often in consumers that subscribe to a > multitude of topics using a wildcard. > Currently, when a topic is deleted and the Fetcher receives > UNKNOWN_TOPIC_OR_PARTITION, the leader epoch is not cleared. If at a later > time while the consumer is still running a topic is created with the same > name, the leader epochs are set to 0 for the new topics partitions, and are > likely smaller than those for the previous topic. For example, if a broker > had restarted during the lifespan of the previous topic, the leader epoch > would be at least 1 or 2. In this case the metadata will be ignored since it > is incorrectly considered stale. Of course, the user will sometimes get > lucky, and if a topic was only recently created so that the epoch is still 0, > no problem will occur on recreation. The issue is also not seen when > consumers happen to have been restarted in between deletion and recreation. 
> The most common side effect of the new metadata being disregarded is that the > new partitions end up assigned but the Fetcher is unable to fetch data > because it does not know the leaders. When recreating a topic with the same > name it is likely that the partition leaders are not the same as for the > previous topic, and the number of partitions may even be different. Besides > not being able to retrieve data for the new topic, there is a more sinister > side effect of the Fetcher triggering a metadata update after the fetch > fails. The subsequent update will again ignore the topic's metadata if the > leader epoch is still smaller than the cached value. This metadata refresh > loop can continue indefinitely and with a sufficient number of consumers may > even put a strain on a cluster since the requests are occurring in a tight > loop. This can also be hard for clients to identify since there is nothing > logged by default that would indicate what's happening. Both the Metadata > class's logging of "_Not replacing existing epoch_", and the Fetcher's > logging of "_Leader for partition is unknown_" are at DEBUG level. > A second possible side effect was observed where if the consumer is acting as > leader of the group and happens to not have any current data for the previous > topic, e.g. it was cleared due to a metadata error from a broker failure, > then the new topic's partitions may simply end up unassigned within the > group. This is because while the subscription list contains the recreated > topic the metadata for it was previously ignored due to the leader epochs. 
In > this case the user would see logs such as: > {noformat} > WARN o.a.k.c.c.i.ConsumerCoordinator [Consumer clientId=myClientId, > groupId=myGroup] The following subscribed topics are not assigned to any > members: [myTopic]{noformat} > Interestingly, I believe the Producer is less affected by this problem since > o.a.k.c.p.i.ProducerMetadata explicitly clears knowledge of its topics in > retainTopics() after each metadata expiration. ConsumerMetadata does no such > thing. > To reproduce this issue: > # Turn on DEBUG logging, e.g. for org.apache.kafka.clients.consumer and > org.apache.kafka.clients.Metadata > # Begin a consumer for a topic (or multiple topics) > # Restart a broker that happens to be a leader for one of the topic's > partitions > # Delete the topic > # Create another topic with the same name > # Publish data for the new topic > # The consumer will not receive data for the new topic, and there will be a > high rate of metadata requests. > # The issue can be corrected by restarting the consumer or restarting > brokers until leader epochs are large enough > I believe KIP-516 (unique topic ids) will likely fix this problem, since > after those changes the leader e
[jira] [Updated] (KAFKA-12257) Consumer mishandles topics deleted and recreated with the same name
[ https://issues.apache.org/jira/browse/KAFKA-12257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-12257: Affects Version/s: 2.8.1 2.7.1 (was: 2.2.0) (was: 3.1.0) > Consumer mishandles topics deleted and recreated with the same name > > > Key: KAFKA-12257 > URL: https://issues.apache.org/jira/browse/KAFKA-12257 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 2.7.1, 2.8.1 >Reporter: Ryan Leslie >Assignee: lqjacklee >Priority: Blocker > Fix For: 3.1.0 > > Attachments: KAFKA-12257-1.patch, KAFKA-12257.patch > > > In KAFKA-7738, caching of leader epochs (KIP-320) was added to > o.a.k.c.Metadata to ignore metadata responses with epochs smaller than the > last seen epoch. > The current implementation can cause problems in cases where a consumer is > subscribed to a topic that has been deleted and then recreated with the same > name. This is something seen more often in consumers that subscribe to a > multitude of topics using a wildcard. > Currently, when a topic is deleted and the Fetcher receives > UNKNOWN_TOPIC_OR_PARTITION, the leader epoch is not cleared. If at a later > time while the consumer is still running a topic is created with the same > name, the leader epochs are set to 0 for the new topics partitions, and are > likely smaller than those for the previous topic. For example, if a broker > had restarted during the lifespan of the previous topic, the leader epoch > would be at least 1 or 2. In this case the metadata will be ignored since it > is incorrectly considered stale. Of course, the user will sometimes get > lucky, and if a topic was only recently created so that the epoch is still 0, > no problem will occur on recreation. The issue is also not seen when > consumers happen to have been restarted in between deletion and recreation. 
> The most common side effect of the new metadata being disregarded is that the > new partitions end up assigned but the Fetcher is unable to fetch data > because it does not know the leaders. When recreating a topic with the same > name it is likely that the partition leaders are not the same as for the > previous topic, and the number of partitions may even be different. Besides > not being able to retrieve data for the new topic, there is a more sinister > side effect of the Fetcher triggering a metadata update after the fetch > fails. The subsequent update will again ignore the topic's metadata if the > leader epoch is still smaller than the cached value. This metadata refresh > loop can continue indefinitely and with a sufficient number of consumers may > even put a strain on a cluster since the requests are occurring in a tight > loop. This can also be hard for clients to identify since there is nothing > logged by default that would indicate what's happening. Both the Metadata > class's logging of "_Not replacing existing epoch_", and the Fetcher's > logging of "_Leader for partition is unknown_" are at DEBUG level. > A second possible side effect was observed where if the consumer is acting as > leader of the group and happens to not have any current data for the previous > topic, e.g. it was cleared due to a metadata error from a broker failure, > then the new topic's partitions may simply end up unassigned within the > group. This is because while the subscription list contains the recreated > topic the metadata for it was previously ignored due to the leader epochs. 
In > this case the user would see logs such as: > {noformat} > WARN o.a.k.c.c.i.ConsumerCoordinator [Consumer clientId=myClientId, > groupId=myGroup] The following subscribed topics are not assigned to any > members: [myTopic]{noformat} > Interestingly, I believe the Producer is less affected by this problem since > o.a.k.c.p.i.ProducerMetadata explicitly clears knowledge of its topics in > retainTopics() after each metadata expiration. ConsumerMetadata does no such > thing. > To reproduce this issue: > # Turn on DEBUG logging, e.g. for org.apache.kafka.clients.consumer and > org.apache.kafka.clients.Metadata > # Begin a consumer for a topic (or multiple topics) > # Restart a broker that happens to be a leader for one of the topic's > partitions > # Delete the topic > # Create another topic with the same name > # Publish data for the new topic > # The consumer will not receive data for the new topic, and there will be a > high rate of metadata requests. > # The issue can be corrected by restarting the consumer or restarting > brokers until leader epochs are large enough > I believe KIP-516 (unique topic ids) will likely fix this problem, since > af
[jira] [Updated] (KAFKA-12257) Consumer mishandles topics deleted and recreated with the same name
[ https://issues.apache.org/jira/browse/KAFKA-12257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-12257: Fix Version/s: 3.0.0 > Consumer mishandles topics deleted and recreated with the same name > > > Key: KAFKA-12257 > URL: https://issues.apache.org/jira/browse/KAFKA-12257 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 2.7.1, 2.8.1 >Reporter: Ryan Leslie >Assignee: lqjacklee >Priority: Blocker > Fix For: 3.1.0, 3.0.0 > > Attachments: KAFKA-12257-1.patch, KAFKA-12257.patch > > > In KAFKA-7738, caching of leader epochs (KIP-320) was added to > o.a.k.c.Metadata to ignore metadata responses with epochs smaller than the > last seen epoch. > The current implementation can cause problems in cases where a consumer is > subscribed to a topic that has been deleted and then recreated with the same > name. This is something seen more often in consumers that subscribe to a > multitude of topics using a wildcard. > Currently, when a topic is deleted and the Fetcher receives > UNKNOWN_TOPIC_OR_PARTITION, the leader epoch is not cleared. If at a later > time while the consumer is still running a topic is created with the same > name, the leader epochs are set to 0 for the new topics partitions, and are > likely smaller than those for the previous topic. For example, if a broker > had restarted during the lifespan of the previous topic, the leader epoch > would be at least 1 or 2. In this case the metadata will be ignored since it > is incorrectly considered stale. Of course, the user will sometimes get > lucky, and if a topic was only recently created so that the epoch is still 0, > no problem will occur on recreation. The issue is also not seen when > consumers happen to have been restarted in between deletion and recreation. 
> The most common side effect of the new metadata being disregarded is that the > new partitions end up assigned but the Fetcher is unable to fetch data > because it does not know the leaders. When recreating a topic with the same > name it is likely that the partition leaders are not the same as for the > previous topic, and the number of partitions may even be different. Besides > not being able to retrieve data for the new topic, there is a more sinister > side effect of the Fetcher triggering a metadata update after the fetch > fails. The subsequent update will again ignore the topic's metadata if the > leader epoch is still smaller than the cached value. This metadata refresh > loop can continue indefinitely and with a sufficient number of consumers may > even put a strain on a cluster since the requests are occurring in a tight > loop. This can also be hard for clients to identify since there is nothing > logged by default that would indicate what's happening. Both the Metadata > class's logging of "_Not replacing existing epoch_", and the Fetcher's > logging of "_Leader for partition is unknown_" are at DEBUG level. > A second possible side effect was observed where if the consumer is acting as > leader of the group and happens to not have any current data for the previous > topic, e.g. it was cleared due to a metadata error from a broker failure, > then the new topic's partitions may simply end up unassigned within the > group. This is because while the subscription list contains the recreated > topic the metadata for it was previously ignored due to the leader epochs. 
In > this case the user would see logs such as: > {noformat} > WARN o.a.k.c.c.i.ConsumerCoordinator [Consumer clientId=myClientId, > groupId=myGroup] The following subscribed topics are not assigned to any > members: [myTopic]{noformat} > Interestingly, I believe the Producer is less affected by this problem since > o.a.k.c.p.i.ProducerMetadata explicitly clears knowledge of its topics in > retainTopics() after each metadata expiration. ConsumerMetadata does no such > thing. > To reproduce this issue: > # Turn on DEBUG logging, e.g. for org.apache.kafka.clients.consumer and > org.apache.kafka.clients.Metadata > # Begin a consumer for a topic (or multiple topics) > # Restart a broker that happens to be a leader for one of the topic's > partitions > # Delete the topic > # Create another topic with the same name > # Publish data for the new topic > # The consumer will not receive data for the new topic, and there will be a > high rate of metadata requests. > # The issue can be corrected by restarting the consumer or restarting > brokers until leader epochs are large enough > I believe KIP-516 (unique topic ids) will likely fix this problem, since > after those changes the leader epoch map should be keyed off of the topic id, > rather than the name. > One
[jira] [Updated] (KAFKA-12257) Consumer mishandles topics deleted and recreated with the same name
[ https://issues.apache.org/jira/browse/KAFKA-12257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-12257: Fix Version/s: (was: 3.1.0) > Consumer mishandles topics deleted and recreated with the same name > > > Key: KAFKA-12257 > URL: https://issues.apache.org/jira/browse/KAFKA-12257 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 2.7.1, 2.8.1 >Reporter: Ryan Leslie >Assignee: lqjacklee >Priority: Blocker > Fix For: 3.0.0 > > Attachments: KAFKA-12257-1.patch, KAFKA-12257.patch > > > In KAFKA-7738, caching of leader epochs (KIP-320) was added to > o.a.k.c.Metadata to ignore metadata responses with epochs smaller than the > last seen epoch. > The current implementation can cause problems in cases where a consumer is > subscribed to a topic that has been deleted and then recreated with the same > name. This is something seen more often in consumers that subscribe to a > multitude of topics using a wildcard. > Currently, when a topic is deleted and the Fetcher receives > UNKNOWN_TOPIC_OR_PARTITION, the leader epoch is not cleared. If at a later > time while the consumer is still running a topic is created with the same > name, the leader epochs are set to 0 for the new topics partitions, and are > likely smaller than those for the previous topic. For example, if a broker > had restarted during the lifespan of the previous topic, the leader epoch > would be at least 1 or 2. In this case the metadata will be ignored since it > is incorrectly considered stale. Of course, the user will sometimes get > lucky, and if a topic was only recently created so that the epoch is still 0, > no problem will occur on recreation. The issue is also not seen when > consumers happen to have been restarted in between deletion and recreation. 
> The most common side effect of the new metadata being disregarded is that the > new partitions end up assigned but the Fetcher is unable to fetch data > because it does not know the leaders. When recreating a topic with the same > name it is likely that the partition leaders are not the same as for the > previous topic, and the number of partitions may even be different. Besides > not being able to retrieve data for the new topic, there is a more sinister > side effect of the Fetcher triggering a metadata update after the fetch > fails. The subsequent update will again ignore the topic's metadata if the > leader epoch is still smaller than the cached value. This metadata refresh > loop can continue indefinitely and with a sufficient number of consumers may > even put a strain on a cluster since the requests are occurring in a tight > loop. This can also be hard for clients to identify since there is nothing > logged by default that would indicate what's happening. Both the Metadata > class's logging of "_Not replacing existing epoch_", and the Fetcher's > logging of "_Leader for partition is unknown_" are at DEBUG level. > A second possible side effect was observed where if the consumer is acting as > leader of the group and happens to not have any current data for the previous > topic, e.g. it was cleared due to a metadata error from a broker failure, > then the new topic's partitions may simply end up unassigned within the > group. This is because while the subscription list contains the recreated > topic the metadata for it was previously ignored due to the leader epochs. 
In > this case the user would see logs such as: > {noformat} > WARN o.a.k.c.c.i.ConsumerCoordinator [Consumer clientId=myClientId, > groupId=myGroup] The following subscribed topics are not assigned to any > members: [myTopic]{noformat} > Interestingly, I believe the Producer is less affected by this problem since > o.a.k.c.p.i.ProducerMetadata explicitly clears knowledge of its topics in > retainTopics() after each metadata expiration. ConsumerMetadata does no such > thing. > To reproduce this issue: > # Turn on DEBUG logging, e.g. for org.apache.kafka.clients.consumer and > org.apache.kafka.clients.Metadata > # Begin a consumer for a topic (or multiple topics) > # Restart a broker that happens to be a leader for one of the topic's > partitions > # Delete the topic > # Create another topic with the same name > # Publish data for the new topic > # The consumer will not receive data for the new topic, and there will be a > high rate of metadata requests. > # The issue can be corrected by restarting the consumer or restarting > brokers until leader epochs are large enough > I believe KIP-516 (unique topic ids) will likely fix this problem, since > after those changes the leader epoch map should be keyed off of the topic id, > rather than the name. >
[jira] [Created] (KAFKA-13417) Dynamic thread pool re-configurations may not get processed
Jason Gustafson created KAFKA-13417: --- Summary: Dynamic thread pool re-configurations may not get processed Key: KAFKA-13417 URL: https://issues.apache.org/jira/browse/KAFKA-13417 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson Assignee: Jason Gustafson `DynamicBrokerConfig.updateCurrentConfig` includes the following logic to update the current configuration and to let each `Reconfigurable` process the update: {code} val oldConfig = currentConfig val (newConfig, brokerReconfigurablesToUpdate) = processReconfiguration(newProps, validateOnly = false) if (newConfig ne currentConfig) { currentConfig = newConfig kafkaConfig.updateCurrentConfig(newConfig) // Process BrokerReconfigurable updates after current config is updated brokerReconfigurablesToUpdate.foreach(_.reconfigure(oldConfig, newConfig)) } {code} The problem here is that `currentConfig` gets initialized as `kafkaConfig` which means that the first call to `kafkaConfig.updateCurrentConfig(newConfig)` ends up mutating `currentConfig` and consequently `oldConfig`. The problem with this is that some of the `reconfigure` implementations will only apply a new configuration if the value in `oldConfig` does not match the value in `newConfig`. 
For example, here is the logic to update thread pools dynamically: {code} override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = { if (newConfig.numIoThreads != oldConfig.numIoThreads) server.dataPlaneRequestHandlerPool.resizeThreadPool(newConfig.numIoThreads) if (newConfig.numNetworkThreads != oldConfig.numNetworkThreads) server.socketServer.resizeThreadPool(oldConfig.numNetworkThreads, newConfig.numNetworkThreads) if (newConfig.numReplicaFetchers != oldConfig.numReplicaFetchers) server.replicaManager.replicaFetcherManager.resizeThreadPool(newConfig.numReplicaFetchers) if (newConfig.numRecoveryThreadsPerDataDir != oldConfig.numRecoveryThreadsPerDataDir) server.logManager.resizeRecoveryThreadPool(newConfig.numRecoveryThreadsPerDataDir) if (newConfig.backgroundThreads != oldConfig.backgroundThreads) server.kafkaScheduler.resizeThreadPool(newConfig.backgroundThreads) } {code} Because of this, the dynamic update will not get applied the first time it is made. I believe subsequent updates would work correctly though because we would have lost the indirect reference to `kafkaConfig`. Other than the `DynamicThreadPool` configurations, it looks like the config to update unclean leader election may also be affected by this bug. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13412) Retry of initTransactions after timeout may cause invalid transition
Jason Gustafson created KAFKA-13412: --- Summary: Retry of initTransactions after timeout may cause invalid transition Key: KAFKA-13412 URL: https://issues.apache.org/jira/browse/KAFKA-13412 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson Assignee: Jason Gustafson If `initTransactions()` cannot be completed before the timeout defined by `max.block.ms`, then the call will raise a `TimeoutException`. The user is expected to retry this, which is what Kafka Streams does. However, the producer will keep retrying the `InitProducerId` request in the background and it is possible for it to return before the retry call to `initTransaction()`. This leads to the following exception: {code} org.apache.kafka.common.KafkaException: TransactionalId blah: Invalid transition attempted from state READY to state INITIALIZING at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:1077) at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:1070) at org.apache.kafka.clients.producer.internals.TransactionManager.lambda$initializeTransactions$1(TransactionManager.java:336) at org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1198) at org.apache.kafka.clients.producer.internals.TransactionManager.initializeTransactions(TransactionManager.java:333) at org.apache.kafka.clients.producer.internals.TransactionManager.initializeTransactions(TransactionManager.java:328) at org.apache.kafka.clients.producer.KafkaProducer.initTransactions(KafkaProducer.java:597) {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-10800) Validate the snapshot id when the state machine creates a snapshot
[ https://issues.apache.org/jira/browse/KAFKA-10800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-10800. - Resolution: Fixed > Validate the snapshot id when the state machine creates a snapshot > -- > > Key: KAFKA-10800 > URL: https://issues.apache.org/jira/browse/KAFKA-10800 > Project: Kafka > Issue Type: Sub-task > Components: replication >Reporter: Jose Armando Garcia Sancio >Assignee: Haoran Xuan >Priority: Major > > When the state machine attempts to create a snapshot writer we should > validate that the following is true: > # The end offset of the snapshot is less than or equal to the high-watermark. > # The epoch of the snapshot is less than or equal to the quorum epoch. > # The end offset and epoch of the snapshot is valid based on the leader > epoch cache. > Note that this validation should not be performed when the raft client > creates the snapshot writer because in that case the local log is out of date > and the follower should trust the snapshot id sent by the partition leader. -- This message was sent by Atlassian Jira (v8.3.4#803005)
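The three validation rules in the issue can be sketched as a single predicate (hypothetical names; the real raft code is structured differently). A snapshot id is an (end offset, epoch) pair, and the leader-epoch-cache check is reduced here to comparing against the start offset of the snapshot's epoch.

```java
// Sketch of the snapshot-id validation rules for a state-machine-created
// snapshot. Illustrative only; not the actual raft client API.
class SnapshotValidator {
    /** Returns true when the snapshot id passes all three checks. */
    public static boolean isValid(long snapshotEndOffset, int snapshotEpoch,
                                  long highWatermark, int quorumEpoch,
                                  long epochStartOffset) {
        // 1. end offset must be <= the high watermark
        if (snapshotEndOffset > highWatermark) return false;
        // 2. epoch must be <= the quorum epoch
        if (snapshotEpoch > quorumEpoch) return false;
        // 3. end offset must be consistent with the leader epoch cache:
        //    it must not precede the start offset of the snapshot's epoch
        return snapshotEndOffset >= epochStartOffset;
    }
}
```

Per the note above, none of these checks would run when the raft client itself creates the writer from a leader-supplied snapshot id.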
[jira] [Created] (KAFKA-13319) Do not send AddOffsetsToTxn/TxnOffsetCommit if offsets map is empty
Jason Gustafson created KAFKA-13319: --- Summary: Do not send AddOffsetsToTxn/TxnOffsetCommit if offsets map is empty Key: KAFKA-13319 URL: https://issues.apache.org/jira/browse/KAFKA-13319 Project: Kafka Issue Type: Improvement Reporter: Jason Gustafson If a user calls `Producer.sendOffsetsToTransaction` with an empty map of offsets, we can shortcut return and skip the logic to add the offsets topic to the transaction. The main benefit is avoiding the unnecessary accumulation of markers in __consumer_offsets. -- This message was sent by Atlassian Jira (v8.3.4#803005)
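The proposed shortcut amounts to a guard at the top of the call (hypothetical wrapper, not the real `KafkaProducer` internals): an empty offsets map returns immediately instead of adding the offsets topic to the transaction and accumulating markers in __consumer_offsets.

```java
import java.util.Map;

// Sketch of the empty-offsets shortcut described in the issue.
class OffsetsGuard {
    /** Returns true only when the send was actually added to the transaction. */
    public static boolean sendOffsetsToTransaction(Map<String, Long> offsets) {
        if (offsets == null || offsets.isEmpty())
            return false;  // shortcut: nothing to commit, skip AddOffsetsToTxn
        // ... real path: AddOffsetsToTxn followed by TxnOffsetCommit ...
        return true;
    }
}
```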
[jira] [Resolved] (KAFKA-13254) Deadlock when expanding ISR
[ https://issues.apache.org/jira/browse/KAFKA-13254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-13254. - Resolution: Fixed > Deadlock when expanding ISR > --- > > Key: KAFKA-13254 > URL: https://issues.apache.org/jira/browse/KAFKA-13254 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > > Found this when debugging downgrade system test failures. The patch for > https://issues.apache.org/jira/browse/KAFKA-13091 introduced a deadlock. Here > are the jstack details: > {code} > "data-plane-kafka-request-handler-4": > > > waiting for ownable synchronizer 0xfcc00020, (a > java.util.concurrent.locks.ReentrantLock$NonfairSync), > > which is held by "data-plane-kafka-request-handler-5" > > > "data-plane-kafka-request-handler-5": > waiting for ownable synchronizer 0xc9161b20, (a > java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync), > which is held by "data-plane-kafka-request-handler-4" > "data-plane-kafka-request-handler-4": > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0xfcc00020> (a > java.util.concurrent.locks.ReentrantLock$NonfairSync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199) > at > java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209) > at > java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285) > at > kafka.server.DelayedOperation.safeTryComplete(DelayedOperation.scala:121) > at > kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:362) > at > 
kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:264) > at > kafka.cluster.DelayedOperations.checkAndCompleteAll(Partition.scala:59) > at > kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:907) > at > kafka.cluster.Partition.handleAlterIsrResponse(Partition.scala:1421) > at > kafka.cluster.Partition.$anonfun$sendAlterIsrRequest$1(Partition.scala:1340) > at > kafka.cluster.Partition.$anonfun$sendAlterIsrRequest$1$adapted(Partition.scala:1340) > at kafka.cluster.Partition$$Lambda$1496/2055478409.apply(Unknown > Source) > at kafka.server.ZkIsrManager.submit(ZkIsrManager.scala:74) > at kafka.cluster.Partition.sendAlterIsrRequest(Partition.scala:1345) > at kafka.cluster.Partition.expandIsr(Partition.scala:1312) > at > kafka.cluster.Partition.$anonfun$maybeExpandIsr$2(Partition.scala:755) > at kafka.cluster.Partition.maybeExpandIsr(Partition.scala:754) > at > kafka.cluster.Partition.updateFollowerFetchState(Partition.scala:672) > at > kafka.server.ReplicaManager.$anonfun$updateFollowerFetchState$1(ReplicaManager.scala:1806) > at kafka.server.ReplicaManager$$Lambda$1075/1996432270.apply(Unknown > Source) > at > scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:99) > at > scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:86) > at scala.collection.mutable.ArrayBuffer.map(ArrayBuffer.scala:42) > at > kafka.server.ReplicaManager.updateFollowerFetchState(ReplicaManager.scala:1790) > at > kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:1025) > at > kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:1029) > at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:970) > at kafka.server.KafkaApis.handle(KafkaApis.scala:173) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:75) > at java.lang.Thread.run(Thread.java:748) > "data-plane-kafka-request-handler-5": > at sun.misc.Unsafe.park(Native Method) > - parking to wait for 
<0xc9161b20> (a > java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSyn
[jira] [Commented] (KAFKA-13227) Cancel pending AlterIsr requests after receiving LeaderAndIsr
[ https://issues.apache.org/jira/browse/KAFKA-13227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17416349#comment-17416349 ] Jason Gustafson commented on KAFKA-13227: - [~christo_lolov] When we have updated the ISR state after receiving a LeaderAndIsr request, we have an opportunity to cancel any inflight AlterIsr requests since they would be doomed to fail. When we do this, we want to make sure that the callback cannot be mistakenly applied. We protect against this currently by checking in the callback whether the ISR state matches the pending state we expected when the AlterIsr request was enqueued. However, the check is a little weak since it does not include the update version, so it is conceivable that we could enqueue the same change a second time. This is probably not the only way to do it. Maybe we just need a flag in the callback to indicate that the request was cancelled. > Cancel pending AlterIsr requests after receiving LeaderAndIsr > - > > Key: KAFKA-13227 > URL: https://issues.apache.org/jira/browse/KAFKA-13227 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Assignee: Christo Lolov >Priority: Major > > Currently we do not cancel pending AlterIsr requests after the state has been > updated through a LeaderAndIsr request received from the controller. This > leads to log messages such as this > {code} > [2021-08-23 18:12:47,317] WARN [Partition __transaction_state-32 broker=3] > Failed to enqueue ISR change state LeaderAndIsr(leader=3, leaderEpoch=3, > isUncleanLeader=false, isr=List(3, 1), zkVersion=3) for partition > __transaction_state-32 (kafka.cluster.Partition) > {code} > I think the only complication here is protecting against the AlterIsr > callback which is executed asynchronously. To address this, we can move the > `zkVersion` field into `IsrState`. When the callback is invoked, we can check the > existing state against the response state to decide whether to apply the > change. 
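The stronger guard being discussed can be sketched as follows (hypothetical names, not the actual `Partition` code): by including the update version in the comparison, a response for a cancelled in-flight request is rejected even when the identical ISR change was later re-enqueued.

```java
// Sketch of an AlterIsr callback guard that compares both the pending ISR
// state and its update version before applying a response.
class IsrCallbackGuard {
    /** Apply the response only if both the pending ISR and its version match. */
    public static boolean shouldApply(String pendingIsr, int pendingVersion,
                                      String expectedIsr, int expectedVersion) {
        return pendingIsr.equals(expectedIsr) && pendingVersion == expectedVersion;
    }
}
```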
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-13162) ElectLeader API must be forwarded to Controller
[ https://issues.apache.org/jira/browse/KAFKA-13162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-13162. - Resolution: Fixed > ElectLeader API must be forwarded to Controller > --- > > Key: KAFKA-13162 > URL: https://issues.apache.org/jira/browse/KAFKA-13162 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Blocker > Fix For: 3.1.0 > > > We're missing the logic to forward ElectLeaders requests to the controller. > This means that `kafka-leader-election.sh` does not work correctly. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13162) ElectLeader API must be forwarded to Controller
[ https://issues.apache.org/jira/browse/KAFKA-13162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-13162: Fix Version/s: (was: 3.0.1) 3.1.0 > ElectLeader API must be forwarded to Controller > --- > > Key: KAFKA-13162 > URL: https://issues.apache.org/jira/browse/KAFKA-13162 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Blocker > Fix For: 3.1.0 > > > We're missing the logic to forward ElectLeaders requests to the controller. > This means that `kafka-leader-election.sh` does not work correctly. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-7408) Truncate to LSO on unclean leader election
[ https://issues.apache.org/jira/browse/KAFKA-7408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17415051#comment-17415051 ] Jason Gustafson commented on KAFKA-7408: [~jagsancio] The basic idea was to truncate to the LSO and then rewrite markers for any transactions which were in progress. For example, suppose we have this in the log:
{code}
x1 x2 y1 y2 z1 z2 xC yA
{code}
So the transaction beginning at z1 is where the LSO would be. The idea is then to truncate to that point and rewrite the markers that came after:
{code}
x1 x2 y1 y2 xC yA
{code}
But I realize now that this approach was probably unnecessarily aggressive. I wasn't taking into account transactions which started and ended after the LSO. For example:
{code}
x1 x2 y1 y2 z1 z2 xC yA x1 x2 xC
{code}
Now we have another x transaction which begins after the LSO. We'd lose this if we have to truncate. So I'm now thinking your 2) suggestion is probably a better approach. Rather than truncating, we can abort. This is simpler to implement since it does not involve any rewriting of markers. The example above becomes this:
{code}
x1 x2 y1 y2 z1 z2 xC yA x1 x2 xC zA
{code}
I had previously thought that it was a stronger guarantee if we could say that no transaction outcomes were changed. But I realize now that there is probably not a practical difference between dropping the data from a transaction and just aborting it. > Truncate to LSO on unclean leader election > -- > > Key: KAFKA-7408 > URL: https://issues.apache.org/jira/browse/KAFKA-7408 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Priority: Major > > If an unclean leader is elected, we may lose committed transaction data. That > alone is expected, but what is worse is that a transaction which was > previously completed (either committed or aborted) may lose its marker and > become dangling. The transaction coordinator will not know about the unclean > leader election, so will not know to resend the transaction markers. 
> Consumers with read_committed isolation will be stuck because the LSO cannot > advance. > To keep this scenario from occurring, it would be better to have the unclean > leader truncate to the LSO so that there are no dangling transactions. > Truncating to the LSO is not alone sufficient because the markers which > allowed the LSO advancement may be at higher offsets. What we can do is let > the newly elected leader truncate to the LSO and then rewrite all the markers > that followed it using its own leader epoch (to avoid divergence from > followers). > The interesting cases are those where an unclean leader election occurs while a > transaction is ongoing. > 1. If a producer is in the middle of a transaction commit, then the > coordinator may still attempt to write transaction markers. This will either > succeed or fail depending on the producer epoch in the unclean leader. If the > epoch matches, then the WriteTxnMarker call will succeed, which will simply > be ignored by the consumer. If the epoch doesn't match, the WriteTxnMarker > call will fail and the transaction coordinator can potentially remove the > partition from the transaction. > 2. If a producer is still writing the transaction, then what happens depends > on the producer state in the unclean leader. If no producer state has been > lost, then the transaction can continue without impact. Otherwise, the > producer will likely fail with an OUT_OF_ORDER_SEQUENCE error, which will > cause the transaction to be aborted by the coordinator. That takes us back to > the first case. > By truncating to the LSO, we ensure that transactions are either preserved in > whole or they are removed from the log in whole. For an unclean leader > election, that's probably as good as we can do. But we are ensured that > consumers will not be blocked by dangling transactions. The only remaining > situation where a dangling transaction might be left is if one of the > transaction state partitions has an unclean leader election. 
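The "abort instead of truncate" option from the comment above can be modeled on a toy log (using the same x1/xC/yA notation): after an unclean election, scan for producers whose last transaction lacks a closing marker and append an abort marker for each. This is only an illustrative model, not the broker's replica code.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Toy model of aborting dangling transactions. Entries are "<producer><seq>"
// data records (e.g. "x1") or "<producer>C"/"<producer>A" markers.
class DanglingTxnAborter {
    public static List<String> abortOpenTransactions(List<String> log) {
        Set<String> open = new LinkedHashSet<>();
        for (String entry : log) {
            String producer = entry.substring(0, 1);
            if (entry.endsWith("C") || entry.endsWith("A"))
                open.remove(producer);   // marker closes the transaction
            else
                open.add(producer);      // data entry opens/continues it
        }
        List<String> result = new ArrayList<>(log);
        for (String producer : open)
            result.add(producer + "A"); // abort marker, written with the new leader epoch
        return result;
    }
}
```

Running this on the final example from the comment (x1 x2 y1 y2 z1 z2 xC yA x1 x2 xC) leaves only the z transaction open and appends zA, matching the outcome described.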
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-13149) Null Pointer Exception for record==null when handling a produce request
[ https://issues.apache.org/jira/browse/KAFKA-13149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-13149. - Fix Version/s: 3.0.1 Resolution: Fixed > Null Pointer Exception for record==null when handling a produce request > --- > > Key: KAFKA-13149 > URL: https://issues.apache.org/jira/browse/KAFKA-13149 > Project: Kafka > Issue Type: Bug > Components: log >Reporter: Cong Ding >Priority: Major > Fix For: 3.0.1 > > > In production, we have seen an exception > {code:java} > java.lang.NullPointerException: Cannot invoke > "org.apache.kafka.common.record.Record.hasMagic(byte)" because "record" is > null{code} > which is triggered by > > [https://github.com/apache/kafka/blob/bfc57aa4ddcd719fc4a646c2ac09d4979c076455/core/src/main/scala/kafka/log/LogValidator.scala#L191] > when handling a produce request. > The reason is that > [https://github.com/apache/kafka/blob/bfc57aa4ddcd719fc4a646c2ac09d4979c076455/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java#L294-L296] > returns record==null, which is possibly caused by a bad client. However, we > have no idea about the client in our multi-tenant environment. > We should let the broker throw an invalid record exception and notify clients. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-13288) Transaction find-hanging command with --broker-id excludes internal topics
[ https://issues.apache.org/jira/browse/KAFKA-13288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-13288. - Resolution: Fixed > Transaction find-hanging command with --broker-id excludes internal topics > -- > > Key: KAFKA-13288 > URL: https://issues.apache.org/jira/browse/KAFKA-13288 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > > We use the vanilla `Admin.listTopics()` in this command if `--broker-id` is > specified. By default, this excludes internal topics. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-13288) Transaction find-hanging command with --broker-id excludes internal topics
[ https://issues.apache.org/jira/browse/KAFKA-13288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17413278#comment-17413278 ] Jason Gustafson commented on KAFKA-13288: - Note that it is still possible to use it with the --topic option instead. For example: {code} kafka-transactions.sh --bootstrap-server localhost:9092 find-hanging --topic __consumer_offsets {code} > Transaction find-hanging command with --broker-id excludes internal topics > -- > > Key: KAFKA-13288 > URL: https://issues.apache.org/jira/browse/KAFKA-13288 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > > We use the vanilla `Admin.listTopics()` in this command if `--broker-id` is > specified. By default, this excludes internal topics. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13288) Transaction find-hanging command with --broker-id excludes internal topics
Jason Gustafson created KAFKA-13288: --- Summary: Transaction find-hanging command with --broker-id excludes internal topics Key: KAFKA-13288 URL: https://issues.apache.org/jira/browse/KAFKA-13288 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson Assignee: Jason Gustafson We use the vanilla `Admin.listTopics()` in this command if `--broker-id` is specified. By default, this excludes internal topics. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-13234) Transaction system tests should check URPs between broker bounces
[ https://issues.apache.org/jira/browse/KAFKA-13234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-13234. - Resolution: Fixed > Transaction system tests should check URPs between broker bounces > - > > Key: KAFKA-13234 > URL: https://issues.apache.org/jira/browse/KAFKA-13234 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > > We've seen a number of failing transaction system tests. After investigating > a few of these, I found that this came down to transaction timeouts which > were due to partition unavailability. The test rolls the brokers in the > cluster, but it does not verify that URPs have cleared after restarting each > broker. This means that it is possible for partitions to go under min isr or > even offline. In the case of a hard bounce, the time to recover may be quite > long because of the default session timeout of 18s. It would be good to wait for > URPs to clear instead. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13091) Increment HW after shrinking ISR through AlterIsr
[ https://issues.apache.org/jira/browse/KAFKA-13091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-13091: Fix Version/s: (was: 3.0.1) > Increment HW after shrinking ISR through AlterIsr > - > > Key: KAFKA-13091 > URL: https://issues.apache.org/jira/browse/KAFKA-13091 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > > After we have shrunk the ISR, we have an opportunity to advance the high > watermark. We do this currently in `maybeShrinkIsr` after the synchronous > update through ZK. For the AlterIsr path, however, we cannot rely on this > call since the request is sent asynchronously. Instead we should attempt to > advance the high watermark in the callback when the AlterIsr response returns > successfully. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13254) Deadlock when expanding ISR
[ https://issues.apache.org/jira/browse/KAFKA-13254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-13254: Issue Type: Bug (was: Improvement) > Deadlock when expanding ISR > --- > > Key: KAFKA-13254 > URL: https://issues.apache.org/jira/browse/KAFKA-13254 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > > Found this when debugging downgrade system test failures. The patch for > https://issues.apache.org/jira/browse/KAFKA-13091 introduced a deadlock. Here > are the jstack details: > {code} > "data-plane-kafka-request-handler-4": > > > waiting for ownable synchronizer 0xfcc00020, (a > java.util.concurrent.locks.ReentrantLock$NonfairSync), > > which is held by "data-plane-kafka-request-handler-5" > > > "data-plane-kafka-request-handler-5": > waiting for ownable synchronizer 0xc9161b20, (a > java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync), > which is held by "data-plane-kafka-request-handler-4" > "data-plane-kafka-request-handler-4": > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0xfcc00020> (a > java.util.concurrent.locks.ReentrantLock$NonfairSync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199) > at > java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209) > at > java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285) > at > kafka.server.DelayedOperation.safeTryComplete(DelayedOperation.scala:121) > at > kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:362) > at > 
kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:264) > at > kafka.cluster.DelayedOperations.checkAndCompleteAll(Partition.scala:59) > at > kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:907) > at > kafka.cluster.Partition.handleAlterIsrResponse(Partition.scala:1421) > at > kafka.cluster.Partition.$anonfun$sendAlterIsrRequest$1(Partition.scala:1340) > at > kafka.cluster.Partition.$anonfun$sendAlterIsrRequest$1$adapted(Partition.scala:1340) > at kafka.cluster.Partition$$Lambda$1496/2055478409.apply(Unknown > Source) > at kafka.server.ZkIsrManager.submit(ZkIsrManager.scala:74) > at kafka.cluster.Partition.sendAlterIsrRequest(Partition.scala:1345) > at kafka.cluster.Partition.expandIsr(Partition.scala:1312) > at > kafka.cluster.Partition.$anonfun$maybeExpandIsr$2(Partition.scala:755) > at kafka.cluster.Partition.maybeExpandIsr(Partition.scala:754) > at > kafka.cluster.Partition.updateFollowerFetchState(Partition.scala:672) > at > kafka.server.ReplicaManager.$anonfun$updateFollowerFetchState$1(ReplicaManager.scala:1806) > at kafka.server.ReplicaManager$$Lambda$1075/1996432270.apply(Unknown > Source) > at > scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:99) > at > scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:86) > at scala.collection.mutable.ArrayBuffer.map(ArrayBuffer.scala:42) > at > kafka.server.ReplicaManager.updateFollowerFetchState(ReplicaManager.scala:1790) > at > kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:1025) > at > kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:1029) > at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:970) > at kafka.server.KafkaApis.handle(KafkaApis.scala:173) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:75) > at java.lang.Thread.run(Thread.java:748) > "data-plane-kafka-request-handler-5": > at sun.misc.Unsafe.park(Native Method) > - parking to wait for 
<0xc9161b20> (a > java.util.concurrent.locks.ReentrantReadWrit
[jira] [Created] (KAFKA-13254) Deadlock when expanding ISR
Jason Gustafson created KAFKA-13254: --- Summary: Deadlock when expanding ISR Key: KAFKA-13254 URL: https://issues.apache.org/jira/browse/KAFKA-13254 Project: Kafka Issue Type: Improvement Reporter: Jason Gustafson Assignee: Jason Gustafson Found this when debugging downgrade system test failures. The patch for https://issues.apache.org/jira/browse/KAFKA-13091 introduced a deadlock. Here are the jstack details:
{code}
"data-plane-kafka-request-handler-4":
  waiting for ownable synchronizer 0xfcc00020, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
  which is held by "data-plane-kafka-request-handler-5"
"data-plane-kafka-request-handler-5":
  waiting for ownable synchronizer 0xc9161b20, (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync),
  which is held by "data-plane-kafka-request-handler-4"
"data-plane-kafka-request-handler-4":
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for <0xfcc00020> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
	at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
	at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
	at kafka.server.DelayedOperation.safeTryComplete(DelayedOperation.scala:121)
	at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:362)
	at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:264)
	at kafka.cluster.DelayedOperations.checkAndCompleteAll(Partition.scala:59)
	at kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:907)
	at kafka.cluster.Partition.handleAlterIsrResponse(Partition.scala:1421)
	at kafka.cluster.Partition.$anonfun$sendAlterIsrRequest$1(Partition.scala:1340)
	at kafka.cluster.Partition.$anonfun$sendAlterIsrRequest$1$adapted(Partition.scala:1340)
	at kafka.cluster.Partition$$Lambda$1496/2055478409.apply(Unknown Source)
	at kafka.server.ZkIsrManager.submit(ZkIsrManager.scala:74)
	at kafka.cluster.Partition.sendAlterIsrRequest(Partition.scala:1345)
	at kafka.cluster.Partition.expandIsr(Partition.scala:1312)
	at kafka.cluster.Partition.$anonfun$maybeExpandIsr$2(Partition.scala:755)
	at kafka.cluster.Partition.maybeExpandIsr(Partition.scala:754)
	at kafka.cluster.Partition.updateFollowerFetchState(Partition.scala:672)
	at kafka.server.ReplicaManager.$anonfun$updateFollowerFetchState$1(ReplicaManager.scala:1806)
	at kafka.server.ReplicaManager$$Lambda$1075/1996432270.apply(Unknown Source)
	at scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:99)
	at scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:86)
	at scala.collection.mutable.ArrayBuffer.map(ArrayBuffer.scala:42)
	at kafka.server.ReplicaManager.updateFollowerFetchState(ReplicaManager.scala:1790)
	at kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:1025)
	at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:1029)
	at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:970)
	at kafka.server.KafkaApis.handle(KafkaApis.scala:173)
	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:75)
	at java.lang.Thread.run(Thread.java:748)
"data-plane-kafka-request-handler-5":
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for <0xc9161b20> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:967)
	at java.util.concurren
[jira] [Commented] (KAFKA-13227) Cancel pending AlterIsr requests after receiving LeaderAndIsr
[ https://issues.apache.org/jira/browse/KAFKA-13227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17405420#comment-17405420 ] Jason Gustafson commented on KAFKA-13227: - [~christo_lolov] Thanks, feel free to pick it up. I would suggest looking at the test cases in `PartitionTest`. For example, `testRetryShrinkIsr` looks like it already has most of the machinery you will need. > Cancel pending AlterIsr requests after receiving LeaderAndIsr > - > > Key: KAFKA-13227 > URL: https://issues.apache.org/jira/browse/KAFKA-13227 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Priority: Major > > Currently we do not cancel pending AlterIsr requests after the state has been > updated through a LeaderAndIsr request received from the controller. This > leads to log messages such as this > {code} > [2021-08-23 18:12:47,317] WARN [Partition __transaction_state-32 broker=3] > Failed to enqueue ISR change state LeaderAndIsr(leader=3, leaderEpoch=3, > isUncleanLeader=false, isr=List(3, 1), zkVersion=3) for partition > __transaction_state-32 (kafka.cluster.Partition) > {code} > I think the only complication here is protecting against the AlterIsr > callback which is executed asynchronously. To address this, we can move the > `zkVersion` field into `IsrState`. When the callback is invoked, we can check the > existing state against the response state to decide whether to apply the > change. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13235) Add support for static groups to transaction system tests
Jason Gustafson created KAFKA-13235: --- Summary: Add support for static groups to transaction system tests Key: KAFKA-13235 URL: https://issues.apache.org/jira/browse/KAFKA-13235 Project: Kafka Issue Type: Improvement Reporter: Jason Gustafson Currently `TransactionalMessageCopier` does not support static groups configured using `group.instance.id`. We should consider adding support and parameterizing the corresponding system tests which rely on it. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13234) Transaction system tests should check URPs between broker bounces
Jason Gustafson created KAFKA-13234: --- Summary: Transaction system tests should check URPs between broker bounces Key: KAFKA-13234 URL: https://issues.apache.org/jira/browse/KAFKA-13234 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson Assignee: Jason Gustafson We've seen a number of failing transaction system tests. After investigating a few of these, I found that this came down to transaction timeouts which were due to partition unavailability. The test rolls the brokers in the cluster, but it does not verify that URPs have cleared after restarting each broker. This means that it is possible for partitions to go under min isr or even offline. In the case of a hard bounce, the time to recover may be quite long because of the default session timeout of 18s. It would be good to wait for URPs to clear instead. -- This message was sent by Atlassian Jira (v8.3.4#803005)
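The proposed check reduces to a simple predicate over partition metadata (hypothetical helper; the actual system tests are written in Python with ducktape): a partition is under-replicated while its ISR is smaller than its replica set, and the test should poll until no such partition remains before bouncing the next broker.

```java
import java.util.List;
import java.util.Map;

// Sketch of the URP predicate the system test would poll on between bounces.
class UrpCheck {
    /** True when every partition's ISR has caught up to its full replica set. */
    public static boolean noUnderReplicatedPartitions(
            Map<String, List<Integer>> replicas, Map<String, List<Integer>> isr) {
        for (Map.Entry<String, List<Integer>> e : replicas.entrySet()) {
            List<Integer> inSync = isr.get(e.getKey());
            if (inSync == null || inSync.size() < e.getValue().size())
                return false;  // a replica is still catching up (or offline)
        }
        return true;
    }
}
```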
[jira] [Created] (KAFKA-13227) Cancel pending AlterIsr requests after receiving LeaderAndIsr
Jason Gustafson created KAFKA-13227: --- Summary: Cancel pending AlterIsr requests after receiving LeaderAndIsr Key: KAFKA-13227 URL: https://issues.apache.org/jira/browse/KAFKA-13227 Project: Kafka Issue Type: Improvement Reporter: Jason Gustafson Currently we do not cancel pending AlterIsr requests after the state has been updated through a LeaderAndIsr request received from the controller. This leads to log messages such as this {code} [2021-08-23 18:12:47,317] WARN [Partition __transaction_state-32 broker=3] Failed to enqueue ISR change state LeaderAndIsr(leader=3, leaderEpoch=3, isUncleanLeader=false, isr=List(3, 1), zkVersion=3) for partition __transaction_state-32 (kafka.cluster.Partition) {code} I think the only complication here is protecting against the AlterIsr callback which is executed asynchronously. To address this, we can move the `zkVersion` field into `IsrState`. When the callback is invoked, we can check the existing state against the response state to decide whether to apply the change. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-13214) Consumer should not reset group state after disconnect
[ https://issues.apache.org/jira/browse/KAFKA-13214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-13214. - Fix Version/s: 2.8.1 2.7.2 3.0.0 Resolution: Fixed > Consumer should not reset group state after disconnect > -- > > Key: KAFKA-13214 > URL: https://issues.apache.org/jira/browse/KAFKA-13214 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.7.0, 2.8.0 >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Critical > Fix For: 3.0.0, 2.7.2, 2.8.1 > > > When the consumer disconnects from the coordinator while a rebalance is in > progress, we currently reset the memberId and generation. The coordinator > then must await the session timeout in order to expire the old memberId. This > was apparently a regression from > https://github.com/apache/kafka/commit/7e7bb184d2abe34280a7f0eb0f0d9fc0e32389f2#diff-15efe9b844f78b686393b6c2e2ad61306c3473225742caed05c7edab9a138832R478. > It would be better to keep the memberId/generation. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-13091) Increment HW after shrinking ISR through AlterIsr
[ https://issues.apache.org/jira/browse/KAFKA-13091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-13091. - Fix Version/s: 3.0.1 Resolution: Fixed > Increment HW after shrinking ISR through AlterIsr > - > > Key: KAFKA-13091 > URL: https://issues.apache.org/jira/browse/KAFKA-13091 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > Fix For: 3.0.1 > > > After we have shrunk the ISR, we have an opportunity to advance the high > watermark. We do this currently in `maybeShrinkIsr` after the synchronous > update through ZK. For the AlterIsr path, however, we cannot rely on this > call since the request is sent asynchronously. Instead we should attempt to > advance the high watermark in the callback when the AlterIsr response returns > successfully. -- This message was sent by Atlassian Jira (v8.3.4#803005)
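To see why the shrink can advance the high watermark: the HW is the minimum log end offset across the in-sync replicas, so removing a lagging replica from the ISR raises that minimum. A simplified illustration (not Kafka's `Partition` code; the helper is hypothetical):

```java
import java.util.Map;
import java.util.Set;

// The high watermark is the minimum log end offset over the current ISR,
// so shrinking the ISR can allow it to advance. Illustrative helper only.
public class HighWatermark {
    public static long compute(Map<Integer, Long> logEndOffsets, Set<Integer> isr) {
        return isr.stream()
                  .mapToLong(logEndOffsets::get)
                  .min()
                  .orElse(0L);
    }
}
```

In the AlterIsr path this recomputation belongs in the response callback, since that is the first point at which the shrunken ISR is known to be committed.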
[jira] [Resolved] (KAFKA-9938) Fix debug consumer read from follower for older protocol versions
[ https://issues.apache.org/jira/browse/KAFKA-9938?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-9938. Resolution: Won't Fix I am going to close this issue as "won't fix." Since the new `Fetch` protocol versions already allow fetching from followers, there seems little reason to add this logic just for older versions. The `ReplicaVerificationTool` will still work, for example. Effectively, the behavior of the debug consumer is the same as the normal consumer. > Fix debug consumer read from follower for older protocol versions > - > > Key: KAFKA-9938 > URL: https://issues.apache.org/jira/browse/KAFKA-9938 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.5.0, 2.4.1 >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > > In fetch requests, a sentinel of -2 for the replicaId is treated as a "debug > consumer" and is allowed to fetch from followers. In KIP-392, we added the > general ability to read from followers, but we require a newer version of the > protocol. In the process of this change, we lost the ability for older > versions of the fetch protocol to use the "debug consumer" to read from > followers. As far as I know the only place this capability is used is in the > ReplicaVerificationTool. We don't expose this capability from the consumer. > However, it is still technically a regression and should be fixed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-9886) Validate segment range before reading in `Log.read`
[ https://issues.apache.org/jira/browse/KAFKA-9886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson reassigned KAFKA-9886: -- Assignee: (was: Jason Gustafson) > Validate segment range before reading in `Log.read` > --- > > Key: KAFKA-9886 > URL: https://issues.apache.org/jira/browse/KAFKA-9886 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Priority: Major > > Log.read uses the following logic to set the upper limit on a segment read. > {code} > val maxPosition = { >// Use the max offset position if it is on this segment; otherwise, the > segment size is the limit. > if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) { > maxOffsetMetadata.relativePositionInSegment > } else { > segment.size > } > } > {code} > In the else branch, the expectation is that > `maxOffsetMetadata.segmentBaseOffset > segment.baseOffset`. In KAFKA-9838, we > found a bug where this assumption failed which led to reads above the high > watermark. We should validate the expectation explicitly so that we don't > leave the door open for similar bugs in the future. -- This message was sent by Atlassian Jira (v8.3.4#803005)
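The explicit validation suggested above could look like the following. This is a standalone sketch of the `maxPosition` logic from the quoted Scala, with the implicit assumption turned into a fail-fast check; names and the exception choice are illustrative:

```java
// Make the implicit assumption explicit: if the max-offset metadata sits on an
// *earlier* segment than the one being read, fail fast instead of silently
// reading up to the segment size (which allowed reads past the HW in KAFKA-9838).
public class SegmentRange {
    public static int maxPosition(long maxOffsetSegmentBase, int relativePosition,
                                  long segmentBase, int segmentSize) {
        if (maxOffsetSegmentBase == segmentBase)
            return relativePosition;           // limit lies on this segment
        if (maxOffsetSegmentBase < segmentBase)
            throw new IllegalStateException(
                "Max offset metadata is on an earlier segment than the one being read");
        return segmentSize;                    // limit lies beyond this segment
    }
}
```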
[jira] [Created] (KAFKA-13221) Add metric for `PartitionsWithLateTransactionsCount`
Jason Gustafson created KAFKA-13221: --- Summary: Add metric for `PartitionsWithLateTransactionsCount` Key: KAFKA-13221 URL: https://issues.apache.org/jira/browse/KAFKA-13221 Project: Kafka Issue Type: Sub-task Reporter: Jason Gustafson Assignee: Jason Gustafson The metric `PartitionsWithLateTransactionsCount` was introduced in KIP-664: https://cwiki.apache.org/confluence/display/KAFKA/KIP-664%3A+Provide+tooling+to+detect+and+abort+hanging+transactions#KIP664:Providetoolingtodetectandaborthangingtransactions-Metrics. This metric will record the number of partitions which have open transactions with durations exceeding `transaction.max.timeout.ms`. -- This message was sent by Atlassian Jira (v8.3.4#803005)
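The quantity the gauge measures can be sketched as below. This is an assumption-laden illustration, not the KIP-664 implementation: `oldestOpenTxnStartMs` is an invented stand-in for per-partition transaction state.

```java
import java.util.Map;

// Count partitions whose oldest open transaction has been running longer
// than the max transaction timeout. Names and data shape are hypothetical.
public class LateTransactions {
    public static long countLate(Map<String, Long> oldestOpenTxnStartMs,
                                 long nowMs, long maxTimeoutMs) {
        return oldestOpenTxnStartMs.values().stream()
                .filter(startMs -> nowMs - startMs > maxTimeoutMs)
                .count();
    }
}
```

A nonzero value is the signal that some transaction is likely hanging and may need the KIP-664 abort tooling.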
[jira] [Assigned] (KAFKA-13214) Consumer should not reset group state after disconnect
[ https://issues.apache.org/jira/browse/KAFKA-13214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson reassigned KAFKA-13214: --- Assignee: Jason Gustafson > Consumer should not reset group state after disconnect > -- > > Key: KAFKA-13214 > URL: https://issues.apache.org/jira/browse/KAFKA-13214 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.7.0, 2.8.0 >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Critical > > When the consumer disconnects from the coordinator while a rebalance is in > progress, we currently reset the memberId and generation. The coordinator > then must await the session timeout in order to expire the old memberId. This > was apparently a regression from > https://github.com/apache/kafka/commit/7e7bb184d2abe34280a7f0eb0f0d9fc0e32389f2#diff-15efe9b844f78b686393b6c2e2ad61306c3473225742caed05c7edab9a138832R478. > It would be better to keep the memberId/generation. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13214) Consumer should not reset group state after disconnect
Jason Gustafson created KAFKA-13214: --- Summary: Consumer should not reset group state after disconnect Key: KAFKA-13214 URL: https://issues.apache.org/jira/browse/KAFKA-13214 Project: Kafka Issue Type: Bug Affects Versions: 2.8.0, 2.7.0 Reporter: Jason Gustafson When the consumer disconnects from the coordinator while a rebalance is in progress, we currently reset the memberId and generation. The coordinator then must await the session timeout in order to expire the old memberId. This was apparently a regression from https://github.com/apache/kafka/commit/7e7bb184d2abe34280a7f0eb0f0d9fc0e32389f2#diff-15efe9b844f78b686393b6c2e2ad61306c3473225742caed05c7edab9a138832R478. It would be better to keep the memberId/generation. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-13162) ElectLeader API must be forwarded to Controller
[ https://issues.apache.org/jira/browse/KAFKA-13162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17398209#comment-17398209 ] Jason Gustafson commented on KAFKA-13162: - This has turned out to be more work than expected. I am going to change the target version to 3.0.1. cc [~kkonstantine] > ElectLeader API must be forwarded to Controller > --- > > Key: KAFKA-13162 > URL: https://issues.apache.org/jira/browse/KAFKA-13162 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Blocker > Fix For: 3.0.0 > > > We're missing the logic to forward ElectLeaders requests to the controller. > This means that `kafka-leader-election.sh` does not work correctly. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13162) ElectLeader API must be forwarded to Controller
[ https://issues.apache.org/jira/browse/KAFKA-13162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-13162: Fix Version/s: (was: 3.0.0) 3.0.1 > ElectLeader API must be forwarded to Controller > --- > > Key: KAFKA-13162 > URL: https://issues.apache.org/jira/browse/KAFKA-13162 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Blocker > Fix For: 3.0.1 > > > We're missing the logic to forward ElectLeaders requests to the controller. > This means that `kafka-leader-election.sh` does not work correctly. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-13161) Follower leader and ISR state not updated after partition change in KRaft
[ https://issues.apache.org/jira/browse/KAFKA-13161?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-13161. - Resolution: Fixed > Follower leader and ISR state not updated after partition change in KRaft > - > > Key: KAFKA-13161 > URL: https://issues.apache.org/jira/browse/KAFKA-13161 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jose Armando Garcia Sancio >Priority: Blocker > Fix For: 3.0.0 > > > In KRaft when we detect a partition change, we first verify whether any > leader or follower transitions are needed. Depending on the case, we call > either `applyLocalLeadersDelta` or `applyLocalFollowersDelta`. In the latter > case, we are missing a call to `Partition.makeFollower` which is responsible > for updating LeaderAndIsr state for the partitions. As a result of this, the > partition state may be left stale. > The specific consequences of this bug are 1) follower fetching fails since > the epoch is never updated, and 2) a stale leader may continue to accept > Produce requests. The latter is the bigger issue since it can lead to log > divergence if we are appending from both the client and from the fetcher > thread at the same time. I tested this locally and confirmed that it is > possible. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-13155) ConcurrentModificationException in TransactionalMessageCopier
[ https://issues.apache.org/jira/browse/KAFKA-13155?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-13155. - Resolution: Fixed > ConcurrentModificationException in TransactionalMessageCopier > - > > Key: KAFKA-13155 > URL: https://issues.apache.org/jira/browse/KAFKA-13155 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > > Caught this exception in a system test run: > {code} > [2021-08-02 07:51:54,528] ERROR Uncaught exception in thread > 'transactional-message-copier-shutdown-hook': > (org.apache.kafka.common.utils.KafkaThread) > java.util.ConcurrentModificationException: KafkaConsumer is not safe for > multi-threaded access > at > org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2447) > at > org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2330) > at > org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2308) > at > org.apache.kafka.tools.TransactionalMessageCopier.lambda$main$1(TransactionalMessageCopier.java:331) > at java.lang.Thread.run(Thread.java:748) > {code} > The pattern for closing the consumer is not safe. -- This message was sent by Atlassian Jira (v8.3.4#803005)
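The safe pattern is for the shutdown hook to only *signal* the polling thread (`KafkaConsumer.wakeup()` is the client's one thread-safe method) and let that thread close the consumer itself. The sketch below simulates the pattern with plain flags standing in for the Kafka client, so it stays self-contained:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Simulates the safe shutdown pattern: the hook only signals (as wakeup() would);
// the polling thread itself performs close(). Flags stand in for the Kafka client.
public class SafeShutdown {
    public final AtomicBoolean wakeup = new AtomicBoolean(false);
    public volatile boolean closed = false;

    public void pollLoop() {             // runs on the consumer's own thread
        while (!wakeup.get()) {
            // poll(...) and process records here; a real loop would catch
            // WakeupException thrown out of poll()
        }
        closed = true;                   // close() from the owning thread only
    }

    public void shutdownHook() {         // runs on the JVM shutdown-hook thread
        wakeup.set(true);                // analogous to consumer.wakeup()
    }
}
```

Calling `close()` directly from the hook, as the stack trace shows, makes two threads touch the consumer at once, which is exactly what `acquire()` guards against.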
[jira] [Resolved] (KAFKA-13132) Upgrading to topic IDs in LISR requests has gaps introduced in 3.0
[ https://issues.apache.org/jira/browse/KAFKA-13132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-13132. - Resolution: Fixed > Upgrading to topic IDs in LISR requests has gaps introduced in 3.0 > -- > > Key: KAFKA-13132 > URL: https://issues.apache.org/jira/browse/KAFKA-13132 > Project: Kafka > Issue Type: Bug >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Blocker > Fix For: 3.0.0 > > > With the change in 3.0 to how topic IDs are assigned to logs, a bug was > inadvertently introduced. Now, topic IDs will only be assigned on the load of > the log to a partition in LISR requests. This means we will only assign topic > IDs for newly created topics/partitions, on broker startup, or potentially > when a partition is reassigned. > > In the case of upgrading from an IBP before 2.8, we may have a scenario where > we upgrade the controller to IBP 3.0 (or even 2.8) last. (i.e., the controller > is IBP < 2.8 and all other brokers are on the newest IBP) Upon the last > broker upgrading, we will elect a new controller but its LISR request will > not result in topic IDs being assigned to logs of existing topics. They will > only be assigned in the cases mentioned above. > *Keep in mind, in this scenario, topic IDs will still be assigned in the > controller/ZK to all new and pre-existing topics and will show up in > metadata.* This means we are not ensured the same guarantees we had in 2.8. > *It is just the LISR/partition.metadata part of the code that is affected.* > > The problem is two-fold: > 1. We ignore LISR requests when the partition leader epoch has not increased > (previously we assigned the ID before this check) > 2. We only assign the topic ID when we are associating the log with the > partition in replicamanager for the first time. Though in the scenario > described above, we have logs associated with partitions that need to be > upgraded. 
> > We should check if the LISR request results in a topic ID addition, and add > logic to assign topic IDs to logs already associated with partitions in the replica > manager. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-13173) KRaft controller does not handle simultaneous broker expirations correctly
[ https://issues.apache.org/jira/browse/KAFKA-13173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17394319#comment-17394319 ] Jason Gustafson commented on KAFKA-13173: - I upgraded this bug to a blocker because I think it can result in data loss. For example, in the example above, the second ISR change would be interpreted as an expansion, but there may have been committed writes to the log between the two ISR changes which were not reflected in the expansion. > KRaft controller does not handle simultaneous broker expirations correctly > -- > > Key: KAFKA-13173 > URL: https://issues.apache.org/jira/browse/KAFKA-13173 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Niket Goel >Priority: Blocker > Fix For: 3.0.0 > > > In `ReplicationControlManager.fenceStaleBrokers`, we find all of the current > stale replicas and attempt to remove them from the ISR. However, when > multiple expirations occur at once, we do not properly accumulate the ISR > changes. For example, I ran a test where the ISR of a partition was > initialized to [1, 2, 3]. Then I triggered a timeout of replicas 2 and 3 at > the same time. The records that were generated by `fenceStaleBrokers` were > the following: > {code} > ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, > topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 3], leader=1, replicas=null, > removingReplicas=null, addingReplicas=null) at version 0), > ApiMessageAndVersion(FenceBrokerRecord(id=2, epoch=102) at version 0), > ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, > topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 2], leader=1, replicas=null, > removingReplicas=null, addingReplicas=null) at version 0), > ApiMessageAndVersion(FenceBrokerRecord(id=3, epoch=103) at version 0)] > {code} > First the ISR is shrunk to [1, 3] as broker 2 is fenced. We also see the > record to fence broker 2. Then the ISR is modified to [1, 2] as the fencing > of broker 3 is handled. 
So we did not account for the fact that we had > already fenced broker 2 in the request. > A simple solution for now is to change the logic to handle fencing only one > broker at a time. -- This message was sent by Atlassian Jira (v8.3.4#803005)
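The essence of the fix, whether done one broker at a time or by accumulating, is that each ISR shrink must start from the already-shrunk state rather than the original one. A standalone illustration (not the controller's actual code):

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// When fencing several brokers in one pass, each ISR shrink must see the
// removals made for previously fenced brokers. Names are illustrative.
public class FenceBrokers {
    public static Set<Integer> shrinkIsr(Set<Integer> initialIsr, List<Integer> staleBrokers) {
        Set<Integer> isr = new LinkedHashSet<>(initialIsr);
        for (int broker : staleBrokers)
            isr.remove(broker);   // each removal builds on the prior ones
        return isr;
    }
}
```

With the bug, each `PartitionChangeRecord` was computed from the original ISR [1, 2, 3], yielding [1, 3] and then [1, 2]; accumulating yields the correct final ISR [1].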
[jira] [Updated] (KAFKA-13173) KRaft controller does not handle simultaneous broker expirations correctly
[ https://issues.apache.org/jira/browse/KAFKA-13173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-13173: Description: In `ReplicationControlManager.fenceStaleBrokers`, we find all of the current stale replicas and attempt to remove them from the ISR. However, when multiple expirations occur at once, we do not properly accumulate the ISR changes. For example, I ran a test where the ISR of a partition was initialized to [1, 2, 3]. Then I triggered a timeout of replicas 2 and 3 at the same time. The records that were generated by `fenceStaleBrokers` were the following: {code} ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 3], leader=1, replicas=null, removingReplicas=null, addingReplicas=null) at version 0), ApiMessageAndVersion(FenceBrokerRecord(id=2, epoch=102) at version 0), ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 2], leader=1, replicas=null, removingReplicas=null, addingReplicas=null) at version 0), ApiMessageAndVersion(FenceBrokerRecord(id=3, epoch=103) at version 0)] {code} First the ISR is shrunk to [1, 3] as broker 2 is fenced. We also see the record to fence broker 2. Then the ISR is modified to [1, 2] as the fencing of broker 3 is handled. So we did not account for the fact that we had already fenced broker 2 in the request. A simple solution for now is to change the logic to handle fencing only one broker at a time. was: In `ReplicationControlManager.fenceStaleBrokers`, we find all of the current stale replicas and attempt to remove them from the ISR. However, when multiple expirations occur at once, we do not properly accumulate the ISR changes. For example, I ran a test where the ISR of a partition was initialized to [1, 2, 3]. Then I triggered a timeout of replicas 2 and 3 at the same time. 
The records that were generated by `fenceStaleBrokers` were the following: {code} ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 3], leader=1, replicas=null, removingReplicas=null, addingReplicas=null) at version 0), ApiMessageAndVersion(FenceBrokerRecord(id=2, epoch=102) at version 0), ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 2], leader=1, replicas=null, removingReplicas=null, addingReplicas=null) at version 0), ApiMessageAndVersion(FenceBrokerRecord(id=3, epoch=103) at version 0)] {code} First the ISR is shrunk to [1, 3] as broker 2 is fenced. We also see the record to fence broker 2. Then the ISR is modified to [1, 2] as the fencing of broker 3 is handled. So we did not account for the fact that we had already fenced broker 2 in the request. A simple solution for now is to change the logic to handle fencing only one broker at a time. > KRaft controller does not handle simultaneous broker expirations correctly > -- > > Key: KAFKA-13173 > URL: https://issues.apache.org/jira/browse/KAFKA-13173 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Niket Goel >Priority: Blocker > Fix For: 3.0.0 > > > In `ReplicationControlManager.fenceStaleBrokers`, we find all of the current > stale replicas and attempt to remove them from the ISR. However, when > multiple expirations occur at once, we do not properly accumulate the ISR > changes. For example, I ran a test where the ISR of a partition was > initialized to [1, 2, 3]. Then I triggered a timeout of replicas 2 and 3 at > the same time. 
The records that were generated by `fenceStaleBrokers` were > the following: > {code} > ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, > topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 3], leader=1, replicas=null, > removingReplicas=null, addingReplicas=null) at version 0), > ApiMessageAndVersion(FenceBrokerRecord(id=2, epoch=102) at version 0), > ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, > topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 2], leader=1, replicas=null, > removingReplicas=null, addingReplicas=null) at version 0), > ApiMessageAndVersion(FenceBrokerRecord(id=3, epoch=103) at version 0)] > {code} > First the ISR is shrunk to [1, 3] as broker 2 is fenced. We also see the > record to fence broker 2. Then the ISR is modified to [1, 2] as the fencing > of broker 3 is handled. So we did not account for the fact that we had > already fenced broker 2 in the request. > A simple solution for now is to change the logic to handle fencing only one > broker at a time. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13173) KRaft controller does not handle simultaneous broker expirations correctly
[ https://issues.apache.org/jira/browse/KAFKA-13173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-13173: Fix Version/s: 3.0.0 > KRaft controller does not handle simultaneous broker expirations correctly > -- > > Key: KAFKA-13173 > URL: https://issues.apache.org/jira/browse/KAFKA-13173 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Priority: Blocker > Fix For: 3.0.0 > > > In `ReplicationControlManager.fenceStaleBrokers`, we find all of the current > stale replicas and attempt to remove them from the ISR. However, when > multiple expirations occur at once, we do not properly accumulate the ISR > changes. For example, I ran a test where the ISR of a partition was > initialized to [1, 2, 3]. Then I triggered a timeout of replicas 2 and 3 at > the same time. The records that were generated by `fenceStaleBrokers` were > the following: > {code} > ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, > topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 3], leader=1, replicas=null, > removingReplicas=null, addingReplicas=null) at version 0), > ApiMessageAndVersion(FenceBrokerRecord(id=2, epoch=102) at version 0), > ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, > topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 2], leader=1, replicas=null, > removingReplicas=null, addingReplicas=null) at version 0), > ApiMessageAndVersion(FenceBrokerRecord(id=3, epoch=103) at version 0)] > {code} > First the ISR is shrunk to [1, 3] as broker 2 is fenced. We also see the > record to fence broker 2. Then the ISR is modified to [1, 2] as the fencing > of broker 3 is handled. So we did not account for the fact that we had > already fenced broker 2 in the request. > A simple solution for now is to change the logic to handle fencing only one > broker at a time. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13173) KRaft controller does not handle simultaneous broker expirations correctly
[ https://issues.apache.org/jira/browse/KAFKA-13173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-13173: Priority: Blocker (was: Major) > KRaft controller does not handle simultaneous broker expirations correctly > -- > > Key: KAFKA-13173 > URL: https://issues.apache.org/jira/browse/KAFKA-13173 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Priority: Blocker > > In `ReplicationControlManager.fenceStaleBrokers`, we find all of the current > stale replicas and attempt to remove them from the ISR. However, when > multiple expirations occur at once, we do not properly accumulate the ISR > changes. For example, I ran a test where the ISR of a partition was > initialized to [1, 2, 3]. Then I triggered a timeout of replicas 2 and 3 at > the same time. The records that were generated by `fenceStaleBrokers` were > the following: > {code} > ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, > topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 3], leader=1, replicas=null, > removingReplicas=null, addingReplicas=null) at version 0), > ApiMessageAndVersion(FenceBrokerRecord(id=2, epoch=102) at version 0), > ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, > topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 2], leader=1, replicas=null, > removingReplicas=null, addingReplicas=null) at version 0), > ApiMessageAndVersion(FenceBrokerRecord(id=3, epoch=103) at version 0)] > {code} > First the ISR is shrunk to [1, 3] as broker 2 is fenced. We also see the > record to fence broker 2. Then the ISR is modified to [1, 2] as the fencing > of broker 3 is handled. So we did not account for the fact that we had > already fenced broker 2 in the request. > A simple solution for now is to change the logic to handle fencing only one > broker at a time. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13173) KRaft controller does not handle simultaneous broker expirations correctly
Jason Gustafson created KAFKA-13173: --- Summary: KRaft controller does not handle simultaneous broker expirations correctly Key: KAFKA-13173 URL: https://issues.apache.org/jira/browse/KAFKA-13173 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson In `ReplicationControlManager.fenceStaleBrokers`, we find all of the current stale replicas and attempt to remove them from the ISR. However, when multiple expirations occur at once, we do not properly accumulate the ISR changes. For example, I ran a test where the ISR of a partition was initialized to [1, 2, 3]. Then I triggered a timeout of replicas 2 and 3 at the same time. The records that were generated by `fenceStaleBrokers` were the following: {code} ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 3], leader=1, replicas=null, removingReplicas=null, addingReplicas=null) at version 0), ApiMessageAndVersion(FenceBrokerRecord(id=2, epoch=102) at version 0), ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 2], leader=1, replicas=null, removingReplicas=null, addingReplicas=null) at version 0), ApiMessageAndVersion(FenceBrokerRecord(id=3, epoch=103) at version 0)] {code} First the ISR is shrunk to [1, 3] as broker 2 is fenced. We also see the record to fence broker 2. Then the ISR is modified to [1, 2] as the fencing of broker 3 is handled. So we did not account for the fact that we had already fenced broker 2 in the request. A simple solution for now is to change the logic to handle fencing only one broker at a time. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-13167) KRaft broker should heartbeat immediately during controlled shutdown
[ https://issues.apache.org/jira/browse/KAFKA-13167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-13167. - Fix Version/s: 3.0.0 Resolution: Fixed > KRaft broker should heartbeat immediately during controlled shutdown > > > Key: KAFKA-13167 > URL: https://issues.apache.org/jira/browse/KAFKA-13167 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > Fix For: 3.0.0 > > > Controlled shutdown in KRaft is signaled through a heartbeat request with the > `shouldShutDown` flag set to true. When we begin controlled shutdown, we > should immediately schedule the next heartbeat instead of waiting for the > next periodic heartbeat so that we can shut down more quickly. Otherwise > controlled shutdown can be delayed by several seconds. -- This message was sent by Atlassian Jira (v8.3.4#803005)
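The scheduling change is small: on beginning controlled shutdown, pull the next heartbeat deadline forward to "now" instead of letting the periodic interval elapse. A toy model (names invented, not the broker lifecycle manager's code):

```java
// Toy heartbeat scheduler: controlled shutdown moves the next deadline to now,
// so the shouldShutDown flag is sent without waiting out the period.
public class HeartbeatScheduler {
    public long nextHeartbeatMs;
    public final long periodMs;

    public HeartbeatScheduler(long nowMs, long periodMs) {
        this.periodMs = periodMs;
        this.nextHeartbeatMs = nowMs + periodMs;
    }

    public void onHeartbeatSent(long nowMs) {
        nextHeartbeatMs = nowMs + periodMs;
    }

    public void onBeginControlledShutdown(long nowMs) {
        nextHeartbeatMs = nowMs;   // fire immediately rather than after periodMs
    }
}
```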
[jira] [Created] (KAFKA-13167) KRaft broker should heartbeat immediately during controlled shutdown
Jason Gustafson created KAFKA-13167: --- Summary: KRaft broker should heartbeat immediately during controlled shutdown Key: KAFKA-13167 URL: https://issues.apache.org/jira/browse/KAFKA-13167 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson Assignee: Jason Gustafson Controlled shutdown in KRaft is signaled through a heartbeat request with the `shouldShutDown` flag set to true. When we begin controlled shutdown, we should immediately schedule the next heartbeat instead of waiting for the next periodic heartbeat so that we can shut down more quickly. Otherwise controlled shutdown can be delayed by several seconds. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13162) ElectLeader API must be forwarded to Controller
Jason Gustafson created KAFKA-13162: --- Summary: ElectLeader API must be forwarded to Controller Key: KAFKA-13162 URL: https://issues.apache.org/jira/browse/KAFKA-13162 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson Assignee: Jason Gustafson Fix For: 3.0.0 We're missing the logic to forward ElectLeaders requests to the controller. This means that `kafka-leader-election.sh` does not work correctly. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13161) Follower leader and ISR state not updated after partition change in KRaft
Jason Gustafson created KAFKA-13161: --- Summary: Follower leader and ISR state not updated after partition change in KRaft Key: KAFKA-13161 URL: https://issues.apache.org/jira/browse/KAFKA-13161 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson Assignee: Jose Armando Garcia Sancio Fix For: 3.0.0 In KRaft when we detect a partition change, we first verify whether any leader or follower transitions are needed. Depending on the case, we call either `applyLocalLeadersDelta` or `applyLocalFollowersDelta`. In the latter case, we are missing a call to `Partition.makeFollower` which is responsible for updating LeaderAndIsr state for the partitions. As a result of this, the partition state may be left stale. The specific consequences of this bug are 1) follower fetching fails since the epoch is never updated, and 2) a stale leader may continue to accept Produce requests. The latter is the bigger issue since it can lead to log divergence if we are appending from both the client and from the fetcher thread at the same time. I tested this locally and confirmed that it is possible. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13099) Message too large error when expiring transactionalIds
[ https://issues.apache.org/jira/browse/KAFKA-13099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-13099: Fix Version/s: 2.6.3 2.5.2 > Message too large error when expiring transactionalIds > -- > > Key: KAFKA-13099 > URL: https://issues.apache.org/jira/browse/KAFKA-13099 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.0.1, 2.1.1, 2.2.2, 2.3.1, 2.4.1, 2.5.1, 2.6.1, 2.8.0, > 2.7.1 >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > Fix For: 3.0.0, 2.5.2, 2.6.3, 2.7.2, 2.8.1 > > > We have seen a couple reports of MESSAGE_TOO_LARGE errors when writing > tombstones for expired transactionalIds. This is possible because we collect > all expired IDs into a single batch. We should ensure that the created > batches are smaller than the max message size. Any expired IDs that cannot > fit can be expired later. -- This message was sent by Atlassian Jira (v8.3.4#803005)
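The fix's core idea, splitting expired transactionalIds into size-bounded batches and leaving any remainder for a later expiration pass, can be sketched as follows. The per-record size estimate here is a made-up placeholder, not Kafka's actual record sizing:

```java
import java.util.ArrayList;
import java.util.List;

// Split tombstones for expired transactionalIds into batches whose estimated
// size stays under the max message size; ids that do not fit in the current
// pass can simply be expired later. Sizing is illustrative only.
public class TombstoneBatcher {
    public static List<List<String>> batch(List<String> ids, int maxBatchBytes) {
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        int bytes = 0;
        for (String id : ids) {
            int size = id.length() + 20;          // rough per-record overhead
            if (!current.isEmpty() && bytes + size > maxBatchBytes) {
                batches.add(current);             // flush the full batch
                current = new ArrayList<>();
                bytes = 0;
            }
            current.add(id);
            bytes += size;
        }
        if (!current.isEmpty()) batches.add(current);
        return batches;
    }
}
```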
[jira] [Updated] (KAFKA-13142) KRaft brokers do not validate dynamic configs before forwarding them to controller
[ https://issues.apache.org/jira/browse/KAFKA-13142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-13142: Affects Version/s: (was: 3.0.0) > KRaft brokers do not validate dynamic configs before forwarding them to > controller > -- > > Key: KAFKA-13142 > URL: https://issues.apache.org/jira/browse/KAFKA-13142 > Project: Kafka > Issue Type: Task > Components: kraft >Reporter: Ryan Dielhenn >Assignee: Ryan Dielhenn >Priority: Blocker > Fix For: 3.0.0 > > > The KRaft brokers are not currently validating dynamic configs before > forwarding them to the controller. To ensure that KRaft clusters are easily > upgradable it would be a good idea to validate dynamic configs in the first > release of KRaft so that invalid dynamic configs are never stored. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-13148) Kraft Controller doesn't handle scheduleAppend returning Long.MAX_VALUE
[ https://issues.apache.org/jira/browse/KAFKA-13148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-13148. - Resolution: Fixed Closing this since it was fixed by KAFKA-12158. > Kraft Controller doesn't handle scheduleAppend returning Long.MAX_VALUE > --- > > Key: KAFKA-13148 > URL: https://issues.apache.org/jira/browse/KAFKA-13148 > Project: Kafka > Issue Type: Bug > Components: controller, kraft >Reporter: Jose Armando Garcia Sancio >Assignee: Niket Goel >Priority: Major > Labels: kip-500 > > In some cases the RaftClient will return Long.MAX_VALUE: > {code:java} > /** >* Append a list of records to the log. The write will be scheduled for > some time >* in the future. There is no guarantee that appended records will be > written to >* the log and eventually committed. However, it is guaranteed that if > any of the >* records become committed, then all of them will be. >* >* If the provided current leader epoch does not match the current > epoch, which >* is possible when the state machine has yet to observe the epoch > change, then >* this method will return {@link Long#MAX_VALUE} to indicate an offset > which is >* not possible to become committed. The state machine is expected to > discard all >* uncommitted entries after observing an epoch change. 
>* >* @param epoch the current leader epoch >* @param records the list of records to append >* @return the expected offset of the last record; {@link > Long#MAX_VALUE} if the records could >* be committed; null if no memory could be allocated for the > batch at this time >* @throws org.apache.kafka.common.errors.RecordBatchTooLargeException > if the size of the records is greater than the maximum >* batch size; if this exception is throw none of the elements > in records were >* committed >*/ > Long scheduleAtomicAppend(int epoch, List records); > {code} > The controller doesn't handle this case: > {code:java} > // If the operation returned a batch of records, those > records need to be > // written before we can return our result to the user. > Here, we hand off > // the batch of records to the raft client. They will be > written out > // asynchronously. > final long offset; > if (result.isAtomic()) { > offset = > raftClient.scheduleAtomicAppend(controllerEpoch, result.records()); > } else { > offset = raftClient.scheduleAppend(controllerEpoch, > result.records()); > } > op.processBatchEndOffset(offset); > writeOffset = offset; > resultAndOffset = ControllerResultAndOffset.of(offset, > result); > for (ApiMessageAndVersion message : result.records()) { > replay(message.message(), Optional.empty(), offset); > } > snapshotRegistry.getOrCreateSnapshot(offset); > log.debug("Read-write operation {} will be completed when > the log " + > "reaches offset {}.", this, resultAndOffset.offset()); > {code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)
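A hedged sketch of a guard for the unhandled sentinel described above (the actual fix landed via KAFKA-12158; the class and method names here are illustrative, not the controller's real code):

```java
// Illustrative guard only: treat Long.MAX_VALUE returned from
// scheduleAtomicAppend/scheduleAppend as "cannot become committed"
// instead of recording it as a real end offset for the operation.
public class AppendGuard {
    public static long checkedOffset(long offset) {
        if (offset == Long.MAX_VALUE) {
            // The epoch seen by the state machine is stale; surface an error
            // rather than tracking an offset that can never be committed.
            throw new IllegalStateException("Append rejected: stale leader epoch");
        }
        return offset;
    }
}
```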
[jira] [Assigned] (KAFKA-13155) ConcurrentModificationException in TransactionalMessageCopier
[ https://issues.apache.org/jira/browse/KAFKA-13155?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson reassigned KAFKA-13155: --- Assignee: Jason Gustafson > ConcurrentModificationException in TransactionalMessageCopier > - > > Key: KAFKA-13155 > URL: https://issues.apache.org/jira/browse/KAFKA-13155 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > > Caught this exception in a system test run: > {code} > [2021-08-02 07:51:54,528] ERROR Uncaught exception in thread > 'transactional-message-copier-shutdown-hook': > (org.apache.kafka.common.utils.KafkaThread) > java.util.ConcurrentModificationException: KafkaConsumer is not safe for > multi-threaded access > at > org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2447) > at > org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2330) > at > org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2308) > at > org.apache.kafka.tools.TransactionalMessageCopier.lambda$main$1(TransactionalMessageCopier.java:331) > at java.lang.Thread.run(Thread.java:748) > {code} > The pattern for closing the consumer is not safe. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13155) ConcurrentModificationException in TransactionalMessageCopier
Jason Gustafson created KAFKA-13155: --- Summary: ConcurrentModificationException in TransactionalMessageCopier Key: KAFKA-13155 URL: https://issues.apache.org/jira/browse/KAFKA-13155 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson Caught this exception in a system test run: ``` [2021-08-02 07:51:54,528] ERROR Uncaught exception in thread 'transactional-message-copier-shutdown-hook': (org.apache.kafka.common.utils.KafkaThread) java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2447) at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2330) at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2308) at org.apache.kafka.tools.TransactionalMessageCopier.lambda$main$1(TransactionalMessageCopier.java:331) at java.lang.Thread.run(Thread.java:748) ``` The pattern for closing the consumer is not safe. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13155) ConcurrentModificationException in TransactionalMessageCopier
[ https://issues.apache.org/jira/browse/KAFKA-13155?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-13155: Description: Caught this exception in a system test run: {code} [2021-08-02 07:51:54,528] ERROR Uncaught exception in thread 'transactional-message-copier-shutdown-hook': (org.apache.kafka.common.utils.KafkaThread) java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2447) at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2330) at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2308) at org.apache.kafka.tools.TransactionalMessageCopier.lambda$main$1(TransactionalMessageCopier.java:331) at java.lang.Thread.run(Thread.java:748) {code} The pattern for closing the consumer is not safe. was: Caught this exception in a system test run: ``` [2021-08-02 07:51:54,528] ERROR Uncaught exception in thread 'transactional-message-copier-shutdown-hook': (org.apache.kafka.common.utils.KafkaThread) java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2447) at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2330) at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2308) at org.apache.kafka.tools.TransactionalMessageCopier.lambda$main$1(TransactionalMessageCopier.java:331) at java.lang.Thread.run(Thread.java:748) ``` The pattern for closing the consumer is not safe. 
> ConcurrentModificationException in TransactionalMessageCopier > - > > Key: KAFKA-13155 > URL: https://issues.apache.org/jira/browse/KAFKA-13155 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Priority: Major > > Caught this exception in a system test run: > {code} > [2021-08-02 07:51:54,528] ERROR Uncaught exception in thread > 'transactional-message-copier-shutdown-hook': > (org.apache.kafka.common.utils.KafkaThread) > java.util.ConcurrentModificationException: KafkaConsumer is not safe for > multi-threaded access > at > org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2447) > at > org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2330) > at > org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2308) > at > org.apache.kafka.tools.TransactionalMessageCopier.lambda$main$1(TransactionalMessageCopier.java:331) > at java.lang.Thread.run(Thread.java:748) > {code} > The pattern for closing the consumer is not safe. -- This message was sent by Atlassian Jira (v8.3.4#803005)
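The conventional safe-shutdown pattern for a single-threaded consumer can be sketched as follows. `FakeConsumer` is a stand-in for `KafkaConsumer` so the sketch is self-contained and runnable; the point is that the shutdown hook only calls the thread-safe `wakeup()` and waits, while `close()` happens on the polling thread itself.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of the safe shutdown pattern for a single-threaded consumer such as
// KafkaConsumer: the hook never calls close() directly (that is what throws
// ConcurrentModificationException); it calls wakeup() and waits, and the
// polling thread observes the wakeup and closes the consumer itself.
public class SafeShutdown {
    static class FakeConsumer {
        final AtomicBoolean wokenUp = new AtomicBoolean(false);
        volatile Thread owner;
        boolean closed = false;

        void wakeup() { wokenUp.set(true); }           // safe from any thread

        void poll() {                                   // must run on owner thread
            owner = Thread.currentThread();
            if (wokenUp.get()) throw new RuntimeException("wakeup");
        }

        void close() {                                  // must run on owner thread
            if (Thread.currentThread() != owner)
                throw new IllegalStateException("not safe for multi-threaded access");
            closed = true;
        }
    }

    // Runs the poll loop until another thread calls consumer.wakeup().
    static void pollUntilWakeup(FakeConsumer consumer, CountDownLatch done) {
        try {
            while (true) consumer.poll();
        } catch (RuntimeException e) {
            // expected after wakeup()
        } finally {
            consumer.close();   // closed on the owning thread, never in the hook
            done.countDown();   // lets the shutdown hook finish waiting
        }
    }
}
```

With the real client, the hook body would be `consumer.wakeup(); done.await();` and the poll loop would catch `org.apache.kafka.common.errors.WakeupException`.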
[jira] [Resolved] (KAFKA-13095) TransactionsTest is failing in kraft mode
[ https://issues.apache.org/jira/browse/KAFKA-13095?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-13095. - Resolution: Cannot Reproduce I am closing this since `TransactionsTest` has not yet been converted to KRaft. There was some instability introduced when the topicId fetch changes were introduced, so I think this caused some confusion. > TransactionsTest is failing in kraft mode > - > > Key: KAFKA-13095 > URL: https://issues.apache.org/jira/browse/KAFKA-13095 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.0.0 >Reporter: Colin McCabe >Assignee: Jason Gustafson >Priority: Blocker > Fix For: 3.0.0 > > > TransactionsTest#testSendOffsetsToTransactionTimeout keeps flaking on Jenkins. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-13114) Unregister listener during renounce when the in-memory snapshot is missing
[ https://issues.apache.org/jira/browse/KAFKA-13114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-13114. - Resolution: Fixed > Unregister listener during renounce when the in-memory snapshot is missing > -- > > Key: KAFKA-13114 > URL: https://issues.apache.org/jira/browse/KAFKA-13114 > Project: Kafka > Issue Type: Sub-task > Components: controller >Reporter: Jose Armando Garcia Sancio >Assignee: Jose Armando Garcia Sancio >Priority: Blocker > Labels: kip-500 > Fix For: 3.0.0 > > > Need to improve the renounce logic to do the following when the last > committed offset in-memory snapshot is missing: > # Reset the snapshot registry > # Unregister the listener from the RaftClient > # Register the listener with the RaftClient -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-13151) Disallow policy configs in KRaft
[ https://issues.apache.org/jira/browse/KAFKA-13151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-13151. - Resolution: Fixed > Disallow policy configs in KRaft > > > Key: KAFKA-13151 > URL: https://issues.apache.org/jira/browse/KAFKA-13151 > Project: Kafka > Issue Type: Task >Affects Versions: 3.0.0 >Reporter: Ryan Dielhenn >Assignee: Ryan Dielhenn >Priority: Blocker > Fix For: 3.0.0 > > > alter.config.policy.class.name and create.topic.policy.class.name are > unsupported by KRaft. KRaft servers should fail startup if any of these are > configured. -- This message was sent by Atlassian Jira (v8.3.4#803005)
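The fail-fast startup check described in KAFKA-13151 can be sketched as below. This is illustrative only, not the actual `KafkaConfig` validation; the class and method names are assumptions.

```java
import java.util.Map;
import java.util.Set;

// Sketch of a startup validation: fail fast if policy configs that KRaft
// does not support are present, so invalid dynamic configs are never stored.
public class KRaftConfigCheck {
    static final Set<String> UNSUPPORTED = Set.of(
        "alter.config.policy.class.name",
        "create.topic.policy.class.name");

    public static void validate(Map<String, String> configs) {
        for (String key : UNSUPPORTED) {
            if (configs.containsKey(key))
                throw new IllegalArgumentException(key + " is not supported in KRaft mode");
        }
    }
}
```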
[jira] [Updated] (KAFKA-13151) Disallow policy configs in KRaft
[ https://issues.apache.org/jira/browse/KAFKA-13151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-13151: Affects Version/s: (was: 3.0.0) > Disallow policy configs in KRaft > > > Key: KAFKA-13151 > URL: https://issues.apache.org/jira/browse/KAFKA-13151 > Project: Kafka > Issue Type: Task >Reporter: Ryan Dielhenn >Assignee: Ryan Dielhenn >Priority: Blocker > Fix For: 3.0.0 > > > alter.config.policy.class.name and create.topic.policy.class.name are > unsupported by KRaft. KRaft servers should fail startup if any of these are > configured. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13132) Upgrading to topic IDs in LISR requests has gaps introduced in 3.0
[ https://issues.apache.org/jira/browse/KAFKA-13132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-13132: Affects Version/s: (was: 3.0.0) > Upgrading to topic IDs in LISR requests has gaps introduced in 3.0 > -- > > Key: KAFKA-13132 > URL: https://issues.apache.org/jira/browse/KAFKA-13132 > Project: Kafka > Issue Type: Bug >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Blocker > Fix For: 3.0.0 > > > With the change in 3.0 to how topic IDs are assigned to logs, a bug was > inadvertently introduced. Now, topic IDs will only be assigned on the load of > the log to a partition in LISR requests. This means we will only assign topic > IDs for newly created topics/partitions, on broker startup, or potentially > when a partition is reassigned. > > In the case of upgrading from an IBP before 2.8, we may have a scenario where > we upgrade the controller to IBP 3.0 (or even 2.8) last. (Ie, the controller > is IBP < 2.8 and all other brokers are on the newest IBP) Upon the last > broker upgrading, we will elect a new controller but its LISR request will > not result in topic IDs being assigned to logs of existing topics. They will > only be assigned in the cases mentioned above. > *Keep in mind, in this scenario, topic IDs will be still be assigned in the > controller/ZK to all new and pre-existing topics and will show up in > metadata.* This means we are not ensured the same guarantees we had in 2.8. > *It is just the LISR/partition.metadata part of the code that is affected.* > > The problem is two-fold > 1. We ignore LISR requests when the partition leader epoch has not increased > (previously we assigned the ID before this check) > 2. We only assign the topic ID when we are associating the log with the > partition in replicamanager for the first time. Though in the scenario > described above, we have logs associated with partitions that need to be > upgraded. 
> > We should check if the LISR request is resulting in a topic ID addition > and add logic to logs already associated with partitions in replica manager. -- This message was sent by Atlassian Jira (v8.3.4#803005)
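The fix described in KAFKA-13132 can be sketched as below: even when the leader epoch in a LISR request has not increased, an existing log that lacks a topic ID should still pick up the ID from the request. The class and method names are illustrative, not the actual log/replica-manager code.

```java
import java.util.Optional;
import java.util.UUID;

// Illustrative sketch: assign the topic ID from a LISR request to a log that
// does not have one yet, independent of whether the leader epoch increased.
public class LogTopicId {
    Optional<UUID> topicId = Optional.empty();

    public boolean maybeAssignTopicId(Optional<UUID> requestTopicId) {
        if (topicId.isEmpty() && requestTopicId.isPresent()) {
            topicId = requestTopicId;   // upgrade path: existing log, new ID
            return true;
        }
        return false;                   // already assigned, or no ID in request
    }
}
```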
[jira] [Resolved] (KAFKA-13132) Upgrading to topic IDs in LISR requests has gaps introduced in 3.0
[ https://issues.apache.org/jira/browse/KAFKA-13132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-13132. - Resolution: Fixed > Upgrading to topic IDs in LISR requests has gaps introduced in 3.0 > -- > > Key: KAFKA-13132 > URL: https://issues.apache.org/jira/browse/KAFKA-13132 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.0.0 >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Blocker > Fix For: 3.0.0 > > > With the change in 3.0 to how topic IDs are assigned to logs, a bug was > inadvertently introduced. Now, topic IDs will only be assigned on the load of > the log to a partition in LISR requests. This means we will only assign topic > IDs for newly created topics/partitions, on broker startup, or potentially > when a partition is reassigned. > > In the case of upgrading from an IBP before 2.8, we may have a scenario where > we upgrade the controller to IBP 3.0 (or even 2.8) last. (Ie, the controller > is IBP < 2.8 and all other brokers are on the newest IBP) Upon the last > broker upgrading, we will elect a new controller but its LISR request will > not result in topic IDs being assigned to logs of existing topics. They will > only be assigned in the cases mentioned above. > *Keep in mind, in this scenario, topic IDs will be still be assigned in the > controller/ZK to all new and pre-existing topics and will show up in > metadata.* This means we are not ensured the same guarantees we had in 2.8. > *It is just the LISR/partition.metadata part of the code that is affected.* > > The problem is two-fold > 1. We ignore LISR requests when the partition leader epoch has not increased > (previously we assigned the ID before this check) > 2. We only assign the topic ID when we are associating the log with the > partition in replicamanager for the first time. Though in the scenario > described above, we have logs associated with partitions that need to be > upgraded. 
> > We should check if the LISR request is resulting in a topic ID addition > and add logic to logs already associated with partitions in replica manager. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-13132) Upgrading to topic IDs in LISR requests has gaps introduced in 3.0
[ https://issues.apache.org/jira/browse/KAFKA-13132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson reassigned KAFKA-13132: --- Assignee: Justine Olshan (was: Jose Armando Garcia Sancio) > Upgrading to topic IDs in LISR requests has gaps introduced in 3.0 > -- > > Key: KAFKA-13132 > URL: https://issues.apache.org/jira/browse/KAFKA-13132 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.0.0 >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Blocker > Fix For: 3.0.0 > > > With the change in 3.0 to how topic IDs are assigned to logs, a bug was > inadvertently introduced. Now, topic IDs will only be assigned on the load of > the log to a partition in LISR requests. This means we will only assign topic > IDs for newly created topics/partitions, on broker startup, or potentially > when a partition is reassigned. > > In the case of upgrading from an IBP before 2.8, we may have a scenario where > we upgrade the controller to IBP 3.0 (or even 2.8) last. (Ie, the controller > is IBP < 2.8 and all other brokers are on the newest IBP) Upon the last > broker upgrading, we will elect a new controller but its LISR request will > not result in topic IDs being assigned to logs of existing topics. They will > only be assigned in the cases mentioned above. > *Keep in mind, in this scenario, topic IDs will be still be assigned in the > controller/ZK to all new and pre-existing topics and will show up in > metadata.* This means we are not ensured the same guarantees we had in 2.8. > *It is just the LISR/partition.metadata part of the code that is affected.* > > The problem is two-fold > 1. We ignore LISR requests when the partition leader epoch has not increased > (previously we assigned the ID before this check) > 2. We only assign the topic ID when we are associating the log with the > partition in replicamanager for the first time. 
Though in the scenario > described above, we have logs associated with partitions that need to be > upgraded. > > We should check if the LISR request is resulting in a topic ID addition > and add logic to logs already associated with partitions in replica manager. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13137) KRaft Controller Metric MBean names are incorrectly quoted
[ https://issues.apache.org/jira/browse/KAFKA-13137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-13137: Affects Version/s: (was: 3.0.0) > KRaft Controller Metric MBean names are incorrectly quoted > -- > > Key: KAFKA-13137 > URL: https://issues.apache.org/jira/browse/KAFKA-13137 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 2.8.0 >Reporter: Ron Dagostino >Assignee: Ron Dagostino >Priority: Blocker > Fix For: 3.0.0 > > > QuorumControllerMetrics is letting com.yammer.metrics.MetricName create the > MBean names for all of the controller metrics, and that adds quotes. We have > typically used KafkaMetricsGroup to explicitly create the MBean name, and we > do not add quotes there. The controller metric names that are in common > between the old and new controller must remain the same, but they are not. > For example, this non-KRaft MBean name: > kafka.controller:type=KafkaController,name=OfflinePartitionsCount > has morphed into this when using KRaft: > "kafka.controller":type="KafkaController",name="OfflinePartitionsCount" -- This message was sent by Atlassian Jira (v8.3.4#803005)
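The naming difference can be reproduced directly with `javax.management.ObjectName` (a sketch of the symptom only; Kafka's fix goes through `KafkaMetricsGroup`, not code like this):

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Sketch: an explicitly built, unquoted MBean name (what KafkaMetricsGroup
// produces) and the fully quoted variant (what yammer's MetricName produced
// for the KRaft controller) canonicalize to different, incompatible names.
public class MBeanNames {
    public static String canonical(String name) {
        try {
            return new ObjectName(name).getCanonicalName();
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException(e);
        }
    }
}
```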
[jira] [Resolved] (KAFKA-13143) Disable Metadata endpoint for KRaft controller
[ https://issues.apache.org/jira/browse/KAFKA-13143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-13143. - Resolution: Fixed > Disable Metadata endpoint for KRaft controller > -- > > Key: KAFKA-13143 > URL: https://issues.apache.org/jira/browse/KAFKA-13143 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Niket Goel >Priority: Blocker > Fix For: 3.0.0 > > > The controller currently implements Metadata incompletely. Specifically, it > does not return the metadata for any topics in the cluster. This may tend to > cause confusion to users. For example, if someone used the controller > endpoint by mistake in `kafka-topics.sh --list`, then they would see no > topics in the cluster, which would be surprising. It would be better for 3.0 > to disable Metadata on the controller since we currently expect clients to > connect through brokers anyway. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-13091) Increment HW after shrinking ISR through AlterIsr
[ https://issues.apache.org/jira/browse/KAFKA-13091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson reassigned KAFKA-13091: --- Assignee: Jason Gustafson (was: David Arthur) > Increment HW after shrinking ISR through AlterIsr > - > > Key: KAFKA-13091 > URL: https://issues.apache.org/jira/browse/KAFKA-13091 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > > After we have shrunk the ISR, we have an opportunity to advance the high > watermark. We do this currently in `maybeShrinkIsr` after the synchronous > update through ZK. For the AlterIsr path, however, we cannot rely on this > call since the request is sent asynchronously. Instead we should attempt to > advance the high watermark in the callback when the AlterIsr response returns > successfully. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13099) Message too large error when expiring transactionalIds
[ https://issues.apache.org/jira/browse/KAFKA-13099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-13099: Affects Version/s: 2.0.1 2.4.1 2.5.1 2.6.1 2.8.0 2.7.1 > Message too large error when expiring transactionalIds > -- > > Key: KAFKA-13099 > URL: https://issues.apache.org/jira/browse/KAFKA-13099 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.0.1, 2.4.1, 2.5.1, 2.6.1, 2.8.0, 2.7.1 >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > Fix For: 3.0.0, 2.7.2, 2.8.1 > > > We have seen a couple reports of MESSAGE_TOO_LARGE errors when writing > tombstones for expired transactionalIds. This is possible because we collect > all expired IDs into a single batch. We should ensure that the created > batches are smaller than the max message size. Any expired IDs that cannot > fit can be expired later. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13099) Message too large error when expiring transactionalIds
[ https://issues.apache.org/jira/browse/KAFKA-13099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-13099: Affects Version/s: 2.1.1 2.2.2 2.3.1 > Message too large error when expiring transactionalIds > -- > > Key: KAFKA-13099 > URL: https://issues.apache.org/jira/browse/KAFKA-13099 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.0.1, 2.1.1, 2.2.2, 2.3.1, 2.4.1, 2.5.1, 2.6.1, 2.8.0, > 2.7.1 >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > Fix For: 3.0.0, 2.7.2, 2.8.1 > > > We have seen a couple reports of MESSAGE_TOO_LARGE errors when writing > tombstones for expired transactionalIds. This is possible because we collect > all expired IDs into a single batch. We should ensure that the created > batches are smaller than the max message size. Any expired IDs that cannot > fit can be expired later. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-13099) Message too large error when expiring transactionalIds
[ https://issues.apache.org/jira/browse/KAFKA-13099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-13099. - Fix Version/s: 2.8.1 2.7.2 3.0.0 Resolution: Fixed > Message too large error when expiring transactionalIds > -- > > Key: KAFKA-13099 > URL: https://issues.apache.org/jira/browse/KAFKA-13099 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > Fix For: 3.0.0, 2.7.2, 2.8.1 > > > We have seen a couple reports of MESSAGE_TOO_LARGE errors when writing > tombstones for expired transactionalIds. This is possible because we collect > all expired IDs into a single batch. We should ensure that the created > batches are smaller than the max message size. Any expired IDs that cannot > fit can be expired later. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13146) Consider client use cases for accessing controller endpoints
[ https://issues.apache.org/jira/browse/KAFKA-13146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-13146: Description: In KAFKA-13143, we dropped the Metadata from the controller APIs. We did this for two reasons. First, the implementation did not return any topic metadata. This was confusing for users who mistakenly tried to use the controller endpoint in order to describe or list topics since it would appear that no topics existed in the cluster. The second reason is that the implementation returned the controller endpoints. So even if we returned the topic metadata, clients would be unable to access the topics for reading or writing through the controller endpoint. So for 3.0, we are effectively saying that clients should only access the broker endpoints. Long term, is that what we want? When running the controllers as separate nodes, it may be useful to initialize the controllers and cluster metadata before starting any of the brokers, for example. For this to work, we need to put some thought into how the Metadata API should work with controllers. For example, we can return a flag or some kind of error code in the response to indicate that topic metadata is not available. We have also considered whether the internal __cluster_metadata topic should be readable through the controller endpoints by consumers. was: In KAFKA-13143, we dropped the Metadata from the controller APIs. We did this for two reasons. First, the implementation did not return any topic metadata. This was confusing for users who mistakenly tried to use the controller endpoint in order to describe or list topics since it would appear that no topics existed in the cluster. The second reason is that the implementation returned the controller endpoints. So even if we returned the topic metadata, clients would be unable to access the topics for reading or writing through the controller endpoint. 
So for 3.0, we are effectively saying that clients should only access the broker endpoints. Long term, is that what we want? When running the controllers as separate nodes, it may be useful to initialize the controllers and cluster metadata before starting any of the brokers, for example. For this to work, we need to put some thought into how the Metadata API should work with controllers. For example, we can return a flag or some kind of error code in the response to indicate that topic metadata is not available. We have also considered whether the internal __cluster_metadata topic should be readable through the controller endpoints. > Consider client use cases for accessing controller endpoints > > > Key: KAFKA-13146 > URL: https://issues.apache.org/jira/browse/KAFKA-13146 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Priority: Major > Labels: kip-500 > > In KAFKA-13143, we dropped the Metadata from the controller APIs. We did this > for two reasons. First, the implementation did not return any topic metadata. > This was confusing for users who mistakenly tried to use the controller > endpoint in order to describe or list topics since it would appear that no > topics existed in the cluster. The second reason is that the implementation > returned the controller endpoints. So even if we returned the topic metadata, > clients would be unable to access the topics for reading or writing through > the controller endpoint. > So for 3.0, we are effectively saying that clients should only access the > broker endpoints. Long term, is that what we want? When running the > controllers as separate nodes, it may be useful to initialize the controllers > and cluster metadata before starting any of the brokers, for example. For > this to work, we need to put some thought into how the Metadata API should > work with controllers. For example, we can return a flag or some kind of > error code in the response to indicate that topic metadata is not available. 
> We have also considered whether the internal __cluster_metadata topic should > be readable through the controller endpoints by consumers. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13146) Consider client use cases for accessing controller endpoints
Jason Gustafson created KAFKA-13146: --- Summary: Consider client use cases for accessing controller endpoints Key: KAFKA-13146 URL: https://issues.apache.org/jira/browse/KAFKA-13146 Project: Kafka Issue Type: Improvement Reporter: Jason Gustafson In KAFKA-13143, we dropped the Metadata from the controller APIs. We did this for two reasons. First, the implementation did not return any topic metadata. This was confusing for users who mistakenly tried to use the controller endpoint in order to describe or list topics since it would appear that no topics existed in the cluster. The second reason is that the implementation returned the controller endpoints. So even if we returned the topic metadata, clients would be unable to access the topics for reading or writing through the controller endpoint. So for 3.0, we are effectively saying that clients should only access the broker endpoints. Long term, is that what we want? When running the controllers as separate nodes, it may be useful to initialize the controllers and cluster metadata before starting any of the brokers, for example. For this to work, we need to put some thought into how the Metadata API should work with controllers. For example, we can return a flag or some kind of error code in the response to indicate that topic metadata is not available. We have also considered whether the internal __cluster_metadata topic should be readable through the controller endpoints. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-12851) Flaky Test RaftEventSimulationTest.canMakeProgressIfMajorityIsReachable
[ https://issues.apache.org/jira/browse/KAFKA-12851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-12851. - Resolution: Fixed > Flaky Test RaftEventSimulationTest.canMakeProgressIfMajorityIsReachable > --- > > Key: KAFKA-12851 > URL: https://issues.apache.org/jira/browse/KAFKA-12851 > Project: Kafka > Issue Type: Bug > Components: core, kraft >Reporter: A. Sophie Blee-Goldman >Assignee: Jose Armando Garcia Sancio >Priority: Major > Labels: kip-500 > Fix For: 3.0.0 > > Attachments: Capture.PNG > > > Failed twice on a [PR > build|https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10755/6/testReport/] > h3. Stacktrace > org.opentest4j.AssertionFailedError: expected: <true> but was: <false> at > org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at > org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40) at > org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:35) at > org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:162) at > org.apache.kafka.raft.RaftEventSimulationTest.canMakeProgressIfMajorityIsReachable(RaftEventSimulationTest.java:263) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-13141) Leader should not update follower fetch offset if diverging epoch is present
[ https://issues.apache.org/jira/browse/KAFKA-13141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson reassigned KAFKA-13141: --- Assignee: Rajini Sivaram (was: Jason Gustafson) > Leader should not update follower fetch offset if diverging epoch is present > > > Key: KAFKA-13141 > URL: https://issues.apache.org/jira/browse/KAFKA-13141 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.8.0, 2.7.1 >Reporter: Jason Gustafson >Assignee: Rajini Sivaram >Priority: Blocker > Fix For: 3.0.0, 2.7.2, 2.8.1 > > > In 2.7, we began doing fetcher truncation piggybacked on the Fetch protocol > instead of using the old OffsetsForLeaderEpoch API. When truncation is > detected, we return a `divergingEpoch` field in the Fetch response, but we do > not set an error code. The sender is expected to check if the diverging epoch > is present and truncate accordingly. > All of this works correctly in the fetcher implementation, but the problem is > that the logic to update the follower fetch position on the leader does not > take into account the diverging epoch present in the response. This means the > fetch offsets can be updated incorrectly, which can lead to either log > divergence or the loss of committed data. > For example, we hit the following case with 3 replicas. Leader 1 is elected > in epoch 1 with an end offset of 100. The followers are at offset 101. > Broker 1: (Leader) Epoch 1 from offset 100 > Broker 2: (Follower) Epoch 1 from offset 101 > Broker 3: (Follower) Epoch 1 from offset 101 > Broker 1 receives fetches from 2 and 3 at offset 101. The leader detects the > divergence and returns a diverging epoch in the fetch state. Nevertheless, > the fetch positions for both followers are updated to 101 and the high > watermark is advanced. > After brokers 2 and 3 had truncated to offset 100, broker 1 experienced a > network partition of some kind and was kicked from the ISR. 
This caused > broker 2 to get elected, which resulted in the following state at the start > of epoch 2. > Broker 1: (Follower) Epoch 2 from offset 101 > Broker 2: (Leader) Epoch 2 from offset 100 > Broker 3: (Follower) Epoch 2 from offset 100 > Broker 2 was then able to write a new entry at offset 100 and the old record > which may have been exposed to consumers was deleted by broker 1.
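The leader-side fix described in this issue can be illustrated with a small sketch: before a fetch request advances the follower's recorded position (and hence the high watermark), the leader checks whether the response it is returning carries a diverging epoch. The class and method names below are hypothetical, for illustration only, and are not Kafka's actual replica-management internals:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class FollowerFetchState {
    // Leader-side view of each follower's fetch offset, keyed by broker id.
    private final Map<Integer, Long> fetchOffsets = new HashMap<>();

    /**
     * Advance the follower's position only when the fetch response carries
     * no diverging epoch. If truncation was detected, the follower is about
     * to rewind, so treating its requested offset as its true position could
     * wrongly advance the high watermark.
     */
    public void maybeUpdateFetchOffset(int followerId, long requestedOffset,
                                       Optional<Long> divergingEpoch) {
        if (divergingEpoch.isPresent()) {
            return; // follower will truncate and re-fetch; keep old state
        }
        fetchOffsets.put(followerId, requestedOffset);
    }

    public long fetchOffset(int followerId) {
        return fetchOffsets.getOrDefault(followerId, -1L);
    }
}
```

With a guard of this shape, the scenario above is harmless: the followers' recorded positions stay at their last confirmed values until they truncate to offset 100 and fetch again.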
[jira] [Assigned] (KAFKA-13143) Disable Metadata endpoint for KRaft controller
[ https://issues.apache.org/jira/browse/KAFKA-13143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson reassigned KAFKA-13143: --- Assignee: Niket Goel (was: Jose Armando Garcia Sancio) > Disable Metadata endpoint for KRaft controller > -- > > Key: KAFKA-13143 > URL: https://issues.apache.org/jira/browse/KAFKA-13143 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Niket Goel >Priority: Blocker > Fix For: 3.0.0 > > > The controller currently implements Metadata incompletely. Specifically, it > does not return the metadata for any topics in the cluster. This is likely to > confuse users. For example, if someone used the controller > endpoint by mistake in `kafka-topics.sh --list`, then they would see no > topics in the cluster, which would be surprising. It would be better for 3.0 > to disable Metadata on the controller since we currently expect clients to > connect through brokers anyway.
[jira] [Updated] (KAFKA-13143) Disable Metadata endpoint for KRaft controller
[ https://issues.apache.org/jira/browse/KAFKA-13143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-13143: Issue Type: Bug (was: Improvement) > Disable Metadata endpoint for KRaft controller > -- > > Key: KAFKA-13143 > URL: https://issues.apache.org/jira/browse/KAFKA-13143 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jose Armando Garcia Sancio >Priority: Blocker > Fix For: 3.0.0 > > > The controller currently implements Metadata incompletely. Specifically, it > does not return the metadata for any topics in the cluster. This is likely to > confuse users. For example, if someone used the controller > endpoint by mistake in `kafka-topics.sh --list`, then they would see no > topics in the cluster, which would be surprising. It would be better for 3.0 > to disable Metadata on the controller since we currently expect clients to > connect through brokers anyway.
[jira] [Created] (KAFKA-13143) Disable Metadata endpoint for KRaft controller
Jason Gustafson created KAFKA-13143: --- Summary: Disable Metadata endpoint for KRaft controller Key: KAFKA-13143 URL: https://issues.apache.org/jira/browse/KAFKA-13143 Project: Kafka Issue Type: Improvement Reporter: Jason Gustafson Assignee: Jose Armando Garcia Sancio Fix For: 3.0.0 The controller currently implements Metadata incompletely. Specifically, it does not return the metadata for any topics in the cluster. This is likely to confuse users. For example, if someone used the controller endpoint by mistake in `kafka-topics.sh --list`, then they would see no topics in the cluster, which would be surprising. It would be better for 3.0 to disable Metadata on the controller since we currently expect clients to connect through brokers anyway.
[jira] [Created] (KAFKA-13141) Leader should not update follower fetch offset if diverging epoch is present
Jason Gustafson created KAFKA-13141: --- Summary: Leader should not update follower fetch offset if diverging epoch is present Key: KAFKA-13141 URL: https://issues.apache.org/jira/browse/KAFKA-13141 Project: Kafka Issue Type: Bug Affects Versions: 2.7.1, 2.8.0 Reporter: Jason Gustafson Assignee: Jason Gustafson Fix For: 3.0.0, 2.7.2, 2.8.1 In 2.7, we began doing fetcher truncation piggybacked on the Fetch protocol instead of using the old OffsetsForLeaderEpoch API. When truncation is detected, we return a `divergingEpoch` field in the Fetch response, but we do not set an error code. The sender is expected to check if the diverging epoch is present and truncate accordingly. All of this works correctly in the fetcher implementation, but the problem is that the logic to update the follower fetch position on the leader does not take into account the diverging epoch present in the response. This means the fetch offsets can be updated incorrectly, which can lead to either log divergence or the loss of committed data. For example, we hit the following case with 3 replicas. Leader 1 is elected in epoch 1 with an end offset of 100. The followers are at offset 101. Broker 1: (Leader) Epoch 1 from offset 100 Broker 2: (Follower) Epoch 1 from offset 101 Broker 3: (Follower) Epoch 1 from offset 101 Broker 1 receives fetches from 2 and 3 at offset 101. The leader detects the divergence and returns a diverging epoch in the fetch state. Nevertheless, the fetch positions for both followers are updated to 101 and the high watermark is advanced. After brokers 2 and 3 had truncated to offset 100, broker 1 experienced a network partition of some kind and was kicked from the ISR. This caused broker 2 to get elected, which resulted in the following state at the start of epoch 2. 
Broker 1: (Follower) Epoch 2 from offset 101 Broker 2: (Leader) Epoch 2 from offset 100 Broker 3: (Follower) Epoch 2 from offset 100 Broker 2 was then able to write a new entry at offset 100 and the old record which may have been exposed to consumers was deleted by broker 1.
[jira] [Assigned] (KAFKA-13095) TransactionsTest is failing in kraft mode
[ https://issues.apache.org/jira/browse/KAFKA-13095?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson reassigned KAFKA-13095: --- Assignee: Jason Gustafson (was: David Arthur) > TransactionsTest is failing in kraft mode > - > > Key: KAFKA-13095 > URL: https://issues.apache.org/jira/browse/KAFKA-13095 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.0.0 >Reporter: Colin McCabe >Assignee: Jason Gustafson >Priority: Blocker > Fix For: 3.0.0 > >
[jira] [Resolved] (KAFKA-13113) Add unregister support to the RaftClient.
[ https://issues.apache.org/jira/browse/KAFKA-13113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-13113. - Resolution: Fixed > Add unregister support to the RaftClient. > - > > Key: KAFKA-13113 > URL: https://issues.apache.org/jira/browse/KAFKA-13113 > Project: Kafka > Issue Type: Sub-task > Components: kraft >Reporter: Jose Armando Garcia Sancio >Assignee: Jose Armando Garcia Sancio >Priority: Blocker > Labels: kip-500 > Fix For: 3.0.0 > > > Implement the following API:
> {code:java}
> interface RaftClient {
>     ListenerContext register(Listener);
>     void unregister(ListenerContext);
> }
> interface ListenerContext {
> }
> interface Listener {
>     void handleCommit(ListenerContext, BatchReader);
>     void handleSnapshot(ListenerContext, SnapshotReader);
>     void handleLeaderChange(ListenerContext, LeaderAndEpoch);
> }
> {code}
[jira] [Resolved] (KAFKA-8529) Flakey test ConsumerBounceTest#testCloseDuringRebalance
[ https://issues.apache.org/jira/browse/KAFKA-8529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-8529. Resolution: Fixed > Flakey test ConsumerBounceTest#testCloseDuringRebalance > --- > > Key: KAFKA-8529 > URL: https://issues.apache.org/jira/browse/KAFKA-8529 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: Boyang Chen >Priority: Major > > [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/5450/consoleFull] > > *16:16:10* kafka.api.ConsumerBounceTest > testCloseDuringRebalance > STARTED*16:16:22* kafka.api.ConsumerBounceTest.testCloseDuringRebalance > failed, log available in > /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.12/core/build/reports/testOutput/kafka.api.ConsumerBounceTest.testCloseDuringRebalance.test.stdout*16:16:22* > *16:16:22* kafka.api.ConsumerBounceTest > testCloseDuringRebalance > FAILED*16:16:22* java.lang.AssertionError: Rebalance did not complete in > time*16:16:22* at org.junit.Assert.fail(Assert.java:89)*16:16:22* > at org.junit.Assert.assertTrue(Assert.java:42)*16:16:22* at > kafka.api.ConsumerBounceTest.waitForRebalance$1(ConsumerBounceTest.scala:402)*16:16:22* > at > kafka.api.ConsumerBounceTest.checkCloseDuringRebalance(ConsumerBounceTest.scala:416)*16:16:22* > at > kafka.api.ConsumerBounceTest.testCloseDuringRebalance(ConsumerBounceTest.scala:379)
[jira] [Updated] (KAFKA-13127) Fix stray partition lookup logic
[ https://issues.apache.org/jira/browse/KAFKA-13127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-13127: Description: The result of `BrokerMetadataPublisher.findGhostReplicas` is inverted. It returns all of the non-stray replicas. This causes all of these partitions to get deleted on startup by mistake. (was: The result of `BrokerMetadataPublisher.findGhostReplicas` is inverted. It returns all of the non-stray replicas. This causes all off these partitions to get deleted on startup by mistake.) > Fix stray partition lookup logic > > > Key: KAFKA-13127 > URL: https://issues.apache.org/jira/browse/KAFKA-13127 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Blocker > Fix For: 3.0.0 > > > The result of `BrokerMetadataPublisher.findGhostReplicas` is inverted. It > returns all of the non-stray replicas. This causes all of these partitions to > get deleted on startup by mistake.
[jira] [Created] (KAFKA-13127) Fix stray partition lookup logic
Jason Gustafson created KAFKA-13127: --- Summary: Fix stray partition lookup logic Key: KAFKA-13127 URL: https://issues.apache.org/jira/browse/KAFKA-13127 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson Assignee: Jason Gustafson Fix For: 3.0.0 The result of `BrokerMetadataPublisher.findGhostReplicas` is inverted. It returns all of the non-stray replicas. This causes all of these partitions to get deleted on startup by mistake.
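The bug class here is a simple predicate inversion: a filter meant to select stray replicas selected their complement, so the replicas that should have been kept were the ones deleted. A minimal sketch of the intended logic, with illustrative names rather than the actual `BrokerMetadataPublisher` code:

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class StrayReplicaFinder {
    /**
     * A replica on disk is "stray" when the current metadata image no longer
     * assigns it to this broker. Only stray replicas should be deleted.
     * The reported bug was the inverted predicate: filtering with
     * assignedToBroker::contains returned the replicas to KEEP, and those
     * were then deleted on startup by mistake.
     */
    public static List<String> findStrayReplicas(List<String> onDisk,
                                                 Set<String> assignedToBroker) {
        return onDisk.stream()
                .filter(partition -> !assignedToBroker.contains(partition))
                .collect(Collectors.toList());
    }
}
```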
[jira] [Commented] (KAFKA-13111) Re-evaluate Fetch Sessions when using topic IDs
[ https://issues.apache.org/jira/browse/KAFKA-13111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17384526#comment-17384526 ] Jason Gustafson commented on KAFKA-13111: - Thanks, this makes sense to me. At a high level, we want to keep the protocol simple even if it costs a little more complexity in the implementation. I think it is indeed simpler if topic IDs are handled similarly to topic names. When a topic ID is unknown, we nevertheless store it in the session and keep sending the client UNKNOWN_TOPIC_ID until it gets removed through the forgotten topic list or the broker learns the topic name mapping from the controller. > Re-evaluate Fetch Sessions when using topic IDs > --- > > Key: KAFKA-13111 > URL: https://issues.apache.org/jira/browse/KAFKA-13111 > Project: Kafka > Issue Type: Improvement >Affects Versions: 3.1.0 >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Major > > For fetch request version 13 we have the current method of handling unknown > topic IDs. > * When the receiving broker sees an unknown topic ID in a request or > encounters an inconsistent (mismatched) ID in the logs, it sends a top-level > error back, delays *all* partitions (in fetcher thread), and closes the > session > Ideally, we want to handle the same way as unknown topic names. We hold the > topic partition in the session and try to resolve on a future fetch request. > However, there are a few complications with this approach and this is why we > opted to simply close the session. One is that many objects in the fetch > session are keyed using topic name+partition. We also still need the topic > name for a few other checks in the fetch path. > Still, upon further inspection, closing the session on any new topics (when > using topic names, we often see a few unknown topic or partition exceptions) > is not ideal. > One way to see similar behavior in the topic ID case is to store unresolved > IDs in the session. 
For compatibility reasons, we will need to add and/or > generify a few classes. The general idea is that for sessions using version > 13+ we will use a structure that keys using topic ID and partition with an > optional name. That way, we can send partition level errors for unknown topic > IDs and handle them later in the same session. We can also remove partitions > using topic ID more easily if the unknown topic ID was due to stale metadata. > Then the new method will be as follows: > * When the receiving broker sees an unknown topic ID in a request or > encounters an inconsistent (mismatched) ID in the logs, it sends an error on > the unknown partition, delays *only those* partitions (in fetcher thread), and > keeps the session open to try to resolve in the future
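The keying idea discussed in this comment can be sketched as a small session key that identifies a partition by (topic ID, partition) and attaches the topic name only once the broker can resolve the ID; unresolved entries stay in the session and report partition-level errors instead of closing it. All names below are hypothetical, not the classes Kafka ultimately adopted:

```java
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;

public class SessionPartitionKey {
    private final UUID topicId;
    private final int partition;
    // Empty until the broker learns the ID-to-name mapping.
    private Optional<String> topicName;

    public SessionPartitionKey(UUID topicId, int partition) {
        this.topicId = topicId;
        this.partition = partition;
        this.topicName = Optional.empty();
    }

    public boolean isResolved() {
        return topicName.isPresent();
    }

    public void resolve(String name) {
        this.topicName = Optional.of(name);
    }

    // Equality is by ID and partition only, so a late name resolution does
    // not change where the entry hashes in the session's maps.
    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SessionPartitionKey)) return false;
        SessionPartitionKey other = (SessionPartitionKey) o;
        return topicId.equals(other.topicId) && partition == other.partition;
    }

    @Override
    public int hashCode() {
        return Objects.hash(topicId, partition);
    }
}
```

Keeping the name out of equality is the property that lets the same session entry survive from "unknown topic ID" to "resolved" without being rekeyed.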
[jira] [Commented] (KAFKA-12644) Add Missing Class-Level Javadoc to Descendants of org.apache.kafka.common.errors.ApiException
[ https://issues.apache.org/jira/browse/KAFKA-12644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17383650#comment-17383650 ] Jason Gustafson commented on KAFKA-12644: - Downgrading priority since this is not a blocker. We can nevertheless aim for 3.0. > Add Missing Class-Level Javadoc to Descendants of > org.apache.kafka.common.errors.ApiException > - > > Key: KAFKA-12644 > URL: https://issues.apache.org/jira/browse/KAFKA-12644 > Project: Kafka > Issue Type: Improvement > Components: clients, documentation >Affects Versions: 3.0.0, 2.8.1 >Reporter: Israel Ekpo >Assignee: Israel Ekpo >Priority: Blocker > Labels: documentation > Fix For: 3.0.0, 2.8.1 > > > I noticed that class-level Javadocs are missing from some classes in the > org.apache.kafka.common.errors package. This issue is for tracking the work > of adding the missing class-level javadocs for those Exception classes. > https://kafka.apache.org/27/javadoc/org/apache/kafka/common/errors/package-summary.html > https://github.com/apache/kafka/tree/trunk/clients/src/main/java/org/apache/kafka/common/errors > Basic class-level documentation could be derived by mapping the error > conditions documented in the protocol > https://kafka.apache.org/protocol#protocol_constants
[jira] [Updated] (KAFKA-12644) Add Missing Class-Level Javadoc to Descendants of org.apache.kafka.common.errors.ApiException
[ https://issues.apache.org/jira/browse/KAFKA-12644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-12644: Priority: Major (was: Blocker) > Add Missing Class-Level Javadoc to Descendants of > org.apache.kafka.common.errors.ApiException > - > > Key: KAFKA-12644 > URL: https://issues.apache.org/jira/browse/KAFKA-12644 > Project: Kafka > Issue Type: Improvement > Components: clients, documentation >Affects Versions: 3.0.0, 2.8.1 >Reporter: Israel Ekpo >Assignee: Israel Ekpo >Priority: Major > Labels: documentation > Fix For: 3.0.0, 2.8.1 > > > I noticed that class-level Javadocs are missing from some classes in the > org.apache.kafka.common.errors package. This issue is for tracking the work > of adding the missing class-level javadocs for those Exception classes. > https://kafka.apache.org/27/javadoc/org/apache/kafka/common/errors/package-summary.html > https://github.com/apache/kafka/tree/trunk/clients/src/main/java/org/apache/kafka/common/errors > Basic class-level documentation could be derived by mapping the error > conditions documented in the protocol > https://kafka.apache.org/protocol#protocol_constants
[jira] [Created] (KAFKA-13099) Message too large error when expiring transactionalIds
Jason Gustafson created KAFKA-13099: --- Summary: Message too large error when expiring transactionalIds Key: KAFKA-13099 URL: https://issues.apache.org/jira/browse/KAFKA-13099 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson Assignee: Jason Gustafson We have seen a couple of reports of MESSAGE_TOO_LARGE errors when writing tombstones for expired transactionalIds. This is possible because we collect all expired IDs into a single batch. We should ensure that the created batches are smaller than the max message size. Any expired IDs that cannot fit can be expired later.
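The proposed fix amounts to chunking the tombstones so that each produced batch stays under the max message size, leaving any overflow for a later expiration pass. A simplified sketch under assumed names and a crude per-record size estimate (the real coordinator code and record overheads differ):

```java
import java.util.ArrayList;
import java.util.List;

public class TombstoneBatcher {
    /**
     * Split expired transactionalId tombstones into batches whose estimated
     * serialized size stays under maxBatchBytes. IDs that do not fit in the
     * current batch start a new one; anything left over can simply be
     * expired on a later pass.
     */
    public static List<List<String>> batchExpiredIds(List<String> expiredIds,
                                                     int maxBatchBytes) {
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        int currentBytes = 0;
        for (String id : expiredIds) {
            int estimate = estimateTombstoneSize(id);
            if (!current.isEmpty() && currentBytes + estimate > maxBatchBytes) {
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(id);
            currentBytes += estimate;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    // Crude estimate: key bytes plus an assumed fixed per-record overhead.
    private static int estimateTombstoneSize(String transactionalId) {
        return transactionalId.length() + 50;
    }
}
```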