kevin-wu24 commented on code in PR #20707:
URL: https://github.com/apache/kafka/pull/20707#discussion_r2487783869
##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1022,7 +1021,22 @@ public void
handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
Batch<ApiMessageAndVersion> batch = reader.next();
long offset = batch.lastOffset();
List<ApiMessageAndVersion> messages = batch.records();
-
+ if (bootstrapMetadata == null) {
+ if
(reader.snapshotId().equals(Snapshots.BOOTSTRAP_SNAPSHOT_ID)) {
+ // For bootstrap snapshots, extract feature
levels from all data records
+ if (batch.controlRecords().isEmpty()) {
+ System.out.println("DEBUG: Extracting
bootstrap metadata from " + messages.size() + " records");
+ bootstrapMetadata =
BootstrapMetadata.fromRecords(messages, "bootstrap");
+ System.out.println("DEBUG: Bootstrap
metadata extracted: " + bootstrapMetadata);
+ }
Review Comment:
This is the wrong if condition. We should check `!batch.records.isEmpty`.
##########
metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java:
##########
@@ -138,8 +140,54 @@ static ControllerResult<Void> recordsForNonEmptyLog(
}
}
+ // Write bootstrap records to the log so brokers can read them, but
only if not handling a partial transaction
+ // Brokers can't read snapshots, only log entries
+ boolean shouldWriteBootstrapRecords = (transactionStartOffset == -1L);
+ if (shouldWriteBootstrapRecords) {
+ logMessageBuilder
+ .append("Writing bootstrap records to log for broker
consumption. ")
+ .append("Appending ")
+ .append(bootstrapMetadata.records().size())
+ .append(" bootstrap record(s) ");
+
+ if (curMetadataVersion.isMetadataTransactionSupported()) {
+ records.add(new ApiMessageAndVersion(
+ new BeginTransactionRecord().setName("Bootstrap records"),
(short) 0));
+ logMessageBuilder.append("in metadata transaction ");
+ }
+ logMessageBuilder
+ .append("at metadata.version ")
+ .append(curMetadataVersion)
+ .append(" from bootstrap source '")
+ .append(bootstrapMetadata.source())
+ .append("'. ");
+
+ // Add bootstrap records
+ records.addAll(bootstrapMetadata.records());
+
+ // If ELR is enabled, we need to set a cluster-level
min.insync.replicas.
+ if
(bootstrapMetadata.featureLevel(EligibleLeaderReplicasVersion.FEATURE_NAME) >
0) {
+ records.add(new ApiMessageAndVersion(new ConfigRecord().
+ setResourceType(BROKER.id()).
+ setResourceName("").
+ setName(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).
+ setValue(Integer.toString(defaultMinInSyncReplicas)),
(short) 0));
+ }
+
+ if (curMetadataVersion.isMetadataTransactionSupported()) {
+ records.add(new ApiMessageAndVersion(new
EndTransactionRecord(), (short) 0));
+ }
+ }
+
Review Comment:
Why are we changing this code?
We shouldn't change this code though, because when the log is non-empty, it
means the bootstrap metadata records have already been written in the log
before.
##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1139,6 +1153,10 @@ class CompleteActivationEvent implements
ControllerWriteOperation<Void> {
@Override
public ControllerResult<Void> generateRecordsAndResult() {
try {
+ if (bootstrapMetadata == null) {
+ throw new IllegalStateException("Bootstrap metadata not
available during activation. " +
+ "This should not happen if a bootstrap snapshot was
processed.");
+ }
Review Comment:
We should allow bootstrapMetadata to be null here. However, it cannot be
null when we call `recordsForEmptyLog`. It can be null when we call
`recordsForNonEmptyLog`
##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1022,7 +1021,22 @@ public void
handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
Batch<ApiMessageAndVersion> batch = reader.next();
long offset = batch.lastOffset();
List<ApiMessageAndVersion> messages = batch.records();
-
+ if (bootstrapMetadata == null) {
+ if
(reader.snapshotId().equals(Snapshots.BOOTSTRAP_SNAPSHOT_ID)) {
+ // For bootstrap snapshots, extract feature
levels from all data records
+ if (batch.controlRecords().isEmpty()) {
+ System.out.println("DEBUG: Extracting
bootstrap metadata from " + messages.size() + " records");
+ bootstrapMetadata =
BootstrapMetadata.fromRecords(messages, "bootstrap");
+ System.out.println("DEBUG: Bootstrap
metadata extracted: " + bootstrapMetadata);
+ }
+ } else {
+ Map<String, Short> featureVersions = new
HashMap<>();
+ MetadataVersion metadataVersion =
MetadataVersion.latestProduction();
+
featureVersions.put(MetadataVersion.FEATURE_NAME,
metadataVersion.featureLevel());
+ featureVersions.put(KRaftVersion.FEATURE_NAME,
raftClient.kraftVersion().featureLevel());
+ bootstrapMetadata =
BootstrapMetadata.fromVersions(metadataVersion, featureVersions, "generated
default");
Review Comment:
If we're not reading the 0-0.checkpoint, `bootstrapMetadata` is either:
1. read from `bootstrap.checkpoint` and passed down here
2. null, because it should have already been written to the log as part of
the 0-0.checkpoint.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]