chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r440576071



##########
File path: 
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
##########
@@ -307,8 +307,14 @@ class GroupCoordinatorConcurrencyTest extends 
AbstractCoordinatorConcurrencyTest
     override def runWithCallback(member: GroupMember, responseCallback: 
CompleteTxnCallback): Unit = {
       val producerId = 1000L
       val offsetsPartitions = (0 to numPartitions).map(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, _))
-      groupCoordinator.groupManager.handleTxnCompletion(producerId,
-        offsetsPartitions.map(_.partition).toSet, isCommit = 
random.nextBoolean)
+      val isCommit = random.nextBoolean
+      try groupCoordinator.groupManager.handleTxnCompletion(producerId,
+        offsetsPartitions.map(_.partition).toSet, isCommit = isCommit)
+      catch {
+        case e: IllegalStateException if isCommit
+          && e.getMessage.contains("though the offset commit record itself 
hasn't been appended to the log")=>

Review comment:
       > That seems a bug.
   
   The root cause (a behavior change in this PR) is that "txn initialization" and 
"txn append" are no longer executed under the same lock.
   
   **The test story is shown below.**
   
   ```CommitTxnOffsetsOperation``` calls 
```GroupMetadata.prepareTxnOffsetCommit``` to add 
```CommitRecordMetadataAndOffset(None, offsetAndMetadata)``` to 
```pendingTransactionalOffsetCommits``` (this is the link you attached).
   
   ```GroupMetadata.completePendingTxnOffsetCommit``` called by 
```CompleteTxnOperation``` throws ```IllegalStateException``` if 
```CommitRecordMetadataAndOffset.appendedBatchOffset``` is ```None``` 
(https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L664).
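
   To make the story concrete, here is a minimal sketch of the three steps. It is not the real `GroupMetadata` code: `GroupSketch`, `CommitRecord`, `onOffsetCommitAppend` and the exception message are hypothetical stand-ins, and only the `None` vs. `Some` handling of the batch offset is taken from the description above.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for CommitRecordMetadataAndOffset.
final case class CommitRecord(appendedBatchOffset: Option[Long], offset: Long)

// Hypothetical, simplified stand-in for GroupMetadata.
final class GroupSketch {
  // producerId -> (partition -> pending transactional offset commit)
  private val pending = mutable.Map.empty[Long, mutable.Map[Int, CommitRecord]]

  // "txn initialization": the commit is registered before its record is appended.
  def prepareTxnOffsetCommit(producerId: Long, partition: Int, offset: Long): Unit =
    pending
      .getOrElseUpdate(producerId, mutable.Map.empty[Int, CommitRecord])
      .update(partition, CommitRecord(None, offset))

  // "txn append": the append callback fills in the batch offset.
  def onOffsetCommitAppend(producerId: Long, partition: Int, batchOffset: Long): Unit =
    pending.get(producerId).foreach { commits =>
      commits.get(partition).foreach { c =>
        commits.update(partition, c.copy(appendedBatchOffset = Some(batchOffset)))
      }
    }

  // "txn completion": committing an entry that was never appended is illegal.
  def completePendingTxnOffsetCommit(producerId: Long, isCommit: Boolean): Unit = {
    val commits = pending.getOrElse(producerId, mutable.Map.empty[Int, CommitRecord])
    if (isCommit) commits.foreach { case (partition, c) =>
      if (c.appendedBatchOffset.isEmpty)
        throw new IllegalStateException(
          s"Commit for partition $partition completed before its record was appended")
    }
    pending.remove(producerId)
  }
}
```

   In this sketch, calling `completePendingTxnOffsetCommit(..., isCommit = true)` right after `prepareTxnOffsetCommit` throws, while calling it after `onOffsetCommitAppend` does not.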
 
   
   **Why did it not cause an error before?**
   
   ```CommitRecordMetadataAndOffset.appendedBatchOffset``` is updated by the 
callback ```putCacheCallback``` 
(https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L407).
 ```TestReplicaManager``` always creates a ```delayedProduce``` to handle the 
```putCacheCallback``` 
(https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala#L188).
 The condition to complete the ```delayedProduce``` is 
```completeAttempts.incrementAndGet() >= 3```. That condition becomes true once 
both ```producePurgatory.tryCompleteElseWatch(delayedProduce, 
producerRequestKeys)``` and ```tryCompleteDelayedRequests()``` have been called, 
since the former calls ```tryComplete``` twice and the latter calls it once. 
This means ```putCacheCallback``` is always executed by 
```TestReplicaManager.appendRecords```. Note that 
```TestReplicaManager.appendRecords``` is executed within a group lock 
(https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L738).
 In short, txn initialization 
(https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L464)
 and txn append 
(https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L407)
 are executed under the same group lock. Hence, the following execution order 
is impossible.
   
   1. txn initialization
   1. txn completion
   1. txn append
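
   The attempt counting that used to rule out this order can be sketched as follows. `DelayedProduceSketch` and `AppendSketch` are hypothetical names, and the three explicit `tryComplete()` calls only model the call counts described above (two from `tryCompleteElseWatch`, one from `tryCompleteDelayedRequests`), not the real purgatory.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the test's delayed produce operation.
final class DelayedProduceSketch(onComplete: () => Unit) {
  private val completeAttempts = new AtomicInteger(0)
  @volatile private var completed = false

  def isCompleted: Boolean = completed

  // Completes on the third attempt, mirroring
  // `completeAttempts.incrementAndGet() >= 3`, and runs the callback once.
  def tryComplete(): Boolean = {
    if (!completed && completeAttempts.incrementAndGet() >= 3) {
      completed = true
      onComplete()
    }
    completed
  }
}

object AppendSketch {
  // Assumed shape of the old append path: all three attempts happen while
  // the caller still holds the group lock, so the callback runs under it.
  def appendRecords(groupLock: AnyRef, op: DelayedProduceSketch): Unit =
    groupLock.synchronized {
      op.tryComplete() // tryCompleteElseWatch: first attempt
      op.tryComplete() // tryCompleteElseWatch: second attempt
      op.tryComplete() // tryCompleteDelayedRequests: third attempt, completes
    }
}
```

   Under these assumptions the callback (the txn append) has always run by the time `appendRecords` returns, so no txn completion can be interleaved between initialization and append.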
   
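   However, this PR stops completing delayed requests while the caller holds the 
group lock. The ```putCacheCallback```, which performs the txn append, therefore 
has to acquire the group lock again, so a txn completion can now take the lock 
between txn initialization and txn append.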
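
   A single-threaded sketch of that interleaving, with the deferred callback modeled as a queue that is drained only after the lock is released. Everything here (`DeferredAppendSketch`, its method names, the exception message) is hypothetical and only illustrates the ordering; it is not how the PR is implemented.

```scala
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

final class DeferredAppendSketch {
  private val groupLock = new ReentrantLock()
  private var appendedBatchOffset: Option[Long] = None
  // Callbacks that are no longer run while the caller holds the group lock.
  private val deferred = mutable.Queue.empty[() => Unit]

  private def inLock[T](body: => T): T = {
    groupLock.lock()
    try body finally groupLock.unlock()
  }

  // Step 1, txn initialization: done under the lock, but the append
  // callback is only queued and must re-acquire the lock later.
  def commitTxnOffsets(): Unit = inLock {
    appendedBatchOffset = None
    deferred.enqueue(() => inLock { appendedBatchOffset = Some(0L) })
  }

  // Step 2, txn completion: fails if the append has not happened yet.
  def completeTxn(isCommit: Boolean): Unit = inLock {
    if (isCommit && appendedBatchOffset.isEmpty)
      throw new IllegalStateException("offset commit record not appended yet")
  }

  // Step 3, txn append: drains the queued callbacks outside the original lock scope.
  def runDeferred(): Unit =
    while (deferred.nonEmpty) deferred.dequeue()()
}
```

   Calling `commitTxnOffsets()`, then `completeTxn(isCommit = true)`, then `runDeferred()` reproduces the order 1, 2, 3 from the list above and throws at step 2, which matches the `IllegalStateException` that the test now tolerates when `isCommit` is true.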
         
   
   



