jolshan commented on code in PR #14985: URL: https://github.com/apache/kafka/pull/14985#discussion_r1432873518
########## core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala: ########## @@ -190,4 +170,62 @@ class CoordinatorPartitionWriter[T]( throw Errors.NOT_LEADER_OR_FOLLOWER.exception() } } + + /** + * Write the transaction end marker. + * + * @param tp The partition to write records to. + * @param producerId The producer id. + * @param producerEpoch The producer epoch. + * @param coordinatorEpoch The epoch of the transaction coordinator. + * @param result The transaction result. + * @return The log end offset right after the written records. + * @throws KafkaException Any KafkaException caught during the write operation. + */ + override def appendEndTransactionMarker( + tp: TopicPartition, + producerId: Long, + producerEpoch: Short, + coordinatorEpoch: Int, + result: TransactionResult + ): Long = { + val controlRecordType = result match { + case TransactionResult.COMMIT => ControlRecordType.COMMIT + case TransactionResult.ABORT => ControlRecordType.ABORT + } + + internalAppend(tp, MemoryRecords.withEndTransactionMarker( + producerId, + producerEpoch, + new EndTransactionMarker(controlRecordType, coordinatorEpoch) + )) + } + + private def internalAppend( + tp: TopicPartition, + memoryRecords: MemoryRecords + ): Long = { + var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty + replicaManager.appendRecords( + timeout = 0L, + requiredAcks = 1, Review Comment: were these always acks=1? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org