junrao commented on a change in pull request #11058:
URL: https://github.com/apache/kafka/pull/11058#discussion_r717821546



##########
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
##########
@@ -120,6 +171,37 @@ public void run() {
         }
     }
 
+    private void maybeSyncCommittedDataAndOffsets(boolean forceSync) {
+        boolean noOffsetUpdates = committedPartitionToConsumedOffsets.equals(partitionToConsumedOffsets);
+        if (noOffsetUpdates || !forceSync && time.milliseconds() - lastSyncedTimeMs < committedOffsetSyncIntervalMs) {
+            log.debug("Skip syncing committed offsets, noOffsetUpdates: {}, forceSync: {}", noOffsetUpdates, forceSync);
+            return;
+        }
+
+        try {
+            // partitionToConsumedOffsets is not getting changed concurrently as this method is called from #run() which updates the same.
+            // Need to take lock on assignPartitionsLock as assignedTopicPartitions might get updated by other threads.
+            synchronized (assignPartitionsLock) {
+                for (TopicIdPartition topicIdPartition : assignedTopicPartitions) {
+                    int metadataPartition = topicPartitioner.metadataPartition(topicIdPartition);
+                    Long offset = partitionToConsumedOffsets.get(metadataPartition);
+                    if (offset != null && !offset.equals(committedPartitionToConsumedOffsets.get(metadataPartition))) {

Review comment:
       This means that if there is no change to a remoteLogMetadataCache, but there is a new record for other partitions in the same metadataPartition, we still need to flush that remoteLogMetadataCache.
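
       One way to avoid such unnecessary flushes, sketched here as a suggestion rather than what the PR implements, is to let each cache remember whether it was mutated since the last snapshot; markDirty() and flushToFile() below are assumed/hypothetical methods:

       // Hypothetical sketch: flush only caches that were actually mutated since the last sync.
       public class FileBasedRemoteLogMetadataCache extends RemoteLogMetadataCache {
           private volatile boolean dirty = false;

           // Would be called from the handle*() methods whenever the cache is updated.
           public void markDirty() {
               dirty = true;
           }

           // Returns true if a snapshot was written; skips the write when nothing changed.
           public synchronized boolean flushIfDirty() throws IOException {
               if (!dirty) {
                   return false;
               }
               flushToFile();  // assumed snapshot-writing method of this cache
               dirty = false;
               return true;
           }
       }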

##########
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
##########
@@ -85,32 +90,78 @@
     // Map of remote log metadata topic partition to consumed offsets.
     private final Map<Integer, Long> partitionToConsumedOffsets = new ConcurrentHashMap<>();
 
+    private Map<Integer, Long> committedPartitionToConsumedOffsets = Collections.emptyMap();
+
+    private final long committedOffsetSyncIntervalMs;
+    private CommittedOffsetsFile committedOffsetsFile;
+    private long lastSyncedTimeMs;
+
     public ConsumerTask(KafkaConsumer<byte[], byte[]> consumer,
                         RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler,
-                        RemoteLogMetadataTopicPartitioner topicPartitioner) {
-        Objects.requireNonNull(consumer);
-        Objects.requireNonNull(remotePartitionMetadataEventHandler);
-        Objects.requireNonNull(topicPartitioner);
-
-        this.consumer = consumer;
-        this.remotePartitionMetadataEventHandler = remotePartitionMetadataEventHandler;
-        this.topicPartitioner = topicPartitioner;
+                        RemoteLogMetadataTopicPartitioner topicPartitioner,
+                        Path committedOffsetsPath,
+                        Time time,
+                        long committedOffsetSyncIntervalMs) {
+        this.consumer = Objects.requireNonNull(consumer);
+        this.remotePartitionMetadataEventHandler = Objects.requireNonNull(remotePartitionMetadataEventHandler);
+        this.topicPartitioner = Objects.requireNonNull(topicPartitioner);
+        this.time = Objects.requireNonNull(time);
+        this.committedOffsetSyncIntervalMs = committedOffsetSyncIntervalMs;
+
+        initializeConsumerAssignment(committedOffsetsPath);
+    }
+
+    private void initializeConsumerAssignment(Path committedOffsetsPath) {
+        try {
+            committedOffsetsFile = new CommittedOffsetsFile(committedOffsetsPath.toFile());
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
+
+        Map<Integer, Long> committedOffsets = Collections.emptyMap();
+        try {
+            // Load committed offset and assign them in the consumer.
+            committedOffsets = committedOffsetsFile.readEntries();
+        } catch (IOException e) {
+            // Ignore the error and consumer consumes from the earliest offset.
+            log.error("Encountered error while building committed offsets from the file", e);
+        }
+
+        if (!committedOffsets.isEmpty()) {
+            // Assign topic partitions from the earlier committed offsets file.
+            Set<Integer> earlierAssignedPartitions = committedOffsets.keySet();
+            assignedMetaPartitions = Collections.unmodifiableSet(earlierAssignedPartitions);
+            Set<TopicPartition> metadataTopicPartitions = earlierAssignedPartitions.stream()
+                    .map(x -> new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, x))
+                    .collect(Collectors.toSet());
+            consumer.assign(metadataTopicPartitions);
+
+            // Seek to the committed offsets
+            for (Map.Entry<Integer, Long> entry : committedOffsets.entrySet()) {
+                partitionToConsumedOffsets.put(entry.getKey(), entry.getValue());

Review comment:
       If the metadataTopicPartitions changes, should we remove unneeded 
partitions from partitionToConsumedOffsets?
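
       If the assignment can shrink, a small pruning step along these lines could drop the stale entries (a sketch only; removeUnassignedPartitionOffsets is a hypothetical helper, not code from the PR):

       // Hypothetical helper: drop consumed offsets for metadata partitions that are no
       // longer assigned, so stale entries are not carried into the next committed-offsets file.
       private void removeUnassignedPartitionOffsets(Set<Integer> currentlyAssignedMetadataPartitions) {
           partitionToConsumedOffsets.keySet().retainAll(currentlyAssignedMetadataPartitions);
       }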

##########
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java
##########
@@ -42,21 +45,36 @@
 public class RemotePartitionMetadataStore extends RemotePartitionMetadataEventHandler implements Closeable {
     private static final Logger log = LoggerFactory.getLogger(RemotePartitionMetadataStore.class);
 
+    private final Path logDir;
+
     private Map<TopicIdPartition, RemotePartitionDeleteMetadata> idToPartitionDeleteMetadata =
             new ConcurrentHashMap<>();
 
-    private Map<TopicIdPartition, RemoteLogMetadataCache> idToRemoteLogMetadataCache =
+    private Map<TopicIdPartition, FileBasedRemoteLogMetadataCache> idToRemoteLogMetadataCache =
             new ConcurrentHashMap<>();
 
+    public RemotePartitionMetadataStore(Path logDir) {
+        this.logDir = logDir;
+    }
+
     @Override
     public void handleRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
         log.debug("Adding remote log segment : [{}]", remoteLogSegmentMetadata);
 
-        RemoteLogSegmentId remoteLogSegmentId = remoteLogSegmentMetadata.remoteLogSegmentId();
+        final RemoteLogSegmentId remoteLogSegmentId = remoteLogSegmentMetadata.remoteLogSegmentId();
+        TopicIdPartition topicIdPartition = remoteLogSegmentId.topicIdPartition();
+
+        // This should have been already existing as it is loaded when the partitions are assigned.
+        RemoteLogMetadataCache remoteLogMetadataCache = idToRemoteLogMetadataCache.get(topicIdPartition);
+        if (remoteLogMetadataCache != null) {
+            remoteLogMetadataCache.addCopyInProgressSegment(remoteLogSegmentMetadata);
+        } else {
+            log.warn("No partition metadata found for : " + topicIdPartition);

Review comment:
       If this is unexpected, should we throw an IllegalStateException?
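
       A sketch of what that could look like, if the missing cache really indicates a bug rather than an expected race (this is the reviewer's suggestion, not the PR's code):

       RemoteLogMetadataCache remoteLogMetadataCache = idToRemoteLogMetadataCache.get(topicIdPartition);
       if (remoteLogMetadataCache == null) {
           // Fail fast: the cache should have been created when the partition was assigned.
           throw new IllegalStateException("No remote log metadata cache found for partition: " + topicIdPartition);
       }
       remoteLogMetadataCache.addCopyInProgressSegment(remoteLogSegmentMetadata);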

##########
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java
##########
@@ -39,6 +41,7 @@
     private static final short REMOTE_LOG_SEGMENT_METADATA_API_KEY = new RemoteLogSegmentMetadataRecord().apiKey();
     private static final short REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY = new RemoteLogSegmentMetadataUpdateRecord().apiKey();
     private static final short REMOTE_PARTITION_DELETE_API_KEY = new RemotePartitionDeleteMetadataRecord().apiKey();
+    private static final short REMOTE_LOG_SEGMENT_METADATA_SNAPSHOT_API_KEY = new RemoteLogSegmentMetadataSnapshotRecord().apiKey();

Review comment:
       Hmm, since the remote log snapshot file has a file level header, having 
the frame header at record level seems redundant. But for simplicity, we 
probably could just write the framed record to the snapshot file. Could we 
update the comment accordingly?

##########
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
##########
@@ -240,6 +323,11 @@ public void close() {
                 // if the closing is already set.
                 closing = true;
                 consumer.wakeup();
+                try {
+                    maybeSyncCommittedDataAndOffsets(true);

Review comment:
       Should we wait until the consumer task completes before writing the 
checkpoint file? Otherwise, we could be reading the in-memory state while it's 
being updated.
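
       One way to sequence this (a sketch only, assuming a shutdown latch can be added to ConsumerTask; shutdownLatch and the timeout are hypothetical) is to let run() signal completion and have close() wait for it before writing the checkpoint:

       // Hypothetical sketch, not the PR's implementation.
       private final CountDownLatch shutdownLatch = new CountDownLatch(1);

       public void run() {
           try {
               // existing poll-and-apply loop
           } finally {
               shutdownLatch.countDown();  // signals that in-memory state is no longer being updated
           }
       }

       public void close() {
           closing = true;
           consumer.wakeup();
           try {
               // Wait for run() to exit before snapshotting state and offsets.
               if (shutdownLatch.await(30, TimeUnit.SECONDS)) {
                   maybeSyncCommittedDataAndOffsets(true);
               }
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
           }
       }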

##########
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFile.java
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.Utils;
+import 
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * This class represents the remote log data snapshot stored in a file for a specific topic partition. This is used by
+ * {@link TopicBasedRemoteLogMetadataManager} to store the remote log metadata received for a specific partition from
+ * remote log metadata topic. This will avoid reading the remote log metadata messages from the topic again when a
+ * broker restarts.
+ */
+public class RemoteLogMetadataSnapshotFile {
+    private static final Logger log = LoggerFactory.getLogger(RemoteLogMetadataSnapshotFile.class);
+
+    public static final String COMMITTED_LOG_METADATA_SNAPSHOT_FILE_NAME = "remote_log_snapshot";
+
+    // header: <version:short><topicId:2 longs><metadata-partition:int><metadata-partition-offset:long>
+    // size: 2 + (8+8) + 4 + 8 = 30

Review comment:
       Could we describe the format of the rest of the content too?
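
       For reference, the body layout can be inferred from the write() method further down: after the fixed header, the file is a sequence of length-prefixed serialized records. A possible comment (a suggested wording, not existing text in the PR):

       // header: <version:short><topicId:2 longs><metadata-partition:int><metadata-partition-offset:long>
       // size: 2 + (8+8) + 4 + 8 = 30
       //
       // body, repeated until end of file:
       //   <entry-length:int><serialized RemoteLogSegmentMetadataSnapshot of entry-length bytes>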

##########
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.TreeMap;
+
+public class RemoteLogSegmentMetadataSnapshot extends RemoteLogMetadata {
+
+    /**
+     * Universally unique remote log segment id.

Review comment:
       I am a bit confused. This class seems to be the same as 
RemoteLogSegmentMetadata?

##########
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.TreeMap;
+
+public class RemoteLogSegmentMetadataSnapshot extends RemoteLogMetadata {

Review comment:
       Could we add a comment for this class?
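
       A possible comment, drafted from how the class is used by the snapshot file (wording is only a suggestion):

       /**
        * Represents the metadata of a remote log segment in the form that is written to and read from
        * {@link RemoteLogMetadataSnapshotFile}. It captures the merged, point-in-time state of a segment
        * so that the snapshot can be restored without replaying the individual metadata update records.
        */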

##########
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFile.java
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.Utils;
+import 
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * This class represents the remote log data snapshot stored in a file for a specific topic partition. This is used by
+ * {@link TopicBasedRemoteLogMetadataManager} to store the remote log metadata received for a specific partition from
+ * remote log metadata topic. This will avoid reading the remote log metadata messages from the topic again when a
+ * broker restarts.
+ */
+public class RemoteLogMetadataSnapshotFile {
+    private static final Logger log = LoggerFactory.getLogger(RemoteLogMetadataSnapshotFile.class);
+
+    public static final String COMMITTED_LOG_METADATA_SNAPSHOT_FILE_NAME = "remote_log_snapshot";
+
+    // header: <version:short><topicId:2 longs><metadata-partition:int><metadata-partition-offset:long>
+    // size: 2 + (8+8) + 4 + 8 = 30
+    private static final int HEADER_SIZE = 30;
+
+    private final File metadataStoreFile;
+    private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+
+    /**
+     * Creates a RemoteLogMetadataSnapshotFile instance backed by a file with the name `remote_log_snapshot` in
+     * the given {@code metadataStoreDir}. It creates the file if it does not exist.
+     *
+     * @param metadataStoreDir directory in which the snapshot file is to be created.
+     */
+    RemoteLogMetadataSnapshotFile(Path metadataStoreDir) {
+        this.metadataStoreFile = new File(metadataStoreDir.toFile(), COMMITTED_LOG_METADATA_SNAPSHOT_FILE_NAME);
+
+        // Create an empty file if it does not exist.
+        try {
+            boolean newFileCreated = metadataStoreFile.createNewFile();
+            log.info("Remote log metadata snapshot file: [{}], newFileCreated: [{}]", metadataStoreFile, newFileCreated);
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    /**
+     * Writes the given snapshot replacing the earlier snapshot data.
+     *
+     * @param snapshot Snapshot to be stored.
+     * @throws IOException if there is any error in writing the given snapshot to the file.
+     */
+    public synchronized void write(Snapshot snapshot) throws IOException {
+        File newMetadataSnapshotFile = new File(metadataStoreFile.getAbsolutePath() + ".tmp");
+        try (WritableByteChannel fileChannel = Channels.newChannel(new FileOutputStream(newMetadataSnapshotFile))) {
+
+            ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE);
+
+            // Write version
+            headerBuffer.putShort(snapshot.version());
+
+            // Write topic-id
+            headerBuffer.putLong(snapshot.topicId().getMostSignificantBits());
+            headerBuffer.putLong(snapshot.topicId().getLeastSignificantBits());
+
+            // Write metadata partition and metadata partition offset
+            headerBuffer.putInt(snapshot.metadataPartition());
+            headerBuffer.putLong(snapshot.metadataPartitionOffset());
+            headerBuffer.flip();
+
+            // Write header
+            fileChannel.write(headerBuffer);
+
+            // Write each entry
+            ByteBuffer lenBuffer = ByteBuffer.allocate(4);
+            for (RemoteLogSegmentMetadataSnapshot metadataSnapshot : snapshot.remoteLogSegmentMetadataSnapshots()) {
+                final byte[] serializedBytes = serde.serialize(metadataSnapshot);
+                // Write length
+                lenBuffer.putInt(serializedBytes.length);
+                lenBuffer.flip();
+                fileChannel.write(lenBuffer);
+                lenBuffer.rewind();
+
+                // Write data
+                fileChannel.write(ByteBuffer.wrap(serializedBytes));
+            }
+        }
+
+        Utils.atomicMoveWithFallback(newMetadataSnapshotFile.toPath(), metadataStoreFile.toPath());

Review comment:
       Should we force the channel at the end?
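
       A sketch of what forcing could look like before the rename, assuming the FileOutputStream's descriptor is used for the sync (illustrative only, not the PR's code):

       // Sketch: flush file contents to disk before atomically swapping the file in.
       try (FileOutputStream fileOutputStream = new FileOutputStream(newMetadataSnapshotFile);
            WritableByteChannel fileChannel = Channels.newChannel(fileOutputStream)) {
           // ... write header and entries as above ...
           fileOutputStream.getFD().sync();  // force the data to disk before the atomic move
       }
       Utils.atomicMoveWithFallback(newMetadataSnapshotFile.toPath(), metadataStoreFile.toPath());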

##########
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
##########
@@ -85,32 +90,78 @@
     // Map of remote log metadata topic partition to consumed offsets.
     private final Map<Integer, Long> partitionToConsumedOffsets = new ConcurrentHashMap<>();
 
+    private Map<Integer, Long> committedPartitionToConsumedOffsets = Collections.emptyMap();

Review comment:
       To be consistent with the naming of partitionToConsumedOffsets, would it 
be better to name this partitionToCommittedOffsets?

##########
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java
##########
@@ -161,77 +161,73 @@ public void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate metada
                 throw new IllegalArgumentException("metadataUpdate: " + metadataUpdate + " with state " + RemoteLogSegmentState.COPY_SEGMENT_STARTED +
                                                    " can not be updated");
             case COPY_SEGMENT_FINISHED:
-                handleSegmentWithCopySegmentFinishedState(metadataUpdate, existingMetadata);
+                handleSegmentWithCopySegmentFinishedState(existingMetadata.createWithUpdates(metadataUpdate));
                 break;
             case DELETE_SEGMENT_STARTED:
-                handleSegmentWithDeleteSegmentStartedState(metadataUpdate, existingMetadata);
+                handleSegmentWithDeleteSegmentStartedState(existingMetadata.createWithUpdates(metadataUpdate));
                 break;
             case DELETE_SEGMENT_FINISHED:
-                handleSegmentWithDeleteSegmentFinishedState(metadataUpdate, existingMetadata);
+                handleSegmentWithDeleteSegmentFinishedState(existingMetadata.createWithUpdates(metadataUpdate));
                 break;
             default:
                 throw new IllegalArgumentException("Metadata with the state " + targetState + " is not supported");
         }
     }
 
-    private void handleSegmentWithCopySegmentFinishedState(RemoteLogSegmentMetadataUpdate metadataUpdate,
-                                                           RemoteLogSegmentMetadata existingMetadata) {
-        log.debug("Adding remote log segment metadata to leader epoch mappings with update: [{}]", metadataUpdate);
-
-        doHandleSegmentStateTransitionForLeaderEpochs(existingMetadata,
-                RemoteLogLeaderEpochState::handleSegmentWithCopySegmentFinishedState);
+    protected final void handleSegmentWithCopySegmentFinishedState(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
+        doHandleSegmentStateTransitionForLeaderEpochs(remoteLogSegmentMetadata,
+                                                      (leaderEpoch, remoteLogLeaderEpochState, startOffset, segmentId) -> {
+                                                          long leaderEpochEndOffset = highestOffsetForEpoch(leaderEpoch, remoteLogSegmentMetadata);
+                                                          remoteLogLeaderEpochState.handleSegmentWithCopySegmentFinishedState(startOffset, segmentId, leaderEpochEndOffset);
+                                                      });
 
         // Put the entry with the updated metadata.
-        idToSegmentMetadata.put(existingMetadata.remoteLogSegmentId(),
-                existingMetadata.createWithUpdates(metadataUpdate));
+        idToSegmentMetadata.put(remoteLogSegmentMetadata.remoteLogSegmentId(), remoteLogSegmentMetadata);
     }
 
-    private void handleSegmentWithDeleteSegmentStartedState(RemoteLogSegmentMetadataUpdate metadataUpdate,
-                                                            RemoteLogSegmentMetadata existingMetadata) {
-        log.debug("Cleaning up the state for : [{}]", metadataUpdate);
+    protected final void handleSegmentWithDeleteSegmentStartedState(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
+        log.debug("Cleaning up the state for : [{}]", remoteLogSegmentMetadata);
 
-        doHandleSegmentStateTransitionForLeaderEpochs(existingMetadata,
-                RemoteLogLeaderEpochState::handleSegmentWithDeleteSegmentStartedState);
+        doHandleSegmentStateTransitionForLeaderEpochs(remoteLogSegmentMetadata,
+                                                      (leaderEpoch, remoteLogLeaderEpochState, startOffset, segmentId) ->
+                                                              remoteLogLeaderEpochState.handleSegmentWithDeleteSegmentStartedState(startOffset, segmentId));
 
         // Put the entry with the updated metadata.
-        idToSegmentMetadata.put(existingMetadata.remoteLogSegmentId(),
-                existingMetadata.createWithUpdates(metadataUpdate));
+        idToSegmentMetadata.put(remoteLogSegmentMetadata.remoteLogSegmentId(), remoteLogSegmentMetadata);

Review comment:
       This is an existing issue. When removing old segments, should we remove 
entries from leaderEpochEntries too? 
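
       A rough sketch of that cleanup (removeSegmentId is a hypothetical method on RemoteLogLeaderEpochState, named here only for illustration):

       // Hypothetical sketch: when a segment reaches DELETE_SEGMENT_FINISHED, drop its id
       // from the state of every leader epoch it covered.
       for (Integer leaderEpoch : remoteLogSegmentMetadata.segmentLeaderEpochs().keySet()) {
           RemoteLogLeaderEpochState epochState = leaderEpochEntries.get(leaderEpoch);
           if (epochState != null) {
               epochState.removeSegmentId(remoteLogSegmentMetadata.remoteLogSegmentId());
           }
       }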

##########
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFile.java
##########
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.Utils;
+import 
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * This class represents the remote log data snapshot stored in a file for a specific topic partition. This is used by
+ * {@link TopicBasedRemoteLogMetadataManager} to store the remote log metadata received for a specific partition from
+ * remote log metadata topic. This will avoid reading the remote log metadata messages from the topic again when a
+ * broker restarts.
+ */
+public class RemoteLogMetadataSnapshotFile {
+    private static final Logger log = LoggerFactory.getLogger(RemoteLogMetadataSnapshotFile.class);
+
+    public static final String COMMITTED_LOG_METADATA_SNAPSHOT_FILE_NAME = "remote_log_snapshot";
+
+    // header: <version:short><topicId:2 longs><metadata-partition:int><metadata-partition-offset:long>

Review comment:
       The thing is that we already store topicId in a partitionMetadata file 
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers#KIP516:TopicIdentifiers-PartitionMetadatafile).
 Storing the same info in another file seems to add confusion.

##########
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFile.java
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.Utils;
+import 
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * This class represents the remote log data snapshot stored in a file for a specific topic partition. This is used by
+ * {@link TopicBasedRemoteLogMetadataManager} to store the remote log metadata received for a specific partition from
+ * remote log metadata topic. This will avoid reading the remote log metadata messages from the topic again when a
+ * broker restarts.
+ */
+public class RemoteLogMetadataSnapshotFile {
+    private static final Logger log = LoggerFactory.getLogger(RemoteLogMetadataSnapshotFile.class);
+
+    public static final String COMMITTED_LOG_METADATA_SNAPSHOT_FILE_NAME = "remote_log_snapshot";
+
+    // header: <version:short><topicId:2 longs><metadata-partition:int><metadata-partition-offset:long>
+    // size: 2 + (8+8) + 4 + 8 = 30
+    private static final int HEADER_SIZE = 30;
+
+    private final File metadataStoreFile;
+
+    /**
+     * Creates a RemoteLogMetadataSnapshotFile instance backed by a file with the name `remote_log_snapshot` in
+     * the given {@code metadataStoreDir}. It creates the file if it does not exist.
+     *
+     * @param metadataStoreDir directory in which the snapshot file is to be created.
+     */
+    RemoteLogMetadataSnapshotFile(Path metadataStoreDir) {
+        this.metadataStoreFile = new File(metadataStoreDir.toFile(), COMMITTED_LOG_METADATA_SNAPSHOT_FILE_NAME);
+
+        // Create an empty file if it does not exist.
+        try {
+            boolean newFileCreated = metadataStoreFile.createNewFile();
+            log.info("Remote log metadata snapshot file: [{}], newFileCreated: [{}]", metadataStoreFile, newFileCreated);
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    /**
+     * Writes the given snapshot replacing the earlier snapshot data.
+     *
+     * @param snapshot Snapshot to be stored.
+     * @throws IOException if there is any error in writing the given snapshot to the file.
+     */
+    public synchronized void write(Snapshot snapshot) throws IOException {
+        File newMetadataSnapshotFile = new File(metadataStoreFile.getAbsolutePath() + ".new");
+        try (WritableByteChannel fileChannel = Channels.newChannel(new FileOutputStream(newMetadataSnapshotFile))) {
+
+            ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE);
+
+            // Write version
+            headerBuffer.putShort(snapshot.version);
+
+            // Write topic-id
+            headerBuffer.putLong(snapshot.topicId.getMostSignificantBits());
+            headerBuffer.putLong(snapshot.topicId.getLeastSignificantBits());
+
+            // Write metadata partition and metadata partition offset
+            headerBuffer.putInt(snapshot.metadataPartition);
+            headerBuffer.putLong(snapshot.metadataPartitionOffset);
+            headerBuffer.flip();
+
+            // Write header
+            fileChannel.write(headerBuffer);
+
+            // Write each entry
+            ByteBuffer lenBuffer = ByteBuffer.allocate(4);
+            RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+            for (RemoteLogSegmentMetadata remoteLogSegmentMetadata : snapshot.remoteLogMetadatas) {
+                final byte[] serializedBytes = serde.serialize(remoteLogSegmentMetadata);
+                // Write length
+                lenBuffer.putInt(serializedBytes.length);
+                lenBuffer.flip();
+                fileChannel.write(lenBuffer);
+                lenBuffer.rewind();
+
+                // Write data
+                fileChannel.write(ByteBuffer.wrap(serializedBytes));
+            }
+        }
+
+        Utils.atomicMoveWithFallback(newMetadataSnapshotFile.toPath(), metadataStoreFile.toPath());
+    }
+
+    /**
+     * @return the Snapshot if it exists.
+     * @throws IOException if there is any error in reading the stored snapshot.
+     */
+    @SuppressWarnings("unchecked")
+    public synchronized Optional<Snapshot> read() throws IOException {
+
+        // Checking for empty files.
+        if (metadataStoreFile.length() == 0) {
+            return Optional.empty();
+        }
+
+        try (ReadableByteChannel channel = Channels.newChannel(new FileInputStream(metadataStoreFile))) {
+
+            // Read header
+            ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE);
+            channel.read(headerBuffer);
+            headerBuffer.rewind();
+            short version = headerBuffer.getShort();
+            Uuid topicId = new Uuid(headerBuffer.getLong(), headerBuffer.getLong());
+            int metadataPartition = headerBuffer.getInt();
+            long metadataPartitionOffset = headerBuffer.getLong();
+
+            RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+
+            List<RemoteLogSegmentMetadata> result = new ArrayList<>();
+
+            ByteBuffer lenBuffer = ByteBuffer.allocate(4);
+            while (channel.read(lenBuffer) > 0) {
+                lenBuffer.rewind();
+                // Read the length of each entry
+                final int len = lenBuffer.getInt();
+                lenBuffer.rewind();
+
+                // Read the entry
+                ByteBuffer data = ByteBuffer.allocate(len);
+                final int read = channel.read(data);
+                if (read != len) {
+                    throw new IOException("Invalid amount of data read, file may have been corrupted.");
+                }
+
+                // We are always adding RemoteLogSegmentMetadata only as you can see in #write() method.
+                // Did not add a specific serde for RemoteLogSegmentMetadata and reusing RemoteLogMetadataSerde
+                final RemoteLogSegmentMetadata remoteLogSegmentMetadata = (RemoteLogSegmentMetadata) serde.deserialize(data.array());
+                result.add(remoteLogSegmentMetadata);
+            }
+
+            return Optional.of(new Snapshot(version, topicId, metadataPartition, metadataPartitionOffset, result));
+        }
+    }
+
+    /**
+     * This class represents the collection of remote log metadata for a specific topic partition.
+     */
+    public static final class Snapshot {
+        private static final short CURRENT_VERSION = 0;
+
+        private final short version;
+        private final Uuid topicId;
+        private final int metadataPartition;
+        private final long metadataPartitionOffset;
+        private final Collection<RemoteLogSegmentMetadata> remoteLogMetadatas;
+
+        public Snapshot(Uuid topicId,
+                        int metadataPartition,
+                        long metadataPartitionOffset,
+                        Collection<RemoteLogSegmentMetadata> remoteLogMetadatas) {
+            this(CURRENT_VERSION, topicId, metadataPartition, metadataPartitionOffset, remoteLogMetadatas);
+        }
+
+        public Snapshot(short version,
+                        Uuid topicId,
+                        int metadataPartition,
+                        long metadataPartitionOffset,
+                        Collection<RemoteLogSegmentMetadata> remoteLogMetadatas) {
+            this.version = version;
+            this.topicId = topicId;
+            this.metadataPartition = metadataPartition;
+            this.metadataPartitionOffset = metadataPartitionOffset;
+            this.remoteLogMetadatas = remoteLogMetadatas;
+        }
+
+        public short version() {
+            return version;
+        }
+
+        public Uuid topicId() {
+            return topicId;
+        }
+
+        public int metadataPartition() {
+            return metadataPartition;
+        }
+
+        public long metadataPartitionOffset() {

Review comment:
       Since we flush the consumer offset file after flushing the snapshot 
file, it's possible for messages to be replayed on broker restart. Should we 
use metadataPartitionOffset to avoid duplicated messages being reapplied to the 
remote log segment metadata cache?
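
       A sketch of how metadataPartitionOffset could be used during replay (the handler call and variable names are illustrative, not the PR's code):

       // Hypothetical sketch: skip records already reflected in the snapshot.
       long snapshotOffset = snapshot.metadataPartitionOffset();
       for (ConsumerRecord<byte[], byte[]> record : records) {
           if (record.offset() <= snapshotOffset) {
               continue;  // already applied before the snapshot was written
           }
           applyRemoteLogMetadata(serde.deserialize(record.value()));  // assumed handler method
       }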




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

