kevin-wu24 commented on code in PR #20707:
URL: https://github.com/apache/kafka/pull/20707#discussion_r2538749900
##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1022,7 +1022,10 @@ public void
handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
Batch<ApiMessageAndVersion> batch = reader.next();
long offset = batch.lastOffset();
List<ApiMessageAndVersion> messages = batch.records();
-
+ // KIP-1170: The 0-0.checkpoint can contain metadata
records. If it does, they should be considered the bootstrap metadata for the
cluster.
+ if
(reader.snapshotId().equals(Snapshots.BOOTSTRAP_SNAPSHOT_ID) &&
!messages.isEmpty()) {
+ bootstrapMetadata =
BootstrapMetadata.fromRecords(messages, "bootstrap");
+ }
Review Comment:
This is incorrect. Please follow my suggestion from
https://github.com/apache/kafka/pull/20707#discussion_r2511539025. This code
still iterates through `messages` when loading a `0-0.checkpoint` with metadata
records.
##########
metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java:
##########
@@ -442,14 +442,16 @@ SnapshotManifest loadSnapshot(
while (reader.hasNext()) {
Batch<ApiMessageAndVersion> batch = reader.next();
loadControlRecords(batch);
- for (ApiMessageAndVersion record : batch.records()) {
- try {
- delta.replay(record.message());
- } catch (Throwable e) {
- faultHandler.handleFault("Error loading metadata log
record " + snapshotIndex +
- " in snapshot at offset " +
reader.lastContainedLogOffset(), e);
+ if (reader.snapshotId().equals(Snapshots.BOOTSTRAP_SNAPSHOT_ID) &&
!batch.records().isEmpty()) {
+ for (ApiMessageAndVersion record : batch.records()) {
+ try {
+ delta.replay(record.message());
+ } catch (Throwable e) {
+ faultHandler.handleFault("Error loading metadata log
record " + snapshotIndex +
+ " in snapshot at offset " +
reader.lastContainedLogOffset(), e);
+ }
+ snapshotIndex++;
Review Comment:
This is incorrect. This code replays records ONLY if the snapshot being
loaded is the `0-0.checkpoint` with metadata records. The code should skip over
replaying records in that case, not replay them. For all other checkpoints, the
code can remain as is.
```suggestion
if
(!reader.snapshotId().equals(Snapshots.BOOTSTRAP_SNAPSHOT_ID)) {
for (ApiMessageAndVersion record : batch.records()) {
try {
delta.replay(record.message());
} catch (Throwable e) {
faultHandler.handleFault("Error loading metadata log
record " + snapshotIndex +
" in snapshot at offset " +
reader.lastContainedLogOffset(), e);
}
snapshotIndex++;
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]