[PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]
showuon opened a new pull request, #15732:
URL: https://github.com/apache/kafka/pull/15732

   When running the ZK-to-KRaft migration process, we encountered an issue where the migration hangs and `ZkMigrationState` cannot move to the `MIGRATION` state. This is because `pollEvent` does not retry on the retriable `MigrationClientException` (retriable ZK client errors), although it should. This PR fixes the issue and adds a test.

   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]
showuon commented on code in PR #15732:
URL: https://github.com/apache/kafka/pull/15732#discussion_r1582700986

## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ##
@@ -786,12 +773,31 @@ public void run() throws Exception {
         }
     }
+    class RecoverMigrationStateFromZKEvent extends MigrationEvent {
+        @Override
+        public void run() throws Exception {
+            if (checkDriverState(MigrationDriverState.UNINITIALIZED, this)) {
+                applyMigrationOperation("Recovering migration state from ZK", zkMigrationClient::getOrCreateMigrationRecoveryState);

Review Comment:
   Yes, this is where the uncaught exception is thrown. The exception is handled by the parent `MigrationEvent#handleException`, which logs the error and, for fatal errors, also calls `faultHandler.handleFault`:
   https://github.com/apache/kafka/blob/994077e43e2415261bd17443568723f35c3e29d9/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java#L396-L404

   Thanks.
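The comment above describes a template-method arrangement. Each event implements only `run()`, and the shared base class decides what a failure means. The sketch below is a simplified model under stated assumptions, not the Kafka source:

- `SketchMigrationEvent`, `Outcome`, and `classify` are hypothetical.
- The two exception classes are local stand-ins named after the real ones.
- The auth exception is assumed to extend the general client exception, which is why it is checked first.

`LOG_AND_RETRY` only records a decision. In the driver, the actual retry depends on a later event running the operation again.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-ins named after the driver's exception types (hypothetical definitions).
class MigrationClientException extends RuntimeException {
    MigrationClientException(String message) { super(message); }
}

// Assumed to be a subclass of MigrationClientException, so it must be tested first.
class MigrationClientAuthException extends MigrationClientException {
    MigrationClientAuthException(String message) { super(message); }
}

enum Outcome { FATAL_FAULT, LOG_AND_RETRY }

abstract class SketchMigrationEvent {
    // Records each decision; stands in for log.info(...) and faultHandler.handleFault(...).
    final List<Outcome> outcomes = new ArrayList<>();

    abstract void run() throws Exception;

    // Stand-in for the event queue: call run() and route any failure to handleException().
    final void execute() {
        try {
            run();
        } catch (Exception e) {
            handleException(e);
        }
    }

    void handleException(Throwable e) {
        outcomes.add(classify(e));
    }

    static Outcome classify(Throwable e) {
        if (e instanceof MigrationClientAuthException) {
            return Outcome.FATAL_FAULT;   // bad credentials will not fix themselves
        } else if (e instanceof MigrationClientException) {
            return Outcome.LOG_AND_RETRY; // retriable ZK client error
        }
        return Outcome.FATAL_FAULT;       // this sketch treats everything else as fatal
    }
}
```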
Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]
showuon commented on PR #15732:
URL: https://github.com/apache/kafka/pull/15732#issuecomment-2082153705

   @akhileshchg, thanks for the review. We still have a question for you:

   > However, I'm still not sure why https://github.com/apache/kafka/pull/12998 did not use an event to recover the migration state from ZK. Was there any special reason for it?

   This change should be fine, right?
Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]
showuon commented on PR #15732:
URL: https://github.com/apache/kafka/pull/15732#issuecomment-2082204486

   @soarez @chia7712, since the original author @akhileshchg has reviewed and approved, do you have any other comments?
Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]
showuon merged PR #15732:
URL: https://github.com/apache/kafka/pull/15732
Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]
showuon commented on PR #15732:
URL: https://github.com/apache/kafka/pull/15732#issuecomment-2082297410

   Thanks all for the review!
Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]
showuon commented on PR #15732:
URL: https://github.com/apache/kafka/pull/15732#issuecomment-2058930123

   @cmccabe @mumrah, call for review. Thanks.
Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]
soarez commented on code in PR #15732:
URL: https://github.com/apache/kafka/pull/15732#discussion_r1567464913

## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ##
@@ -391,13 +391,20 @@ void enqueueMetadataChangeEvent(
     // Events handled by Migration Driver.
     abstract class MigrationEvent implements EventQueue.Event {
+        // Use no-op handler by default because the handleException will be overridden if needed
+        private Consumer retryHandler = NO_OP_HANDLER;
+
+        public void retryHandler(Consumer retryHandler) {
+            this.retryHandler = retryHandler;
+        }

Review Comment:
   Did you consider simply defining an empty `public void retryHandler(Throwable thrown)` that `PollEvent` can override?
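The two styles differ in how an event opts in to retries:

- The first revision stores a handler object.
- The reviewer suggests an empty method that only `PollEvent` overrides.

A minimal sketch of both follows. The class names are hypothetical. The type `Consumer<Throwable>` is an assumption, because the archived diff lost the generic type parameter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Style 1 (roughly the first revision): a settable handler with a no-op default.
class FieldStyleEvent {
    private static final Consumer<Throwable> NO_OP_HANDLER = thrown -> { };
    private Consumer<Throwable> retryHandler = NO_OP_HANDLER;

    void retryHandler(Consumer<Throwable> retryHandler) {
        this.retryHandler = retryHandler;
    }

    // Called from the shared exception handler when the error is retriable.
    void onRetriableError(Throwable thrown) {
        retryHandler.accept(thrown);
    }
}

// Style 2 (the reviewer's suggestion): an empty hook that subclasses may override.
abstract class HookStyleEvent {
    void retryHandler(Throwable thrown) { } // no-op by default

    void onRetriableError(Throwable thrown) {
        retryHandler(thrown);
    }
}

// Only the polling event opts in to retries.
class PollEventSketch extends HookStyleEvent {
    final List<Throwable> retried = new ArrayList<>();

    @Override
    void retryHandler(Throwable thrown) {
        retried.add(thrown); // in the driver this would schedule another poll
    }
}

// Every other event inherits the no-op.
class OtherEventSketch extends HookStyleEvent { }
```

One reason to prefer the hook is that retry behaviour is fixed by the event's type, not by whoever last called a setter. It also leaves no mutable field to reason about.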
Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]
chia7712 commented on code in PR #15732:
URL: https://github.com/apache/kafka/pull/15732#discussion_r1567853719

## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ##
@@ -391,13 +391,20 @@ void enqueueMetadataChangeEvent(
     // Events handled by Migration Driver.
     abstract class MigrationEvent implements EventQueue.Event {
+        // Use no-op handler by default because the handleException will be overridden if needed
+        private Consumer retryHandler = NO_OP_HANDLER;
+
+        public void retryHandler(Consumer retryHandler) {
+            this.retryHandler = retryHandler;
+        }

Review Comment:
   +1 to this style!
Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]
chia7712 commented on code in PR #15732:
URL: https://github.com/apache/kafka/pull/15732#discussion_r1567855363

## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ##
@@ -391,13 +391,20 @@ void enqueueMetadataChangeEvent(
     // Events handled by Migration Driver.
     abstract class MigrationEvent implements EventQueue.Event {
+        // Use no-op handler by default because the handleException will be overridden if needed
+        private Consumer retryHandler = NO_OP_HANDLER;
+
+        public void retryHandler(Consumer retryHandler) {
+            this.retryHandler = retryHandler;
+        }

Review Comment:
   Also, should we call `wakeup` (run the next poll ASAP) rather than `scheduleDeferred` if the exception is retriable?
Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]
showuon commented on code in PR #15732:
URL: https://github.com/apache/kafka/pull/15732#discussion_r1568277518

## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ##
@@ -391,13 +391,20 @@ void enqueueMetadataChangeEvent(
     // Events handled by Migration Driver.
     abstract class MigrationEvent implements EventQueue.Event {
+        // Use no-op handler by default because the handleException will be overridden if needed
+        private Consumer retryHandler = NO_OP_HANDLER;
+
+        public void retryHandler(Consumer retryHandler) {
+            this.retryHandler = retryHandler;
+        }

Review Comment:
   > Did you consider simply defining an empty public void retryHandler(Throwable thrown) that PollEvent can override?

   Nice suggestion! Updated!

   > Also, should we call wakeup (run next poll ASAP) rather than scheduleDeferred if the exception is retryable?

   Thanks for the suggestion, but I don't think that's appropriate. If the retriable error takes some time to resolve (e.g. a ZK connection issue), `pollEvent` would be invoked many times, retrying over and over, in a short period.
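The argument against `wakeup` can be checked with back-of-the-envelope arithmetic. The model below is hypothetical, not driver code. It counts how many attempts hit ZooKeeper during an outage when each failed attempt is followed by a fixed delay. All durations are illustrative; the driver's real poll interval is not stated in this thread.

```java
// Hypothetical pacing model: every attempt that starts before the outage ends fails.
final class RetryPacing {
    private RetryPacing() { }

    static int attemptsDuringOutage(long outageMs, long attemptMs, long delayBetweenAttemptsMs) {
        long cycleMs = attemptMs + delayBetweenAttemptsMs;
        if (cycleMs <= 0) {
            throw new IllegalArgumentException("attempt time plus delay must be positive");
        }
        int attempts = 0;
        // Simulated clock: one attempt, then wait, until the outage is over.
        for (long now = 0; now < outageMs; now += cycleMs) {
            attempts++;
        }
        return attempts;
    }
}
```

Take a 10-second outage with 10 ms per failed attempt:

- `attemptsDuringOutage(10_000, 10, 0)` (immediate retry, like `wakeup`) gives 1000 attempts.
- `attemptsDuringOutage(10_000, 10, 1_000)` (a 1-second deferral) gives 10 attempts.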
Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]
chia7712 commented on code in PR #15732:
URL: https://github.com/apache/kafka/pull/15732#discussion_r1568308820

## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ##
@@ -391,13 +391,16 @@ void enqueueMetadataChangeEvent(
     // Events handled by Migration Driver.
     abstract class MigrationEvent implements EventQueue.Event {
+        // Use no-op handler by default because the retryHandler will be overridden if needed
+        public void retryHandler() { }
         @SuppressWarnings("ThrowableNotThrown")
         @Override
         public void handleException(Throwable e) {
             if (e instanceof MigrationClientAuthException) {
                 KRaftMigrationDriver.this.faultHandler.handleFault("Encountered ZooKeeper authentication in " + this, e);
             } else if (e instanceof MigrationClientException) {
                 log.info(String.format("Encountered ZooKeeper error during event %s. Will retry.", this), e.getCause());
+                retryHandler();

Review Comment:
   I feel the retry already exists for every state except `UNINITIALIZED`, because `UNINITIALIZED` is not handled by a separate event. For the other states, `PollEvent` puts a do-something event plus one deferred `PollEvent` into the queue, which means the deferred `PollEvent` is the "retry".

   My question is: why did we not handle `UNINITIALIZED` with a separate event? If we move `recoverMigrationStateFromZK` to its own event, we don't need to add the extra `retryHandler`.
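A toy single-threaded event loop illustrates the point that the deferred `PollEvent` already acts as the retry. All names here are hypothetical, and the deferred poll is modeled as a plain append with no delay. Each poll enqueues the work for the current state and then re-arms itself. A recovery step that throws is therefore simply attempted again by the next poll.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class PollLoopSketch {
    enum State { UNINITIALIZED, INACTIVE }

    private final Deque<Runnable> queue = new ArrayDeque<>();
    private final int failuresBeforeSuccess;
    State state = State.UNINITIALIZED;
    int recoveryAttempts = 0;

    PollLoopSketch(int failuresBeforeSuccess) {
        this.failuresBeforeSuccess = failuresBeforeSuccess;
    }

    // PollEvent stand-in: enqueue the work for the current state, then re-arm the poll.
    // Re-arming is what gives a failed work event another chance.
    private void poll() {
        if (state == State.UNINITIALIZED) {
            queue.addLast(this::recover);
        }
        queue.addLast(this::poll); // stand-in for scheduleDeferred(...) with a delay
    }

    // Recovery-event stand-in: fails a few times, as a flaky ZooKeeper connection might.
    private void recover() {
        recoveryAttempts++;
        if (recoveryAttempts <= failuresBeforeSuccess) {
            throw new IllegalStateException("simulated retriable ZK error");
        }
        state = State.INACTIVE;
    }

    // Runs at most maxEvents events; failures are swallowed like a logging handleException.
    void runEvents(int maxEvents) {
        queue.addLast(this::poll);
        for (int i = 0; i < maxEvents && !queue.isEmpty(); i++) {
            Runnable event = queue.pollFirst();
            try {
                event.run();
            } catch (RuntimeException e) {
                // log and rely on the next poll to retry
            }
        }
    }
}
```

With three simulated failures, the fourth recovery attempt succeeds and the loop reaches `INACTIVE`. Now suppose recovery instead ran inline inside `poll()` and threw before the re-arm line. No further poll would be queued, and the loop would stall in `UNINITIALIZED`. That is one plausible reading of the reported hang.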
Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]
chia7712 commented on code in PR #15732:
URL: https://github.com/apache/kafka/pull/15732#discussion_r1568311272

## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ##
@@ -391,13 +391,16 @@ void enqueueMetadataChangeEvent(
     // Events handled by Migration Driver.
     abstract class MigrationEvent implements EventQueue.Event {
+        // Use no-op handler by default because the retryHandler will be overridden if needed
+        public void retryHandler() { }
         @SuppressWarnings("ThrowableNotThrown")
         @Override
         public void handleException(Throwable e) {
             if (e instanceof MigrationClientAuthException) {
                 KRaftMigrationDriver.this.faultHandler.handleFault("Encountered ZooKeeper authentication in " + this, e);
             } else if (e instanceof MigrationClientException) {
                 log.info(String.format("Encountered ZooKeeper error during event %s. Will retry.", this), e.getCause());
+                retryHandler();

Review Comment:
   Also, the solution offered by this PR has a side effect: we will enqueue two `PollEvent`s if a `MigrationClientException` happens in another `migrationState`.
Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]
showuon commented on code in PR #15732:
URL: https://github.com/apache/kafka/pull/15732#discussion_r1568325101

## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ##
@@ -391,13 +391,16 @@ void enqueueMetadataChangeEvent(
     // Events handled by Migration Driver.
     abstract class MigrationEvent implements EventQueue.Event {
+        // Use no-op handler by default because the retryHandler will be overridden if needed
+        public void retryHandler() { }
         @SuppressWarnings("ThrowableNotThrown")
         @Override
         public void handleException(Throwable e) {
             if (e instanceof MigrationClientAuthException) {
                 KRaftMigrationDriver.this.faultHandler.handleFault("Encountered ZooKeeper authentication in " + this, e);
             } else if (e instanceof MigrationClientException) {
                 log.info(String.format("Encountered ZooKeeper error during event %s. Will retry.", this), e.getCause());
+                retryHandler();

Review Comment:
   > My question is "why we did not handle UNINITIALIZED by another event"? If we move recoverMigrationStateFromZK to another event, we don't need to add extra retryHandler.

   That's a good question, @chia7712! Let me think about it.

   > Also, the solution offered by this PR has a side effect that we will put 2 PollEvent if the exception MigrationClientException happens in other migrationState

   No. As you said above, the `MigrationClientException` `retryHandler` won't be triggered in other `migrationState`s, because those states are handled by other event handlers that are unrelated to `pollEvent`. And because the default `retryHandler` is a no-op, other migration states get no extra retry. As long as `pollEvent` keeps polling, they will be retried later.
Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]
chia7712 commented on code in PR #15732:
URL: https://github.com/apache/kafka/pull/15732#discussion_r1568327439

## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ##
@@ -391,13 +391,16 @@ void enqueueMetadataChangeEvent(
     // Events handled by Migration Driver.
     abstract class MigrationEvent implements EventQueue.Event {
+        // Use no-op handler by default because the retryHandler will be overridden if needed
+        public void retryHandler() { }
         @SuppressWarnings("ThrowableNotThrown")
         @Override
         public void handleException(Throwable e) {
             if (e instanceof MigrationClientAuthException) {
                 KRaftMigrationDriver.this.faultHandler.handleFault("Encountered ZooKeeper authentication in " + this, e);
             } else if (e instanceof MigrationClientException) {
                 log.info(String.format("Encountered ZooKeeper error during event %s. Will retry.", this), e.getCause());
+                retryHandler();

Review Comment:
   > No, as you said above, the MigrationClientException retryHandler won't be triggered in other migrationState because they will be handled in other event handler, which is not related to pollEvent. And because the default retryHandler is no-op, there will be no retry for other migrationStates. As long as pollEvent is keep polling, they can be retried later.

   You are right :)
Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]
showuon commented on code in PR #15732:
URL: https://github.com/apache/kafka/pull/15732#discussion_r1570596642

## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ##
@@ -391,13 +391,16 @@ void enqueueMetadataChangeEvent(
     // Events handled by Migration Driver.
     abstract class MigrationEvent implements EventQueue.Event {
+        // Use no-op handler by default because the retryHandler will be overridden if needed
+        public void retryHandler() { }
         @SuppressWarnings("ThrowableNotThrown")
         @Override
         public void handleException(Throwable e) {
             if (e instanceof MigrationClientAuthException) {
                 KRaftMigrationDriver.this.faultHandler.handleFault("Encountered ZooKeeper authentication in " + this, e);
             } else if (e instanceof MigrationClientException) {
                 log.info(String.format("Encountered ZooKeeper error during event %s. Will retry.", this), e.getCause());
+                retryHandler();

Review Comment:
   @chia7712, I took your suggestion and added `RecoverMigrationStateFromZKEvent`, so we don't need to worry about retries anymore. I checked whether this change could cause any side effects, and here is what I found:

   1. `recoverMigrationStateFromZK` is expected to run before the driver starts the state machine.
   2. `recoverMigrationStateFromZK` does the following:
      a. creates the ZNode for the migration and the initial migration state
      b. installs this class as a metadata publisher
      c. transitions to the INACTIVE state
   3. If `recoverMigrationStateFromZK` keeps failing, the log keeps reporting errors and the operation keeps being retried. Once it succeeds, the metadata publisher is installed, and `onControllerChange` and `onMetadataUpdate` are triggered to start the process.

   So changing `recoverMigrationStateFromZK` into an event won't affect anything, because all we need to do in this state is wait for operations (a), (b), and (c) to complete. I'm +1 on this suggestion. Thank you.
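The three steps listed above can be modeled together with the `checkDriverState` guard that was added later in this review. This is a hypothetical sketch:

- `RecoverySketch`, `zkAvailable`, and the publisher list are stand-ins.
- Step (a) is reduced to a flag.

The ZK write in (a) happens before (b) and (c). A failure therefore leaves the driver in `UNINITIALIZED` with nothing registered, so the next attempt can safely redo everything. The guard makes a late duplicate event a no-op.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class RecoverySketch {
    enum DriverState { UNINITIALIZED, INACTIVE }

    DriverState state = DriverState.UNINITIALIZED;
    boolean zkAvailable = true;         // set to false to simulate a retriable ZK error
    boolean migrationStateInZk = false; // stands in for the migration ZNode
    final List<RecoverySketch> publishers = new ArrayList<>();
    // Stand-in for initialZkLoadHandler: registers the driver as a metadata publisher.
    private final Consumer<RecoverySketch> initialZkLoadHandler = publishers::add;

    void runRecoveryEvent() {
        // Guard in the spirit of checkDriverState(UNINITIALIZED, this).
        if (state != DriverState.UNINITIALIZED) {
            return;
        }
        // (a) get-or-create the migration state in ZK; may fail with a retriable error.
        if (!zkAvailable) {
            throw new IllegalStateException("simulated ZK connection loss");
        }
        migrationStateInZk = true;
        // (b) install as a metadata publisher; leader and metadata events can only arrive after this.
        initialZkLoadHandler.accept(this);
        // (c) transition to INACTIVE and wait for leadership events.
        state = DriverState.INACTIVE;
    }
}
```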
Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]
chia7712 commented on code in PR #15732:
URL: https://github.com/apache/kafka/pull/15732#discussion_r1570822868

## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ##
@@ -786,12 +773,29 @@ public void run() throws Exception {
         }
     }
+    class RecoverMigrationStateFromZKEvent extends MigrationEvent {
+        @Override
+        public void run() throws Exception {
+            applyMigrationOperation("Recovering migration state from ZK", zkMigrationClient::getOrCreateMigrationRecoveryState);

Review Comment:
   Should we add a condition check (`checkDriverState`) like the other events do?

## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ##
@@ -786,12 +773,29 @@ public void run() throws Exception {
         }
     }
+    class RecoverMigrationStateFromZKEvent extends MigrationEvent {
+        @Override
+        public void run() throws Exception {
+            applyMigrationOperation("Recovering migration state from ZK", zkMigrationClient::getOrCreateMigrationRecoveryState);
+            String maybeDone = migrationLeadershipState.initialZkMigrationComplete() ? "done" : "not done";
+            log.info("Initial migration of ZK metadata is {}.", maybeDone);
+
+            // Once we've recovered the migration state from ZK, install this class as a metadata publisher
+            // by calling the initialZkLoadHandler.
+            initialZkLoadHandler.accept(KRaftMigrationDriver.this);
+
+            // Transition to INACTIVE state and wait for leadership events.
+            transitionTo(MigrationDriverState.INACTIVE);
+        }
+    }
+
     class PollEvent extends MigrationEvent {
+
         @Override
         public void run() throws Exception {
             switch (migrationState) {
                 case UNINITIALIZED:
-                    recoverMigrationStateFromZK();
+                    eventQueue.append(new RecoverMigrationStateFromZKEvent());

Review Comment:
   Should we use `prepend` to make sure this event is executed ASAP?
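Whether `prepend` matters depends only on what is already queued. The toy illustration below uses `java.util.ArrayDeque`; the helper is hypothetical and event names are plain strings. Prepending moves the recovery event ahead of anything pending. On an otherwise empty queue, which is the situation the author describes for `UNINITIALIZED`, the two orders are identical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class QueueOrderSketch {
    private QueueOrderSketch() { }

    // Returns the execution order after adding "Recover" to a queue holding alreadyQueued.
    static List<String> order(List<String> alreadyQueued, boolean prepend) {
        Deque<String> queue = new ArrayDeque<>(alreadyQueued);
        if (prepend) {
            queue.addFirst("Recover"); // prepend: runs before anything pending
        } else {
            queue.addLast("Recover");  // append: runs after pending events
        }
        return new ArrayList<>(queue);
    }
}
```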
Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]
showuon commented on code in PR #15732:
URL: https://github.com/apache/kafka/pull/15732#discussion_r1571963909

## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ##
@@ -786,12 +773,29 @@ public void run() throws Exception {
         }
     }
+    class RecoverMigrationStateFromZKEvent extends MigrationEvent {
+        @Override
+        public void run() throws Exception {
+            applyMigrationOperation("Recovering migration state from ZK", zkMigrationClient::getOrCreateMigrationRecoveryState);
+            String maybeDone = migrationLeadershipState.initialZkMigrationComplete() ? "done" : "not done";
+            log.info("Initial migration of ZK metadata is {}.", maybeDone);
+
+            // Once we've recovered the migration state from ZK, install this class as a metadata publisher
+            // by calling the initialZkLoadHandler.
+            initialZkLoadHandler.accept(KRaftMigrationDriver.this);
+
+            // Transition to INACTIVE state and wait for leadership events.
+            transitionTo(MigrationDriverState.INACTIVE);
+        }
+    }
+
     class PollEvent extends MigrationEvent {
+
         @Override
         public void run() throws Exception {
             switch (migrationState) {
                 case UNINITIALIZED:
-                    recoverMigrationStateFromZK();
+                    eventQueue.append(new RecoverMigrationStateFromZKEvent());

Review Comment:
   No need. As I said in this [comment](https://github.com/apache/kafka/pull/15732#discussion_r1570596642), in the `UNINITIALIZED` state the only event we receive is `pollEvent`. The additional `onControllerChange` (`KRaftLeaderEvent`) and `onMetadataUpdate` (`MetadataChangeEvent`) events only arrive after `RecoverMigrationStateFromZKEvent` completes. So we don't have to worry about ordering at this point.
Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]
showuon commented on code in PR #15732:
URL: https://github.com/apache/kafka/pull/15732#discussion_r1571981454

## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ##
@@ -786,12 +773,29 @@ public void run() throws Exception {
         }
     }
+    class RecoverMigrationStateFromZKEvent extends MigrationEvent {
+        @Override
+        public void run() throws Exception {
+            applyMigrationOperation("Recovering migration state from ZK", zkMigrationClient::getOrCreateMigrationRecoveryState);

Review Comment:
   Good suggestion. Updated.
Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]
showuon commented on code in PR #15732:
URL: https://github.com/apache/kafka/pull/15732#discussion_r1572018955

## metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java: ##
@@ -881,10 +937,18 @@ public List> recordBatches() {
                 new LeaderAndEpoch(OptionalInt.of(3000), 1)).build());
             TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
-                "Waiting for KRaftMigrationDriver to enter ZK_MIGRATION state");
+                "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");

Review Comment:
   Side fix.
Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]
showuon commented on code in PR #15732:
URL: https://github.com/apache/kafka/pull/15732#discussion_r1572018203

## metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java: ##
@@ -881,10 +937,18 @@ public List> recordBatches() {
                 new LeaderAndEpoch(OptionalInt.of(3000), 1)).build());
             TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
-                "Waiting for KRaftMigrationDriver to enter ZK_MIGRATION state");
+                "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
             assertEquals(expectedBatchCount, batchesPassedToController.size());
             assertEquals(expectedRecordCount, batchesPassedToController.stream().mapToInt(List::size).sum());
         }
     }
+
+    // Wait until the driver has recovered MigrationState From ZK. This is to simulate the driver needs to be installed as the metadata publisher
+    // so that it can receive onControllerChange (KRaftLeaderEvent) and onMetadataUpdate (MetadataChangeEvent) events.
+    private void startAndWaitForRecoveringMigrationStateFromZK(KRaftMigrationDriver driver) throws InterruptedException {
+        driver.start();
+        TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.INACTIVE),
+            "Waiting for KRaftMigrationDriver to enter INACTIVE state");

Review Comment:
   This is now necessary because, in the test suite, we might invoke `onControllerChange` to append a `KRaftLeaderEvent` before `RecoverMigrationStateFromZKEvent` is appended. This won't happen in practice. The driver only registers as a metadata publisher, and so only starts receiving `KRaftLeaderEvent` and `MetadataChangeEvent`, after `RecoverMigrationStateFromZKEvent` completes.
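The helper above relies on Kafka's `TestUtils.waitForCondition`, which repeatedly evaluates a condition until it holds or a timeout expires. Here is a minimal stand-in for readers unfamiliar with the pattern. `WaitUtil`, its parameters, and its timeout handling are hypothetical and do not match Kafka's signature. Interruption is converted to an unchecked exception to keep the sketch short.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

final class WaitUtil {
    private WaitUtil() { }

    // Re-checks `condition` every pollIntervalMs until it is true or timeoutMs elapses.
    static void waitForCondition(BooleanSupplier condition, long timeoutMs, long pollIntervalMs, String details) {
        long deadlineNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadlineNs >= 0) {
                throw new AssertionError("Condition not met within " + timeoutMs + " ms: " + details);
            }
            try {
                Thread.sleep(pollIntervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                throw new IllegalStateException("Interrupted while waiting: " + details, e);
            }
        }
    }
}
```

In the test above, the condition is the driver reporting `INACTIVE`. That only happens once the recovery event has registered the driver as a metadata publisher.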
Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]
soarez commented on code in PR #15732:
URL: https://github.com/apache/kafka/pull/15732#discussion_r1572550218

## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ##
@@ -786,12 +773,29 @@ public void run() throws Exception {
         }
     }
+    class RecoverMigrationStateFromZKEvent extends MigrationEvent {
+        @Override
+        public void run() throws Exception {
+            applyMigrationOperation("Recovering migration state from ZK", zkMigrationClient::getOrCreateMigrationRecoveryState);
+            String maybeDone = migrationLeadershipState.initialZkMigrationComplete() ? "done" : "not done";
+            log.info("Initial migration of ZK metadata is {}.", maybeDone);
+
+            // Once we've recovered the migration state from ZK, install this class as a metadata publisher
+            // by calling the initialZkLoadHandler.
+            initialZkLoadHandler.accept(KRaftMigrationDriver.this);
+
+            // Transition to INACTIVE state and wait for leadership events.
+            transitionTo(MigrationDriverState.INACTIVE);
+        }
+    }
+
     class PollEvent extends MigrationEvent {
+
         @Override
         public void run() throws Exception {
             switch (migrationState) {
                 case UNINITIALIZED:
-                    recoverMigrationStateFromZK();
+                    eventQueue.append(new RecoverMigrationStateFromZKEvent());

Review Comment:
   Are we allowing a race between `RecoverMigrationStateFromZKEvent` and the next `PollEvent` scheduled after the switch? Maybe this would be more straightforward if we only scheduled the next poll once `RecoverMigrationStateFromZKEvent` finishes, either normally or exceptionally. WDYT?
Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]
soarez commented on code in PR #15732: URL: https://github.com/apache/kafka/pull/15732#discussion_r1572557807
## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ##
@@ -786,12 +773,29 @@ public void run() throws Exception {
         }
     }
+    class RecoverMigrationStateFromZKEvent extends MigrationEvent {
+        @Override
+        public void run() throws Exception {
+            applyMigrationOperation("Recovering migration state from ZK", zkMigrationClient::getOrCreateMigrationRecoveryState);
+            String maybeDone = migrationLeadershipState.initialZkMigrationComplete() ? "done" : "not done";
+            log.info("Initial migration of ZK metadata is {}.", maybeDone);
+
+            // Once we've recovered the migration state from ZK, install this class as a metadata publisher
+            // by calling the initialZkLoadHandler.
+            initialZkLoadHandler.accept(KRaftMigrationDriver.this);
+
+            // Transition to INACTIVE state and wait for leadership events.
+            transitionTo(MigrationDriverState.INACTIVE);
+        }
+    }
+
     class PollEvent extends MigrationEvent {
+
         @Override
         public void run() throws Exception {
             switch (migrationState) {
                 case UNINITIALIZED:
-                    recoverMigrationStateFromZK();
+                    eventQueue.append(new RecoverMigrationStateFromZKEvent());
Review Comment: On second thought, I don't think my question makes sense. The following `PollEvent` can only run after `RecoverMigrationStateFromZKEvent` finishes.
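The ordering argument can be checked with a small, self-contained experiment. It assumes the driver's events are processed one at a time on a single event-queue thread; here a single-threaded `ScheduledExecutorService` plays that role, with `execute` standing in for `eventQueue.append` and a zero-delay `schedule` standing in for the deferred next poll. Both mappings, and the class name, are illustrative assumptions. The `ScheduledThreadPoolExecutor` documentation states that tasks scheduled for the same execution time run in FIFO order of submission, so even the zero-delay case is deterministic.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class SingleThreadOrderingSketch {
    static List<String> run() {
        // One worker thread: a stand-in for a single-threaded event queue (assumption).
        ScheduledExecutorService queue = Executors.newSingleThreadScheduledExecutor();
        List<String> trace = new CopyOnWriteArrayList<>();
        CountDownLatch nextPollDone = new CountDownLatch(1);

        Runnable nextPoll = () -> {
            trace.add("next-poll");
            nextPollDone.countDown();
        };

        Runnable recover = () -> {
            trace.add("recover-start");
            try {
                Thread.sleep(50); // simulate a slow ZooKeeper round trip
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            trace.add("recover-end");
        };

        Runnable poll = () -> {
            trace.add("poll");
            queue.execute(recover);                             // plays the role of eventQueue.append(...)
            queue.schedule(nextPoll, 0, TimeUnit.MILLISECONDS); // plays the role of the deferred next poll
        };

        queue.execute(poll);
        try {
            nextPollDone.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            queue.shutdown();
        }
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The trace is `[poll, recover-start, recover-end, next-poll]`: even with a slow recovery and no delay on the next poll, the two events never interleave.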
Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]
showuon commented on PR #15732: URL: https://github.com/apache/kafka/pull/15732#issuecomment-2068407055
@akhileshchg @mumrah @cmccabe, could you take a look when available? Thanks.
Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]
showuon commented on PR #15732: URL: https://github.com/apache/kafka/pull/15732#issuecomment-2076748016
@akhileshchg @mumrah @cmccabe, we need your comments on this. Thanks.
Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]
akhileshchg commented on code in PR #15732: URL: https://github.com/apache/kafka/pull/15732#discussion_r1580260431
## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ##
@@ -786,12 +773,31 @@ public void run() throws Exception {
         }
     }
+    class RecoverMigrationStateFromZKEvent extends MigrationEvent {
+        @Override
+        public void run() throws Exception {
+            if (checkDriverState(MigrationDriverState.UNINITIALIZED, this)) {
+                applyMigrationOperation("Recovering migration state from ZK", zkMigrationClient::getOrCreateMigrationRecoveryState);
Review Comment: For my understanding, was this the line where the uncaught exception is thrown? Can we handle the exception more gracefully and log an error?
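One way to read "handle the exception more gracefully" is to classify errors at the event boundary. The sketch below is a minimal, hypothetical illustration in plain Java: `RetriableMigrationException`, `EventBody` and `process` are invented names, not Kafka APIs. Retriable errors are logged and swallowed, leaving state unchanged so that a later poll retries; anything else is logged and passed to a fault handler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class EventErrorHandlingSketch {
    /** Hypothetical stand-in for a retriable ZooKeeper client error. */
    static class RetriableMigrationException extends RuntimeException {
        RetriableMigrationException(String message) {
            super(message);
        }
    }

    /** Hypothetical event body; like an event's run(), it may throw. */
    @FunctionalInterface
    interface EventBody {
        void run() throws Exception;
    }

    /** Runs one event and classifies any failure instead of letting it escape. */
    static void process(EventBody body, List<String> log, Consumer<Exception> faultHandler) {
        try {
            body.run();
            log.add("INFO event completed");
        } catch (RetriableMigrationException e) {
            // Leave driver state untouched so the next poll simply tries again.
            log.add("WARN retriable error, will retry on next poll: " + e.getMessage());
        } catch (Exception e) {
            // Anything else is unexpected: log it and escalate.
            log.add("ERROR unexpected error: " + e.getMessage());
            faultHandler.accept(e);
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Consumer<Exception> faultHandler = e -> log.add("FAULT " + e.getClass().getSimpleName());
        process(() -> { throw new RetriableMigrationException("simulated ZK connection loss"); }, log, faultHandler);
        process(() -> log.add("INFO recovered migration state"), log, faultHandler);
        process(() -> { throw new IllegalStateException("simulated unexpected state"); }, log, faultHandler);
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Which ZooKeeper errors should count as retriable is the real design question; the sketch simply assumes a dedicated exception type marks them.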