showuon commented on code in PR #12329:
URL: https://github.com/apache/kafka/pull/12329#discussion_r908283629


##########
core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala:
##########
@@ -178,6 +179,56 @@ class AlterPartitionManagerTest {
     assertEquals(request.data().topics().get(0).partitions().size(), 10)
   }
 
+  @Test
+  def testSubmitFromCallback(): Unit = {
+    // prepare a partition level retriable error response
+    val alterPartitionRespWithPartitionError = partitionResponse(tp0, 
Errors.UNKNOWN_SERVER_ERROR)
+    val errorResponse = 
makeClientResponse(alterPartitionRespWithPartitionError, 
ApiKeys.ALTER_PARTITION.latestVersion)
+
+    val leaderId = 1
+    val leaderEpoch = 1
+    val partitionEpoch = 10
+    val isr = List(1,2,3)
+    val leaderAndIsr = new LeaderAndIsr(leaderId,leaderEpoch, isr, 
LeaderRecoveryState.RECOVERED, partitionEpoch)
+    val callbackCapture = 
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
+
+    val scheduler = new MockScheduler(time)
+    val alterPartitionManager = new 
DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () 
=> 2, () => IBP_3_2_IV0)
+    alterPartitionManager.start()
+    val future = alterPartitionManager.submit(tp0, leaderAndIsr, 0)
+    val finalFuture = new CompletableFuture[LeaderAndIsr]()
+    future.whenComplete { (_, e) =>
+      if (e != null) {
+        // Retry when error.
+        alterPartitionManager.submit(tp0, leaderAndIsr, 0).whenComplete { 
(result, e) =>
+          if (e != null) {
+            finalFuture.completeExceptionally(e)
+          } else {
+            finalFuture.complete(result)
+          }
+        }
+      } else {
+        finalFuture.completeExceptionally(new AssertionError("Expected the 
future to be failed"))
+      }
+    }
+
+    verify(brokerToController).start()
+    verify(brokerToController).sendRequest(any(), callbackCapture.capture())
+    reset(brokerToController)
+    callbackCapture.getValue.onComplete(errorResponse)
+
+    // complete the retry request
+    val retryAlterPartitionResponse = partitionResponse(tp0, Errors.NONE, 
partitionEpoch, leaderId, leaderEpoch, isr)
+    val retryResponse = makeClientResponse(retryAlterPartitionResponse, 
ApiKeys.ALTER_PARTITION.latestVersion)
+
+    verify(brokerToController).sendRequest(any(), callbackCapture.capture())
+    callbackCapture.getValue.onComplete(retryResponse)
+
+    assertEquals(leaderAndIsr, finalFuture.get(200, TimeUnit.MILLISECONDS))
+    // No more items in unsentIsrUpdates

Review Comment:
   I found that the other tests in this test suite start with a capital 
letter, so I am following that pattern. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to