kevin-wu24 commented on code in PR #20707:
URL: https://github.com/apache/kafka/pull/20707#discussion_r2504612769
##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1022,7 +1020,12 @@ public void
handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
Batch<ApiMessageAndVersion> batch = reader.next();
long offset = batch.lastOffset();
List<ApiMessageAndVersion> messages = batch.records();
-
+ if
(reader.snapshotId().equals(Snapshots.BOOTSTRAP_SNAPSHOT_ID)) {
+ // For bootstrap snapshots, extract feature levels
from all data records
+ if (batch.controlRecords().isEmpty()) {
Review Comment:
Please fix this if condition. We should be checking `if
(!messages.isEmpty())` on L1025. We should not be doing anything else besides
setting `bootstrapMetadata` when loading a `0-0.checkpoint` that contains
metadata records. In the current code, you are still iterating through the
`messages` after setting `bootstrapMetadata`. This is why you need
`forceBootstrapWrite`, but that boolean should be removed.
##########
metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java:
##########
@@ -155,10 +155,11 @@ static ControllerResult<Void> generate(
Consumer<String> activationMessageConsumer,
long transactionStartOffset,
BootstrapMetadata bootstrapMetadata,
+ boolean forceBootstrapWrite,
Optional<MetadataVersion> curMetadataVersion,
int defaultMinInSyncReplicas
) {
- if (curMetadataVersion.isEmpty()) {
+ if (forceBootstrapWrite || curMetadataVersion.isEmpty()) {
Review Comment:
Please remove this boolean. See my other comments.
##########
metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectory.java:
##########
@@ -73,6 +76,28 @@ public BootstrapMetadata read() throws Exception {
}
}
+ public BootstrapMetadata read() throws Exception {
+ Path path = Paths.get(directoryPath);
+ if (!Files.isDirectory(path)) {
+ if (Files.exists(path)) {
+ throw new RuntimeException("Path " + directoryPath + " exists,
but is not " +
+ "a directory.");
+ } else {
+ throw new RuntimeException("No such directory as " +
directoryPath);
+ }
+ }
+ Path binaryBootstrapPath = Paths.get(directoryPath,
String.format("%s-%d",
+ CLUSTER_METADATA_TOPIC_PARTITION.topic(),
+ CLUSTER_METADATA_TOPIC_PARTITION.partition()),
+ BINARY_BOOTSTRAP_CHECKPOINT_FILENAME);
+ if (!Files.exists(binaryBootstrapPath)) {
+ return readFromConfiguration();
+ } else {
+ return readFromBinaryFile(binaryBootstrapPath.toString());
+ }
Review Comment:
We can remove this and just use the old `read()` implementation.
##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1022,7 +1020,12 @@ public void
handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
Batch<ApiMessageAndVersion> batch = reader.next();
long offset = batch.lastOffset();
List<ApiMessageAndVersion> messages = batch.records();
-
+ if
(reader.snapshotId().equals(Snapshots.BOOTSTRAP_SNAPSHOT_ID)) {
+ // For bootstrap snapshots, extract feature levels
from all data records
+ if (batch.controlRecords().isEmpty()) {
Review Comment:
```suggestion
// check if 0-0.checkpoint has data records
if (!messages.isEmpty()) {
bootstrapMetadata = BootstrapMetadata.fromRecords(messages, "bootstrap");
return;
}
```
##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1139,10 +1142,14 @@ class CompleteActivationEvent implements
ControllerWriteOperation<Void> {
@Override
public ControllerResult<Void> generateRecordsAndResult() {
try {
+ boolean forceBootstrapWrite =
featureControl.metadataVersion().isPresent() &&
+ offsetControl.lastCommittedOffset() ==
Snapshots.BOOTSTRAP_SNAPSHOT_ID.offset() &&
+ offsetControl.lastCommittedEpoch() ==
Snapshots.BOOTSTRAP_SNAPSHOT_ID.epoch();
Review Comment:
Please remove `forceBootstrapWrite`. Instead, we should not be iterating
through the data records for the `0-0.checkpoint` in `handleLoadSnapshot`.
##########
metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectory.java:
##########
@@ -42,6 +43,8 @@
public class BootstrapDirectory {
Review Comment:
We should not need to materially change this file at all (at least for now),
since this file only deals with the old `bootstrap.checkpoint` file that is
deprecated going forward.
It looks like it handles the `bootstrap.checkpoint` file not existing by
defaulting to a bootstrap record set of just the latest MV. This means
`bootstrapMetadata` is never `null`.
Maybe we want to rename the file to `LegacyBootstrapDirectory`, mark this as
deprecated, and log a message when we successfully read `bootstrap.checkpoint`.
##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -384,8 +384,6 @@ public Builder setUncleanLeaderElectionCheckIntervalMs(long
uncleanLeaderElectio
public QuorumController build() throws Exception {
if (raftClient == null) {
throw new IllegalStateException("You must set a raft client.");
- } else if (bootstrapMetadata == null) {
- throw new IllegalStateException("You must specify an initial
metadata.version using the kafka-storage tool.");
Review Comment:
We can keep this assertion because `bootstrapMetadata` is never null.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]