kevin-wu24 commented on code in PR #20707:
URL: https://github.com/apache/kafka/pull/20707#discussion_r2487783869


##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1022,7 +1021,22 @@ public void handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
                         Batch<ApiMessageAndVersion> batch = reader.next();
                         long offset = batch.lastOffset();
                         List<ApiMessageAndVersion> messages = batch.records();
-
+                        if (bootstrapMetadata == null) {
+                            if (reader.snapshotId().equals(Snapshots.BOOTSTRAP_SNAPSHOT_ID)) {
+                                // For bootstrap snapshots, extract feature levels from all data records
+                                if (batch.controlRecords().isEmpty()) {
+                                    System.out.println("DEBUG: Extracting bootstrap metadata from " + messages.size() + " records");
+                                    bootstrapMetadata = BootstrapMetadata.fromRecords(messages, "bootstrap");
+                                    System.out.println("DEBUG: Bootstrap metadata extracted: " + bootstrapMetadata);
+                                }

Review Comment:
   This `if` condition is wrong. We should check `!batch.records().isEmpty()` instead.
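
A minimal sketch of the difference between the two conditions, assuming a batch that exposes separate lists of data and control records. `FakeBatch` and both method names are hypothetical stand-ins for illustration, not the real Kafka `Batch` API:

```java
import java.util.List;

class BootstrapBatchCheck {
    // Hypothetical stand-in for a snapshot batch; not the real Kafka Batch class.
    static final class FakeBatch {
        final List<String> records;        // data records
        final List<String> controlRecords; // control records

        FakeBatch(List<String> records, List<String> controlRecords) {
            this.records = records;
            this.controlRecords = controlRecords;
        }
    }

    // Condition as written in the diff: true whenever there are no control
    // records, even if the batch carries no data records either.
    static boolean originalCondition(FakeBatch batch) {
        return batch.controlRecords.isEmpty();
    }

    // Condition suggested in the review: true only when data records are present.
    static boolean suggestedCondition(FakeBatch batch) {
        return !batch.records.isEmpty();
    }

    public static void main(String[] args) {
        FakeBatch empty = new FakeBatch(List.of(), List.of());
        FakeBatch data = new FakeBatch(List.of("FeatureLevelRecord"), List.of());

        System.out.println(originalCondition(empty));  // true
        System.out.println(suggestedCondition(empty)); // false
        System.out.println(suggestedCondition(data));  // true
    }
}
```

Under these assumptions the two checks disagree only for a batch with no records of either kind, where the suggested check skips the extraction.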



##########
metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java:
##########
@@ -138,8 +140,54 @@ static ControllerResult<Void> recordsForNonEmptyLog(
             }
         }
 
+        // Write bootstrap records to the log so brokers can read them, but only if not handling a partial transaction
+        // Brokers can't read snapshots, only log entries
+        boolean shouldWriteBootstrapRecords = (transactionStartOffset == -1L);
+        if (shouldWriteBootstrapRecords) {
+            logMessageBuilder
+                .append("Writing bootstrap records to log for broker consumption. ")
+                .append("Appending ")
+                .append(bootstrapMetadata.records().size())
+                .append(" bootstrap record(s) ");
+            
+            if (curMetadataVersion.isMetadataTransactionSupported()) {
+                records.add(new ApiMessageAndVersion(
+                    new BeginTransactionRecord().setName("Bootstrap records"), (short) 0));
+                logMessageBuilder.append("in metadata transaction ");
+            }
+            logMessageBuilder
+                .append("at metadata.version ")
+                .append(curMetadataVersion)
+                .append(" from bootstrap source '")
+                .append(bootstrapMetadata.source())
+                .append("'. ");
+
+            // Add bootstrap records
+            records.addAll(bootstrapMetadata.records());
+
+            // If ELR is enabled, we need to set a cluster-level min.insync.replicas.
+            if (bootstrapMetadata.featureLevel(EligibleLeaderReplicasVersion.FEATURE_NAME) > 0) {
+                records.add(new ApiMessageAndVersion(new ConfigRecord().
+                    setResourceType(BROKER.id()).
+                    setResourceName("").
+                    setName(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).
+                    setValue(Integer.toString(defaultMinInSyncReplicas)), (short) 0));
+            }
+
+            if (curMetadataVersion.isMetadataTransactionSupported()) {
+                records.add(new ApiMessageAndVersion(new EndTransactionRecord(), (short) 0));
+            }
+        }
+

Review Comment:
   Why are we changing this code?
   
   We shouldn't change it: when the log is non-empty, the bootstrap metadata records have already been written to the log.



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1139,6 +1153,10 @@ class CompleteActivationEvent implements ControllerWriteOperation<Void> {
         @Override
         public ControllerResult<Void> generateRecordsAndResult() {
             try {
+                if (bootstrapMetadata == null) {
+                    throw new IllegalStateException("Bootstrap metadata not available during activation. " +
+                        "This should not happen if a bootstrap snapshot was processed.");
+                }

Review Comment:
   We should allow `bootstrapMetadata` to be null here. It cannot be null when we call `recordsForEmptyLog`, but it can be null when we call `recordsForNonEmptyLog`.
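
One way to picture this is to move the null check into the empty-log path only. The sketch below is self-contained and uses hypothetical stand-ins (`FakeBootstrap`, `recordsForActivation`, and the `logIsEmpty` flag); it is not the actual `ActivationRecordsGenerator` code:

```java
import java.util.List;

class ActivationNullCheckSketch {
    // Hypothetical stand-in for BootstrapMetadata; it only carries record names.
    static final class FakeBootstrap {
        final List<String> records;

        FakeBootstrap(List<String> records) {
            this.records = records;
        }
    }

    // Returns the records to append on activation.
    static List<String> recordsForActivation(boolean logIsEmpty, FakeBootstrap bootstrap) {
        if (logIsEmpty) {
            // Empty log: the bootstrap records still have to be written,
            // so the bootstrap metadata must be present.
            if (bootstrap == null) {
                throw new IllegalStateException(
                    "Bootstrap metadata is required to activate on an empty log.");
            }
            return bootstrap.records;
        }
        // Non-empty log: the bootstrap records were written earlier,
        // so a null bootstrap is acceptable and nothing is re-appended.
        return List.of();
    }

    public static void main(String[] args) {
        FakeBootstrap bootstrap = new FakeBootstrap(List.of("FeatureLevelRecord"));
        System.out.println(recordsForActivation(true, bootstrap));
        System.out.println(recordsForActivation(false, null));
    }
}
```

With this shape, activation on a non-empty log never fails just because no bootstrap metadata was loaded.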



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1022,7 +1021,22 @@ public void handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
                         Batch<ApiMessageAndVersion> batch = reader.next();
                         long offset = batch.lastOffset();
                         List<ApiMessageAndVersion> messages = batch.records();
-
+                        if (bootstrapMetadata == null) {
+                            if (reader.snapshotId().equals(Snapshots.BOOTSTRAP_SNAPSHOT_ID)) {
+                                // For bootstrap snapshots, extract feature levels from all data records
+                                if (batch.controlRecords().isEmpty()) {
+                                    System.out.println("DEBUG: Extracting bootstrap metadata from " + messages.size() + " records");
+                                    bootstrapMetadata = BootstrapMetadata.fromRecords(messages, "bootstrap");
+                                    System.out.println("DEBUG: Bootstrap metadata extracted: " + bootstrapMetadata);
+                                }
+                            } else {
+                                Map<String, Short> featureVersions = new HashMap<>();
+                                MetadataVersion metadataVersion = MetadataVersion.latestProduction();
+                                featureVersions.put(MetadataVersion.FEATURE_NAME, metadataVersion.featureLevel());
+                                featureVersions.put(KRaftVersion.FEATURE_NAME, raftClient.kraftVersion().featureLevel());
+                                bootstrapMetadata = BootstrapMetadata.fromVersions(metadataVersion, featureVersions, "generated default");

Review Comment:
   If we're not reading the `0-0.checkpoint`, `bootstrapMetadata` is either:
   
   1. read from `bootstrap.checkpoint` and passed down here, or
   2. null, because it should have already been written to the log as part of the `0-0.checkpoint`.


