jolshan commented on code in PR #14774:
URL: https://github.com/apache/kafka/pull/14774#discussion_r1418097344
########## core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala: ########## @@ -349,146 +468,68 @@ class GroupMetadataManager(brokerId: Int, consumerId: String, offsetMetadata: immutable.Map[TopicIdPartition, OffsetAndMetadata], responseCallback: immutable.Map[TopicIdPartition, Errors] => Unit, - transactionalId: String = null, producerId: Long = RecordBatch.NO_PRODUCER_ID, producerEpoch: Short = RecordBatch.NO_PRODUCER_EPOCH, requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = { - // first filter out partitions with offset metadata size exceeding limit - val filteredOffsetMetadata = offsetMetadata.filter { case (_, offsetAndMetadata) => - validateOffsetMetadataLength(offsetAndMetadata.metadata) - } - group.inLock { if (!group.hasReceivedConsistentOffsetCommits) warn(s"group: ${group.groupId} with leader: ${group.leaderOrNull} has received offset commits from consumers as well " + s"as transactional producers. Mixing both types of offset commits will generally result in surprises and " + s"should be avoided.") } - val isTxnOffsetCommit = producerId != RecordBatch.NO_PRODUCER_ID - // construct the message set to append + val filteredOffsetMetadata = offsetMetadata.filter { case (_, offsetAndMetadata) => + validateOffsetMetadataLength(offsetAndMetadata.metadata) + } if (filteredOffsetMetadata.isEmpty) { // compute the final error codes for the commit response val commitStatus = offsetMetadata.map { case (k, _) => k -> Errors.OFFSET_METADATA_TOO_LARGE } responseCallback(commitStatus) - } else { - getMagic(partitionFor(group.groupId)) match { - case Some(magicValue) => - // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically. 
- val timestampType = TimestampType.CREATE_TIME - val timestamp = time.milliseconds() - - val records = filteredOffsetMetadata.map { case (topicIdPartition, offsetAndMetadata) => - val key = GroupMetadataManager.offsetCommitKey(group.groupId, topicIdPartition.topicPartition) - val value = GroupMetadataManager.offsetCommitValue(offsetAndMetadata, interBrokerProtocolVersion) - new SimpleRecord(timestamp, key, value) - } - val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId)) - val buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, compressionType, records.asJava)) - - if (isTxnOffsetCommit && magicValue < RecordBatch.MAGIC_VALUE_V2) - throw Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT.exception("Attempting to make a transaction offset commit with an invalid magic: " + magicValue) - - val builder = MemoryRecords.builder(buffer, magicValue, compressionType, timestampType, 0L, time.milliseconds(), - producerId, producerEpoch, 0, isTxnOffsetCommit, RecordBatch.NO_PARTITION_LEADER_EPOCH) - - records.foreach(builder.append) - val entries = Map(offsetTopicPartition -> builder.build()) - - // set the callback function to insert offsets into cache after log append completed - def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = { - // the append response should only contain the topics partition - if (responseStatus.size != 1 || !responseStatus.contains(offsetTopicPartition)) - throw new IllegalStateException("Append status %s should only have one partition %s" - .format(responseStatus, offsetTopicPartition)) - - // construct the commit response status and insert - // the offset and metadata to cache if the append status has no error - val status = responseStatus(offsetTopicPartition) - - val responseError = group.inLock { - if (status.error == Errors.NONE) { - if (!group.is(Dead)) { - filteredOffsetMetadata.forKeyValue { (topicIdPartition, offsetAndMetadata) => - if 
(isTxnOffsetCommit) - group.onTxnOffsetCommitAppend(producerId, topicIdPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata)) - else - group.onOffsetCommitAppend(topicIdPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata)) - } - } - - // Record the number of offsets committed to the log - offsetCommitsSensor.record(records.size) - - Errors.NONE - } else { - if (!group.is(Dead)) { - if (!group.hasPendingOffsetCommitsFromProducer(producerId)) - removeProducerGroup(producerId, group.groupId) - filteredOffsetMetadata.forKeyValue { (topicIdPartition, offsetAndMetadata) => - if (isTxnOffsetCommit) - group.failPendingTxnOffsetCommit(producerId, topicIdPartition) - else - group.failPendingOffsetWrite(topicIdPartition, offsetAndMetadata) - } - } - - debug(s"Offset commit $filteredOffsetMetadata from group ${group.groupId}, consumer $consumerId " + - s"with generation ${group.generationId} failed when appending to log due to ${status.error.exceptionName}") - - // transform the log append error code to the corresponding the commit status error code - status.error match { - case Errors.UNKNOWN_TOPIC_OR_PARTITION - | Errors.NOT_ENOUGH_REPLICAS - | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND => - Errors.COORDINATOR_NOT_AVAILABLE - - case Errors.NOT_LEADER_OR_FOLLOWER - | Errors.KAFKA_STORAGE_ERROR => - Errors.NOT_COORDINATOR - - case Errors.MESSAGE_TOO_LARGE - | Errors.RECORD_LIST_TOO_LARGE - | Errors.INVALID_FETCH_SIZE => - Errors.INVALID_COMMIT_OFFSET_SIZE + return + } - case other => other - } - } - } + val magicOpt = getMagic(partitionFor(group.groupId)) + if (magicOpt.isEmpty) { + val commitStatus = offsetMetadata.map { case (topicIdPartition, _) => + (topicIdPartition, Errors.NOT_COORDINATOR) + } + responseCallback(commitStatus) + return + } - // compute the final error codes for the commit response - val commitStatus = offsetMetadata.map { case (topicIdPartition, offsetAndMetadata) => - if 
(validateOffsetMetadataLength(offsetAndMetadata.metadata)) - (topicIdPartition, responseError) - else - (topicIdPartition, Errors.OFFSET_METADATA_TOO_LARGE) - } + val isTxnOffsetCommit = producerId != RecordBatch.NO_PRODUCER_ID + val records = generateOffsetRecords(magicOpt.get, isTxnOffsetCommit, group.groupId, filteredOffsetMetadata, producerId, producerEpoch) + val putCacheCallback = createPutCacheCallback(isTxnOffsetCommit, group, consumerId, offsetMetadata, filteredOffsetMetadata, responseCallback, producerId, records) - // finally trigger the callback logic passed from the API layer - responseCallback(commitStatus) - } + group.inLock { + group.prepareOffsetCommit(offsetMetadata) + } - if (isTxnOffsetCommit) { - group.inLock { - addProducerGroup(producerId, group.groupId) - group.prepareTxnOffsetCommit(producerId, offsetMetadata) - } - } else { - group.inLock { - group.prepareOffsetCommit(offsetMetadata) - } - } + appendForGroup(group, records, requestLocal, putCacheCallback) + } - appendForGroup(group, entries, requestLocal, putCacheCallback, transactionalId) + def storeOffsetsAfterVerification(group: GroupMetadata, + verifiedOffsetMetadata: immutable.Map[TopicIdPartition, OffsetAndMetadata], + records: Map[TopicPartition, MemoryRecords], + putCacheCallback: Map[TopicPartition, PartitionResponse] => Unit, + producerId: Long, + transactionVerificationEntries: TransactionVerificationEntries, + errorResults: Map[TopicPartition, LogAppendResult], + requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = { + group.inLock { Review Comment: I've started a new thread on this on the updated code: https://github.com/apache/kafka/pull/14774#discussion_r1418096753 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org