junrao commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r440514740



##########
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
##########
@@ -307,8 +307,14 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
     override def runWithCallback(member: GroupMember, responseCallback: CompleteTxnCallback): Unit = {
       val producerId = 1000L
       val offsetsPartitions = (0 to numPartitions).map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, _))
-      groupCoordinator.groupManager.handleTxnCompletion(producerId,
-        offsetsPartitions.map(_.partition).toSet, isCommit = random.nextBoolean)
+      val isCommit = random.nextBoolean
+      try groupCoordinator.groupManager.handleTxnCompletion(producerId,
+        offsetsPartitions.map(_.partition).toSet, isCommit = isCommit)
+      catch {
+        case e: IllegalStateException if isCommit
+          && e.getMessage.contains("though the offset commit record itself hasn't been appended to the log")=>

Review comment:
       Thanks. I am still not sure that I fully understand this. It seems that
because the delayedProduce is no longer completed within the group lock, we are
hitting an IllegalStateException. Do you know which code depends on the
delayedProduce being completed under the lock? It seems that we do hold the
group lock when updating the txnOffset.
   
   
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L462

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -967,7 +967,16 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int): LogAppendInfo = {
+  /**
+   * @param completeDelayedRequests It may requires a bunch of group locks when completing delayed requests so it may

Review comment:
       Yes, we can refactor that in a separate PR. Could you file a follow-up
JIRA for that?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

