artemlivshits commented on code in PR #14774:
URL: https://github.com/apache/kafka/pull/14774#discussion_r1414599997


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -708,23 +708,40 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     }
 
-    if (authorizedRequestInfo.isEmpty)
-      sendResponseCallback(Map.empty)
-    else {
-      val internalTopicsAllowed = request.header.clientId == AdminUtils.ADMIN_CLIENT_ID
+    val internalTopicsAllowed = request.header.clientId == AdminUtils.ADMIN_CLIENT_ID
+    val transactionVerificationEntries = new ReplicaManager.TransactionVerificationEntries
 
-      // call the replica manager to append messages to the replicas
+    def postVerificationCallback(newRequestLocal: RequestLocal)

Review Comment:
   Do we need to change this file? I think we could preserve the abstraction and keep the verification guts encapsulated in the async call (as it is now), and only let the group coordinator use the explicit stages.
   
   I.e. replicaManager.appendRecords would keep doing what it does now (calling the stages under the covers), so we don't have to bring the complexity into code that already works well with the abstraction.
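   For illustration only, here is a minimal standalone sketch of the shape this suggests: a single appendRecords facade that chains the verification and append stages internally, so KafkaApis-style callers never see the stages, while the group coordinator can call the stage methods directly. All types and names below are simplified stand-ins, not the PR's actual signatures.

```scala
// Standalone model of preserving the appendRecords abstraction while the
// verification/append split stays an internal detail. Names are hypothetical.
object AppendFacadeSketch {
  type TopicPartition = String
  case class AppendResult(error: Option[String])

  class ReplicaManagerLike {
    // Stage 1: verify the partitions are in the transaction, then hand any
    // verification errors to a callback that runs the rest of the append.
    def appendWithVerification(entries: Map[TopicPartition, String],
                               transactionalId: Option[String])
                              (postVerification: Map[TopicPartition, AppendResult] => Unit): Unit = {
      // In the real code this is where the transaction coordinator would be
      // consulted; here we just pretend everything verified cleanly.
      val verificationErrors = Map.empty[TopicPartition, AppendResult]
      postVerification(verificationErrors)
    }

    // Stage 2: the actual append for whatever passed verification.
    def appendVerified(entries: Map[TopicPartition, String],
                       errors: Map[TopicPartition, AppendResult],
                       responseCallback: Map[TopicPartition, AppendResult] => Unit): Unit = {
      val appended = (entries.keySet -- errors.keySet).map(tp => tp -> AppendResult(None)).toMap
      responseCallback(appended ++ errors)
    }

    // The preserved abstraction: callers see one call that chains the stages
    // under the covers, exactly as appendRecords behaves today.
    def appendRecords(entries: Map[TopicPartition, String],
                      transactionalId: Option[String],
                      responseCallback: Map[TopicPartition, AppendResult] => Unit): Unit =
      appendWithVerification(entries, transactionalId) { errors =>
        appendVerified(entries, errors, responseCallback)
      }
  }

  def main(args: Array[String]): Unit = {
    val rm = new ReplicaManagerLike
    rm.appendRecords(Map("topic-0" -> "records"), Some("txn-1"), results => println(results))
  }
}
```

   The group coordinator would still be free to call the two stage methods directly when it needs to interleave its own locking between them.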



##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -349,146 +468,68 @@ class GroupMetadataManager(brokerId: Int,
                    consumerId: String,
                    offsetMetadata: immutable.Map[TopicIdPartition, OffsetAndMetadata],
                    responseCallback: immutable.Map[TopicIdPartition, Errors] => Unit,
-                   transactionalId: String = null,
                    producerId: Long = RecordBatch.NO_PRODUCER_ID,
                    producerEpoch: Short = RecordBatch.NO_PRODUCER_EPOCH,
                    requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = {
-    // first filter out partitions with offset metadata size exceeding limit
-    val filteredOffsetMetadata = offsetMetadata.filter { case (_, offsetAndMetadata) =>
-      validateOffsetMetadataLength(offsetAndMetadata.metadata)
-    }
-
     group.inLock {
       if (!group.hasReceivedConsistentOffsetCommits)
         warn(s"group: ${group.groupId} with leader: ${group.leaderOrNull} has received offset commits from consumers as well " +
           s"as transactional producers. Mixing both types of offset commits will generally result in surprises and " +
           s"should be avoided.")
     }
 
-    val isTxnOffsetCommit = producerId != RecordBatch.NO_PRODUCER_ID
-    // construct the message set to append
+    val filteredOffsetMetadata = offsetMetadata.filter { case (_, offsetAndMetadata) =>
+      validateOffsetMetadataLength(offsetAndMetadata.metadata)
+    }
     if (filteredOffsetMetadata.isEmpty) {
       // compute the final error codes for the commit response
       val commitStatus = offsetMetadata.map { case (k, _) => k -> Errors.OFFSET_METADATA_TOO_LARGE }
       responseCallback(commitStatus)
-    } else {
-      getMagic(partitionFor(group.groupId)) match {
-        case Some(magicValue) =>
-          // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically.
-          val timestampType = TimestampType.CREATE_TIME
-          val timestamp = time.milliseconds()
-
-          val records = filteredOffsetMetadata.map { case (topicIdPartition, offsetAndMetadata) =>
-            val key = GroupMetadataManager.offsetCommitKey(group.groupId, topicIdPartition.topicPartition)
-            val value = GroupMetadataManager.offsetCommitValue(offsetAndMetadata, interBrokerProtocolVersion)
-            new SimpleRecord(timestamp, key, value)
-          }
-          val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))
-          val buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, compressionType, records.asJava))
-
-          if (isTxnOffsetCommit && magicValue < RecordBatch.MAGIC_VALUE_V2)
-            throw Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT.exception("Attempting to make a transaction offset commit with an invalid magic: " + magicValue)
-
-          val builder = MemoryRecords.builder(buffer, magicValue, compressionType, timestampType, 0L, time.milliseconds(),
-            producerId, producerEpoch, 0, isTxnOffsetCommit, RecordBatch.NO_PARTITION_LEADER_EPOCH)
-
-          records.foreach(builder.append)
-          val entries = Map(offsetTopicPartition -> builder.build())
-
-          // set the callback function to insert offsets into cache after log append completed
-          def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
-            // the append response should only contain the topics partition
-            if (responseStatus.size != 1 || !responseStatus.contains(offsetTopicPartition))
-              throw new IllegalStateException("Append status %s should only have one partition %s"
-                .format(responseStatus, offsetTopicPartition))
-
-            // construct the commit response status and insert
-            // the offset and metadata to cache if the append status has no error
-            val status = responseStatus(offsetTopicPartition)
-
-            val responseError = group.inLock {
-              if (status.error == Errors.NONE) {
-                if (!group.is(Dead)) {
-                  filteredOffsetMetadata.forKeyValue { (topicIdPartition, offsetAndMetadata) =>
-                    if (isTxnOffsetCommit)
-                      group.onTxnOffsetCommitAppend(producerId, topicIdPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
-                    else
-                      group.onOffsetCommitAppend(topicIdPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
-                  }
-                }
-
-                // Record the number of offsets committed to the log
-                offsetCommitsSensor.record(records.size)
-
-                Errors.NONE
-              } else {
-                if (!group.is(Dead)) {
-                  if (!group.hasPendingOffsetCommitsFromProducer(producerId))
-                    removeProducerGroup(producerId, group.groupId)
-                  filteredOffsetMetadata.forKeyValue { (topicIdPartition, offsetAndMetadata) =>
-                    if (isTxnOffsetCommit)
-                      group.failPendingTxnOffsetCommit(producerId, topicIdPartition)
-                    else
-                      group.failPendingOffsetWrite(topicIdPartition, offsetAndMetadata)
-                  }
-                }
-
-                debug(s"Offset commit $filteredOffsetMetadata from group ${group.groupId}, consumer $consumerId " +
-                  s"with generation ${group.generationId} failed when appending to log due to ${status.error.exceptionName}")
-
-                // transform the log append error code to the corresponding the commit status error code
-                status.error match {
-                  case Errors.UNKNOWN_TOPIC_OR_PARTITION
-                       | Errors.NOT_ENOUGH_REPLICAS
-                       | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
-                    Errors.COORDINATOR_NOT_AVAILABLE
-
-                  case Errors.NOT_LEADER_OR_FOLLOWER
-                       | Errors.KAFKA_STORAGE_ERROR =>
-                    Errors.NOT_COORDINATOR
-
-                  case Errors.MESSAGE_TOO_LARGE
-                       | Errors.RECORD_LIST_TOO_LARGE
-                       | Errors.INVALID_FETCH_SIZE =>
-                    Errors.INVALID_COMMIT_OFFSET_SIZE
+      return
+    }
 
-                  case other => other
-                }
-              }
-            }
+    val magicOpt = getMagic(partitionFor(group.groupId))
+    if (magicOpt.isEmpty) {
+      val commitStatus = offsetMetadata.map { case (topicIdPartition, _) =>
+        (topicIdPartition, Errors.NOT_COORDINATOR)
+      }
+      responseCallback(commitStatus)
+      return
+    }
 
-            // compute the final error codes for the commit response
-            val commitStatus = offsetMetadata.map { case (topicIdPartition, offsetAndMetadata) =>
-              if (validateOffsetMetadataLength(offsetAndMetadata.metadata))
-                (topicIdPartition, responseError)
-              else
-                (topicIdPartition, Errors.OFFSET_METADATA_TOO_LARGE)
-            }
+    val isTxnOffsetCommit = producerId != RecordBatch.NO_PRODUCER_ID
+    val records = generateOffsetRecords(magicOpt.get, isTxnOffsetCommit, group.groupId, filteredOffsetMetadata, producerId, producerEpoch)
+    val putCacheCallback = createPutCacheCallback(isTxnOffsetCommit, group, consumerId, offsetMetadata, filteredOffsetMetadata, responseCallback, producerId, records)
 
-            // finally trigger the callback logic passed from the API layer
-            responseCallback(commitStatus)
-          }
+    group.inLock {
+        group.prepareOffsetCommit(offsetMetadata)
+    }
 
-          if (isTxnOffsetCommit) {
-            group.inLock {
-              addProducerGroup(producerId, group.groupId)
-              group.prepareTxnOffsetCommit(producerId, offsetMetadata)
-            }
-          } else {
-            group.inLock {
-              group.prepareOffsetCommit(offsetMetadata)
-            }
-          }
+    appendForGroup(group, records, requestLocal, putCacheCallback)
+  }
 
-          appendForGroup(group, entries, requestLocal, putCacheCallback, transactionalId)
+  def storeOffsetsAfterVerification(group: GroupMetadata,
+                                    verifiedOffsetMetadata: immutable.Map[TopicIdPartition, OffsetAndMetadata],
+                                    records: Map[TopicPartition, MemoryRecords],
+                                    putCacheCallback:  Map[TopicPartition, PartitionResponse] => Unit,
+                                    producerId: Long,
+                                    transactionVerificationEntries: TransactionVerificationEntries,
+                                    errorResults: Map[TopicPartition, LogAppendResult],
+                                    requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = {
+    group.inLock {

Review Comment:
   Wouldn't this whole function have to be called under the group lock in order for the append to happen partially under the lock (which is sort of the reason for this change)? If so, let's add a comment that explains the precondition and remove the extra lock acquisition. Re-acquiring locks when it isn't necessary makes it hard to track lock scopes.
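   A standalone sketch of the precondition-style alternative being suggested, with hypothetical names and simplified types (the real GroupMetadata locking API may differ): the caller takes the group lock once, and the helper documents and asserts the precondition instead of re-locking.

```scala
// Illustration of a documented "caller must hold the lock" precondition,
// using a plain ReentrantLock as a stand-in for the group lock.
import java.util.concurrent.locks.ReentrantLock

class GroupLike {
  val lock = new ReentrantLock()

  def inLock[T](body: => T): T = {
    lock.lock()
    try body finally lock.unlock()
  }
}

object LockPreconditionSketch {
  // Precondition: the caller must already hold group.lock. We assert rather
  // than re-lock so the lock scope stays visible at the call site.
  def storeOffsetsAfterVerificationSketch(group: GroupLike, doAppend: () => Unit): Unit = {
    require(group.lock.isHeldByCurrentThread, "caller must hold the group lock")
    doAppend() // runs partially under the lock, per the intent of the change
  }

  def main(args: Array[String]): Unit = {
    val group = new GroupLike
    group.inLock {
      storeOffsetsAfterVerificationSketch(group, () => println("append under lock"))
    }
  }
}
```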



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -941,39 +857,129 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
-  private def partitionEntriesForVerification(verificationGuards: mutable.Map[TopicPartition, VerificationGuard],
-                                              entriesPerPartition: Map[TopicPartition, MemoryRecords],
-                                              verifiedEntries: mutable.Map[TopicPartition, MemoryRecords],
-                                              unverifiedEntries: mutable.Map[TopicPartition, MemoryRecords],
-                                              errorEntries: mutable.Map[TopicPartition, Errors]): Unit= {
+  private def sendInvalidRequiredAcksResponse(entries: Map[TopicPartition, MemoryRecords],
+                                              responseCallback: Map[TopicPartition, PartitionResponse] => Unit): Unit = {
+    // If required.acks is outside accepted range, something is wrong with the client
+    // Just return an error and don't handle the request at all
+    val responseStatus = entries.map { case (topicPartition, _) =>
+      topicPartition -> new PartitionResponse(
+        Errors.INVALID_REQUIRED_ACKS,
+        LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.firstOffset,
+        RecordBatch.NO_TIMESTAMP,
+        LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.logStartOffset
+      )
+    }
+    responseCallback(responseStatus)
+  }
+
+  /**
+   * Apply the postVerificationCallback asynchronously only after verifying the partitions have been added to the transaction.
+   * The postVerificationCallback takes the arguments of the requestLocal for the thread that will be doing the append as
+   * well as a mapping of topic partitions to LogAppendResult for the partitions that saw errors when verifying.
+   *
+   * This method will start the verification process for all the topicPartitions in entriesPerPartition and supply the
+   * postVerificationCallback to be run on a request handler thread when the response is received.
+   *
+   * @param entriesPerPartition            the records per partition to be appended and therefore need verification
+   * @param transactionVerificationEntries the object that will store the entries to verify, the errors, and the verification guards
+   * @param transactionalId                the id for the transaction
+   * @param requestLocal                   container for the stateful instances scoped to this request -- this must correspond to the
+   *                                       thread calling this method
+   * @param postVerificationCallback       the method to be called when verification completes and the verification errors
+   *                                       and the thread's RequestLocal are supplied
+   */
+  def appendRecordsWithTransactionVerification(entriesPerPartition: Map[TopicPartition, MemoryRecords],
+                                               transactionVerificationEntries: TransactionVerificationEntries,
+                                               transactionalId: String,
+                                               requestLocal: RequestLocal,
+                                               postVerificationCallback: RequestLocal => Map[TopicPartition, LogAppendResult] => Unit): Unit = {
+    if (transactionalId != null && config.transactionPartitionVerificationEnable && addPartitionsToTxnManager.isDefined)
+      partitionEntriesForVerification(transactionVerificationEntries, entriesPerPartition)
+
+    val onVerificationComplete: (RequestLocal, Map[TopicPartition, Errors]) => Unit =
+      executePostVerificationCallback(
+        transactionVerificationEntries,
+        postVerificationCallback,
+      )
+
+    if (transactionVerificationEntries.unverified.isEmpty) {
+      onVerificationComplete(requestLocal, transactionVerificationEntries.errors.toMap)
+    } else {
+      // For unverified entries, send a request to verify. When verified, the append process will proceed via the callback.
+      // We verify above that all partitions use the same producer ID.
+      val batchInfo = transactionVerificationEntries.unverified.head._2.firstBatch()
+      addPartitionsToTxnManager.foreach(_.verifyTransaction(
+        transactionalId = transactionalId,
+        producerId = batchInfo.producerId,
+        producerEpoch = batchInfo.producerEpoch,
+        topicPartitions = transactionVerificationEntries.unverified.keySet.toSeq,
+        callback = KafkaRequestHandler.wrapAsyncCallback(onVerificationComplete, requestLocal)
+      ))
+    }
+  }
+
+  /**
+   * A helper method to compile the results from the transaction verification and call the postVerificationCallback.
+   *
+   * @param transactionVerificationEntries the object that will store the entries to verify, the errors, and the verification guards
+   * @param postVerificationCallback       the method to be called when verification completes and the verification errors
+   *                                       and the thread's RequestLocal are supplied
+   * @param requestLocal                   container for the stateful instances scoped to this request -- this must correspond to the
+   *                                       thread calling this method
+   *
+   */
+  private def executePostVerificationCallback(transactionVerificationEntries: TransactionVerificationEntries,
+                                              postVerificationCallback: RequestLocal => Map[TopicPartition, LogAppendResult] => Unit)
+                                             (requestLocal: RequestLocal, unverifiedEntries: Map[TopicPartition, Errors]): Unit = {
+    val errorResults = (unverifiedEntries ++ transactionVerificationEntries.errors).map {
+      case (topicPartition, error) =>
+        // translate transaction coordinator errors to known producer response errors
+        val customException =
+          error match {
+            case Errors.INVALID_TXN_STATE => Some(error.exception("Partition was not added to the transaction"))
+            case Errors.CONCURRENT_TRANSACTIONS |
+                 Errors.COORDINATOR_LOAD_IN_PROGRESS |
+                 Errors.COORDINATOR_NOT_AVAILABLE |
+                 Errors.NOT_COORDINATOR => Some(new NotEnoughReplicasException(
+              s"Unable to verify the partition has been added to the transaction. Underlying error: ${error.toString}"))
+            case _ => None
+          }
+        topicPartition -> LogAppendResult(
+          LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
+          Some(customException.getOrElse(error.exception)),
+          hasCustomErrorMessage = customException.isDefined
+        )
+    }
+    postVerificationCallback(requestLocal)(errorResults)
+  }
+
+  private def partitionEntriesForVerification(transactionVerificationEntries: TransactionVerificationEntries, entriesPerPartition: Map[TopicPartition, MemoryRecords]): TransactionVerificationEntries = {

Review Comment:
   Do we use the result of this function?
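   As context for the Javadoc above, here is a standalone model of the curried post-verification callback flow it describes; all types below are simplified stand-ins for RequestLocal, LogAppendResult, and the verification entries, not Kafka's actual classes.

```scala
// Model of the callback shape RequestLocal => Map[TopicPartition, LogAppendResult] => Unit:
// when nothing needs verification, the callback runs synchronously with the caller's
// RequestLocal; otherwise it runs later with the handler thread's RequestLocal plus
// the per-partition verification errors.
object PostVerificationFlowSketch {
  type TopicPartition = String
  case class RequestLocalLike(threadName: String)
  case class LogAppendResultLike(error: Option[String])

  type PostVerificationCallback = RequestLocalLike => Map[TopicPartition, LogAppendResultLike] => Unit

  def verifyThenRun(unverified: Set[TopicPartition],
                    callback: PostVerificationCallback,
                    callerLocal: RequestLocalLike): Unit = {
    if (unverified.isEmpty) {
      // Nothing to verify: run the callback immediately on the caller's thread.
      callback(callerLocal)(Map.empty)
    } else {
      // Pretend the verification response arrived on another request handler
      // thread, which supplies its own RequestLocal plus the verification errors.
      val handlerLocal = RequestLocalLike("request-handler-1")
      val errors = unverified.map(tp => tp -> LogAppendResultLike(Some("NOT_ENOUGH_REPLICAS"))).toMap
      callback(handlerLocal)(errors)
    }
  }

  def main(args: Array[String]): Unit = {
    val callback: PostVerificationCallback =
      local => errors => println(s"append on ${local.threadName}, verification errors: $errors")
    verifyThenRun(Set("topic-0"), callback, RequestLocalLike("caller"))
  }
}
```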



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
