kevin-wu24 commented on code in PR #20707:
URL: https://github.com/apache/kafka/pull/20707#discussion_r2578581374
##########
metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectoryTest.java:
##########
@@ -69,25 +69,27 @@ public synchronized void close() throws Exception {
}
@Test
+ @SuppressWarnings("resource")
public void testReadFromEmptyConfiguration() throws Exception {
try (BootstrapTestDirectory testDirectory = new
BootstrapTestDirectory().createDirectory()) {
assertEquals(BootstrapMetadata.fromVersion(MetadataVersion.latestProduction(),
"the default bootstrap"),
- new BootstrapDirectory(testDirectory.path()).read());
+ new LegacyBootstrapDirectory(testDirectory.path()).read());
}
}
@Test
public void testMissingDirectory() {
assertEquals("No such directory as ./non/existent/directory",
assertThrows(RuntimeException.class, () ->
- new
BootstrapDirectory("./non/existent/directory").read()).getMessage());
+ new
LegacyBootstrapDirectory("./non/existent/directory").read()).getMessage());
}
@Test
+ @SuppressWarnings("resource")
public void testReadFromConfigurationFile() throws Exception {
Review Comment:
We can remove this test.
##########
metadata/src/test/java/org/apache/kafka/metadata/bootstrap/TestBootstrapDirectory.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.bootstrap;
+
+import org.apache.kafka.metadata.util.BatchFileReader;
+import org.apache.kafka.metadata.util.BatchFileReader.BatchAndType;
+import org.apache.kafka.metadata.util.BatchFileWriter;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+import static
org.apache.kafka.common.internals.Topic.CLUSTER_METADATA_TOPIC_PARTITION;
+
+/**
+ * Test-only implementation that reads bootstrap metadata from the metadata
partition snapshot.
+ */
+public class TestBootstrapDirectory implements BootstrapDirectory {
+ private final String directoryPath;
+
+ public TestBootstrapDirectory(String directoryPath) {
+ this.directoryPath = Objects.requireNonNull(directoryPath);
+ }
+
+ @Override
+ public BootstrapMetadata read() throws Exception {
+ Path path = Paths.get(directoryPath);
+ if (!Files.isDirectory(path)) {
+ if (Files.exists(path)) {
+ throw new RuntimeException("Path " + directoryPath + " exists,
but is not " +
+ "a directory.");
+ } else {
+ throw new RuntimeException("No such directory as " +
directoryPath);
+ }
+ }
+ Path binaryBootstrapPath = Paths.get(directoryPath,
String.format("%s-%d",
+ CLUSTER_METADATA_TOPIC_PARTITION.topic(),
+ CLUSTER_METADATA_TOPIC_PARTITION.partition()),
+ BINARY_BOOTSTRAP_CHECKPOINT_FILENAME);
+ if (!Files.exists(binaryBootstrapPath)) {
+ throw new FileNotFoundException(binaryBootstrapPath.toString());
+ } else {
+ return readFromBinaryFile(binaryBootstrapPath.toString());
+ }
+ }
+
+ BootstrapMetadata readFromBinaryFile(String binaryPath) throws Exception {
+ List<ApiMessageAndVersion> records = new ArrayList<>();
+ try (BatchFileReader reader = new BatchFileReader.Builder().
+ setPath(binaryPath).build()) {
+ while (reader.hasNext()) {
+ BatchAndType batchAndType = reader.next();
+ if (!batchAndType.isControl()) {
+ records.addAll(batchAndType.batch().records());
+ }
+ }
+ }
+ return
BootstrapMetadata.fromRecords(Collections.unmodifiableList(records),
+ "the binary bootstrap metadata file: " + binaryPath);
+ }
+
+ @Override
+ public void writeBinaryFile(BootstrapMetadata bootstrapMetadata) throws
IOException {
Review Comment:
We should remove this method entirely, both from the code and from the
`BootstrapDirectory` interface, since we do not write `bootstrap.checkpoint`
anymore, and this method is not how we write `0-0.checkpoint`.
##########
metadata/src/test/java/org/apache/kafka/metadata/bootstrap/TestBootstrapDirectory.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.bootstrap;
+
+import org.apache.kafka.metadata.util.BatchFileReader;
+import org.apache.kafka.metadata.util.BatchFileReader.BatchAndType;
+import org.apache.kafka.metadata.util.BatchFileWriter;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+import static
org.apache.kafka.common.internals.Topic.CLUSTER_METADATA_TOPIC_PARTITION;
+
+/**
+ * Test-only implementation that reads bootstrap metadata from the metadata
partition snapshot.
+ */
+public class TestBootstrapDirectory implements BootstrapDirectory {
+ private final String directoryPath;
+
+ public TestBootstrapDirectory(String directoryPath) {
+ this.directoryPath = Objects.requireNonNull(directoryPath);
+ }
+
+ @Override
+ public BootstrapMetadata read() throws Exception {
+ Path path = Paths.get(directoryPath);
+ if (!Files.isDirectory(path)) {
+ if (Files.exists(path)) {
+ throw new RuntimeException("Path " + directoryPath + " exists,
but is not " +
+ "a directory.");
+ } else {
+ throw new RuntimeException("No such directory as " +
directoryPath);
+ }
+ }
+ Path binaryBootstrapPath = Paths.get(directoryPath,
String.format("%s-%d",
+ CLUSTER_METADATA_TOPIC_PARTITION.topic(),
+ CLUSTER_METADATA_TOPIC_PARTITION.partition()),
+ BINARY_BOOTSTRAP_CHECKPOINT_FILENAME);
+ if (!Files.exists(binaryBootstrapPath)) {
+ throw new FileNotFoundException(binaryBootstrapPath.toString());
+ } else {
+ return readFromBinaryFile(binaryBootstrapPath.toString());
+ }
+ }
+
+ BootstrapMetadata readFromBinaryFile(String binaryPath) throws Exception {
+ List<ApiMessageAndVersion> records = new ArrayList<>();
+ try (BatchFileReader reader = new BatchFileReader.Builder().
+ setPath(binaryPath).build()) {
+ while (reader.hasNext()) {
+ BatchAndType batchAndType = reader.next();
+ if (!batchAndType.isControl()) {
+ records.addAll(batchAndType.batch().records());
+ }
+ }
+ }
+ return
BootstrapMetadata.fromRecords(Collections.unmodifiableList(records),
+ "the binary bootstrap metadata file: " + binaryPath);
+ }
Review Comment:
I think this method is shared across both implementations, right? If so, we can
keep it at the interface level.
##########
metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java:
##########
@@ -432,24 +437,23 @@ void doFormat(BootstrapMetadata bootstrapMetadata) throws
Exception {
setDirectoryId(directoryId).
build());
}
+ copier.setWriteErrorHandler((errorLogDir, e) -> {
+ throw new FormatterException("Error while writing
meta.properties file " +
+ errorLogDir + ": " + e);
+ });
Review Comment:
See https://github.com/apache/kafka/pull/20707#discussion_r2578454870. I am
pretty sure setting this handler first doesn't do anything, but maybe I am wrong.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]