Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-07 Thread via GitHub


jolshan merged PR #14629:
URL: https://github.com/apache/kafka/pull/14629


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-07 Thread via GitHub


jolshan commented on PR #14629:
URL: https://github.com/apache/kafka/pull/14629#issuecomment-1800331351

   Thanks @divijvaidya. I was taking a look at Gradle Enterprise as well, but the link is helpful.





Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-07 Thread via GitHub


divijvaidya commented on PR #14629:
URL: https://github.com/apache/kafka/pull/14629#issuecomment-1800218167

   Hey @jolshan - you might already know this, but I'm sharing it in case you don't. A tool I use to quickly find whether a test is flaky is https://ge.apache.org/scans/tests?search.rootProjectNames=kafka=trunk=Europe%2FBerlin=kafka.zk.ZkMigrationIntegrationTest
   You can just replace the test class and name in the URL to view the last 7 days of results for that specific test, including the flakiness rate and the causes of failures.





Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-07 Thread via GitHub


junrao commented on PR #14629:
URL: https://github.com/apache/kafka/pull/14629#issuecomment-1800110570

   @jolshan : Thanks for the analysis. If the test failures are unrelated, feel free to merge the PR. It would be useful to file JIRAs to track new transient test failures.





Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-07 Thread via GitHub


jolshan commented on PR #14629:
URL: https://github.com/apache/kafka/pull/14629#issuecomment-1799582958

   I took a look at the most recent 19 failures. The majority of them have also been failing on trunk in the last few (5 or so) runs.
   I will take a look at the remaining ones I didn't see failing on trunk recently. For what it is worth, the majority of trunk runs that complete all 4 builds have 10-31 failures each.
https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/
   This is not good for the project. :(
   
   *[Build / JDK 8 and Scala 2.12 / org.apache.kafka.tiered.storage.integration.TransactionsWithTieredStoreTest.testSendOffsetsToTransactionTimeout(String).quorum=zk](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14629/22/testReport/junit/org.apache.kafka.tiered.storage.integration/TransactionsWithTieredStoreTest/Build___JDK_8_and_Scala_2_12___testSendOffsetsToTransactionTimeout_String__quorum_zk/)
   
   Looks like an issue with InitProducerId that doesn't seem related to this change.
   
   *[Build / JDK 21 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testConfigurationOperations()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14629/22/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_21_and_Scala_2_13___testConfigurationOperations__/)
   
   An issue with the quorum controller that doesn't seem related to my change.
   
   *[Build / JDK 8 and Scala 2.12 / kafka.api.ProducerIdExpirationTest.testTransactionAfterTransactionIdExpiresButProducerIdRemains(String).quorum=zk](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14629/22/testReport/junit/kafka.api/ProducerIdExpirationTest/Build___JDK_8_and_Scala_2_12___testTransactionAfterTransactionIdExpiresButProducerIdRemains_String__quorum_zk/)
   
   I feel like I have seen a similar issue in the past, but I will double-check my branch to make sure nothing is wrong.





Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-06 Thread via GitHub


jolshan commented on PR #14629:
URL: https://github.com/apache/kafka/pull/14629#issuecomment-1796388941

   This one only had 4 failures, all from the usual suspects. I will run it one more time:
   https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14629/17/





Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-06 Thread via GitHub


artemlivshits commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1383785302


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   The serialization can be done asynchronously, without holding locks and 
relying on synchronous execution.  It looks like it's should be that hard to 
support that in the new group coordinator 
https://github.com/apache/kafka/pull/14705.
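   One way to read this suggestion, sketched under assumptions: if each state change and the append of its record run as a single task on one single-threaded executor (a common event-loop technique, not necessarily how #14705 does it), records reach the log in the same order as the state changes, with no shared lock. `PartitionEventLoop` and its methods are hypothetical names, and the `Vector`/`ArrayBuffer` pair stands in for the in-memory state and the log.

```scala
import java.util.concurrent.{Callable, Executors}
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch, not Kafka code: a single-threaded executor per partition
// serializes state updates and record appends without an explicit lock.
final class PartitionEventLoop {
  private val executor = Executors.newSingleThreadExecutor()
  // Only ever touched from the executor thread, so no lock is needed.
  private var members = Vector.empty[String]
  private val log = ArrayBuffer.empty[Vector[String]]

  // Safe to call from any thread; tasks run one at a time in submission order.
  def join(member: String): Unit =
    executor.execute(() => {
      members = members :+ member // update the in-memory state...
      log += members              // ...and append its record in the same task
    })

  // Reads the log on the executor thread, after all earlier tasks have run.
  def logSnapshot(): List[Vector[String]] =
    executor.submit(new Callable[List[Vector[String]]] {
      def call(): List[Vector[String]] = log.toList
    }).get()

  def close(): Unit = executor.shutdown()
}

object PartitionEventLoopDemo {
  def main(args: Array[String]): Unit = {
    val loop = new PartitionEventLoop
    loop.join("client1")
    loop.join("client2")
    println(loop.logSnapshot()) // List(Vector(client1), Vector(client1, client2))
    loop.close()
  }
}
```

   The design choice is confinement rather than locking: because only the executor thread ever touches `members` and `log`, ordering falls out of the queue's FIFO order.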






Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-06 Thread via GitHub


jolshan commented on PR #14629:
URL: https://github.com/apache/kafka/pull/14629#issuecomment-1795753522

   Given that I made minimal changes and the previous runs only had 10-15 or so regular flakes, I don't think so.
   But I will investigate.





Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-06 Thread via GitHub


junrao commented on PR #14629:
URL: https://github.com/apache/kafka/pull/14629#issuecomment-1795726859

   @jolshan : Are the 39 test failures related to the PR?





Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-03 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1382262985


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -50,28 +50,32 @@ object KafkaRequestHandler {
   }
 
   /**
-   * Wrap callback to schedule it on a request thread.
-   * NOTE: this function must be called on a request thread.
-   * @param fun Callback function to execute
-   * @return Wrapped callback that would execute `fun` on a request thread
+   * Execute or create a callback to be asynchronously scheduled on an arbitrary request thread
+   * NOTE: this function must be originally called from a request thread.
+   * @param asyncCompletionCallback A callback method that is expected to be executed once in an arbitrary request
+   *                                handler thread after an asynchronous action completes. The RequestLocal passed in
+   *                                must belong to the request handler thread that is executing the callback.
+   * @param requestLocal The RequestLocal for the current request handler thread in case we need to execute the callback
+   *                     function immediately without queueing the callback request
+   * @return Wrapped callback that schedules `asyncCompletionCallback` on an arbitrary request thread
    */
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def executeOrRegisterAsyncCallback[T](asyncCompletionCallback: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {

Review Comment:
   Yeah. I was thinking about that when I was renaming, but I forgot that in the early-return case we still wrap. I'll change the description again to match this as well.






Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-03 Thread via GitHub


junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1382212898


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -50,28 +50,32 @@ object KafkaRequestHandler {
   }
 
   /**
-   * Wrap callback to schedule it on a request thread.
-   * NOTE: this function must be called on a request thread.
-   * @param fun Callback function to execute
-   * @return Wrapped callback that would execute `fun` on a request thread
+   * Execute or create a callback to be asynchronously scheduled on an arbitrary request thread
+   * NOTE: this function must be originally called from a request thread.
+   * @param asyncCompletionCallback A callback method that is expected to be executed once in an arbitrary request
+   *                                handler thread after an asynchronous action completes. The RequestLocal passed in
+   *                                must belong to the request handler thread that is executing the callback.
+   * @param requestLocal The RequestLocal for the current request handler thread in case we need to execute the callback
+   *                     function immediately without queueing the callback request
+   * @return Wrapped callback that schedules `asyncCompletionCallback` on an arbitrary request thread
    */
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def executeOrRegisterAsyncCallback[T](asyncCompletionCallback: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {

Review Comment:
   Sorry, I just realized that this method doesn't execute or register the callback directly. It creates a wrapped callback for that instead. So, `wrap` is still the more appropriate name. We could probably change the description to something like "Creates a wrapped callback to be executed synchronously on the calling request thread or asynchronously on an arbitrary request thread."
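   A simplified model of the semantics being named here: calling `wrap` only captures state, and invoking the returned function either runs the callback inline or queues it for a handler thread that supplies its own RequestLocal. This is a hypothetical sketch, not Kafka's code. `RequestLocal` is a one-field stand-in, a `ThreadLocal` named `currentRequest` tracks which request a thread is handling, and a `LinkedBlockingQueue` plays the role of the request channel.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical stand-in for Kafka's per-thread RequestLocal.
final case class RequestLocal(owner: String)

object CallbackWrapSketch {
  // Hypothetical stand-in for the request channel: queued callbacks that a
  // request handler thread later runs with its *own* RequestLocal.
  val callbackQueue = new LinkedBlockingQueue[RequestLocal => Unit]()

  // The request the current handler thread is processing (null if none).
  val currentRequest = new ThreadLocal[AnyRef]()

  // Must be called on a request handler thread. Nothing is executed or queued
  // here; the returned function decides what to do when it is invoked.
  def wrap[T](callback: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {
    val request = currentRequest.get()
    require(request != null, "wrap must be called from a request handler thread")
    (t: T) =>
      if (request eq currentRequest.get())
        callback(requestLocal, t) // same thread, same request: run inline
      else
        callbackQueue.put(local => callback(local, t)) // executing thread supplies its RequestLocal
  }
}

object CallbackWrapDemo {
  import CallbackWrapSketch._

  def main(args: Array[String]): Unit = {
    var seen = List.empty[String]
    currentRequest.set(new Object) // pretend this thread is handling a request
    val wrapped = wrap[Int]((local, n) => seen ::= s"${local.owner}:$n", RequestLocal("thread-A"))

    wrapped(1) // invoked on the original request thread: runs with thread-A's RequestLocal

    val completer = new Thread(() => wrapped(2)) // e.g. an async completion thread
    completer.start()
    completer.join()
    // A handler thread dequeues the callback and passes its own RequestLocal.
    callbackQueue.take()(RequestLocal("thread-B"))

    println(seen.reverse) // List(thread-A:1, thread-B:2)
  }
}
```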






Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-03 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1382176455


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -50,28 +50,36 @@ object KafkaRequestHandler {
   }
 
   /**
-   * Wrap callback to schedule it on a request thread.
-   * NOTE: this function must be called on a request thread.
-   * @param fun Callback function to execute
-   * @return Wrapped callback that would execute `fun` on a request thread
+   * A callback method that expected to be executed once in an arbitrary request handler thread after an asynchronous action completes.
+   * The RequestLocal passed in must belong to the request handler thread that is executing the callback.
    */
-  def wrap[T](fun: T => Unit): T => Unit = {
+  class AsynchronousCompletionCallback[T](val fun: (RequestLocal, T) => Unit)

Review Comment:
   I've updated the PR to reflect this.






Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-03 Thread via GitHub


junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1382035051


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   > The ordering of the requests themselves is not guaranteed already.
   
   It's true that there is no strong ordering guarantee among different clients. However, the order in which the group coordinator serializes them in the log seems important.
   
   Consider an example. A group coordinator receives a joinGroup request from client 1 first and generates a group record rec1=GroupA(members: client1). It then receives a joinGroup request from client 2 and generates a group record rec2=GroupA(members: client1, client2). If rec1 comes after rec2 in the log and the coordinator fails over, the new coordinator will restore the group state to include only client1 as its member and lose the valid member client2.
   
   Consider another example that involves consumer offsets. A group coordinator starts with a state in which client 1 owns partition 1. It receives an offset commit request from client 1 on partition 1. This is a valid request since client 1 owns partition 1, so it generates rec3=GroupA(offsetForPartition1=100). A rebalance happens. The coordinator reassigns partition 1 to client 2 and generates rec4=GroupA(partition1 owned by client 2). If rec3 comes after rec4 in the log and the coordinator fails over, then when the new coordinator rebuilds its state, rec3 will look invalid, since it appears after client 1 no longer owns partition 1.
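   Both examples can be replayed with a toy model. This is a hypothetical sketch, not Kafka's record format. The record types are made up, and the rule that a replayed offset commit is rejected when the committing member doesn't own the partition is an assumption chosen to mirror the second example.

```scala
// Hypothetical, simplified coordinator records; these are not Kafka's record types.
sealed trait Record
final case class GroupMembers(members: Set[String]) extends Record
final case class Assignment(partition: Int, owner: String) extends Record
final case class OffsetCommit(member: String, partition: Int, offset: Long) extends Record

final case class GroupState(
  members: Set[String] = Set.empty,
  owners: Map[Int, String] = Map.empty,
  offsets: Map[Int, Long] = Map.empty,
  rejected: List[Record] = Nil
)

object ReplaySketch {
  // Rebuild state after a failover by replaying the log front to back;
  // a later record overwrites whatever an earlier one established.
  def replay(log: Seq[Record]): GroupState =
    log.foldLeft(GroupState()) { (state, record) =>
      record match {
        case GroupMembers(ms) =>
          state.copy(members = ms)
        case Assignment(p, owner) =>
          state.copy(owners = state.owners.updated(p, owner))
        case commit @ OffsetCommit(member, p, offset) =>
          // Assumed rule: a commit only counts if the member owns the partition at that point in the log.
          if (state.owners.get(p).contains(member))
            state.copy(offsets = state.offsets.updated(p, offset))
          else
            state.copy(rejected = state.rejected :+ commit)
      }
    }

  def main(args: Array[String]): Unit = {
    val rec1 = GroupMembers(Set("client1"))
    val rec2 = GroupMembers(Set("client1", "client2"))
    println(replay(Seq(rec1, rec2)).members) // Set(client1, client2)
    println(replay(Seq(rec2, rec1)).members) // Set(client1): client2 is lost

    val start = Assignment(1, "client1")
    val rec3 = OffsetCommit("client1", 1, 100L)
    val rec4 = Assignment(1, "client2")
    println(replay(Seq(start, rec3, rec4)).offsets) // Map(1 -> 100)
    println(replay(Seq(start, rec4, rec3)).offsets) // Map(): rec3 now looks invalid
  }
}
```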






Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-03 Thread via GitHub


jolshan commented on PR #14629:
URL: https://github.com/apache/kafka/pull/14629#issuecomment-1792854022

   > Just to be clear. The txn verification for consumer offset is in trunk and 
still needs to be fixed, right?
   
   Yes. We will need to fix it for 3.7. Here is the JIRA: https://issues.apache.org/jira/browse/KAFKA-15784





Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-03 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r138262


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -50,28 +50,36 @@ object KafkaRequestHandler {
   }
 
   /**
-   * Wrap callback to schedule it on a request thread.
-   * NOTE: this function must be called on a request thread.
-   * @param fun Callback function to execute
-   * @return Wrapped callback that would execute `fun` on a request thread
+   * A callback method that expected to be executed once in an arbitrary request handler thread after an asynchronous action completes.

Review Comment:
   Jason commented on this too, but I couldn't find the line. I see it now and will fix it.






Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-03 Thread via GitHub


junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1381976028


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -50,28 +50,36 @@ object KafkaRequestHandler {
   }
 
   /**
-   * Wrap callback to schedule it on a request thread.
-   * NOTE: this function must be called on a request thread.
-   * @param fun Callback function to execute
-   * @return Wrapped callback that would execute `fun` on a request thread
+   * A callback method that expected to be executed once in an arbitrary request handler thread after an asynchronous action completes.

Review Comment:
   expected => is expected



##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -50,28 +50,36 @@ object KafkaRequestHandler {
   }
 
   /**
-   * Wrap callback to schedule it on a request thread.
-   * NOTE: this function must be called on a request thread.
-   * @param fun Callback function to execute
-   * @return Wrapped callback that would execute `fun` on a request thread
+   * A callback method that expected to be executed once in an arbitrary request handler thread after an asynchronous action completes.
+   * The RequestLocal passed in must belong to the request handler thread that is executing the callback.
    */
-  def wrap[T](fun: T => Unit): T => Unit = {
+  class AsynchronousCompletionCallback[T](val fun: (RequestLocal, T) => Unit)

Review Comment:
   Looking at this again, I am not sure AsynchronousCompletionCallback adds much value. It makes the existing subtle code even harder to understand. So, we could probably just go back to the original way, but add the comments here to the javadoc for `asyncCompletionCallback` in `wrap`.



##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -50,28 +50,36 @@ object KafkaRequestHandler {
   }
 
   /**
-   * Wrap callback to schedule it on a request thread.
-   * NOTE: this function must be called on a request thread.
-   * @param fun Callback function to execute
-   * @return Wrapped callback that would execute `fun` on a request thread
+   * A callback method that expected to be executed once in an arbitrary request handler thread after an asynchronous action completes.
+   * The RequestLocal passed in must belong to the request handler thread that is executing the callback.
    */
-  def wrap[T](fun: T => Unit): T => Unit = {
+  class AsynchronousCompletionCallback[T](val fun: (RequestLocal, T) => Unit)
+
+  /**
+   * Wrap callback to schedule it on an arbitrary request thread.
+   * NOTE: this function must be originally called from a request thread.
+   * @param asyncCompletionCallback a callback function to execute as the result of an asynchronous action completing
+   * @param requestLocal The RequestLocal for the current request handler thread in case we need to call
+   *                     the callback function without queueing the callback request
+   * @return Wrapped callback that schedules `asyncCompletionCallback` on an arbitrary request thread
+   */
+  def wrap[T](asyncCompletionCallback: AsynchronousCompletionCallback[T], requestLocal: RequestLocal): T => Unit = {

Review Comment:
   `wrap` seems unintuitive. How about something like `executeOrRegisterCallback`?






Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-03 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1381971423


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   My understanding from David is that the main thing we must guarantee is that we can't update the in-memory state again until the corresponding records are written to the log. From that, I thought that as long as those steps are synchronous (and I suppose we need a lock for now), that is OK.
   
   The ordering of the requests themselves is not guaranteed already.
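   A minimal sketch of that invariant, assuming a single in-process lock guards both steps: because the in-memory update and the append of its record happen inside the same critical section, the log order always matches the order of state changes, whatever order the requests arrive in. `LockedGroup` is a hypothetical class, and the `ArrayBuffer` stands in for the log.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch, not Kafka code: the in-memory update and the append of
// its record happen under one lock, so no other update can slip in between.
final class LockedGroup {
  private val lock = new Object
  private var members = Vector.empty[String]
  private val log = ArrayBuffer.empty[Vector[String]]

  def join(member: String): Unit = lock.synchronized {
    members = members :+ member
    log += members // appended before the lock is released
  }

  def logSnapshot: List[Vector[String]] = lock.synchronized(log.toList)
}

object LockedGroupDemo {
  def main(args: Array[String]): Unit = {
    val group = new LockedGroup
    val threads = (1 to 4).map(i => new Thread(() => group.join(s"client$i")))
    threads.foreach(_.start())
    threads.foreach(_.join())
    // Whatever order the threads ran in, each record extends the previous one,
    // and the last record matches the final in-memory state.
    val log = group.logSnapshot
    println(log.zip(log.tail).forall { case (a, b) => b.startsWith(a) }) // true
  }
}
```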






Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-03 Thread via GitHub


junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1381953916


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   > I think we also discussed that the ordering of the log is not the 
important part but rather committing stale data.
   
   Is it true that the ordering of the log records is not important? It seems 
that you want the latest log to reflect the latest state, right?
   
   > I was saying that appendEntries queues up a log record and we could use it 
there.
   
   Yes, the only thing is whether appendEntries can queue up the log record in 
the right order. I am not sure simply acquiring the group lock when 
asynchronously calling appendEntries achieves this.
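   A small deterministic illustration of this concern, under assumed simplifications. The state is updated (and the record built) under the lock when the request is processed, but the append runs later in a callback that only re-acquires the lock. If the asynchronous steps complete out of order, the log ends with a stale record even though every step held the lock. `DeferredAppendGroup` is a hypothetical class, and calling the returned functions by hand stands in for asynchronous completion.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch, not Kafka code: the state update happens under the lock,
// but the append runs later in an async callback that re-acquires the lock.
final class DeferredAppendGroup {
  private val lock = new Object
  private var members = Vector.empty[String]
  private val log = ArrayBuffer.empty[Vector[String]]

  // Returns the deferred append, standing in for a callback that fires once an
  // asynchronous step (for example, transaction verification) completes.
  def join(member: String): () => Unit = {
    val record = lock.synchronized {
      members = members :+ member
      members
    }
    () => lock.synchronized { log += record; () }
  }

  def logSnapshot: List[Vector[String]] = lock.synchronized(log.toList)
}

object DeferredAppendDemo {
  def main(args: Array[String]): Unit = {
    val group = new DeferredAppendGroup
    val append1 = group.join("client1") // record: Vector(client1)
    val append2 = group.join("client2") // record: Vector(client1, client2)
    // The asynchronous steps may complete in either order; here the second finishes first.
    append2()
    append1()
    // Every append held the lock, yet the last record in the log is the stale one.
    println(group.logSnapshot.last) // Vector(client1)
  }
}
```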
   






Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-03 Thread via GitHub


junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1381953916


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request 
thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data 
structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+internalTopicsAllowed: Boolean,
+origin: AppendOrigin,
+requiredAcks: Short,
+verificationGuards: Map[TopicPartition, 
VerificationGuard],

Review Comment:
   > I think we also discussed that the ordering of the log is not the 
important part but rather committing stale data.
   
   Is it true that the ordering is not important? It seems that you want the 
latest log to reflect the latest state, right?
   
   > I was saying that appendEntries queues up a log record and we could use it 
there.
   
   Yes, the only thing is whether appendEntries can queue up the log record in 
the right order. I am not sure simply acquiring the group lock when 
asynchronously calling appendEntries achieves this.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-03 Thread via GitHub


junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1381953916


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request 
thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data 
structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+internalTopicsAllowed: Boolean,
+origin: AppendOrigin,
+requiredAcks: Short,
+verificationGuards: Map[TopicPartition, 
VerificationGuard],

Review Comment:
   > I think we also discussed that the ordering of the log is not the 
important part but rather committing stale data.
   
   Is it true that the ordering not important? It seems that you want the 
latest log to reflect the latest state, right?
   
   > I was saying that appendEntries queues up a log record and we could use it 
there.
   
   Yes, the only thing is whether appendEntries can queue up the log record in 
the right order. I am not sure simply acquiring the group lock when 
asynchronously calling appendEntries achieves this.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-03 Thread via GitHub


junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1381953916


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   > I think we also discussed that the ordering of the log is not the 
important part but rather committing stale data.
   
   Is that true? It seems that you want the latest log to reflect the latest 
state, right?
   
   > I was saying that appendEntries queues up a log record and we could use it 
there.
   
   Yes, the only thing is whether appendEntries can queue up the log record in 
the right order. I am not sure simply acquiring the group lock when 
asynchronously calling appendEntries achieves this.
   






Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-03 Thread via GitHub


jolshan commented on PR #14629:
URL: https://github.com/apache/kafka/pull/14629#issuecomment-1792695126

   It seems the offset validation is a tricky question requiring more thought, so I will file a JIRA for it. Given that the offset verification is not in 3.6, I think it makes sense to merge the request-local fix separately. Are we all in agreement on that part of the change?
   
   cc: @dajac @junrao @hachikuji 
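The request-local fix concerns which thread's per-request state a deferred callback ends up using. Below is a minimal sketch. It uses a hypothetical `ScratchBuffer` class to stand in for the non-thread-safe data that `RequestLocal` carries, and a hypothetical `Handler` class to stand in for a request handler thread. No real threads are started, so the result is deterministic.

```scala
// Hypothetical stand-in for the non-thread-safe state carried by RequestLocal.
// It must only be used by the handler thread that owns it.
final class ScratchBuffer(val owner: String)

// Hypothetical model of a request handler thread and the scratch it owns.
final class Handler(val name: String) {
  val scratch = new ScratchBuffer(name)
  // Complete a deferred callback "on" this handler, supplying its own scratch.
  def complete[T](callback: ScratchBuffer => T): T = callback(scratch)
}

val handlerA = new Handler("handler-A") // issued the async txn verification
val handlerB = new Handler("handler-B") // later runs the completion callback

// Problematic shape: the callback closes over handler-A's scratch, so when
// handler-B completes it, handler-A's state is used from the wrong thread.
val captured = handlerA.scratch
val capturing: ScratchBuffer => String = _ => captured.owner

// Shape the PR title describes: the scratch is an argument, so whichever
// handler completes the callback passes in its own.
val passing: ScratchBuffer => String = scratch => scratch.owner

println(handlerB.complete(capturing)) // handler-A
println(handlerB.complete(passing))   // handler-B
```

Passing the state as a parameter makes the thread-ownership rule explicit in the callback's type, instead of leaving it implicit in a closure.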





Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-03 Thread via GitHub


dajac commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1381380645


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   I agree with @junrao here. 
   
   > The expectation is that queuing up of the log record can be done 
synchronously.
   
   However, that does not mean the record has to be written to the log at that point. The minimum is to guarantee the ordering of the records based on the state changes.






Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-03 Thread via GitHub


dajac commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1381378455


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   @hachikuji Yeah, that's right. I think that we could do it as follows:
   1) Apply the first verification (e.g. is it the right coordinator, is the member valid, etc.)
   2) Prepare the transaction (the async call to the transaction coordinator, if needed)
   3) Re-verify
   4) State change + write
   
   This would reduce the likelihood of preparing the transaction for nothing, but it could still happen because the state could change in between. I think that this is possible regardless of the model that we use.
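The four steps above can be sketched in a simplified model. This is not the group coordinator's code. `Group`, `commitOffset` and `PrepareTxn` are hypothetical names, and a single generation check stands in for all of step 1's verifications. The asynchronous call to the transaction coordinator is modeled as a function that receives a continuation.

```scala
import scala.collection.mutable

// Hypothetical, simplified group; not Kafka's GroupMetadata.
final class Group(var generation: Int) {
  val lock = new Object
  val log = mutable.ArrayBuffer.empty[String] // stands in for queued log records
}

sealed trait Outcome
case object Committed extends Outcome
case object Stale extends Outcome

// Hypothetical async "prepare the transaction" call: it invokes the
// continuation once the transaction coordinator has responded.
type PrepareTxn = (() => Unit) => Unit

def commitOffset(group: Group, generation: Int, offset: Long,
                 prepareTxn: PrepareTxn)(done: Outcome => Unit): Unit = {
  // 1) First verification under the lock (here: only the generation check).
  val valid = group.lock.synchronized(group.generation == generation)
  if (!valid) done(Stale)
  else {
    // 2) Prepare the transaction without holding the lock.
    prepareTxn { () =>
      group.lock.synchronized {
        // 3) Re-verify: the group may have changed while we waited.
        if (group.generation != generation) done(Stale)
        else {
          // 4) State change + write together, so log order follows state changes.
          group.log += s"offset=$offset gen=$generation"
          done(Committed)
        }
      }
    }
  }
}

val group = new Group(generation = 1)
val immediate: PrepareTxn = k => k()
var pending: Option[() => Unit] = None
val deferred: PrepareTxn = k => pending = Some(k)

var first: Option[Outcome] = None
commitOffset(group, 1, 42L, immediate)(o => first = Some(o))

var second: Option[Outcome] = None
commitOffset(group, 1, 43L, deferred)(o => second = Some(o))
group.lock.synchronized { group.generation = 2 } // rebalance while the txn is prepared
pending.foreach(k => k())

println((first, second, group.log.toList))
```

The second call shows the case noted above. The transaction is prepared, but the write is discarded because the group changed in the meantime.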






Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-02 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1380898562


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   I was saying that appendEntries queues up a log record and we could use it 
there.
   
   I think Artem was not necessarily disagreeing with that idea, but rather with the idea that a lock should be used at all. My understanding of his comment was that we should move to a model where changes are pending and those state transitions block further work. 
   
   I think we also discussed that the ordering of the log is not the important part, but rather committing stale data. It was unclear to me from Jason's comments whether this validation is fully necessary, given that we can sometimes write stale data to an offset partition when a rebalance happens after offsets are written as part of the transaction but before the transaction is committed.
   
   Basically, there are two questions I see:
   
   1. Can we make this code more async-friendly to avoid locking? (And does the new group coordinator do this?)
   2. Do we really need the lock if we can write inconsistent data outside the typical locking mechanisms and we handle it correctly in those cases? 






Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-02 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1380899701


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   > I was saying that appendEntries queues up a log record and we could use it 
there.
   > 
   > I think Artem was not necessarily disagreeing with that idea, but rather with the idea that a lock should be used at all. My understanding of his comment was that we should move to a model where changes are pending and those state transitions block further work.
   > 
   > I think we also discussed that the ordering of the log is not the important part, but rather committing stale data. It was unclear to me from Jason's comments whether this validation is fully necessary, given that we can sometimes write stale data to a transaction when a rebalance happens after records are written but before the transaction is committed.
   > 
   > Basically, there are two questions I see:
   > 
   > 1. Can we make this code more async-friendly to avoid locking? (And does the new group coordinator do this?)
   > 2. Do we really need the lock if we can write inconsistent data outside the typical locking mechanisms and we handle it correctly in those cases?
   
   









Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-02 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1380898562


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   I was saying that appendEntries queues up a log record and we could use it 
there.
   
   I think Artem was not necessarily disagreeing with that idea, but rather with the idea that a lock should be used at all. My understanding of his comment was that we should move to a model where changes are pending and those state transitions block further work. 
   
   I think we also discussed that the ordering of the log is not the important part, but rather committing stale data. It was unclear to me from Jason's comments whether this validation is fully necessary, given that we can sometimes write stale data to a transaction when a rebalance happens after records are written but before the transaction is committed.
   
   Basically, there are two questions I see:
   
   1. Can we make this code more async-friendly to avoid locking? (And does the new group coordinator do this?)
   2. Do we really need the lock if we can write inconsistent data outside the typical locking mechanisms and we handle it correctly in those cases? 






Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-02 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1380898562


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   I was saying that appendEntries queues up a log record and we could use it 
there.
   
   I think Artem was not necessarily disagreeing with that idea, but rather with the idea that a lock should be used at all.
   
   I think we also discussed that the ordering of the log is not the important part, but rather committing stale data. It was unclear to me from Jason's comments whether this validation is fully necessary, given that we can sometimes write stale data to a transaction when a rebalance happens after records are written but before the transaction is committed.
   
   Basically, there are two questions I see:
   
   1. Can we make this code more async-friendly to avoid locking? (And does the new group coordinator do this?)
   2. Do we really need the lock if we can write inconsistent data outside the typical locking mechanisms and we handle it correctly in those cases? 






Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-02 Thread via GitHub


junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1380888754


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   Yes, the current flow in the group coordinator is: update the state, queue up a corresponding log record for commit, and call the callback when the log record is committed. It needs the log records for a group to be queued up and committed in the same order as the state updates so that (1) the latest state can be replayed correctly from the log records in case of failure and (2) the callbacks can be called in update order. Currently, this is achieved by holding the group lock while updating the state and queuing up the log record (implemented through `ReplicaManager.append`). The expectation is that queuing up the log record can be done synchronously. Adding an async txn validation dependency inside `ReplicaManager.append` breaks that expectation.
   
   To address this issue, we could move the async txn validation dependency out of `ReplicaManager.append`. Alternatively, we could implement another way of synchronously queuing up a log record, since `ReplicaManager.append` does more than just queue up a record.
   
   I am not sure holding the group lock just for updating the state is enough to ensure the ordering of the log records. 
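The ordering argument above can be shown with a toy model. `GroupState` is a hypothetical class, not Kafka code. In this model, replaying the queued records should reproduce the latest in-memory state. If records are queued under the same lock as the state update, state and log stay in step. If queuing is deferred to an asynchronous completion, they can diverge. The sketch simulates this by running the completions in reverse order.

```scala
import scala.collection.mutable

// Toy model of "update state, then queue a log record for commit".
final class GroupState {
  private val lock = new Object
  var latest: Option[String] = None              // in-memory state
  val queued = mutable.ArrayBuffer.empty[String] // log records awaiting commit

  // Update and queue while holding the lock: records are queued in update order.
  def updateAndQueue(value: String): Unit = lock.synchronized {
    latest = Some(value)
    queued += value
  }

  // The lock covers only the update; queuing happens later in an async completion.
  def updateThenDefer(value: String, defer: (() => Unit) => Unit): Unit = {
    lock.synchronized { latest = Some(value) }
    defer(() => lock.synchronized { queued += value })
  }

  // Replaying the log should rebuild the latest state.
  def replayed: Option[String] = queued.lastOption
}

val syncGroup = new GroupState
Seq("v1", "v2").foreach(syncGroup.updateAndQueue)

val asyncGroup = new GroupState
val completions = mutable.ArrayBuffer.empty[() => Unit]
Seq("v1", "v2").foreach(v => asyncGroup.updateThenDefer(v, k => { completions += k; () }))
completions.reverseIterator.foreach(k => k()) // completions may arrive in any order

println((syncGroup.latest, syncGroup.replayed))   // (Some(v2),Some(v2))
println((asyncGroup.latest, asyncGroup.replayed)) // (Some(v2),Some(v1))
```

The second case is the hazard described above. The in-memory state says `v2`, but a replay of the log would restore `v1`.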






Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-02 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1380469707


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   That makes sense to me. So maybe the verification would go through a different flow where the callback would include both the state update and the write via appendEntries. The main question I have is whether we should treat this as a separate JIRA and do it in a follow-up PR. I am inclined to go with this option. 
   ~(But do it ASAP for 3.6.1)~ We don't need this for 3.6.1 since the offsets 
are not in 3.6.1.






Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-02 Thread via GitHub


artemlivshits commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1380813788


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   Hmm, I think it would break the abstraction of appendRecords being a fundamentally asynchronous call. It has the syntax of being asynchronous (i.e. the completion callback is surely called in a different thread), yet a concrete implementation expects a specific part of the call to be synchronous, and the code gives no indication of which part (maybe some day we'd use async IO and make it even more asynchronous than the code around it would expect). Also, why would the group coordinator need to know about the transaction verification flow, which has its own trickiness with the verification guard and may actually (likely) be different for KIP-890 part 2?
   
   I think it would be easier for separation of concerns if the lock were held only around in-memory state updates and the state were just transitioned into a "pending state", so that new updates would be queued or failed (with a retriable error) until the async call is done.
   
   Then we wouldn't have a lock whose syntax looks like "lock around in-memory update + call" but whose desired semantics are "[lock] [in-memory update] [some part of call] [unlock] [some other part of call]", where the [unlock] point sits fairly deep in logic that gives no indication that an unrelated change (making the code as asynchronous as needed to avoid holding threads) could actually break the desired semantics.
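The "pending state" alternative can be sketched as below. Every name here is hypothetical and illustrative, not a Kafka API: `PendingGroup`, `begin`, `finish` and the `RetriableError` result. The sketch holds the lock only for in-memory transitions. While an asynchronous step is in flight, it rejects a second update with a retriable error. The comment also allows queuing the new update instead of failing it; this sketch simply picks failing.

```scala
// Hypothetical sketch of the "pending state" model; not Kafka code.
sealed trait Phase
case object Stable extends Phase
final case class Pending(update: String) extends Phase

sealed trait Result
case object Accepted extends Result
case object RetriableError extends Result // caller is expected to retry later

final class PendingGroup {
  private val lock = new Object
  private var phase: Phase = Stable
  var committed: List[String] = Nil

  // The lock covers only the in-memory transition, never the async call itself.
  def begin(update: String): Result = lock.synchronized {
    phase match {
      case Stable =>
        phase = Pending(update)
        Accepted
      case Pending(_) =>
        RetriableError // another update is still in flight
    }
  }

  // Invoked from the async completion, possibly on a different thread.
  def finish(success: Boolean): Unit = lock.synchronized {
    phase match {
      case Pending(update) =>
        if (success) committed = committed :+ update
        phase = Stable
      case Stable => () // nothing in flight
    }
  }
}

val g = new PendingGroup
val r1 = g.begin("offset=10") // accepted; async verification now in flight
val r2 = g.begin("offset=11") // rejected until the first one completes
g.finish(success = true)
val r3 = g.begin("offset=11") // accepted after the transition completes
g.finish(success = true)

println((r1, r2, r3, g.committed))
```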






Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-02 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1380469707


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   That makes sense to me. So maybe the verification would go through a different flow where the callback would include both the state update and the write via appendEntries. The main question I have is whether we should treat this as a separate JIRA and do it in a follow-up PR. I am inclined to go with this option. 
   ~(But do it ASAP for 3.6.1)~ We don't need this since the offsets are not in 
3.6.1.






Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-02 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1380752726


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   This is effectively what we are doing, right? appendEntries is just the replica manager append w/o further txn validation. But maybe I misunderstood the suggestion, since we still need the callback logic.






Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-02 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1380752726


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   This is effectively what we are doing, right? appendEntries is just the replica manager append w/o further txn validation. 






Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-02 Thread via GitHub


junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1380724965


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   I guess the proposal is that when serving an `OffsetCommit` request, KafkaApis first determines the partitions for txn validation. It then initiates the txn validation asynchronously. Once the validation is done, it calls `ReplicaManager.append` w/o further txn validation.
   
   If we do this, I am wondering if we should just do the same for regular produce requests too. This way, we don't need the callback logic for txn validation in `ReplicaManager.append`.
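The proposed layering puts verification first, in the API layer, followed by an append that never waits on the transaction coordinator. It might look roughly like the sketch below, which rests on some assumptions. `TP`, `Appender`, `verifyThenAppend` and the string error labels are hypothetical. `verify` models the asynchronous verification round trip as a function that reports which partitions failed.

```scala
import scala.collection.mutable

// Hypothetical, simplified types; not Kafka's TopicPartition or ReplicaManager.
final case class TP(topic: String, partition: Int)

final class Appender {
  val appended = mutable.ArrayBuffer.empty[(TP, String)]
  // Plain append with no txn verification inside, so queuing stays synchronous.
  def append(records: Map[TP, String]): Unit = appended ++= records
}

// Shared by produce and offset-commit handling: verify first, then append.
def verifyThenAppend(records: Map[TP, String],
                     needsVerification: TP => Boolean,
                     verify: (Set[TP], Set[TP] => Unit) => Unit,
                     appender: Appender)(respond: Map[TP, String] => Unit): Unit = {
  val toVerify = records.keySet.filter(needsVerification)
  verify(toVerify, failed => {
    // Only partitions that passed verification reach the append path.
    appender.append(records.filter { case (tp, _) => !failed.contains(tp) })
    respond(records.map { case (tp, _) => tp -> (if (failed.contains(tp)) "rejected" else "ok") })
  })
}

val tp0 = TP("orders", 0)
val tp1 = TP("orders", 1)
val appender = new Appender
// Simulated coordinator response: every partition sent for verification fails.
val failAll: (Set[TP], Set[TP] => Unit) => Unit = (tps, k) => k(tps)

var response = Map.empty[TP, String]
verifyThenAppend(Map(tp0 -> "txn-record", tp1 -> "record"), _ == tp0, failAll, appender)(r => response = r)

println(response)
println(appender.appended.toList)
```

In this shape, `append` carries no verification callback. A caller that holds a group lock around `append` would therefore again see the record queued synchronously.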






Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-02 Thread via GitHub


hachikuji commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1380634456


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   @dajac Yes, that is similar to what I had in mind. We need to reacquire the 
lock after validation in order to verify whether the write should still be 
accepted. And we still need to protect the write itself. Your approach seems 
like a simpler implementation of this. The only downside I can think of is that 
we may add the partition to the transaction but still ultimately discard the write, but this is probably not a common case.






Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-02 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1380469707


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   That makes sense to me. So maybe the verification would go through a different flow where the callback would include both the state update and the write via appendEntries. The main question I have is whether we should treat this as a separate JIRA and do it in a follow-up PR. I am inclined to go with this option. 
   (But do it ASAP for 3.6.1).






Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-02 Thread via GitHub


dajac commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1379784888


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   Sorry, I am late here. I actually faced a similar issue while implementing 
this API in the new group coordinator. The scenario is more or less the same. 
We need to guarantee that the update to the state machine and to the log are 
done together to guarantee the ordering of the changes.
   
   Given this, I wonder if we should just change how we do it. I was about to 
re-implement this part to work as follows:
   1) Verify the transaction, possibly going to the transaction coordinator;
   2) Execute the actual transactional offset commit logic (update state + 
write).
   
   I think that this is easier than introducing extra bookkeeping to track 
dependencies. What do you think? Note that I need to change it anyway for the 
new coordinator.
   
   One way would be to bring this logic to the KafkaApis layer or we could 
rework the write logic within the group coordinator. Would it make sense?
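
A minimal sketch of the two-step flow proposed above, assuming a toy `Group` with a `ReentrantLock`-based `inLock` helper and a simulated `verifyTransaction` that parks its continuation in a queue (all hypothetical names, not Kafka's API). Nothing is validated before verification; the generation check, the append and the state update then run together under the group lock:

```scala
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

// Hypothetical toy model, not Kafka's GroupMetadata or ReplicaManager.
final class Group {
  private val lock = new ReentrantLock()
  def inLock[T](body: => T): T = { lock.lock(); try body finally lock.unlock() }
  var generationId: Int = 0
  val log = mutable.ArrayBuffer.empty[String]   // stands in for the offsets-topic append
  val offsets = mutable.Map.empty[String, Long] // in-memory offset state (updated eagerly for brevity)
}

// Continuations parked until the (simulated) transaction coordinator replies.
val pendingVerifications = mutable.Queue.empty[() => Unit]
def verifyTransaction(onVerified: Boolean => Unit): Unit =
  pendingVerifications.enqueue(() => onVerified(true))

// Step 1: verify the transaction without holding the group lock.
// Step 2: run the whole commit (validate, append, update state) under the lock.
def commitTxnOffset(group: Group, memberGeneration: Int, partition: String, offset: Long,
                    onDone: String => Unit): Unit =
  verifyTransaction { verified =>
    val result =
      if (!verified) "VERIFICATION_FAILED"
      else group.inLock {
        if (memberGeneration != group.generationId) "FENCED"
        else {
          group.log += s"$partition=$offset"
          group.offsets(partition) = offset
          "OK"
        }
      }
    onDone(result)
  }
```

If the group's generation is bumped while the verification is still pending, the parked commit comes back `FENCED` and never reaches the log. For brevity the in-memory offsets are updated immediately here; in the existing coordinator that update happens in the callback when the high watermark advances.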



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-01 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1379416743


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   Given this is separate from the requestLocal issue, does it make sense to file 
a JIRA and fix it there? We can still consider it for a patch release if we think 
this is a regression.



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-01 Thread via GitHub


hachikuji commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1379404877


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   @junrao If the lock cannot protect the write, then the validation in the 
group coordinator cannot be relied on. In principle, it would be possible for 
an older offset commit to override a newer one for example. A more likely 
scenario might be for a group to rebalance while an offset commit is awaiting 
validation. This would allow a fenced consumer to be able to commit offsets. 
Without the lock, I think we'd have to introduce additional bookkeeping to 
track dependencies between inflight writes.
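
To make the reordering concrete, here is a toy model (hypothetical `Group`, `pendingVerifications` and `unsafeCommit`, not Kafka code) in which the generation check runs under the lock but the append is deferred until a simulated verification completes:

```scala
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

// Hypothetical toy model, not Kafka code.
final class Group {
  private val lock = new ReentrantLock()
  def inLock[T](body: => T): T = { lock.lock(); try body finally lock.unlock() }
  var generationId: Int = 0
  val log = mutable.ArrayBuffer.empty[(Int, Long)] // (committer's generation, offset)
}

// Appends parked until the (simulated) transaction verification completes.
val pendingVerifications = mutable.Queue.empty[() => Unit]

// Unsafe ordering: the generation check runs under the lock, but the append runs
// later from the verification callback, outside the lock and without re-checking.
def unsafeCommit(group: Group, memberGeneration: Int, offset: Long): Unit =
  group.inLock {
    if (memberGeneration == group.generationId)
      pendingVerifications.enqueue(() => group.log += ((memberGeneration, offset)))
  }
```

If `generationId` is bumped before the parked append is drained from `pendingVerifications`, the log still receives `(0, 10)`: a write from generation 0 lands after the group has already moved to generation 1.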



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-01 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1379066906


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   I think it's more a concern about group changes that are made while the 
verification is ongoing. Offline, we were discussing what would happen if, for example, we 
bumped an epoch or triggered a rebalance.



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-01 Thread via GitHub


junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1379034752


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   @hachikuji: I am wondering about the reason for the group coordinator to hold a 
group lock while appending to the log. It seems it's just to make sure the 
group state checking/updating and the appending to the log can be done 
atomically. I am not sure if there is any ordering guarantee since requests 
from different clients could come in arbitrary order. Is that the case?



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-31 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1378272592


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +781,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],
+                            errorsPerPartition: Map[TopicPartition, Errors],
+                            sTime: Long,
+                            recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit,
+                            timeout: Long,
+                            responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
+                            delayedProduceLock: Option[Lock])
+                           (requestLocal: RequestLocal)

Review Comment:
   Jason helped me with Scala offline.



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-31 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1378267516


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +781,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],
+                            errorsPerPartition: Map[TopicPartition, Errors],
+                            sTime: Long,
+                            recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit,
+                            timeout: Long,
+                            responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
+                            delayedProduceLock: Option[Lock])
+                           (requestLocal: RequestLocal)

Review Comment:
   Hmm, I'm not sure I understand. We need these arguments in the method, right?



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-31 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1378225151


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -50,28 +50,36 @@ object KafkaRequestHandler {
   }
 
   /**
-   * Wrap callback to schedule it on a request thread.
-   * NOTE: this function must be called on a request thread.
-   * @param fun Callback function to execute
-   * @return Wrapped callback that would execute `fun` on a request thread
+   * A callback method that expected to be executed once in an arbitrary request handler thread after an asynchronous action completes.
+   * The RequestLocal passed in must belong to the request handler thread that is executing the callback.
*/
-  def wrap[T](fun: T => Unit): T => Unit = {
+  class AsynchronousCompletionCallback[T](val fun: (RequestLocal, T) => Unit)

Review Comment:
   Jun wanted the explicit class so folks would see the comment about the 
request local etc.
   We could make it a case class otherwise.
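
For comparison, a case-class version might look like the sketch below. `RequestLocal` here is a hypothetical stand-in carrying only a thread name (Kafka's real class holds per-thread resources such as a buffer supplier), and `complete` is an illustrative helper rather than part of the PR:

```scala
// Hypothetical stand-in for kafka.server.RequestLocal.
final class RequestLocal(val threadName: String)

/**
 * A callback that is expected to be executed once, in an arbitrary request handler thread,
 * after an asynchronous action completes. The RequestLocal passed in must belong to the
 * request handler thread that is executing the callback.
 */
final case class AsynchronousCompletionCallback[T](fun: (RequestLocal, T) => Unit)

// Whichever handler thread completes the action supplies its own RequestLocal.
def complete[T](callback: AsynchronousCompletionCallback[T], local: RequestLocal, value: T): Unit =
  callback.fun(local, value)
```

A case class adds `apply`, `copy` and structural `equals`, but because the only field is a function, equality still comes down to reference equality of that function. Either form keeps the doc comment next to the type, which is what surfaces the RequestLocal contract to callers.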



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-31 Thread via GitHub


hachikuji commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1378179666


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -50,28 +50,36 @@ object KafkaRequestHandler {
   }
 
   /**
-   * Wrap callback to schedule it on a request thread.
-   * NOTE: this function must be called on a request thread.
-   * @param fun Callback function to execute
-   * @return Wrapped callback that would execute `fun` on a request thread
+   * A callback method that expected to be executed once in an arbitrary request handler thread after an asynchronous action completes.

Review Comment:
   nit: **is** expected?



##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -50,28 +50,36 @@ object KafkaRequestHandler {
   }
 
   /**
-   * Wrap callback to schedule it on a request thread.
-   * NOTE: this function must be called on a request thread.
-   * @param fun Callback function to execute
-   * @return Wrapped callback that would execute `fun` on a request thread
+   * A callback method that expected to be executed once in an arbitrary request handler thread after an asynchronous action completes.
+   * The RequestLocal passed in must belong to the request handler thread that is executing the callback.
*/
-  def wrap[T](fun: T => Unit): T => Unit = {
+  class AsynchronousCompletionCallback[T](val fun: (RequestLocal, T) => Unit)

Review Comment:
   nit: could probably be a case class?



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +781,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],
+                            errorsPerPartition: Map[TopicPartition, Errors],
+                            sTime: Long,
+                            recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit,
+                            timeout: Long,
+                            responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
+                            delayedProduceLock: Option[Lock])
+                           (requestLocal: RequestLocal)

Review Comment:
   nit: probably simpler to return `(request: RequestLocal, unverifiedEntries: 
Map[TopicPartition, Errors])`. Then we don't need the awkward `(_)(_)` in the 
call above
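
A simplified sketch of the two styles, with `String` standing in for `TopicPartition` and `Int` for `Errors`; `appendCurried` and `appendReturningCallback` are hypothetical names. With extra parameter lists the caller has to partially apply the method, whereas returning a function hands back the two-argument callback directly:

```scala
// Simplified, hypothetical stand-ins: String for TopicPartition, Int for Errors.
final class RequestLocal(val id: Int)

// Style 1: extra parameter lists. To get a callback the caller partially applies the
// method; the explicit lambda below is roughly what `appendCurried(entries)(_)(_)` expands to.
def appendCurried(entries: Map[String, Long])(unverified: Map[String, Int])(requestLocal: RequestLocal): List[String] =
  entries.keys.toList.sorted.filterNot(unverified.contains).map(tp => s"$tp via RequestLocal ${requestLocal.id}")

val entries = Map("a-0" -> 1L, "b-0" -> 2L)
val curriedCallback: (Map[String, Int], RequestLocal) => List[String] =
  (unverified, requestLocal) => appendCurried(entries)(unverified)(requestLocal)

// Style 2: the method returns the callback itself, so no placeholders are needed.
def appendReturningCallback(entries: Map[String, Long]): (RequestLocal, Map[String, Int]) => List[String] =
  (requestLocal, unverified) =>
    entries.keys.toList.sorted.filterNot(unverified.contains).map(tp => s"$tp via RequestLocal ${requestLocal.id}")

val directCallback = appendReturningCallback(entries)
```

Both callbacks produce the same result; the difference is only at the call site, where the second style needs no placeholder arguments.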



##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -50,28 +50,36 @@ object KafkaRequestHandler {
   }
 
   /**
-   * Wrap callback to schedule it on a request thread.
-   * NOTE: this function must be called on a request thread.
-   * @param fun Callback function to execute
-   * @return Wrapped callback that would execute `fun` on a request thread
+   * A callback method that expected to be executed once in an arbitrary request handler thread after an asynchronous action completes.
+   * The RequestLocal passed in must belong to the request handler thread that is executing the callback.
*/
-  def wrap[T](fun: T => Unit): T => Unit = {
+  class AsynchronousCompletionCallback[T](val fun: (RequestLocal, T) => Unit)
+
+  /**
+   * Wrap callback to schedule it on an abitrary request thread.

Review Comment:
   nit: typo arbitrary



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   Hmm, without the lock, an offset commit may be reordered after a group state 
change. To make it safe, we would need to reacquire the group lock after 
validation completes so that we could check again whether the request should be 
accepted.
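
A minimal sketch of that re-check, again on a hypothetical toy `Group` whose `generationId` stands in for the group state: the request is validated once under the lock, and validated again after reacquiring the lock when the simulated verification completes, before anything is appended:

```scala
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

// Hypothetical toy model, not Kafka code.
final class Group {
  private val lock = new ReentrantLock()
  def inLock[T](body: => T): T = { lock.lock(); try body finally lock.unlock() }
  var generationId: Int = 0
  val log = mutable.ArrayBuffer.empty[(Int, Long)] // (committer's generation, offset)
}

// Work parked until the (simulated) transaction verification completes.
val pendingVerifications = mutable.Queue.empty[() => Unit]

def commitWithRecheck(group: Group, memberGeneration: Int, offset: Long,
                      onDone: Boolean => Unit): Unit = {
  def stillValid: Boolean = memberGeneration == group.generationId
  if (!group.inLock(stillValid)) onDone(false)
  else pendingVerifications.enqueue { () =>
    // Reacquire the group lock after verification and check again before writing.
    val written = group.inLock {
      if (stillValid) { group.log += ((memberGeneration, offset)); true } else false
    }
    onDone(written)
  }
}
```

A commit from generation 0 that is still parked when the group moves to generation 1 is reported as rejected and never appended, while a commit from the current generation goes through.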



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-30 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1376866891


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -55,23 +55,23 @@ object KafkaRequestHandler {
* @param fun Callback function to execute
* @return Wrapped callback that would execute `fun` on a request thread
*/
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {
 val requestChannel = threadRequestChannel.get()
 val currentRequest = threadCurrentRequest.get()
 if (requestChannel == null || currentRequest == null) {
   if (!bypassThreadCheck)
 throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.")
-  T => fun(T)
+  T => fun(requestLocal, T)
 } else {
   T => {
-if (threadCurrentRequest.get() != null) {
-  // If the callback is actually executed on a request thread, we can directly execute
+if (threadCurrentRequest.get() == currentRequest) {
+  // If the callback is actually executed on the same request thread, we can directly execute

Review Comment:
   https://issues.apache.org/jira/browse/KAFKA-15758



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-30 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1376866596


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -53,25 +53,27 @@ object KafkaRequestHandler {
* Wrap callback to schedule it on a request thread.
* NOTE: this function must be called on a request thread.
* @param fun Callback function to execute
+   * @param requestLocal The RequestLocal for the current request handler thread in case we need to call
+   * the callback function without queueing the callback request
* @return Wrapped callback that would execute `fun` on a request thread
*/
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {
 val requestChannel = threadRequestChannel.get()
 val currentRequest = threadCurrentRequest.get()
 if (requestChannel == null || currentRequest == null) {
   if (!bypassThreadCheck)
 throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.")
-  T => fun(T)
+  T => fun(requestLocal, T)
 } else {
   T => {
-if (threadCurrentRequest.get() != null) {
-  // If the callback is actually executed on a request thread, we can directly execute
+if (threadCurrentRequest.get() == currentRequest) {

Review Comment:
   I understand the issue. I have a commit below that clears the request local.
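
A simplified single-JVM model of the revised `wrap` logic, assuming toy `Request` and `RequestLocal` classes and a `LinkedBlockingQueue` in place of the request channel (all hypothetical). The callback runs inline only when it is invoked on the same thread while the same request is current; otherwise it is queued, and whichever handler thread dequeues it passes in its own `RequestLocal`. Unlike the diff, there is no `bypassThreadCheck` path; calling `wrap` off a handler thread simply throws:

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical stand-ins, not Kafka's classes.
final class RequestLocal(val owner: String)
final class Request(val id: Int)

object ToyRequestHandler {
  // Per-thread state, loosely mirroring threadCurrentRequest and each handler's RequestLocal.
  val currentRequest = new ThreadLocal[Request]
  val handlerRequestLocal = new ThreadLocal[RequestLocal]
  // Callbacks rescheduled onto "some" request handler thread.
  val callbackQueue = new LinkedBlockingQueue[RequestLocal => Unit]()

  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {
    val captured = currentRequest.get()
    if (captured == null) throw new IllegalStateException("wrap must be called on a request handler thread")
    val wrapped: T => Unit = value =>
      if (currentRequest.get() == captured) fun(requestLocal, value) // same thread and request: run inline
      else callbackQueue.put(local => fun(local, value))             // defer; the executing thread supplies its RequestLocal
    wrapped
  }

  // What a request handler thread does when it dequeues a rescheduled callback.
  def runOneQueued(): Unit = callbackQueue.take()(handlerRequestLocal.get())
}
```

Invoking the wrapped callback from a non-handler thread leaves it in `callbackQueue`, and a second handler that later runs it supplies its own `RequestLocal` instead of the one captured at wrap time.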



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-30 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1376821250


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   There is a verification state for the doCommitTxnOffset path which is mostly 
the same path (except for using storeOffsets instead of storeGroup).



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-30 Thread via GitHub


artemlivshits commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1376817191


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   It seems that storing the group coordinator state is not transactional, so 
the verification code path shouldn't be used. But I agree with the question of the 
locking model in the group coordinator and whether it'd work correctly with 
a more asynchronous approach.



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-30 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1376679750


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   I am saying that before, the append was under the lock. With the verification 
change, now it is not. I think we are saying the same thing here. We need to 
figure out if the append not under the lock is safe.



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-30 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1376678541


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -53,25 +53,27 @@ object KafkaRequestHandler {
* Wrap callback to schedule it on a request thread.
* NOTE: this function must be called on a request thread.
* @param fun Callback function to execute
+   * @param requestLocal The RequestLocal for the current request handler thread in case we need to call
+   * the callback function without queueing the callback request
* @return Wrapped callback that would execute `fun` on a request thread
*/
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {
 val requestChannel = threadRequestChannel.get()
 val currentRequest = threadCurrentRequest.get()
 if (requestChannel == null || currentRequest == null) {
   if (!bypassThreadCheck)
 throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.")
-  T => fun(T)
+  T => fun(requestLocal, T)
 } else {
   T => {
-if (threadCurrentRequest.get() != null) {
-  // If the callback is actually executed on a request thread, we can directly execute
+if (threadCurrentRequest.get() == currentRequest) {

Review Comment:
   If we reduce then I assume the thread just stops?



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-30 Thread via GitHub


junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1376674826


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   > It would just be the append itself that is not under the lock.
   
   Hmm, is that true? We have the following code.
   
   ```
   GroupCoordinator.doSyncGroup
     group.inLock {
       ...
       groupManager.storeGroup
       ...
     }
   ```



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-30 Thread via GitHub


junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1376671529


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   Not sure how strongly we need the `GroupMetadata` lock while appending to 
the log. But I think the intention is to make sure that all writes to the log 
for a group are serialized. Updating the in-memory state typically happens in 
the callback when the HWM advances and is also protected by the `GroupMetadata` 
lock.
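
A toy sketch of that pattern, assuming a hypothetical `GroupMetadata` with an `inLock` helper (mirroring the `group.inLock { ... }` shape in the snippet above) and a `ToyLog` whose `advanceHighWatermark` fires completion callbacks. The append is issued while the group lock is held, so appends for a group are serialized, and the in-memory update runs later in the completion callback under the same lock:

```scala
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

// Hypothetical toy model of the locking pattern, not Kafka's GroupMetadata.
final class GroupMetadata {
  private val lock = new ReentrantLock()
  def inLock[T](body: => T): T = { lock.lock(); try body finally lock.unlock() }
  var assignment: Option[String] = None
}

final class ToyLog {
  val records = mutable.ArrayBuffer.empty[String]
  private val awaitingHwm = mutable.Queue.empty[() => Unit]
  def append(record: String, onCommitted: () => Unit): Unit = {
    records += record
    awaitingHwm.enqueue(onCommitted)
  }
  // Simulates the high watermark advancing past everything appended so far.
  def advanceHighWatermark(): Unit = while (awaitingHwm.nonEmpty) awaitingHwm.dequeue()()
}

// Append while holding the group lock; update in-memory state (also under the lock)
// only once the write is committed.
def storeGroup(group: GroupMetadata, log: ToyLog, assignment: String): Unit =
  group.inLock {
    log.append(assignment, () => group.inLock { group.assignment = Some(assignment) })
  }
```

After two `storeGroup` calls the log holds both records in call order, while `assignment` stays empty until `advanceHighWatermark()` runs.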



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-30 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1376670972


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   It seems that we do have the lock for the in-memory state. It would just be 
the append itself that is not under the lock.  



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-30 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1376662251


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   >  I am wondering about the locking model. We have the following path that's 
called under the GroupMetadata lock. GroupCoordinator.doSyncGroup (hold 
GroupMetadata lock) => groupManager.storeGroup => appendForGroup => 
replicaManager.appendRecords => replicaManager.appendEntries. However, if we 
register the callback, replicaManager.appendEntries will be called without 
holding the GroupMetadata lock. Is that safe?
   
   Hmm. Do we need the lock to write to the log, or just to update the in-memory 
state? I can take another look at this path.



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-30 Thread via GitHub


junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1376661650


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -53,25 +53,27 @@ object KafkaRequestHandler {
* Wrap callback to schedule it on a request thread.
* NOTE: this function must be called on a request thread.
* @param fun Callback function to execute
+   * @param requestLocal The RequestLocal for the current request handler thread in case we need to call
+   * the callback function without queueing the callback request
* @return Wrapped callback that would execute `fun` on a request thread
*/
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {
 val requestChannel = threadRequestChannel.get()
 val currentRequest = threadCurrentRequest.get()
 if (requestChannel == null || currentRequest == null) {
   if (!bypassThreadCheck)
 throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.")
-  T => fun(T)
+  T => fun(requestLocal, T)
 } else {
   T => {
-if (threadCurrentRequest.get() != null) {
-  // If the callback is actually executed on a request thread, we can directly execute
+if (threadCurrentRequest.get() == currentRequest) {

Review Comment:
   True in the common case. I was wondering what happens if we dynamically 
reduce the number of request handler threads.



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-30 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1376659689


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -53,25 +53,27 @@ object KafkaRequestHandler {
* Wrap callback to schedule it on a request thread.
* NOTE: this function must be called on a request thread.
* @param fun Callback function to execute
+   * @param requestLocal The RequestLocal for the current request handler thread in case we need to call
+   * the callback function without queueing the callback request
* @return Wrapped callback that would execute `fun` on a request thread
*/
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {
 val requestChannel = threadRequestChannel.get()
 val currentRequest = threadCurrentRequest.get()
 if (requestChannel == null || currentRequest == null) {
   if (!bypassThreadCheck)
 throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.")
-  T => fun(T)
+  T => fun(requestLocal, T)
 } else {
   T => {
-if (threadCurrentRequest.get() != null) {
-  // If the callback is actually executed on a request thread, we can directly execute
+if (threadCurrentRequest.get() == currentRequest) {

Review Comment:
   Hmm. We assign the request channel when the thread is created. I assume that 
request channel will remain with the thread during its lifetime. Is that 
incorrect?



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-30 Thread via GitHub


junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1376657322


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -53,25 +53,27 @@ object KafkaRequestHandler {
* Wrap callback to schedule it on a request thread.
* NOTE: this function must be called on a request thread.
* @param fun Callback function to execute
+   * @param requestLocal The RequestLocal for the current request handler thread in case we need to call
+   * the callback function without queueing the callback request
* @return Wrapped callback that would execute `fun` on a request thread
*/
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {
 val requestChannel = threadRequestChannel.get()
 val currentRequest = threadCurrentRequest.get()
 if (requestChannel == null || currentRequest == null) {
   if (!bypassThreadCheck)
 throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.")
-  T => fun(T)
+  T => fun(requestLocal, T)
 } else {
   T => {
-if (threadCurrentRequest.get() != null) {
-  // If the callback is actually executed on a request thread, we can directly execute
+if (threadCurrentRequest.get() == currentRequest) {

Review Comment:
   Thanks, Justine and Artem. Yes, passing around the context has its own 
issues. The main thing with thread local is to make sure that we don't 
introduce any GC issues. Currently, it seems that we never remove 
`threadRequestChannel`. Since we allow dynamically changing the number of 
request handler threads, it's probably better to remove `threadRequestChannel` 
when the thread completes?
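A minimal sketch of the cleanup suggested above, assuming a handler thread installs the thread-local when its run loop starts. The object and method names are hypothetical and a `String` stands in for `RequestChannel` so the example is self-contained; it only illustrates pairing `set` with `remove` in a `finally` block, not how `KafkaRequestHandler` is actually structured.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical sketch; a String stands in for the real RequestChannel.
object ThreadLocalCleanupSketch {
  private val threadRequestChannel = new ThreadLocal[String]()

  // Install the thread-local when the handler loop starts and remove it in a
  // finally block, so the slot is cleared however the loop exits.
  def handlerLoop(channel: String, work: () => Unit): Unit = {
    threadRequestChannel.set(channel)
    try work()
    finally threadRequestChannel.remove()
  }

  def currentChannel: Option[String] = Option(threadRequestChannel.get())

  // Runs one handler thread and reports what it saw inside and after the loop.
  def demo(): (Option[String], Option[String]) = {
    val during = new AtomicReference[Option[String]](None)
    val after = new AtomicReference[Option[String]](None)
    val thread = new Thread(new Runnable {
      override def run(): Unit = {
        handlerLoop("channel-1", () => during.set(currentChannel))
        after.set(currentChannel)
      }
    })
    thread.start()
    thread.join()
    (during.get(), after.get())
  }
}
```

`demo()` returns `(Some("channel-1"), None)`: the channel is visible while the loop runs and gone once it exits.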



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-30 Thread via GitHub


divijvaidya commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1376654240


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   >  actionable practical recommendation
   
   I can't think of an idea other than passing a deep copy of the arguments. I have 
also considered suggesting wrapping in Collections.unmodifiable or 
Collections.synchronizedCollection but none of these ideas are good enough. 
   
   We need to rethink the callback invocation pattern in Kafka and see if we can 
avoid this pattern of passing references to in-memory state around threads. But 
I am happy to keep it out of this PR and discuss that in the scope of the JIRA that 
Ismael created. 
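To make the difference between the two options concrete: `java.util.Collections.unmodifiableMap` returns a read-only view, so a callback holding it still observes later mutations of the backing map, while a copy taken at hand-off time does not. A minimal sketch with made-up keys; note that `new HashMap(m)` is only a shallow copy, so a true deep copy would also have to copy any mutable values.

```scala
import java.util.{Collections, HashMap => JHashMap, Map => JMap}

// Illustration only: a read-only view versus a copy taken at hand-off time.
object ViewVsCopySketch {
  def demo(): (Int, Int) = {
    val backing: JMap[String, Integer] = new JHashMap[String, Integer]()
    backing.put("topic-0", Integer.valueOf(1))

    val view: JMap[String, Integer] = Collections.unmodifiableMap(backing)   // read-only view
    val copy: JMap[String, Integer] = new JHashMap[String, Integer](backing) // shallow snapshot

    // The caller keeps mutating its own map after handing it to the callback.
    backing.put("topic-1", Integer.valueOf(2))

    (view.size(), copy.size())
  }
}
```

`demo()` returns `(2, 1)`: the view sees the entry added after the hand-off, the copy does not.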



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-30 Thread via GitHub


junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1376651533


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   1. On the server side, some of the callbacks (Purgatory, the one introduced 
in KAFKA-14561, etc) are called in a different thread than the caller. I am not 
sure if it's possible/desirable to change all of those callbacks to be run on 
the same thread. Given that model, I agree that the wide exposure of 
ThreadLocal seems potentially dangerous. I pinged 
https://github.com/apache/kafka/pull/9220 to see if we could consider an 
alternative approach. It might be easier to fix the `ThreadLocal` thing first 
before fixing this issue.
   2. In general, callbacks on different threads are tricky to get right. For 
example, we spent a lot of time fixing the deadlock issues related to Purgatory 
(https://issues.apache.org/jira/browse/KAFKA-8334). For this newly introduced 
callback, I am wondering about the locking model. We have the following path 
that's called under the GroupMetadata lock. `GroupCoordinator.doSyncGroup (hold 
GroupMetadata lock) => groupManager.storeGroup => appendForGroup => 
replicaManager.appendRecords => replicaManager.appendEntries`. However, if we 
register the callback, `replicaManager.appendEntries` will be called without 
holding the `GroupMetadata` lock. Is that safe?
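Point 2 can be reproduced in miniature: holding a lock while a callback is created says nothing about whether the lock is held when the callback runs. In this sketch a `ReentrantLock` is a hypothetical stand-in for the `GroupMetadata` lock, and a single-thread executor stands in for whichever thread later completes the callback.

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit}
import java.util.concurrent.locks.ReentrantLock

object LockScopeSketch {
  // Hypothetical stand-in for the GroupMetadata lock.
  private val groupLock = new ReentrantLock()

  def demo(): (Boolean, Boolean) = {
    val executor = Executors.newSingleThreadExecutor()
    try {
      // The callback is created while the caller holds the lock...
      val (heldAtRegistration, callback) = {
        groupLock.lock()
        try {
          (groupLock.isHeldByCurrentThread, () => groupLock.isHeldByCurrentThread)
        } finally groupLock.unlock()
      }
      // ...but runs later on another thread, where the lock is not held.
      val heldAtExecution = executor.submit(new Callable[Boolean] {
        override def call(): Boolean = callback()
      }).get(5, TimeUnit.SECONDS)
      (heldAtRegistration, heldAtExecution)
    } finally executor.shutdown()
  }
}
```

`demo()` returns `(true, false)`. The result would be the same even if the callback ran later on the original thread, since lock ownership is scoped to the locking block, not to the logical task.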



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-30 Thread via GitHub


artemlivshits commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1376507019


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   (Race condition -- my reply was to Ismael w.r.t. "more care required").



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-30 Thread via GitHub


artemlivshits commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1376502780


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   I agree, it's just not exactly clear what an actionable practical 
recommendation could be derived from that.



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-30 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1376481605


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   Thanks for the discussion here folks. My point about the immutability of the 
map is that we can't put new objects in the map and the values appeared to be 
final. I agree that the method itself (and the argument for wrap) doesn't 
guarantee immutability and/or thread safety. 
   
   I think we can agree that the RequestLocal object poses specific 
challenges for callbacks that require it. Specifically, callbacks that may 
execute on a different thread may see issues.
   
   How do we want to move forward from here? I can modify my comment and the 
class to remove the comments about thread safety etc and focus on the fact that 
the method may execute on a different thread (i.e., name the class something that 
reflects that). Were there any other changes we think are necessary?



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-28 Thread via GitHub


ijuma commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1375357830


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   I think you misunderstood my point. I was saying that there are two types of 
callbacks: (1) execute on the same thread (2) execute on a different thread. 
They're not the same, and the latter requires more care.



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-28 Thread via GitHub


artemlivshits commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1375355538


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   I think how much work is done in the callback is more a quantitative 
difference rather than a qualitative difference -- how much work needs to be 
done to treat it as a new pattern that requires some special care (and what is 
that care, can we qualify it?). I agree that if the functionality in the 
callback just happened to be after the point of use of the RequestLocal, then we 
wouldn't hit the problem (even if we captured it in the callback), but it seems 
to be a matter of just random "luck". I don't think "don't be unlucky" is a 
useful engineering guidance.  On the other hand, saying "don't pass thread 
local as an argument, access it at the point of use as a thread local instead" 
would avoid these cases.
   
   But maybe the first point to discuss is whether we agree that the root cause is 
the fact that RequestLocal is bound to the executing thread context 
(effectively a thread local passed as an argument), which is not a typical pattern 
in Java, or whether we think it's something else. At some point I think there 
was a theory that the machinery for re-scheduling the callback on the request 
thread pool could be a culprit, but I think we all agree, that the problem 
would be the same even if the callback was executed directly on the network 
thread (or pretty much in any other way other than trying to schedule it on the 
exact same thread, which is doable, btw, but would hinder performance).
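A small sketch of the root cause described here, assuming a per-thread object modeled with `ThreadLocal` (`ScratchBuffer` is a hypothetical stand-in for `RequestLocal`). Thread A passes its own object into the callback as an argument; when thread B later runs the callback, B works on A's object, which A may be using at the same time for its next request.

```scala
// Hypothetical stand-in for RequestLocal: a per-thread scratch object that
// records which thread created it.
final class ScratchBuffer(val owner: String)

object CapturedThreadLocalSketch {
  private val perThread = new ThreadLocal[ScratchBuffer] {
    override protected def initialValue(): ScratchBuffer =
      new ScratchBuffer(Thread.currentThread().getName)
  }

  // Returns (owner of the buffer the callback uses, thread that ran the callback).
  def demo(): (String, String) = {
    var callback: () => (String, String) = null
    // Thread A handles the request and passes *its* buffer into the callback.
    val a = new Thread(new Runnable {
      override def run(): Unit = {
        val captured = perThread.get()
        callback = () => (captured.owner, Thread.currentThread().getName)
      }
    }, "thread-a")
    a.start(); a.join()

    // Thread B completes the callback later, still holding A's buffer.
    var result: (String, String) = null
    val b = new Thread(new Runnable {
      override def run(): Unit = { result = callback() }
    }, "thread-b")
    b.start(); b.join()
    result
  }
}
```

`demo()` returns `("thread-a", "thread-b")`: the buffer belongs to a different thread than the one using it. Nothing here depends on how the callback was rescheduled, which matches the point that the problem is the captured thread-bound argument itself.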



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-28 Thread via GitHub


ijuma commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1375332013


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   A lot of callback code runs on the same thread though - the cases where a 
different thread is used require more care and have caused a bunch of pain in 
the past.



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-28 Thread via GitHub


artemlivshits commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1375329122


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   That's correct -- we don't create new concurrent access, i.e. this callback 
is not called on multiple threads at the same time, all we do here is 
potentially executing this callback on a different thread.  Actually I think 
"Accessing non-thread-safe data structures should be avoided if possible" 
comment is misleading here, the only thing that we have to avoid is capturing 
thread local (or effectively thread local) objects in the callback.  Other 
objects can be as thread safe as they would be if this method was called 
directly on this thread (i.e. if an object was rooted on the call stack it 
doesn't have to be thread safe because it's not going to be used concurrently 
from multiple threads).
   
   BTW, we have plenty of cases in Kafka (both client and server) where we pass 
a callback to continue execution of a single-threaded logical task after an RPC 
call is complete (most likely on a different thread), so I don't think we're 
dealing with a unique pattern here. The only thing that is unique (and tricky) 
here is the RequestLocal, which disguises itself as the context of a logical task 
(arguments are generally logical task context), but it is actually a 
concrete physical thread context that would be different (or missing 
completely, say if we executed this callback on the network thread) if we 
continued execution of this logical task on a different thread.
   
   I've already pointed out elsewhere that (in a future change) we should 
probably remove the RequestLocal from the argument list and make it a 
thread-local (which it effectively is) and just use it directly where it's 
needed.
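The direction suggested in the last paragraph (read the thread-bound object at the point of use rather than passing it as an argument) might look roughly like this. The callback captures only logical task state and looks up the per-thread object when it runs, so it always gets the executing thread's instance. `Scratch`, `requestLocal`, and `makeCallback` are hypothetical names; this is not Kafka's `RequestLocal` API.

```scala
// Hypothetical per-thread object, standing in for RequestLocal.
final class Scratch(val owner: String)

object PointOfUseSketch {
  private val requestLocal = new ThreadLocal[Scratch] {
    override protected def initialValue(): Scratch =
      new Scratch(Thread.currentThread().getName)
  }

  // Captures only logical task state (a record count); the thread-bound object
  // is looked up when the callback actually runs.
  def makeCallback(recordCount: Int): () => (String, Int) =
    () => (requestLocal.get().owner, recordCount)

  def demo(): (String, Int) = {
    var callback: () => (String, Int) = null
    val a = new Thread(new Runnable {
      override def run(): Unit = { callback = makeCallback(3) }
    }, "thread-a")
    a.start(); a.join()

    var result: (String, Int) = null
    val b = new Thread(new Runnable {
      override def run(): Unit = { result = callback() }
    }, "thread-b")
    b.start(); b.join()
    result
  }
}
```

`demo()` returns `("thread-b", 3)`: the record count travels with the task, while the scratch object comes from whichever thread runs it.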



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-28 Thread via GitHub


ijuma commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1375263740


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   Actually, I don't think this method enforces that these maps are immutable. 
That said, my understanding is that we basically suspend processing and hence 
there is only one thread accessing these structures at a time. Is that correct? 
If so, the main thing we need to do is ensure we safely publish these data 
structures, they don't have to be immutable necessarily.
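On the safe-publication point, a minimal sketch with made-up data: a plain, non-thread-safe `scala.collection.mutable.HashMap` is handed to another thread through an `ExecutorService`. Under the `java.util.concurrent` memory-consistency guarantees, actions taken before `submit` happen-before the task runs, so the task sees the fully built map without it being immutable, provided the caller stops touching it after the hand-off.

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit}
import scala.collection.mutable

object SafePublicationSketch {
  def demo(): Int = {
    val executor = Executors.newSingleThreadExecutor()
    try {
      // A plain, non-thread-safe map built by the "request thread".
      val entries = mutable.HashMap[String, Int]("topic-0" -> 10, "topic-1" -> 20)
      // Submitting the task publishes `entries` safely to the worker thread.
      // From here on the worker is the only owner: the caller must not mutate it.
      val task = new Callable[Int] {
        override def call(): Int = entries.values.sum
      }
      executor.submit(task).get(5, TimeUnit.SECONDS)
    } finally executor.shutdown()
  }
}
```

`demo()` returns `30`. The guarantee covers the hand-off only; concurrent mutation by the caller after `submit` would still be a data race.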



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-28 Thread via GitHub


ijuma commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1375262871


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -55,23 +55,23 @@ object KafkaRequestHandler {
* @param fun Callback function to execute
* @return Wrapped callback that would execute `fun` on a request thread
*/
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {
 val requestChannel = threadRequestChannel.get()
 val currentRequest = threadCurrentRequest.get()
 if (requestChannel == null || currentRequest == null) {
   if (!bypassThreadCheck)
 throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.")
-  T => fun(T)
+  T => fun(requestLocal, T)
 } else {
   T => {
-if (threadCurrentRequest.get() != null) {
-  // If the callback is actually executed on a request thread, we can directly execute
+if (threadCurrentRequest.get() == currentRequest) {
+  // If the callback is actually executed on the same request thread, we can directly execute

Review Comment:
   Can we file a JIRA for investigating the alternative? I agree with @junrao 
that it would be much simpler to reason about if we always schedule it.
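A rough sketch of the "always schedule" alternative, under stated assumptions: a `LinkedBlockingQueue` stands in for the request channel's callback queue, a `String` stands in for the handler thread's `RequestLocal`, and `CallbackQueue` and `wrap` here are hypothetical, not Kafka's code. Because the wrapped callback never runs inline, the handler thread that drains the queue always supplies its own context, and there is no same-thread special case to reason about.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable

object AlwaysScheduleSketch {
  // Hypothetical stand-in for the request channel's callback queue. Each task
  // receives the context of whichever handler thread eventually runs it.
  final class CallbackQueue {
    private val queue = new LinkedBlockingQueue[String => Unit]()

    def enqueue(task: String => Unit): Unit = queue.put(task)

    // Called from a handler thread's poll loop with that thread's own context.
    def runPending(handlerContext: String): Int = {
      var count = 0
      var next = queue.poll()
      while (next != null) {
        next(handlerContext)
        count += 1
        next = queue.poll()
      }
      count
    }
  }

  // "Always schedule": completing the wrapped callback only enqueues it.
  def wrap[T](queue: CallbackQueue)(fun: (String, T) => Unit): T => Unit =
    (t: T) => queue.enqueue(ctx => fun(ctx, t))

  def demo(): (List[String], List[String]) = {
    val queue = new CallbackQueue
    val seen = mutable.ListBuffer.empty[String]
    val callback = wrap[Int](queue)((ctx, value) => seen += s"$ctx:$value")
    callback(1)
    callback(2)
    val beforeDrain = seen.toList // nothing has run inline
    queue.runPending("handler-3")
    (beforeDrain, seen.toList)
  }
}
```

The trade-off is an extra queue hop even when the callback completes on the original thread, which is presumably what the direct-execution branch in the diff is trying to avoid.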



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-27 Thread via GitHub


artemlivshits commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1375078778


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -53,25 +53,27 @@ object KafkaRequestHandler {
* Wrap callback to schedule it on a request thread.
* NOTE: this function must be called on a request thread.
* @param fun Callback function to execute
+   * @param requestLocal The RequestLocal for the current request handler thread in case we need to call
+   * the callback function without queueing the callback request
* @return Wrapped callback that would execute `fun` on a request thread
*/
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {
 val requestChannel = threadRequestChannel.get()
 val currentRequest = threadCurrentRequest.get()
 if (requestChannel == null || currentRequest == null) {
   if (!bypassThreadCheck)
 throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.")
-  T => fun(T)
+  T => fun(requestLocal, T)
 } else {
   T => {
-if (threadCurrentRequest.get() != null) {
-  // If the callback is actually executed on a request thread, we can directly execute
+if (threadCurrentRequest.get() == currentRequest) {

Review Comment:
   Yeah, the hope is that we'd see more async patterns as we evolve Kafka, and 
the way it's currently implemented, we just say "execute callback on the 
_current_ thread pool", which is why the thread local is used -- this way we 
can implement a non-blocking wait-and-continue-on-current-thread-pool from any 
level without passing additional arguments through the whole stack.  The same 
functionality could be implemented on other thread pools, e.g. if the group 
coordinator has its own thread pool, we could implement the same primitive 
there as well.
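The "continue on the current thread pool" primitive might look roughly like this, assuming each pool's worker threads record their owning executor in a `ThreadLocal` when they start. `CurrentPoolSketch`, `newPool`, and this `wrap` are hypothetical names for illustration, not the `KafkaRequestHandler.wrap` from the diff; the point is only that code deep in the stack can resubmit a continuation to its own pool without that pool being passed down as an argument.

```scala
import java.util.concurrent.{Callable, CountDownLatch, ExecutorService, Executors, ThreadFactory, TimeUnit}
import java.util.concurrent.atomic.AtomicReference

object CurrentPoolSketch {
  // Each worker thread records the pool it belongs to.
  private val currentPool = new ThreadLocal[ExecutorService]()

  def newPool(name: String, threads: Int): ExecutorService = {
    val holder = new AtomicReference[ExecutorService]()
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = new Thread(new Runnable {
        override def run(): Unit = {
          currentPool.set(holder.get())
          try r.run() finally currentPool.remove()
        }
      }, name)
    }
    val pool = Executors.newFixedThreadPool(threads, factory)
    holder.set(pool) // set before any task is submitted, so workers see it
    pool
  }

  // Must be called on a pool thread: the continuation is resubmitted to
  // whichever pool the caller is running on.
  def wrap[T](fun: T => Unit): T => Unit = {
    val pool = currentPool.get()
    require(pool != null, "wrap must be called from a pool thread")
    (t: T) => pool.execute(new Runnable { override def run(): Unit = fun(t) })
  }

  // Registers a continuation on "pool-a", completes it from the caller's
  // thread, and returns the name of the thread that actually ran it.
  def demo(): String = {
    val pool = newPool("pool-a", 2)
    try {
      val ranOn = new AtomicReference[String]()
      val done = new CountDownLatch(1)
      val callback = pool.submit(new Callable[Int => Unit] {
        override def call(): Int => Unit = wrap[Int] { _ =>
          ranOn.set(Thread.currentThread().getName)
          done.countDown()
        }
      }).get(5, TimeUnit.SECONDS)
      callback(42)
      done.await(5, TimeUnit.SECONDS)
      ranOn.get()
    } finally pool.shutdown()
  }
}
```

`demo()` returns `"pool-a"` even though `callback(42)` is invoked from a non-pool thread.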



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-27 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1375061797


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -53,25 +53,27 @@ object KafkaRequestHandler {
* Wrap callback to schedule it on a request thread.
* NOTE: this function must be called on a request thread.
* @param fun Callback function to execute
+   * @param requestLocal The RequestLocal for the current request handler thread in case we need to call
+   * the callback function without queueing the callback request
* @return Wrapped callback that would execute `fun` on a request thread
*/
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {
 val requestChannel = threadRequestChannel.get()
 val currentRequest = threadCurrentRequest.get()
 if (requestChannel == null || currentRequest == null) {
   if (!bypassThreadCheck)
 throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.")
-  T => fun(T)
+  T => fun(requestLocal, T)
 } else {
   T => {
-if (threadCurrentRequest.get() != null) {
-  // If the callback is actually executed on a request thread, we can directly execute
+if (threadCurrentRequest.get() == currentRequest) {

Review Comment:
   As for it only being used for produce: yes, that is the case now, but I think Artem was trying 
to create the wrap method as a "general" callback mechanism.



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-27 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1375060752


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -53,25 +53,27 @@ object KafkaRequestHandler {
    * Wrap callback to schedule it on a request thread.
    * NOTE: this function must be called on a request thread.
    * @param fun Callback function to execute
+   * @param requestLocal The RequestLocal for the current request handler thread in case we need to call
+   * the callback function without queueing the callback request
    * @return Wrapped callback that would execute `fun` on a request thread
    */
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {
     val requestChannel = threadRequestChannel.get()
     val currentRequest = threadCurrentRequest.get()
     if (requestChannel == null || currentRequest == null) {
       if (!bypassThreadCheck)
         throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.")
-      T => fun(T)
+      T => fun(requestLocal, T)
     } else {
       T => {
-        if (threadCurrentRequest.get() != null) {
-          // If the callback is actually executed on a request thread, we can directly execute
+        if (threadCurrentRequest.get() == currentRequest) {

Review Comment:
   We pass the wrapped method, which has a simpler type: `type AppendCallback = Map[TopicPartition, Errors] => Unit`
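For illustration only, here is a sketch of turning a two-argument function into the simpler one-argument callback type. It follows the same shape as the non-request-thread branch of the new `wrap` (`T => fun(requestLocal, T)`). The types below are simplified stand-ins, not Kafka's `TopicPartition`, `Errors` or `RequestLocal`, and `bind` is a hypothetical name.

```scala
object AppendCallbackSketch {
  // Simplified stand-ins; these are not Kafka's TopicPartition, Errors or RequestLocal.
  final case class TopicPartition(topic: String, partition: Int)
  sealed trait Errors
  case object NONE extends Errors
  final class RequestLocal(val name: String)

  // The one-argument callback type quoted in the comment above.
  type AppendCallback = Map[TopicPartition, Errors] => Unit

  // Hypothetical helper with the same shape as `T => fun(requestLocal, T)`:
  // bind the RequestLocal up front and hand out a plain one-argument callback.
  def bind[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit =
    t => fun(requestLocal, t)
}
```

Whoever receives the bound callback only sees `AppendCallback`. It cannot tell which RequestLocal was bound, which is why the binding must happen on the right thread.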



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-27 Thread via GitHub


junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1375057023


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -53,25 +53,27 @@ object KafkaRequestHandler {
    * Wrap callback to schedule it on a request thread.
    * NOTE: this function must be called on a request thread.
    * @param fun Callback function to execute
+   * @param requestLocal The RequestLocal for the current request handler thread in case we need to call
+   * the callback function without queueing the callback request
    * @return Wrapped callback that would execute `fun` on a request thread
    */
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {
     val requestChannel = threadRequestChannel.get()
     val currentRequest = threadCurrentRequest.get()
     if (requestChannel == null || currentRequest == null) {
       if (!bypassThreadCheck)
         throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.")
-      T => fun(T)
+      T => fun(requestLocal, T)
     } else {
       T => {
-        if (threadCurrentRequest.get() != null) {
-          // If the callback is actually executed on a request thread, we can directly execute
+        if (threadCurrentRequest.get() == currentRequest) {

Review Comment:
   > Hmmm. I'm not sure it makes sense to try to pass the request channel and 
current in to every method we want to do a callback for.
   
   Currently, the callback is only needed for produce request in 
ReplicaManager. If you look at `KafkaApis.handleProduceRequest`, we already 
pass in both request channel and current request to 
`ReplicaManager.appendRecords` through `sendResponseCallback`.
   
   > As for 2 and short circuiting... Are you suggesting I move the wrap call 
into AddPartitionsToTxnManager? If so, I need to pass the appendEntries method 
in there as well. That may be trickier with the typing.
   
   Yes. Currently, we already pass in `appendEntries` as a callback to 
`AddPartitionsToTxnManager`, right?



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-27 Thread via GitHub


jolshan commented on PR #14629:
URL: https://github.com/apache/kafka/pull/14629#issuecomment-1783521458

   Following up with build failure here: 
https://github.com/apache/kafka/pull/14545#issuecomment-1783515553


Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-27 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1375044842


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -53,25 +53,27 @@ object KafkaRequestHandler {
    * Wrap callback to schedule it on a request thread.
    * NOTE: this function must be called on a request thread.
    * @param fun Callback function to execute
+   * @param requestLocal The RequestLocal for the current request handler thread in case we need to call
+   * the callback function without queueing the callback request
    * @return Wrapped callback that would execute `fun` on a request thread
    */
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {
     val requestChannel = threadRequestChannel.get()
     val currentRequest = threadCurrentRequest.get()
     if (requestChannel == null || currentRequest == null) {
       if (!bypassThreadCheck)
         throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.")
-      T => fun(T)
+      T => fun(requestLocal, T)
     } else {
       T => {
-        if (threadCurrentRequest.get() != null) {
-          // If the callback is actually executed on a request thread, we can directly execute
+        if (threadCurrentRequest.get() == currentRequest) {

Review Comment:
   Hmmm. I'm not sure it makes sense to try to pass the request channel and 
current in to every method we want to do a callback for.
   
   As for 2 and short circuiting... Are you suggesting I move the wrap call 
into AddPartitionsToTxnManager? If so, I need to pass the appendEntries method 
in there as well. That may be trickier with the typing. 



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-27 Thread via GitHub


junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1375025398


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -53,25 +53,27 @@ object KafkaRequestHandler {
    * Wrap callback to schedule it on a request thread.
    * NOTE: this function must be called on a request thread.
    * @param fun Callback function to execute
+   * @param requestLocal The RequestLocal for the current request handler thread in case we need to call
+   * the callback function without queueing the callback request
    * @return Wrapped callback that would execute `fun` on a request thread
    */
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {
     val requestChannel = threadRequestChannel.get()
     val currentRequest = threadCurrentRequest.get()
     if (requestChannel == null || currentRequest == null) {
       if (!bypassThreadCheck)
         throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.")
-      T => fun(T)
+      T => fun(requestLocal, T)
     } else {
       T => {
-        if (threadCurrentRequest.get() != null) {
-          // If the callback is actually executed on a request thread, we can directly execute
+        if (threadCurrentRequest.get() == currentRequest) {

Review Comment:
   Ismael mentioned in 
https://github.com/apache/kafka/pull/9229#issuecomment-683352094 that thread 
locals are most useful when one doesn't control the code. So, I am wondering if 
we could get rid of the two ThreadLocal: `threadRequestChannel` and 
`threadCurrentRequest` in `KafkaRequestHandler` introduced in KAFKA-14561.  The 
reason for the former is to obtain requestChannel. We could simply pass in 
requestChannel to `ReplicaManager.appendRecords`. The reason for the latter is 
(1) to obtain currentRequest and (2) to make sure that the callback can be 
short-circuited if it's called on the same request handler thread. For (1), we 
could also pass in currentRequest to `ReplicaManager.appendRecords`. For (2), 
we could change the code such that `KafkaRequestHandler.wrap` is called only 
when the callback truly needs to be run from a different thread. Otherwise, we 
can just call the callback directly there.
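A sketch of the alternative proposed above follows, under a few assumptions. `RequestQueue` is a hypothetical stand-in for the request channel and is passed explicitly rather than read from a ThreadLocal. The caller invokes the callback directly when no asynchronous step is needed, and only uses `reschedule` when the result may arrive on another thread. None of these names are Kafka APIs.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

object ExplicitRescheduleSketch {
  // Hypothetical stand-in for the request channel: callbacks queued here are run
  // later by a request handler thread. It is passed explicitly, not via a ThreadLocal.
  final class RequestQueue {
    private val pending = new ConcurrentLinkedQueue[Runnable]()

    def enqueue(task: Runnable): Unit = pending.add(task)

    // Runs every queued callback on the calling thread; returns how many ran.
    def runPending(): Int = {
      var ran = 0
      var task = pending.poll()
      while (task != null) {
        task.run()
        ran += 1
        task = pending.poll()
      }
      ran
    }
  }

  // Only needed when the result may arrive on a different thread.
  def reschedule[T](queue: RequestQueue, fun: T => Unit): T => Unit =
    t => queue.enqueue(() => fun(t))

  // Hypothetical caller: with no asynchronous step, call the callback directly;
  // otherwise hand a rescheduling callback to the asynchronous step.
  def append(needsVerification: Boolean,
             queue: RequestQueue,
             startVerification: (String => Unit) => Unit,
             appendEntries: String => Unit): Unit =
    if (needsVerification) startVerification(reschedule(queue, appendEntries))
    else appendEntries("appended inline")
}
```

With this shape, the "same thread" short-circuit becomes a decision made at the call site rather than a ThreadLocal comparison inside the wrapper.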



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-27 Thread via GitHub


artemlivshits commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1375016328


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -53,25 +53,27 @@ object KafkaRequestHandler {
    * Wrap callback to schedule it on a request thread.
    * NOTE: this function must be called on a request thread.
    * @param fun Callback function to execute
+   * @param requestLocal The RequestLocal for the current request handler thread in case we need to call
+   * the callback function without queueing the callback request
    * @return Wrapped callback that would execute `fun` on a request thread
    */
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {

Review Comment:
   I think ideally in a future change we'd get rid of passing RequestLocal as 
an argument and maybe make it a static thread local that could be accessed from 
the point of use rather than being passed through the whole stack.
   
   There are a couple of problems that contributed to this issue:
   - the functions here are already written for asynchronous completion 
(because we wait for replication), and in such cases the general convention is 
that the arguments of a function are not bound to the executing thread (i.e. 
rooted on the call stack or globally)
   - the point of use was outside of the core change, so folks didn't look into 
the specifics of the RequestLocal semantics (i.e. even if appendEntries had been 
an explicit function, as it now is, I'm not sure the problem would have been 
noticed).
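The following sketch illustrates the difference between capturing a thread-specific object and looking it up at the point of use. The `RequestLocal` here is a hypothetical stand-in that only records its owning thread. Binding it when the callback is created goes wrong once the callback runs on another thread; looking it up from a ThreadLocal when the callback runs does not.

```scala
import java.util.concurrent.atomic.AtomicReference

object ThreadLocalRequestLocalSketch {
  // Hypothetical stand-in for RequestLocal that only remembers which thread created it.
  final class RequestLocal(val ownerThread: String)

  // One instance per thread, created lazily and looked up at the point of use.
  private val perThread = new ThreadLocal[RequestLocal] {
    override def initialValue(): RequestLocal = new RequestLocal(Thread.currentThread().getName)
  }

  def currentRequestLocal: RequestLocal = perThread.get()

  private def runOn(threadName: String)(body: => Unit): Unit = {
    val t = new Thread(() => body, threadName)
    t.start()
    t.join() // join() also makes the thread's writes visible to the caller
  }

  // Creates a callback on "handler-A" and runs it on "handler-B". Returns the owner of
  // the RequestLocal captured at creation time and of the one looked up at run time.
  def capturedVersusLookedUp(): (String, String) = {
    val result = new AtomicReference[(String, String)]()
    var callback: Runnable = null
    runOn("handler-A") {
      val captured = currentRequestLocal // bound to handler-A: the problematic pattern
      callback = () => result.set((captured.ownerThread, currentRequestLocal.ownerThread))
    }
    runOn("handler-B")(callback.run())
    result.get()
  }
}
```

Calling `capturedVersusLookedUp()` returns `("handler-A", "handler-B")`: the captured instance still belongs to the thread that created the callback.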



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-27 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1374844889


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -53,25 +53,27 @@ object KafkaRequestHandler {
    * Wrap callback to schedule it on a request thread.
    * NOTE: this function must be called on a request thread.
    * @param fun Callback function to execute
+   * @param requestLocal The RequestLocal for the current request handler thread in case we need to call
+   * the callback function without queueing the callback request
    * @return Wrapped callback that would execute `fun` on a request thread
    */
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {

Review Comment:
   That seems reasonable to me.



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-27 Thread via GitHub


junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1374839084


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -53,25 +53,27 @@ object KafkaRequestHandler {
    * Wrap callback to schedule it on a request thread.
    * NOTE: this function must be called on a request thread.
    * @param fun Callback function to execute
+   * @param requestLocal The RequestLocal for the current request handler thread in case we need to call
+   * the callback function without queueing the callback request
    * @return Wrapped callback that would execute `fun` on a request thread
    */
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {

Review Comment:
   Since this is a generic util, it would be useful for future users of this 
method to understand what kind of `fun` could be passed in. I was thinking that 
perhaps we could make `fun` of a new interface (something like `ThreadSafeCallback`). 
We can document the expectation of `ThreadSafeCallback` (e.g., only executed 
once, but could be in an arbitrary request handler thread; if an implementation 
needs to access a shared data structure, the access needs to be thread safe, 
etc). Then, anyone who uses this method in the future needs to be aware of this 
interface. This may address a bit of Divij's concern. Is that worth doing?
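Here is one way the suggested interface could look, as a hypothetical sketch. Neither `ThreadSafeCallback` nor `AtMostOnce` exists in Kafka, and `RequestLocal` here is an empty stand-in. The Scaladoc carries the contract, and the decorator enforces the "executed once" part at runtime with an `AtomicBoolean`.

```scala
import java.util.concurrent.atomic.AtomicBoolean

object ThreadSafeCallbackSketch {
  // Empty stand-in for kafka.server.RequestLocal.
  final class RequestLocal

  /**
   * Hypothetical interface for callbacks handed to a wrap-like utility. Contract:
   *  - onComplete is invoked at most once;
   *  - it may run on any request handler thread, so it must use the RequestLocal it is
   *    given rather than one captured when the callback was created;
   *  - any shared data structure it touches must be accessed in a thread-safe way.
   */
  trait ThreadSafeCallback[T] {
    def onComplete(requestLocal: RequestLocal, result: T): Unit
  }

  // Decorator enforcing the "at most once" part of the contract at runtime.
  final class AtMostOnce[T](underlying: ThreadSafeCallback[T]) extends ThreadSafeCallback[T] {
    private val completed = new AtomicBoolean(false)

    override def onComplete(requestLocal: RequestLocal, result: T): Unit =
      if (completed.compareAndSet(false, true)) underlying.onComplete(requestLocal, result)
      else throw new IllegalStateException("callback already completed")
  }
}
```

The thread-safety part of the contract cannot be checked this way. It can only be documented, which is the point of giving the type a name.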



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-26 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1373793336


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   Yes. I'm having trouble speaking today. Map in Scala is immutable. 🤦‍♀️ 
Apologies for the confusion.
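Here is a small illustration of the point, with one caveat. An unqualified `Map` refers to `scala.collection.immutable.Map` only when the file does not import another `Map`. If a file imports `scala.collection.Map`, the parameter type is a read-only view that may be backed by a `mutable.Map`. The snippet above does not show the imports of ReplicaManager.scala, so checking them is worthwhile. The object and method names are illustrative.

```scala
object MapMutabilitySketch {
  // Without other imports, `Map` is Predef.Map, i.e. scala.collection.immutable.Map:
  // "adding" an entry returns a new map and leaves the original untouched.
  def withExtraEntry(m: Map[String, Int]): Map[String, Int] = m.updated("extra", 0)

  // scala.collection.Map only promises a read-only view: the same reference can be
  // backed by a mutable.Map that someone else (possibly another thread) still changes.
  def currentSize(m: scala.collection.Map[String, Int]): Int = m.size
}
```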



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-26 Thread via GitHub


divijvaidya commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1373548000


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   > Sorry for my phrasing. I was saying they are mutable. 
   
   Did you mean to say immutable here? If they are mutable, then we need to 
ensure that they are not concurrently mutated in different threads.



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-26 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1373532105


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   Sorry for my phrasing. I was saying they are mutable.  



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-26 Thread via GitHub


divijvaidya commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1373526592


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   > These maps are not mutable right?
   
   Don't know. I haven't really checked. I wanted to ask you instead if you 
have verified that :) 



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-26 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1373475149


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   These maps are not mutable right? 



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-26 Thread via GitHub


divijvaidya commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1372844362


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   1. Are we sure that the Maps being passed here are not concurrently modified 
by two threads? Asking because they are not thread safe (they aren't concurrent 
hash maps). I haven't looked in detail at the params passed here, so please 
feel free to say that you have already verified this.
   
   2. How can we prevent future bugs where someone mutates one of these 
parameters in perhaps other parts of the code such as Kafka APIs where 
appendEntries is called from without knowing that all these data structures are 
supposed to be thread safe?
   
   One option to prevent future bugs could be to pass a deep copy of these 
objects to the callback (instead of a copy of the reference). Another thing we 
can do is to restrict the number of parameters required by the callback.
   
   Having said that, have we already considered not having this as a callback? 
I understand that it has drawbacks since it is in critical produce path but 
maybe that would be acceptable given the complexity of ensuring thread safety 
here?
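Below is a sketch of the "pass a copy" option, assuming the inputs may be mutable. The hypothetical helper `deferWithSnapshot` copies the entries into an immutable `Map` when the callback is created, so later changes by the caller are not seen by the callback. This is a shallow copy: the values themselves (for example `MemoryRecords` buffers) are still shared, so a true deep copy would have to copy them as well.

```scala
object SnapshotSketch {
  // Hypothetical helper: copy the entries into an immutable Map when the callback is
  // created, so later mutation of the caller's collection cannot race with it.
  // This is a shallow copy: keys and values themselves are still shared.
  def deferWithSnapshot[K, V](source: scala.collection.Map[K, V])(fun: Map[K, V] => Unit): () => Unit = {
    val frozen: Map[K, V] = source.toMap
    () => fun(frozen)
  }
}
```

The copy costs one pass over the map per request. That is the kind of overhead on the produce path the comment is weighing against the risk of a future concurrent mutation.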



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-26 Thread via GitHub


divijvaidya commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1372844362


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+                            internalTopicsAllowed: Boolean,
+                            origin: AppendOrigin,
+                            requiredAcks: Short,
+                            verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   1. Are we sure that the Maps being passed here are not concurrently modified 
by two threads? Asking because they are not thread safe (they aren't concurrent 
hash maps). I haven't looked in detail at the params passed here, so please 
feel free to say that you have already verified this.
   
   2. How can we prevent future bugs where someone mutates one of these 
parameters in perhaps other parts of the code such as Kafka APIs where 
appendEntries is called from without knowing that all these data structures are 
supposed to be thread safe?
   
   One option to prevent future bugs could be to pass a deep copy of these 
objects to the callback (instead of a copy of the reference). Another thing we 
can do is to restrict the number of parameters required by the callback.
   
   Having said that, have we already considered not having this as a callback 
and instead appending this synchronously? I understand that it has drawbacks 
since it is in critical produce path but maybe that would be acceptable given 
the complexity of ensuring thread safety here?



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-25 Thread via GitHub


junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1372414818


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -55,23 +55,23 @@ object KafkaRequestHandler {
    * @param fun Callback function to execute
    * @return Wrapped callback that would execute `fun` on a request thread
    */
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {

Review Comment:
   Could we update the javadoc for the new param?



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-25 Thread via GitHub


junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1372414239


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -55,23 +55,23 @@ object KafkaRequestHandler {
    * @param fun Callback function to execute
    * @return Wrapped callback that would execute `fun` on a request thread
    */
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {
     val requestChannel = threadRequestChannel.get()
     val currentRequest = threadCurrentRequest.get()
     if (requestChannel == null || currentRequest == null) {
       if (!bypassThreadCheck)
         throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.")
-      T => fun(T)
+      T => fun(requestLocal, T)
     } else {
       T => {
-        if (threadCurrentRequest.get() != null) {
-          // If the callback is actually executed on a request thread, we can directly execute
+        if (threadCurrentRequest.get() == currentRequest) {
+          // If the callback is actually executed on the same request thread, we can directly execute

Review Comment:
   Thanks for the explanation, Justine and Artem. This sounds good then.



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-25 Thread via GitHub


artemlivshits commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1372379141


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -55,23 +55,23 @@ object KafkaRequestHandler {
    * @param fun Callback function to execute
    * @return Wrapped callback that would execute `fun` on a request thread
    */
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {
     val requestChannel = threadRequestChannel.get()
     val currentRequest = threadCurrentRequest.get()
     if (requestChannel == null || currentRequest == null) {
       if (!bypassThreadCheck)
         throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.")
-      T => fun(T)
+      T => fun(requestLocal, T)
     } else {
       T => {
-        if (threadCurrentRequest.get() != null) {
-          // If the callback is actually executed on a request thread, we can directly execute
+        if (threadCurrentRequest.get() == currentRequest) {
+          // If the callback is actually executed on the same request thread, we can directly execute

Review Comment:
   My understanding is that concurrency is not a problem in production logic (i.e., the fact that the callback might execute at any time before, after, or concurrently with the thread that passed it), but unit tests rely on a deterministic order of execution, so adding concurrency here makes them "flaky".
   I agree in principle that if we could fold this logic into just one case (i.e., always schedule on a new request thread), it would be simpler; when this change was proposed, I looked into fixing the unit tests, and it seemed more involved than adding this logic.
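A minimal, self-contained sketch of the two cases being discussed may help. It is not the Kafka implementation: `RequestLocal`, `Request`, `currentRequest` and the single daemon thread standing in for the request channel are hypothetical, simplified stand-ins. The wrapped callback runs inline only while the thread is still handling the request that created it; otherwise it is re-dispatched to the pool. In both cases it receives the `RequestLocal` captured at wrap time instead of reading one from a thread-local.

```scala
import java.util.concurrent.{ExecutorService, Executors, ThreadFactory}

object WrapSketch {
  // Hypothetical stand-ins for Kafka's RequestLocal and RequestChannel.Request.
  final case class RequestLocal(owner: String)
  final class Request(val id: Int) // plain class, so == compares identity

  // Request being handled on the current thread (null when the thread is idle).
  val currentRequest = new ThreadLocal[Request]

  // One daemon thread standing in for the request handler pool / request channel.
  val handlerPool: ExecutorService = Executors.newSingleThreadExecutor(new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "handler-pool")
      t.setDaemon(true)
      t
    }
  })

  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {
    val requestAtWrapTime = currentRequest.get()
    if (requestAtWrapTime == null) {
      // Not on a handler thread (e.g. a unit test): call directly with the captured RequestLocal.
      (t: T) => fun(requestLocal, t)
    } else {
      (t: T) =>
        if (currentRequest.get() == requestAtWrapTime)
          fun(requestLocal, t) // still handling the same request: run inline
        else
          handlerPool.execute(() => fun(requestLocal, t)) // otherwise re-dispatch to the pool
    }
  }
}
```

The `requestAtWrapTime == null` branch only loosely mirrors the `bypassThreadCheck` path in the diff; the real method throws in that case unless the flag is set.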






Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-25 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1372377851


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -55,23 +55,23 @@ object KafkaRequestHandler {
    * @param fun Callback function to execute
    * @return Wrapped callback that would execute `fun` on a request thread
    */
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {
     val requestChannel = threadRequestChannel.get()
     val currentRequest = threadCurrentRequest.get()
     if (requestChannel == null || currentRequest == null) {
       if (!bypassThreadCheck)
         throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.")
-      T => fun(T)
+      T => fun(requestLocal, T)
     } else {
       T => {
-        if (threadCurrentRequest.get() != null) {
-          // If the callback is actually executed on a request thread, we can directly execute
+        if (threadCurrentRequest.get() == currentRequest) {
+          // If the callback is actually executed on the same request thread, we can directly execute

Review Comment:
   Sorry, maybe I'm not explaining myself well.
   
   We saw problems before when the callback was scheduled before the request returned. We fixed it by adding this check.
   
   If we want to remove this check, we will need to investigate that case further.
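The difference between the old and the new check can be written as two predicates. In this hypothetical sketch, `wrappedFor` is the request that created the callback and `onThreadNow` is the request the executing thread is handling when the callback fires (null when idle). Under the old `!= null` check, a callback firing while the handler thread has already moved on to an unrelated request would run inline inside that request; the `== currentRequest` check only runs it inline while the originating request is still in progress, and otherwise reschedules it.

```scala
object InlineCheckSketch {
  // Hypothetical request type, compared by identity like the captured request in wrap.
  final class Request(val name: String)

  // Old check: inline whenever the thread is handling *some* request.
  def runsInlineOld(wrappedFor: Request, onThreadNow: Request): Boolean =
    onThreadNow != null

  // New check: inline only while the *originating* request is still being handled.
  def runsInlineNew(wrappedFor: Request, onThreadNow: Request): Boolean =
    onThreadNow != null && (onThreadNow eq wrappedFor)
}
```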






Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-25 Thread via GitHub


junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1372376795


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure the that correct RequestLocal

Review Comment:
   Ensure the that => Ensure that
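The note under review exists because a value read from a `ThreadLocal` inside a callback belongs to whichever thread runs the callback, not to the thread that created it. The sketch below uses plain JDK threads; `perThreadBuffer` is a hypothetical stand-in for per-thread state such as a `RequestLocal`. It contrasts reading the thread-local lazily inside the callback with capturing the value up front and passing it along, which is the direction this PR takes.

```scala
object ThreadLocalCaptureSketch {
  // Hypothetical per-thread state: each thread lazily gets its own value.
  val perThreadBuffer: ThreadLocal[String] = new ThreadLocal[String] {
    override def initialValue(): String = s"buffer-of-${Thread.currentThread().getName}"
  }

  // Runs `body` on a fresh thread with the given name and returns its result.
  def runOn[A](threadName: String)(body: () => A): A = {
    var result: Option[A] = None
    val t = new Thread(new Runnable { def run(): Unit = result = Some(body()) }, threadName)
    t.start()
    t.join() // join() makes the write to `result` visible to this thread
    result.get
  }

  def demo(): (String, String) = {
    // Both callbacks are created on "handler-1".
    val (lazyCallback, capturedCallback) = runOn("handler-1") { () =>
      val captured = perThreadBuffer.get()                   // read now, on handler-1
      val lazyCb: () => String = () => perThreadBuffer.get() // read later, on the executing thread
      val capturedCb: () => String = () => captured
      (lazyCb, capturedCb)
    }
    // Both are executed later on a different thread, "handler-2".
    (runOn("handler-2")(lazyCallback), runOn("handler-2")(capturedCallback))
  }
}
```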






Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-25 Thread via GitHub


junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1372366141


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -55,23 +55,23 @@ object KafkaRequestHandler {
    * @param fun Callback function to execute
    * @return Wrapped callback that would execute `fun` on a request thread
    */
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {
     val requestChannel = threadRequestChannel.get()
     val currentRequest = threadCurrentRequest.get()
     if (requestChannel == null || currentRequest == null) {
       if (!bypassThreadCheck)
         throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.")
-      T => fun(T)
+      T => fun(requestLocal, T)
     } else {
       T => {
-        if (threadCurrentRequest.get() != null) {
-          // If the callback is actually executed on a request thread, we can directly execute
+        if (threadCurrentRequest.get() == currentRequest) {
+          // If the callback is actually executed on the same request thread, we can directly execute

Review Comment:
   So, is this still a problem that we need to address in this PR?






Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-25 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1372329928


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -55,23 +55,23 @@ object KafkaRequestHandler {
    * @param fun Callback function to execute
    * @return Wrapped callback that would execute `fun` on a request thread
    */
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {
     val requestChannel = threadRequestChannel.get()
     val currentRequest = threadCurrentRequest.get()
     if (requestChannel == null || currentRequest == null) {
       if (!bypassThreadCheck)
         throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.")
-      T => fun(T)
+      T => fun(requestLocal, T)
     } else {
       T => {
-        if (threadCurrentRequest.get() != null) {
-          // If the callback is actually executed on a request thread, we can directly execute
+        if (threadCurrentRequest.get() == currentRequest) {
+          // If the callback is actually executed on the same request thread, we can directly execute

Review Comment:
   That seemed to cause problems when David changed the code to run the callback before we finished processing the request.
   
   We never figured out the root cause, though.






Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-25 Thread via GitHub


junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1372322049


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -55,23 +55,23 @@ object KafkaRequestHandler {
    * @param fun Callback function to execute
    * @return Wrapped callback that would execute `fun` on a request thread
    */
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {
     val requestChannel = threadRequestChannel.get()
     val currentRequest = threadCurrentRequest.get()
     if (requestChannel == null || currentRequest == null) {
       if (!bypassThreadCheck)
         throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.")
-      T => fun(T)
+      T => fun(requestLocal, T)
     } else {
       T => {
-        if (threadCurrentRequest.get() != null) {
-          // If the callback is actually executed on a request thread, we can directly execute
+        if (threadCurrentRequest.get() == currentRequest) {
+          // If the callback is actually executed on the same request thread, we can directly execute

Review Comment:
   Thanks, Justine. It seems that we can't easily get rid of the ThreadLocal stuff. So, we can leave this as it is.
   
   > I am concerned that executing the callback and before we return from the request thread causes issues.
   
   With the current PR, it seems that it's possible for the callback to be called before the request handler thread finishes processing the request (that generates the callback), right? Is that causing any problem?
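One general way a callback can run before its handler has finished is when it is attached to something that has already completed. The JDK's `CompletableFuture` shows the behaviour; this is an analogy, not the code path in this PR. A non-async `thenAccept` registered on an already-completed future runs the action immediately on the registering thread, so its effects land before the handler's remaining steps; otherwise it runs later on whichever thread calls `complete()`. `handle` and the event labels are hypothetical.

```scala
import java.util.concurrent.CompletableFuture
import java.util.function.Consumer
import scala.collection.mutable.ListBuffer

object EarlyCallbackSketch {
  // Returns the order in which the "handler" and the "callback" did their work.
  def handle(alreadyCompleted: Boolean): List[String] = {
    val events = ListBuffer[String]()
    val verification = new CompletableFuture[String]()
    if (alreadyCompleted) verification.complete("verified")

    events += "handler: start"
    // Non-async thenAccept: runs right here if the future is already complete,
    // otherwise later on the thread that calls complete().
    verification.thenAccept(new Consumer[String] {
      def accept(result: String): Unit = events += s"callback: $result"
    })
    events += "handler: done"

    if (!alreadyCompleted) verification.complete("verified")
    events.toList
  }
}
```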
   






Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-25 Thread via GitHub


junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1372308747


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -846,7 +759,8 @@ class ReplicaManager(val config: KafkaConfig,
   producerId = batchInfo.producerId,
   producerEpoch = batchInfo.producerEpoch,
   topicPartitions = notYetVerifiedEntriesPerPartition.keySet.toSeq,
-  callback = KafkaRequestHandler.wrap(appendEntries(entriesPerPartition)(_))
+  callback = KafkaRequestHandler.wrap(appendEntries(entriesPerPartition, internalTopicsAllowed, origin, requiredAcks, verificationGuards.toMap,

Review Comment:
   Thanks, Justine and Artem. Agreed that the alternative approach has its own drawbacks. So, we could punt on this.
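The new call site builds the `(RequestLocal, T) => Unit` function that `wrap` now expects by partially applying `appendEntries`. The sketch below shows only the Scala mechanics, using a hypothetical, simplified `appendEntries` (and a hypothetical `RequestLocal`) whose first parameter list holds values known when the callback is built and whose second list takes the `RequestLocal` and the verification result; the real parameter lists are truncated in the diff and are not reproduced here.

```scala
import scala.collection.mutable.ListBuffer

object PartialApplicationSketch {
  // Hypothetical stand-in for Kafka's RequestLocal.
  final case class RequestLocal(name: String)

  val log: ListBuffer[String] = ListBuffer[String]()

  // Hypothetical, simplified appendEntries with two parameter lists.
  def appendEntries(entries: Map[String, Int], requiredAcks: Short)
                   (requestLocal: RequestLocal, errors: Map[String, String]): Unit =
    log += s"acks=$requiredAcks entries=${entries.size} errors=${errors.size} via ${requestLocal.name}"

  // Placeholder syntax fixes the first list and leaves the second open, yielding the
  // (RequestLocal, T) => Unit shape that wrap takes (here T = Map[String, String]).
  def buildCallback(entries: Map[String, Int], acks: Short): (RequestLocal, Map[String, String]) => Unit =
    appendEntries(entries, acks)(_, _)
}
```

The `RequestLocal` is supplied only when the callback is invoked, so the one used belongs to the thread that actually runs it.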






Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-25 Thread via GitHub


artemlivshits commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1372270435


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -846,7 +759,8 @@ class ReplicaManager(val config: KafkaConfig,
   producerId = batchInfo.producerId,
   producerEpoch = batchInfo.producerEpoch,
   topicPartitions = notYetVerifiedEntriesPerPartition.keySet.toSeq,
-  callback = KafkaRequestHandler.wrap(appendEntries(entriesPerPartition)(_))
+  callback = KafkaRequestHandler.wrap(appendEntries(entriesPerPartition, internalTopicsAllowed, origin, requiredAcks, verificationGuards.toMap,

Review Comment:
   I think the way it's implemented right now is that we just continue processing from the current point after the network operation has completed, so the machinery to resume execution on the request pool is hidden from the "application" logic. This allows us to do an arbitrary number of non-blocking waits with minimal disruption to the application logic flow (the change is localized to the point of the non-blocking wait).
   In this case we could inject a new produce request (if I understand your question correctly) with some context saying that it's "verified", but I think it'd be more intrusive, as we'd need to implement some produce-specific "verified" context (that would have to include the verification guard and whatnot) and pass it down through all functions. We'd also need to consider the offset commit path, which is a separate path that eventually comes to this function. Also, if we consider more general functionality (we don't have any yet, but hopefully there will be more cases if we make Kafka more async), we'd generally want to preserve the progress that has been made up to the network call: if a request needs to do work XYZ and there are non-blocking network calls between stages, it would be good for the overall work to be X [RPC] Y [RPC] Z rather than X [RPC] XY [RPC] XYZ.
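The X [RPC] Y [RPC] Z point can be made concrete with a small sketch. It models each non-blocking network call as a Scala `Future` and compares two hypothetical strategies: resuming from a continuation after each call, which is the shape described above, versus re-injecting the whole request after each call, which repeats the stages already done. The stage names, `rpc` and `StageLog` are illustrative only.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ContinuationSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Thread-safe record of which stages ran, in order.
  final class StageLog {
    private val buf = scala.collection.mutable.ListBuffer[String]()
    def run(stage: String): Unit = synchronized { buf += stage }
    def stages: List[String] = synchronized { buf.toList }
  }

  // Hypothetical non-blocking network call.
  def rpc(): Future[Unit] = Future(())

  // Resume after each RPC: every stage runs exactly once.
  def resumeFromContinuation(): List[String] = {
    val log = new StageLog
    val done = for {
      _ <- Future(log.run("X"))
      _ <- rpc()
      _ <- Future(log.run("Y"))
      _ <- rpc()
      _ <- Future(log.run("Z"))
    } yield ()
    Await.result(done, 5.seconds)
    log.stages
  }

  // Re-inject the request after each RPC: earlier stages are redone every time.
  def reinjectRequest(): List[String] = {
    val log = new StageLog
    def attempt(stagesDone: Int): Future[Unit] = {
      List("X", "Y", "Z").take(stagesDone + 1).foreach(log.run)
      if (stagesDone == 2) Future.unit else rpc().flatMap(_ => attempt(stagesDone + 1))
    }
    Await.result(attempt(0), 5.seconds)
    log.stages
  }
}
```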






Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-25 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1372169588


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -55,23 +55,23 @@ object KafkaRequestHandler {
    * @param fun Callback function to execute
    * @return Wrapped callback that would execute `fun` on a request thread
    */
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {
     val requestChannel = threadRequestChannel.get()
     val currentRequest = threadCurrentRequest.get()
     if (requestChannel == null || currentRequest == null) {
       if (!bypassThreadCheck)
         throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.")
-      T => fun(T)
+      T => fun(requestLocal, T)
     } else {
       T => {
-        if (threadCurrentRequest.get() != null) {
-          // If the callback is actually executed on a request thread, we can directly execute
+        if (threadCurrentRequest.get() == currentRequest) {
+          // If the callback is actually executed on the same request thread, we can directly execute

Review Comment:
   Right -- we had that before and it caused issues; @dajac has more context.
   I am concerned that executing the callback before we return from the request thread causes issues.
   
   In addition, for tests, we would need to set up the request channel, etc.






