[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704
jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r825494849

## File path: core/src/main/scala/kafka/controller/KafkaController.scala ##

@@ -2263,37 +2271,43 @@ class KafkaController(val config: KafkaConfig,
         .groupBy { case (tp, _) => tp.topic } // Group by topic
         .foreach { case (topic, partitions) =>
           // Add each topic part to the response
-          val topicResp = new AlterIsrResponseData.TopicData()
+          val topicResp = new AlterPartitionResponseData.TopicData()
             .setName(topic)
             .setPartitions(new util.ArrayList())
           resp.topics.add(topicResp)
           partitions.foreach { case (tp, errorOrIsr) =>
             // Add each partition part to the response (new ISR or error)
             errorOrIsr match {
               case Left(error) => topicResp.partitions.add(
-                new AlterIsrResponseData.PartitionData()
+                new AlterPartitionResponseData.PartitionData()
                   .setPartitionIndex(tp.partition)
                   .setErrorCode(error.code))
               case Right(leaderAndIsr) => topicResp.partitions.add(
-                new AlterIsrResponseData.PartitionData()
+                new AlterPartitionResponseData.PartitionData()
                   .setPartitionIndex(tp.partition)
                   .setLeaderId(leaderAndIsr.leader)
                   .setLeaderEpoch(leaderAndIsr.leaderEpoch)
                   .setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava)
-                  .setCurrentIsrVersion(leaderAndIsr.zkVersion))
+                  .setLeaderRecoveryState(leaderAndIsr.leaderRecoveryState.value)

Review comment:
Based on this discussion https://github.com/apache/kafka/pull/11733#issuecomment-1051066928, I decided to mark this field as ignorable in the response.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704
jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r825493909

## File path: core/src/main/scala/kafka/cluster/Partition.scala ##

@@ -147,56 +147,65 @@ sealed trait IsrState {
   def maximalIsr: Set[Int]
   /**
-   * Indicates if we have an AlterIsr request inflight.
+   * The leader recovery state. See the description for LeaderRecoveryState for details on the different values.
+   */
+  def leaderRecoveryState: LeaderRecoveryState
+
+  /**
+   * Indicates if we have an AlterPartition request inflight.
    */
   def isInflight: Boolean
 }
-sealed trait PendingIsrChange extends IsrState {
+sealed trait PendingPartitionChange extends PartitionState {
   def sentLeaderAndIsr: LeaderAndIsr
+
+  override val leaderRecoveryState: LeaderRecoveryState = LeaderRecoveryState.RECOVERED

Review comment:
I added `leaderRecoveryState` to the `toString` implementation. Regarding your other comment, I'll address it in the next PR when I implement https://issues.apache.org/jira/browse/KAFKA-13696
[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704
jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r814964024

## File path: core/src/main/scala/kafka/controller/KafkaController.scala ##

@@ -2781,8 +2812,9 @@ case object IsrChangeNotification extends ControllerEvent {
   override def preempt(): Unit = {}
 }
-case class AlterIsrReceived(brokerId: Int, brokerEpoch: Long, isrsToAlter: Map[TopicPartition, LeaderAndIsr],
-                            callback: AlterIsrCallback) extends ControllerEvent {
+case class AlterPartitionReceived(
+  brokerId: Int, brokerEpoch: Long, partitionssToAlter: Map[TopicPartition, LeaderAndIsr], callback: AlterPartitionCallback

Review comment:
Fixed.
[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704
jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r814963257

## File path: core/src/main/scala/kafka/controller/Election.scala ##

@@ -53,17 +60,17 @@ object Election {
    * Elect leaders for new or offline partitions.
    *
    * @param controllerContext Context with the current state of the cluster
-   * @param partitionsWithUncleanLeaderElectionState A sequence of tuples representing the partitions
+   * @param partitionsWithUncleanLeaderLeaderRecoveryState A sequence of tuples representing the partitions

Review comment:
Yes. Fixed. I think I did a search and replace at some point.
[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704
jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r814961954

## File path: core/src/main/scala/kafka/controller/Election.scala ##

@@ -40,7 +40,14 @@ object Election {
     val newLeaderAndIsrOpt = leaderOpt.map { leader =>
       val newIsr = if (isr.contains(leader)) isr.filter(replica => controllerContext.isReplicaOnline(replica, partition))
       else List(leader)
-      leaderAndIsr.newLeaderAndIsr(leader, newIsr)
+
+      if (!isr.contains(leader)) {
+        // The new leader is not in the old ISR so mark the partition a RECOVERING
+        leaderAndIsr.newRecoveringLeaderAndIsr(leader, newIsr)
+      } else {
+        // Elect a new leader but keep the previous leader recovery state

Review comment:
Yes. The case that I had in mind is:
1. Leader is elected using unclean leader election. E.g. leader: 1, recoveryState: RECOVERING
2. Leader never sends AlterPartition and goes offline. E.g. leader: -1, recoveryState: RECOVERING
3. Only ISR member (id 1) comes back online. E.g. leader: 1, recoveryState: RECOVERING
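The election rule in the hunk above, together with the three-step case from the comment, can be sketched as follows. This is only an illustration in plain Java, not Kafka's code: `ElectionSketch`, `PartitionState`, `elect` and `leaderOffline` are hypothetical names, and `NO_LEADER = -1` simply mirrors the `leader: -1` used in the example.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical model of the election rule discussed above; not Kafka's classes.
final class ElectionSketch {
    enum RecoveryState { RECOVERED, RECOVERING }

    static final int NO_LEADER = -1;

    // Snapshot of the leader, the ISR and the leader recovery state.
    static final class PartitionState {
        final int leader;
        final List<Integer> isr;
        final RecoveryState recoveryState;

        PartitionState(int leader, List<Integer> isr, RecoveryState recoveryState) {
            this.leader = leader;
            this.isr = isr;
            this.recoveryState = recoveryState;
        }
    }

    // Elect `leader`. A leader from outside the old ISR (unclean election) marks
    // the partition RECOVERING; a leader from the ISR keeps the previous state.
    static PartitionState elect(PartitionState current, int leader, List<Integer> onlineReplicas) {
        if (!current.isr.contains(leader)) {
            return new PartitionState(leader, List.of(leader), RecoveryState.RECOVERING);
        }
        List<Integer> newIsr = current.isr.stream()
            .filter(onlineReplicas::contains)
            .collect(Collectors.toList());
        return new PartitionState(leader, newIsr, current.recoveryState);
    }

    // The leader goes offline: the ISR and the recovery state are left untouched.
    static PartitionState leaderOffline(PartitionState current) {
        return new PartitionState(NO_LEADER, current.isr, current.recoveryState);
    }
}
```

Replaying the three steps with this model: electing broker 1 from outside the ISR yields RECOVERING, `leaderOffline` keeps that state with no leader, and electing broker 1 again (now the only ISR member) still yields RECOVERING because the ISR branch preserves the previous state.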
[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704
jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r814424904

## File path: core/src/main/scala/kafka/cluster/Partition.scala ##

@@ -1080,6 +1092,9 @@ class Partition(val topicPartition: TopicPartition,
     // decide whether to only fetch from leader
     val localLog = localLogWithEpochOrException(currentLeaderEpoch, fetchOnlyFromLeader)
+    // Check that the partition leader is recovering from an unclean leader election.
+    validateLeaderRecoveryState()

Review comment:
I didn't implement this now because only the leader can accept PRODUCE requests and the leader recovers immediately. In this implementation only the follower needs to wait for the leader to recover so it can accept FETCH requests.
[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704
jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r814423091

## File path: core/src/main/scala/kafka/cluster/Partition.scala ##

@@ -542,7 +551,8 @@ class Partition(val topicPartition: TopicPartition,
       assignment = partitionState.replicas.asScala.map(_.toInt),
       isr = isr,
       addingReplicas = addingReplicas,
-      removingReplicas = removingReplicas
+      removingReplicas = removingReplicas,
+      LeaderRecoveryState.RECOVERED

Review comment:
I'll log a message.
[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704
jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r814422959

## File path: core/src/main/scala/kafka/cluster/Partition.scala ##

@@ -542,7 +551,8 @@ class Partition(val topicPartition: TopicPartition,
       assignment = partitionState.replicas.asScala.map(_.toInt),
       isr = isr,
       addingReplicas = addingReplicas,
-      removingReplicas = removingReplicas
+      removingReplicas = removingReplicas,
+      LeaderRecoveryState.RECOVERED

Review comment:
Yes and no. Unclean leader election is not possible when there is only one replica, since the ISR set always equals the replica set. That means that if the replica is online, it will be elected as part of the regular election, and if it is offline, the unclean leader election is a no-op.
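The single-replica argument above can be checked with a small model. The sketch below assumes a deliberately simplified selection rule (take the first assigned replica that qualifies); `SingleReplicaSketch`, `regularCandidate` and `uncleanCandidate` are hypothetical helpers and do not correspond to Kafka's election strategies.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical, simplified leader selection; not Kafka's election code.
final class SingleReplicaSketch {
    // Regular election: the first assigned replica that is online and in the ISR.
    static Optional<Integer> regularCandidate(List<Integer> replicas, Set<Integer> isr, Set<Integer> online) {
        return replicas.stream()
            .filter(r -> online.contains(r) && isr.contains(r))
            .findFirst();
    }

    // Unclean election: the first assigned replica that is online but outside the ISR.
    static Optional<Integer> uncleanCandidate(List<Integer> replicas, Set<Integer> isr, Set<Integer> online) {
        return replicas.stream()
            .filter(r -> online.contains(r) && !isr.contains(r))
            .findFirst();
    }
}
```

With `replicas = [1]` and `isr = {1}`, `uncleanCandidate` is empty whether or not broker 1 is online, because no assigned replica lies outside the ISR. When broker 1 is online, `regularCandidate` returns it; when it is offline, neither method finds a leader.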
[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704
jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r814407849

## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ##

@@ -760,44 +719,132 @@ BrokersToIsrs brokersToIsrs() {
     }
     if (change.leader() != request.brokerId() && change.leader() != NO_LEADER_CHANGE) {
-        // Normally, an alterIsr request, which is made by the partition
+        // Normally, an alterPartition request, which is made by the partition
         // leader itself, is not allowed to modify the partition leader.
         // However, if there is an ongoing partition reassignment and the
         // ISR change completes it, then the leader may change as part of
         // the changes made during reassignment cleanup.
         //
         // In this case, we report back FENCED_LEADER_EPOCH to the leader
-        // which made the alterIsr request. This lets it know that it must
+        // which made the alterPartition request. This lets it know that it must
         // fetch new metadata before trying again. This return code is
         // unusual because we both return an error and generate a new
         // metadata record. We usually only do one or the other.
-        log.info("AlterIsr request from node {} for {}-{} completed " +
+        log.info("AlterPartition request from node {} for {}-{} completed " +
             "the ongoing partition reassignment and triggered a " +
-            "leadership change. Reutrning FENCED_LEADER_EPOCH.",
+            "leadership change. Returning FENCED_LEADER_EPOCH.",
             request.brokerId(), topic.name, partitionId);
-        responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
+        responseTopicData.partitions().add(new AlterPartitionResponseData.PartitionData().
             setPartitionIndex(partitionId).
             setErrorCode(FENCED_LEADER_EPOCH.code()));
         continue;
     } else if (change.removingReplicas() != null || change.addingReplicas() != null) {
-        log.info("AlterIsr request from node {} for {}-{} completed " +
+        log.info("AlterPartition request from node {} for {}-{} completed " +
             "the ongoing partition reassignment.", request.brokerId(), topic.name, partitionId);
     }
 }
-responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
+
+responseTopicData.partitions().add(new AlterPartitionResponseData.PartitionData().
     setPartitionIndex(partitionId).
     setErrorCode(result.code()).
     setLeaderId(partition.leader).
+    setIsr(Replicas.toList(partition.isr)).
+    setLeaderRecoveryState(partition.leaderRecoveryState.value()).

Review comment:
Yes. You are correct. The `LeaderRecoveryState` field for `AlterPartitionResponse` is not a tagged field. The argument for why this is safe is the same as for `KafkaController`. In the success case the `LeaderRecoveryState` in the response is the same as in the request. For version 0, this is the default, `RECOVERED`. If the AlterPartition request fails, then both controller implementations set the `ErrorCode` field in the response and leave the rest of the fields at their default values.
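The safety argument in the comment above can be made concrete with a small model of one partition entry in the response. Everything here is an assumption made for illustration: `AlterPartitionResponseSketch`, `PartitionData`, `success` and `failure` are hypothetical, and the byte values 0 and 1 for `RECOVERED` and `RECOVERING` are assumed rather than taken from this thread.

```java
// Hypothetical model of one partition entry in an AlterPartition response;
// not the generated AlterPartitionResponseData class.
final class AlterPartitionResponseSketch {
    static final byte RECOVERED = 0;   // assumed wire value, also the field default
    static final byte RECOVERING = 1;  // assumed wire value
    static final short NO_ERROR = 0;

    static final class PartitionData {
        short errorCode = NO_ERROR;
        byte leaderRecoveryState = RECOVERED; // default when the field is never set
    }

    // Success: the response carries the same recovery state as the request.
    static PartitionData success(byte requestedRecoveryState) {
        PartitionData data = new PartitionData();
        data.leaderRecoveryState = requestedRecoveryState;
        return data;
    }

    // Failure: only the error code is set; every other field keeps its default.
    static PartitionData failure(short errorCode) {
        PartitionData data = new PartitionData();
        data.errorCode = errorCode;
        return data;
    }
}
```

Under these assumptions, a leader that only speaks version 0 always requests `RECOVERED`, so `success` returns `RECOVERED` and `failure` leaves the field at its `RECOVERED` default. In neither case does the response need to carry a non-default value to a version 0 leader.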
[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704
jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r814089014

## File path: clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java ##

@@ -110,6 +110,14 @@
     LIST_TRANSACTIONS(ApiMessageType.LIST_TRANSACTIONS),
     ALLOCATE_PRODUCER_IDS(ApiMessageType.ALLOCATE_PRODUCER_IDS, true, true);
+    /**
+     * ALTER_ISR was the old name for ALTER_PARTITION.
+     *
+     * @deprecated since 3.2.0. Use {@link #ALTER_PARTITION} instead
+     */
+    @Deprecated

Review comment:
Done.
[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704
jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r813999251

## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ##

@@ -760,44 +719,132 @@ BrokersToIsrs brokersToIsrs() {
     }
     if (change.leader() != request.brokerId() && change.leader() != NO_LEADER_CHANGE) {
-        // Normally, an alterIsr request, which is made by the partition
+        // Normally, an alterPartition request, which is made by the partition
         // leader itself, is not allowed to modify the partition leader.
         // However, if there is an ongoing partition reassignment and the
         // ISR change completes it, then the leader may change as part of
         // the changes made during reassignment cleanup.
         //
         // In this case, we report back FENCED_LEADER_EPOCH to the leader
-        // which made the alterIsr request. This lets it know that it must
+        // which made the alterPartition request. This lets it know that it must
         // fetch new metadata before trying again. This return code is
         // unusual because we both return an error and generate a new
         // metadata record. We usually only do one or the other.
-        log.info("AlterIsr request from node {} for {}-{} completed " +
+        log.info("AlterPartition request from node {} for {}-{} completed " +
             "the ongoing partition reassignment and triggered a " +
-            "leadership change. Reutrning FENCED_LEADER_EPOCH.",
+            "leadership change. Returning FENCED_LEADER_EPOCH.",
             request.brokerId(), topic.name, partitionId);
-        responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
+        responseTopicData.partitions().add(new AlterPartitionResponseData.PartitionData().
             setPartitionIndex(partitionId).
             setErrorCode(FENCED_LEADER_EPOCH.code()));
         continue;
     } else if (change.removingReplicas() != null || change.addingReplicas() != null) {
-        log.info("AlterIsr request from node {} for {}-{} completed " +
+        log.info("AlterPartition request from node {} for {}-{} completed " +
             "the ongoing partition reassignment.", request.brokerId(), topic.name, partitionId);
     }
 }
-responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
+
+responseTopicData.partitions().add(new AlterPartitionResponseData.PartitionData().
     setPartitionIndex(partitionId).
     setErrorCode(result.code()).
     setLeaderId(partition.leader).
+    setIsr(Replicas.toList(partition.isr)).
+    setLeaderRecoveryState(partition.leaderRecoveryState.value()).
     setLeaderEpoch(partition.leaderEpoch).
-    setCurrentIsrVersion(partition.partitionEpoch).
-    setIsr(Replicas.toList(partition.isr)));
+    setPartitionEpoch(partition.partitionEpoch));
 }
 }
+
 return ControllerResult.of(records, response);
 }
+/**
+ * Validate the partition information included in the alter partition request.
+ *
+ * @param brokerId id of the broker requesting the alter partition
+ * @param topic current topic information store by the replication manager
+ * @param partitionId partition id being altered
+ * @param partition current partition registration for the partition being altered
+ * @param partitionData partition data from the alter partition request
+ *
+ * @return Errors.NONE for valid alter partition data; otherwise the validation error
+ */
+private Errors validateAlterPartitionData(
+    int brokerId,
+    TopicControlInfo topic,
+    int partitionId,
+    PartitionRegistration partition,
+    AlterPartitionRequestData.PartitionData partitionData
+) {
+    if (partition == null) {
+        log.info("Rejecting alterPartition request for unknown partition {}-{}.",

Review comment:
Fixed throughout the file.
[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704
jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r813998423

## File path: core/src/test/scala/unit/kafka/cluster/PartitionTest.scala ##

@@ -2015,6 +2067,8 @@ class PartitionTest extends AbstractPartitionTest {
     verify(spyConfigRepository, times(2)).topicConfig(topicPartition.topic())
   }
+  // TODO: Add a test that shows that the follower rejects reads until a recovered leader and isr

Review comment:
Done. Added this test.
[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704
jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r813998749

## File path: metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java ##

@@ -104,6 +107,14 @@ public PartitionChangeBuilder setTargetAdding(List targetAdding) {
     return this;
 }
+public PartitionChangeBuilder setTargetLeaderRecoveryState(LeaderRecoveryState targetLeaderRecoveryState) {
+    this.targetLeaderRecoveryState = targetLeaderRecoveryState;
+    return this;
+}
+
+// TODO: We need to make sure that the LeaderRecoveryState is not lost when the partition transitions from

Review comment:
Added two tests:
1. Show that going from online to offline to online preserves the leader recovery state
2. Show that performing an unclean leader election sets the leader recovery state to RECOVERING
[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704
jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r813998423

## File path: core/src/test/scala/unit/kafka/cluster/PartitionTest.scala ##

@@ -2015,6 +2067,8 @@ class PartitionTest extends AbstractPartitionTest {
     verify(spyConfigRepository, times(2)).topicConfig(topicPartition.topic())
   }
+  // TODO: Add a test that shows that the follower rejects reads until a recovered leader and isr

Review comment:
Added two tests:
1. Show that going from online to offline to online preserves the leader recovery state
2. Show that performing an unclean leader election sets the leader recovery state to RECOVERING
[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704
jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r813997211

## File path: core/src/test/scala/unit/kafka/cluster/PartitionTest.scala ##

@@ -1077,6 +1078,58 @@ class PartitionTest extends AbstractPartitionTest {
   }
+  @Test
+  def testInvalidAlterPartitionAreNotRetried(): Unit = {
+    val log = logManager.getOrCreateLog(topicPartition, topicId = None)
+    seedLogData(log, numRecords = 10, leaderEpoch = 4)
+
+    val controllerEpoch = 0
+    val leaderEpoch = 5
+    val remoteBrokerId = brokerId + 1
+    val replicas = List[Integer](brokerId, remoteBrokerId).asJava
+    val isr = List[Integer](brokerId).asJava
+
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
+    assertTrue(partition.makeLeader(
+      new LeaderAndIsrPartitionState()
+        .setControllerEpoch(controllerEpoch)
+        .setLeader(brokerId)
+        .setLeaderEpoch(leaderEpoch)
+        .setIsr(isr)
+        .setZkVersion(1)
+        .setReplicas(replicas)
+        .setIsNew(true),
+      offsetCheckpoints, None), "Expected become leader transition to succeed")
+    assertEquals(Set(brokerId), partition.partitionState.isr)
+
+    val remoteReplica = partition.getReplica(remoteBrokerId).get
+    assertEquals(LogOffsetMetadata.UnknownOffsetMetadata.messageOffset, remoteReplica.logEndOffset)
+    assertEquals(UnifiedLog.UnknownOffset, remoteReplica.logStartOffset)
+
+    partition.updateFollowerFetchState(remoteBrokerId,
+      followerFetchOffsetMetadata = LogOffsetMetadata(10),
+      followerStartOffset = 0L,
+      followerFetchTimeMs = time.milliseconds(),
+      leaderEndOffset = 10L)
+
+    // Check that the isr didn't change and alter update is scheduled

Review comment:
Fixed.
[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704
jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r813996872

## File path: core/src/test/scala/kafka/zk/TopicPartitionStateZNodeTest.scala ##

@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.zk
+
+import TopicPartitionStateZNode.decode
+import TopicPartitionStateZNode.encode
+import kafka.api.LeaderAndIsr
+import kafka.controller.LeaderIsrAndControllerEpoch
+import org.apache.kafka.metadata.LeaderRecoveryState
+import org.apache.zookeeper.data.Stat
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.mockito.Mockito.mock
+import org.mockito.Mockito.when
+
+final class TopicPartitionStateZNodeTest {
+
+  @Test
+  def testEncodeDecodeRecovering(): Unit = {
+    val zkVersion = 5
+    val stat = mock(classOf[Stat])
+    when(stat.getVersion).thenReturn(zkVersion)
+
+    val expected = LeaderIsrAndControllerEpoch(LeaderAndIsr(1, 6, List(1), LeaderRecoveryState.RECOVERING, zkVersion), 10)
+
+    assertEquals(Some(expected), decode(encode(expected), stat))
+  }
+
+  @Test
+  def testEncodeDecodeRecovered(): Unit = {

Review comment:
We were covering it implicitly. Added another test case that shows the behavior explicitly.
[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704
jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r813995003

## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ##

@@ -760,44 +719,132 @@ BrokersToIsrs brokersToIsrs() {
     }
     if (change.leader() != request.brokerId() && change.leader() != NO_LEADER_CHANGE) {
-        // Normally, an alterIsr request, which is made by the partition
+        // Normally, an alterPartition request, which is made by the partition
         // leader itself, is not allowed to modify the partition leader.
         // However, if there is an ongoing partition reassignment and the
         // ISR change completes it, then the leader may change as part of
         // the changes made during reassignment cleanup.
         //
         // In this case, we report back FENCED_LEADER_EPOCH to the leader
-        // which made the alterIsr request. This lets it know that it must
+        // which made the alterPartition request. This lets it know that it must
         // fetch new metadata before trying again. This return code is
         // unusual because we both return an error and generate a new
         // metadata record. We usually only do one or the other.
-        log.info("AlterIsr request from node {} for {}-{} completed " +
+        log.info("AlterPartition request from node {} for {}-{} completed " +
             "the ongoing partition reassignment and triggered a " +
-            "leadership change. Reutrning FENCED_LEADER_EPOCH.",
+            "leadership change. Returning FENCED_LEADER_EPOCH.",
             request.brokerId(), topic.name, partitionId);
-        responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
+        responseTopicData.partitions().add(new AlterPartitionResponseData.PartitionData().
             setPartitionIndex(partitionId).
             setErrorCode(FENCED_LEADER_EPOCH.code()));
         continue;
     } else if (change.removingReplicas() != null || change.addingReplicas() != null) {
-        log.info("AlterIsr request from node {} for {}-{} completed " +
+        log.info("AlterPartition request from node {} for {}-{} completed " +
             "the ongoing partition reassignment.", request.brokerId(), topic.name, partitionId);
     }
 }
-responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
+
+responseTopicData.partitions().add(new AlterPartitionResponseData.PartitionData().
     setPartitionIndex(partitionId).
     setErrorCode(result.code()).
     setLeaderId(partition.leader).
+    setIsr(Replicas.toList(partition.isr)).
+    setLeaderRecoveryState(partition.leaderRecoveryState.value()).

Review comment:
It is a tagged field so we can't. I think this is okay for the following reasons:
1. If the leader doesn't support this feature, it will send version 0 of the AlterPartition request, which will set the leader recovery state to RECOVERED.
2. The leader recovery state only changes value if:
   a. an AlterPartition request is sent that increases the ISR, or
   b. another unclean leader election is performed, which sets (keeps) the leader recovery state as RECOVERING.

Note that the broker-to-controller channel uses the ApiVersions response to determine which version of AlterPartition to send.
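The version argument in the comment above can be sketched as a two-step rule: the broker picks a request version that both sides support, and a version 0 request carries no recovery state, so the controller treats it as RECOVERED. The names below (`RequestVersionSketch`, `usableVersion`, `stateApplied`) are hypothetical, and the sketch assumes the recovery state field first appears in version 1 of the request; it is not the broker-to-controller channel's actual negotiation code.

```java
// Hypothetical model of request-version selection; not Kafka's client code.
final class RequestVersionSketch {
    enum RecoveryState { RECOVERED, RECOVERING }

    // Highest AlterPartition version that both the broker and the controller support.
    static short usableVersion(short brokerMax, short controllerMax) {
        return (short) Math.min(brokerMax, controllerMax);
    }

    // Recovery state the controller applies for a request. Version 0 has no such
    // field (assumption), so the default RECOVERED is used regardless of the input.
    static RecoveryState stateApplied(short version, RecoveryState sentByLeader) {
        return version == 0 ? RecoveryState.RECOVERED : sentByLeader;
    }
}
```

For a leader that only knows version 0, `usableVersion` returns 0 and `stateApplied` yields RECOVERED, which matches point 1 of the comment. A leader and controller that both support version 1 pass the leader's state through unchanged.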
[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704
jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r813988441

## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ##

@@ -760,44 +719,132 @@ BrokersToIsrs brokersToIsrs() {
     }
     if (change.leader() != request.brokerId() && change.leader() != NO_LEADER_CHANGE) {
-        // Normally, an alterIsr request, which is made by the partition
+        // Normally, an alterPartition request, which is made by the partition
         // leader itself, is not allowed to modify the partition leader.
         // However, if there is an ongoing partition reassignment and the
         // ISR change completes it, then the leader may change as part of
         // the changes made during reassignment cleanup.
         //
         // In this case, we report back FENCED_LEADER_EPOCH to the leader
-        // which made the alterIsr request. This lets it know that it must
+        // which made the alterPartition request. This lets it know that it must
         // fetch new metadata before trying again. This return code is
         // unusual because we both return an error and generate a new
         // metadata record. We usually only do one or the other.
-        log.info("AlterIsr request from node {} for {}-{} completed " +
+        log.info("AlterPartition request from node {} for {}-{} completed " +
             "the ongoing partition reassignment and triggered a " +
-            "leadership change. Reutrning FENCED_LEADER_EPOCH.",
+            "leadership change. Returning FENCED_LEADER_EPOCH.",
             request.brokerId(), topic.name, partitionId);
-        responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
+        responseTopicData.partitions().add(new AlterPartitionResponseData.PartitionData().
             setPartitionIndex(partitionId).
             setErrorCode(FENCED_LEADER_EPOCH.code()));
         continue;
     } else if (change.removingReplicas() != null || change.addingReplicas() != null) {
-        log.info("AlterIsr request from node {} for {}-{} completed " +
+        log.info("AlterPartition request from node {} for {}-{} completed " +
             "the ongoing partition reassignment.", request.brokerId(), topic.name, partitionId);
     }
 }
-responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
+
+responseTopicData.partitions().add(new AlterPartitionResponseData.PartitionData().
     setPartitionIndex(partitionId).
     setErrorCode(result.code()).
     setLeaderId(partition.leader).
+    setIsr(Replicas.toList(partition.isr)).
+    setLeaderRecoveryState(partition.leaderRecoveryState.value()).
     setLeaderEpoch(partition.leaderEpoch).
-    setCurrentIsrVersion(partition.partitionEpoch).
-    setIsr(Replicas.toList(partition.isr)));
+    setPartitionEpoch(partition.partitionEpoch));
 }
 }
+
 return ControllerResult.of(records, response);
 }
+/**
+ * Validate the partition information included in the alter partition request.
+ *
+ * @param brokerId id of the broker requesting the alter partition
+ * @param topic current topic information store by the replication manager
+ * @param partitionId partition id being altered
+ * @param partition current partition registration for the partition being altered
+ * @param partitionData partition data from the alter partition request
+ *
+ * @return Errors.NONE for valid alter partition data; otherwise the validation error
+ */
+private Errors validateAlterPartitionData(
+    int brokerId,
+    TopicControlInfo topic,
+    int partitionId,
+    PartitionRegistration partition,
+    AlterPartitionRequestData.PartitionData partitionData
+) {
+    if (partition == null) {
+        log.info("Rejecting alterPartition request for unknown partition {}-{}.",
+            topic.name, partitionId);
+
+        return UNKNOWN_TOPIC_OR_PARTITION;
+    }
+    if (partitionData.leaderEpoch() != partition.leaderEpoch) {

Review comment:
Yeah. Maybe a INVALID_REQUEST is
[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704
jsancio commented on a change in pull request #11733: URL: https://github.com/apache/kafka/pull/11733#discussion_r812253132 ## File path: core/src/test/scala/kafka/zk/TopicPartitionStateZNodeTest.scala ## @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package kafka.zk + +import TopicPartitionStateZNode.decode +import TopicPartitionStateZNode.encode +import kafka.api.LeaderAndIsr +import kafka.controller.LeaderIsrAndControllerEpoch +import org.apache.kafka.metadata.LeaderRecoveryState +import org.apache.zookeeper.data.Stat +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.mockito.Mockito.mock +import org.mockito.Mockito.when + +final class TopicPartitionStateZNodeTest { + + @Test + def testEncodeDecode(): Unit = { Review comment: Yes. Added another test.
[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704
jsancio commented on a change in pull request #11733: URL: https://github.com/apache/kafka/pull/11733#discussion_r812228372 ## File path: checkstyle/suppressions.xml ## @@ -276,6 +276,8 @@ files="(QuorumController|QuorumControllerTest|ReplicationControlManager|ReplicationControlManagerTest).java"/> +
[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704
jsancio commented on a change in pull request #11733: URL: https://github.com/apache/kafka/pull/11733#discussion_r812227886 ## File path: clients/src/main/resources/common/message/AlterPartitionRequest.json ## @@ -34,9 +34,11 @@ { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "about": "The leader epoch of this partition" }, { "name": "NewIsr", "type": "[]int32", "versions": "0+", "entityType": "brokerId", - "about": "The ISR for this partition"}, -{ "name": "CurrentIsrVersion", "type": "int32", "versions": "0+", - "about": "The expected version of ISR which is being updated"} + "about": "The ISR for this partition" }, +{ "name": "LeaderRecoveryState", "type": "int8", "versions": "1+", "default": "0", + "about": "1 if the partition is recovering from an unclean leader election; 0 otherwise." }, +{ "name": "PartitionEpoch", "type": "int32", "versions": "0+", + "about": "The expected epoch of the partition which is being updated" } Review comment: Fixed both the request and the response.
[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704
jsancio commented on a change in pull request #11733: URL: https://github.com/apache/kafka/pull/11733#discussion_r812227573 ## File path: clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ## @@ -1305,33 +1305,33 @@ private DescribeUserScramCredentialsResponse createDescribeUserScramCredentialsR return new DescribeUserScramCredentialsResponse(data); } -private AlterIsrRequest createAlterIsrRequest(short version) { -AlterIsrRequestData data = new AlterIsrRequestData() +private AlterPartitionRequest createAlterPartitionRequest(short version) { Review comment: Updated the test to take into account the version passed.
[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704
jsancio commented on a change in pull request #11733: URL: https://github.com/apache/kafka/pull/11733#discussion_r812227025 ## File path: metadata/src/main/java/org/apache/kafka/metadata/LeaderRecoveryState.java ## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +public enum LeaderRecoveryState { +/** + * Represent that the election for the partition was either an ISR election or the + * leader recovered from an unclean leader election. + */ +RECOVERED((byte) 0), + +/** + * Represent that the election for the partition was an unclean leader election and + * that the leader is recovering from it. + */ +RECOVERING((byte) 1); + +/** + * A special value used to represent that the LeaderRecoveryState field of a + * PartitionChangeRecord didn't change. + */ +private static final byte NO_CHANGE = (byte) -1; + +private static final Map VALUE_TO_ENUM; Review comment: Removed the map.
[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704
jsancio commented on a change in pull request #11733: URL: https://github.com/apache/kafka/pull/11733#discussion_r812101522 ## File path: core/src/main/scala/kafka/server/AlterIsrManager.scala ## @@ -250,23 +260,35 @@ class DefaultAlterIsrManager( val partitionResponses: mutable.Map[TopicPartition, Either[Errors, LeaderAndIsr]] = new mutable.HashMap[TopicPartition, Either[Errors, LeaderAndIsr]]() data.topics.forEach { topic => - topic.partitions().forEach(partition => { + topic.partitions().forEach { partition => val tp = new TopicPartition(topic.name, partition.partitionIndex) -val error = Errors.forCode(partition.errorCode()) +val apiError = Errors.forCode(partition.errorCode()) debug(s"Controller successfully handled AlterIsr request for $tp: $partition") -if (error == Errors.NONE) { - val newLeaderAndIsr = new LeaderAndIsr(partition.leaderId, partition.leaderEpoch, -partition.isr.asScala.toList.map(_.toInt), partition.currentIsrVersion) - partitionResponses(tp) = Right(newLeaderAndIsr) +if (apiError == Errors.NONE) { + try { +partitionResponses(tp) = Right( + LeaderAndIsr( +partition.leaderId, +partition.leaderEpoch, +partition.isr.asScala.toList.map(_.toInt), +LeaderRecoveryState.of(partition.leaderRecoveryState), +partition.partitionEpoch + ) +) + } catch { +case e: IllegalArgumentException => Review comment: Okay. We have both: `of` which throws and `optionalOf` which returns an `Optional`.
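The split between the two factory methods could look roughly like the sketch below. This is not the code from the PR: the enum name `RecoveryStateSketch` and the exception message are hypothetical, and only the method names `of` and `optionalOf`, the two states, and their byte values are taken from the discussion and the quoted diff.

```java
import java.util.Optional;

// Hypothetical stand-in for the enum under review; the real class lives in
// org.apache.kafka.metadata and may differ in detail.
enum RecoveryStateSketch {
    RECOVERED((byte) 0),
    RECOVERING((byte) 1);

    private final byte value;

    RecoveryStateSketch(byte value) {
        this.value = value;
    }

    byte value() {
        return value;
    }

    // Non-throwing variant: callers that expect untrusted input (for example
    // a value read from a response) can handle the empty case explicitly.
    static Optional<RecoveryStateSketch> optionalOf(byte value) {
        switch (value) {
            case 0:
                return Optional.of(RECOVERED);
            case 1:
                return Optional.of(RECOVERING);
            default:
                return Optional.empty();
        }
    }

    // Throwing variant: for code paths where an unknown value means a bug.
    static RecoveryStateSketch of(byte value) {
        return optionalOf(value).orElseThrow(
            () -> new IllegalArgumentException(
                String.format("Value %d is not a valid leader recovery state", value)));
    }
}
```

With this shape, a caller that must tolerate bad input can branch on `optionalOf(...).isPresent()` instead of catching `IllegalArgumentException`, while internal code keeps the fail-fast `of`.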
[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704
jsancio commented on a change in pull request #11733: URL: https://github.com/apache/kafka/pull/11733#discussion_r812044247 ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -2263,37 +2271,43 @@ class KafkaController(val config: KafkaConfig, .groupBy { case (tp, _) => tp.topic } // Group by topic .foreach { case (topic, partitions) => // Add each topic part to the response - val topicResp = new AlterIsrResponseData.TopicData() + val topicResp = new AlterPartitionResponseData.TopicData() .setName(topic) .setPartitions(new util.ArrayList()) resp.topics.add(topicResp) partitions.foreach { case (tp, errorOrIsr) => // Add each partition part to the response (new ISR or error) errorOrIsr match { case Left(error) => topicResp.partitions.add( -new AlterIsrResponseData.PartitionData() +new AlterPartitionResponseData.PartitionData() .setPartitionIndex(tp.partition) .setErrorCode(error.code)) case Right(leaderAndIsr) => topicResp.partitions.add( -new AlterIsrResponseData.PartitionData() +new AlterPartitionResponseData.PartitionData() .setPartitionIndex(tp.partition) .setLeaderId(leaderAndIsr.leader) .setLeaderEpoch(leaderAndIsr.leaderEpoch) .setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava) - .setCurrentIsrVersion(leaderAndIsr.zkVersion)) + .setLeaderRecoveryState(leaderAndIsr.leaderRecoveryState.value) Review comment: It may be a little subtle, but I don't think we need to. If the version is `0`, then this field is guaranteed to hold the default value `0`, so the serialization will succeed. This holds because we only write these values in the response when the operation succeeds. If the operation fails, we skip writing these values and just write the error code instead.
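The argument above can be illustrated with a small model of a version-gated field. This is only a sketch under stated assumptions: it assumes a serializer that refuses to write a non-default value for a field the requested version does not carry, unless the field is marked ignorable. The class, method, constants, and exception type are all hypothetical and do not reproduce Kafka's generated message code.

```java
// Hypothetical, simplified model of writing a field that only exists from
// version 1 onward (as LeaderRecoveryState does in the quoted schema).
final class VersionedFieldSketch {
    static final byte DEFAULT_RECOVERY_STATE = 0;
    static final short FIRST_VERSION_WITH_FIELD = 1;

    private VersionedFieldSketch() {
    }

    // Returns true when the field would be written and false when it is
    // silently omitted. Throws when a non-default value cannot be expressed
    // at the requested version and the field is not ignorable.
    static boolean writeRecoveryState(short version, byte value, boolean ignorable) {
        if (version >= FIRST_VERSION_WITH_FIELD) {
            return true; // the field is part of this version of the message
        }
        if (value != DEFAULT_RECOVERY_STATE && !ignorable) {
            // Stand-in exception type for the sketch.
            throw new IllegalStateException(
                "Cannot write non-default recovery state " + value + " at version " + version);
        }
        return false; // default (or ignorable) value: nothing is lost by omitting it
    }
}
```

In this model a version `0` response only fails if it carries a non-default value in a non-ignorable field, which is why a value that is always `0` at version `0` serializes without trouble.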
[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704
jsancio commented on a change in pull request #11733: URL: https://github.com/apache/kafka/pull/11733#discussion_r805920416 ## File path: core/src/main/scala/kafka/server/AlterIsrManager.scala ## @@ -250,23 +260,35 @@ class DefaultAlterIsrManager( val partitionResponses: mutable.Map[TopicPartition, Either[Errors, LeaderAndIsr]] = new mutable.HashMap[TopicPartition, Either[Errors, LeaderAndIsr]]() data.topics.forEach { topic => - topic.partitions().forEach(partition => { + topic.partitions().forEach { partition => val tp = new TopicPartition(topic.name, partition.partitionIndex) -val error = Errors.forCode(partition.errorCode()) +val apiError = Errors.forCode(partition.errorCode()) debug(s"Controller successfully handled AlterIsr request for $tp: $partition") -if (error == Errors.NONE) { - val newLeaderAndIsr = new LeaderAndIsr(partition.leaderId, partition.leaderEpoch, -partition.isr.asScala.toList.map(_.toInt), partition.currentIsrVersion) - partitionResponses(tp) = Right(newLeaderAndIsr) +if (apiError == Errors.NONE) { + try { +partitionResponses(tp) = Right( + LeaderAndIsr( +partition.leaderId, +partition.leaderEpoch, +partition.isr.asScala.toList.map(_.toInt), +LeaderRecoveryState.of(partition.leaderRecoveryState), +partition.partitionEpoch + ) +) + } catch { +case e: IllegalArgumentException => Review comment: Yeah. I went back and forth on this. There are some code paths where we want to throw an exception, since in most cases an invalid value means a bug in Kafka. I think this was the only case where we want to handle this exception. I'll take a look again.
[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704
jsancio commented on a change in pull request #11733: URL: https://github.com/apache/kafka/pull/11733#discussion_r805916080 ## File path: clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java ## @@ -110,6 +110,14 @@ LIST_TRANSACTIONS(ApiMessageType.LIST_TRANSACTIONS), ALLOCATE_PRODUCER_IDS(ApiMessageType.ALLOCATE_PRODUCER_IDS, true, true); +/** + * ALTER_ISR was the old name for ALTER_PARTITION. + * + * @deprecated since 3.2.0. Use {@link #ALTER_PARTITION} instead + */ +@Deprecated Review comment: Okay. I wasn't sure if the symbols in `common/protocol/ApiKeys.java` were public. I am okay with keeping it, as it is a lightweight way to keep backward compatibility in case external code is using this `enum`.
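One way such a backward-compatible alias could be expressed is a deprecated static field on the enum that points at the renamed constant. The sketch below assumes that approach; the enum `ApiKeySketch` is hypothetical and much smaller than `ApiKeys`, and the quoted diff does not show the declaration that follows `@Deprecated`.

```java
// Hypothetical enum standing in for ApiKeys; only the two constant names
// and the alias name are taken from the diff above.
enum ApiKeySketch {
    ALLOCATE_PRODUCER_IDS,
    ALTER_PARTITION;

    /**
     * ALTER_ISR was the old name for ALTER_PARTITION.
     *
     * @deprecated Use {@link #ALTER_PARTITION} instead.
     */
    @Deprecated
    static final ApiKeySketch ALTER_ISR = ALTER_PARTITION;
}
```

Existing source that references `ApiKeySketch.ALTER_ISR` keeps compiling (with a deprecation warning) and gets the same instance as `ALTER_PARTITION`. The alias is a plain field rather than an enum constant, so it does not appear in `values()` and `valueOf("ALTER_ISR")` would not find it.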