jolshan commented on code in PR #14774:
URL: https://github.com/apache/kafka/pull/14774#discussion_r1414412642


##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala:
##########
@@ -909,8 +910,68 @@ private[group] class GroupCoordinator(
         val group = groupManager.getGroup(groupId).getOrElse {
           groupManager.addGroup(new GroupMetadata(groupId, Empty, time))
         }
-        doTxnCommitOffsets(group, transactionalId, memberId, groupInstanceId, 
generationId, producerId, producerEpoch,
-          offsetMetadata, requestLocal, responseCallback)
+
+        val filteredOffsetMetadata = offsetMetadata.filter { case (_, 
offsetAndMetadata) =>
+          groupManager.validateOffsetMetadataLength(offsetAndMetadata.metadata)
+        }
+        if (filteredOffsetMetadata.isEmpty) {
+          // compute the final error codes for the commit response
+          val commitStatus = offsetMetadata.map { case (k, _) => k -> 
Errors.OFFSET_METADATA_TOO_LARGE }
+          responseCallback(commitStatus)
+          return
+        }
+
+        val magicOpt = groupManager.getMagic(partitionFor(group.groupId))
+        if (magicOpt.isEmpty) {
+          val commitStatus = offsetMetadata.map { case (topicIdPartition, _) =>
+            (topicIdPartition, Errors.NOT_COORDINATOR)
+          }
+          responseCallback(commitStatus)
+          return
+        }
+
+        val records = groupManager.generateOffsetRecords(magicOpt.get, true, 
group.groupId, filteredOffsetMetadata, producerId, producerEpoch)
+        val transactionVerificationEntries = new TransactionVerificationEntries
+
+        def postVerificationCallback(newRequestLocal: RequestLocal)
+                                    (errorResults: Map[TopicPartition, 
LogAppendResult]): Unit = {
+          group.inLock {
+            val validationErrorOpt = validateOffsetCommit(
+              group,
+              generationId,
+              memberId,
+              groupInstanceId,
+              isTransactional = true
+            )
+
+            val verifiedOffsets = offsetMetadata.filter {
+              case (tp, _) =>
+                !errorResults.contains(tp.topicPartition)
+            }
+
+            val verifiedRecords = records.filter {
+              case (tp, _) =>
+                !errorResults.contains(tp)
+            }
+
+            if (validationErrorOpt.isDefined) {
+              responseCallback(offsetMetadata.map { case (k, _) => k -> 
validationErrorOpt.get })
+            } else if (verifiedOffsets.isEmpty) {
+              responseCallback(offsetMetadata.map { case (k, _) => k -> 
errorResults(k.topicPartition).error })
+            } else {
+              val putCacheCallback = groupManager.createPutCacheCallback(true, 
group, memberId, offsetMetadata, verifiedOffsets, responseCallback, producerId, 
verifiedRecords, errorResults)
+              groupManager.storeOffsetsAfterVerification(group, 
verifiedOffsets, records, putCacheCallback, producerId, 
transactionVerificationEntries, errorResults, newRequestLocal)

Review Comment:
   I struggled a bit with some of these params, since we could just pass 
verifiedRecords here instead of records. 
   
   We redo the filtering in the append path, and I decided to keep all the 
records so that we can also track (in purgatory, etc.) the ones that failed. 
This is likely not required.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to