Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan merged PR #14629:
URL: https://github.com/apache/kafka/pull/14629

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
jolshan commented on PR #14629:
URL: https://github.com/apache/kafka/pull/14629#issuecomment-1800331351

Thanks @divijvaidya. I was taking a look at Gradle Enterprise as well, but the link is helpful.
divijvaidya commented on PR #14629:
URL: https://github.com/apache/kafka/pull/14629#issuecomment-1800218167

Hey @jolshan, you might already know this, but I'm sharing it in case you don't. A tool I use to quickly check whether a test is flaky is https://ge.apache.org/scans/tests?search.rootProjectNames=kafka=trunk=Europe%2FBerlin=kafka.zk.ZkMigrationIntegrationTest

You can replace the test class and name in the URL to view the last 7 days of results for that specific test, including its flakiness rate and the causes of its failures.
junrao commented on PR #14629:
URL: https://github.com/apache/kafka/pull/14629#issuecomment-1800110570

@jolshan : Thanks for the analysis. If the test failures are unrelated, feel free to merge the PR. It would be useful to file JIRAs to track the new transient test failures.
jolshan commented on PR #14629:
URL: https://github.com/apache/kafka/pull/14629#issuecomment-1799582958

I took a look at the most recent 19 failures. Most of them have also failed on trunk in the last few (5 or so) runs. I will look at the remaining ones that I haven't seen failing on trunk recently. For what it's worth, most trunk runs that complete all 4 builds have 10-31 failures each:
https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/
This is not good for the project. :(

* [Build / JDK 8 and Scala 2.12 / org.apache.kafka.tiered.storage.integration.TransactionsWithTieredStoreTest.testSendOffsetsToTransactionTimeout(String).quorum=zk](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14629/22/testReport/junit/org.apache.kafka.tiered.storage.integration/TransactionsWithTieredStoreTest/Build___JDK_8_and_Scala_2_12___testSendOffsetsToTransactionTimeout_String__quorum_zk/)
  Looks like an issue with InitProducerId that doesn't seem related to this change.
* [Build / JDK 21 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testConfigurationOperations()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14629/22/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_21_and_Scala_2_13___testConfigurationOperations__/)
  An issue with the quorum controller that doesn't seem related to my change.
* [Build / JDK 8 and Scala 2.12 / kafka.api.ProducerIdExpirationTest.testTransactionAfterTransactionIdExpiresButProducerIdRemains(String).quorum=zk](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14629/22/testReport/junit/kafka.api/ProducerIdExpirationTest/Build___JDK_8_and_Scala_2_12___testTransactionAfterTransactionIdExpiresButProducerIdRemains_String__quorum_zk/)
  I feel like I have seen a similar issue in the past, but I will double-check my branch to make sure nothing is wrong.
jolshan commented on PR #14629:
URL: https://github.com/apache/kafka/pull/14629#issuecomment-1796388941

This run only had 4 failures, all from the usual suspects. I will run it one more time: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14629/17/
artemlivshits commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1383785302

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
       }
     }
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
The serialization can be done asynchronously, without holding locks and without relying on synchronous execution. It looks like it shouldn't be that hard to support that in the new group coordinator: https://github.com/apache/kafka/pull/14705.
jolshan commented on PR #14629:
URL: https://github.com/apache/kafka/pull/14629#issuecomment-1795753522

Given that I made minimal changes and the previous runs only had 10-15 or so of the regular flaky failures, I don't think so. But I will investigate.
junrao commented on PR #14629:
URL: https://github.com/apache/kafka/pull/14629#issuecomment-1795726859

@jolshan : Are the 39 test failures related to the PR?
jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1382262985

## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ##
@@ -50,28 +50,32 @@ object KafkaRequestHandler {
   }
   /**
-   * Wrap callback to schedule it on a request thread.
-   * NOTE: this function must be called on a request thread.
-   * @param fun Callback function to execute
-   * @return Wrapped callback that would execute `fun` on a request thread
+   * Execute or create a callback to be asynchronously scheduled on an arbitrary request thread
+   * NOTE: this function must be originally called from a request thread.
+   * @param asyncCompletionCallback A callback method that is expected to be executed once in an arbitrary request
+   *                                handler thread after an asynchronous action completes. The RequestLocal passed in
+   *                                must belong to the request handler thread that is executing the callback.
+   * @param requestLocal The RequestLocal for the current request handler thread in case we need to execute the callback
+   *                     function immediately without queueing the callback request
+   * @return Wrapped callback that schedules `asyncCompletionCallback` on an arbitrary request thread
    */
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def executeOrRegisterAsyncCallback[T](asyncCompletionCallback: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {

Review Comment:
Yeah. I was thinking about that when I was renaming, but I forgot that we still wrap in the early-return case. I'll change the description again to match this as well.
junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1382212898

## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ##
@@ -50,28 +50,32 @@ object KafkaRequestHandler {
   }
   /**
-   * Wrap callback to schedule it on a request thread.
-   * NOTE: this function must be called on a request thread.
-   * @param fun Callback function to execute
-   * @return Wrapped callback that would execute `fun` on a request thread
+   * Execute or create a callback to be asynchronously scheduled on an arbitrary request thread
+   * NOTE: this function must be originally called from a request thread.
+   * @param asyncCompletionCallback A callback method that is expected to be executed once in an arbitrary request
+   *                                handler thread after an asynchronous action completes. The RequestLocal passed in
+   *                                must belong to the request handler thread that is executing the callback.
+   * @param requestLocal The RequestLocal for the current request handler thread in case we need to execute the callback
+   *                     function immediately without queueing the callback request
+   * @return Wrapped callback that schedules `asyncCompletionCallback` on an arbitrary request thread
    */
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def executeOrRegisterAsyncCallback[T](asyncCompletionCallback: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {

Review Comment:
Sorry, I just realized that this method doesn't execute or register the callback directly; it creates a wrapped callback that does so instead. So `wrap` is still the more appropriate name. We could probably change the description to something like "Creates a wrapped callback to be executed synchronously on the calling request thread or asynchronously on an arbitrary request thread."
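The behavior described in this comment (a wrapped callback that runs synchronously on the calling request thread, or is re-queued and run by whichever request thread picks it up) can be illustrated with a small standalone model. This is a sketch under stated assumptions, not Kafka's `KafkaRequestHandler.wrap`: `RequestLocalStub`, `WrapSketch.handle`, `runQueued`, and the in-memory queue are hypothetical stand-ins for the broker's per-thread `RequestLocal` and its request queue.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical stand-in for RequestLocal: per-handler-thread state that must
// only be used by the thread that owns it.
final case class RequestLocalStub(ownerThread: String)

object WrapSketch {
  // The request the current handler thread is processing (null when idle).
  private val currentRequest = new ThreadLocal[AnyRef]

  // Hypothetical stand-in for the broker's request queue: callbacks waiting
  // for some handler thread to run them with that thread's own RequestLocal.
  private val callbackQueue = new ConcurrentLinkedQueue[RequestLocalStub => Unit]()

  // Simulates a handler thread processing `request`.
  def handle[A](request: AnyRef)(body: => A): A = {
    currentRequest.set(request)
    try body finally currentRequest.remove()
  }

  // Must be called while handling a request. The returned function either runs
  // the callback inline (still on the same request) or queues it.
  def wrap[T](asyncCompletionCallback: (RequestLocalStub, T) => Unit,
              requestLocal: RequestLocalStub): T => Unit = {
    val request = currentRequest.get()
    require(request != null, "wrap must be called from a request handler thread")
    (t: T) =>
      if (currentRequest.get() eq request) {
        // Completed synchronously: the captured RequestLocal is still ours.
        asyncCompletionCallback(requestLocal, t)
      } else {
        // Completed elsewhere: do NOT reuse the captured RequestLocal; the
        // thread that dequeues this callback passes in its own.
        callbackQueue.add((rl: RequestLocalStub) => asyncCompletionCallback(rl, t))
        ()
      }
  }

  // Simulates a handler thread draining queued callbacks; returns how many ran.
  def runQueued(handlerLocal: RequestLocalStub): Int = {
    var count = 0
    var callback = callbackQueue.poll()
    while (callback != null) {
      callback(handlerLocal)
      count += 1
      callback = callbackQueue.poll()
    }
    count
  }
}
```

The point the PR turns on is the `else` branch: the queued callback takes the `RequestLocal` of the thread that eventually runs it as an argument, rather than closing over the one from the thread that created it.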
jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1382176455

## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ##
@@ -50,28 +50,36 @@ object KafkaRequestHandler {
   }
   /**
-   * Wrap callback to schedule it on a request thread.
-   * NOTE: this function must be called on a request thread.
-   * @param fun Callback function to execute
-   * @return Wrapped callback that would execute `fun` on a request thread
+   * A callback method that expected to be executed once in an arbitrary request handler thread after an asynchronous action completes.
+   * The RequestLocal passed in must belong to the request handler thread that is executing the callback.
    */
-  def wrap[T](fun: T => Unit): T => Unit = {
+  class AsynchronousCompletionCallback[T](val fun: (RequestLocal, T) => Unit)

Review Comment:
I've updated the PR to reflect this.
junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1382035051

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
       }
     }
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
> The ordering of the requests themselves is not guaranteed already.

It's true that there is no strong ordering guarantee among different clients. However, the order in which the group coordinator serializes the resulting records in the log seems important.

Consider an example. A group coordinator receives a joinGroup request from client 1 first and generates a group record rec1 = GroupA(members: client1). It then receives a joinGroup request from client 2 and generates a group record rec2 = GroupA(members: client1, client2). If rec1 comes after rec2 in the log and the coordinator fails over, the new coordinator will restore the group state with only client1 as a member and lose the valid member client2.

Consider another example that involves consumer offsets. A group coordinator starts in a state in which client 1 owns partition 1. It receives an offset commit request from client 1 for partition 1. This is a valid request since client 1 owns partition 1, so the coordinator generates rec3 = GroupA(offsetForPartition1=100). A rebalance then happens: the coordinator re-assigns partition 1 to client 2 and generates rec4 = GroupA(partition1 owned by client 2). If rec3 comes after rec4 in the log and the coordinator fails over, then when the new coordinator rebuilds its state, rec3 will look invalid, since in the log it appears after client 1 has lost ownership of partition 1.
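Both failover scenarios in this comment can be replayed with a toy model in which a new coordinator rebuilds state by applying log records in order. Everything here is hypothetical: the record types are not the group coordinator's real record schema, membership records are modeled as full snapshots (last one wins), and rejecting an offset commit whose committer does not own the partition at that point in the replay is an assumption that encodes the comment's point that rec3 "will look invalid".

```scala
// Hypothetical, simplified coordinator records (not Kafka's record schema).
sealed trait GroupRecord
final case class Members(members: Set[String]) extends GroupRecord
final case class Assignment(partition: Int, owner: String) extends GroupRecord
final case class OffsetCommit(partition: Int, offset: Long, committedBy: String) extends GroupRecord

final case class GroupState(members: Set[String], owners: Map[Int, String], offsets: Map[Int, Long])

object Replay {
  // Rebuild state the way a new coordinator would after failover: apply records in log order.
  def replay(log: Seq[GroupRecord]): GroupState =
    log.foldLeft(GroupState(Set.empty, Map.empty, Map.empty)) { (state, record) =>
      record match {
        // A membership record is a full snapshot, so the last one in the log wins.
        case Members(m) => state.copy(members = m)
        case Assignment(p, owner) => state.copy(owners = state.owners.updated(p, owner))
        case OffsetCommit(p, offset, by) =>
          // Assumed validation: only accept the commit if `by` owns the partition
          // at this point in the replay; otherwise it looks invalid and is dropped.
          if (state.owners.get(p).contains(by)) state.copy(offsets = state.offsets.updated(p, offset))
          else state
      }
    }
}
```

Swapping rec1/rec2 loses client2, and swapping rec3/rec4 drops the committed offset, even though every individual record was valid when it was generated.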
jolshan commented on PR #14629:
URL: https://github.com/apache/kafka/pull/14629#issuecomment-1792854022

> Just to be clear. The txn verification for consumer offset is in trunk and still needs to be fixed, right?

Yes. We will need to fix it for 3.7. Here is the JIRA: https://issues.apache.org/jira/browse/KAFKA-15784
jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r138262

## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ##
@@ -50,28 +50,36 @@ object KafkaRequestHandler {
   }
   /**
-   * Wrap callback to schedule it on a request thread.
-   * NOTE: this function must be called on a request thread.
-   * @param fun Callback function to execute
-   * @return Wrapped callback that would execute `fun` on a request thread
+   * A callback method that expected to be executed once in an arbitrary request handler thread after an asynchronous action completes.

Review Comment:
Jason commented on this too, but I couldn't find the line. I see it now and will fix it.
junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1381976028

## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ##
@@ -50,28 +50,36 @@ object KafkaRequestHandler {
   }
   /**
-   * Wrap callback to schedule it on a request thread.
-   * NOTE: this function must be called on a request thread.
-   * @param fun Callback function to execute
-   * @return Wrapped callback that would execute `fun` on a request thread
+   * A callback method that expected to be executed once in an arbitrary request handler thread after an asynchronous action completes.

Review Comment:
expected => is expected

## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ##
@@ -50,28 +50,36 @@ object KafkaRequestHandler {
   }
   /**
-   * Wrap callback to schedule it on a request thread.
-   * NOTE: this function must be called on a request thread.
-   * @param fun Callback function to execute
-   * @return Wrapped callback that would execute `fun` on a request thread
+   * A callback method that expected to be executed once in an arbitrary request handler thread after an asynchronous action completes.
+   * The RequestLocal passed in must belong to the request handler thread that is executing the callback.
    */
-  def wrap[T](fun: T => Unit): T => Unit = {
+  class AsynchronousCompletionCallback[T](val fun: (RequestLocal, T) => Unit)

Review Comment:
Looking at this again, I am not sure AsynchronousCompletionCallback adds much value. It makes the existing subtle code even harder to understand. So we could probably just go back to the original approach, but add these comments to the Javadoc for `asyncCompletionCallback` in `wrap`.

## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ##
@@ -50,28 +50,36 @@ object KafkaRequestHandler {
   }
   /**
-   * Wrap callback to schedule it on a request thread.
-   * NOTE: this function must be called on a request thread.
-   * @param fun Callback function to execute
-   * @return Wrapped callback that would execute `fun` on a request thread
+   * A callback method that expected to be executed once in an arbitrary request handler thread after an asynchronous action completes.
+   * The RequestLocal passed in must belong to the request handler thread that is executing the callback.
    */
-  def wrap[T](fun: T => Unit): T => Unit = {
+  class AsynchronousCompletionCallback[T](val fun: (RequestLocal, T) => Unit)
+
+  /**
+   * Wrap callback to schedule it on an arbitrary request thread.
+   * NOTE: this function must be originally called from a request thread.
+   * @param asyncCompletionCallback a callback function to execute as the result of an asynchronous action completing
+   * @param requestLocal The RequestLocal for the current request handler thread in case we need to call
+   *                     the callback function without queueing the callback request
+   * @return Wrapped callback that schedules `asyncCompletionCallback` on an arbitrary request thread
+   */
+  def wrap[T](asyncCompletionCallback: AsynchronousCompletionCallback[T], requestLocal: RequestLocal): T => Unit = {

Review Comment:
`wrap` seems unintuitive. How about something like `executeOrRegisterCallback`?
jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1381971423

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
       }
     }
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
My understanding from David is that the main thing we must guarantee is that we can't update the in-memory state again until the corresponding records are written to the log. From that, I thought that as long as those steps are synchronous (and I suppose we need a lock for now), that is OK. The ordering of the requests themselves is already not guaranteed.
junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1381953916

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
       }
     }
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
> I think we also discussed that the ordering of the log is not the important part but rather committing stale data.

Is it true that the ordering of the log records is not important? It seems that you want the latest log records to reflect the latest state, right?

> I was saying that appendEntries queues up a log record and we could use it there.

Yes, the only question is whether appendEntries can queue up the log records in the right order. I am not sure that simply acquiring the group lock when asynchronously calling appendEntries achieves this.
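A toy model of the concern raised here: if the state change happens while the request is handled but the append only happens in a later completion callback, taking the group lock inside the callback serializes the appends without putting them in state-change order. `LockInCallback` and its string records are hypothetical; the two completions are invoked in reverse order by hand to stand in for asynchronous steps (for example, transaction verification) finishing out of order.

```scala
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable.ArrayBuffer

object LockInCallback {
  private val groupLock = new ReentrantLock()
  private var members = Vector.empty[String]
  val log = ArrayBuffer.empty[String]

  // Applies the state change under the lock and builds the record, but defers the
  // append to a callback that runs when the asynchronous step completes.
  def join(member: String): () => Unit = {
    groupLock.lock()
    val record =
      try {
        members :+= member
        s"members=${members.mkString(",")}"
      } finally groupLock.unlock()
    () => {
      // The lock makes each append atomic, but says nothing about append order.
      groupLock.lock()
      try log += record finally groupLock.unlock()
    }
  }
}
```

Completing the second join before the first leaves "members=client1" as the last record, which is exactly the rec1/rec2 failover problem from the earlier comment.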
jolshan commented on PR #14629: URL: https://github.com/apache/kafka/pull/14629#issuecomment-1792695126 It seems the offset validation is a tricky question requiring more thought, so I will file a JIRA for it. Given that the offset verification is not in 3.6, I think it makes sense to merge the request local fix separately. Are we all in agreement on that part of the change? cc: @dajac @junrao @hachikuji
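The request local fix named in the PR title is to pass the RequestLocal to the callback as an argument, so the callback uses the instance belonging to the thread that actually runs it rather than one captured from the thread that registered it. Below is a minimal sketch of that pattern, under stated assumptions: `ThreadScopedResource`, `HandlerThreadLocal` and `completeOnAnotherThread` are hypothetical stand-ins, not Kafka classes, and a single-thread `java.util.concurrent` executor plays the role of "a different request thread".

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit}

// Hypothetical stand-in for a per-thread resource such as a request-local
// buffer supplier. It remembers which thread created it.
final class ThreadScopedResource(val ownerThread: String)

object HandlerThreadLocal {
  // Each thread lazily gets its own instance on first access.
  private val local: ThreadLocal[ThreadScopedResource] = new ThreadLocal[ThreadScopedResource] {
    override def initialValue(): ThreadScopedResource =
      new ThreadScopedResource(Thread.currentThread().getName)
  }
  def current: ThreadScopedResource = local.get()
}

// Runs a completion callback on a different thread. The callback receives the
// resource of the thread that actually executes it, instead of closing over
// the resource of the thread that registered it.
def completeOnAnotherThread[T](callback: ThreadScopedResource => T): T = {
  val executor = Executors.newSingleThreadExecutor()
  try {
    executor
      .submit(new Callable[T] { def call(): T = callback(HandlerThreadLocal.current) })
      .get(5, TimeUnit.SECONDS)
  } finally executor.shutdown()
}
```

Taking the resource as a parameter makes the dependency explicit at the call site; a callback that instead closes over `HandlerThreadLocal.current` from the registering thread would silently share that thread's non-thread-safe resource with another thread.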
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
dajac commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1381380645 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: I agree with @junrao here. > The expectation is that queuing up of the log record can be done synchronously. However, it does not mean that it should be written to the log. The minimum is to guarantee the ordering of the records based on the state changes.
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
dajac commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1381378455 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: @hachikuji Yeah, that's right. I think that we could do it as follows: 1) Apply the first verification (e.g. is it the right coordinator, is the member valid, etc.); 2) Prepare the transaction (the async call to the transaction coordinator if needed); 3) Re-verify; 4) State change + write. This would reduce the likelihood of preparing the transaction for nothing, but it could still happen because the state could change in between. I think that this is possible regardless of the model that we use.
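The four steps can be sketched roughly as follows, assuming that the "is the member valid" check reduces to comparing a generation number that bumps on every rebalance. `TxnGroupSketch`, `commitTxnOffset` and `prepareTxn` are hypothetical names, not group-coordinator APIs. The lock is held for the two short checks and for the final state change + write, but never across the asynchronous preparation.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical, simplified group: a generation that bumps on rebalance and an
// append-only buffer standing in for offset-commit log records.
final class TxnGroupSketch {
  val lock = new Object
  var generation: Int = 1
  val log: ArrayBuffer[(Int, Long)] = ArrayBuffer.empty // (generation, offset)
}

sealed trait TxnCommitOutcome
case object Written extends TxnCommitOutcome
case object Fenced extends TxnCommitOutcome

def commitTxnOffset(group: TxnGroupSketch, memberGeneration: Int, offset: Long,
                    prepareTxn: () => Future[Unit])(implicit ec: ExecutionContext): Future[TxnCommitOutcome] = {
  // 1) First verification, under the group lock.
  val valid = group.lock.synchronized(group.generation == memberGeneration)
  if (!valid) Future.successful(Fenced)
  else
    // 2) Prepare the transaction asynchronously, without holding the lock.
    prepareTxn().map[TxnCommitOutcome] { _ =>
      group.lock.synchronized {
        // 3) Re-verify: the group may have rebalanced while we were waiting.
        if (group.generation != memberGeneration) Fenced
        else {
          // 4) State change + write happen together under the lock.
          group.log += ((memberGeneration, offset))
          Written
        }
      }
    }
}
```

If a rebalance lands during step 2, the re-verification fences the stale member, so the transaction was prepared but the write is discarded. That is the "preparing the transaction for nothing" case, which this ordering makes less likely but cannot rule out.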
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1380898562 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: I was saying that appendEntries queues up a log record and we could use it there. I think Artem was not necessarily disagreeing with that idea but rather that a lock should be used at all. My understanding of his comment was that we should move to a model where changes are pending and those state transitions block further work. I think we also discussed that the ordering of the log is not the important part but rather committing stale data. It was unclear to me from Jason's comments if this validation is fully necessary due to how we can sometimes write stale data to an offset partition when a rebalance happens after offsets are written as part of the transaction but before the transaction is committed. Basically there are two questions I see: 1. Can we make this code more async friendly to avoid locking? (And does the new group coordinator do this?) 2. Do we really need the lock if we can write inconsistent data outside the typical locking mechanisms and we handle it correctly in those cases?
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1380899701 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: > I was saying that appendEntries queues up a log record and we could use it there. > > I think Artem was not necessarily disagreeing with that idea but rather that a lock should be used at all. My understanding of his comment was that we should move to a model where changes are pending and those state transitions block further work. > > I think we also discussed that the ordering of the log is not the important part but rather committing stale data. It was unclear to me from Jason's comments if this validation is fully necessary due to how we can sometimes write stale data to a transaction when a rebalance happens after records are written but before the transaction is committed. > > Basically there are two questions I see: > > 1. Can we make this code more async friendly to avoid locking? (And does the new group coordinator do this?) > 2. Do we really need the lock if we can write inconsistent data outside the typical locking mechanisms and we handle it correctly in those cases?
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1380898562 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: I was saying that appendEntries queues up a log record and we could use it there. I think Artem was not necessarily disagreeing with that idea but rather that a lock should be used at all. My understanding of his comment was that we should move to a model where changes are pending and those state transitions block further work. I think we also discussed that the ordering of the log is not the important part but rather committing stale data. It was unclear to me from Jason's comments if this validation is fully necessary due to how we can sometimes write stale data to a transaction when a rebalance happens after records are written but before the transaction is committed. Basically there are two questions I see: 1. Can we make this code more async friendly to avoid locking? (And does the new group coordinator do this?) 2. Do we really need the lock if we can write inconsistent data outside the typical locking mechanisms and we handle it correctly in those cases?
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1380898562 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: I was saying that appendEntries queues up a log record and we could use it there. I think Artem was not necessarily disagreeing with that idea but rather that a lock should be used at all. I think we also discussed that the ordering of the log is not the important part but rather committing stale data. It was unclear to me from Jason's comments if this validation is fully necessary due to how we can sometimes write stale data to a transaction when a rebalance happens after records are written but before the transaction is committed. Basically there are two questions I see: 1. Can we make this code more async friendly to avoid locking? (And does the new group coordinator do this?) 2. Do we really need the lock if we can write inconsistent data outside the typical locking mechanisms and we handle it correctly in those cases?
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
junrao commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1380888754 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: Yes, the current flow in the group coordinator is to update the state, queue up a corresponding log record for commit, and call the callback when the log record is committed. It needs the log records for a group to be queued up and committed in the same order as the state updates, so that (1) the latest state can be replayed from the log records correctly in case of failure and (2) the callbacks can be called in the update order. Currently, this is achieved by holding the group lock while updating the state and queuing up the log record (implemented through `ReplicaManager.append`). The expectation is that queuing up the log record can be done synchronously. Adding an async txn validation dependency inside `ReplicaManager.append` breaks that expectation. To address this issue, we could move the async txn validation dependency out of `ReplicaManager.append`. Alternatively, we could implement another way of synchronously queuing up a log record, since `ReplicaManager.append` does more than just queue up a record. I am not sure holding the group lock just for updating the state is enough to ensure the ordering of the log records.
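To make the ordering requirement concrete, here is a simplified sketch, assuming a group reduced to a single committed offset and an in-memory buffer standing in for queued log records (`OrderedGroupSketch` is hypothetical, not group-coordinator code). Because the state update and the enqueue share one lock, replaying the records always reproduces the in-memory state.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified group: one committed offset plus an append-only
// buffer standing in for log records queued for commit.
final class OrderedGroupSketch {
  private val lock = new Object
  private var committedOffset: Long = -1L
  private val queuedRecords = ArrayBuffer.empty[Long]

  // The state update and the enqueue of the matching record happen under the
  // same lock, so record order always matches the order of state changes.
  def commitOffset(offset: Long): Unit = lock.synchronized {
    committedOffset = offset
    queuedRecords += offset
    ()
  }

  def currentState: Long = lock.synchronized(committedOffset)

  // Replaying the records (last one wins) rebuilds the in-memory state.
  def replayedState: Long = lock.synchronized(queuedRecords.lastOption.getOrElse(-1L))
}
```

If the enqueue instead happened after the lock was released (for example, after an asynchronous verification step inside the append), two commits could enqueue their records in the opposite order from their state updates, and replay could produce a stale offset.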
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1380469707 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: That makes sense to me. So maybe the verification would go through a different flow where the callback would include both the state update and the write via appendEntries. The main question I have is whether we should consider this as a separate JIRA and do it in a follow-up PR. I am inclined to go with this option. ~(But do it ASAP for 3.6.1)~ We don't need this for 3.6.1 since the offsets are not in 3.6.1.
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
artemlivshits commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1380813788 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: Hmm, I think it would break the abstraction of appendRecords being a fundamentally asynchronous call. It has the syntax of being asynchronous (i.e. the completion callback may well be called in a different thread), yet a concrete implementation would still expect a specific part of the call to be synchronous, and the code gives no indication of which part (maybe some day we'd use async IO and again make it more asynchronous than the code around it would expect). Why would the group coordinator need to know about the transaction verification flow, which has its own trickiness with the verification guard and may actually (likely) be different for KIP-890 part 2? I think it would be better for separation of concerns if the lock is held only around in-memory state updates that transition the state into a "pending state", so that new updates would be queued or failed (with a retriable error) until the async call is done.
Then we wouldn't have a situation where we have a lock whose syntax looks like "lock around in-memory update + call" but whose desired semantics are "[lock] [in-memory update] [some part of call] [unlock] [some other part of call]", where the place of [unlock] is fairly deep in logic that gives no indication that an unrelated action (making the code as asynchronous as needed to avoid holding threads) could actually break the desired semantics.
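A minimal sketch of the "pending state" idea, assuming a single-offset group and the "fail with a retriable error" option (rather than queuing) for updates that arrive while verification is in flight. `PendingStateGroup`, `RetriableBusy` and the `verify` function are hypothetical; the point is that the lock only guards short in-memory transitions and is never held across the asynchronous call.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.Success

sealed trait PendingCommitResult
case object Applied extends PendingCommitResult
case object RejectedByVerification extends PendingCommitResult
case object RetriableBusy extends PendingCommitResult // hypothetical "retry later" error

// Hypothetical group that holds its lock only around in-memory updates and
// marks itself pending while an async verification is in flight.
final class PendingStateGroup(implicit ec: ExecutionContext) {
  private val lock = new Object
  private var offset: Long = -1L
  private var pending: Boolean = false

  def commit(newOffset: Long, verify: () => Future[Boolean]): Future[PendingCommitResult] = {
    // Short critical section: admit the update only if nothing is pending.
    val admitted = lock.synchronized {
      if (pending) false else { pending = true; true }
    }
    if (!admitted) Future.successful(RetriableBusy)
    else
      verify().transform[PendingCommitResult] { outcome =>
        // Another short critical section once the async call completes.
        // The pending flag is cleared whether verification passed or failed.
        lock.synchronized {
          pending = false
          outcome match {
            case Success(true) => offset = newOffset; Success(Applied)
            case _             => Success(RejectedByVerification)
          }
        }
      }
  }

  def currentOffset: Long = lock.synchronized(offset)
}
```

Rejecting concurrent updates keeps the sketch small; a queuing variant would instead park the second update and replay it after the pending transition completes, which needs extra bookkeeping.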
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1380469707 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: That makes sense to me. So maybe the verification would go through a different flow where the callback would include both the state update and the write via appendEntries. The main question I have is whether we should consider this as a separate JIRA and do it in a follow-up PR. I am inclined to go with this option. ~(But do it ASAP for 3.6.1)~ We don't need this since the offsets are not in 3.6.1.
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1380752726 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: This is effectively what we are doing, right? appendEntries is just replica manager append w/o further txn validation. But maybe I misunderstood the suggestion, since we still need the callback logic.
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1380752726 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: This is effectively what we are doing, right? appendEntries is just replica manager append w/o further txn validation.
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
junrao commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1380724965 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: I guess the proposal is that when serving an `OffsetCommit` request, KafkaApis first determines the partitions for txn validation. It then initiates the txn validation asynchronously. Once the validation is done, it calls `ReplicaManager.append` w/o further txn validation. If we do this, I am wondering if we should just do the same for regular produce requests too. This way, we don't need the callback logic for txn validation in `ReplicaManager.append`.
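That proposal amounts to plain future composition: verify first, then call an append that carries no verification callback of its own. A rough sketch, assuming `verify` and `append` as hypothetical stand-ins for the transaction verification and for `ReplicaManager.append` without verification; `PartitionSketch` and `verifyThenAppend` are illustrative names, not KafkaApis code.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical partition identifier; not Kafka's TopicPartition.
final case class PartitionSketch(topic: String, partition: Int)
final case class VerifiedAppendResult(appended: Set[PartitionSketch], rejected: Set[PartitionSketch])

// Verification runs first and asynchronously; the append step is a plain
// function that does no transaction verification and needs no extra callback.
def verifyThenAppend(
    partitions: Set[PartitionSketch],
    verify: Set[PartitionSketch] => Future[Set[PartitionSketch]],
    append: Set[PartitionSketch] => Unit
)(implicit ec: ExecutionContext): Future[VerifiedAppendResult] =
  verify(partitions).map { verified =>
    val accepted = verified.intersect(partitions)
    append(accepted)
    VerifiedAppendResult(accepted, partitions -- accepted)
  }
```

Applying the same shape to regular produce requests, as suggested, would let the append path stay free of verification-specific callback logic.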
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
hachikuji commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1380634456 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: @dajac Yes, that is similar to what I had in mind. We need to reacquire the lock after validation in order to verify whether the write should still be accepted. And we still need to protect the write itself. Your approach seems like a simpler implementation of this. The only downside I can think of is that we may add the partition to the transaction but still ultimately discard the write, but this is probably not a common case.
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1380469707 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: That makes sense to me. So maybe the verification would go through a different flow where the callback would include both the state update and the write via appendEntries. The main question I have is whether we should consider this as a separate JIRA and do it in a follow-up PR. I am inclined to go with this option. (But do it ASAP for 3.6.1).
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
dajac commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1379784888 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: Sorry, I am late here. I actually faced a similar issue while implementing this API in the new group coordinator. The scenario is more or less the same. We need to guarantee that the update to the state machine and to the log are done together to guarantee the ordering of the changes. Given this, I wonder if we should just change how we do it. I was about to re-implement this part to work as follows: 1) Verify the transaction and possibly go to the transaction coordinator; 2) Execute the actual transactional offset commit logic (update state + write). I think that this is easier than introducing extra bookkeeping to track dependencies. What do you think? Note that I need to change it anyway for the new coordinator. One way would be to bring this logic to the KafkaApis layer, or we could rework the write logic within the group coordinator. Would it make sense?
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1379416743 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: Given this is separate from the requestLocal issue, does it make sense to file a JIRA and fix it there? We can still consider it for a patch release if we think this is a regression.
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
hachikuji commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1379404877

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
       }
     }
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
@junrao If the lock cannot protect the write, then the validation in the group coordinator cannot be relied on. In principle, it would be possible for an older offset commit to override a newer one for example. A more likely scenario might be for a group to rebalance while an offset commit is awaiting validation. This would allow a fenced consumer to be able to commit offsets. Without the lock, I think we'd have to introduce additional bookkeeping to track dependencies between inflight writes.
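To make the fenced-consumer scenario concrete, here is a hypothetical sketch (not Kafka code; `GroupState` and `UnsafeCommit` are illustrative names). The generation check runs under the group lock, but the append is handed to a callback that fires after an asynchronous verification and does not re-check anything. If a rebalance bumps the generation in between, the commit from the fenced generation is still appended.

```scala
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

// Hypothetical group state; names are illustrative only.
final class GroupState {
  private val lock = new ReentrantLock()
  var generation: Int = 0
  val appended = mutable.ArrayBuffer.empty[(Int, Long)] // (member generation, offset)

  def inLock[A](body: => A): A = {
    lock.lock()
    try body
    finally lock.unlock()
  }
}

object UnsafeCommit {
  // Validates the generation under the lock, then defers the append to a callback
  // that runs after verification WITHOUT re-checking the generation.
  def commit(g: GroupState, memberGen: Int, offset: Long,
             deferUntilVerified: (() => Unit) => Unit): Boolean =
    g.inLock {
      if (memberGen != g.generation) false
      else {
        val appendLater: () => Unit = () => g.appended += ((memberGen, offset))
        deferUntilVerified(appendLater)
        true
      }
    }
}
```

Re-validating the generation under the group lock when the callback runs would close this window.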
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1379066906

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
       }
     }
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
I think it's more a concern about group changes that are made while the verification is ongoing. Offline we were discussing what would happen if we bumped an epoch or triggered a rebalance, for example.
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1379034752

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
       }
     }
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
@hachikuji: I am wondering about the reason for the group coordinator to hold a group lock while appending to the log. It seems it's just to make sure the group state checking/updating and the appending to the log can be done atomically. I am not sure if there is any ordering guarantee, since requests from different clients could come in arbitrary order. Is that the case?
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1378272592

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -864,6 +781,111 @@ class ReplicaManager(val config: KafkaConfig,
       }
     }
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],
+                            errorsPerPartition: Map[TopicPartition, Errors],
+                            sTime: Long,
+                            recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit,
+                            timeout: Long,
+                            responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
+                            delayedProduceLock: Option[Lock])
+                           (requestLocal: RequestLocal)

Review Comment:
Jason helped me with Scala offline.
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1378267516

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -864,6 +781,111 @@ class ReplicaManager(val config: KafkaConfig,
       }
     }
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],
+                            errorsPerPartition: Map[TopicPartition, Errors],
+                            sTime: Long,
+                            recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit,
+                            timeout: Long,
+                            responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
+                            delayedProduceLock: Option[Lock])
+                           (requestLocal: RequestLocal)

Review Comment:
Hmm, I'm not sure I understand. We need these arguments in the method, right?
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1378225151

## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ##
@@ -50,28 +50,36 @@ object KafkaRequestHandler {
   }
   /**
-   * Wrap callback to schedule it on a request thread.
-   * NOTE: this function must be called on a request thread.
-   * @param fun Callback function to execute
-   * @return Wrapped callback that would execute `fun` on a request thread
+   * A callback method that expected to be executed once in an arbitrary request handler thread after an asynchronous action completes.
+   * The RequestLocal passed in must belong to the request handler thread that is executing the callback.
    */
-  def wrap[T](fun: T => Unit): T => Unit = {
+  class AsynchronousCompletionCallback[T](val fun: (RequestLocal, T) => Unit)

Review Comment:
Jun wanted the explicit class so folks would see the comment about the request local, etc. Otherwise we could use a case class.
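For comparison, a hedged sketch of the two shapes under discussion, using a simplified `RequestLocal` stand-in rather than Kafka's class. A plain class keeps reference equality; a case class adds structural equality, a companion `apply`, and an extractor. Either form can carry the Scaladoc about which thread's `RequestLocal` must be passed.

```scala
// Simplified stand-in for kafka.server.RequestLocal (an assumption for illustration).
final case class RequestLocal(threadName: String)

/** Plain class: equality is reference equality unless equals is overridden. */
class PlainCallback[T](val fun: (RequestLocal, T) => Unit)

/** Case class: structural equality, a companion apply, and an extractor come for free. */
final case class CaseCallback[T](fun: (RequestLocal, T) => Unit)
```

Note that case-class equality here still compares the wrapped functions by reference, so two `CaseCallback`s are equal only when they wrap the same function instance.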
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
hachikuji commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1378179666

## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ##
@@ -50,28 +50,36 @@ object KafkaRequestHandler {
   }
   /**
-   * Wrap callback to schedule it on a request thread.
-   * NOTE: this function must be called on a request thread.
-   * @param fun Callback function to execute
-   * @return Wrapped callback that would execute `fun` on a request thread
+   * A callback method that expected to be executed once in an arbitrary request handler thread after an asynchronous action completes.

Review Comment:
nit: **is** expected?

## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ##
@@ -50,28 +50,36 @@ object KafkaRequestHandler {
   }
   /**
-   * Wrap callback to schedule it on a request thread.
-   * NOTE: this function must be called on a request thread.
-   * @param fun Callback function to execute
-   * @return Wrapped callback that would execute `fun` on a request thread
+   * A callback method that expected to be executed once in an arbitrary request handler thread after an asynchronous action completes.
+   * The RequestLocal passed in must belong to the request handler thread that is executing the callback.
    */
-  def wrap[T](fun: T => Unit): T => Unit = {
+  class AsynchronousCompletionCallback[T](val fun: (RequestLocal, T) => Unit)

Review Comment:
nit: could probably be case class?

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -864,6 +781,111 @@ class ReplicaManager(val config: KafkaConfig,
       }
     }
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],
+                            errorsPerPartition: Map[TopicPartition, Errors],
+                            sTime: Long,
+                            recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit,
+                            timeout: Long,
+                            responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
+                            delayedProduceLock: Option[Lock])
+                           (requestLocal: RequestLocal)

Review Comment:
nit: probably simpler to return `(request: RequestLocal, unverifiedEntries: Map[TopicPartition, Errors])`. Then we don't need the awkward `(_)(_)` in the call above

## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ##
@@ -50,28 +50,36 @@ object KafkaRequestHandler {
   }
   /**
-   * Wrap callback to schedule it on a request thread.
-   * NOTE: this function must be called on a request thread.
-   * @param fun Callback function to execute
-   * @return Wrapped callback that would execute `fun` on a request thread
+   * A callback method that expected to be executed once in an arbitrary request handler thread after an asynchronous action completes.
+   * The RequestLocal passed in must belong to the request handler thread that is executing the callback.
    */
-  def wrap[T](fun: T => Unit): T => Unit = {
+  class AsynchronousCompletionCallback[T](val fun: (RequestLocal, T) => Unit)
+
+  /**
+   * Wrap callback to schedule it on an abitrary request thread.

Review Comment:
nit: typo arbitrary

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
       }
     }
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
Hmm, without the lock, an offset commit may be reordered after a group state change. To make it safe, we would need to reacquire the group lock after validation completes so that we could check again whether the request should be accepted.
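The `(_)(_)` nit is about adapting a method with two parameter lists into a two-argument function. A self-contained sketch with hypothetical names (`appendCurried`, `appendTupled`, and a simplified `RequestLocal` stand-in, not the actual `ReplicaManager` signatures): the curried method needs placeholder syntax, and its arguments arrive in declaration order, while a method with a single two-parameter list eta-expands to the function type directly.

```scala
// Simplified stand-in for kafka.server.RequestLocal (an assumption for illustration).
final case class RequestLocal(threadName: String)

object CallbackShapes {
  // Curried shape: the request-local context arrives in a second parameter list.
  def appendCurried(errors: Map[String, String])(requestLocal: RequestLocal): String =
    s"${requestLocal.threadName}:${errors.size}"

  // Single parameter list: the context and the verification result arrive together.
  def appendTupled(requestLocal: RequestLocal, errors: Map[String, String]): String =
    s"${requestLocal.threadName}:${errors.size}"

  // Placeholder syntax expands to (x1, x2) => appendCurried(x1)(x2).
  val viaPlaceholders: (Map[String, String], RequestLocal) => String = appendCurried(_)(_)

  // Eta-expansion happens automatically because the expected type is a function type.
  val viaEtaExpansion: (RequestLocal, Map[String, String]) => String = appendTupled
}
```

Both produce the same result; the single-list form simply avoids the extra adaptation at the call site.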
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1376866891

## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ##
@@ -55,23 +55,23 @@ object KafkaRequestHandler {
    * @param fun Callback function to execute
    * @return Wrapped callback that would execute `fun` on a request thread
    */
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {
     val requestChannel = threadRequestChannel.get()
     val currentRequest = threadCurrentRequest.get()
     if (requestChannel == null || currentRequest == null) {
       if (!bypassThreadCheck)
         throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.")
-      T => fun(T)
+      T => fun(requestLocal, T)
     } else {
       T => {
-        if (threadCurrentRequest.get() != null) {
-          // If the callback is actually executed on a request thread, we can directly execute
+        if (threadCurrentRequest.get() == currentRequest) {
+          // If the callback is actually executed on the same request thread, we can directly execute

Review Comment:
https://issues.apache.org/jira/browse/KAFKA-15758
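A simplified, single-threaded sketch of the dispatch decision in the hunk above. All names are hypothetical stand-ins: a `Request` identity, a `RequestLocal` case class, a `ThreadLocal` playing the role of `threadCurrentRequest`, and a plain queue in place of the request channel. If the callback fires while the same request is still current, it runs inline with the captured `RequestLocal`; otherwise it is queued so that whichever handler dequeues it supplies its own. This is not the Kafka implementation.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for illustration; not Kafka's classes.
final case class RequestLocal(threadName: String)
final class Request(val id: Int)

object HandlerSim {
  // Per-thread "current request", playing the role of threadCurrentRequest.
  val currentRequest = new ThreadLocal[Request]()
  // Stand-in for the request channel: queued callbacks await a handler's RequestLocal.
  val callbackQueue = mutable.Queue.empty[RequestLocal => Unit]

  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {
    val registeredFor = currentRequest.get()
    if (registeredFor == null)
      throw new IllegalStateException("wrap must be called on a request handler thread")

    val wrapped: T => Unit = { t =>
      if (currentRequest.get() eq registeredFor)
        fun(requestLocal, t) // same request still in progress: the captured context is valid
      else {
        val deferred: RequestLocal => Unit = rl => fun(rl, t)
        callbackQueue.enqueue(deferred) // the executing handler will supply its own context
      }
    }
    wrapped
  }

  // Simulates a handler dequeuing one callback and running it with its own RequestLocal.
  def drainOne(handlerLocal: RequestLocal): Unit = {
    val callback = callbackQueue.dequeue()
    callback(handlerLocal)
  }
}
```

The sketch uses `eq` for the identity check; assuming the request class does not override `equals`, the `==` in the hunk behaves the same way.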
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1376866596

## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ##
@@ -53,25 +53,27 @@ object KafkaRequestHandler {
    * Wrap callback to schedule it on a request thread.
    * NOTE: this function must be called on a request thread.
    * @param fun Callback function to execute
+   * @param requestLocal The RequestLocal for the current request handler thread in case we need to call
+   *                     the callback function without queueing the callback request
    * @return Wrapped callback that would execute `fun` on a request thread
    */
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {
     val requestChannel = threadRequestChannel.get()
     val currentRequest = threadCurrentRequest.get()
     if (requestChannel == null || currentRequest == null) {
       if (!bypassThreadCheck)
         throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.")
-      T => fun(T)
+      T => fun(requestLocal, T)
     } else {
       T => {
-        if (threadCurrentRequest.get() != null) {
-          // If the callback is actually executed on a request thread, we can directly execute
+        if (threadCurrentRequest.get() == currentRequest) {

Review Comment:
I understand the issue. I have a commit below that clears the request local.
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1376821250

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
       }
     }
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
There is a verification state for the doCommitTxnOffset path, which is mostly the same path (except for using storeOffsets instead of storeGroup).
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
artemlivshits commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1376817191

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
       }
     }
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
It seems that storing the group coordinator state is not transactional, so the verification code path shouldn't be used. But I agree with the question about the locking model in the group coordinator and whether it would work correctly with a more asynchronous approach.
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1376679750

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
       }
     }
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
I am saying that before the append was under the lock. With the verification change, now it is not. I think we are saying the same thing here. We need to figure out if the append not under the lock is safe.
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1376678541

## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ##
@@ -53,25 +53,27 @@ object KafkaRequestHandler {
    * Wrap callback to schedule it on a request thread.
    * NOTE: this function must be called on a request thread.
    * @param fun Callback function to execute
+   * @param requestLocal The RequestLocal for the current request handler thread in case we need to call
+   *                     the callback function without queueing the callback request
    * @return Wrapped callback that would execute `fun` on a request thread
    */
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {
     val requestChannel = threadRequestChannel.get()
     val currentRequest = threadCurrentRequest.get()
     if (requestChannel == null || currentRequest == null) {
       if (!bypassThreadCheck)
         throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.")
-      T => fun(T)
+      T => fun(requestLocal, T)
     } else {
       T => {
-        if (threadCurrentRequest.get() != null) {
-          // If the callback is actually executed on a request thread, we can directly execute
+        if (threadCurrentRequest.get() == currentRequest) {

Review Comment:
If we reduce, then I assume the thread just stops?
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1376674826

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
       }
     }
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
> It would just be the append itself that is not under the lock.

Hmm, is that true? We have the following code.
```
GroupCoordinator.doSyncGroup
  group.inLock {
    ...
    groupManager.storeGroup
    ...
  }
```
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1376671529

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
       }
     }
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
Not sure how strongly we need the `GroupMetadata` lock while appending to the log. But I think the intention is to make sure that all writes to the log for a group are serialized. Updating the in-memory state typically happens in the callback when the HWM advances and is also protected by the `GroupMetadata` lock.
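A hedged sketch of the pattern described above, with hypothetical names (`SimGroup`, `storeOffset`, `advanceHighWatermark`) rather than the real group coordinator code. The append is issued while holding the group lock, so appends for one group are serialized, and the in-memory offset is only updated from a completion callback that re-acquires the same lock once the write is considered committed. The high-watermark advance is simulated by an explicit method call.

```scala
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

// Hypothetical group with a simulated log; not Kafka's GroupMetadata.
final class SimGroup {
  private val lock = new ReentrantLock()
  val offsets = mutable.Map.empty[String, Long]             // in-memory committed offsets
  val log = mutable.ArrayBuffer.empty[(String, Long)]       // simulated partition log
  private val awaitingHwm = mutable.Queue.empty[() => Unit] // completion callbacks

  def inLock[A](body: => A): A = {
    lock.lock()
    try body
    finally lock.unlock()
  }

  // The append happens under the group lock, so appends for this group are serialized.
  // The in-memory update is deferred to a callback that re-acquires the same lock.
  def storeOffset(partition: String, offset: Long): Unit = inLock {
    log += ((partition, offset))
    val onCommitted: () => Unit = () => inLock { offsets(partition) = offset }
    awaitingHwm.enqueue(onCommitted)
  }

  // Simulates the high watermark advancing past every pending append, in log order.
  def advanceHighWatermark(): Unit = inLock {
    while (awaitingHwm.nonEmpty) {
      val callback = awaitingHwm.dequeue()
      callback()
    }
  }
}
```

Because `ReentrantLock` is reentrant, the callbacks can take the lock again even though `advanceHighWatermark` already holds it.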
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1376670972

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
       }
     }
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
It seems that we do have the lock for the in-memory state. It would just be the append itself that is not under the lock.
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1376662251

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
       }
     }
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
> I am wondering about the locking model. We have the following path that's called under the GroupMetadata lock. GroupCoordinator.doSyncGroup (hold GroupMetadata lock) => groupManager.storeGroup => appendForGroup => replicaManager.appendRecords => replicaManager.appendEntries. However, if we register the callback, replicaManager.appendEntries will be called without holding the GroupMetadata lock. Is that safe?

Hmm. Do we need the lock to write to the log, or just to update the in-memory state? I can take another look at this path.
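The concern in the quoted call path can be reproduced in isolation. In this hypothetical sketch (a plain `ReentrantLock`, no Kafka classes; `LockScopeDemo`, `storeUnderLock`, and `append` are illustrative names), the caller holds the lock while registering the append, but once the append is deferred to a callback and run later, `isHeldByCurrentThread` reports that the lock is no longer held.

```scala
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

object LockScopeDemo {
  val groupLock = new ReentrantLock()
  val deferred = mutable.Queue.empty[() => Unit]
  val observations = mutable.ArrayBuffer.empty[(String, Boolean)]

  // Hypothetical stand-in for the append step: records whether the lock is held.
  def append(label: String): Unit =
    observations += ((label, groupLock.isHeldByCurrentThread))

  // Caller-side pattern, analogous to `group.inLock { ... storeGroup ... }`.
  def storeUnderLock(label: String, defer: Boolean): Unit = {
    groupLock.lock()
    try {
      if (defer) {
        val cb: () => Unit = () => append(label)
        deferred.enqueue(cb) // e.g. waiting on transaction verification
      } else append(label)   // synchronous path: still inside the lock
    } finally groupLock.unlock()
  }

  // Runs deferred appends later, after the caller has released the lock.
  def runDeferred(): Unit =
    while (deferred.nonEmpty) {
      val cb = deferred.dequeue()
      cb()
    }
}
```

The synchronous append records `true` and the deferred one records `false`, which is exactly the gap the quoted question asks about.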
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1376661650

## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ##
@@ -53,25 +53,27 @@ object KafkaRequestHandler {
    * Wrap callback to schedule it on a request thread.
    * NOTE: this function must be called on a request thread.
    * @param fun Callback function to execute
+   * @param requestLocal The RequestLocal for the current request handler thread in case we need to call
+   *                     the callback function without queueing the callback request
    * @return Wrapped callback that would execute `fun` on a request thread
    */
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {
     val requestChannel = threadRequestChannel.get()
     val currentRequest = threadCurrentRequest.get()
     if (requestChannel == null || currentRequest == null) {
       if (!bypassThreadCheck)
         throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.")
-      T => fun(T)
+      T => fun(requestLocal, T)
     } else {
       T => {
-        if (threadCurrentRequest.get() != null) {
-          // If the callback is actually executed on a request thread, we can directly execute
+        if (threadCurrentRequest.get() == currentRequest) {

Review Comment:
True in the common case. I was wondering what happens if we dynamically reduce the number of request handler threads.
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1376659689

## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ##
@@ -53,25 +53,27 @@ object KafkaRequestHandler {
    * Wrap callback to schedule it on a request thread.
    * NOTE: this function must be called on a request thread.
    * @param fun Callback function to execute
+   * @param requestLocal The RequestLocal for the current request handler thread in case we need to call
+   *                     the callback function without queueing the callback request
    * @return Wrapped callback that would execute `fun` on a request thread
    */
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {
     val requestChannel = threadRequestChannel.get()
     val currentRequest = threadCurrentRequest.get()
     if (requestChannel == null || currentRequest == null) {
       if (!bypassThreadCheck)
         throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.")
-      T => fun(T)
+      T => fun(requestLocal, T)
     } else {
       T => {
-        if (threadCurrentRequest.get() != null) {
-          // If the callback is actually executed on a request thread, we can directly execute
+        if (threadCurrentRequest.get() == currentRequest) {

Review Comment:
Hmm. We assign the request channel when the thread is created. I assume that request channel will remain with the thread during its lifetime. Is that incorrect?
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
junrao commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1376657322 ## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ## @@ -53,25 +53,27 @@ object KafkaRequestHandler { * Wrap callback to schedule it on a request thread. * NOTE: this function must be called on a request thread. * @param fun Callback function to execute + * @param requestLocal The RequestLocal for the current request handler thread in case we need to call + * the callback function without queueing the callback request * @return Wrapped callback that would execute `fun` on a request thread */ - def wrap[T](fun: T => Unit): T => Unit = { + def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = { val requestChannel = threadRequestChannel.get() val currentRequest = threadCurrentRequest.get() if (requestChannel == null || currentRequest == null) { if (!bypassThreadCheck) throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.") - T => fun(T) + T => fun(requestLocal, T) } else { T => { -if (threadCurrentRequest.get() != null) { - // If the callback is actually executed on a request thread, we can directly execute +if (threadCurrentRequest.get() == currentRequest) { Review Comment: Thanks, Justine and Artem. Yes, passing around the context has its own issues. The main thing with thread locals is to make sure that we don't introduce any GC issues. Currently, it seems that we never remove `threadRequestChannel`. Since we allow dynamically changing the number of request handler threads, it's probably better to remove `threadRequestChannel` when the thread completes?
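To make the suggestion of removing the thread-local when a handler thread completes more concrete, here is a minimal sketch. It is not Kafka's `KafkaRequestHandler`: `HandlerThreadLocals`, `runHandler` and the `String` stand-in for the request channel are hypothetical. It only shows the general pattern of setting a `ThreadLocal` on entry to a thread's run loop and calling `remove()` in a `finally` block, so the reference is dropped when the loop exits (for example, after the pool is resized down).

```scala
import java.util.concurrent.atomic.AtomicReference

object HandlerThreadLocals {
  // Hypothetical stand-in for a per-thread request channel reference.
  val threadRequestChannel: ThreadLocal[String] = new ThreadLocal[String]()

  // Runs a simplified handler body on a new thread. Returns the value the thread
  // saw while running and the value it saw after remove(), both read on that thread.
  def runHandler(channel: String): (String, String) = {
    val seenInside = new AtomicReference[String]()
    val seenAfter = new AtomicReference[String]()
    val handler = new Thread(new Runnable {
      def run(): Unit = {
        threadRequestChannel.set(channel)
        try {
          seenInside.set(threadRequestChannel.get())
          // ... poll and handle requests here ...
        } finally {
          // Always clear the thread-local when the run loop exits.
          threadRequestChannel.remove()
          seenAfter.set(threadRequestChannel.get()) // null: no initial value defined
        }
      }
    })
    handler.start()
    handler.join()
    (seenInside.get(), seenAfter.get())
  }
}
```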
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
divijvaidya commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1376654240 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: > actionable practical recommendation I can't think of an idea other than passing a deep copy of the arguments. I have also considered suggesting wrapping in Collections.unmodifiable or Collections.synchronizedCollection, but none of these ideas are good enough. We need to rethink the callback invocation pattern in Kafka and see if we can avoid this pattern of passing references to in-memory state around threads. But I am happy to keep it out of this PR and discuss that in the scope of the JIRA that Ismael created.
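Why an unmodifiable wrapper is not a substitute for a copy can be seen in a small sketch. The map contents are made up for illustration. `Collections.unmodifiableMap` returns a read-only view, so a callback holding the view still observes later mutations of the backing map, whereas a defensive copy is an independent snapshot taken when the callback is created.

```scala
import java.util.{Collections, HashMap => JHashMap, Map => JMap}

object SnapshotVsView {
  // Returns (size seen through an unmodifiable view, size seen through a copy)
  // after the original map is mutated once both have been captured.
  def sizesAfterMutation(): (Int, Int) = {
    val original: JMap[String, Int] = new JHashMap[String, Int]()
    original.put("tp-0", 1)

    // Read-only view: callers cannot write through it, but it tracks `original`.
    val view = Collections.unmodifiableMap(original)
    // Defensive copy: independent of later changes to `original`.
    val copy = new JHashMap[String, Int](original)

    // The owner of the data mutates it after handing it to the callback.
    original.put("tp-1", 2)

    (view.size(), copy.size())
  }
}
```

Here the view reports two entries and the copy reports one, which is the gap Divij describes: read-only access alone does not isolate the callback from the caller's state.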
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
junrao commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1376651533 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: 1. On the server side, some of the callbacks (Purgatory, the one introduced in KAFKA-14561, etc.) are called on a different thread than the caller. I am not sure if it's possible/desirable to change all of those callbacks to run on the same thread. Given that model, I agree that the wide exposure of ThreadLocal seems potentially dangerous. I pinged https://github.com/apache/kafka/pull/9220 to see if we could consider an alternative approach. It might be easier to fix the `ThreadLocal` thing first before fixing this issue. 2. In general, callbacks on different threads are tricky to get right. For example, we spent a lot of time fixing the deadlock issues related to Purgatory (https://issues.apache.org/jira/browse/KAFKA-8334). For this newly introduced callback, I am wondering about the locking model. We have the following path, which is called under the GroupMetadata lock: `GroupCoordinator.doSyncGroup (hold GroupMetadata lock) => groupManager.storeGroup => appendForGroup => replicaManager.appendRecords => replicaManager.appendEntries`. However, if we register the callback, `replicaManager.appendEntries` will be called without holding the `GroupMetadata` lock. Is that safe?
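The locking concern can be illustrated with a minimal sketch that is not Kafka code. `groupLock` is a hypothetical stand-in for the `GroupMetadata` lock, and a single-thread executor stands in for whichever thread eventually runs the callback. A continuation created while the lock is held does not carry the lock with it; by the time it runs elsewhere, the registering thread has released the lock.

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit}
import java.util.concurrent.locks.ReentrantLock

object CallbackLockDemo {
  // Hypothetical stand-in for a per-group metadata lock.
  private val groupLock = new ReentrantLock()

  // Returns (lock held while registering, lock held when the callback actually runs).
  def lockHeldAtRegistrationAndExecution(): (Boolean, Boolean) = {
    val executor = Executors.newSingleThreadExecutor()
    try {
      groupLock.lock()
      val (heldAtRegistration, callback) =
        try {
          // The continuation is created while the lock is held...
          (groupLock.isHeldByCurrentThread, () => groupLock.isHeldByCurrentThread)
        } finally {
          groupLock.unlock()
        }
      // ...but it runs later, on another thread, after the lock was released.
      val heldAtExecution = executor.submit(new Callable[Boolean] {
        def call(): Boolean = callback()
      }).get(5, TimeUnit.SECONDS)
      (heldAtRegistration, heldAtExecution)
    } finally {
      executor.shutdown()
    }
  }
}
```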
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
artemlivshits commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1376507019 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: (Race condition -- my reply was to Ismael w.r.t. "more care required").
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
artemlivshits commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1376502780 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: I agree, it's just not exactly clear what an actionable practical recommendation could be derived from that.
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1376481605 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: Thanks for the discussion here, folks. My point about the immutability of the map is that we can't put new objects in the map and the values appeared to be final. I agree that the method itself (and the argument for wrap) doesn't guarantee immutability and/or thread safety. I think we can agree that the RequestLocal object poses specific challenges for callbacks that require it; specifically, callbacks that may execute on a different thread may see issues. How do we want to move forward from here? I can modify my comment and the class to remove the comments about thread safety etc. and focus on the fact that the method may execute on a different thread (i.e., name the class something that reflects that). Are there any other changes we think are necessary?
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
ijuma commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1375357830 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: I think you misunderstood my point. I was saying that there are two types of callbacks: (1) execute on the same thread (2) execute on a different thread. They're not the same and the latter require more care.
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
artemlivshits commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1375355538 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: I think how much work is done in the callback is a quantitative difference rather than a qualitative one -- how much work needs to be done before we treat it as a new pattern that requires some special care (and what is that care; can we qualify it)? I agree that if the functionality in the callback just happened to come after the point of use of the RequestLocal, then we wouldn't hit the problem (even if we captured it in the callback), but that seems to be a matter of random "luck". I don't think "don't be unlucky" is useful engineering guidance. On the other hand, saying "don't pass a thread local as an argument; access it at the point of use as a thread local instead" would avoid these cases. But maybe the first point to discuss is: do we agree that the root cause is that RequestLocal is bound to the executing thread's context (effectively a thread local passed as an argument), which is not a typical pattern in Java, or do we think it's something else?
At some point I think there was a theory that the machinery for rescheduling the callback on the request thread pool could be the culprit, but I think we all agree that the problem would be the same even if the callback were executed directly on the network thread (or in pretty much any way other than scheduling it on the exact same thread, which is doable, btw, but would hinder performance).
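The root cause Artem describes (a thread-bound object passed as an ordinary argument) can be shown in a minimal sketch. `PerThreadState` is a hypothetical stand-in for `RequestLocal`, not its real definition. An instance captured in a closure stays bound to the thread that created the closure, while a `ThreadLocal` lookup at the point of use returns the executing thread's own instance.

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit}

object CapturedVsPointOfUse {
  // Hypothetical per-thread object that records which thread it belongs to.
  final class PerThreadState(val owner: Thread)

  private val state: ThreadLocal[PerThreadState] =
    ThreadLocal.withInitial[PerThreadState](() => new PerThreadState(Thread.currentThread()))

  // Returns (captured instance belongs to the executing thread,
  //          instance looked up at point of use belongs to the executing thread).
  def run(): (Boolean, Boolean) = {
    val captured = state.get() // bound to the thread that creates the callback
    val callback = new Callable[(Boolean, Boolean)] {
      def call(): (Boolean, Boolean) = {
        val me = Thread.currentThread()
        (captured.owner eq me, state.get().owner eq me)
      }
    }
    val executor = Executors.newSingleThreadExecutor()
    try executor.submit(callback).get(5, TimeUnit.SECONDS)
    finally executor.shutdown()
  }
}
```

The captured instance belongs to the caller's thread, while the point-of-use lookup yields the callback thread's own instance, matching the "access it at the point of use" guidance.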
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
ijuma commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1375332013 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: A lot of callback code runs on the same thread though - the cases where a different thread is used require more care and have caused a bunch of pain in the past.
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
artemlivshits commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1375329122 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: That's correct -- we don't create new concurrent access, i.e. this callback is not called on multiple threads at the same time; all we do here is potentially execute this callback on a different thread. Actually, I think the "Accessing non-thread-safe data structures should be avoided if possible" comment is misleading here: the only thing that we have to avoid is capturing thread-local (or effectively thread-local) objects in the callback. Other objects need only be as thread-safe as they would be if this method were called directly on this thread (i.e. if an object was rooted on the call stack, it doesn't have to be thread-safe because it's not going to be used concurrently from multiple threads). BTW, we have plenty of cases in Kafka (both client and server) where we pass a callback to continue execution of a single-threaded logical task after an RPC call completes (most likely on a different thread), so I don't think we're dealing with a unique pattern here.
The only thing that is unique (and tricky) here is the RequestLocal, which disguises itself as part of the logical task's context (arguments are generally the logical task's context) but is actually a concrete physical thread context that would be different (or missing completely, say if we executed this callback on the network thread) if we continued execution of this logical task on a different thread. I've already pointed out elsewhere that (in a future change) we should probably remove the RequestLocal from the argument list, make it a thread-local (which it effectively is), and just use it directly where it's needed.
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
ijuma commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1375263740 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: Actually, I don't think this method enforces that these maps are immutable. That said, my understanding is that we basically suspend processing and hence there is only one thread accessing these structures at a time. Is that correct? If so, the main thing we need to do is ensure we safely publish these data structures, they don't have to be immutable necessarily.
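Ismael's point that the maps need safe publication rather than immutability can be sketched with the JDK's documented memory-consistency guarantee: actions in a thread before it submits a task to an `ExecutorService` happen-before the task's actions (concurrent queues give a similar guarantee for elements handed through them). The executor here is an assumption standing in for however the callback is actually re-enqueued; this sketch does not describe Kafka's request channel.

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit}
import scala.collection.mutable

object HandoffPublication {
  // Builds a mutable map on the caller thread, then hands it to another thread.
  def handOff(): Int = {
    val entries = mutable.HashMap[String, Int]()
    entries.put("tp-0", 10)
    entries.put("tp-1", 20)
    val executor = Executors.newSingleThreadExecutor()
    try {
      // Writes made before submit() happen-before the task runs, so the task sees the
      // fully built map even though it is neither immutable nor synchronized.
      // This only holds if the caller stops touching `entries` after the handoff.
      executor.submit(new Callable[Int] {
        def call(): Int = entries.values.sum
      }).get(5, TimeUnit.SECONDS)
    } finally {
      executor.shutdown()
    }
  }
}
```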
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
ijuma commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1375262871 ## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ## @@ -55,23 +55,23 @@ object KafkaRequestHandler { * @param fun Callback function to execute * @return Wrapped callback that would execute `fun` on a request thread */ - def wrap[T](fun: T => Unit): T => Unit = { + def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = { val requestChannel = threadRequestChannel.get() val currentRequest = threadCurrentRequest.get() if (requestChannel == null || currentRequest == null) { if (!bypassThreadCheck) throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.") - T => fun(T) + T => fun(requestLocal, T) } else { T => { -if (threadCurrentRequest.get() != null) { - // If the callback is actually executed on a request thread, we can directly execute +if (threadCurrentRequest.get() == currentRequest) { + // If the callback is actually executed on the same request thread, we can directly execute Review Comment: Can we file a JIRA for investigating the alternative? I agree with @junrao that it would be much simpler to reason about if we always schedule it.
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
artemlivshits commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1375078778 ## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ## @@ -53,25 +53,27 @@ object KafkaRequestHandler { * Wrap callback to schedule it on a request thread. * NOTE: this function must be called on a request thread. * @param fun Callback function to execute + * @param requestLocal The RequestLocal for the current request handler thread in case we need to call + * the callback function without queueing the callback request * @return Wrapped callback that would execute `fun` on a request thread */ - def wrap[T](fun: T => Unit): T => Unit = { + def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = { val requestChannel = threadRequestChannel.get() val currentRequest = threadCurrentRequest.get() if (requestChannel == null || currentRequest == null) { if (!bypassThreadCheck) throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.") - T => fun(T) + T => fun(requestLocal, T) } else { T => { -if (threadCurrentRequest.get() != null) { - // If the callback is actually executed on a request thread, we can directly execute +if (threadCurrentRequest.get() == currentRequest) { Review Comment: Yeah, the hope is that we'd see more async patterns as we evolve Kafka, and the way it's currently implemented, we just say "execute callback on the _current_ thread pool", which is why the thread local is used -- this way we can implement a non-blocking wait-and-continue-on-current-thread-pool from any level without passing additional arguments through the whole stack. The same functionality could be implemented on other thread pools, e.g. if the group coordinator has its own thread pool, we could implement the same primitive there as well.
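The short-circuit check in the diff (`threadCurrentRequest.get() == currentRequest`) can be modeled in a few lines. This is a simplified, single-threaded sketch of the idea behind the new `wrap` signature, not the actual `KafkaRequestHandler` code: `RequestLocal`, `Request`, `Queued` and the plain `mutable.Queue` standing in for the request channel are all hypothetical, and a real handler pool would need a thread-safe queue. The callback runs directly with the captured `RequestLocal` only while the same request is still current; otherwise it is queued, and whichever handler runs it supplies its own `RequestLocal`.

```scala
import scala.collection.mutable

object WrapSketch {
  // Hypothetical stand-ins, not Kafka's real types.
  final case class RequestLocal(id: Int)
  final class Request
  // A continuation waiting for a handler to supply its own RequestLocal.
  type Queued = RequestLocal => Unit

  // Per-thread state that a (simulated) handler loop would set.
  private val threadQueue = new ThreadLocal[mutable.Queue[Queued]]()
  private val threadCurrentRequest = new ThreadLocal[Request]()

  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {
    val queue = threadQueue.get()
    val currentRequest = threadCurrentRequest.get()
    if (queue == null || currentRequest == null)
      throw new IllegalStateException("wrap must be called on a request handler thread")
    else { (t: T) =>
      if (threadCurrentRequest.get() == currentRequest) {
        // Same thread, same request: the captured RequestLocal is still correct.
        fun(requestLocal, t)
      } else {
        // Anywhere else: queue it; the handler that dequeues it passes its own RequestLocal.
        val continuation: Queued = rl => fun(rl, t)
        queue.enqueue(continuation)
        ()
      }
    }
  }

  def demo(): List[Int] = {
    val used = mutable.ListBuffer[Int]()
    val queue = mutable.Queue[Queued]()
    threadQueue.set(queue)
    threadCurrentRequest.set(new Request)
    val cb = wrap[String]((rl, _) => { used += rl.id; () }, RequestLocal(1))
    cb("first")                           // runs directly with RequestLocal(1)
    threadCurrentRequest.set(new Request) // the handler has moved on to another request
    cb("second")                          // queued instead of run
    val pending = queue.dequeue()
    pending(RequestLocal(2))              // a handler runs it with its own RequestLocal
    threadQueue.remove()
    threadCurrentRequest.remove()
    used.toList
  }
}
```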
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1375061797 ## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ## @@ -53,25 +53,27 @@ object KafkaRequestHandler { * Wrap callback to schedule it on a request thread. * NOTE: this function must be called on a request thread. * @param fun Callback function to execute + * @param requestLocal The RequestLocal for the current request handler thread in case we need to call + * the callback function without queueing the callback request * @return Wrapped callback that would execute `fun` on a request thread */ - def wrap[T](fun: T => Unit): T => Unit = { + def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = { val requestChannel = threadRequestChannel.get() val currentRequest = threadCurrentRequest.get() if (requestChannel == null || currentRequest == null) { if (!bypassThreadCheck) throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.") - T => fun(T) + T => fun(requestLocal, T) } else { T => { -if (threadCurrentRequest.get() != null) { - // If the callback is actually executed on a request thread, we can directly execute +if (threadCurrentRequest.get() == currentRequest) { Review Comment: As for it being just produce: yes, this is the case now, but I think Artem was trying to create the wrap method as a "general" callback mechanism.
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1375060752 ## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ## @@ -53,25 +53,27 @@ object KafkaRequestHandler { * Wrap callback to schedule it on a request thread. * NOTE: this function must be called on a request thread. * @param fun Callback function to execute + * @param requestLocal The RequestLocal for the current request handler thread in case we need to call + * the callback function without queueing the callback request * @return Wrapped callback that would execute `fun` on a request thread */ - def wrap[T](fun: T => Unit): T => Unit = { + def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = { val requestChannel = threadRequestChannel.get() val currentRequest = threadCurrentRequest.get() if (requestChannel == null || currentRequest == null) { if (!bypassThreadCheck) throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.") - T => fun(T) + T => fun(requestLocal, T) } else { T => { -if (threadCurrentRequest.get() != null) { - // If the callback is actually executed on a request thread, we can directly execute +if (threadCurrentRequest.get() == currentRequest) { Review Comment: We pass the wrapped method which is a simpler type. `type AppendCallback = Map[TopicPartition, Errors] => Unit`
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
junrao commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1375057023 ## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ## @@ -53,25 +53,27 @@ object KafkaRequestHandler { * Wrap callback to schedule it on a request thread. * NOTE: this function must be called on a request thread. * @param fun Callback function to execute + * @param requestLocal The RequestLocal for the current request handler thread in case we need to call + * the callback function without queueing the callback request * @return Wrapped callback that would execute `fun` on a request thread */ - def wrap[T](fun: T => Unit): T => Unit = { + def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = { val requestChannel = threadRequestChannel.get() val currentRequest = threadCurrentRequest.get() if (requestChannel == null || currentRequest == null) { if (!bypassThreadCheck) throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.") - T => fun(T) + T => fun(requestLocal, T) } else { T => { -if (threadCurrentRequest.get() != null) { - // If the callback is actually executed on a request thread, we can directly execute +if (threadCurrentRequest.get() == currentRequest) { Review Comment: > Hmmm. I'm not sure it makes sense to try to pass the request channel and current in to every method we want to do a callback for. Currently, the callback is only needed for produce request in ReplicaManager. If you look at `KafkaApis.handleProduceRequest`, we already pass in both request channel and current request to `ReplicaManager.appendRecords` through `sendResponseCallback`. > As for 2 and short circuiting... Are you suggesting I move the wrap call into AddPartitionsToTxnManager? If so, I need to pass the appendEntries method in there as well. That may be trickier with the typing. Yes. 
Currently, we already pass in `appendEntries` as a callback to `AddPartitionsToTxnManager`, right?
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on PR #14629: URL: https://github.com/apache/kafka/pull/14629#issuecomment-1783521458 Following up with build failure here: https://github.com/apache/kafka/pull/14545#issuecomment-1783515553
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1375044842 ## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ## @@ -53,25 +53,27 @@ object KafkaRequestHandler { * Wrap callback to schedule it on a request thread. * NOTE: this function must be called on a request thread. * @param fun Callback function to execute + * @param requestLocal The RequestLocal for the current request handler thread in case we need to call + * the callback function without queueing the callback request * @return Wrapped callback that would execute `fun` on a request thread */ - def wrap[T](fun: T => Unit): T => Unit = { + def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = { val requestChannel = threadRequestChannel.get() val currentRequest = threadCurrentRequest.get() if (requestChannel == null || currentRequest == null) { if (!bypassThreadCheck) throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.") - T => fun(T) + T => fun(requestLocal, T) } else { T => { -if (threadCurrentRequest.get() != null) { - // If the callback is actually executed on a request thread, we can directly execute +if (threadCurrentRequest.get() == currentRequest) { Review Comment: Hmmm. I'm not sure it makes sense to try to pass the request channel and current in to every method we want to do a callback for. As for 2 and short circuiting... Are you suggesting I move the wrap call into AddPartitionsToTxnManager? If so, I need to pass the appendEntries method in there as well. That may be trickier with the typing.
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
junrao commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1375025398 ## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ## @@ -53,25 +53,27 @@ object KafkaRequestHandler { * Wrap callback to schedule it on a request thread. * NOTE: this function must be called on a request thread. * @param fun Callback function to execute + * @param requestLocal The RequestLocal for the current request handler thread in case we need to call + * the callback function without queueing the callback request * @return Wrapped callback that would execute `fun` on a request thread */ - def wrap[T](fun: T => Unit): T => Unit = { + def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = { val requestChannel = threadRequestChannel.get() val currentRequest = threadCurrentRequest.get() if (requestChannel == null || currentRequest == null) { if (!bypassThreadCheck) throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.") - T => fun(T) + T => fun(requestLocal, T) } else { T => { -if (threadCurrentRequest.get() != null) { - // If the callback is actually executed on a request thread, we can directly execute +if (threadCurrentRequest.get() == currentRequest) { Review Comment: Ismael mentioned in https://github.com/apache/kafka/pull/9229#issuecomment-683352094 that thread locals are most useful when one doesn't control the code. So, I am wondering if we could get rid of the two ThreadLocal: `threadRequestChannel` and `threadCurrentRequest` in `KafkaRequestHandler` introduced in KAFKA-14561. The reason for the former is to obtain requestChannel. We could simply pass in requestChannel to `ReplicaManager.appendRecords`. The reason for the latter is (1) to obtain currentRequest and (2) to make sure that the callback can be short-circuited if it's called on the same request handler thread. 
For (1), we could also pass in currentRequest to `ReplicaManager.appendRecords`. For (2), we could change the code such that `KafkaRequestHandler.wrap` is called only when the callback truly needs to be run from a different thread. Otherwise, we can just call the callback directly there.
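A minimal sketch of the explicit-context alternative described above, assuming simplified stand-in types: `RequestLocal`, `CallbackQueue`, and `completeOrSchedule` are hypothetical names, not Kafka's API. The caller passes in the channel and decides whether a thread hop is needed, so no ThreadLocal lookup is involved; a queued callback receives the RequestLocal of whichever handler eventually drains it.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical stand-in for per-thread request state; not Kafka's RequestLocal.
final case class RequestLocal(name: String)

// Hypothetical stand-in for the request channel's callback queue.
final class CallbackQueue {
  private val queue = new ConcurrentLinkedQueue[RequestLocal => Unit]()

  def enqueue(cb: RequestLocal => Unit): Unit = { queue.add(cb); () }

  // Runs every pending callback with the draining handler's RequestLocal; returns how many ran.
  def drain(handlerLocal: RequestLocal): Int = {
    var count = 0
    var cb = queue.poll()
    while (cb != null) {
      cb(handlerLocal)
      count += 1
      cb = queue.poll()
    }
    count
  }
}

object ExplicitContextSketch {
  // The channel and the "do we need to hop threads?" decision are passed in explicitly,
  // so no ThreadLocal lookup is needed to find them.
  def completeOrSchedule[T](fun: (RequestLocal, T) => Unit,
                            callerLocal: RequestLocal,
                            channel: CallbackQueue,
                            needsThreadHop: Boolean): T => Unit =
    if (needsThreadHop) (t: T) => channel.enqueue(rl => fun(rl, t))
    else (t: T) => fun(callerLocal, t)
}
```

The trade-off is the one raised in the reply above: every caller on the path has to carry the channel and the decision, instead of `wrap` discovering them itself.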
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
artemlivshits commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1375016328 ## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ## @@ -53,25 +53,27 @@ object KafkaRequestHandler { * Wrap callback to schedule it on a request thread. * NOTE: this function must be called on a request thread. * @param fun Callback function to execute + * @param requestLocal The RequestLocal for the current request handler thread in case we need to call + * the callback function without queueing the callback request * @return Wrapped callback that would execute `fun` on a request thread */ - def wrap[T](fun: T => Unit): T => Unit = { + def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = { Review Comment: I think ideally in a future change we'd get rid of passing RequestLocal as an argument and maybe make it a static thread local that could be accessed from the point of use rather than being passed through the whole stack. There are a couple of problems that contributed to this issue: - the functions here are already written for asynchronous completion (because we wait for replication), and in such cases the general convention is that the arguments of a function are not bound to the executing thread (i.e. they are rooted on the call stack or globally) - the point of use was outside of the core change, so folks didn't look into the specifics of the RequestLocal semantics (i.e. even if appendEntries had been an explicit function, as it now is, I'm not sure the problem would have been noticed).
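For comparison, a sketch of the thread-local approach suggested above, where per-thread state is looked up at the point of use instead of being passed down the stack. `ScratchState` and `ThreadBoundState` are hypothetical names; Kafka's actual RequestLocal is not defined this way. Each thread lazily gets its own instance, so code running on another thread automatically sees that thread's state, at the cost of making the dependency implicit.

```scala
// Hypothetical per-thread scratch state (Kafka's RequestLocal is not defined like this).
final class ScratchState(val ownerThread: String)

object ThreadBoundState {
  // Each thread lazily receives its own instance on first access.
  private val current: ThreadLocal[ScratchState] = new ThreadLocal[ScratchState] {
    override def initialValue(): ScratchState = new ScratchState(Thread.currentThread().getName)
  }

  // Looked up at the point of use instead of being threaded through every call.
  def get: ScratchState = current.get()
}
```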
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1374844889 ## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ## @@ -53,25 +53,27 @@ object KafkaRequestHandler { * Wrap callback to schedule it on a request thread. * NOTE: this function must be called on a request thread. * @param fun Callback function to execute + * @param requestLocal The RequestLocal for the current request handler thread in case we need to call + * the callback function without queueing the callback request * @return Wrapped callback that would execute `fun` on a request thread */ - def wrap[T](fun: T => Unit): T => Unit = { + def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = { Review Comment: That seems reasonable to me.
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
junrao commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1374839084 ## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ## @@ -53,25 +53,27 @@ object KafkaRequestHandler { * Wrap callback to schedule it on a request thread. * NOTE: this function must be called on a request thread. * @param fun Callback function to execute + * @param requestLocal The RequestLocal for the current request handler thread in case we need to call + * the callback function without queueing the callback request * @return Wrapped callback that would execute `fun` on a request thread */ - def wrap[T](fun: T => Unit): T => Unit = { + def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = { Review Comment: Since this is a generic util, it would be useful for future users of this method to understand what kind of `fun` could be passed in. I was thinking that perhaps we could make `fun` a new interface type (something like `ThreadSafeCallback`). We can document the expectations of `ThreadSafeCallback` (e.g., it is executed only once, but possibly in an arbitrary request handler thread; if an implementation needs to access a shared data structure, the access needs to be thread safe, etc.). Then, anyone who uses this method in the future needs to be aware of this interface. This may address a bit of Divij's concern. Is that worth doing?
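A sketch of what such a `ThreadSafeCallback` contract might look like. The trait name comes from the comment above, but it is not an existing Kafka type, and `RequestLocal` here is a placeholder case class. The `once` wrapper enforces the "executed only once" clause at runtime; thread safety of any shared state the callback touches stays the implementer's responsibility and can only be documented.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Placeholder for per-handler-thread state; not Kafka's RequestLocal.
final case class RequestLocal(handlerId: Int)

/** Hypothetical contract, named after the review suggestion (not an existing Kafka type):
  *  - onComplete is invoked at most once;
  *  - it may run on an arbitrary request handler thread, so any shared data it touches
  *    must be accessed in a thread-safe way;
  *  - it must use the RequestLocal it is given, never one captured from the scheduling thread.
  */
trait ThreadSafeCallback[T] {
  def onComplete(requestLocal: RequestLocal, value: T): Unit
}

object ThreadSafeCallback {
  // Enforces the "at most once" clause at runtime; the thread-safety clause can only be documented.
  def once[T](cb: ThreadSafeCallback[T]): ThreadSafeCallback[T] = new ThreadSafeCallback[T] {
    private val fired = new AtomicBoolean(false)

    override def onComplete(requestLocal: RequestLocal, value: T): Unit = {
      if (!fired.compareAndSet(false, true))
        throw new IllegalStateException("ThreadSafeCallback invoked more than once")
      cb.onComplete(requestLocal, value)
    }
  }
}
```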
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1373793336 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: Yes. I'm having trouble speaking today. Map in Scala is immutable. 🤦‍♀️ Apologies for the confusion.
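To illustrate the point about immutability: in Scala, an unqualified `Map` refers to `scala.collection.immutable.Map` (imported through `Predef`), and a mutable map has to be requested explicitly. Calling `.toMap` on a mutable map produces an immutable snapshot that later updates to the source do not affect, which is what a call such as `verificationGuards.toMap` does when its receiver is a mutable map. The key names below are made up.

```scala
import scala.collection.mutable

object MapSnapshotSketch {
  def demo(): (Map[String, Int], Map[String, Int]) = {
    // A mutable map has to be asked for explicitly...
    val guards = mutable.Map("tp-0" -> 1)
    // ...whereas plain `Map` (from Predef) is scala.collection.immutable.Map.
    val snapshot: Map[String, Int] = guards.toMap
    guards("tp-1") = 2 // later update to the mutable source
    (snapshot, guards.toMap)
  }
}
```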
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
divijvaidya commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1373548000 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: > Sorry for my phrasing. I was saying they are mutable. Did you mean to say immutable here? If they are mutable, then we need to ensure that they are not concurrently mutated in different threads.
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1373532105 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: Sorry for my phrasing. I was saying they are mutable.
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
divijvaidya commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1373526592 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: > These maps are not mutable right? Don't know. I haven't really checked. I wanted to ask you instead if you have verified that :)
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1373475149 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: These maps are not mutable right?
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
divijvaidya commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1372844362 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: 1. Are we sure that the Maps being passed here are not concurrently modified by two threads? Asking because they are not thread safe (they aren't concurrent hash maps). I haven't looked in detail at the params passed here, so please feel free to say that you have already verified this. 2. How can we prevent future bugs where someone mutates one of these parameters in other parts of the code, such as Kafka APIs where appendEntries is called from, without knowing that all these data structures are supposed to be thread safe? One option to prevent future bugs could be to pass a deep copy of these objects to the callback (instead of a copy of the reference). Another thing we can do is to restrict the number of parameters required by the callback. Having said that, have we already considered not having this as a callback? I understand that it has drawbacks since it is in the critical produce path, but maybe that would be acceptable given the complexity of ensuring thread safety here?
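On the deep-copy suggestion: an immutable snapshot like the one produced by `.toMap` is shallow. It protects the map structure (which keys exist and which object each key points to), but if the values themselves are mutable they are still shared. The sketch below uses `ArrayBuffer` values as a hypothetical stand-in for mutable, buffer-backed objects; only the per-value copy is insulated from the later in-place mutation.

```scala
import scala.collection.mutable

object ShallowVsDeepSketch {
  // Returns the size of the "tp-0" value as seen by a shallow and a deep snapshot
  // after the source value is mutated in place.
  def demo(): (Int, Int) = {
    val source = mutable.Map("tp-0" -> mutable.ArrayBuffer(1))
    // Shallow: a new immutable map, but the same ArrayBuffer instance as the value.
    val shallow: Map[String, mutable.ArrayBuffer[Int]] = source.toMap
    // Deep (one level): each value is copied as well.
    val deep: Map[String, mutable.ArrayBuffer[Int]] =
      source.map { case (k, v) => k -> v.clone() }.toMap
    source("tp-0").append(2) // in-place mutation of the shared value
    (shallow("tp-0").size, deep("tp-0").size)
  }
}
```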
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
divijvaidya commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1372844362 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: 1. Are we sure that the Maps being passed here are not concurrently modified by two threads? Asking because they are not thread safe (they aren't concurrent hash maps). I haven't looked in detail at the params passed here, so please feel free to say that you have already verified this. 2. How can we prevent future bugs where someone mutates one of these parameters in other parts of the code, such as Kafka APIs where appendEntries is called from, without knowing that all these data structures are supposed to be thread safe? One option to prevent future bugs could be to pass a deep copy of these objects to the callback (instead of a copy of the reference). Another thing we can do is to restrict the number of parameters required by the callback. Having said that, have we already considered not having this as a callback and instead appending this synchronously? I understand that it has drawbacks since it is in the critical produce path, but maybe that would be acceptable given the complexity of ensuring thread safety here?
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
junrao commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1372414818 ## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ## @@ -55,23 +55,23 @@ object KafkaRequestHandler { * @param fun Callback function to execute * @return Wrapped callback that would execute `fun` on a request thread */ - def wrap[T](fun: T => Unit): T => Unit = { + def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = { Review Comment: Could we update the javadoc for the new param?
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
junrao commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1372414239 ## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ## @@ -55,23 +55,23 @@ object KafkaRequestHandler { * @param fun Callback function to execute * @return Wrapped callback that would execute `fun` on a request thread */ - def wrap[T](fun: T => Unit): T => Unit = { + def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = { val requestChannel = threadRequestChannel.get() val currentRequest = threadCurrentRequest.get() if (requestChannel == null || currentRequest == null) { if (!bypassThreadCheck) throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.") - T => fun(T) + T => fun(requestLocal, T) } else { T => { -if (threadCurrentRequest.get() != null) { - // If the callback is actually executed on a request thread, we can directly execute +if (threadCurrentRequest.get() == currentRequest) { + // If the callback is actually executed on the same request thread, we can directly execute Review Comment: Thanks for the explanation, Justine and Artem. This sounds good then.
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
artemlivshits commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1372379141 ## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ## @@ -55,23 +55,23 @@ object KafkaRequestHandler { * @param fun Callback function to execute * @return Wrapped callback that would execute `fun` on a request thread */ - def wrap[T](fun: T => Unit): T => Unit = { + def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = { val requestChannel = threadRequestChannel.get() val currentRequest = threadCurrentRequest.get() if (requestChannel == null || currentRequest == null) { if (!bypassThreadCheck) throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.") - T => fun(T) + T => fun(requestLocal, T) } else { T => { -if (threadCurrentRequest.get() != null) { - // If the callback is actually executed on a request thread, we can directly execute +if (threadCurrentRequest.get() == currentRequest) { + // If the callback is actually executed on the same request thread, we can directly execute Review Comment: My understanding is that concurrency is not a problem in production logic (i.e. the fact that the callback might actually execute at any time before, after, or concurrently with the passing thread), but unit tests rely on a deterministic order of execution, so adding concurrency here makes them "flaky". I agree in principle that if we could fold this logic into just one case (i.e. always schedule on a new request thread), it would be simpler; when this change was proposed, I looked into fixing the unit tests and it seemed more involved than adding this logic.
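A simplified, single-threaded model of the `wrap` logic under discussion, to make the two paths concrete. It is a sketch, not Kafka's implementation: `WrapModel`, its queue, and the `RequestLocal` case class are hypothetical, and the null-check/`bypassThreadCheck` branch is omitted. If the callback fires while the thread is still handling the request that created it, it runs inline with the captured RequestLocal; otherwise it is queued and later receives the RequestLocal of the handler that drains it, which is the point of passing RequestLocal as an argument.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical stand-in for the per-handler RequestLocal.
final case class RequestLocal(handlerId: Int)

object WrapModel {
  // The request the current thread is processing (null when idle); models threadCurrentRequest.
  val currentRequest = new ThreadLocal[AnyRef]
  // Models callbacks sent back to the request handler pool.
  val callbackQueue = new ConcurrentLinkedQueue[RequestLocal => Unit]()

  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {
    val capturedRequest = currentRequest.get()
    (t: T) => {
      if (currentRequest.get() == capturedRequest) {
        // Still processing the request that created the callback: run inline with its RequestLocal.
        fun(requestLocal, t)
      } else {
        // Otherwise queue it; the draining handler supplies its own RequestLocal.
        callbackQueue.add(rl => fun(rl, t))
        ()
      }
    }
  }

  // Simulates a request handler draining queued callbacks.
  def drain(handlerLocal: RequestLocal): Unit = {
    var cb = callbackQueue.poll()
    while (cb != null) {
      cb(handlerLocal)
      cb = callbackQueue.poll()
    }
  }
}
```

In the usage below, the deferred call receives `RequestLocal(2)` from the draining handler rather than the `RequestLocal(1)` captured when `wrap` was called.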
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1372377851 ## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ## @@ -55,23 +55,23 @@ object KafkaRequestHandler { * @param fun Callback function to execute * @return Wrapped callback that would execute `fun` on a request thread */ - def wrap[T](fun: T => Unit): T => Unit = { + def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = { val requestChannel = threadRequestChannel.get() val currentRequest = threadCurrentRequest.get() if (requestChannel == null || currentRequest == null) { if (!bypassThreadCheck) throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.") - T => fun(T) + T => fun(requestLocal, T) } else { T => { -if (threadCurrentRequest.get() != null) { - // If the callback is actually executed on a request thread, we can directly execute +if (threadCurrentRequest.get() == currentRequest) { + // If the callback is actually executed on the same request thread, we can directly execute Review Comment: Sorry maybe I'm not explaining myself well. We saw problems before when the callback was scheduled before the request returned. We fixed it by adding this check. If we want to remove this check, we will need to do further investigation for that case.
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
junrao commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1372376795 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure the that correct RequestLocal Review Comment: Ensure the that => Ensure that
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
junrao commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1372366141 ## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ## @@ -55,23 +55,23 @@ object KafkaRequestHandler { * @param fun Callback function to execute * @return Wrapped callback that would execute `fun` on a request thread */ - def wrap[T](fun: T => Unit): T => Unit = { + def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = { val requestChannel = threadRequestChannel.get() val currentRequest = threadCurrentRequest.get() if (requestChannel == null || currentRequest == null) { if (!bypassThreadCheck) throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.") - T => fun(T) + T => fun(requestLocal, T) } else { T => { -if (threadCurrentRequest.get() != null) { - // If the callback is actually executed on a request thread, we can directly execute +if (threadCurrentRequest.get() == currentRequest) { + // If the callback is actually executed on the same request thread, we can directly execute Review Comment: So, is this still a problem that we need to address in this PR?
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1372329928 ## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ## @@ -55,23 +55,23 @@ object KafkaRequestHandler { * @param fun Callback function to execute * @return Wrapped callback that would execute `fun` on a request thread */ - def wrap[T](fun: T => Unit): T => Unit = { + def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = { val requestChannel = threadRequestChannel.get() val currentRequest = threadCurrentRequest.get() if (requestChannel == null || currentRequest == null) { if (!bypassThreadCheck) throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.") - T => fun(T) + T => fun(requestLocal, T) } else { T => { -if (threadCurrentRequest.get() != null) { - // If the callback is actually executed on a request thread, we can directly execute +if (threadCurrentRequest.get() == currentRequest) { + // If the callback is actually executed on the same request thread, we can directly execute Review Comment: That seemed to cause problems when David changed the code to do the callback before we finished processing the request. We never figured out the root cause, though.
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
junrao commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1372322049 ## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ## @@ -55,23 +55,23 @@ object KafkaRequestHandler { * @param fun Callback function to execute * @return Wrapped callback that would execute `fun` on a request thread */ - def wrap[T](fun: T => Unit): T => Unit = { + def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = { val requestChannel = threadRequestChannel.get() val currentRequest = threadCurrentRequest.get() if (requestChannel == null || currentRequest == null) { if (!bypassThreadCheck) throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.") - T => fun(T) + T => fun(requestLocal, T) } else { T => { -if (threadCurrentRequest.get() != null) { - // If the callback is actually executed on a request thread, we can directly execute +if (threadCurrentRequest.get() == currentRequest) { + // If the callback is actually executed on the same request thread, we can directly execute Review Comment: Thanks, Justine. It seems that we can't easily get rid of the ThreadLocal stuff. So, we can leave this as it is. > I am concerned that executing the callback and before we return from the request thread causes issues. With the current PR, it seems that it's possible for the callback to be called before the request handler thread finishes processing the request (that generates the callback), right? Is that causing any problem?
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1372308747

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -846,7 +759,8 @@ class ReplicaManager(val config: KafkaConfig,
           producerId = batchInfo.producerId,
           producerEpoch = batchInfo.producerEpoch,
           topicPartitions = notYetVerifiedEntriesPerPartition.keySet.toSeq,
-          callback = KafkaRequestHandler.wrap(appendEntries(entriesPerPartition)(_))
+          callback = KafkaRequestHandler.wrap(appendEntries(entriesPerPartition, internalTopicsAllowed, origin, requiredAcks, verificationGuards.toMap,

Review Comment:
Thanks, Justine and Artem. Agreed that the alternative approach has its own drawbacks, so we could punt on this.
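The new `wrap` expects a callback of type `(RequestLocal, T) => Unit`. One way to produce such a function from a method like `appendEntries` is partial application of a curried method: the first parameter list captures request state when verification is started, and the second is supplied when the verification response arrives. The sketch below assumes that shape; `CurriedCallbackSketch`, its toy `appendEntries` (with far fewer parameters than the real one), and the string-keyed maps standing in for `TopicPartition`-keyed ones are all hypothetical.

```scala
// Hypothetical stand-ins; the real appendEntries takes many more parameters.
final case class RequestLocal(owner: String)

object CurriedCallbackSketch {
  val log = scala.collection.mutable.ArrayBuffer.empty[String]

  // First list: state captured when verification is kicked off.
  // Second list: supplied later, when the verification response arrives.
  def appendEntries(entries: Map[String, String], requiredAcks: Short)
                   (requestLocal: RequestLocal, verificationErrors: Map[String, String]): Unit = {
    // Only append partitions that passed verification.
    val verified = entries.filter { case (tp, _) => !verificationErrors.contains(tp) }
    log += s"${requestLocal.owner}: appended ${verified.keys.toList.sorted.mkString(",")} acks=$requiredAcks"
  }

  // Partially applying the first list yields the (RequestLocal, T) => Unit
  // shape that a wrap-style helper can accept.
  def callbackFor(entries: Map[String, String], acks: Short): (RequestLocal, Map[String, String]) => Unit =
    appendEntries(entries, acks)(_, _)
}
```

Because the `RequestLocal` is a parameter of the callback rather than a value captured in the closure, whoever finally invokes the callback decides which thread's `RequestLocal` is used.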
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
artemlivshits commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1372270435

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -846,7 +759,8 @@ class ReplicaManager(val config: KafkaConfig,
           producerId = batchInfo.producerId,
           producerEpoch = batchInfo.producerEpoch,
           topicPartitions = notYetVerifiedEntriesPerPartition.keySet.toSeq,
-          callback = KafkaRequestHandler.wrap(appendEntries(entriesPerPartition)(_))
+          callback = KafkaRequestHandler.wrap(appendEntries(entriesPerPartition, internalTopicsAllowed, origin, requiredAcks, verificationGuards.toMap,

Review Comment:
I think the way it's implemented right now is that we just continue processing from the current point after the network operation has completed, so the machinery to resume execution on the request pool is hidden from the "application" logic. This allows us to do an arbitrary number of non-blocking waits with minimal disruption to the application logic flow (the change is localized to the point of the non-blocking wait).

In this case we could inject a new produce request (if I understand your question correctly) with some context saying that it's "verified", but I think that would be more intrusive: we'd need to implement a produce-specific "verified" context (which would have to include the verification guard and whatnot) and pass it down through all functions. We'd also need to consider the offset commit path, which is a separate path that eventually reaches this function.

Also, if we consider more general functionality (we don't have any yet, but hopefully there will be more cases as we make Kafka more async), we'd generally want to preserve the progress made before the network call. If a request needs to do work XYZ with non-blocking network calls between stages, it would be good for the overall work to be X [RPC] Y [RPC] Z, rather than X [RPC] XY [RPC] XYZ.
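The difference between resuming after each non-blocking call and re-injecting the request can be sketched in a few lines of Scala. This is a toy, single-threaded model rather than Kafka code: `ContinuationSketch`, `rpc`, and `runLoop` are hypothetical stand-ins for a non-blocking network client, and the "re-inject" variant models the alternative as re-running the request from the start with a counter of completed calls.

```scala
import scala.collection.mutable

object ContinuationSketch {
  // Hypothetical non-blocking client: an "RPC" registers its completion,
  // which runs later when the loop is pumped.
  private val pending = mutable.Queue.empty[() => Unit]

  def rpc(name: String)(onComplete: String => Unit): Unit =
    pending.enqueue(() => onComplete(s"$name-ok"))

  def runLoop(): Unit =
    while (pending.nonEmpty) pending.dequeue()()

  // Resume where we left off: each stage runs once, X [RPC] Y [RPC] Z.
  def resumeStyle(trace: mutable.Buffer[String]): Unit = {
    trace += "X"
    rpc("rpc1") { _ =>
      trace += "Y"
      rpc("rpc2") { _ =>
        trace += "Z"
      }
    }
  }

  // Re-inject the whole request with a "completed so far" marker: earlier
  // stages run again on every pass, X [RPC] XY [RPC] XYZ.
  def reinjectStyle(trace: mutable.Buffer[String], completedRpcs: Int = 0): Unit = {
    trace += "X"
    if (completedRpcs == 0) rpc("rpc1")(_ => reinjectStyle(trace, 1))
    else {
      trace += "Y"
      if (completedRpcs == 1) rpc("rpc2")(_ => reinjectStyle(trace, 2))
      else trace += "Z"
    }
  }
}
```

Running `resumeStyle` and then `runLoop()` records `XYZ`, while `reinjectStyle` records `XXYXYZ`: stage X executes three times and Y twice, which is the repeated work the comment describes.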
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1372169588

## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ##
@@ -55,23 +55,23 @@ object KafkaRequestHandler {
    * @param fun Callback function to execute
    * @return Wrapped callback that would execute `fun` on a request thread
    */
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {
     val requestChannel = threadRequestChannel.get()
     val currentRequest = threadCurrentRequest.get()
     if (requestChannel == null || currentRequest == null) {
       if (!bypassThreadCheck)
         throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.")
-      T => fun(T)
+      T => fun(requestLocal, T)
     } else {
       T => {
-        if (threadCurrentRequest.get() != null) {
-          // If the callback is actually executed on a request thread, we can directly execute
+        if (threadCurrentRequest.get() == currentRequest) {
+          // If the callback is actually executed on the same request thread, we can directly execute

Review Comment:
Right -- we had that before and it caused issues; @dajac has more context. I am concerned that executing the callback before we return from the request thread causes issues. In addition, for tests, we would need to set up the request channel, etc.