junrao commented on code in PR #14607:
URL: https://github.com/apache/kafka/pull/14607#discussion_r1372393891


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/PartitionMetadataReadBuffer.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.Uuid;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.regex.Pattern;
+
+public class PartitionMetadataReadBuffer {
+    private static final Pattern WHITE_SPACES_PATTERN = Pattern.compile(":\\s+");
+
+    private final String location;
+    private final BufferedReader reader;
+
+    public PartitionMetadataReadBuffer(
+        String location,
+        BufferedReader reader
+    ) {
+        this.location = location;
+        this.reader = reader;
+    }
+
+    PartitionMetadata read() throws IOException {
+        String line = null;
+        Uuid metadataTopicId;
+
+        try {
+            line = reader.readLine();
+            String[] versionArr = WHITE_SPACES_PATTERN.split(line);
+
+            if (versionArr.length == 2) {
+                int version = Integer.parseInt(versionArr[1]);
+                if (version == PartitionMetadataFile.CURRENT_VERSION) {

Review Comment:
   This is an existing issue. In the future, we may add a new field and bump up 
the version. To make downgrades possible, it's probably better to relax this 
check a bit.
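
One possible shape for a relaxed check, as a rough sketch only. It assumes that future versions only append new lines after the existing `version` and topic ID lines, so an older broker can still read the fields it knows about. `VersionCheckSketch` and `isReadableVersion` are hypothetical names, not part of this PR.

```java
// Hypothetical sketch: none of these names exist in the PR.
final class VersionCheckSketch {
    // Assumption: mirrors PartitionMetadataFile.CURRENT_VERSION from the diff.
    static final int CURRENT_VERSION = 0;

    // Accept the current version and any newer one, on the assumption that
    // newer versions only append fields. Reject versions older than the
    // first known one (that is, negative values).
    static boolean isReadableVersion(int version) {
        return version >= CURRENT_VERSION;
    }
}
```

With a check like this, a broker downgraded from a release that writes version 1 would still load the topic ID instead of failing on an unrecognized version.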



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/PartitionMetadata.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.log;

Review Comment:
   It seems that this should be in the 
`org.apache.kafka.storage.internals.checkpoint` package?



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/PartitionMetadataReadBuffer.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.Uuid;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.regex.Pattern;
+
+public class PartitionMetadataReadBuffer {
+    private static final Pattern WHITE_SPACES_PATTERN = Pattern.compile(":\\s+");
+
+    private final String location;
+    private final BufferedReader reader;
+
+    public PartitionMetadataReadBuffer(
+        String location,
+        BufferedReader reader
+    ) {
+        this.location = location;
+        this.reader = reader;
+    }
+
+    PartitionMetadata read() throws IOException {
+        String line = null;
+        Uuid metadataTopicId;
+
+        try {
+            line = reader.readLine();
+            String[] versionArr = WHITE_SPACES_PATTERN.split(line);
+
+            if (versionArr.length == 2) {
+                int version = Integer.parseInt(versionArr[1]);
+                if (version == PartitionMetadataFile.CURRENT_VERSION) {
+                    line = reader.readLine();
+                    String[] topicIdArr = WHITE_SPACES_PATTERN.split(line);
+
+                    if (topicIdArr.length == 2) {
+                        metadataTopicId = Uuid.fromString(topicIdArr[1]);
+
+                        if (metadataTopicId.equals(Uuid.ZERO_UUID)) {
+                            throw new IOException("Invalid topic ID in partition metadata file (" + location + ")");
+                        }
+
+                        return new PartitionMetadata(version, metadataTopicId);
+                    } else {
+                        throw malformedLineException(line);
+                    }
+                } else {
+                    throw new IOException("Unrecognized version of partition metadata file + (" + location + "): " + version);
+                }
+            } else {
+                throw malformedLineException(line);
+            }
+
+        } catch (IOException | NumberFormatException e) {
+            throw malformedLineException(line);
+        }
+    }
+
+    private IOException malformedLineException(String line) throws IOException {
+        throw new IOException("Malformed line in checkpoint file " + location + ": " + line);

Review Comment:
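   We are throwing both here and in the caller (`throw malformedLineException(line)`). 
It seems that this method should just return the exception and let the caller throw it?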
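
A minimal sketch of what that could look like, assuming the helper only needs the file location and the offending line. The class name `MalformedLineSketch` and the static signature are illustrative; in the PR the method is an instance method that reads the `location` field.

```java
import java.io.IOException;

// Hypothetical stand-in for the helper in PartitionMetadataReadBuffer.
final class MalformedLineSketch {
    // Builds the exception and returns it. The caller decides to throw, so
    // `throw malformedLineException(...)` throws exactly once and the
    // compiler still sees the call site as terminating the branch.
    static IOException malformedLineException(String location, String line) {
        return new IOException("Malformed line in checkpoint file " + location + ": " + line);
    }
}
```

The helper then no longer needs a `throws IOException` clause of its own.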



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/PartitionMetadataFile.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InconsistentTopicIdException;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Optional;
+
+public class PartitionMetadataFile {
+    private static final String PARTITION_METADATA_FILE_NAME = "partition.metadata";
+    static final int CURRENT_VERSION = 0;
+
+    public static File newFile(File dir) {
+        return new File(dir, PARTITION_METADATA_FILE_NAME);
+    }
+
+    private final File file;
+    private final LogDirFailureChannel logDirFailureChannel;
+
+    private final Object lock = new Object();
+    private volatile Optional<Uuid> dirtyTopicIdOpt = null;
+
+    public PartitionMetadataFile(
+        final File file,
+        final LogDirFailureChannel logDirFailureChannel
+    ) {
+        this.file = file;
+        this.logDirFailureChannel = logDirFailureChannel;
+    }
+
+    /**
+     * Records the topic ID that will be flushed to disk.
+     */
+    public void record(Uuid topicId) {
+        // Topic IDs should not differ, but we defensively check here to fail earlier in the case that the IDs somehow differ.
+        if (dirtyTopicIdOpt != null && dirtyTopicIdOpt.isPresent()) {
+            dirtyTopicIdOpt.ifPresent(dirtyTopicId -> {
+                if (dirtyTopicId != topicId) {
+                    throw new InconsistentTopicIdException("Tried to record topic ID $topicId to file " +
+                        "but had already recorded $dirtyTopicId");
+                }
+            });
+        }
+        dirtyTopicIdOpt = Optional.of(topicId);
+    }
+
+    public void maybeFlush() {
+        // We check dirtyTopicId first to avoid having to take the lock unnecessarily in the frequently called log append path
+        if (dirtyTopicIdOpt != null && dirtyTopicIdOpt.isPresent()) {
+            // We synchronize on the actual write to disk
+            synchronized (lock) {
+                dirtyTopicIdOpt.ifPresent(topicId -> {
+                    try {
+                        FileOutputStream fileOutputStream = new FileOutputStream(tempPath().toFile());

Review Comment:
   This is an existing issue. It seems that we need to close `fileOutputStream` 
too?
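
For illustration, a try-with-resources version might look roughly like the sketch below. The rest of `maybeFlush()` is not visible in this hunk, so the writer wrapping and the `getFD().sync()` call are assumptions based on the imports in the file; `FlushSketch` and `writeAndSync` are hypothetical names.

```java
import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;

// Hypothetical sketch of the temp-file write; not the PR's actual code.
final class FlushSketch {
    static void writeAndSync(Path tempPath, String content) throws IOException {
        // Resources are closed in reverse declaration order: the writer first
        // (which also closes the stream), then the stream itself. Closing an
        // already-closed FileOutputStream has no effect.
        try (FileOutputStream fileOutputStream = new FileOutputStream(tempPath.toFile());
             BufferedWriter writer = new BufferedWriter(
                 new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))) {
            writer.write(content);
            writer.flush();
            // Assumption: the real code forces the bytes to disk before the rename.
            fileOutputStream.getFD().sync();
        }
    }
}
```

Declaring the stream as its own resource means it is still closed if constructing the writer fails.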



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/PartitionMetadataFile.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InconsistentTopicIdException;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Optional;
+
+public class PartitionMetadataFile {
+    private static final String PARTITION_METADATA_FILE_NAME = "partition.metadata";
+    static final int CURRENT_VERSION = 0;
+
+    public static File newFile(File dir) {
+        return new File(dir, PARTITION_METADATA_FILE_NAME);
+    }
+
+    private final File file;
+    private final LogDirFailureChannel logDirFailureChannel;
+
+    private final Object lock = new Object();
+    private volatile Optional<Uuid> dirtyTopicIdOpt = null;

Review Comment:
   Should this be initialized to `Optional.empty()`? Then we could get rid of the 
null checks below.
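
A small sketch of how `record` could read without the null check. To keep it self-contained it uses `java.util.UUID` and `IllegalStateException` as stand-ins for Kafka's `Uuid` and `InconsistentTopicIdException`. It also compares with `equals` and builds the message by concatenation, which goes beyond this comment but seems to be what the `!=` comparison and the `$topicId` placeholders were meant to do.

```java
import java.util.Optional;
import java.util.UUID;

// Hypothetical stand-in for the relevant part of PartitionMetadataFile.
final class DirtyTopicIdSketch {
    // Starts empty instead of null, so callers never need a null check.
    private volatile Optional<UUID> dirtyTopicIdOpt = Optional.empty();

    void record(UUID topicId) {
        // Defensive check: fail early if a different ID was already recorded.
        dirtyTopicIdOpt.ifPresent(dirtyTopicId -> {
            if (!dirtyTopicId.equals(topicId)) {
                throw new IllegalStateException("Tried to record topic ID " + topicId
                    + " to file but had already recorded " + dirtyTopicId);
            }
        });
        dirtyTopicIdOpt = Optional.of(topicId);
    }

    boolean isDirty() {
        return dirtyTopicIdOpt.isPresent();
    }
}
```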



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/PartitionMetadata.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.Uuid;
+
+public class PartitionMetadata {
+
+    public int version;
+    public Uuid topicId;
+
+    public PartitionMetadata(int version, Uuid topicId) {

Review Comment:
   Could we add a `toText()` method here and get rid of 
`PartitionMetadataFileFormatter`?
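
Roughly what I have in mind, as a sketch. The formatter class is not shown in this hunk, so the `version` and `topic_id` key names are an assumption; the read path only requires each line to be a key and a value separated by a colon and whitespace. `java.util.UUID` stands in for Kafka's `Uuid` here.

```java
import java.util.UUID;

// Hypothetical stand-in for PartitionMetadata with the proposed method.
final class PartitionMetadataSketch {
    final int version;
    final UUID topicId;

    PartitionMetadataSketch(int version, UUID topicId) {
        this.version = version;
        this.topicId = topicId;
    }

    // Renders the two-line text form that the read buffer parses back.
    // Assumption: the key names match what the existing formatter writes.
    String toText() {
        return "version: " + version + "\ntopic_id: " + topicId;
    }
}
```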



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/PartitionMetadataReadBuffer.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.Uuid;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.regex.Pattern;
+
+public class PartitionMetadataReadBuffer {
+    private static final Pattern WHITE_SPACES_PATTERN = Pattern.compile(":\\s+");
+
+    private final String location;
+    private final BufferedReader reader;
+
+    public PartitionMetadataReadBuffer(
+        String location,
+        BufferedReader reader
+    ) {
+        this.location = location;
+        this.reader = reader;
+    }
+
+    PartitionMetadata read() throws IOException {
+        String line = null;
+        Uuid metadataTopicId;
+
+        try {
+            line = reader.readLine();
+            String[] versionArr = WHITE_SPACES_PATTERN.split(line);
+
+            if (versionArr.length == 2) {
+                int version = Integer.parseInt(versionArr[1]);
+                if (version == PartitionMetadataFile.CURRENT_VERSION) {
+                    line = reader.readLine();
+                    String[] topicIdArr = WHITE_SPACES_PATTERN.split(line);
+
+                    if (topicIdArr.length == 2) {
+                        metadataTopicId = Uuid.fromString(topicIdArr[1]);
+
+                        if (metadataTopicId.equals(Uuid.ZERO_UUID)) {
+                            throw new IOException("Invalid topic ID in partition metadata file (" + location + ")");
+                        }
+
+                        return new PartitionMetadata(version, metadataTopicId);
+                    } else {
+                        throw malformedLineException(line);
+                    }
+                } else {
+                    throw new IOException("Unrecognized version of partition metadata file + (" + location + "): " + version);
+                }
+            } else {
+                throw malformedLineException(line);
+            }
+
+        } catch (IOException | NumberFormatException e) {

Review Comment:
   The `IOException`s above are thrown explicitly, so there is no need to catch 
them again and wrap them.
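
As a sketch, the version-parsing step could catch only `NumberFormatException` and let explicit `IOException`s propagate unchanged, so messages such as "Unrecognized version" are not replaced by the generic "Malformed line" one. `ReadSketch` and `readVersion` are hypothetical names, and the null check for an empty file is an addition that is not in the PR.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.util.regex.Pattern;

// Hypothetical, trimmed-down version of the read path.
final class ReadSketch {
    private static final Pattern SEPARATOR = Pattern.compile(":\\s+");

    static int readVersion(BufferedReader reader, String location) throws IOException {
        String line = reader.readLine();
        // Assumption: an empty file is reported as malformed rather than
        // failing with a NullPointerException in split().
        if (line == null) {
            throw malformed(location, null);
        }
        String[] versionArr = SEPARATOR.split(line);
        if (versionArr.length != 2) {
            throw malformed(location, line);
        }
        try {
            return Integer.parseInt(versionArr[1]);
        } catch (NumberFormatException e) {
            // Only the parse failure is translated; IOExceptions propagate as-is.
            throw malformed(location, line);
        }
    }

    private static IOException malformed(String location, String line) {
        return new IOException("Malformed line in checkpoint file " + location + ": " + line);
    }
}
```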



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
