Re: [PR] PROPOSAL: support async event execution in group coordinator [kafka]
github-actions[bot] commented on PR #14705: URL: https://github.com/apache/kafka/pull/14705#issuecomment-1945310660 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] PROPOSAL: support async event execution in group coordinator [kafka]
junrao commented on code in PR #14705: URL: https://github.com/apache/kafka/pull/14705#discussion_r1396147285 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -659,9 +660,21 @@ public TopicPartition key() { */ @Override public void run() { +try { +runAsync().get(); Review Comment: Thanks, Artem. Got it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] PROPOSAL: support async event execution in group coordinator [kafka]
artemlivshits commented on code in PR #14705: URL: https://github.com/apache/kafka/pull/14705#discussion_r1395062167 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -659,9 +660,21 @@ public TopicPartition key() { */ @Override public void run() { +try { +runAsync().get(); Review Comment: The locking model is not changed -- it holds the lock around the whole call, see line 1241 `result = asyncFunc.apply(context).whenComplete((none, t) -> context.lock.unlock());` the .whenComplete callback will execute after the function is complete, so lock is held around the whole thing. The unlock in the `finally` clause is so that if we asyncFunc.apply throws an exception (which would happen if the function in fact is executed synchronously) and we didn't get the future, then we unlock inline. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] PROPOSAL: support async event execution in group coordinator [kafka]
junrao commented on code in PR #14705: URL: https://github.com/apache/kafka/pull/14705#discussion_r1395031179 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -659,9 +660,21 @@ public TopicPartition key() { */ @Override public void run() { +try { +runAsync().get(); Review Comment: Another potential issue is the ordering. `withActiveContextOrThrow` holds a partition level lock to make sure the record is replayed in the state machine in the same order as it's appended to the log. With `withActiveContextOrThrowAsync`, we hold the lock to replay the record, but appends to the log without the lock. The could create a situation that the state machine may not be exactly recreated by replaying records from the log. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] PROPOSAL: support async event execution in group coordinator [kafka]
artemlivshits commented on code in PR #14705: URL: https://github.com/apache/kafka/pull/14705#discussion_r1388948636 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -659,9 +660,21 @@ public TopicPartition key() { */ @Override public void run() { +try { +runAsync().get(); Review Comment: To be honest, if we implement proper concurrency granularity for groups (serialize group updates [not whole partition], keep read "lock" on groups during commit updates) I'm not sure if we'd get much extra perf gain from piercing the appendRecords abstraction to implement pipelining. Then we could get rid of the timeline snapshot structure and hooking into replication pipeline to listen for HWM updates; we could just do appendRecords and wait for completion. Then we could completely decouple group coordinator logic from the storage stack and make it simpler. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] PROPOSAL: support async event execution in group coordinator [kafka]
artemlivshits commented on code in PR #14705: URL: https://github.com/apache/kafka/pull/14705#discussion_r1388644791 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -659,9 +660,21 @@ public TopicPartition key() { */ @Override public void run() { +try { +runAsync().get(); Review Comment: > This is completely unrelated in my opinion as this is true for both the old and the new coordinator. It's true that it's a problem with the old coordinator, and we should make the whatever minimal fixes required for the old coordinator to work (and if it happens to work end-to-end, which I think it might, we won't need to fix it), but that code is going away and shouldn't define the forward-looking architecture. As we build the new coordinator, we should build it in a way that improves forward-looking architecture. Keeping the right abstraction is good, coincidentally it helps with the timelines -- we can use this proposal and use the work that already has been done instead of doing new work of bringing implementation details into group coordinator. Moreover, I wonder if we need yet another thread pool to handle group coordinator logic, I think it would be good to just re-use the request handler threads to run this functionality. This would avoid thread pools proliferation and also reuse various useful improvements that work only on request pool threads, e.g. RequestLocal (hopefully we'll make it into a real thread local to be used at the point of use instead of passing the argument), various observability things, etc. Here is a PoC that does that using NonBlockingSynchronizer and KafkaRequestHandler.wrap https://github.com/apache/kafka/pull/14728/commits/46acf0220434926305b343299d2780a34bf8a7de The NonBlockingSynchronizer replaces EventAccumulator and MultiThreadedEventProcessor (I didn't remove them to keep the change small), it has some perf benefits e.g. in uncontended cases, the processing continues running on the request thread instead of being rescheduled on the gc thread pool. I can also easily implement read-write synchronization for the NonBlockingSynchronizer (so that readers won't block each other out), e.g. to implement non-blocking read "lock" on group when committing offsets. It's not to say I don't like the current code, but it feels like we re-building functionality that we already have elsewhere in Kafka and we we could re-use the existing building blocks so that the gc focuses on group coordination rather than managing thread pools, getting into the details of transactional protocol, etc. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] PROPOSAL: support async event execution in group coordinator [kafka]
dajac commented on code in PR #14705: URL: https://github.com/apache/kafka/pull/14705#discussion_r1388535147 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -659,9 +660,21 @@ public TopicPartition key() { */ @Override public void run() { +try { +runAsync().get(); Review Comment: Thanks for looking into this. Here is my take: > That is correct, it may become a perf problem I strongly disagree on blocking the event loop. It will not become a perf problem. It is one. It is also an anti-pattern. > Right now it is a functional problem It is technically not a functional problem, at least not yet, because I haven't not implemented the transactional offset commit in the new coordinator. ;) > appendRecords has async interface, thus adding async stages under such an interface can be done without inspection and understanding all callers (that's what an interface is -- any compliant implementation is valid), but doing so will break the current logic (so from the proper interface usage perspective it is a bug in the caller, which this proposal fixes) I will change this to not use appendRecords, this will make the contract clear. > now all of a sudden KIP-848 got a new work to do before release, just because there is some independent work is going on in transaction area This is incorrect. We knew about this and we always had an implementation in mind which works. I will basically decouple the write in two stages: 1) validate/prepare the transaction; and 2) update state and write. As we discussed in the other PR, this is also required for the old coordinator to work correctly. > KIP-890 part2 design is still under discussion, the verification protocol is likely to change, so any changes in KIP-890 protocol are going to have ripple effects on KIP-848 I don't agree with this. As we just saw, we already failed to make it work correctly for the existing coordinator so the dependency was already there. Again, we can do better, I agree. > the work needs to be duplicated in group coordinator (and the protocol is going slightly different for different client versions) which becomes a likely source of bugs This is completely unrelated in my opinion as this is true for both the old and the new coordinator. Overall, I agree that we could do better but I think that it is not the right time to change this. We are already under high time pressure and actually changing this in the right way puts even more pressure. We should look for a proper solution afterwards. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] PROPOSAL: support async event execution in group coordinator [kafka]
artemlivshits commented on code in PR #14705: URL: https://github.com/apache/kafka/pull/14705#discussion_r1388499287 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -659,9 +660,21 @@ public TopicPartition key() { */ @Override public void run() { +try { +runAsync().get(); Review Comment: > if one client's log append is blocked for additional async check That is correct, it may become a perf problem, we can measure and see if it's worth fixing in practice, we'll have this choice (as well as the choice to postpone the fix, if we have time pressure to release). But it won't be a functional problem. Right now it is a functional problem, which is suboptimal in many ways: - appendRecords has async interface, thus adding async stages under such an interface can be done without inspection and understanding all callers (that's what an interface is -- any compliant implementation is valid), but doing so will break the current logic (so from the proper interface usage perspective it is a bug in the caller, which this proposal fixes) - we cannot release new transaction protocol (or new coordinator) without implementing new logic, which makes hard dependencies and pushes against timelines (now all of a sudden KIP-848 got a new work to do before release, just because there is some independent work is going on in transaction area) - KIP-890 part2 design is still under discussion, the verification protocol is likely to change, so any changes in KIP-890 protocol are going to have ripple effects on KIP-848 - 2 fairly complex components are now tied together -- we cannot just innovate on transaction protocol implementation details (or to be broader -- on the whole IO subsystem implementation details -- e.g. Async IO) without understanding group coordinator implementation detail and we cannot innovate on group coordinator implementation detail without understanding implementation details of transaction protocol - to make the previous point worse, the dependency is not visible at the "point of use" -- someone tasked with improving transaction protocol (or IO in general) would have no indication from the appendRecords interface, that adding an async stage would need to have a corresponding change in group coordinator - the work needs to be duplicated in group coordinator (and the protocol is going slightly different for different client versions) which becomes a likely source of bugs IMO, the fact that transaction verification implementation just doesn't work out-of-box with the new group coordinator (and in fact requires quite non-trivial follow-up work that will block the release) is an architectural issue. We should strive to make the system more decoupled, so that the context an engineer needs to understand to make local changes in a part of system is less. > Each new group coordinator thread handles requests from multiple groups and multiple clients within the same group. I don't think it's bound to a thread, but indeed the concurrency is limited to partition -- we don't let operations on the same partition run concurrently, so all the groups that are mapped to the same partition are contending. This is, however, a specific implementation choice, it should be possible to make a group to be a unit of concurrency, and if that's not enough, we can let offset commits for different partitions go concurrently as well (they just need to make sure that group doesn't change, which is sort of a "read lock" on the group), at which point there probably wouldn't be any contention in the common path. Now, one might ask a question, implementing per-group synchronization adds complexity and handling transaction verification as an explicit state transition in group coordinator adds complexity, what the difference? I'd say the difference is fundamental -- per-group synchronization complexity is encapsulated in one component and keeps the system decoupled: an engineer tasked to improve transaction protocol, doesn't need understand implementation details of group coordinator and vice versa. Changes are smaller, can be made faster, and less bug prone. Win-win-win. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] PROPOSAL: support async event execution in group coordinator [kafka]
junrao commented on code in PR #14705: URL: https://github.com/apache/kafka/pull/14705#discussion_r1387227928 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -659,9 +660,21 @@ public TopicPartition key() { */ @Override public void run() { +try { +runAsync().get(); Review Comment: Thanks for the explanation, Artem. Yes, it's true that the new group coordinator only depends on acks=1. Each new group coordinator thread handles requests from multiple groups and multiple clients within the same group. In the proposed approach, if one client's log append is blocked for additional async check, it blocks the processing of other clients and other groups. So, it still seems to reduce the overall throughput somewhat. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] PROPOSAL: support async event execution in group coordinator [kafka]
artemlivshits commented on code in PR #14705: URL: https://github.com/apache/kafka/pull/14705#discussion_r1385493045 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -659,9 +660,21 @@ public TopicPartition key() { */ @Override public void run() { +try { +runAsync().get(); Review Comment: We actually don't need to wait for replication, so the current pipelining works without changes -- the current logic uses acks=1 and captures the offset and then waits for HWM to be advanced to complete the write request. It may prevent potential pipelining opportunities if new async stages are added for acks=1 (e.g. transaction verification). But the most important thing is that with this proposal, innovating under appendRecords interface would just work out of box, which is the purpose of having interfaces -- innovating under the interface doesn't break callers that use interface correctly (which makes system modular). If we find out that we want the pipelining for transaction verification we can make this optimization later (if we find it to be a problem). We will have a choice between complexity and potentially better pipelining; with the current model, we don't have the choice -- the workflow will break if we add an async state to acks=1 processing and will have to fix it before shipping. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] PROPOSAL: support async event execution in group coordinator [kafka]
junrao commented on code in PR #14705: URL: https://github.com/apache/kafka/pull/14705#discussion_r1385389618 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -659,9 +660,21 @@ public TopicPartition key() { */ @Override public void run() { +try { +runAsync().get(); Review Comment: Hmm, while this guarantees ordering, it disables pipelining and thus potentially reduces the throughput, since we have to wait for each event's records to be fully replicated before processing the next event. We probably could introduce a different callback in `ReplicaManager.appendRecords` that's invoked when the records are appended to the local log. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] PROPOSAL: support async event execution in group coordinator [kafka]
artemlivshits opened a new pull request, #14705: URL: https://github.com/apache/kafka/pull/14705 This change fixes a broken abstraction where event execution relies on specific implementation detail of the ReplicaManager.appendRecords that with some arguments it is completed synchronously even though the interface is clearly asynchronous. This assumption can be broken by changing implementation, as shown by KIP-890 work that added transaction verification stage that may result in asynchronous completion (which should be perfectly fine because the function interface is asynchronous and must be used as such) and violate the assumption of event execution. Now the event execution supports asynchronous completion and can properly handle asynchronous completion of the underlying functionality. *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org