Re: [PR] KAFKA-15987: Refactor ReplicaManager code for transaction verification [kafka]
jolshan merged PR #15087:
URL: https://github.com/apache/kafka/pull/15087

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
Re: [PR] KAFKA-15987: Refactor ReplicaManager code for transaction verification [kafka]
jolshan commented on code in PR #15087:
URL: https://github.com/apache/kafka/pull/15087#discussion_r1458144631

## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ##
@@ -2560,41 +2562,6 @@ class ReplicaManagerTest {
}
}
- @ParameterizedTest
- @EnumSource(value = classOf[Errors], names = Array("NOT_COORDINATOR", "CONCURRENT_TRANSACTIONS", "COORDINATOR_LOAD_IN_PROGRESS", "COORDINATOR_NOT_AVAILABLE"))
- def testMaybeVerificationErrorConversions(error: Errors): Unit = {

Review Comment:
We also have one for the GroupCoordinator.
Re: [PR] KAFKA-15987: Refactor ReplicaManager code for transaction verification [kafka]
jolshan commented on code in PR #15087:
URL: https://github.com/apache/kafka/pull/15087#discussion_r1458144452

## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ##
@@ -2560,41 +2562,6 @@ class ReplicaManagerTest {
}
}
- @ParameterizedTest
- @EnumSource(value = classOf[Errors], names = Array("NOT_COORDINATOR", "CONCURRENT_TRANSACTIONS", "COORDINATOR_LOAD_IN_PROGRESS", "COORDINATOR_NOT_AVAILABLE"))
- def testMaybeVerificationErrorConversions(error: Errors): Unit = {

Review Comment:
We have the test above in this file.
Re: [PR] KAFKA-15987: Refactor ReplicaManager code for transaction verification [kafka]
jolshan commented on code in PR #15087:
URL: https://github.com/apache/kafka/pull/15087#discussion_r1458143981

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -1188,18 +1073,7 @@ class ReplicaManager(val config: KafkaConfig,
requestLocal: RequestLocal,
verificationErrors: Map[TopicPartition, Errors]
): Unit = {
- // Map transaction coordinator errors to known errors for the response
- val convertedErrors = verificationErrors.map { case (tp, error) =>
-error match {
- case Errors.CONCURRENT_TRANSACTIONS |
-Errors.COORDINATOR_LOAD_IN_PROGRESS |
-Errors.COORDINATOR_NOT_AVAILABLE |
-Errors.NOT_COORDINATOR => tp -> Errors.NOT_ENOUGH_REPLICAS
- case _ => tp -> error
-}
-
- }

Review Comment:
But yes, we simply pass through CONCURRENT_TRANSACTIONS, which will be fatal to the client.
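For reference, the block this PR deletes from ReplicaManager applied one mapping to every caller. Below is a minimal, dependency-free sketch of that removed logic. It assumes error codes are modeled as the string names of Kafka's `Errors` values and partitions as plain strings; both are simplifications for illustration, not Kafka's types, and `RemovedConversion` is a hypothetical name.

```scala
// Sketch only: Kafka's Errors enum is modeled as plain strings so the example has no dependencies.
object RemovedConversion {
  type TopicPartition = String // simplified stand-in, e.g. "orders-0"

  // Mirrors the deleted block: the four coordinator errors were rewritten to
  // NOT_ENOUGH_REPLICAS for every caller; any other error was returned unchanged.
  def convert(verificationErrors: Map[TopicPartition, String]): Map[TopicPartition, String] =
    verificationErrors.map { case (tp, error) =>
      error match {
        case "CONCURRENT_TRANSACTIONS" | "COORDINATOR_LOAD_IN_PROGRESS" |
             "COORDINATOR_NOT_AVAILABLE" | "NOT_COORDINATOR" => tp -> "NOT_ENOUGH_REPLICAS"
        case _ => tp -> error
      }
    }
}
```

Once this shared conversion is gone, any caller that does not re-apply its own mapping sees CONCURRENT_TRANSACTIONS unchanged, which is the pass-through behavior described in the comment.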
Re: [PR] KAFKA-15987: Refactor ReplicaManager code for transaction verification [kafka]
jolshan commented on code in PR #15087:
URL: https://github.com/apache/kafka/pull/15087#discussion_r1458143630

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -1188,18 +1073,7 @@ class ReplicaManager(val config: KafkaConfig,
requestLocal: RequestLocal,
verificationErrors: Map[TopicPartition, Errors]
): Unit = {
- // Map transaction coordinator errors to known errors for the response
- val convertedErrors = verificationErrors.map { case (tp, error) =>
-error match {
- case Errors.CONCURRENT_TRANSACTIONS |
-Errors.COORDINATOR_LOAD_IN_PROGRESS |
-Errors.COORDINATOR_NOT_AVAILABLE |
-Errors.NOT_COORDINATOR => tp -> Errors.NOT_ENOUGH_REPLICAS
- case _ => tp -> error
-}
-
- }

Review Comment:
We have separate handling for produce requests and txn offset commit requests.

For produce:
```scala
case Errors.INVALID_TXN_STATE => Some(error.exception("Partition was not added to the transaction"))
case Errors.CONCURRENT_TRANSACTIONS |
     Errors.COORDINATOR_LOAD_IN_PROGRESS |
     Errors.COORDINATOR_NOT_AVAILABLE |
     Errors.NOT_COORDINATOR => Some(new NotEnoughReplicasException(
  s"Unable to verify the partition has been added to the transaction. Underlying error: ${error.toString}"))
case _ => None
```

For txn offset commit:
```scala
error match {
  case Errors.UNKNOWN_TOPIC_OR_PARTITION |
       Errors.NOT_ENOUGH_REPLICAS |
       Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
    Errors.COORDINATOR_NOT_AVAILABLE

  case Errors.NOT_LEADER_OR_FOLLOWER |
       Errors.KAFKA_STORAGE_ERROR =>
    Errors.NOT_COORDINATOR

  case Errors.MESSAGE_TOO_LARGE |
       Errors.RECORD_LIST_TOO_LARGE |
       Errors.INVALID_FETCH_SIZE =>
    Errors.INVALID_COMMIT_OFFSET_SIZE

  // We may see INVALID_TXN_STATE or INVALID_PID_MAPPING here due to transaction verification.
  // They can be returned without mapping to a new error.
  case other => other
}
```
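To make the difference between the two paths concrete, here is a small sketch that transcribes both snippets into one object. Assumptions: errors are the string names of Kafka's `Errors` values, the produce path returns the name of the error it would surface instead of constructing an exception, and the names `VerificationErrorMapping`, `produce`, and `txnOffsetCommit` are hypothetical.

```scala
// Sketch only: both mappings from the comment, with Kafka's Errors modeled as strings.
object VerificationErrorMapping {
  private val coordinatorErrors: Set[String] = Set(
    "CONCURRENT_TRANSACTIONS", "COORDINATOR_LOAD_IN_PROGRESS",
    "COORDINATOR_NOT_AVAILABLE", "NOT_COORDINATOR")

  // Produce path: INVALID_TXN_STATE is surfaced as-is, the coordinator errors become
  // NOT_ENOUGH_REPLICAS, and anything else yields no custom error (None).
  def produce(error: String): Option[String] = error match {
    case "INVALID_TXN_STATE" => Some("INVALID_TXN_STATE")
    case e if coordinatorErrors(e) => Some("NOT_ENOUGH_REPLICAS")
    case _ => None
  }

  // Txn offset commit path: the group coordinator's mapping; unmatched errors pass through.
  def txnOffsetCommit(error: String): String = error match {
    case "UNKNOWN_TOPIC_OR_PARTITION" | "NOT_ENOUGH_REPLICAS" |
         "NOT_ENOUGH_REPLICAS_AFTER_APPEND" => "COORDINATOR_NOT_AVAILABLE"
    case "NOT_LEADER_OR_FOLLOWER" | "KAFKA_STORAGE_ERROR" => "NOT_COORDINATOR"
    case "MESSAGE_TOO_LARGE" | "RECORD_LIST_TOO_LARGE" |
         "INVALID_FETCH_SIZE" => "INVALID_COMMIT_OFFSET_SIZE"
    case other => other
  }
}
```

In this sketch `produce("CONCURRENT_TRANSACTIONS")` returns `Some("NOT_ENOUGH_REPLICAS")`, while `txnOffsetCommit("CONCURRENT_TRANSACTIONS")` falls through to `case other => other` and returns the error unchanged.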
Re: [PR] KAFKA-15987: Refactor ReplicaManager code for transaction verification [kafka]
jolshan commented on code in PR #15087:
URL: https://github.com/apache/kafka/pull/15087#discussion_r1458066834

## core/src/main/scala/kafka/server/KafkaApis.scala: ##
@@ -717,17 +717,16 @@ class KafkaApis(val requestChannel: RequestChannel,
val internalTopicsAllowed = request.header.clientId == AdminUtils.ADMIN_CLIENT_ID
// call the replica manager to append messages to the replicas
- replicaManager.appendRecords(
+ replicaManager.handleProduceAppend(
timeout = produceRequest.timeout.toLong,
requiredAcks = produceRequest.acks,
internalTopicsAllowed = internalTopicsAllowed,
origin = AppendOrigin.CLIENT,

Review Comment:
That's fair. I can do that.
Re: [PR] KAFKA-15987: Refactor ReplicaManager code for transaction verification [kafka]
dajac commented on code in PR #15087:
URL: https://github.com/apache/kafka/pull/15087#discussion_r1457569421

## core/src/main/scala/kafka/server/KafkaApis.scala: ##
@@ -717,17 +717,16 @@ class KafkaApis(val requestChannel: RequestChannel,
val internalTopicsAllowed = request.header.clientId == AdminUtils.ADMIN_CLIENT_ID
// call the replica manager to append messages to the replicas
- replicaManager.appendRecords(
+ replicaManager.handleProduceAppend(
timeout = produceRequest.timeout.toLong,
requiredAcks = produceRequest.acks,
internalTopicsAllowed = internalTopicsAllowed,
origin = AppendOrigin.CLIENT,

Review Comment:
nit: I wonder if we should remove the `origin` parameter from `handleProduceAppend`, as it should always come from a client by definition.

## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ##
@@ -3041,31 +3007,57 @@ class ReplicaManagerTest {
origin = origin,
entriesPerPartition = Map(partition -> records),
responseCallback = appendCallback,
- transactionalId = transactionalId,
)
result
}
- private def appendRecordsToMultipleTopics(replicaManager: ReplicaManager,
-entriesToAppend: Map[TopicPartition, MemoryRecords],
-transactionalId: String,
-origin: AppendOrigin = AppendOrigin.CLIENT,
-requiredAcks: Short = -1): CallbackResult[Map[TopicPartition, PartitionResponse]] = {
+ private def handleProduceAppendToMultipleTopics(replicaManager: ReplicaManager,
+ entriesToAppend: Map[TopicPartition, MemoryRecords],
+ transactionalId: String,
+ origin: AppendOrigin = AppendOrigin.CLIENT,
+ requiredAcks: Short = -1): CallbackResult[Map[TopicPartition, PartitionResponse]] = {
val result = new CallbackResult[Map[TopicPartition, PartitionResponse]]()
def appendCallback(responses: Map[TopicPartition, PartitionResponse]): Unit = {
responses.foreach( response => assertTrue(responses.get(response._1).isDefined))
result.fire(responses)
}
+ replicaManager.handleProduceAppend(

Review Comment:
nit: Indentation seems off here.
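The `origin` nit amounts to having the produce-specific entry point fix the origin itself and delegate to the general append path. A minimal sketch of that shape, assuming heavily simplified signatures: `AppendOrigin` here is a two-value stand-in for Kafka's enum, records are plain strings, and `ProduceAppendSketch` and `AppendRequest` are hypothetical names.

```scala
// Sketch only: simplified stand-ins, not Kafka's ReplicaManager API.
object ProduceAppendSketch {
  sealed trait AppendOrigin
  object AppendOrigin {
    case object Client extends AppendOrigin
    case object Coordinator extends AppendOrigin
  }

  // Records what the general append path was asked to do.
  final case class AppendRequest(origin: AppendOrigin,
                                 entries: Map[String, Seq[String]],
                                 transactionalId: Option[String])

  // Hypothetical general entry point (stands in for appendRecords).
  def appendRecords(origin: AppendOrigin,
                    entries: Map[String, Seq[String]],
                    transactionalId: Option[String]): AppendRequest =
    AppendRequest(origin, entries, transactionalId)

  // Produce-specific wrapper: no origin parameter, because produce requests
  // always come from clients.
  def handleProduceAppend(entries: Map[String, Seq[String]],
                          transactionalId: Option[String]): AppendRequest =
    appendRecords(AppendOrigin.Client, entries, transactionalId)
}
```

Dropping the parameter removes a way for callers to pass an inconsistent origin; callers that need another origin keep using the general path.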
## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -1188,18 +1073,7 @@ class ReplicaManager(val config: KafkaConfig,
requestLocal: RequestLocal,
verificationErrors: Map[TopicPartition, Errors]
): Unit = {
- // Map transaction coordinator errors to known errors for the response
- val convertedErrors = verificationErrors.map { case (tp, error) =>
-error match {
- case Errors.CONCURRENT_TRANSACTIONS |
-Errors.COORDINATOR_LOAD_IN_PROGRESS |
-Errors.COORDINATOR_NOT_AVAILABLE |
-Errors.NOT_COORDINATOR => tp -> Errors.NOT_ENOUGH_REPLICAS
- case _ => tp -> error
-}
-
- }

Review Comment:
For my understanding: we remove this here, we add it back in handleProduceAppend, and we rely on the conversion in the group coordinator. Did I get it right? In the group coordinator, we don't handle `CONCURRENT_TRANSACTIONS`, I think. I need to double check.

## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ##
@@ -2560,41 +2562,6 @@ class ReplicaManagerTest {
}
}
- @ParameterizedTest
- @EnumSource(value = classOf[Errors], names = Array("NOT_COORDINATOR", "CONCURRENT_TRANSACTIONS", "COORDINATOR_LOAD_IN_PROGRESS", "COORDINATOR_NOT_AVAILABLE"))
- def testMaybeVerificationErrorConversions(error: Errors): Unit = {

Review Comment:
Don't we need to keep this one, as we still have those conversions but in a different place now?
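If the conversion test is kept, one option is to point it at whichever function now owns the produce-path conversion. The sketch below shows the shape of such a check without JUnit. The four error names come from the removed `@EnumSource`; `convertForProduce` is a hypothetical stand-in for the relocated conversion, and errors are modeled as strings.

```scala
// Sketch only: a dependency-free version of the removed parameterized check.
object ProduceConversionCheck {
  // Hypothetical stand-in for the produce-path conversion under test.
  def convertForProduce(error: String): String = error match {
    case "CONCURRENT_TRANSACTIONS" | "COORDINATOR_LOAD_IN_PROGRESS" |
         "COORDINATOR_NOT_AVAILABLE" | "NOT_COORDINATOR" => "NOT_ENOUGH_REPLICAS"
    case other => other
  }

  // The same four values the removed @EnumSource listed.
  val coordinatorErrors: Seq[String] = Seq(
    "NOT_COORDINATOR", "CONCURRENT_TRANSACTIONS",
    "COORDINATOR_LOAD_IN_PROGRESS", "COORDINATOR_NOT_AVAILABLE")

  // Returns the errors that were NOT converted; an empty result means every case passed.
  def failures(): Seq[String] =
    coordinatorErrors.filterNot(e => convertForProduce(e) == "NOT_ENOUGH_REPLICAS")
}
```

In the real suite this would stay a `@ParameterizedTest` with `@EnumSource`, so each error is reported as its own case instead of being folded into one result.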
## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -762,167 +763,124 @@ class ReplicaManager(val config: KafkaConfig,
delayedProduceLock: Option[Lock] = None,
recordValidationStatsCallback: Map[TopicPartition, RecordValidationStats] => Unit = _ => (),
requestLocal: RequestLocal = RequestLocal.NoCaching,
-transactionalId: String = null,
-actionQueue: ActionQueue = this.defaultActionQueue): Unit = {
-if (isValidRequiredAcks(requiredAcks)) {
-
- val verificationGuards: mutable.Map[TopicPartition, VerificationGuard] = mutable.Map[TopicPartition, VerificationGuard]()
- val (verifiedEntriesPerPartition, notYetVerifiedEntriesPerPartition, errorsPerPartition) =
-if (transactionalId == null || !config.transactionPartitionVerificationEnable)
- (entriesPerPartition, Map.empty
Re: [PR] KAFKA-15987: Refactor ReplicaManager code for transaction verification [kafka]
jolshan commented on code in PR #15087:
URL: https://github.com/apache/kafka/pull/15087#discussion_r1443537258

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -749,9 +749,11 @@ class ReplicaManager(val config: KafkaConfig,
* @param responseCallback callback for sending the response
* @param delayedProduceLock lock for the delayed actions
* @param recordValidationStatsCallback callback for updating stats on record conversions
- * @param requestLocal container for the stateful instances scoped to this request
- * @param transactionalId transactional ID if the request is from a producer and the producer is transactional
+ * @param requestLocal container for the stateful instances scoped to this request -- this must correspond to the
+ * thread calling this method
* @param actionQueue the action queue to use. ReplicaManager#defaultActionQueue is used by default.
+ * @param verificationGuards the mapping from topic partition to verification guards if transaction verification is used
+ * @param preAppendErrors the mapping from topic partition to LogAppendResult for errors that occurred before appending

Review Comment:
This is updated.
Re: [PR] KAFKA-15987: Refactor ReplicaManager code for transaction verification [kafka]
jolshan commented on code in PR #15087:
URL: https://github.com/apache/kafka/pull/15087#discussion_r1441068240

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -749,9 +749,11 @@ class ReplicaManager(val config: KafkaConfig,
* @param responseCallback callback for sending the response
* @param delayedProduceLock lock for the delayed actions
* @param recordValidationStatsCallback callback for updating stats on record conversions
- * @param requestLocal container for the stateful instances scoped to this request
- * @param transactionalId transactional ID if the request is from a producer and the producer is transactional
+ * @param requestLocal container for the stateful instances scoped to this request -- this must correspond to the
+ * thread calling this method
* @param actionQueue the action queue to use. ReplicaManager#defaultActionQueue is used by default.
+ * @param verificationGuards the mapping from topic partition to verification guards if transaction verification is used
+ * @param preAppendErrors the mapping from topic partition to LogAppendResult for errors that occurred before appending

Review Comment:
Oh, I see what you are saying: the appendCallback should be defined so that the verification errors are joined wherever the callback is defined (ideally after we get the verification errors). So the callback for verification would include the definition of the append callback. I think this could work. Time to change all the mock code 😂
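The idea of defining the append callback inside the verification callback can be sketched as a closure: by the time the append callback exists, the verification errors are already in scope, so no `preAppendErrors` parameter has to travel through the append path. A minimal sketch, assuming verification completes synchronously, partitions and error codes are strings ("NONE" meaning success), and the names `CallbackComposition` and `onVerificationComplete` are hypothetical.

```scala
// Sketch only: simplified, synchronous stand-ins for the verification and append paths.
object CallbackComposition {
  type TopicPartition = String
  type Results = Map[TopicPartition, String] // partition -> error name ("NONE" on success)

  // Hypothetical stand-in for appendRecords: every partition it receives is appended successfully.
  def appendRecords(entries: Map[TopicPartition, Seq[String]], responseCallback: Results => Unit): Unit =
    responseCallback(entries.map { case (tp, _) => tp -> "NONE" })

  // Invoked once verification completes. appendCallback is defined inside it, so it closes
  // over verificationErrors and joins them with the append results before responding.
  def onVerificationComplete(entries: Map[TopicPartition, Seq[String]],
                             responseCallback: Results => Unit)(verificationErrors: Results): Unit = {
    def appendCallback(appendResults: Results): Unit =
      responseCallback(verificationErrors ++ appendResults)

    val verifiedEntries = entries.filter { case (tp, _) => !verificationErrors.contains(tp) }
    appendRecords(verifiedEntries, appendCallback)
  }
}
```

The second parameter list makes `onVerificationComplete(entries, respond) _` usable as the callback handed to the verification step.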
Re: [PR] KAFKA-15987: Refactor ReplicaManager code for transaction verification [kafka]
jolshan commented on code in PR #15087:
URL: https://github.com/apache/kafka/pull/15087#discussion_r1441066328

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -749,9 +749,11 @@ class ReplicaManager(val config: KafkaConfig,
* @param responseCallback callback for sending the response
* @param delayedProduceLock lock for the delayed actions
* @param recordValidationStatsCallback callback for updating stats on record conversions
- * @param requestLocal container for the stateful instances scoped to this request
- * @param transactionalId transactional ID if the request is from a producer and the producer is transactional
+ * @param requestLocal container for the stateful instances scoped to this request -- this must correspond to the
+ * thread calling this method
* @param actionQueue the action queue to use. ReplicaManager#defaultActionQueue is used by default.
+ * @param verificationGuards the mapping from topic partition to verification guards if transaction verification is used
+ * @param preAppendErrors the mapping from topic partition to LogAppendResult for errors that occurred before appending

Review Comment:
I was considering a write where some partitions were verified and some were not. We would allow the partitions that passed verification to still be written, and reject the ones that failed. I'm not sure whether we support multi-partition writes in the produce API, but it appears we do.
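The partial-write case described here reduces to splitting a multi-partition request by verification outcome. A minimal sketch, assuming partitions and error codes are modeled as strings and records as `Seq[String]`; `PartialVerification` and `split` are hypothetical names.

```scala
// Sketch only: split one multi-partition produce by verification outcome.
object PartialVerification {
  type TopicPartition = String

  // Returns (entries that may still be appended, errors for partitions that failed verification).
  def split(entries: Map[TopicPartition, Seq[String]],
            verificationErrors: Map[TopicPartition, String])
      : (Map[TopicPartition, Seq[String]], Map[TopicPartition, String]) = {
    val (verified, failed) = entries.partition { case (tp, _) => !verificationErrors.contains(tp) }
    (verified, failed.map { case (tp, _) => tp -> verificationErrors(tp) })
  }
}
```

Only the first map would be handed to the append path; the second is already a complete per-partition response for the rejected partitions.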
Re: [PR] KAFKA-15987: Refactor ReplicaManager code for transaction verification [kafka]
hachikuji commented on code in PR #15087:
URL: https://github.com/apache/kafka/pull/15087#discussion_r1441051073

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -749,9 +749,11 @@ class ReplicaManager(val config: KafkaConfig,
* @param responseCallback callback for sending the response
* @param delayedProduceLock lock for the delayed actions
* @param recordValidationStatsCallback callback for updating stats on record conversions
- * @param requestLocal container for the stateful instances scoped to this request
- * @param transactionalId transactional ID if the request is from a producer and the producer is transactional
+ * @param requestLocal container for the stateful instances scoped to this request -- this must correspond to the
+ * thread calling this method
* @param actionQueue the action queue to use. ReplicaManager#defaultActionQueue is used by default.
+ * @param verificationGuards the mapping from topic partition to verification guards if transaction verification is used
+ * @param preAppendErrors the mapping from topic partition to LogAppendResult for errors that occurred before appending

Review Comment:
This seems a little strange. If the partitions have already failed, then why do we need to pass them through `appendRecords`? I would expect instead that the caller would just join the pre-append failures with the append failures.
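The suggested alternative keeps `appendRecords` unaware of earlier failures and lets the caller merge two per-partition maps. A minimal sketch, assuming string partition keys and a hypothetical `JoinResults.join` helper; the disjointness check encodes the assumption that a partition which failed before the append is never handed to the append.

```scala
// Sketch only: caller-side join of pre-append failures with append results.
object JoinResults {
  // Only partitions that passed the pre-append checks are appended, so the key sets
  // should be disjoint; require() makes a violation of that assumption fail loudly.
  def join[V](preAppendErrors: Map[String, V], appendResults: Map[String, V]): Map[String, V] = {
    require(preAppendErrors.keySet.intersect(appendResults.keySet).isEmpty,
      "a partition cannot both fail before the append and be appended")
    preAppendErrors ++ appendResults
  }
}
```

Keeping the join at the caller means `appendRecords` never needs a parameter for partitions it will not write.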
[PR] KAFKA-15987: Refactor ReplicaManager code for transaction verification [kafka]
jolshan opened a new pull request, #15087:
URL: https://github.com/apache/kafka/pull/15087

I originally did some refactors in https://github.com/apache/kafka/pull/14774, but we decided to keep the changes minimal since the ticket was a blocker. Here are those refactors:

* Removed separate append paths so that produce, group coordinator, and other append paths all call appendRecords
* Simplified appendRecords
* Removed unneeded error conversions from the verification code, since the group coordinator and produce paths convert errors differently; removed the test for that conversion
* Fixed an incorrectly capitalized param name in KafkaRequestHandler
* Updated ReplicaManagerTest to handle transaction verification appends separately (I may revise this)

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
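The resulting flow, where the produce path optionally verifies and then funnels everything into one append entry point, can be sketched as follows. Assumptions: verification is shown as a synchronous function even though the broker performs it asynchronously; the skip condition (no transactional ID, or `transaction.partition.verification.enable` off) mirrors the check visible in the ReplicaManager hunk; `UnifiedAppend` and its methods are hypothetical names.

```scala
// Sketch only: synchronous, simplified model of "verify, then call the shared append path".
object UnifiedAppend {
  type TopicPartition = String

  // Single append entry point shared by every caller (stands in for appendRecords).
  def appendRecords(entries: Map[TopicPartition, Seq[String]]): Map[TopicPartition, String] =
    entries.map { case (tp, _) => tp -> "NONE" }

  // Verification only applies to transactional producers when the feature flag is on.
  def needsVerification(transactionalId: String, verificationEnabled: Boolean): Boolean =
    transactionalId != null && verificationEnabled

  // Produce path: verify if required, append the partitions that passed, and join the results.
  def handleProduceAppend(entries: Map[TopicPartition, Seq[String]],
                          transactionalId: String,
                          verificationEnabled: Boolean,
                          verify: Set[TopicPartition] => Map[TopicPartition, String]): Map[TopicPartition, String] = {
    val errors =
      if (needsVerification(transactionalId, verificationEnabled)) verify(entries.keySet)
      else Map.empty[TopicPartition, String]
    errors ++ appendRecords(entries.filter { case (tp, _) => !errors.contains(tp) })
  }
}
```

Other callers, such as the group coordinator, would call `appendRecords` directly and apply their own error mapping to the results.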