cmccabe commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1163295217


##########
metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java:
##########
@@ -284,6 +334,120 @@ FinalizedControllerFeatures finalizedFeatures(long epoch) 
{
         return new FinalizedControllerFeatures(features, epoch);
     }
 
+
+    /**
+     * Optionally provides a ZkMigrationStateRecord to bootstrap into the 
metadata log. In the case of
+     * upgrades, the log will not be empty and this will return a NONE state 
record. For an empty log,
+     * this will return either a NONE or PRE_MIGRATION depending on the 
configuration and metadata.version.
+     * <p>
+     * If the log is in PRE_MIGRATION, this will throw an error.
+     *
+     * @param metadataVersion       The current MetadataVersion of the log
+     * @param isMetadataLogEmpty    True if the log is being initialized from 
empty
+     * @param recordConsumer        A consumer for the ZkMigrationStateRecord
+     */
+    public void generateZkMigrationRecord(
+        MetadataVersion metadataVersion,
+        boolean isMetadataLogEmpty,
+        Consumer<ApiMessageAndVersion> recordConsumer
+    ) {
+        if (!metadataVersion.isMigrationSupported()) {
+            return;
+        }
+
+        if (isMetadataLogEmpty) {
+            // Initialize the log with a ZkMigrationState
+            if (zkMigrationEnabled) {
+                log.info("Writing ZkMigrationState of PRE_MIGRATION to the 
log, since migrations are enabled.");
+                
recordConsumer.accept(buildRecord(ZkMigrationState.PRE_MIGRATION));
+            } else {
+                log.debug("Writing ZkMigrationState of NONE to the log, since 
migrations are not enabled.");
+                recordConsumer.accept(buildRecord(ZkMigrationState.NONE));
+            }
+        } else {
+            // non-empty log
+            String prefix = "During metadata log initialization,";
+            ZkMigrationControlState state = migrationControlState.get();
+            switch (state.zkMigrationState()) {
+                case UNINITIALIZED:
+                    // No ZkMigrationState record has been seen yet
+                    log.debug("{} did not read any ZkMigrationState. Writing a 
ZkMigrationState of NONE to the log to " +
+                            "indicate this cluster was not migrated from ZK.", 
prefix);
+                    if (zkMigrationEnabled) {
+                        log.error("Should not have ZK migrations enabled on a 
cluster that was created in KRaft mode");
+                    }
+                    recordConsumer.accept(buildRecord(ZkMigrationState.NONE));
+                    break;
+                case NONE:
+                    // This is a non-migrated KRaft cluster
+                    log.debug("{} read a ZkMigrationState of NONE indicating 
this cluster was never migrated from ZK.", prefix);
+                    if (zkMigrationEnabled) {
+                        log.error("Should not have ZK migrations enabled on a 
cluster that was created in KRaft mode");
+                    }
+                    break;
+                case POST_MIGRATION:
+                    // This is a migrated KRaft cluster
+                    log.debug("{} read a ZkMigrationState of POST_MIGRATION 
indicating this cluster was previously migrated from ZK.", prefix);
+                    break;
+                case MIGRATION:
+                    // This cluster is migrated, but still in dual-write mode
+                    if (zkMigrationEnabled) {
+                        log.debug("{} read a ZkMigrationState of MIGRATION 
indicating this cluster is being migrated from ZK.", prefix);
+                    } else {
+                        throw new IllegalStateException(
+                                prefix + " read a ZkMigrationState of 
MIGRATION indicating this cluster is being migrated " +
+                                        "from ZK, but the controller does not 
have migrations enabled."
+                        );
+                    }
+                    break;
+                case PRE_MIGRATION:
+                    if (!state.preMigrationSupported()) {
+                        // Upgrade case from 3.4. The controller only wrote 
PRE_MIGRATION during migrations in that version,
+                        // so this needs to complete that migration.
+                        log.info("{} read a ZkMigrationState of PRE_MIGRATION 
from a ZK migration on Apache Kafka 3.4.", prefix);
+                        if (zkMigrationEnabled) {
+                            
recordConsumer.accept(buildRecord(ZkMigrationState.MIGRATION));
+                            log.info("Writing ZkMigrationState of MIGRATION 
since migration mode is still active on the controller.");
+                        } else {
+                            
recordConsumer.accept(buildRecord(ZkMigrationState.MIGRATION));
+                            
recordConsumer.accept(buildRecord(ZkMigrationState.POST_MIGRATION));
+                            log.info("Writing ZkMigrationState of 
POST_MIGRATION since migration mode is not active on the controller.");
+                        }
+                    } else {
+                        log.error("{} read a ZkMigrationState of PRE_MIGRATION 
indicating this cluster failed during a ZK migration.", prefix);
+                        throw new IllegalStateException("Detected an invalid 
migration state during startup, cannot continue.");
+                    }
+                    break;
+                default:
+                    throw new IllegalStateException("Unsupported migration 
state " + state.zkMigrationState());
+            }
+        }
+    }
+
+    /**
+     * Tests if the controller should be preventing metadata updates due to 
being in the PRE_MIGRATION
+     * state. If the controller does not yet support migrations (before 
3.4-IV0), then metadata updates
+     * are allowed in any state. Once the controller has been upgraded to a 
version that supports
+     * migrations, then this method checks if the controller is in the 
PRE_MIGRATION state.
+     */
+    boolean inPreMigrationMode(MetadataVersion metadataVersion) {
+        ZkMigrationControlState state = migrationControlState.get();
+        if (metadataVersion.isMigrationSupported() && 
state.preMigrationSupported()) {
+            return state.zkMigrationState() == ZkMigrationState.PRE_MIGRATION;
+        } else {
+            return false;
+        }
+    }
+
+    private ApiMessageAndVersion buildRecord(ZkMigrationState state) {

Review Comment:
   The `buildRecord` helper should live in `ZkMigrationState.java`, along with
   a companion method to extract the enum from the record.
   
   Then both can be used by `FeaturesImage` as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to