cmccabe commented on code in PR #13407: URL: https://github.com/apache/kafka/pull/13407#discussion_r1163295217
########## metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java: ########## @@ -284,6 +334,120 @@ FinalizedControllerFeatures finalizedFeatures(long epoch) { return new FinalizedControllerFeatures(features, epoch); } + + /** + * Optionally provides a ZkMigrationStateRecord to bootstrap into the metadata log. In the case of + * upgrades, the log will not be empty and this will return a NONE state record. For an empty log, + * this will return either a NONE or PRE_MIGRATION depending on the configuration and metadata.version. + * <p> + * If the log is in PRE_MIGRATION, this will throw an error. + * + * @param metadataVersion The current MetadataVersion of the log + * @param isMetadataLogEmpty True if the log is being initialized from empty + * @param recordConsumer A consumer for the ZkMigrationStateRecord + */ + public void generateZkMigrationRecord( + MetadataVersion metadataVersion, + boolean isMetadataLogEmpty, + Consumer<ApiMessageAndVersion> recordConsumer + ) { + if (!metadataVersion.isMigrationSupported()) { + return; + } + + if (isMetadataLogEmpty) { + // Initialize the log with a ZkMigrationState + if (zkMigrationEnabled) { + log.info("Writing ZkMigrationState of PRE_MIGRATION to the log, since migrations are enabled."); + recordConsumer.accept(buildRecord(ZkMigrationState.PRE_MIGRATION)); + } else { + log.debug("Writing ZkMigrationState of NONE to the log, since migrations are not enabled."); + recordConsumer.accept(buildRecord(ZkMigrationState.NONE)); + } + } else { + // non-empty log + String prefix = "During metadata log initialization,"; + ZkMigrationControlState state = migrationControlState.get(); + switch (state.zkMigrationState()) { + case UNINITIALIZED: + // No ZkMigrationState record has been seen yet + log.debug("{} did not read any ZkMigrationState. Writing a ZkMigrationState of NONE to the log to " + + "indicate this cluster was not migrated from ZK.", prefix); + if (zkMigrationEnabled) { + log.error("Should not have ZK migrations enabled on a cluster that was created in KRaft mode"); + } + recordConsumer.accept(buildRecord(ZkMigrationState.NONE)); + break; + case NONE: + // This is a non-migrated KRaft cluster + log.debug("{} read a ZkMigrationState of NONE indicating this cluster was never migrated from ZK.", prefix); + if (zkMigrationEnabled) { + log.error("Should not have ZK migrations enabled on a cluster that was created in KRaft mode"); + } + break; + case POST_MIGRATION: + // This is a migrated KRaft cluster + log.debug("{} read a ZkMigrationState of POST_MIGRATION indicating this cluster was previously migrated from ZK.", prefix); + break; + case MIGRATION: + // This cluster is migrated, but still in dual-write mode + if (zkMigrationEnabled) { + log.debug("{} read a ZkMigrationState of MIGRATION indicating this cluster is being migrated from ZK.", prefix); + } else { + throw new IllegalStateException( + prefix + " read a ZkMigrationState of MIGRATION indicating this cluster is being migrated " + + "from ZK, but the controller does not have migrations enabled." + ); + } + break; + case PRE_MIGRATION: + if (!state.preMigrationSupported()) { + // Upgrade case from 3.4. The controller only wrote PRE_MIGRATION during migrations in that version, + // so this needs to complete that migration. + log.info("{} read a ZkMigrationState of PRE_MIGRATION from a ZK migration on Apache Kafka 3.4.", prefix); + if (zkMigrationEnabled) { + recordConsumer.accept(buildRecord(ZkMigrationState.MIGRATION)); + log.info("Writing ZkMigrationState of MIGRATION since migration mode is still active on the controller."); + } else { + recordConsumer.accept(buildRecord(ZkMigrationState.MIGRATION)); + recordConsumer.accept(buildRecord(ZkMigrationState.POST_MIGRATION)); + log.info("Writing ZkMigrationState of POST_MIGRATION since migration mode is not active on the controller."); + } + } else { + log.error("{} read a ZkMigrationState of PRE_MIGRATION indicating this cluster failed during a ZK migration.", prefix); + throw new IllegalStateException("Detected an invalid migration state during startup, cannot continue."); + } + break; + default: + throw new IllegalStateException("Unsupported migration state " + state.zkMigrationState()); + } + } + } + + /** + * Tests if the controller should be preventing metadata updates due to being in the PRE_MIGRATION + * state. If the controller does not yet support migrations (before 3.4-IV0), then metadata updates + * are allowed in any state. Once the controller has been upgraded to a version that supports + * migrations, then this method checks if the controller is in the PRE_MIGRATION state. + */ + boolean inPreMigrationMode(MetadataVersion metadataVersion) { + ZkMigrationControlState state = migrationControlState.get(); + if (metadataVersion.isMigrationSupported() && state.preMigrationSupported()) { + return state.zkMigrationState() == ZkMigrationState.PRE_MIGRATION; + } else { + return false; + } + } + + private ApiMessageAndVersion buildRecord(ZkMigrationState state) { Review Comment: This should be in `ZkMigrationState.java`, along with a method to extract the enum from the record. Then this can be used by `FeaturesImage` as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org