jolshan commented on code in PR #15524:
URL: https://github.com/apache/kafka/pull/15524#discussion_r1523732734


##########
core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala:
##########
@@ -109,23 +109,30 @@ object TransactionMarkerChannelManager {
 
 }
 
-class TxnMarkerQueue(@volatile var destination: Node) {
+class TxnMarkerQueue(@volatile var destination: Node) extends Logging {
 
   // keep track of the requests per txn topic partition so we can easily clear 
the queue
   // during partition emigration
-  private val markersPerTxnTopicPartition = new ConcurrentHashMap[Int, 
BlockingQueue[TxnIdAndMarkerEntry]]().asScala
+  private val markersPerTxnTopicPartition = new ConcurrentHashMap[Int, 
BlockingQueue[PendingCompleteTxnAndMarkerEntry]]().asScala
 
-  def removeMarkersForTxnTopicPartition(partition: Int): 
Option[BlockingQueue[TxnIdAndMarkerEntry]] = {
+  def removeMarkersForTxnTopicPartition(partition: Int): 
Option[BlockingQueue[PendingCompleteTxnAndMarkerEntry]] = {
     markersPerTxnTopicPartition.remove(partition)
   }
 
-  def addMarkers(txnTopicPartition: Int, txnIdAndMarker: TxnIdAndMarkerEntry): 
Unit = {
-    val queue = CoreUtils.atomicGetOrUpdate(markersPerTxnTopicPartition, 
txnTopicPartition,
-        new LinkedBlockingQueue[TxnIdAndMarkerEntry]())
-    queue.add(txnIdAndMarker)
+  def addMarkers(txnTopicPartition: Int, pendingCompleteTxnAndMarker: 
PendingCompleteTxnAndMarkerEntry): Unit = {
+    val queue = CoreUtils.atomicGetOrUpdate(markersPerTxnTopicPartition, 
txnTopicPartition, {
+      info(s"Creating new marker queue for txn partition $txnTopicPartition to 
destination broker ${destination.id}")
+      new LinkedBlockingQueue[PendingCompleteTxnAndMarkerEntry]()
+    })
+    queue.add(pendingCompleteTxnAndMarker)
+
+    if (markersPerTxnTopicPartition.get(txnTopicPartition).orNull != queue) {
+      // This could happen if the queue got removed concurrently.

Review Comment:
   Should we do something if we added to a "dead queue"? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to