dajac commented on code in PR #12329:
URL: https://github.com/apache/kafka/pull/12329#discussion_r903337001


##########
core/src/main/scala/kafka/server/AlterPartitionManager.scala:
##########
@@ -163,7 +162,7 @@ class DefaultAlterPartitionManager(
     if (enqueued) {
       maybePropagateIsrChanges()
     } else {
-      future.completeExceptionally(new OperationNotAttemptedException(
+      future.completeExceptionally(new IllegalStateException(

Review Comment:
   We use `OperationNotAttemptedException` on purpose here. The issue with 
`IllegalStateException` is that you end up in a tight retry loop.
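   
   A minimal sketch of the shape being asked for here, assuming the surrounding `submit` bookkeeping in `DefaultAlterPartitionManager` (the exception message is illustrative):
   ```
   if (enqueued) {
     maybePropagateIsrChanges()
   } else {
     // OperationNotAttemptedException tells the caller to back off and resubmit later,
     // instead of spinning in a tight retry loop as IllegalStateException would.
     future.completeExceptionally(new OperationNotAttemptedException(
       "Failed to enqueue AlterPartition state change because there is already an unsent or inflight request for this partition")) // illustrative message
   }
   ```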



##########
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##########
@@ -1936,6 +1937,106 @@ class PartitionTest extends AbstractPartitionTest {
     callback(brokerId, remoteBrokerId, partition)
   }
 
+  def createClientResponseWithAlterPartitionResponse(topicPartition: TopicPartition,
+                                                     partitionErrorCode: Short,
+                                                     isr: List[Int] = List.empty,
+                                                     leaderEpoch: Int = 0,
+                                                     partitionEpoch: Int = 0
+                                                    ): ClientResponse = {
+    val alterPartitionResponseData = new AlterPartitionResponseData()
+    val topicResponse = new AlterPartitionResponseData.TopicData()
+      .setTopicName(topicPartition.topic())
+
+    topicResponse.partitions.add(new AlterPartitionResponseData.PartitionData()
+      .setPartitionIndex(topicPartition.partition())
+      .setIsr(isr.map(Integer.valueOf).asJava)
+      .setLeaderEpoch(leaderEpoch)
+      .setPartitionEpoch(partitionEpoch)
+      .setErrorCode(partitionErrorCode))
+    alterPartitionResponseData.topics().add(topicResponse)
+
+    val alterPartitionResponse = new AlterPartitionResponse(alterPartitionResponseData)
+
+    new ClientResponse(new RequestHeader(ApiKeys.ALTER_PARTITION, 0, "client", 1),
+      null, null, 0, 0, false, null, null, alterPartitionResponse)
+  }
+
+  @Test
+  def testPartitionShouldRetryAlterIsrRequest(): Unit = {

Review Comment:
   nit: `AlterPartitionRequest`?
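   
   i.e. the suggested rename would read:
   ```
   @Test
   def testPartitionShouldRetryAlterPartitionRequest(): Unit = {
   ```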



##########
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##########
@@ -1936,6 +1937,106 @@ class PartitionTest extends AbstractPartitionTest {
     callback(brokerId, remoteBrokerId, partition)
   }
 
+  def createClientResponseWithAlterPartitionResponse(topicPartition: TopicPartition,
+                                                     partitionErrorCode: Short,
+                                                     isr: List[Int] = List.empty,
+                                                     leaderEpoch: Int = 0,
+                                                     partitionEpoch: Int = 0
+                                                    ): ClientResponse = {
+    val alterPartitionResponseData = new AlterPartitionResponseData()
+    val topicResponse = new AlterPartitionResponseData.TopicData()
+      .setTopicName(topicPartition.topic())
+
+    topicResponse.partitions.add(new AlterPartitionResponseData.PartitionData()
+      .setPartitionIndex(topicPartition.partition())
+      .setIsr(isr.map(Integer.valueOf).asJava)
+      .setLeaderEpoch(leaderEpoch)
+      .setPartitionEpoch(partitionEpoch)
+      .setErrorCode(partitionErrorCode))
+    alterPartitionResponseData.topics().add(topicResponse)
+
+    val alterPartitionResponse = new AlterPartitionResponse(alterPartitionResponseData)
+
+    new ClientResponse(new RequestHeader(ApiKeys.ALTER_PARTITION, 0, "client", 1),
+      null, null, 0, 0, false, null, null, alterPartitionResponse)
+  }
+
+  @Test
+  def testPartitionShouldRetryAlterIsrRequest(): Unit = {
+    val mockChannelManager = mock(classOf[BrokerToControllerChannelManager])
+    val alterPartitionManager = new DefaultAlterPartitionManager(
+      controllerChannelManager = mockChannelManager,
+      scheduler = mock(classOf[KafkaScheduler]),
+      time = time,
+      brokerId = brokerId,
+      brokerEpochSupplier = () => 0,
+      metadataVersionSupplier = () => MetadataVersion.IBP_3_0_IV0
+    )
+
+    partition = new Partition(topicPartition,
+      replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
+      interBrokerProtocolVersion = interBrokerProtocolVersion,
+      localBrokerId = brokerId,
+      time,
+      alterPartitionListener,
+      delayedOperations,
+      metadataCache,
+      logManager,
+      alterPartitionManager)
+
+    val log = logManager.getOrCreateLog(topicPartition, topicId = None)
+    seedLogData(log, numRecords = 10, leaderEpoch = 4)
+
+    val controllerEpoch = 0
+    val leaderEpoch = 5
+    val follower1 = brokerId + 1
+    val follower2 = brokerId + 2
+    val follower3 = brokerId + 3
+    val replicas = Seq(brokerId, follower1, follower2, follower3)
+    val isr = Seq(brokerId, follower1, follower2)
+    val partitionEpoch = 1
+
+    doNothing().when(delayedOperations).checkAndCompleteAll()
+
+    // create a response with error in partition data level, so it'll be handled in AlterPartitionManager, and then Partition callback will re-submit it
+    val alterPartitionResponseWithUnknownServerError =
+      createClientResponseWithAlterPartitionResponse(topicPartition, Errors.UNKNOWN_SERVER_ERROR.code())
+
+    // create a 2nd response with no error
+    val alterPartitionResponseWithoutError =
+      createClientResponseWithAlterPartitionResponse(topicPartition, Errors.NONE.code(), List(brokerId, follower1, follower2, follower3), leaderEpoch, partitionEpoch + 1)
+
+    when(mockChannelManager.sendRequest(any(), any()))
+      .thenAnswer(invocation => {

Review Comment:
   nit: `.thenAnswer { invocation =>`?
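   
   As a sketch, the brace form would look like this (`handler` is shortened here for readability):
   ```
   .thenAnswer { invocation =>
     val handler = invocation.getArguments()(1).asInstanceOf[ControllerRequestCompletionHandler]
     handler.onComplete(alterPartitionResponseWithUnknownServerError)
   }
   ```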



##########
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##########
@@ -1936,6 +1937,106 @@ class PartitionTest extends AbstractPartitionTest {
     callback(brokerId, remoteBrokerId, partition)
   }
 
+  def createClientResponseWithAlterPartitionResponse(topicPartition: TopicPartition,

Review Comment:
   nit: Could we make it private?
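   
   i.e.:
   ```
   private def createClientResponseWithAlterPartitionResponse(topicPartition: TopicPartition,
   ```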



##########
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##########
@@ -1936,6 +1937,106 @@ class PartitionTest extends AbstractPartitionTest {
     callback(brokerId, remoteBrokerId, partition)
   }
 
+  def createClientResponseWithAlterPartitionResponse(topicPartition: TopicPartition,
+                                                     partitionErrorCode: Short,
+                                                     isr: List[Int] = List.empty,
+                                                     leaderEpoch: Int = 0,
+                                                     partitionEpoch: Int = 0
+                                                    ): ClientResponse = {

Review Comment:
   nit: Could we format this as follows?
   ```
   def createClientResponseWithAlterPartitionResponse(
     topicPartition: TopicPartition,
     partitionErrorCode: Short,
     isr: List[Int] = List.empty,
     leaderEpoch: Int = 0,
     partitionEpoch: Int = 0
   ): ClientResponse = {
   ```



##########
core/src/main/scala/kafka/server/AlterPartitionManager.scala:
##########
@@ -359,14 +358,11 @@ class DefaultAlterPartitionManager(
         inflightAlterPartitionItems.foreach { inflightAlterPartition =>
          partitionResponses.get(inflightAlterPartition.topicIdPartition) match {
            case Some(leaderAndIsrOrError) =>
-              try {
-                leaderAndIsrOrError match {
-                  case Left(error) => inflightAlterPartition.future.completeExceptionally(error.exception)
-                  case Right(leaderAndIsr) => inflightAlterPartition.future.complete(leaderAndIsr)
-                }
-              } finally {
-                // Regardless of callback outcome, we need to clear from the unsent updates map to unblock further updates
-                unsentIsrUpdates.remove(inflightAlterPartition.topicIdPartition.topicPartition)
+              // we need to clear from the unsent updates map to unblock further updates or retries

Review Comment:
   nit: Should we say something like this?
   ```
   // Regardless of callback outcome, we need to clear from the unsent updates map to unblock further
   // updates. We clear it now to allow the callback to submit a new update if needed.
   ```



##########
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##########
@@ -1936,6 +1937,106 @@ class PartitionTest extends AbstractPartitionTest {
     callback(brokerId, remoteBrokerId, partition)
   }
 
+  def createClientResponseWithAlterPartitionResponse(topicPartition: TopicPartition,
+                                                     partitionErrorCode: Short,
+                                                     isr: List[Int] = List.empty,
+                                                     leaderEpoch: Int = 0,
+                                                     partitionEpoch: Int = 0
+                                                    ): ClientResponse = {
+    val alterPartitionResponseData = new AlterPartitionResponseData()
+    val topicResponse = new AlterPartitionResponseData.TopicData()
+      .setTopicName(topicPartition.topic())
+
+    topicResponse.partitions.add(new AlterPartitionResponseData.PartitionData()
+      .setPartitionIndex(topicPartition.partition())
+      .setIsr(isr.map(Integer.valueOf).asJava)
+      .setLeaderEpoch(leaderEpoch)
+      .setPartitionEpoch(partitionEpoch)
+      .setErrorCode(partitionErrorCode))
+    alterPartitionResponseData.topics().add(topicResponse)
+
+    val alterPartitionResponse = new AlterPartitionResponse(alterPartitionResponseData)
+
+    new ClientResponse(new RequestHeader(ApiKeys.ALTER_PARTITION, 0, "client", 1),
+      null, null, 0, 0, false, null, null, alterPartitionResponse)
+  }
+
+  @Test
+  def testPartitionShouldRetryAlterIsrRequest(): Unit = {
+    val mockChannelManager = mock(classOf[BrokerToControllerChannelManager])
+    val alterPartitionManager = new DefaultAlterPartitionManager(
+      controllerChannelManager = mockChannelManager,
+      scheduler = mock(classOf[KafkaScheduler]),
+      time = time,
+      brokerId = brokerId,
+      brokerEpochSupplier = () => 0,
+      metadataVersionSupplier = () => MetadataVersion.IBP_3_0_IV0
+    )
+
+    partition = new Partition(topicPartition,
+      replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
+      interBrokerProtocolVersion = interBrokerProtocolVersion,
+      localBrokerId = brokerId,
+      time,
+      alterPartitionListener,
+      delayedOperations,
+      metadataCache,
+      logManager,
+      alterPartitionManager)
+
+    val log = logManager.getOrCreateLog(topicPartition, topicId = None)
+    seedLogData(log, numRecords = 10, leaderEpoch = 4)
+
+    val controllerEpoch = 0
+    val leaderEpoch = 5
+    val follower1 = brokerId + 1
+    val follower2 = brokerId + 2
+    val follower3 = brokerId + 3
+    val replicas = Seq(brokerId, follower1, follower2, follower3)
+    val isr = Seq(brokerId, follower1, follower2)
+    val partitionEpoch = 1
+
+    doNothing().when(delayedOperations).checkAndCompleteAll()
+
+    // create a response with error in partition data level, so it'll be handled in AlterPartitionManager, and then Partition callback will re-submit it
+    val alterPartitionResponseWithUnknownServerError =
+      createClientResponseWithAlterPartitionResponse(topicPartition, Errors.UNKNOWN_SERVER_ERROR.code())
+
+    // create a 2nd response with no error
+    val alterPartitionResponseWithoutError =
+      createClientResponseWithAlterPartitionResponse(topicPartition, Errors.NONE.code(), List(brokerId, follower1, follower2, follower3), leaderEpoch, partitionEpoch + 1)
+
+    when(mockChannelManager.sendRequest(any(), any()))
+      .thenAnswer(invocation => {
+        val controllerRequestCompletionHandler = invocation.getArguments()(1).asInstanceOf[ControllerRequestCompletionHandler]
+        controllerRequestCompletionHandler.onComplete(alterPartitionResponseWithUnknownServerError)
+    })

Review Comment:
   nit: Should this one be aligned on `.thenAnswer`?



##########
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##########
@@ -1936,6 +1937,106 @@ class PartitionTest extends AbstractPartitionTest {
     callback(brokerId, remoteBrokerId, partition)
   }
 
+  def createClientResponseWithAlterPartitionResponse(topicPartition: TopicPartition,
+                                                     partitionErrorCode: Short,
+                                                     isr: List[Int] = List.empty,
+                                                     leaderEpoch: Int = 0,
+                                                     partitionEpoch: Int = 0
+                                                    ): ClientResponse = {
+    val alterPartitionResponseData = new AlterPartitionResponseData()
+    val topicResponse = new AlterPartitionResponseData.TopicData()
+      .setTopicName(topicPartition.topic())
+
+    topicResponse.partitions.add(new AlterPartitionResponseData.PartitionData()
+      .setPartitionIndex(topicPartition.partition())
+      .setIsr(isr.map(Integer.valueOf).asJava)
+      .setLeaderEpoch(leaderEpoch)
+      .setPartitionEpoch(partitionEpoch)
+      .setErrorCode(partitionErrorCode))
+    alterPartitionResponseData.topics().add(topicResponse)
+
+    val alterPartitionResponse = new AlterPartitionResponse(alterPartitionResponseData)
+
+    new ClientResponse(new RequestHeader(ApiKeys.ALTER_PARTITION, 0, "client", 1),
+      null, null, 0, 0, false, null, null, alterPartitionResponse)
+  }
+
+  @Test
+  def testPartitionShouldRetryAlterIsrRequest(): Unit = {
+    val mockChannelManager = mock(classOf[BrokerToControllerChannelManager])
+    val alterPartitionManager = new DefaultAlterPartitionManager(
+      controllerChannelManager = mockChannelManager,
+      scheduler = mock(classOf[KafkaScheduler]),
+      time = time,
+      brokerId = brokerId,
+      brokerEpochSupplier = () => 0,
+      metadataVersionSupplier = () => MetadataVersion.IBP_3_0_IV0
+    )
+
+    partition = new Partition(topicPartition,
+      replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
+      interBrokerProtocolVersion = interBrokerProtocolVersion,
+      localBrokerId = brokerId,
+      time,
+      alterPartitionListener,
+      delayedOperations,
+      metadataCache,
+      logManager,
+      alterPartitionManager)
+
+    val log = logManager.getOrCreateLog(topicPartition, topicId = None)
+    seedLogData(log, numRecords = 10, leaderEpoch = 4)
+
+    val controllerEpoch = 0
+    val leaderEpoch = 5
+    val follower1 = brokerId + 1
+    val follower2 = brokerId + 2
+    val follower3 = brokerId + 3
+    val replicas = Seq(brokerId, follower1, follower2, follower3)
+    val isr = Seq(brokerId, follower1, follower2)
+    val partitionEpoch = 1
+
+    doNothing().when(delayedOperations).checkAndCompleteAll()
+
+    // create a response with error in partition data level, so it'll be handled in AlterPartitionManager, and then Partition callback will re-submit it
+    val alterPartitionResponseWithUnknownServerError =
+      createClientResponseWithAlterPartitionResponse(topicPartition, Errors.UNKNOWN_SERVER_ERROR.code())
+
+    // create a 2nd response with no error

Review Comment:
   nit: `Complete the ISR expansion`?



##########
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##########
@@ -1936,6 +1937,106 @@ class PartitionTest extends AbstractPartitionTest {
     callback(brokerId, remoteBrokerId, partition)
   }
 
+  def createClientResponseWithAlterPartitionResponse(topicPartition: TopicPartition,
+                                                     partitionErrorCode: Short,
+                                                     isr: List[Int] = List.empty,
+                                                     leaderEpoch: Int = 0,
+                                                     partitionEpoch: Int = 0
+                                                    ): ClientResponse = {
+    val alterPartitionResponseData = new AlterPartitionResponseData()
+    val topicResponse = new AlterPartitionResponseData.TopicData()
+      .setTopicName(topicPartition.topic())
+
+    topicResponse.partitions.add(new AlterPartitionResponseData.PartitionData()
+      .setPartitionIndex(topicPartition.partition())

Review Comment:
   nit: There are a few places where we could omit the parentheses when calling getters.
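   
   For example, the two accessors flagged above (Scala allows dropping the empty parentheses on side-effect-free Java getters):
   ```
   .setTopicName(topicPartition.topic)
   .setPartitionIndex(topicPartition.partition)
   ```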



##########
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##########
@@ -1936,6 +1937,106 @@ class PartitionTest extends AbstractPartitionTest {
     callback(brokerId, remoteBrokerId, partition)
   }
 
+  def createClientResponseWithAlterPartitionResponse(topicPartition: TopicPartition,
+                                                     partitionErrorCode: Short,
+                                                     isr: List[Int] = List.empty,
+                                                     leaderEpoch: Int = 0,
+                                                     partitionEpoch: Int = 0
+                                                    ): ClientResponse = {
+    val alterPartitionResponseData = new AlterPartitionResponseData()
+    val topicResponse = new AlterPartitionResponseData.TopicData()
+      .setTopicName(topicPartition.topic())
+
+    topicResponse.partitions.add(new AlterPartitionResponseData.PartitionData()
+      .setPartitionIndex(topicPartition.partition())
+      .setIsr(isr.map(Integer.valueOf).asJava)
+      .setLeaderEpoch(leaderEpoch)
+      .setPartitionEpoch(partitionEpoch)
+      .setErrorCode(partitionErrorCode))
+    alterPartitionResponseData.topics().add(topicResponse)
+
+    val alterPartitionResponse = new AlterPartitionResponse(alterPartitionResponseData)
+
+    new ClientResponse(new RequestHeader(ApiKeys.ALTER_PARTITION, 0, "client", 1),
+      null, null, 0, 0, false, null, null, alterPartitionResponse)
+  }
+
+  @Test
+  def testPartitionShouldRetryAlterIsrRequest(): Unit = {
+    val mockChannelManager = mock(classOf[BrokerToControllerChannelManager])
+    val alterPartitionManager = new DefaultAlterPartitionManager(
+      controllerChannelManager = mockChannelManager,
+      scheduler = mock(classOf[KafkaScheduler]),
+      time = time,
+      brokerId = brokerId,
+      brokerEpochSupplier = () => 0,
+      metadataVersionSupplier = () => MetadataVersion.IBP_3_0_IV0
+    )
+
+    partition = new Partition(topicPartition,
+      replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
+      interBrokerProtocolVersion = interBrokerProtocolVersion,
+      localBrokerId = brokerId,
+      time,
+      alterPartitionListener,
+      delayedOperations,
+      metadataCache,
+      logManager,
+      alterPartitionManager)
+
+    val log = logManager.getOrCreateLog(topicPartition, topicId = None)
+    seedLogData(log, numRecords = 10, leaderEpoch = 4)
+
+    val controllerEpoch = 0
+    val leaderEpoch = 5
+    val follower1 = brokerId + 1
+    val follower2 = brokerId + 2
+    val follower3 = brokerId + 3
+    val replicas = Seq(brokerId, follower1, follower2, follower3)
+    val isr = Seq(brokerId, follower1, follower2)
+    val partitionEpoch = 1
+
+    doNothing().when(delayedOperations).checkAndCompleteAll()
+
+    // create a response with error in partition data level, so it'll be handled in AlterPartitionManager, and then Partition callback will re-submit it
+    val alterPartitionResponseWithUnknownServerError =
+      createClientResponseWithAlterPartitionResponse(topicPartition, Errors.UNKNOWN_SERVER_ERROR.code())
+
+    // create a 2nd response with no error
+    val alterPartitionResponseWithoutError =
+      createClientResponseWithAlterPartitionResponse(topicPartition, Errors.NONE.code(), List(brokerId, follower1, follower2, follower3), leaderEpoch, partitionEpoch + 1)
+
+    when(mockChannelManager.sendRequest(any(), any()))
+      .thenAnswer(invocation => {
+        val controllerRequestCompletionHandler = invocation.getArguments()(1).asInstanceOf[ControllerRequestCompletionHandler]
+        controllerRequestCompletionHandler.onComplete(alterPartitionResponseWithUnknownServerError)
+    })
+      .thenAnswer(invocation => {
+      val controllerRequestCompletionHandler = invocation.getArguments()(1).asInstanceOf[ControllerRequestCompletionHandler]
+      controllerRequestCompletionHandler.onComplete(alterPartitionResponseWithoutError)
+    })
+
+    assertTrue(makeLeader(
+      topicId = None,
+      controllerEpoch,
+      leaderEpoch,
+      isr,
+      replicas,
+      partitionEpoch,
+      isNew = true
+    ))
+    assertEquals(0L, partition.localLogOrException.highWatermark)
+
+    // Expand ISR
+    fetchFollower(partition, replicaId = follower3, fetchOffset = 10L)
+
+    assertEquals(Set(brokerId, follower1, follower2, follower3), partition.partitionState.isr)
+    // verify the AlterIsr request will be sent twice
+    verify(mockChannelManager, times(2)).sendRequest(any(), any())
+    // After retry, the AlterIsr should succeed, and no in-flight request

Review Comment:
   nit: How about `After the retry, the partition state should be committed`?



##########
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##########
@@ -1936,6 +1937,106 @@ class PartitionTest extends AbstractPartitionTest {
     callback(brokerId, remoteBrokerId, partition)
   }
 
+  def createClientResponseWithAlterPartitionResponse(topicPartition: TopicPartition,
+                                                     partitionErrorCode: Short,
+                                                     isr: List[Int] = List.empty,
+                                                     leaderEpoch: Int = 0,
+                                                     partitionEpoch: Int = 0
+                                                    ): ClientResponse = {
+    val alterPartitionResponseData = new AlterPartitionResponseData()
+    val topicResponse = new AlterPartitionResponseData.TopicData()
+      .setTopicName(topicPartition.topic())
+
+    topicResponse.partitions.add(new AlterPartitionResponseData.PartitionData()
+      .setPartitionIndex(topicPartition.partition())
+      .setIsr(isr.map(Integer.valueOf).asJava)
+      .setLeaderEpoch(leaderEpoch)
+      .setPartitionEpoch(partitionEpoch)
+      .setErrorCode(partitionErrorCode))
+    alterPartitionResponseData.topics().add(topicResponse)
+
+    val alterPartitionResponse = new AlterPartitionResponse(alterPartitionResponseData)
+
+    new ClientResponse(new RequestHeader(ApiKeys.ALTER_PARTITION, 0, "client", 1),
+      null, null, 0, 0, false, null, null, alterPartitionResponse)
+  }
+
+  @Test
+  def testPartitionShouldRetryAlterIsrRequest(): Unit = {
+    val mockChannelManager = mock(classOf[BrokerToControllerChannelManager])
+    val alterPartitionManager = new DefaultAlterPartitionManager(
+      controllerChannelManager = mockChannelManager,
+      scheduler = mock(classOf[KafkaScheduler]),
+      time = time,
+      brokerId = brokerId,
+      brokerEpochSupplier = () => 0,
+      metadataVersionSupplier = () => MetadataVersion.IBP_3_0_IV0
+    )
+
+    partition = new Partition(topicPartition,
+      replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
+      interBrokerProtocolVersion = interBrokerProtocolVersion,
+      localBrokerId = brokerId,
+      time,
+      alterPartitionListener,
+      delayedOperations,
+      metadataCache,
+      logManager,
+      alterPartitionManager)
+
+    val log = logManager.getOrCreateLog(topicPartition, topicId = None)
+    seedLogData(log, numRecords = 10, leaderEpoch = 4)
+
+    val controllerEpoch = 0
+    val leaderEpoch = 5
+    val follower1 = brokerId + 1
+    val follower2 = brokerId + 2
+    val follower3 = brokerId + 3
+    val replicas = Seq(brokerId, follower1, follower2, follower3)
+    val isr = Seq(brokerId, follower1, follower2)
+    val partitionEpoch = 1
+
+    doNothing().when(delayedOperations).checkAndCompleteAll()
+
+    // create a response with error in partition data level, so it'll be handled in AlterPartitionManager, and then Partition callback will re-submit it
+    val alterPartitionResponseWithUnknownServerError =
+      createClientResponseWithAlterPartitionResponse(topicPartition, Errors.UNKNOWN_SERVER_ERROR.code())
+
+    // create a 2nd response with no error
+    val alterPartitionResponseWithoutError =
+      createClientResponseWithAlterPartitionResponse(topicPartition, Errors.NONE.code(), List(brokerId, follower1, follower2, follower3), leaderEpoch, partitionEpoch + 1)
+
+    when(mockChannelManager.sendRequest(any(), any()))
+      .thenAnswer(invocation => {
+        val controllerRequestCompletionHandler = invocation.getArguments()(1).asInstanceOf[ControllerRequestCompletionHandler]
+        controllerRequestCompletionHandler.onComplete(alterPartitionResponseWithUnknownServerError)
+    })
+      .thenAnswer(invocation => {
+      val controllerRequestCompletionHandler = invocation.getArguments()(1).asInstanceOf[ControllerRequestCompletionHandler]
+      controllerRequestCompletionHandler.onComplete(alterPartitionResponseWithoutError)
+    })
+
+    assertTrue(makeLeader(
+      topicId = None,
+      controllerEpoch,
+      leaderEpoch,
+      isr,
+      replicas,
+      partitionEpoch,
+      isNew = true
+    ))
+    assertEquals(0L, partition.localLogOrException.highWatermark)
+
+    // Expand ISR
+    fetchFollower(partition, replicaId = follower3, fetchOffset = 10L)
+
+    assertEquals(Set(brokerId, follower1, follower2, follower3), partition.partitionState.isr)
+    // verify the AlterIsr request will be sent twice

Review Comment:
   nit: `Verify that the AlterPartition request was sent twice`?



##########
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##########
@@ -1936,6 +1937,106 @@ class PartitionTest extends AbstractPartitionTest {
     callback(brokerId, remoteBrokerId, partition)
   }
 
+  def createClientResponseWithAlterPartitionResponse(topicPartition: TopicPartition,
+                                                     partitionErrorCode: Short,
+                                                     isr: List[Int] = List.empty,
+                                                     leaderEpoch: Int = 0,
+                                                     partitionEpoch: Int = 0
+                                                    ): ClientResponse = {
+    val alterPartitionResponseData = new AlterPartitionResponseData()
+    val topicResponse = new AlterPartitionResponseData.TopicData()
+      .setTopicName(topicPartition.topic())
+
+    topicResponse.partitions.add(new AlterPartitionResponseData.PartitionData()
+      .setPartitionIndex(topicPartition.partition())
+      .setIsr(isr.map(Integer.valueOf).asJava)
+      .setLeaderEpoch(leaderEpoch)
+      .setPartitionEpoch(partitionEpoch)
+      .setErrorCode(partitionErrorCode))
+    alterPartitionResponseData.topics().add(topicResponse)
+
+    val alterPartitionResponse = new AlterPartitionResponse(alterPartitionResponseData)
+
+    new ClientResponse(new RequestHeader(ApiKeys.ALTER_PARTITION, 0, "client", 1),
+      null, null, 0, 0, false, null, null, alterPartitionResponse)
+  }
+
+  @Test
+  def testPartitionShouldRetryAlterIsrRequest(): Unit = {
+    val mockChannelManager = mock(classOf[BrokerToControllerChannelManager])
+    val alterPartitionManager = new DefaultAlterPartitionManager(
+      controllerChannelManager = mockChannelManager,
+      scheduler = mock(classOf[KafkaScheduler]),
+      time = time,
+      brokerId = brokerId,
+      brokerEpochSupplier = () => 0,
+      metadataVersionSupplier = () => MetadataVersion.IBP_3_0_IV0
+    )
+
+    partition = new Partition(topicPartition,
+      replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
+      interBrokerProtocolVersion = interBrokerProtocolVersion,
+      localBrokerId = brokerId,
+      time,
+      alterPartitionListener,
+      delayedOperations,
+      metadataCache,
+      logManager,
+      alterPartitionManager)
+
+    val log = logManager.getOrCreateLog(topicPartition, topicId = None)
+    seedLogData(log, numRecords = 10, leaderEpoch = 4)
+
+    val controllerEpoch = 0
+    val leaderEpoch = 5
+    val follower1 = brokerId + 1
+    val follower2 = brokerId + 2
+    val follower3 = brokerId + 3
+    val replicas = Seq(brokerId, follower1, follower2, follower3)
+    val isr = Seq(brokerId, follower1, follower2)
+    val partitionEpoch = 1
+
+    doNothing().when(delayedOperations).checkAndCompleteAll()
+
+    // create a response with error in partition data level, so it'll be handled in AlterPartitionManager, and then Partition callback will re-submit it
+    val alterPartitionResponseWithUnknownServerError =
+      createClientResponseWithAlterPartitionResponse(topicPartition, Errors.UNKNOWN_SERVER_ERROR.code())
+
+    // create a 2nd response with no error
+    val alterPartitionResponseWithoutError =
+      createClientResponseWithAlterPartitionResponse(topicPartition, Errors.NONE.code(), List(brokerId, follower1, follower2, follower3), leaderEpoch, partitionEpoch + 1)
+
+    when(mockChannelManager.sendRequest(any(), any()))
+      .thenAnswer(invocation => {
+        val controllerRequestCompletionHandler = invocation.getArguments()(1).asInstanceOf[ControllerRequestCompletionHandler]
+        controllerRequestCompletionHandler.onComplete(alterPartitionResponseWithUnknownServerError)
+    })
+      .thenAnswer(invocation => {
+      val controllerRequestCompletionHandler = invocation.getArguments()(1).asInstanceOf[ControllerRequestCompletionHandler]
+      controllerRequestCompletionHandler.onComplete(alterPartitionResponseWithoutError)
+    })
+
+    assertTrue(makeLeader(
+      topicId = None,
+      controllerEpoch,
+      leaderEpoch,
+      isr,
+      replicas,
+      partitionEpoch,
+      isNew = true
+    ))
+    assertEquals(0L, partition.localLogOrException.highWatermark)
+
+    // Expand ISR
+    fetchFollower(partition, replicaId = follower3, fetchOffset = 10L)
+
+    assertEquals(Set(brokerId, follower1, follower2, follower3), partition.partitionState.isr)

Review Comment:
   nit: Could we also assert the partition epoch?
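   
   For instance, assuming the epoch is reachable via `partitionState` in this test (the mocked success response bumps it to `partitionEpoch + 1`):
   ```
   assertEquals(partitionEpoch + 1, partition.partitionState.partitionEpoch)
   ```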



##########
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##########
@@ -1936,6 +1937,106 @@ class PartitionTest extends AbstractPartitionTest {
     callback(brokerId, remoteBrokerId, partition)
   }
 
+  def createClientResponseWithAlterPartitionResponse(topicPartition: TopicPartition,
+                                                     partitionErrorCode: Short,
+                                                     isr: List[Int] = List.empty,
+                                                     leaderEpoch: Int = 0,
+                                                     partitionEpoch: Int = 0
+                                                    ): ClientResponse = {
+    val alterPartitionResponseData = new AlterPartitionResponseData()
+    val topicResponse = new AlterPartitionResponseData.TopicData()
+      .setTopicName(topicPartition.topic())
+
+    topicResponse.partitions.add(new AlterPartitionResponseData.PartitionData()
+      .setPartitionIndex(topicPartition.partition())
+      .setIsr(isr.map(Integer.valueOf).asJava)
+      .setLeaderEpoch(leaderEpoch)
+      .setPartitionEpoch(partitionEpoch)
+      .setErrorCode(partitionErrorCode))
+    alterPartitionResponseData.topics().add(topicResponse)
+
+    val alterPartitionResponse = new AlterPartitionResponse(alterPartitionResponseData)
+
+    new ClientResponse(new RequestHeader(ApiKeys.ALTER_PARTITION, 0, "client", 1),
+      null, null, 0, 0, false, null, null, alterPartitionResponse)
+  }
+
+  @Test
+  def testPartitionShouldRetryAlterIsrRequest(): Unit = {
+    val mockChannelManager = mock(classOf[BrokerToControllerChannelManager])
+    val alterPartitionManager = new DefaultAlterPartitionManager(
+      controllerChannelManager = mockChannelManager,
+      scheduler = mock(classOf[KafkaScheduler]),
+      time = time,
+      brokerId = brokerId,
+      brokerEpochSupplier = () => 0,
+      metadataVersionSupplier = () => MetadataVersion.IBP_3_0_IV0
+    )
+
+    partition = new Partition(topicPartition,
+      replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
+      interBrokerProtocolVersion = interBrokerProtocolVersion,
+      localBrokerId = brokerId,
+      time,
+      alterPartitionListener,
+      delayedOperations,
+      metadataCache,
+      logManager,
+      alterPartitionManager)
+
+    val log = logManager.getOrCreateLog(topicPartition, topicId = None)
+    seedLogData(log, numRecords = 10, leaderEpoch = 4)
+
+    val controllerEpoch = 0
+    val leaderEpoch = 5
+    val follower1 = brokerId + 1
+    val follower2 = brokerId + 2
+    val follower3 = brokerId + 3
+    val replicas = Seq(brokerId, follower1, follower2, follower3)
+    val isr = Seq(brokerId, follower1, follower2)
+    val partitionEpoch = 1
+
+    doNothing().when(delayedOperations).checkAndCompleteAll()
+
+    // create a response with error in partition data level, so it'll be handled in AlterPartitionManager, and then Partition callback will re-submit it

Review Comment:
   nit: How about `Fail the first alter partition request with a retryable 
error to trigger a retry from the partition callback`?



##########
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##########
@@ -1936,6 +1937,106 @@ class PartitionTest extends AbstractPartitionTest {
     callback(brokerId, remoteBrokerId, partition)
   }
 
+  def createClientResponseWithAlterPartitionResponse(topicPartition: TopicPartition,
+                                                     partitionErrorCode: Short,
+                                                     isr: List[Int] = List.empty,
+                                                     leaderEpoch: Int = 0,
+                                                     partitionEpoch: Int = 0
+                                                    ): ClientResponse = {
+    val alterPartitionResponseData = new AlterPartitionResponseData()
+    val topicResponse = new AlterPartitionResponseData.TopicData()
+      .setTopicName(topicPartition.topic())
+
+    topicResponse.partitions.add(new AlterPartitionResponseData.PartitionData()
+      .setPartitionIndex(topicPartition.partition())
+      .setIsr(isr.map(Integer.valueOf).asJava)
+      .setLeaderEpoch(leaderEpoch)
+      .setPartitionEpoch(partitionEpoch)
+      .setErrorCode(partitionErrorCode))
+    alterPartitionResponseData.topics().add(topicResponse)
+
+    val alterPartitionResponse = new AlterPartitionResponse(alterPartitionResponseData)
+
+    new ClientResponse(new RequestHeader(ApiKeys.ALTER_PARTITION, 0, "client", 1),
+      null, null, 0, 0, false, null, null, alterPartitionResponse)
+  }
+
+  @Test
+  def testPartitionShouldRetryAlterIsrRequest(): Unit = {
+    val mockChannelManager = mock(classOf[BrokerToControllerChannelManager])
+    val alterPartitionManager = new DefaultAlterPartitionManager(
+      controllerChannelManager = mockChannelManager,
+      scheduler = mock(classOf[KafkaScheduler]),
+      time = time,
+      brokerId = brokerId,
+      brokerEpochSupplier = () => 0,
+      metadataVersionSupplier = () => MetadataVersion.IBP_3_0_IV0
+    )
+
+    partition = new Partition(topicPartition,
+      replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
+      interBrokerProtocolVersion = interBrokerProtocolVersion,
+      localBrokerId = brokerId,
+      time,
+      alterPartitionListener,
+      delayedOperations,
+      metadataCache,
+      logManager,
+      alterPartitionManager)
+
+    val log = logManager.getOrCreateLog(topicPartition, topicId = None)
+    seedLogData(log, numRecords = 10, leaderEpoch = 4)
+
+    val controllerEpoch = 0
+    val leaderEpoch = 5
+    val follower1 = brokerId + 1
+    val follower2 = brokerId + 2
+    val follower3 = brokerId + 3
+    val replicas = Seq(brokerId, follower1, follower2, follower3)
+    val isr = Seq(brokerId, follower1, follower2)
+    val partitionEpoch = 1
+
+    doNothing().when(delayedOperations).checkAndCompleteAll()
+
+    // create a response with error in partition data level, so it'll be handled in AlterPartitionManager, and then Partition callback will re-submit it
+    val alterPartitionResponseWithUnknownServerError =
+      createClientResponseWithAlterPartitionResponse(topicPartition, Errors.UNKNOWN_SERVER_ERROR.code())
+
+    // create a 2nd response with no error
+    val alterPartitionResponseWithoutError =
+      createClientResponseWithAlterPartitionResponse(topicPartition, Errors.NONE.code(), List(brokerId, follower1, follower2, follower3), leaderEpoch, partitionEpoch + 1)
+
+    when(mockChannelManager.sendRequest(any(), any()))
+      .thenAnswer(invocation => {
+        val controllerRequestCompletionHandler = invocation.getArguments()(1).asInstanceOf[ControllerRequestCompletionHandler]
+        controllerRequestCompletionHandler.onComplete(alterPartitionResponseWithUnknownServerError)
+    })
+      .thenAnswer(invocation => {
+      val controllerRequestCompletionHandler = invocation.getArguments()(1).asInstanceOf[ControllerRequestCompletionHandler]
+      controllerRequestCompletionHandler.onComplete(alterPartitionResponseWithoutError)
+    })

Review Comment:
   nit: Indentation is also weird here.
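   
   One consistently indented version of the whole stub chain, folding in the earlier `.thenAnswer { invocation => }` suggestion (`handler` is just a shorter local name):
   ```
   when(mockChannelManager.sendRequest(any(), any()))
     .thenAnswer { invocation =>
       val handler = invocation.getArguments()(1).asInstanceOf[ControllerRequestCompletionHandler]
       handler.onComplete(alterPartitionResponseWithUnknownServerError)
     }
     .thenAnswer { invocation =>
       val handler = invocation.getArguments()(1).asInstanceOf[ControllerRequestCompletionHandler]
       handler.onComplete(alterPartitionResponseWithoutError)
     }
   ```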



##########
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##########
@@ -1936,6 +1937,106 @@ class PartitionTest extends AbstractPartitionTest {
     callback(brokerId, remoteBrokerId, partition)
   }
 
+  def createClientResponseWithAlterPartitionResponse(topicPartition: TopicPartition,
+                                                     partitionErrorCode: Short,
+                                                     isr: List[Int] = List.empty,
+                                                     leaderEpoch: Int = 0,
+                                                     partitionEpoch: Int = 0
+                                                    ): ClientResponse = {
+    val alterPartitionResponseData = new AlterPartitionResponseData()
+    val topicResponse = new AlterPartitionResponseData.TopicData()
+      .setTopicName(topicPartition.topic())
+
+    topicResponse.partitions.add(new AlterPartitionResponseData.PartitionData()
+      .setPartitionIndex(topicPartition.partition())
+      .setIsr(isr.map(Integer.valueOf).asJava)
+      .setLeaderEpoch(leaderEpoch)
+      .setPartitionEpoch(partitionEpoch)
+      .setErrorCode(partitionErrorCode))
+    alterPartitionResponseData.topics().add(topicResponse)
+
+    val alterPartitionResponse = new AlterPartitionResponse(alterPartitionResponseData)
+
+    new ClientResponse(new RequestHeader(ApiKeys.ALTER_PARTITION, 0, "client", 1),
+      null, null, 0, 0, false, null, null, alterPartitionResponse)
+  }
+
+  @Test
+  def testPartitionShouldRetryAlterIsrRequest(): Unit = {
+    val mockChannelManager = mock(classOf[BrokerToControllerChannelManager])
+    val alterPartitionManager = new DefaultAlterPartitionManager(
+      controllerChannelManager = mockChannelManager,
+      scheduler = mock(classOf[KafkaScheduler]),
+      time = time,
+      brokerId = brokerId,
+      brokerEpochSupplier = () => 0,
+      metadataVersionSupplier = () => MetadataVersion.IBP_3_0_IV0
+    )
+
+    partition = new Partition(topicPartition,
+      replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
+      interBrokerProtocolVersion = interBrokerProtocolVersion,
+      localBrokerId = brokerId,
+      time,
+      alterPartitionListener,
+      delayedOperations,
+      metadataCache,
+      logManager,
+      alterPartitionManager)
+
+    val log = logManager.getOrCreateLog(topicPartition, topicId = None)
+    seedLogData(log, numRecords = 10, leaderEpoch = 4)
+
+    val controllerEpoch = 0
+    val leaderEpoch = 5
+    val follower1 = brokerId + 1
+    val follower2 = brokerId + 2
+    val follower3 = brokerId + 3
+    val replicas = Seq(brokerId, follower1, follower2, follower3)
+    val isr = Seq(brokerId, follower1, follower2)
+    val partitionEpoch = 1
+
+    doNothing().when(delayedOperations).checkAndCompleteAll()
+
+    // create a response with error in partition data level, so it'll be handled in AlterPartitionManager, and then Partition callback will re-submit it
+    val alterPartitionResponseWithUnknownServerError =
+      createClientResponseWithAlterPartitionResponse(topicPartition, Errors.UNKNOWN_SERVER_ERROR.code())
+
+    // create a 2nd response with no error
+    val alterPartitionResponseWithoutError =
+      createClientResponseWithAlterPartitionResponse(topicPartition, Errors.NONE.code(), List(brokerId, follower1, follower2, follower3), leaderEpoch, partitionEpoch + 1)
+
+    when(mockChannelManager.sendRequest(any(), any()))
+      .thenAnswer(invocation => {
+        val controllerRequestCompletionHandler = invocation.getArguments()(1).asInstanceOf[ControllerRequestCompletionHandler]
+        controllerRequestCompletionHandler.onComplete(alterPartitionResponseWithUnknownServerError)
+    })
+      .thenAnswer(invocation => {
+      val controllerRequestCompletionHandler = invocation.getArguments()(1).asInstanceOf[ControllerRequestCompletionHandler]
+      controllerRequestCompletionHandler.onComplete(alterPartitionResponseWithoutError)
+    })
+
+    assertTrue(makeLeader(
+      topicId = None,
+      controllerEpoch,
+      leaderEpoch,
+      isr,
+      replicas,
+      partitionEpoch,
+      isNew = true
+    ))
+    assertEquals(0L, partition.localLogOrException.highWatermark)
+
+    // Expand ISR
+    fetchFollower(partition, replicaId = follower3, fetchOffset = 10L)
+
+    assertEquals(Set(brokerId, follower1, follower2, follower3), partition.partitionState.isr)
+    // verify the AlterIsr request will be sent twice
+    verify(mockChannelManager, times(2)).sendRequest(any(), any())
+    // After retry, the AlterIsr should succeed, and no in-flight request
+    assertFalse(partition.partitionState.isInflight)
+  }
+

Review Comment:
   It would be great if we could also add a unit test in `AlterPartitionManagerTest` as the issue is in that component.
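   
   A rough sketch of such a test, mirroring the stubbing above. The `submit` arguments and the response values are placeholders for whatever helpers `AlterPartitionManagerTest` already provides; `errorResponse`/`successResponse` would be built like `createClientResponseWithAlterPartitionResponse` does:
   ```
   @Test
   def testSubmitCanBeRetriedAfterPartitionLevelError(): Unit = {
     val mockChannelManager = mock(classOf[BrokerToControllerChannelManager])
     val manager = new DefaultAlterPartitionManager(
       controllerChannelManager = mockChannelManager,
       scheduler = mock(classOf[KafkaScheduler]),
       time = time,
       brokerId = brokerId,
       brokerEpochSupplier = () => 0,
       metadataVersionSupplier = () => MetadataVersion.IBP_3_0_IV0
     )
   
     // First request completes with a partition-level UNKNOWN_SERVER_ERROR, second succeeds.
     when(mockChannelManager.sendRequest(any(), any()))
       .thenAnswer { invocation =>
         val handler = invocation.getArguments()(1).asInstanceOf[ControllerRequestCompletionHandler]
         handler.onComplete(errorResponse) // placeholder ClientResponse carrying UNKNOWN_SERVER_ERROR
       }
       .thenAnswer { invocation =>
         val handler = invocation.getArguments()(1).asInstanceOf[ControllerRequestCompletionHandler]
         handler.onComplete(successResponse) // placeholder ClientResponse with Errors.NONE
       }
   
     val first = manager.submit(topicIdPartition, leaderAndIsr, controllerEpoch = 0)
     assertTrue(first.isCompletedExceptionally)
   
     // Because the callback cleared the unsent-updates entry, the retry must be
     // accepted (not rejected with OperationNotAttemptedException) and succeed.
     val retry = manager.submit(topicIdPartition, leaderAndIsr, controllerEpoch = 0)
     assertFalse(retry.isCompletedExceptionally)
   }
   ```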



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
