jolshan commented on code in PR #15524:
URL: https://github.com/apache/kafka/pull/15524#discussion_r1525559114


##########
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala:
##########
@@ -177,6 +178,86 @@ class TransactionMarkerChannelManagerTest {
       any())
   }
 
+  @Test
+  def shouldNotLoseTxnCompletionAfterLoad(): Unit = {
+    mockCache()
+
+    val expectedTransition = txnMetadata2.prepareComplete(time.milliseconds())
+
+    when(metadataCache.getPartitionLeaderEndpoint(
+      ArgumentMatchers.eq(partition1.topic),
+      ArgumentMatchers.eq(partition1.partition),
+      any())
+    ).thenReturn(Some(broker1))
+
+    // Build a successful client response.
+    val header = new RequestHeader(ApiKeys.WRITE_TXN_MARKERS, 0, "client", 1)
+    val successfulResponse = new WriteTxnMarkersResponse(
+      Collections.singletonMap(producerId2: java.lang.Long, Collections.singletonMap(partition1, Errors.NONE)))
+    val successfulClientResponse = new ClientResponse(header, null, null,
+      time.milliseconds(), time.milliseconds(), false, null, null,
+      successfulResponse)
+
+    // Build a disconnected client response.
+    val disconnectedClientResponse = new ClientResponse(header, null, null,
+      time.milliseconds(), time.milliseconds(), true, null, null,
+      null)
+
+    // Test matrix to cover various scenarios:
+    val clientResponses = Seq(successfulClientResponse, disconnectedClientResponse)
+    val getTransactionStateResponses = Seq(
+      // NOT_COORDINATOR error case
+      Left(Errors.NOT_COORDINATOR),
+      // COORDINATOR_LOAD_IN_PROGRESS
+      Left(Errors.COORDINATOR_LOAD_IN_PROGRESS),
+      // "Newly loaded" transaction state with the new epoch.
+      Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch2, txnMetadata2)))
+    )
+
+    clientResponses.foreach { clientResponse =>
+      getTransactionStateResponses.foreach { getTransactionStateResponse =>
+        // Reset data from previous iteration.
+        txnMetadata2.topicPartitions.add(partition1)
+        clearInvocations(txnStateManager)
+        // Send out markers for a transaction before load.
+        channelManager.addTxnMarkersToSend(coordinatorEpoch, txnResult,
+          txnMetadata2, expectedTransition)
+
+        // Drain the marker to make it "in-flight".
+        val requests1 = channelManager.generateRequests().asScala
+        assertEquals(1, requests1.size)
+
+        // Simulate a partition load:

Review Comment:
   To confirm my understanding, this test simulates sending out marker requests 
with various responses. We want to send out the requests and then unload and 
load the pending markers so that we bump the epoch and send out the requests 
again. This test ensures that when we do these operations, we still correctly 
complete the transaction when we get a successful response after reloading.
   
   Is there ever a case of `getTransactionStateResponse` where we have the old 
epoch? I noticed we have the case with the new epoch.
   
   Also, in the case with the new epoch, do we want to confirm that the 
transaction doesn't complete?
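
   To make the old-epoch question concrete, here is a minimal sketch of what a fourth entry in the matrix could look like. It uses simplified stand-in types (`StateError`, `EpochAndMetadata`, `TestMatrixSketch`, and `describe` are all hypothetical, not the real Kafka classes), and it only classifies each response relative to the epoch the marker was sent with. It makes no claim about how the channel manager should react in each case.

```scala
// Hypothetical, simplified stand-ins for the types used in the test.
// None of these are the real Kafka classes.
sealed trait StateError
case object NotCoordinator extends StateError
case object CoordinatorLoadInProgress extends StateError

final case class EpochAndMetadata(coordinatorEpoch: Int, transactionalId: String)

object TestMatrixSketch {
  type StateResponse = Either[StateError, Option[EpochAndMetadata]]

  val oldEpoch = 0 // epoch the in-flight marker was sent with (assumed value)
  val newEpoch = 1 // epoch after the simulated partition load (assumed value)

  // The three cases already in the test, plus the old-epoch case asked about.
  val getTransactionStateResponses: Seq[StateResponse] = Seq(
    Left(NotCoordinator),
    Left(CoordinatorLoadInProgress),
    Right(Some(EpochAndMetadata(newEpoch, "txn-2"))),
    Right(Some(EpochAndMetadata(oldEpoch, "txn-2")))
  )

  // Classifies a response relative to the epoch the marker was sent with.
  def describe(sentEpoch: Int, response: StateResponse): String = response match {
    case Left(error) => s"error: $error"
    case Right(None) => "no state"
    case Right(Some(state)) if state.coordinatorEpoch == sentEpoch => "same epoch"
    case Right(Some(_)) => "different epoch"
  }
}
```

   In the real test, the extra entry would presumably be built from `coordinatorEpoch` rather than `coordinatorEpoch2`, with a per-case assertion on whether the transaction completes.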



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
