satishd commented on a change in pull request #11058:
URL: https://github.com/apache/kafka/pull/11058#discussion_r722958575



##########
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFile.java
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.Utils;
+import 
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.FileChannel;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * This class represents the remote log data snapshot stored in a file for a 
specific topic partition. This is used by
+ * {@link TopicBasedRemoteLogMetadataManager} to store the remote log metadata 
received for a specific partition from
+ * remote log metadata topic. This will avoid reading the remote log metadata 
messages from the topic again when a
+ * broker restarts.
+ */
+public class RemoteLogMetadataSnapshotFile {
+    private static final Logger log = 
LoggerFactory.getLogger(RemoteLogMetadataSnapshotFile.class);
+
+    public static final String COMMITTED_LOG_METADATA_SNAPSHOT_FILE_NAME = 
"remote_log_snapshot";
+
+    // File format:
+    // <header>[<entry>...]
+    // header: 
<version:short><metadata-partition:int><metadata-partition-offset:long>
+    // entry: <entry-length><entry-bytes>
+
+    // header size: 2 (version) + 4 (partition num) + 8 (offset) = 14
+    private static final int HEADER_SIZE = 14;
+
+    private final File metadataStoreFile;
+    private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+
+    /**
+     * Creates a CommittedLogMetadataSnapshotFile instance backed by a file 
with the name `remote_log_snapshot` in
+     * the given {@code metadataStoreDir}. It creates the file if it does not 
exist.
+     *
+     * @param metadataStoreDir directory in which the snapshot file to be 
created.
+     */
+    RemoteLogMetadataSnapshotFile(Path metadataStoreDir) {
+        this.metadataStoreFile = new File(metadataStoreDir.toFile(), 
COMMITTED_LOG_METADATA_SNAPSHOT_FILE_NAME);
+
+        // Create an empty file if it does not exist.
+        try {
+            boolean newFileCreated = metadataStoreFile.createNewFile();
+            log.info("Remote log metadata snapshot file: [{}], newFileCreated: 
[{}]", metadataStoreFile, newFileCreated);
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    /**
+     * Writes the given snapshot replacing the earlier snapshot data.
+     *
+     * @param snapshot Snapshot to be stored.
+     * @throws IOException if there4 is any error in writing the given 
snapshot to the file.
+     */
+    public synchronized void write(Snapshot snapshot) throws IOException {
+        Path newMetadataSnapshotFilePath = new 
File(metadataStoreFile.getAbsolutePath() + ".tmp").toPath();
+        try (FileChannel fileChannel = 
FileChannel.open(newMetadataSnapshotFilePath,
+                                                        
StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
+
+            // header: 
<version:short><metadata-partition:int><metadata-partition-offset:long>
+            ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE);
+
+            // Write version
+            headerBuffer.putShort(snapshot.version());
+
+            // Write metadata partition and metadata partition offset
+            headerBuffer.putInt(snapshot.metadataPartition());
+
+            // Write metadata partition offset
+            headerBuffer.putLong(snapshot.metadataPartitionOffset());
+            headerBuffer.flip();
+
+            // Write header
+            fileChannel.write(headerBuffer);
+
+            // Write each entry
+            ByteBuffer lenBuffer = ByteBuffer.allocate(4);
+            for (RemoteLogSegmentMetadataSnapshot metadataSnapshot : 
snapshot.remoteLogSegmentMetadataSnapshots()) {
+                final byte[] serializedBytes = 
serde.serialize(metadataSnapshot);
+                // entry format: <entry-length><entry-bytes>
+
+                // Write entry length
+                lenBuffer.putInt(serializedBytes.length);
+                lenBuffer.flip();
+                fileChannel.write(lenBuffer);
+                lenBuffer.rewind();
+
+                // Write entry bytes
+                fileChannel.write(ByteBuffer.wrap(serializedBytes));
+            }
+
+            fileChannel.force(true);
+        }
+
+        Utils.atomicMoveWithFallback(newMetadataSnapshotFilePath, 
metadataStoreFile.toPath());
+    }
+
+    /**
+     * @return the Snapshot if it exists.
+     * @throws IOException if there is any error in reading the stored 
snapshot.
+     */
+    public synchronized Optional<Snapshot> read() throws IOException {
+
+        // Checking for empty files.
+        if (metadataStoreFile.length() == 0) {

Review comment:
       No, it is not needed in that case. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to