cmccabe commented on code in PR #13407: URL: https://github.com/apache/kafka/pull/13407#discussion_r1173080760
########## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ########## @@ -1123,29 +1165,104 @@ private void claim(int epoch) { // Prepend the activate event. It is important that this event go at the beginning // of the queue rather than the end (hence prepend rather than append). It's also // important not to use prepend for anything else, to preserve the ordering here. - queue.prepend(new ControllerWriteEvent<>("completeActivation[" + epoch + "]", - new CompleteActivationEvent())); + ControllerWriteEvent<Void> activationEvent = new ControllerWriteEvent<>("completeActivation[" + epoch + "]", + new CompleteActivationEvent(), + EnumSet.of(DOES_NOT_UPDATE_QUEUE_TIME, RUNS_IN_PREMIGRATION)); + activationEvent.future.whenComplete((__, t) -> { + if (t != null) { + fatalFaultHandler.handleFault("exception while activating controller", t); + } + }); + queue.prepend(activationEvent); } catch (Throwable e) { fatalFaultHandler.handleFault("exception while claiming leadership", e); } } + /** + * Generate the set of activation records. Until KIP-868 transactions are supported, these records + * are committed to the log as an atomic batch. The records will include the bootstrap metadata records + * (including the bootstrap "metadata.version") and may include a ZK migration record. + */ + public static void generateActivationRecords( + Logger log, + boolean isLogEmpty, + boolean zkMigrationEnabled, + BootstrapMetadata bootstrapMetadata, + FeatureControlManager featureControl, + Consumer<ApiMessageAndVersion> recordConsumer + ) { + if (isLogEmpty) { + // If no records have been replayed, we need to write out the bootstrap records. + // This will include the new metadata.version, as well as things like SCRAM + // initialization, etc. + log.info("The metadata log appears to be empty. Appending {} bootstrap record(s) " + + "at metadata.version {} from {}.", bootstrapMetadata.records().size(), + bootstrapMetadata.metadataVersion(), bootstrapMetadata.source()); + bootstrapMetadata.records().forEach(recordConsumer); + + if (bootstrapMetadata.metadataVersion().isMigrationSupported()) { + if (zkMigrationEnabled) { + log.info("Putting the controller into pre-migration mode. No metadata updates will be allowed until " + + "the ZK metadata has been migrated"); + recordConsumer.accept(ZkMigrationState.PRE_MIGRATION.toRecord()); + } else { + log.debug("Setting the ZK migration state to NONE since this is a de-novo KRaft cluster."); + recordConsumer.accept(ZkMigrationState.NONE.toRecord()); + } + } else { + if (zkMigrationEnabled) { + throw new RuntimeException("The bootstrap metadata.version " + bootstrapMetadata.metadataVersion() + + " does not support ZK migrations. Cannot continue with ZK migrations enabled."); + } + } + } else { + // Logs have been replayed. We need to initialize some things here if upgrading from older KRaft versions + if (featureControl.metadataVersion().equals(MetadataVersion.MINIMUM_KRAFT_VERSION)) { + log.info("No metadata.version feature level record was found in the log. " + + "Treating the log as version {}.", MetadataVersion.MINIMUM_KRAFT_VERSION); + } + + if (featureControl.metadataVersion().isMigrationSupported()) { + log.info("Loaded ZK migration state of {}", featureControl.zkMigrationState()); + switch (featureControl.zkMigrationState()) { + case NONE: + // Since this is the default state there may or may not be an actual NONE in the log. Regardless, + // it will eventually be persisted in a snapshot, so we don't need to explicitly write it here. + if (zkMigrationEnabled) { + throw new RuntimeException("Should not have ZK migrations enabled on a cluster that was created in KRaft mode."); + } + break; + case PRE_MIGRATION: Review Comment: (So in other words, I think this is worthy of a `warn` level log, but not an exception) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org