jeffkbkim commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1430514027
##########
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##########
@@ -190,4 +170,62 @@ class CoordinatorPartitionWriter[T](
         throw Errors.NOT_LEADER_OR_FOLLOWER.exception()
       }
   }
+
+  /**
+   * Write the transaction end marker.
+   *
+   * @param tp The partition to write records to.
+   * @param producerId The producer id.
+   * @param producerEpoch The producer epoch.
+   * @param coordinatorEpoch The epoch of the transaction coordinator.
+   * @param result The transaction result.
+   * @return The log end offset right after the written records.
+   * @throws KafkaException Any KafkaException caught during the write operation.
+   */
+  override def appendTransactionEndMarker(

Review Comment:
   nit: should this be `appendEndTransactionMarker`? also in L175

##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2416,14 +2419,56 @@ class KafkaApis(val requestChannel: RequestChannel,
             numAppends.decrementAndGet()
             skippedMarkers += 1
           } else {
-            val controlRecords = partitionsWithCompatibleMessageFormat.map { partition =>
-              val controlRecordType = marker.transactionResult match {
-                case TransactionResult.COMMIT => ControlRecordType.COMMIT
-                case TransactionResult.ABORT => ControlRecordType.ABORT
+            val controlRecordType = marker.transactionResult match {
+              case TransactionResult.COMMIT => ControlRecordType.COMMIT
+              case TransactionResult.ABORT => ControlRecordType.ABORT
+            }
+
+            val markerResults = new ConcurrentHashMap[TopicPartition, Errors]()
+            def maybeComplete(): Unit = {
+              if (partitionsWithCompatibleMessageFormat.size == markerResults.size) {
+                maybeSendResponseCallback(producerId, marker.transactionResult, markerResults)
+              }
+            }
+
+            val controlRecords = mutable.Map.empty[TopicPartition, MemoryRecords]
+            partitionsWithCompatibleMessageFormat.foreach { partition =>
+              if (config.isNewGroupCoordinatorEnabled && partition.topic == GROUP_METADATA_TOPIC_NAME) {
+                // When the new group coordinator is used, writing the end marker is fully delegated
+                // to the group coordinator.
+                groupCoordinator.completeTransaction(
+                  partition,
+                  marker.producerId,
+                  marker.producerEpoch,
+                  marker.coordinatorEpoch,
+                  marker.transactionResult,
+                  Duration.ofMillis(config.requestTimeoutMs.toLong)
+                ).whenComplete { (_, exception) =>
+                  val error = if (exception == null) {
+                    Errors.NONE
+                  } else {
+                    Errors.forException(exception) match {
+                      case Errors.COORDINATOR_NOT_AVAILABLE | Errors.COORDINATOR_LOAD_IN_PROGRESS | Errors.NOT_COORDINATOR =>
+                        // The transaction coordinator does not expect those errors so we translate them
+                        // to NOT_LEADER_OR_FOLLOWER to signal to it that the coordinator is not ready yet.
+                        Errors.NOT_LEADER_OR_FOLLOWER
+                      case error =>
+                        error
+                    }
+                  }
+                  markerResults.put(partition, error)
+                  maybeComplete()
+                }
+              } else {
+                // Otherwise, the regular appendRecords path is used for all the non __consumer_offsets
+                // partitions or for all partitions when the group coordinator is disabled.

Review Comment:
   nit: when the "new group coordinator" is disabled

##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2416,14 +2419,56 @@ class KafkaApis(val requestChannel: RequestChannel,
             numAppends.decrementAndGet()
             skippedMarkers += 1
           } else {
-            val controlRecords = partitionsWithCompatibleMessageFormat.map { partition =>
-              val controlRecordType = marker.transactionResult match {
-                case TransactionResult.COMMIT => ControlRecordType.COMMIT
-                case TransactionResult.ABORT => ControlRecordType.ABORT
+            val controlRecordType = marker.transactionResult match {
+              case TransactionResult.COMMIT => ControlRecordType.COMMIT
+              case TransactionResult.ABORT => ControlRecordType.ABORT
+            }
+
+            val markerResults = new ConcurrentHashMap[TopicPartition, Errors]()

Review Comment:
   to make sure I understand:
   * markerResults holds the error result of appending the end marker to each topic partition in the marker.
   * it is concurrent since it is also updated from the appendRecords() response callback.
   * it is updated separately in the new group coordinator path, since we append through the new runtime framework and not via appendRecords().
   * we check `if (partitionsWithCompatibleMessageFormat.size == markerResults.size)` so that `maybeSendResponseCallback` is called only once for all partitions in a marker. `numAppends` serves a similar purpose, but it applies to the one final response for all markers.

   does this sound right?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
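The last comment describes a two-level completion pattern. The first level is a per-marker result map that fires a callback once every partition of the marker has reported. The second is a request-level countdown, the role the comment gives `numAppends`, that sends one response after all markers finish. The minimal Scala sketch below may help check that reading.

Everything here is hypothetical and simplified:
* `MarkerError` stands in for Kafka's `Errors`.
* Partitions are plain strings rather than `TopicPartition`.
* `MarkerResults` and `RequestResponse` are not classes in `KafkaApis`.

The sketch deliberately deviates from the diff in one respect. It counts down with `AtomicInteger.decrementAndGet()` instead of comparing `markerResults.size` after each `put`. The reason is a possible race: if the callbacks for the last two partitions run on different threads, both could observe the full size and invoke the response callback twice.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.concurrent.TrieMap

// Hypothetical stand-ins for the Errors values used in the diff (not Kafka's Errors enum).
sealed trait MarkerError
case object NoError extends MarkerError
case object CoordinatorNotAvailable extends MarkerError
case object CoordinatorLoadInProgress extends MarkerError
case object NotCoordinator extends MarkerError
case object NotLeaderOrFollower extends MarkerError
case object UnknownServerError extends MarkerError

object MarkerCompletionSketch {
  // Same mapping as the whenComplete block in the diff: errors the transaction
  // coordinator does not expect are reported as NOT_LEADER_OR_FOLLOWER.
  def translate(error: MarkerError): MarkerError = error match {
    case CoordinatorNotAvailable | CoordinatorLoadInProgress | NotCoordinator => NotLeaderOrFollower
    case other => other
  }

  // Per-marker collector (the role markerResults plays). onMarkerDone runs
  // exactly once, on whichever thread records the last expected partition.
  final class MarkerResults(partitions: Set[String], onMarkerDone: Map[String, MarkerError] => Unit) {
    private val results = new ConcurrentHashMap[String, MarkerError]()
    private val remaining = new AtomicInteger(partitions.size)

    def record(partition: String, error: MarkerError): Unit = {
      require(partitions.contains(partition), s"unexpected partition $partition")
      results.put(partition, translate(error))
      // decrementAndGet is atomic, so exactly one caller observes 0
      // (assuming each partition is recorded once).
      if (remaining.decrementAndGet() == 0) {
        onMarkerDone(partitions.map(p => p -> results.get(p)).toMap)
      }
    }
  }

  // Request-level countdown (the role the review attributes to numAppends):
  // a single response is sent after every marker in the request completes.
  final class RequestResponse(numMarkers: Int, send: Map[String, MarkerError] => Unit) {
    private val all = TrieMap.empty[String, MarkerError]
    private val remainingMarkers = new AtomicInteger(numMarkers)

    def markerDone(markerResults: Map[String, MarkerError]): Unit = {
      all ++= markerResults
      if (remainingMarkers.decrementAndGet() == 0) {
        send(all.readOnlySnapshot().toMap)
      }
    }
  }
}
```

The `ConcurrentHashMap` is still needed alongside the countdown. It lets callbacks on different threads write their results safely, while the atomic counter alone decides which thread completes the marker.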