Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
dajac merged PR #15142: URL: https://github.com/apache/kafka/pull/15142 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
dajac commented on code in PR #15142: URL: https://github.com/apache/kafka/pull/15142#discussion_r1448416633 ## core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala: ## @@ -201,18 +203,57 @@ class CoordinatorPartitionWriter[T]( )) } + /** + * Verify the transaction. + * + * @param tp The partition to write records to. + * @param transactionalId The transactional id. + * @param producerId The producer id. + * @param producerEpoch The producer epoch. + * @return A future containing the {@link VerificationGuard} or an exception if the Review Comment: done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
artemlivshits commented on code in PR #15142: URL: https://github.com/apache/kafka/pull/15142#discussion_r144830 ## core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala: ## @@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T]( )) } + /** + * Verify the transaction. + * + * @param tp The partition to write records to. + * @param transactionalId The transactional id. + * @param producerId The producer id. + * @param producerEpoch The producer epoch. + * @return A future containing the {@link VerificationGuard} or an exception. + * @throws KafkaException Any KafkaException caught during the operation. + */ + override def maybeStartTransactionVerification( +tp: TopicPartition, +transactionalId: String, +producerId: Long, +producerEpoch: Short + ): CompletableFuture[VerificationGuard] = { +val future = new CompletableFuture[VerificationGuard]() +replicaManager.maybeStartTransactionVerificationForPartition( + topicPartition = tp, + transactionalId = transactionalId, + producerId = producerId, + producerEpoch = producerEpoch, + baseSequence = RecordBatch.NO_SEQUENCE, + requestLocal = RequestLocal.NoCaching, + callback = (error, _, verificationGuard) => { +if (error != Errors.NONE) { + future.completeExceptionally(error.exception) +} else { + future.complete(verificationGuard) +} + } +) +future + } + private def internalAppend( tp: TopicPartition, -memoryRecords: MemoryRecords +memoryRecords: MemoryRecords, +verificationGuard: VerificationGuard = VerificationGuard.SENTINEL ): Long = { var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty -replicaManager.appendRecords( +replicaManager.appendForGroup( timeout = 0L, requiredAcks = 1, - internalTopicsAllowed = true, - origin = AppendOrigin.COORDINATOR, entriesPerPartition = Map(tp -> memoryRecords), responseCallback = results => appendResults = results, + requestLocal = RequestLocal.NoCaching, Review Comment: My understanding is that by the time we come to this function it already runs on a GC thread (it has to happen because that's how we guarantee the atomicity), so there is no request local anyway here and we must use NoCaching (it is always safe to use NoCaching, just not as optimal as thread local). The future's callback will be called in the thread that completed it, but like I said, it doesn't matter for this function as it gets rescheduled on the GC thread pool. I gave a suggestion to hoist wrapping to the caller, so new GC doesn't have to do double-schedule https://github.com/apache/kafka/pull/14774#discussion_r1420931474. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
jolshan commented on code in PR #15142: URL: https://github.com/apache/kafka/pull/15142#discussion_r1448027564 ## core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala: ## @@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T]( )) } + /** + * Verify the transaction. + * + * @param tp The partition to write records to. + * @param transactionalId The transactional id. + * @param producerId The producer id. + * @param producerEpoch The producer epoch. + * @return A future containing the {@link VerificationGuard} or an exception. + * @throws KafkaException Any KafkaException caught during the operation. + */ + override def maybeStartTransactionVerification( +tp: TopicPartition, +transactionalId: String, +producerId: Long, +producerEpoch: Short + ): CompletableFuture[VerificationGuard] = { +val future = new CompletableFuture[VerificationGuard]() +replicaManager.maybeStartTransactionVerificationForPartition( + topicPartition = tp, + transactionalId = transactionalId, + producerId = producerId, + producerEpoch = producerEpoch, + baseSequence = RecordBatch.NO_SEQUENCE, + requestLocal = RequestLocal.NoCaching, + callback = (error, _, verificationGuard) => { +if (error != Errors.NONE) { + future.completeExceptionally(error.exception) +} else { + future.complete(verificationGuard) +} + } +) +future + } + private def internalAppend( tp: TopicPartition, -memoryRecords: MemoryRecords +memoryRecords: MemoryRecords, +verificationGuard: VerificationGuard = VerificationGuard.SENTINEL ): Long = { var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty -replicaManager.appendRecords( +replicaManager.appendForGroup( timeout = 0L, requiredAcks = 1, - internalTopicsAllowed = true, - origin = AppendOrigin.COORDINATOR, entriesPerPartition = Map(tp -> memoryRecords), responseCallback = results => appendResults = results, + requestLocal = RequestLocal.NoCaching, Review Comment: I guess we just complete the future in the callback and do the write in the thenCompose. That made me realize I don't know everything about how futures are scheduled, but looking at the docs I think it is safe to say this is correct. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
jolshan commented on code in PR #15142: URL: https://github.com/apache/kafka/pull/15142#discussion_r1447988893 ## core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala: ## @@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T]( )) } + /** + * Verify the transaction. + * + * @param tp The partition to write records to. + * @param transactionalId The transactional id. + * @param producerId The producer id. + * @param producerEpoch The producer epoch. + * @return A future containing the {@link VerificationGuard} or an exception. + * @throws KafkaException Any KafkaException caught during the operation. + */ + override def maybeStartTransactionVerification( +tp: TopicPartition, +transactionalId: String, +producerId: Long, +producerEpoch: Short + ): CompletableFuture[VerificationGuard] = { +val future = new CompletableFuture[VerificationGuard]() +replicaManager.maybeStartTransactionVerificationForPartition( + topicPartition = tp, + transactionalId = transactionalId, + producerId = producerId, + producerEpoch = producerEpoch, + baseSequence = RecordBatch.NO_SEQUENCE, + requestLocal = RequestLocal.NoCaching, + callback = (error, _, verificationGuard) => { +if (error != Errors.NONE) { + future.completeExceptionally(error.exception) +} else { + future.complete(verificationGuard) +} + } +) +future + } + private def internalAppend( tp: TopicPartition, -memoryRecords: MemoryRecords +memoryRecords: MemoryRecords, +verificationGuard: VerificationGuard = VerificationGuard.SENTINEL ): Long = { var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty -replicaManager.appendRecords( +replicaManager.appendForGroup( timeout = 0L, requiredAcks = 1, - internalTopicsAllowed = true, - origin = AppendOrigin.COORDINATOR, entriesPerPartition = Map(tp -> memoryRecords), responseCallback = results => appendResults = results, + requestLocal = RequestLocal.NoCaching, Review Comment: Ok -- sorry I just meant I didn't know if it was passed through a callback (or future in this case) or if it was only invoked in the callback. For example, there were issues in the past since we used the buffer defined for the callback on a new handler thread. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
dajac commented on code in PR #15142: URL: https://github.com/apache/kafka/pull/15142#discussion_r1447918075 ## core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala: ## @@ -201,18 +203,57 @@ class CoordinatorPartitionWriter[T]( )) } + /** + * Verify the transaction. + * + * @param tp The partition to write records to. + * @param transactionalId The transactional id. + * @param producerId The producer id. + * @param producerEpoch The producer epoch. + * @return A future containing the {@link VerificationGuard} or an exception if the Review Comment: sorry, missed it. i think that i will just replace those docs with links to the interface doc in order to avoid this in the future. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
dajac commented on code in PR #15142: URL: https://github.com/apache/kafka/pull/15142#discussion_r1447916841 ## core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala: ## @@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T]( )) } + /** + * Verify the transaction. + * + * @param tp The partition to write records to. + * @param transactionalId The transactional id. + * @param producerId The producer id. + * @param producerEpoch The producer epoch. + * @return A future containing the {@link VerificationGuard} or an exception. + * @throws KafkaException Any KafkaException caught during the operation. + */ + override def maybeStartTransactionVerification( +tp: TopicPartition, +transactionalId: String, +producerId: Long, +producerEpoch: Short + ): CompletableFuture[VerificationGuard] = { +val future = new CompletableFuture[VerificationGuard]() +replicaManager.maybeStartTransactionVerificationForPartition( + topicPartition = tp, + transactionalId = transactionalId, + producerId = producerId, + producerEpoch = producerEpoch, + baseSequence = RecordBatch.NO_SEQUENCE, + requestLocal = RequestLocal.NoCaching, + callback = (error, _, verificationGuard) => { +if (error != Errors.NONE) { + future.completeExceptionally(error.exception) +} else { + future.complete(verificationGuard) +} + } +) +future + } + private def internalAppend( tp: TopicPartition, -memoryRecords: MemoryRecords +memoryRecords: MemoryRecords, +verificationGuard: VerificationGuard = VerificationGuard.SENTINEL ): Long = { var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty -replicaManager.appendRecords( +replicaManager.appendForGroup( timeout = 0L, requiredAcks = 1, - internalTopicsAllowed = true, - origin = AppendOrigin.COORDINATOR, entriesPerPartition = Map(tp -> memoryRecords), responseCallback = results => appendResults = results, + requestLocal = RequestLocal.NoCaching, Review Comment: Yes, we reschedule to request thread to only push an event to the coordinator thread. It is not optimal. The request local is not used at all as the writer uses its own buffers. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
jolshan commented on code in PR #15142: URL: https://github.com/apache/kafka/pull/15142#discussion_r1447883700 ## core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala: ## @@ -201,18 +203,57 @@ class CoordinatorPartitionWriter[T]( )) } + /** + * Verify the transaction. + * + * @param tp The partition to write records to. + * @param transactionalId The transactional id. + * @param producerId The producer id. + * @param producerEpoch The producer epoch. + * @return A future containing the {@link VerificationGuard} or an exception if the Review Comment: nit: did we want to update this comment to be consistent with the PartitionWriter one? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
jolshan commented on code in PR #15142: URL: https://github.com/apache/kafka/pull/15142#discussion_r1447880085 ## core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala: ## @@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T]( )) } + /** + * Verify the transaction. + * + * @param tp The partition to write records to. + * @param transactionalId The transactional id. + * @param producerId The producer id. + * @param producerEpoch The producer epoch. + * @return A future containing the {@link VerificationGuard} or an exception. + * @throws KafkaException Any KafkaException caught during the operation. + */ + override def maybeStartTransactionVerification( +tp: TopicPartition, +transactionalId: String, +producerId: Long, +producerEpoch: Short + ): CompletableFuture[VerificationGuard] = { +val future = new CompletableFuture[VerificationGuard]() +replicaManager.maybeStartTransactionVerificationForPartition( + topicPartition = tp, + transactionalId = transactionalId, + producerId = producerId, + producerEpoch = producerEpoch, + baseSequence = RecordBatch.NO_SEQUENCE, + requestLocal = RequestLocal.NoCaching, + callback = (error, _, verificationGuard) => { +if (error != Errors.NONE) { + future.completeExceptionally(error.exception) +} else { + future.complete(verificationGuard) +} + } +) +future + } + private def internalAppend( tp: TopicPartition, -memoryRecords: MemoryRecords +memoryRecords: MemoryRecords, +verificationGuard: VerificationGuard = VerificationGuard.SENTINEL ): Long = { var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty -replicaManager.appendRecords( +replicaManager.appendForGroup( timeout = 0L, requiredAcks = 1, - internalTopicsAllowed = true, - origin = AppendOrigin.COORDINATOR, entriesPerPartition = Map(tp -> memoryRecords), responseCallback = results => appendResults = results, + requestLocal = RequestLocal.NoCaching, Review Comment: If it is the case its only an optimization question rather than a correctness one, I am good to tackle in a followup. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
jolshan commented on code in PR #15142: URL: https://github.com/apache/kafka/pull/15142#discussion_r1447879037 ## core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala: ## @@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T]( )) } + /** + * Verify the transaction. + * + * @param tp The partition to write records to. + * @param transactionalId The transactional id. + * @param producerId The producer id. + * @param producerEpoch The producer epoch. + * @return A future containing the {@link VerificationGuard} or an exception. + * @throws KafkaException Any KafkaException caught during the operation. + */ + override def maybeStartTransactionVerification( +tp: TopicPartition, +transactionalId: String, +producerId: Long, +producerEpoch: Short + ): CompletableFuture[VerificationGuard] = { +val future = new CompletableFuture[VerificationGuard]() +replicaManager.maybeStartTransactionVerificationForPartition( + topicPartition = tp, + transactionalId = transactionalId, + producerId = producerId, + producerEpoch = producerEpoch, + baseSequence = RecordBatch.NO_SEQUENCE, + requestLocal = RequestLocal.NoCaching, + callback = (error, _, verificationGuard) => { +if (error != Errors.NONE) { + future.completeExceptionally(error.exception) +} else { + future.complete(verificationGuard) +} + } +) +future + } + private def internalAppend( tp: TopicPartition, -memoryRecords: MemoryRecords +memoryRecords: MemoryRecords, +verificationGuard: VerificationGuard = VerificationGuard.SENTINEL ): Long = { var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty -replicaManager.appendRecords( +replicaManager.appendForGroup( timeout = 0L, requiredAcks = 1, - internalTopicsAllowed = true, - origin = AppendOrigin.COORDINATOR, entriesPerPartition = Map(tp -> memoryRecords), responseCallback = results => appendResults = results, + requestLocal = RequestLocal.NoCaching, Review Comment: Sorry -- you are correct. I was confused because we have a check for if we execute on the same request thread -- there we don't reschedule/wrap the request. (This was added when we error before we send the request to the txn coordinator) Just for my understanding, right now we reschedule to the request thread, and that works fine -- the only concern is not using the allocated coordinator threads and taking up space on the request handler threads? Just triple checking we won't run into an issue with request locals if the callback expects the buffer supplier from the coordinator thread. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
jolshan commented on code in PR #15142: URL: https://github.com/apache/kafka/pull/15142#discussion_r1447873603 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -1106,6 +1106,8 @@ private static boolean isGroupIdNotEmpty(String groupId) { * @return The Errors instance associated with the given exception. */ private static Errors normalizeException(Throwable exception) { +exception = Errors.maybeUnwrapException(exception); Review Comment: I see. It was too elegant before 😂 Thanks for looking into it and fixing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
dajac commented on PR #15142: URL: https://github.com/apache/kafka/pull/15142#issuecomment-1884851533 @jolshan Thanks for your comments. I have addressed all of them expect the one about the thread re-scheduling. I propose to tackle this separately in order to keep this patch simple. I have also enabled the transaction system tests and ran them successfully. The results are [here](https://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/system-test-kafka-branch-builder--1704886118--dajac--KAFKA-14505-4--8535113ae0/2024-01-10--001./2024-01-10--001./report.html). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
dajac commented on code in PR #15142: URL: https://github.com/apache/kafka/pull/15142#discussion_r1447125608 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -1106,6 +1106,8 @@ private static boolean isGroupIdNotEmpty(String groupId) { * @return The Errors instance associated with the given exception. */ private static Errors normalizeException(Throwable exception) { +exception = Errors.maybeUnwrapException(exception); Review Comment: I found out that the `thenCompose` used in the runtime wraps the exception so we need to unwrap it here. I added a test for this too. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
dajac commented on code in PR #15142: URL: https://github.com/apache/kafka/pull/15142#discussion_r1447124222 ## core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala: ## @@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T]( )) } + /** + * Verify the transaction. + * + * @param tp The partition to write records to. + * @param transactionalId The transactional id. + * @param producerId The producer id. + * @param producerEpoch The producer epoch. + * @return A future containing the {@link VerificationGuard} or an exception. + * @throws KafkaException Any KafkaException caught during the operation. + */ + override def maybeStartTransactionVerification( +tp: TopicPartition, +transactionalId: String, +producerId: Long, +producerEpoch: Short + ): CompletableFuture[VerificationGuard] = { +val future = new CompletableFuture[VerificationGuard]() +replicaManager.maybeStartTransactionVerificationForPartition( + topicPartition = tp, + transactionalId = transactionalId, + producerId = producerId, + producerEpoch = producerEpoch, + baseSequence = RecordBatch.NO_SEQUENCE, + requestLocal = RequestLocal.NoCaching, + callback = (error, _, verificationGuard) => { +if (error != Errors.NONE) { + future.completeExceptionally(error.exception) +} else { + future.complete(verificationGuard) +} + } +) +future + } + private def internalAppend( tp: TopicPartition, -memoryRecords: MemoryRecords +memoryRecords: MemoryRecords, +verificationGuard: VerificationGuard = VerificationGuard.SENTINEL ): Long = { var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty -replicaManager.appendRecords( +replicaManager.appendForGroup( timeout = 0L, requiredAcks = 1, - internalTopicsAllowed = true, - origin = AppendOrigin.COORDINATOR, entriesPerPartition = Map(tp -> memoryRecords), responseCallback = results => appendResults = results, + requestLocal = RequestLocal.NoCaching, Review Comment: > Right now though, wrap only schedules to the request thread if we are already on a request thread. Otherwise we execute directly. If we start verification from a non-request handler thread, maybe this already works as you intend. My understanding is that `wrapAsyncCallback` will throw an exception if the caller is not a request thread and `bypassThreadCheck` is not set. In my case, the verification is called from the request thread. I only re-schedule to the coordinator thread when the verification returns. I could also have scheduled the verification from the coordinator thread but it does not seem necessary. > Alternatively, I could pass in a parameter to optionally wrap the callback (send it to the request thread) or not. Yeah, that's an option. I played a bit with this idea this morning and the outcome was a bit confusing. Another option would be to do the re-scheduling on the caller side. I will continue to play... I think that we could address this in a subsequent PR to get rid of the extra hop. This PR works as-is but is suboptimal. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
dajac commented on code in PR #15142: URL: https://github.com/apache/kafka/pull/15142#discussion_r1447050169 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java: ## @@ -116,4 +120,21 @@ long appendEndTransactionMarker( int coordinatorEpoch, TransactionResult result ) throws KafkaException; + +/** + * Verify the transaction. + * + * @param tpThe partition to write records to. + * @param transactionalId The transactional id. + * @param producerIdThe producer id. + * @param producerEpoch The producer epoch. + * @return A future containing the {@link VerificationGuard} or an exception. Review Comment: updated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
jolshan commented on code in PR #15142: URL: https://github.com/apache/kafka/pull/15142#discussion_r1446688518 ## core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala: ## @@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T]( )) } + /** + * Verify the transaction. + * + * @param tp The partition to write records to. + * @param transactionalId The transactional id. + * @param producerId The producer id. + * @param producerEpoch The producer epoch. + * @return A future containing the {@link VerificationGuard} or an exception. + * @throws KafkaException Any KafkaException caught during the operation. + */ + override def maybeStartTransactionVerification( +tp: TopicPartition, +transactionalId: String, +producerId: Long, +producerEpoch: Short + ): CompletableFuture[VerificationGuard] = { +val future = new CompletableFuture[VerificationGuard]() +replicaManager.maybeStartTransactionVerificationForPartition( + topicPartition = tp, + transactionalId = transactionalId, + producerId = producerId, + producerEpoch = producerEpoch, + baseSequence = RecordBatch.NO_SEQUENCE, + requestLocal = RequestLocal.NoCaching, + callback = (error, _, verificationGuard) => { +if (error != Errors.NONE) { + future.completeExceptionally(error.exception) +} else { + future.complete(verificationGuard) +} + } +) +future + } + private def internalAppend( tp: TopicPartition, -memoryRecords: MemoryRecords +memoryRecords: MemoryRecords, +verificationGuard: VerificationGuard = VerificationGuard.SENTINEL ): Long = { var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty -replicaManager.appendRecords( +replicaManager.appendForGroup( timeout = 0L, requiredAcks = 1, - internalTopicsAllowed = true, - origin = AppendOrigin.COORDINATOR, entriesPerPartition = Map(tp -> memoryRecords), responseCallback = results => appendResults = results, + requestLocal = RequestLocal.NoCaching, Review Comment: Yes -- that part has not changed. We "wrap" any callback from the transaction coordinator to the request handler thread. Right now though, wrap only schedules to the request thread if we are already on a request thread. Otherwise we execute directly. If we start verification from a non-request handler thread, maybe this already works as you intend. Alternatively, I could pass in a parameter to optionally wrap the callback (send it to the request thread) or not. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
dajac commented on code in PR #15142: URL: https://github.com/apache/kafka/pull/15142#discussion_r1446663978 ## core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala: ## @@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T]( )) } + /** + * Verify the transaction. + * + * @param tp The partition to write records to. + * @param transactionalId The transactional id. + * @param producerId The producer id. + * @param producerEpoch The producer epoch. + * @return A future containing the {@link VerificationGuard} or an exception. + * @throws KafkaException Any KafkaException caught during the operation. + */ + override def maybeStartTransactionVerification( +tp: TopicPartition, +transactionalId: String, +producerId: Long, +producerEpoch: Short + ): CompletableFuture[VerificationGuard] = { +val future = new CompletableFuture[VerificationGuard]() +replicaManager.maybeStartTransactionVerificationForPartition( + topicPartition = tp, + transactionalId = transactionalId, + producerId = producerId, + producerEpoch = producerEpoch, + baseSequence = RecordBatch.NO_SEQUENCE, + requestLocal = RequestLocal.NoCaching, + callback = (error, _, verificationGuard) => { +if (error != Errors.NONE) { + future.completeExceptionally(error.exception) +} else { + future.complete(verificationGuard) +} + } +) +future + } + private def internalAppend( tp: TopicPartition, -memoryRecords: MemoryRecords +memoryRecords: MemoryRecords, +verificationGuard: VerificationGuard = VerificationGuard.SENTINEL ): Long = { var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty -replicaManager.appendRecords( +replicaManager.appendForGroup( timeout = 0L, requiredAcks = 1, - internalTopicsAllowed = true, - origin = AppendOrigin.COORDINATOR, entriesPerPartition = Map(tp -> memoryRecords), responseCallback = results => appendResults = results, + requestLocal = RequestLocal.NoCaching, Review Comment: btw, when the validation completed, we schedule an event on the coordinator’s thread pool so we don’t really need to execute it on the request handler thread. with your refactor, do you plan to keep the re-scheduling within the replica manager? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
jolshan commented on code in PR #15142: URL: https://github.com/apache/kafka/pull/15142#discussion_r1446657520 ## core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala: ## @@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T]( )) } + /** + * Verify the transaction. + * + * @param tp The partition to write records to. + * @param transactionalId The transactional id. + * @param producerId The producer id. + * @param producerEpoch The producer epoch. + * @return A future containing the {@link VerificationGuard} or an exception. + * @throws KafkaException Any KafkaException caught during the operation. + */ + override def maybeStartTransactionVerification( +tp: TopicPartition, +transactionalId: String, +producerId: Long, +producerEpoch: Short + ): CompletableFuture[VerificationGuard] = { +val future = new CompletableFuture[VerificationGuard]() +replicaManager.maybeStartTransactionVerificationForPartition( + topicPartition = tp, + transactionalId = transactionalId, + producerId = producerId, + producerEpoch = producerEpoch, + baseSequence = RecordBatch.NO_SEQUENCE, + requestLocal = RequestLocal.NoCaching, + callback = (error, _, verificationGuard) => { +if (error != Errors.NONE) { + future.completeExceptionally(error.exception) +} else { + future.complete(verificationGuard) +} + } +) +future + } + private def internalAppend( tp: TopicPartition, -memoryRecords: MemoryRecords +memoryRecords: MemoryRecords, +verificationGuard: VerificationGuard = VerificationGuard.SENTINEL ): Long = { var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty -replicaManager.appendRecords( +replicaManager.appendForGroup( timeout = 0L, requiredAcks = 1, - internalTopicsAllowed = true, - origin = AppendOrigin.COORDINATOR, entriesPerPartition = Map(tp -> memoryRecords), responseCallback = results => appendResults = results, + requestLocal = RequestLocal.NoCaching, Review Comment: Thanks! I will try to run through it today too. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
dajac commented on code in PR #15142: URL: https://github.com/apache/kafka/pull/15142#discussion_r1446654869 ## core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala: ## @@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T]( )) } + /** + * Verify the transaction. + * + * @param tp The partition to write records to. + * @param transactionalId The transactional id. + * @param producerId The producer id. + * @param producerEpoch The producer epoch. + * @return A future containing the {@link VerificationGuard} or an exception. + * @throws KafkaException Any KafkaException caught during the operation. + */ + override def maybeStartTransactionVerification( +tp: TopicPartition, +transactionalId: String, +producerId: Long, +producerEpoch: Short + ): CompletableFuture[VerificationGuard] = { +val future = new CompletableFuture[VerificationGuard]() +replicaManager.maybeStartTransactionVerificationForPartition( + topicPartition = tp, + transactionalId = transactionalId, + producerId = producerId, + producerEpoch = producerEpoch, + baseSequence = RecordBatch.NO_SEQUENCE, + requestLocal = RequestLocal.NoCaching, + callback = (error, _, verificationGuard) => { +if (error != Errors.NONE) { + future.completeExceptionally(error.exception) +} else { + future.complete(verificationGuard) +} + } +) +future + } + private def internalAppend( tp: TopicPartition, -memoryRecords: MemoryRecords +memoryRecords: MemoryRecords, +verificationGuard: VerificationGuard = VerificationGuard.SENTINEL ): Long = { var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty -replicaManager.appendRecords( +replicaManager.appendForGroup( timeout = 0L, requiredAcks = 1, - internalTopicsAllowed = true, - origin = AppendOrigin.COORDINATOR, entriesPerPartition = Map(tp -> memoryRecords), responseCallback = results => appendResults = results, + requestLocal = RequestLocal.NoCaching, Review Comment: ah, right. i forgot about that one. let me check this tomorrow. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
jolshan commented on code in PR #15142: URL: https://github.com/apache/kafka/pull/15142#discussion_r1446538966 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java: ## @@ -116,4 +120,21 @@ long appendEndTransactionMarker( int coordinatorEpoch, TransactionResult result ) throws KafkaException; + +/** + * Verify the transaction. + * + * @param tpThe partition to write records to. + * @param transactionalId The transactional id. + * @param producerIdThe producer id. + * @param producerEpoch The producer epoch. + * @return A future containing the {@link VerificationGuard} or an exception. Review Comment: There is one case where I think we still throw an error even if the partition is already verified. When we look up the partition to see if it needs verification, we could throw an error if the partition isn't on the broker. Not a huge deal though and we probably don't need to include. But an alternate description could be something like it returns any error encountered or the verification guard if it needed verification and the sentinel if it did not. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
jolshan commented on code in PR #15142: URL: https://github.com/apache/kafka/pull/15142#discussion_r1446536290 ## core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala: ## @@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T]( )) } + /** + * Verify the transaction. + * + * @param tp The partition to write records to. + * @param transactionalId The transactional id. + * @param producerId The producer id. + * @param producerEpoch The producer epoch. + * @return A future containing the {@link VerificationGuard} or an exception. + * @throws KafkaException Any KafkaException caught during the operation. + */ + override def maybeStartTransactionVerification( +tp: TopicPartition, +transactionalId: String, +producerId: Long, +producerEpoch: Short + ): CompletableFuture[VerificationGuard] = { +val future = new CompletableFuture[VerificationGuard]() +replicaManager.maybeStartTransactionVerificationForPartition( + topicPartition = tp, + transactionalId = transactionalId, + producerId = producerId, + producerEpoch = producerEpoch, + baseSequence = RecordBatch.NO_SEQUENCE, + requestLocal = RequestLocal.NoCaching, + callback = (error, _, verificationGuard) => { +if (error != Errors.NONE) { + future.completeExceptionally(error.exception) +} else { + future.complete(verificationGuard) +} + } +) +future + } + private def internalAppend( tp: TopicPartition, -memoryRecords: MemoryRecords +memoryRecords: MemoryRecords, +verificationGuard: VerificationGuard = VerificationGuard.SENTINEL ): Long = { var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty -replicaManager.appendRecords( +replicaManager.appendForGroup( timeout = 0L, requiredAcks = 1, - internalTopicsAllowed = true, - origin = AppendOrigin.COORDINATOR, entriesPerPartition = Map(tp -> memoryRecords), responseCallback = results => appendResults = results, + requestLocal = RequestLocal.NoCaching, Review Comment: Right right, I remember this now. I'm wondering though is that ok with the async verification callback. Is it the case we don't evaluate which one to use until we execute the callback. As an aside, did we decide that the callback is ok to execute on the request handler thread? I didn't go through and trace the threads being used yet. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
dajac commented on code in PR #15142: URL: https://github.com/apache/kafka/pull/15142#discussion_r1446091865 ## core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala: ## @@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T]( )) } + /** + * Verify the transaction. + * + * @param tp The partition to write records to. + * @param transactionalId The transactional id. + * @param producerId The producer id. + * @param producerEpoch The producer epoch. + * @return A future containing the {@link VerificationGuard} or an exception. + * @throws KafkaException Any KafkaException caught during the operation. + */ + override def maybeStartTransactionVerification( +tp: TopicPartition, +transactionalId: String, +producerId: Long, +producerEpoch: Short + ): CompletableFuture[VerificationGuard] = { +val future = new CompletableFuture[VerificationGuard]() +replicaManager.maybeStartTransactionVerificationForPartition( Review Comment: that's how i understand it too. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
dajac commented on code in PR #15142: URL: https://github.com/apache/kafka/pull/15142#discussion_r1446091535 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java: ## @@ -116,4 +120,21 @@ long appendEndTransactionMarker( int coordinatorEpoch, TransactionResult result ) throws KafkaException; + +/** + * Verify the transaction. + * + * @param tpThe partition to write records to. + * @param transactionalId The transactional id. + * @param producerIdThe producer id. + * @param producerEpoch The producer epoch. + * @return A future containing the {@link VerificationGuard} or an exception. Review Comment: i extended the description. let me know if it works for you. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
dajac commented on code in PR #15142: URL: https://github.com/apache/kafka/pull/15142#discussion_r1446088963 ## core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala: ## @@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T]( )) } + /** + * Verify the transaction. + * + * @param tp The partition to write records to. + * @param transactionalId The transactional id. + * @param producerId The producer id. + * @param producerEpoch The producer epoch. + * @return A future containing the {@link VerificationGuard} or an exception. + * @throws KafkaException Any KafkaException caught during the operation. + */ + override def maybeStartTransactionVerification( +tp: TopicPartition, +transactionalId: String, +producerId: Long, +producerEpoch: Short + ): CompletableFuture[VerificationGuard] = { +val future = new CompletableFuture[VerificationGuard]() +replicaManager.maybeStartTransactionVerificationForPartition( + topicPartition = tp, + transactionalId = transactionalId, + producerId = producerId, + producerEpoch = producerEpoch, + baseSequence = RecordBatch.NO_SEQUENCE, + requestLocal = RequestLocal.NoCaching, + callback = (error, _, verificationGuard) => { +if (error != Errors.NONE) { + future.completeExceptionally(error.exception) +} else { + future.complete(verificationGuard) +} + } +) +future + } + private def internalAppend( tp: TopicPartition, -memoryRecords: MemoryRecords +memoryRecords: MemoryRecords, +verificationGuard: VerificationGuard = VerificationGuard.SENTINEL ): Long = { var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty -replicaManager.appendRecords( +replicaManager.appendForGroup( timeout = 0L, requiredAcks = 1, - internalTopicsAllowed = true, - origin = AppendOrigin.COORDINATOR, entriesPerPartition = Map(tp -> memoryRecords), responseCallback = results => appendResults = results, + requestLocal = RequestLocal.NoCaching, Review Comment: no, we don't have request locals here. the writer uses its own butter supplier (`threadLocalBufferSupplier`) so the `requestLocal` is not used at all. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
dajac commented on code in PR #15142: URL: https://github.com/apache/kafka/pull/15142#discussion_r1446089431 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -1466,17 +1477,25 @@ public CompletableFuture scheduleTransactionalWriteOperation( ) { throwIfNotRunning(); log.debug("Scheduled execution of transactional write operation {}.", name); -CoordinatorWriteEvent event = new CoordinatorWriteEvent<>( -name, +return partitionWriter.maybeStartTransactionVerification( tp, transactionalId, producerId, -producerEpoch, -timeout, -op -); -enqueue(event); -return event.future; +producerEpoch +).thenCompose(verificationGuard -> { Review Comment: yeah, i am also happy with this. it is simple and elegant. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
jolshan commented on code in PR #15142: URL: https://github.com/apache/kafka/pull/15142#discussion_r1445444996 ## core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala: ## @@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T]( )) } + /** + * Verify the transaction. + * + * @param tp The partition to write records to. + * @param transactionalId The transactional id. + * @param producerId The producer id. + * @param producerEpoch The producer epoch. + * @return A future containing the {@link VerificationGuard} or an exception. + * @throws KafkaException Any KafkaException caught during the operation. + */ + override def maybeStartTransactionVerification( +tp: TopicPartition, +transactionalId: String, +producerId: Long, +producerEpoch: Short + ): CompletableFuture[VerificationGuard] = { +val future = new CompletableFuture[VerificationGuard]() +replicaManager.maybeStartTransactionVerificationForPartition( Review Comment: One thing to note is that we don't always verify the partition if it has already been verified for the transaction. I guess in that case we return the sentinel guard which works 👍 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
jolshan commented on code in PR #15142: URL: https://github.com/apache/kafka/pull/15142#discussion_r1445431424 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java: ## @@ -116,4 +120,21 @@ long appendEndTransactionMarker( int coordinatorEpoch, TransactionResult result ) throws KafkaException; + +/** + * Verify the transaction. + * + * @param tpThe partition to write records to. + * @param transactionalId The transactional id. + * @param producerIdThe producer id. + * @param producerEpoch The producer epoch. + * @return A future containing the {@link VerificationGuard} or an exception. Review Comment: nit: we could have a verification guard without verifying the transaction -- (how the method in ReplicaManager works) so maybe we should include that we only return the verification guard and not an error if it was verified by the transaction coordinator. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
jolshan commented on code in PR #15142: URL: https://github.com/apache/kafka/pull/15142#discussion_r1445426296 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -1466,17 +1477,25 @@ public CompletableFuture scheduleTransactionalWriteOperation( ) { throwIfNotRunning(); log.debug("Scheduled execution of transactional write operation {}.", name); -CoordinatorWriteEvent event = new CoordinatorWriteEvent<>( -name, +return partitionWriter.maybeStartTransactionVerification( tp, transactionalId, producerId, -producerEpoch, -timeout, -op -); -enqueue(event); -return event.future; +producerEpoch +).thenCompose(verificationGuard -> { Review Comment: I'm glad this ended up being kind of elegant :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
jolshan commented on code in PR #15142: URL: https://github.com/apache/kafka/pull/15142#discussion_r1445415307 ## core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala: ## @@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T]( )) } + /** + * Verify the transaction. + * + * @param tp The partition to write records to. + * @param transactionalId The transactional id. + * @param producerId The producer id. + * @param producerEpoch The producer epoch. + * @return A future containing the {@link VerificationGuard} or an exception. + * @throws KafkaException Any KafkaException caught during the operation. + */ + override def maybeStartTransactionVerification( +tp: TopicPartition, +transactionalId: String, +producerId: Long, +producerEpoch: Short + ): CompletableFuture[VerificationGuard] = { +val future = new CompletableFuture[VerificationGuard]() +replicaManager.maybeStartTransactionVerificationForPartition( + topicPartition = tp, + transactionalId = transactionalId, + producerId = producerId, + producerEpoch = producerEpoch, + baseSequence = RecordBatch.NO_SEQUENCE, + requestLocal = RequestLocal.NoCaching, + callback = (error, _, verificationGuard) => { +if (error != Errors.NONE) { + future.completeExceptionally(error.exception) +} else { + future.complete(verificationGuard) +} + } +) +future + } + private def internalAppend( tp: TopicPartition, -memoryRecords: MemoryRecords +memoryRecords: MemoryRecords, +verificationGuard: VerificationGuard = VerificationGuard.SENTINEL ): Long = { var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty -replicaManager.appendRecords( +replicaManager.appendForGroup( timeout = 0L, requiredAcks = 1, - internalTopicsAllowed = true, - origin = AppendOrigin.COORDINATOR, entriesPerPartition = Map(tp -> memoryRecords), responseCallback = results => appendResults = results, + requestLocal = RequestLocal.NoCaching, Review Comment: do we not typically use request locals for this type of append? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
jolshan commented on code in PR #15142: URL: https://github.com/apache/kafka/pull/15142#discussion_r1445412684 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -989,6 +989,7 @@ class ReplicaManager(val config: KafkaConfig, * @param requestLocal container for the stateful instances scoped to this request -- this must correspond to the * thread calling this method * @param verificationGuardsthe mapping from topic partition to verification guards if transaction verification is used + * @param actionQueue the action queue to use Review Comment: good news is the refactor will allow specifying the action queue as normal 👍 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
jolshan commented on PR #15142: URL: https://github.com/apache/kafka/pull/15142#issuecomment-1881648674 Yup -- sorry was hoping to do all the clean ups before you got to this -- but shouldn't be too hard to update. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
dajac opened a new pull request, #15142: URL: https://github.com/apache/kafka/pull/15142 This patch wires the transaction verification in the new group coordinator. It basically calls the verification path before scheduling the write operation. If the verification fails, the error is returned to the caller. Note that the patch uses `appendForGroup`. I suppose that we will move away from using it when https://github.com/apache/kafka/pull/15087 is merged. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org