jolshan commented on code in PR #14774:
URL: https://github.com/apache/kafka/pull/14774#discussion_r1420791705


##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -339,156 +340,183 @@ class GroupMetadataManager(brokerId: Int,
       delayedProduceLock = Some(group.lock),
       responseCallback = callback,
       requestLocal = requestLocal,
-      transactionalId = transactionalId)
+      verificationGuards = verificationGuards,
+      preAppendErrors = preAppendErrors)
+  }
+
+  def generateOffsetRecords(magicValue: Byte,
+                            isTxnOffsetCommit: Boolean,
+                            groupId: String,
+                            offsetTopicPartition: TopicPartition,
+                            filteredOffsetMetadata: Map[TopicIdPartition, OffsetAndMetadata],
+                            producerId: Long,
+                            producerEpoch: Short): Map[TopicPartition, MemoryRecords] = {
+      // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically.
+      val timestampType = TimestampType.CREATE_TIME
+      val timestamp = time.milliseconds()
+
+      val records = filteredOffsetMetadata.map { case (topicIdPartition, offsetAndMetadata) =>
+        val key = GroupMetadataManager.offsetCommitKey(groupId, topicIdPartition.topicPartition)
+        val value = GroupMetadataManager.offsetCommitValue(offsetAndMetadata, interBrokerProtocolVersion)
+        new SimpleRecord(timestamp, key, value)
+      }
+      val buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, compressionType, records.asJava))
+
+      if (isTxnOffsetCommit && magicValue < RecordBatch.MAGIC_VALUE_V2)
+        throw Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT.exception("Attempting to make a transaction offset commit with an invalid magic: " + magicValue)
+
+      val builder = MemoryRecords.builder(buffer, magicValue, compressionType, timestampType, 0L, time.milliseconds(),
+        producerId, producerEpoch, 0, isTxnOffsetCommit, RecordBatch.NO_PARTITION_LEADER_EPOCH)
+
+      records.foreach(builder.append)
+      Map(offsetTopicPartition -> builder.build())
+  }
+
+  def createPutCacheCallback(isTxnOffsetCommit: Boolean,
+                             group: GroupMetadata,
+                             consumerId: String,
+                             offsetMetadata: immutable.Map[TopicIdPartition, OffsetAndMetadata],
+                             filteredOffsetMetadata: Map[TopicIdPartition, OffsetAndMetadata],
+                             responseCallback: immutable.Map[TopicIdPartition, Errors] => Unit,
+                             producerId: Long = RecordBatch.NO_PRODUCER_ID,
+                             records: Map[TopicPartition, MemoryRecords],
+                             preAppendErrors: Map[TopicPartition, LogAppendResult] = Map.empty): Map[TopicPartition, PartitionResponse] => Unit = {
+    val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))
+    // set the callback function to insert offsets into cache after log append completed
+    def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
+      // the append response should only contain the topics partition
+      if (responseStatus.size != 1 || !responseStatus.contains(offsetTopicPartition))
+        throw new IllegalStateException("Append status %s should only have one partition %s"
+          .format(responseStatus, offsetTopicPartition))
+
+      // construct the commit response status and insert
+      // the offset and metadata to cache if the append status has no error
+      val status = responseStatus(offsetTopicPartition)
+
+      val responseError = group.inLock {
+        if (status.error == Errors.NONE) {
+          if (!group.is(Dead)) {
+            filteredOffsetMetadata.forKeyValue { (topicIdPartition, offsetAndMetadata) =>
+              if (isTxnOffsetCommit)
+                group.onTxnOffsetCommitAppend(producerId, topicIdPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
+              else
+                group.onOffsetCommitAppend(topicIdPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
+            }
+          }
+
+          // Record the number of offsets committed to the log
+          offsetCommitsSensor.record(records.size)
+
+          Errors.NONE
+        } else {
+          if (!group.is(Dead)) {
+            if (!group.hasPendingOffsetCommitsFromProducer(producerId))
+              removeProducerGroup(producerId, group.groupId)
+            filteredOffsetMetadata.forKeyValue { (topicIdPartition, offsetAndMetadata) =>
+              if (isTxnOffsetCommit)
+                group.failPendingTxnOffsetCommit(producerId, topicIdPartition)
+              else
+                group.failPendingOffsetWrite(topicIdPartition, offsetAndMetadata)
+            }
+          }
+
+          debug(s"Offset commit $filteredOffsetMetadata from group ${group.groupId}, consumer $consumerId " +
+            s"with generation ${group.generationId} failed when appending to log due to ${status.error.exceptionName}")
+
+          // transform the log append error code to the corresponding the commit status error code
+          status.error match {
+            case Errors.UNKNOWN_TOPIC_OR_PARTITION
+                 | Errors.NOT_ENOUGH_REPLICAS
+                 | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
+              Errors.COORDINATOR_NOT_AVAILABLE
+
+            case Errors.NOT_LEADER_OR_FOLLOWER
+                 | Errors.KAFKA_STORAGE_ERROR =>
+              Errors.NOT_COORDINATOR
+
+            case Errors.MESSAGE_TOO_LARGE
+                 | Errors.RECORD_LIST_TOO_LARGE
+                 | Errors.INVALID_FETCH_SIZE =>
+              Errors.INVALID_COMMIT_OFFSET_SIZE
+
+            case other => other
+          }
+        }
+      }
+
+      // compute the final error codes for the commit response
+      val commitStatus = offsetMetadata.map { case (topicIdPartition, offsetAndMetadata) =>
+        if (!validateOffsetMetadataLength(offsetAndMetadata.metadata))
+          (topicIdPartition, Errors.OFFSET_METADATA_TOO_LARGE)
+        else if (preAppendErrors.contains(topicIdPartition.topicPartition))
+          (topicIdPartition, preAppendErrors(topicIdPartition.topicPartition).error)
+        else
+          (topicIdPartition, responseError)
+      }
+
+      // finally trigger the callback logic passed from the API layer
+      responseCallback(commitStatus)
+    }
+    putCacheCallback
   }
 
   /**
   * Store offsets by appending it to the replicated log and then inserting to cache
    */
   def storeOffsets(group: GroupMetadata,
                    consumerId: String,
+                   offsetTopicPartition: TopicPartition,
                   offsetMetadata: immutable.Map[TopicIdPartition, OffsetAndMetadata],
                   responseCallback: immutable.Map[TopicIdPartition, Errors] => Unit,
-                   transactionalId: String = null,
                    producerId: Long = RecordBatch.NO_PRODUCER_ID,
                    producerEpoch: Short = RecordBatch.NO_PRODUCER_EPOCH,
-                   requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = {
-    // first filter out partitions with offset metadata size exceeding limit
-    val filteredOffsetMetadata = offsetMetadata.filter { case (_, offsetAndMetadata) =>
-      validateOffsetMetadataLength(offsetAndMetadata.metadata)
-    }
-
+                   requestLocal: RequestLocal = RequestLocal.NoCaching,
+                   verificationGuard: Option[VerificationGuard]): Unit = {
     group.inLock {
       if (!group.hasReceivedConsistentOffsetCommits)
         warn(s"group: ${group.groupId} with leader: ${group.leaderOrNull} has 
received offset commits from consumers as well " +
           s"as transactional producers. Mixing both types of offset commits 
will generally result in surprises and " +
           s"should be avoided.")
     }
 
-    val isTxnOffsetCommit = producerId != RecordBatch.NO_PRODUCER_ID
-    // construct the message set to append
+    val filteredOffsetMetadata = offsetMetadata.filter { case (_, offsetAndMetadata) =>
+      validateOffsetMetadataLength(offsetAndMetadata.metadata)
+    }
     if (filteredOffsetMetadata.isEmpty) {
       // compute the final error codes for the commit response
      val commitStatus = offsetMetadata.map { case (k, _) => k -> Errors.OFFSET_METADATA_TOO_LARGE }
       responseCallback(commitStatus)
-    } else {
-      getMagic(partitionFor(group.groupId)) match {
-        case Some(magicValue) =>
-          // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically.
-          val timestampType = TimestampType.CREATE_TIME
-          val timestamp = time.milliseconds()
-
-          val records = filteredOffsetMetadata.map { case (topicIdPartition, offsetAndMetadata) =>
-            val key = GroupMetadataManager.offsetCommitKey(group.groupId, topicIdPartition.topicPartition)
-            val value = GroupMetadataManager.offsetCommitValue(offsetAndMetadata, interBrokerProtocolVersion)
-            new SimpleRecord(timestamp, key, value)
-          }
-          val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))
-          val buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, compressionType, records.asJava))
-
-          if (isTxnOffsetCommit && magicValue < RecordBatch.MAGIC_VALUE_V2)
-            throw Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT.exception("Attempting to make a transaction offset commit with an invalid magic: " + magicValue)
-
-          val builder = MemoryRecords.builder(buffer, magicValue, compressionType, timestampType, 0L, time.milliseconds(),
-            producerId, producerEpoch, 0, isTxnOffsetCommit, RecordBatch.NO_PARTITION_LEADER_EPOCH)
-
-          records.foreach(builder.append)
-          val entries = Map(offsetTopicPartition -> builder.build())
-
-          // set the callback function to insert offsets into cache after log append completed
-          def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
-            // the append response should only contain the topics partition
-            if (responseStatus.size != 1 || !responseStatus.contains(offsetTopicPartition))
-              throw new IllegalStateException("Append status %s should only have one partition %s"
-                .format(responseStatus, offsetTopicPartition))
-
-            // construct the commit response status and insert
-            // the offset and metadata to cache if the append status has no error
-            val status = responseStatus(offsetTopicPartition)
-
-            val responseError = group.inLock {
-              if (status.error == Errors.NONE) {
-                if (!group.is(Dead)) {
-                  filteredOffsetMetadata.forKeyValue { (topicIdPartition, offsetAndMetadata) =>
-                    if (isTxnOffsetCommit)
-                      group.onTxnOffsetCommitAppend(producerId, topicIdPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
-                    else
-                      group.onOffsetCommitAppend(topicIdPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
-                  }
-                }
-
-                // Record the number of offsets committed to the log
-                offsetCommitsSensor.record(records.size)
-
-                Errors.NONE
-              } else {
-                if (!group.is(Dead)) {
-                  if (!group.hasPendingOffsetCommitsFromProducer(producerId))
-                    removeProducerGroup(producerId, group.groupId)
-                  filteredOffsetMetadata.forKeyValue { (topicIdPartition, offsetAndMetadata) =>
-                    if (isTxnOffsetCommit)
-                      group.failPendingTxnOffsetCommit(producerId, topicIdPartition)
-                    else
-                      group.failPendingOffsetWrite(topicIdPartition, offsetAndMetadata)
-                  }
-                }
-
-                debug(s"Offset commit $filteredOffsetMetadata from group ${group.groupId}, consumer $consumerId " +
-                  s"with generation ${group.generationId} failed when appending to log due to ${status.error.exceptionName}")
-
-                // transform the log append error code to the corresponding the commit status error code
-                status.error match {
-                  case Errors.UNKNOWN_TOPIC_OR_PARTITION
-                       | Errors.NOT_ENOUGH_REPLICAS
-                       | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
-                    Errors.COORDINATOR_NOT_AVAILABLE
-
-                  case Errors.NOT_LEADER_OR_FOLLOWER
-                       | Errors.KAFKA_STORAGE_ERROR =>
-                    Errors.NOT_COORDINATOR
-
-                  case Errors.MESSAGE_TOO_LARGE
-                       | Errors.RECORD_LIST_TOO_LARGE
-                       | Errors.INVALID_FETCH_SIZE =>
-                    Errors.INVALID_COMMIT_OFFSET_SIZE
-
-                  case other => other
-                }
-              }
-            }
-
-            // compute the final error codes for the commit response
-            val commitStatus = offsetMetadata.map { case (topicIdPartition, offsetAndMetadata) =>
-              if (validateOffsetMetadataLength(offsetAndMetadata.metadata))
-                (topicIdPartition, responseError)
-              else
-                (topicIdPartition, Errors.OFFSET_METADATA_TOO_LARGE)
-            }
+      return
+    }
 
-            // finally trigger the callback logic passed from the API layer
-          }
+    val magicOpt = getMagic(partitionFor(group.groupId))
+    if (magicOpt.isEmpty) {
+      val commitStatus = offsetMetadata.map { case (topicIdPartition, _) =>
+        (topicIdPartition, Errors.NOT_COORDINATOR)
+      }
+      responseCallback(commitStatus)
+      return
+    }
 
-          if (isTxnOffsetCommit) {
-            group.inLock {
-              addProducerGroup(producerId, group.groupId)
-              group.prepareTxnOffsetCommit(producerId, offsetMetadata)
-            }
-          } else {
-            group.inLock {
-              group.prepareOffsetCommit(offsetMetadata)
-            }
-          }
+    val isTxnOffsetCommit = producerId != RecordBatch.NO_PRODUCER_ID
+    val records = generateOffsetRecords(magicOpt.get, isTxnOffsetCommit, group.groupId, offsetTopicPartition, filteredOffsetMetadata, producerId, producerEpoch)
+    val putCacheCallback = createPutCacheCallback(isTxnOffsetCommit, group, consumerId, offsetMetadata, filteredOffsetMetadata, responseCallback, producerId, records)
 
-          appendForGroup(group, entries, requestLocal, putCacheCallback, transactionalId)
+    val verificationGuards = verificationGuard match {
+      case Some(guard) => Map(offsetTopicPartition -> guard)
+      case None => Map.empty[TopicPartition, VerificationGuard]
+    }
 
-        case None =>
-          val commitStatus = offsetMetadata.map { case (topicIdPartition, _) =>
-            (topicIdPartition, Errors.NOT_COORDINATOR)
-          }
-          responseCallback(commitStatus)
+    group.inLock {

Review Comment:
   Ok, makes sense.
   
   The only thing that made me wonder about keeping the locking is that this method is used in unit tests without the locking on the outside. I wasn't sure whether the best solution is to put locks around those calls or not. I would hope that the tests don't rely on locking, given that there should really only be one thread running them and the ReplicaManager is mocked (so no async appends), but I would need to double-check.
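
   For illustration only (not part of the PR or its tests), a minimal sketch of the "put locks around those calls" option, i.e. taking the group lock in the test around a direct call such as `storeOffsets`. The fixture names (`groupMetadataManager`, `group`, `memberId`, `offsets`, `callback`) are placeholders, not the actual test code:
   
   ```scala
   // Hypothetical sketch: wrap the direct call in the unit test with the group lock,
   // mirroring the locking the request path would otherwise hold. The group lock is a
   // ReentrantLock, so the inLock calls inside the method itself should still be fine.
   group.inLock {
     // groupMetadataManager, group, memberId, offsets and callback stand in for
     // whatever the real test sets up; the outer inLock is the only point here.
     groupMetadataManager.storeOffsets(
       group,
       memberId,
       offsetTopicPartition,
       offsets,
       callback,
       verificationGuard = None)
   }
   ```
   
   Whether that is worth doing, or whether the single-threaded test setup with a mocked ReplicaManager already makes it unnecessary, is the open question above.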


