cmccabe commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1173080760


##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1123,29 +1165,104 @@ private void claim(int epoch) {
             // Prepend the activate event. It is important that this event go 
at the beginning
             // of the queue rather than the end (hence prepend rather than 
append). It's also
             // important not to use prepend for anything else, to preserve the 
ordering here.
-            queue.prepend(new ControllerWriteEvent<>("completeActivation[" + 
epoch + "]",
-                    new CompleteActivationEvent()));
+            ControllerWriteEvent<Void> activationEvent = new 
ControllerWriteEvent<>("completeActivation[" + epoch + "]",
+                    new CompleteActivationEvent(),
+                    EnumSet.of(DOES_NOT_UPDATE_QUEUE_TIME, 
RUNS_IN_PREMIGRATION));
+            activationEvent.future.whenComplete((__, t) -> {
+                if (t != null) {
+                    fatalFaultHandler.handleFault("exception while activating 
controller", t);
+                }
+            });
+            queue.prepend(activationEvent);
         } catch (Throwable e) {
             fatalFaultHandler.handleFault("exception while claiming 
leadership", e);
         }
     }
 
+    /**
+     * Generate the set of activation records. Until KIP-868 transactions are 
supported, these records
+     * are committed to the log as an atomic batch. The records will include 
the bootstrap metadata records
+     * (including the bootstrap "metadata.version") and may include a ZK 
migration record.
+     */
+    public static void generateActivationRecords(
+        Logger log,
+        boolean isLogEmpty,
+        boolean zkMigrationEnabled,
+        BootstrapMetadata bootstrapMetadata,
+        FeatureControlManager featureControl,
+        Consumer<ApiMessageAndVersion> recordConsumer
+    ) {
+        if (isLogEmpty) {
+            // If no records have been replayed, we need to write out the 
bootstrap records.
+            // This will include the new metadata.version, as well as things 
like SCRAM
+            // initialization, etc.
+            log.info("The metadata log appears to be empty. Appending {} 
bootstrap record(s) " +
+                "at metadata.version {} from {}.", 
bootstrapMetadata.records().size(),
+                bootstrapMetadata.metadataVersion(), 
bootstrapMetadata.source());
+            bootstrapMetadata.records().forEach(recordConsumer);
+
+            if (bootstrapMetadata.metadataVersion().isMigrationSupported()) {
+                if (zkMigrationEnabled) {
+                    log.info("Putting the controller into pre-migration mode. 
No metadata updates will be allowed until " +
+                        "the ZK metadata has been migrated");
+                    
recordConsumer.accept(ZkMigrationState.PRE_MIGRATION.toRecord());
+                } else {
+                    log.debug("Setting the ZK migration state to NONE since 
this is a de-novo KRaft cluster.");
+                    recordConsumer.accept(ZkMigrationState.NONE.toRecord());
+                }
+            } else {
+                if (zkMigrationEnabled) {
+                    throw new RuntimeException("The bootstrap metadata.version 
" + bootstrapMetadata.metadataVersion() +
+                        " does not support ZK migrations. Cannot continue with 
ZK migrations enabled.");
+                }
+            }
+        } else {
+            // Logs have been replayed. We need to initialize some things here 
if upgrading from older KRaft versions
+            if 
(featureControl.metadataVersion().equals(MetadataVersion.MINIMUM_KRAFT_VERSION))
 {
+                log.info("No metadata.version feature level record was found 
in the log. " +
+                    "Treating the log as version {}.", 
MetadataVersion.MINIMUM_KRAFT_VERSION);
+            }
+
+            if (featureControl.metadataVersion().isMigrationSupported()) {
+                log.info("Loaded ZK migration state of {}", 
featureControl.zkMigrationState());
+                switch (featureControl.zkMigrationState()) {
+                    case NONE:
+                        // Since this is the default state there may or may 
not be an actual NONE in the log. Regardless,
+                        // it will eventually be persisted in a snapshot, so 
we don't need to explicitly write it here.
+                        if (zkMigrationEnabled) {
+                            throw new RuntimeException("Should not have ZK 
migrations enabled on a cluster that was created in KRaft mode.");
+                        }
+                        break;
+                    case PRE_MIGRATION:

Review Comment:
   (So, in other words, I think this is worthy of a `warn`-level log, but not
   an exception.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to