Re: [PR] PROPOSAL: support async event execution in group coordinator [kafka]

2024-02-14 Thread via GitHub


github-actions[bot] commented on PR #14705:
URL: https://github.com/apache/kafka/pull/14705#issuecomment-1945310660

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] PROPOSAL: support async event execution in group coordinator [kafka]

2023-11-16 Thread via GitHub


junrao commented on code in PR #14705:
URL: https://github.com/apache/kafka/pull/14705#discussion_r1396147285


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -659,9 +660,21 @@ public TopicPartition key() {
  */
 @Override
 public void run() {
+try {
+runAsync().get();

Review Comment:
   Thanks, Artem. Got it.






Re: [PR] PROPOSAL: support async event execution in group coordinator [kafka]

2023-11-15 Thread via GitHub


artemlivshits commented on code in PR #14705:
URL: https://github.com/apache/kafka/pull/14705#discussion_r1395062167


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -659,9 +660,21 @@ public TopicPartition key() {
  */
 @Override
 public void run() {
+try {
+runAsync().get();

Review Comment:
   The locking model is not changed -- it holds the lock around the whole call, 
see line 1241:
   
`result = asyncFunc.apply(context).whenComplete((none, t) -> 
context.lock.unlock());`
   
   The `.whenComplete` callback executes after the function's future completes, so 
the lock is held around the whole operation.
   
   The unlock in the `finally` clause covers the case where `asyncFunc.apply` throws 
an exception (which would happen if the function is in fact executed 
synchronously) and we never get a future; in that case we unlock inline.
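   A minimal sketch of that locking pattern, for illustration only: `AsyncLockSketch` and `withLockAsync` are hypothetical names, not the PR's code. The lock is taken before the async function starts, released from `whenComplete` once the returned future finishes, and released inline only if no future was ever produced. A `Semaphore(1)` is assumed here instead of `ReentrantLock`, because the release may run on whichever thread completes the future, and `ReentrantLock.unlock()` from a non-owner thread throws `IllegalMonitorStateException`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical sketch: hold a lock across an asynchronous call.
class AsyncLockSketch {
    // Semaphore has no owner thread, so any thread may release it.
    final Semaphore lock = new Semaphore(1);

    <T> CompletableFuture<T> withLockAsync(Supplier<CompletableFuture<T>> asyncFunc) {
        lock.acquireUninterruptibly();
        CompletableFuture<T> result = null;
        try {
            // Release when the future completes, normally or exceptionally.
            result = asyncFunc.get().whenComplete((value, error) -> lock.release());
            return result;
        } finally {
            // asyncFunc threw before handing back a future: nothing will ever
            // complete, so release the lock inline.
            if (result == null) {
                lock.release();
            }
        }
    }
}
```

With this shape the lock covers the full asynchronous operation, and a synchronous failure cannot leak it.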
   






Re: [PR] PROPOSAL: support async event execution in group coordinator [kafka]

2023-11-15 Thread via GitHub


junrao commented on code in PR #14705:
URL: https://github.com/apache/kafka/pull/14705#discussion_r1395031179


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -659,9 +660,21 @@ public TopicPartition key() {
  */
 @Override
 public void run() {
+try {
+runAsync().get();

Review Comment:
   Another potential issue is ordering. `withActiveContextOrThrow` holds a 
partition-level lock to make sure records are replayed in the state machine 
in the same order as they are appended to the log. With 
`withActiveContextOrThrowAsync`, we hold the lock to replay the records but 
append to the log without the lock. This could create a situation where the 
state machine cannot be exactly recreated by replaying records from the log.
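   To make the ordering concern concrete, here is a small deterministic illustration (a toy model, not coordinator code). The two "records" are non-commuting operations on an integer state. If they are replayed in one order under the lock but reach the log in the other order, rebuilding from the log gives a different state.

```java
import java.util.List;
import java.util.function.UnaryOperator;

// Toy model: records are non-commuting operations on an integer state.
class ReplayOrderSketch {
    static final UnaryOperator<Integer> ADD_ONE = x -> x + 1;
    static final UnaryOperator<Integer> DOUBLE = x -> x * 2;

    // Order in which records were replayed into the in-memory state (under the lock).
    static final List<UnaryOperator<Integer>> REPLAY_ORDER = List.of(ADD_ONE, DOUBLE);
    // Order in which the same records landed in the log when appends raced outside the lock.
    static final List<UnaryOperator<Integer>> LOG_ORDER = List.of(DOUBLE, ADD_ONE);

    static int replay(List<UnaryOperator<Integer>> records, int initial) {
        int state = initial;
        for (UnaryOperator<Integer> record : records) {
            state = record.apply(state);
        }
        return state;
    }

    public static void main(String[] args) {
        System.out.println(replay(REPLAY_ORDER, 1)); // in-memory state: (1 + 1) * 2 = 4
        System.out.println(replay(LOG_ORDER, 1));    // rebuilt from log: 1 * 2 + 1 = 3
    }
}
```

Holding one lock across both the replay and the append keeps the two orders identical, which is what `withActiveContextOrThrow` guarantees.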






Re: [PR] PROPOSAL: support async event execution in group coordinator [kafka]

2023-11-09 Thread via GitHub


artemlivshits commented on code in PR #14705:
URL: https://github.com/apache/kafka/pull/14705#discussion_r1388948636


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -659,9 +660,21 @@ public TopicPartition key() {
  */
 @Override
 public void run() {
+try {
+runAsync().get();

Review Comment:
   To be honest, if we implement proper concurrency granularity for groups 
(serialize group updates [not the whole partition], keep a read "lock" on groups 
during commit updates), I'm not sure we'd get much extra perf gain from 
piercing the appendRecords abstraction to implement pipelining. Then we could 
get rid of the timeline snapshot structure and the hooks into the replication 
pipeline that listen for HWM updates; we could just do appendRecords and wait for 
completion. That would completely decouple the group coordinator logic from the 
storage stack and make it simpler.
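   One way to read "just do appendRecords and wait for completion" without blocking a thread is to chain each group's operations on the previous operation's future. The sketch below assumes that approach. `PerGroupSerializer` is a hypothetical name, and each operation is a `Supplier` that would wrap an async append. Operations on the same group run strictly one after another, and different groups do not wait on each other.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical sketch: per-group serialization by future chaining (no blocking).
class PerGroupSerializer {
    // Last submitted operation per group. Entries are never removed in this sketch.
    private final Map<String, CompletableFuture<?>> tails = new HashMap<>();

    synchronized <T> CompletableFuture<T> submit(String groupId, Supplier<CompletableFuture<T>> op) {
        CompletableFuture<?> previous =
            tails.getOrDefault(groupId, CompletableFuture.completedFuture(null));
        // Start op only after the previous operation on this group has finished,
        // whether it succeeded or failed.
        CompletableFuture<T> next = previous
            .handle((ignored, error) -> null)
            .thenCompose(ignored -> op.get());
        tails.put(groupId, next);
        return next;
    }
}
```

The design trade-off is the same as in the comment: ordering comes from the chain rather than from hooking into HWM updates, at the cost of no pipelining within a single group.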






Re: [PR] PROPOSAL: support async event execution in group coordinator [kafka]

2023-11-09 Thread via GitHub


artemlivshits commented on code in PR #14705:
URL: https://github.com/apache/kafka/pull/14705#discussion_r1388644791


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -659,9 +660,21 @@ public TopicPartition key() {
  */
 @Override
 public void run() {
+try {
+runAsync().get();

Review Comment:
   > This is completely unrelated in my opinion as this is true for both the 
old and the new coordinator.
   
   It's true that it's a problem with the old coordinator, and we should make 
whatever minimal fixes are required for the old coordinator to work (and if it 
happens to work end-to-end, which I think it might, we won't need to fix it), 
but that code is going away and shouldn't define the forward-looking 
architecture.
   
   As we build the new coordinator, we should build it in a way that improves 
the forward-looking architecture. Keeping the right abstraction is good, and 
coincidentally it helps with the timelines: we can use this proposal and the 
work that has already been done instead of doing new work to bring 
implementation details into the group coordinator.
   
   Moreover, I wonder whether we need yet another thread pool to handle group 
coordinator logic; I think it would be good to just re-use the request handler 
threads to run this functionality. This would avoid thread pool proliferation 
and also reuse various useful improvements that work only on request pool 
threads, e.g. RequestLocal (hopefully we'll make it into a real thread local to 
be used at the point of use instead of passing the argument), various 
observability things, etc. Here is a PoC that does that using 
NonBlockingSynchronizer and KafkaRequestHandler.wrap:
   
   
https://github.com/apache/kafka/pull/14728/commits/46acf0220434926305b343299d2780a34bf8a7de
   
   The NonBlockingSynchronizer replaces EventAccumulator and 
MultiThreadedEventProcessor (I didn't remove them to keep the change small). It 
has some perf benefits, e.g. in uncontended cases the processing continues 
running on the request thread instead of being rescheduled on the gc thread 
pool. I can also easily implement read-write synchronization for the 
NonBlockingSynchronizer (so that readers won't block each other out), e.g. to 
implement a non-blocking read "lock" on a group when committing offsets.
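   The "run inline when uncontended" behavior can be sketched with a common non-blocking serial-executor (trampoline) pattern. This is an illustration under that assumption, not the PoC's NonBlockingSynchronizer, and `InlineSerialExecutor` is a hypothetical name. The first submitter to find the executor idle runs the task on its own thread and then drains anything queued meanwhile. Other submitters only enqueue and return, so no thread ever blocks and tasks never overlap.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a non-blocking serial executor.
class InlineSerialExecutor {
    private final ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger pending = new AtomicInteger();

    void submit(Runnable task) {
        queue.add(task); // enqueue before incrementing, so the runner always finds it
        // Only the thread that moves the counter from 0 to 1 becomes the runner.
        if (pending.getAndIncrement() == 0) {
            do {
                Runnable next = queue.poll();
                try {
                    next.run();
                } catch (RuntimeException e) {
                    // A real implementation would report this; the sketch keeps draining.
                }
            } while (pending.decrementAndGet() > 0);
        }
    }
}
```

In the uncontended case `submit` runs the task immediately on the caller, for example a request handler thread. Under contention the work is handed to the thread that is already running, which is the property the comment attributes to the PoC.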
   
   It's not that I don't like the current code, but it feels like we are 
rebuilding functionality that we already have elsewhere in Kafka, and we could 
re-use the existing building blocks so that the gc focuses on group 
coordination rather than managing thread pools, getting into the details of 
the transactional protocol, etc.






Re: [PR] PROPOSAL: support async event execution in group coordinator [kafka]

2023-11-09 Thread via GitHub


dajac commented on code in PR #14705:
URL: https://github.com/apache/kafka/pull/14705#discussion_r1388535147


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -659,9 +660,21 @@ public TopicPartition key() {
  */
 @Override
 public void run() {
+try {
+runAsync().get();

Review Comment:
   Thanks for looking into this. Here is my take:
   
   > That is correct, it may become a perf problem
   
   I strongly disagree on blocking the event loop. It will not become a perf 
problem. It is one. It is also an anti-pattern.
   
   > Right now it is a functional problem
   
   It is technically not a functional problem, at least not yet, because I 
haven't implemented the transactional offset commit in the new coordinator. 
;)
   
   > appendRecords has async interface, thus adding async stages under such an 
interface can be done without inspection and understanding all callers (that's 
what an interface is -- any compliant implementation is valid), but doing so 
will break the current logic (so from the proper interface usage perspective it 
is a bug in the caller, which this proposal fixes)
   
   I will change this to not use appendRecords; this will make the contract 
clear.
   
   > now all of a sudden KIP-848 has new work to do before release, just 
because some independent work is going on in the transaction area
   
   This is incorrect. We knew about this and we always had an implementation in 
mind which works. I will basically decouple the write in two stages: 1) 
validate/prepare the transaction; and 2) update state and write. As we 
discussed in the other PR, this is also required for the old coordinator to 
work correctly.
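   The two-stage write could look roughly like the sketch below. It is hypothetical: `TwoStageCommit`, `commit`, and the `verify` function are illustrative names, not the coordinator's API. Stage 1 validates or prepares the transaction asynchronously, outside any coordinator event. Stage 2 updates the state and writes the record only after validation succeeds, so the coordinator event itself never waits on I/O.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical sketch: split a transactional offset commit into two stages.
class TwoStageCommit {
    final List<String> log = new ArrayList<>();

    // Stage 2: state update plus write, run as a normal synchronous step.
    private synchronized String updateStateAndWrite(String record) {
        log.add(record);
        return "committed:" + record;
    }

    CompletableFuture<String> commit(String record,
                                     Function<String, CompletableFuture<Boolean>> verify) {
        // Stage 1: validation/preparation may complete asynchronously.
        return verify.apply(record).thenApply(ok -> {
            if (!ok) {
                throw new IllegalStateException("verification failed for " + record);
            }
            return updateStateAndWrite(record);
        });
    }
}
```

Nothing is written until stage 1 succeeds, and a failed verification surfaces as an exceptionally completed future.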
   
   > KIP-890 part2 design is still under discussion, the verification protocol 
is likely to change, so any changes in KIP-890 protocol are going to have 
ripple effects on KIP-848
   
   I don't agree with this. As we just saw, we already failed to make it work 
correctly for the existing coordinator so the dependency was already there. 
Again, we can do better, I agree.
   
   > the work needs to be duplicated in the group coordinator (and the protocol is 
going to be slightly different for different client versions), which is a likely 
source of bugs
   
   This is completely unrelated in my opinion as this is true for both the old 
and the new coordinator.
   
   Overall, I agree that we could do better, but I think that now is not the 
right time to change this. We are already under high time pressure, and 
changing this the right way puts even more pressure on us. We should look for a 
proper solution afterwards.






Re: [PR] PROPOSAL: support async event execution in group coordinator [kafka]

2023-11-09 Thread via GitHub


artemlivshits commented on code in PR #14705:
URL: https://github.com/apache/kafka/pull/14705#discussion_r1388499287


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -659,9 +660,21 @@ public TopicPartition key() {
  */
 @Override
 public void run() {
+try {
+runAsync().get();

Review Comment:
   > if one client's log append is blocked for additional async check
   
   That is correct, it may become a perf problem; we can measure it and see if 
it's worth fixing in practice. We'll have that choice (as well as the choice to 
postpone the fix if we are under time pressure to release). But it won't be a 
functional problem. Right now it is a functional problem, which is suboptimal 
in many ways:
   - appendRecords has async interface, thus adding async stages under such an 
interface can be done without inspection and understanding all callers (that's 
what an interface is -- any compliant implementation is valid), but doing so 
will break the current logic (so from the proper interface usage perspective it 
is a bug in the caller, which this proposal fixes)
   - we cannot release the new transaction protocol (or the new coordinator) without 
implementing new logic, which creates hard dependencies and pushes against 
timelines (now all of a sudden KIP-848 has new work to do before release, 
just because some independent work is going on in the transaction area)
   - KIP-890 part2 design is still under discussion, the verification protocol 
is likely to change, so any changes in KIP-890 protocol are going to have 
ripple effects on KIP-848
   - 2 fairly complex components are now tied together -- we cannot just 
innovate on transaction protocol implementation details (or, more broadly, on 
the whole IO subsystem implementation details, e.g. async IO) without 
understanding group coordinator implementation details, and we cannot innovate on 
group coordinator implementation details without understanding implementation 
details of the transaction protocol
   - to make the previous point worse, the dependency is not visible at the 
"point of use" -- someone tasked with improving the transaction protocol (or IO in 
general) would have no indication from the appendRecords interface that adding 
an async stage would need a corresponding change in the group coordinator
   - the work needs to be duplicated in the group coordinator (and the protocol is 
going to be slightly different for different client versions), which is a likely 
source of bugs
   
   IMO, the fact that the transaction verification implementation just doesn't work 
out of the box with the new group coordinator (and in fact requires quite 
non-trivial follow-up work that will block the release) is an architectural 
issue. We should strive to make the system more decoupled, so that the context 
an engineer needs to understand to make local changes in one part of the system is 
smaller.
   
   > Each new group coordinator thread handles requests from multiple groups 
and multiple clients within the same group.
   
   I don't think it's bound to a thread, but indeed the concurrency is limited 
to the partition -- we don't let operations on the same partition run concurrently, 
so all the groups that are mapped to the same partition contend. This 
is, however, a specific implementation choice; it should be possible to make the 
group the unit of concurrency, and if that's not enough, we can let offset 
commits for different partitions go concurrently as well (they just need to 
make sure that the group doesn't change, which is sort of a "read lock" on the 
group), at which point there probably wouldn't be any contention in the common 
path.
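   Per-group concurrency with a read "lock" for offset commits could be sketched with one `ReentrantReadWriteLock` per group. This is an assumption for illustration, and it uses blocking locks rather than the non-blocking read-write synchronization mentioned earlier in the thread. `PerGroupLocks`, `updateGroup`, and `commitOffsets` are hypothetical names. Group changes are exclusive, commits to the same group share the lock, and different groups never contend.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

// Hypothetical sketch: the group, not the partition, is the unit of concurrency.
class PerGroupLocks {
    private final ConcurrentHashMap<String, ReadWriteLock> locks = new ConcurrentHashMap<>();

    private ReadWriteLock lockFor(String groupId) {
        return locks.computeIfAbsent(groupId, id -> new ReentrantReadWriteLock());
    }

    // Membership/state changes: exclusive per group.
    <T> T updateGroup(String groupId, Supplier<T> update) {
        Lock lock = lockFor(groupId).writeLock();
        lock.lock();
        try {
            return update.get();
        } finally {
            lock.unlock();
        }
    }

    // Offset commits: shared per group, so commits for different partitions of
    // the same group can run concurrently while the group itself is unchanged.
    <T> T commitOffsets(String groupId, Supplier<T> commit) {
        Lock lock = lockFor(groupId).readLock();
        lock.lock();
        try {
            return commit.get();
        } finally {
            lock.unlock();
        }
    }
}
```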
   
   Now, one might ask: implementing per-group synchronization adds 
complexity, and handling transaction verification as an explicit state 
transition in the group coordinator adds complexity, so what's the difference? I'd say 
the difference is fundamental: per-group synchronization complexity is 
encapsulated in one component and keeps the system decoupled. An engineer 
tasked with improving the transaction protocol doesn't need to understand implementation 
details of the group coordinator, and vice versa. Changes are smaller, can be made 
faster, and are less bug-prone. Win-win-win.






Re: [PR] PROPOSAL: support async event execution in group coordinator [kafka]

2023-11-08 Thread via GitHub


junrao commented on code in PR #14705:
URL: https://github.com/apache/kafka/pull/14705#discussion_r1387227928


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -659,9 +660,21 @@ public TopicPartition key() {
  */
 @Override
 public void run() {
+try {
+runAsync().get();

Review Comment:
   Thanks for the explanation, Artem. Yes, it's true that the new group 
coordinator only depends on acks=1.
   
   Each new group coordinator thread handles requests from multiple groups and 
multiple clients within the same group. In the proposed approach, if one 
client's log append is blocked for additional async check, it blocks the 
processing of other clients and other groups. So, it still seems to reduce the 
overall throughput somewhat.
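   The throughput concern can be shown with a small deterministic demonstration, assuming a single event thread modeled as a single-thread executor (the names are illustrative). An event that blocks on its async append, as `run()` does with `runAsync().get()` in the diff above, holds up every event queued behind it, including events for unrelated groups.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demonstration of head-of-line blocking on one event thread.
class HeadOfLineBlocking {
    final ExecutorService eventThread = Executors.newSingleThreadExecutor();
    final CompletableFuture<Void> slowAppend = new CompletableFuture<>();
    final CountDownLatch otherGroupDone = new CountDownLatch(1);

    void submitEvents() {
        // Event for group A: blocks the thread until its async append completes.
        eventThread.execute(() -> slowAppend.join());
        // Event for group B: cannot start until the event above returns.
        eventThread.execute(otherGroupDone::countDown);
    }
}
```

Group B's event only runs after group A's append completes. That is the serialization across clients and groups described in the comment.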






Re: [PR] PROPOSAL: support async event execution in group coordinator [kafka]

2023-11-07 Thread via GitHub


artemlivshits commented on code in PR #14705:
URL: https://github.com/apache/kafka/pull/14705#discussion_r1385493045


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -659,9 +660,21 @@ public TopicPartition key() {
  */
 @Override
 public void run() {
+try {
+runAsync().get();

Review Comment:
   We actually don't need to wait for replication, so the current pipelining 
works without changes: the current logic uses acks=1, captures the offset, 
and then waits for the HWM to advance before completing the write request. It may 
prevent potential pipelining opportunities if new async stages are added for 
acks=1 (e.g. transaction verification). But the most important thing is that 
with this proposal, innovating under the appendRecords interface would just work 
out of the box, which is the purpose of having interfaces: innovating under the 
interface doesn't break callers that use the interface correctly (which makes the 
system modular).
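   The "capture the offset, then wait for the HWM" step could be modeled as below. This is a hypothetical sketch, not the coordinator's actual data structure. Each acks=1 write registers a future keyed by its end offset (last offset + 1), and an HWM update completes every write the new HWM covers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: complete acks=1 writes once the high watermark covers them.
class PendingWrites {
    // Keyed by the log end offset after the write; TreeMap keeps them ordered.
    private final TreeMap<Long, CompletableFuture<Void>> pending = new TreeMap<>();

    synchronized CompletableFuture<Void> onLocalAppend(long endOffset) {
        return pending.computeIfAbsent(endOffset, o -> new CompletableFuture<>());
    }

    void onHighWatermarkUpdate(long highWatermark) {
        List<CompletableFuture<Void>> ready;
        synchronized (this) {
            // Writes whose end offset is <= HWM are fully replicated.
            Map<Long, CompletableFuture<Void>> covered = pending.headMap(highWatermark, true);
            ready = new ArrayList<>(covered.values());
            covered.clear(); // removes the entries from the backing map
        }
        // Complete outside the lock so callbacks cannot re-enter mid-iteration.
        ready.forEach(f -> f.complete(null));
    }
}
```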
   
   If we find out that we want pipelining for transaction verification, we 
can make this optimization later (if we find it to be a problem). We will have 
a choice between complexity and potentially better pipelining; with the current 
model, we don't have that choice: the workflow will break if we add an async 
stage to acks=1 processing, and we will have to fix it before shipping.






Re: [PR] PROPOSAL: support async event execution in group coordinator [kafka]

2023-11-07 Thread via GitHub


junrao commented on code in PR #14705:
URL: https://github.com/apache/kafka/pull/14705#discussion_r1385389618


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -659,9 +660,21 @@ public TopicPartition key() {
  */
 @Override
 public void run() {
+try {
+runAsync().get();

Review Comment:
   Hmm, while this guarantees ordering, it disables pipelining and thus 
potentially reduces throughput, since we have to wait for each event's 
records to be fully replicated before processing the next event. 
   
   We could probably introduce a different callback in 
`ReplicaManager.appendRecords` that's invoked when the records are appended to 
the local log.
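   A sketch of the two-callback idea follows. The `Appender` interface here is hypothetical and is NOT the real `ReplicaManager.appendRecords` signature, and the appender is a fake. Ordering-sensitive work, such as recording the log order and freeing the partition for the next event, hangs off the local-append callback. The client response waits for the replication callback. That keeps ordering while still allowing pipelining.

```java
import java.util.ArrayList;
import java.util.List;

class TwoCallbackAppendSketch {
    // Hypothetical append API with two callbacks (not the real ReplicaManager API).
    interface Appender {
        void append(String records, Runnable onLocalAppend, Runnable onReplicated);
    }

    // Fake appender: local append happens immediately, replication is deferred
    // until replicateAll() is called.
    static class FakeAppender implements Appender {
        private final List<Runnable> awaitingReplication = new ArrayList<>();

        public void append(String records, Runnable onLocalAppend, Runnable onReplicated) {
            onLocalAppend.run();
            awaitingReplication.add(onReplicated);
        }

        void replicateAll() {
            List<Runnable> ready = new ArrayList<>(awaitingReplication);
            awaitingReplication.clear();
            ready.forEach(Runnable::run);
        }
    }

    final FakeAppender appender = new FakeAppender();
    final List<String> logOrder = new ArrayList<>();
    final List<String> responses = new ArrayList<>();
    private boolean partitionBusy = false;

    // Returns false if the previous event has not yet reached the local log.
    boolean process(String event) {
        if (partitionBusy) return false;
        partitionBusy = true;
        appender.append(event,
            () -> { logOrder.add(event); partitionBusy = false; }, // next event may start now
            () -> responses.add("ack:" + event));                  // respond only after replication
        return true;
    }
}
```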






[PR] PROPOSAL: support async event execution in group coordinator [kafka]

2023-11-06 Thread via GitHub


artemlivshits opened a new pull request, #14705:
URL: https://github.com/apache/kafka/pull/14705

   This change fixes a broken abstraction where event execution relies on a 
specific implementation detail of ReplicaManager.appendRecords: with 
some arguments it completes synchronously, even though the interface is 
clearly asynchronous. This assumption can be broken by changing the 
implementation, as shown by the KIP-890 work, which added a transaction verification 
stage that may result in asynchronous completion (which should be perfectly 
fine because the function interface is asynchronous and must be used as such) 
and thereby violates the assumption of event execution.
   
   Now event execution supports asynchronous completion and can properly 
handle asynchronous completion of the underlying functionality.
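   The broken assumption can be shown in isolation. In this hypothetical sketch, `append` is a stand-in for an async append, and `verification` models an added asynchronous stage such as transaction verification. A caller that reads the result with `getNow` works only while the future happens to be complete already. A caller that composes on completion keeps working when the implementation becomes truly asynchronous.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration of relying on synchronous completion of an async API.
class AsyncContractSketch {
    // Stand-in for an async append: completes synchronously only if the
    // (hypothetical) verification stage has already completed.
    static CompletableFuture<Long> append(CompletableFuture<Void> verification) {
        return verification.thenApply(ignored -> 100L);
    }

    // Fragile caller: assumes the future is already done (returns -1 if not).
    static long fragileCaller(CompletableFuture<Void> verification) {
        return append(verification).getNow(-1L);
    }

    // Contract-respecting caller: continues when the append completes.
    static CompletableFuture<String> robustCaller(CompletableFuture<Void> verification) {
        return append(verification).thenApply(offset -> "ok@" + offset);
    }
}
```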
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

