This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2f733ac5838 KAFKA-16161: Avoid empty remote metadata snapshot file in partition dir (#15636)
2f733ac5838 is described below

commit 2f733ac58386a1669bd021e673fae581172e2b56
Author: Kamal Chandraprakash <kchandraprak...@uber.com>
AuthorDate: Tue Apr 2 07:37:54 2024 +0530

    KAFKA-16161: Avoid empty remote metadata snapshot file in partition dir (#15636)
    
    Avoid empty remote metadata snapshot file in partition dir
    
    Reviewers: Luke Chen <show...@gmail.com>, Chia-Ping Tsai <chia7...@gmail.com>, Satish Duggana <sati...@apache.org>
---
 .../storage/FileBasedRemoteLogMetadataCache.java   | 111 ---------
 .../storage/RemoteLogMetadataSnapshotFile.java     | 271 ---------------------
 .../storage/RemoteLogSegmentMetadataSnapshot.java  |   6 +-
 .../RemotePartitionMetadataEventHandler.java       |  10 +-
 .../storage/RemotePartitionMetadataStore.java      |  40 +--
 .../TopicBasedRemoteLogMetadataManager.java        |   3 +-
 .../remote/metadata/storage/ConsumerTaskTest.java  |   4 -
 .../FileBasedRemoteLogMetadataCacheTest.java       |  91 -------
 .../storage/RemoteLogMetadataSnapshotFileTest.java |  86 -------
 9 files changed, 14 insertions(+), 608 deletions(-)

diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCache.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCache.java
deleted file mode 100644
index cc992667ce8..00000000000
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCache.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.server.log.remote.metadata.storage;
-
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.TopicIdPartition;
-import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
-import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * This is a wrapper around {@link RemoteLogMetadataCache} providing a file based snapshot of
- * {@link RemoteLogMetadataCache} for the given {@code topicIdPartition}. Snapshot is stored in the given
- * {@code partitionDir}.
- */
-public class FileBasedRemoteLogMetadataCache extends RemoteLogMetadataCache {
-    private static final Logger log = LoggerFactory.getLogger(FileBasedRemoteLogMetadataCache.class);
-    private final RemoteLogMetadataSnapshotFile snapshotFile;
-    private final TopicIdPartition topicIdPartition;
-
-    @SuppressWarnings("this-escape")
-    public FileBasedRemoteLogMetadataCache(TopicIdPartition topicIdPartition,
-                                           Path partitionDir) {
-        if (!partitionDir.toFile().exists() || !partitionDir.toFile().isDirectory()) {
-            throw new KafkaException("Given partition directory:" + partitionDir + " must be an existing directory.");
-        }
-
-        this.topicIdPartition = topicIdPartition;
-        snapshotFile = new RemoteLogMetadataSnapshotFile(partitionDir);
-
-        try {
-            snapshotFile.read().ifPresent(snapshot -> loadRemoteLogSegmentMetadata(snapshot));
-        } catch (IOException e) {
-            throw new KafkaException(e);
-        }
-    }
-
-    protected final void loadRemoteLogSegmentMetadata(RemoteLogMetadataSnapshotFile.Snapshot snapshot) {
-        log.info("Loading snapshot for partition {} is: {}", topicIdPartition, snapshot);
-        for (RemoteLogSegmentMetadataSnapshot metadataSnapshot : snapshot.remoteLogSegmentMetadataSnapshots()) {
-            switch (metadataSnapshot.state()) {
-                case COPY_SEGMENT_STARTED:
-                    addCopyInProgressSegment(createRemoteLogSegmentMetadata(metadataSnapshot));
-                    break;
-                case COPY_SEGMENT_FINISHED:
-                    handleSegmentWithCopySegmentFinishedState(createRemoteLogSegmentMetadata(metadataSnapshot));
-                    break;
-                case DELETE_SEGMENT_STARTED:
-                    handleSegmentWithDeleteSegmentStartedState(createRemoteLogSegmentMetadata(metadataSnapshot));
-                    break;
-                case DELETE_SEGMENT_FINISHED:
-                default:
-                    throw new IllegalArgumentException("Given remoteLogSegmentMetadata has invalid state: " + metadataSnapshot);
-            }
-        }
-    }
-
-    private RemoteLogSegmentMetadata createRemoteLogSegmentMetadata(RemoteLogSegmentMetadataSnapshot snapshot) {
-        return new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, snapshot.segmentId()), snapshot.startOffset(),
-                                            snapshot.endOffset(), snapshot.maxTimestampMs(), snapshot.brokerId(), snapshot.eventTimestampMs(),
-                                            snapshot.segmentSizeInBytes(), snapshot.customMetadata(), snapshot.state(), snapshot.segmentLeaderEpochs()
-        );
-    }
-
-    /**
-     * Flushes the in-memory state to the snapshot file.
-     *
-     * @param metadataPartition       remote log metadata partition from which the messages have been consumed for the given
-     *                                user topic partition.
-     * @param metadataPartitionOffset remote log metadata partition offset up to which the messages have been consumed.
-     * @throws IOException if any errors occurred while writing the snapshot to the file.
-     */
-    public void flushToFile(int metadataPartition,
-                            Long metadataPartitionOffset) throws IOException {
-        List<RemoteLogSegmentMetadataSnapshot> snapshots = new ArrayList<>(idToSegmentMetadata.size());
-        for (RemoteLogLeaderEpochState state : leaderEpochEntries.values()) {
-            // Add unreferenced segments first, as to maintain the order when these segments are again read from
-            // the snapshot to build RemoteLogMetadataCache.
-            for (RemoteLogSegmentId id : state.unreferencedSegmentIds()) {
-                snapshots.add(RemoteLogSegmentMetadataSnapshot.create(idToSegmentMetadata.get(id)));
-            }
-            
-            // Add referenced segments.
-            for (RemoteLogSegmentId id : state.referencedSegmentIds()) {
-                snapshots.add(RemoteLogSegmentMetadataSnapshot.create(idToSegmentMetadata.get(id)));
-            }
-        }
-
-        snapshotFile.write(new RemoteLogMetadataSnapshotFile.Snapshot(metadataPartition, metadataPartitionOffset, snapshots));
-    }
-}
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFile.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFile.java
deleted file mode 100644
index db49bb9cb8a..00000000000
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFile.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.server.log.remote.metadata.storage;
-
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.FileChannel;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
-
-/**
- * This class represents the remote log data snapshot stored in a file for a specific topic partition. This is used by
- * {@link TopicBasedRemoteLogMetadataManager} to store the remote log metadata received for a specific partition from
- * remote log metadata topic. This will avoid reading the remote log metadata messages from the topic again when a
- * broker restarts.
- */
-public class RemoteLogMetadataSnapshotFile {
-    private static final Logger log = LoggerFactory.getLogger(RemoteLogMetadataSnapshotFile.class);
-
-    public static final String COMMITTED_LOG_METADATA_SNAPSHOT_FILE_NAME = "remote_log_snapshot";
-
-    // File format:
-    // <header>[<entry>...]
-    // header: <version:short><metadata-partition:int><metadata-partition-offset:long><entries-size:int>
-    // entry: <entry-length><entry-bytes>
-
-    // header size: 2 (version) + 4 (partition num) + 8 (offset) + 4 (entries size) = 18
-    private static final int HEADER_SIZE = 18;
-
-    private final File metadataStoreFile;
-    private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
-
-    /**
-     * Creates a CommittedLogMetadataSnapshotFile instance backed by a file with the name `remote_log_snapshot` in
-     * the given {@code metadataStoreDir}. It creates the file if it does not exist.
-     *
-     * @param metadataStoreDir directory in which the snapshot file to be created.
-     */
-    RemoteLogMetadataSnapshotFile(Path metadataStoreDir) {
-        this.metadataStoreFile = new File(metadataStoreDir.toFile(), COMMITTED_LOG_METADATA_SNAPSHOT_FILE_NAME);
-
-        // Create an empty file if it does not exist.
-        try {
-            final boolean fileExists = Files.exists(metadataStoreFile.toPath());
-            if (!fileExists) {
-                Files.createFile(metadataStoreFile.toPath());
-            }
-            log.info("Remote log metadata snapshot file: [{}], newFileCreated: [{}]", metadataStoreFile, !fileExists);
-        } catch (IOException e) {
-            throw new KafkaException(e);
-        }
-    }
-
-    /**
-     * Writes the given snapshot replacing the earlier snapshot data.
-     *
-     * @param snapshot Snapshot to be stored.
-     * @throws IOException if there4 is any error in writing the given snapshot to the file.
-     */
-    public synchronized void write(Snapshot snapshot) throws IOException {
-        Path newMetadataSnapshotFilePath = new File(metadataStoreFile.getAbsolutePath() + ".tmp").toPath();
-        try (FileChannel fileChannel = FileChannel.open(newMetadataSnapshotFilePath,
-                                                        StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
-
-            // header: <version:short><metadata-partition:int><metadata-partition-offset:long>
-            ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE);
-
-            // Write version
-            headerBuffer.putShort(snapshot.version());
-
-            // Write metadata partition and metadata partition offset
-            headerBuffer.putInt(snapshot.metadataPartition());
-
-            // Write metadata partition offset
-            headerBuffer.putLong(snapshot.metadataPartitionOffset());
-
-            // Write entries size
-            Collection<RemoteLogSegmentMetadataSnapshot> metadataSnapshots = snapshot.remoteLogSegmentMetadataSnapshots();
-            headerBuffer.putInt(metadataSnapshots.size());
-
-            // Write header
-            headerBuffer.flip();
-            fileChannel.write(headerBuffer);
-
-            // Write each entry
-            ByteBuffer lenBuffer = ByteBuffer.allocate(4);
-            for (RemoteLogSegmentMetadataSnapshot metadataSnapshot : metadataSnapshots) {
-                final byte[] serializedBytes = serde.serialize(metadataSnapshot);
-                // entry format: <entry-length><entry-bytes>
-
-                // Write entry length
-                lenBuffer.putInt(serializedBytes.length);
-                lenBuffer.flip();
-                fileChannel.write(lenBuffer);
-                lenBuffer.rewind();
-
-                // Write entry bytes
-                fileChannel.write(ByteBuffer.wrap(serializedBytes));
-            }
-
-            fileChannel.force(true);
-        }
-
-        Utils.atomicMoveWithFallback(newMetadataSnapshotFilePath, metadataStoreFile.toPath());
-    }
-
-    /**
-     * @return the Snapshot if it exists.
-     * @throws IOException if there is any error in reading the stored snapshot.
-     */
-    public synchronized Optional<Snapshot> read() throws IOException {
-
-        // Checking for empty files.
-        if (metadataStoreFile.length() == 0) {
-            return Optional.empty();
-        }
-
-        try (ReadableByteChannel channel = Channels.newChannel(new FileInputStream(metadataStoreFile))) {
-
-            // header: <version:short><metadata-partition:int><metadata-partition-offset:long>
-            // Read header
-            ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE);
-            channel.read(headerBuffer);
-            headerBuffer.rewind();
-            short version = headerBuffer.getShort();
-            int metadataPartition = headerBuffer.getInt();
-            long metadataPartitionOffset = headerBuffer.getLong();
-            int metadataSnapshotsSize = headerBuffer.getInt();
-
-            List<RemoteLogSegmentMetadataSnapshot> result = new ArrayList<>(metadataSnapshotsSize);
-            ByteBuffer lenBuffer = ByteBuffer.allocate(4);
-            int lenBufferReadCt;
-            while ((lenBufferReadCt = channel.read(lenBuffer)) > 0) {
-                lenBuffer.rewind();
-
-                if (lenBufferReadCt != lenBuffer.capacity()) {
-                    throw new IOException("Invalid amount of data read for the length of an entry, file may have been corrupted.");
-                }
-
-                // entry format: <entry-length><entry-bytes>
-
-                // Read the length of each entry
-                final int len = lenBuffer.getInt();
-                lenBuffer.rewind();
-
-                // Read the entry
-                ByteBuffer data = ByteBuffer.allocate(len);
-                final int read = channel.read(data);
-                if (read != len) {
-                    throw new IOException("Invalid amount of data read, file may have been corrupted.");
-                }
-
-                // We are always adding RemoteLogSegmentMetadata only as you can see in #write() method.
-                // Did not add a specific serde for RemoteLogSegmentMetadata and reusing RemoteLogMetadataSerde
-                final RemoteLogSegmentMetadataSnapshot remoteLogSegmentMetadata =
-                        (RemoteLogSegmentMetadataSnapshot) serde.deserialize(data.array());
-                result.add(remoteLogSegmentMetadata);
-            }
-
-            if (metadataSnapshotsSize != result.size()) {
-                throw new IOException("Unexpected entries in the snapshot file. Expected size: " + metadataSnapshotsSize
-                                              + ", but found: " + result.size());
-            }
-
-            return Optional.of(new Snapshot(version, metadataPartition, metadataPartitionOffset, result));
-        }
-    }
-
-    /**
-     * This class represents the collection of remote log metadata for a specific topic partition.
-     */
-    public static final class Snapshot {
-        private static final short CURRENT_VERSION = 0;
-
-        private final short version;
-        private final int metadataPartition;
-        private final long metadataPartitionOffset;
-        private final Collection<RemoteLogSegmentMetadataSnapshot> remoteLogSegmentMetadataSnapshots;
-
-        public Snapshot(int metadataPartition,
-                        long metadataPartitionOffset,
-                        Collection<RemoteLogSegmentMetadataSnapshot> remoteLogSegmentMetadataSnapshots) {
-            this(CURRENT_VERSION, metadataPartition, metadataPartitionOffset, remoteLogSegmentMetadataSnapshots);
-        }
-
-        public Snapshot(short version,
-                        int metadataPartition,
-                        long metadataPartitionOffset,
-                        Collection<RemoteLogSegmentMetadataSnapshot> remoteLogSegmentMetadataSnapshots) {
-            // We will add multiple version support in future if needed. For now, the only supported version is CURRENT_VERSION viz 0.
-            if (version != CURRENT_VERSION) {
-                throw new IllegalArgumentException("Unexpected version received: " + version);
-            }
-            this.version = version;
-            this.metadataPartition = metadataPartition;
-            this.metadataPartitionOffset = metadataPartitionOffset;
-            this.remoteLogSegmentMetadataSnapshots = remoteLogSegmentMetadataSnapshots;
-        }
-
-        public short version() {
-            return version;
-        }
-
-        public int metadataPartition() {
-            return metadataPartition;
-        }
-
-        public long metadataPartitionOffset() {
-            return metadataPartitionOffset;
-        }
-
-        public Collection<RemoteLogSegmentMetadataSnapshot> remoteLogSegmentMetadataSnapshots() {
-            return remoteLogSegmentMetadataSnapshots;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) return true;
-            if (!(o instanceof Snapshot)) return false;
-            Snapshot snapshot = (Snapshot) o;
-            return version == snapshot.version && metadataPartition == snapshot.metadataPartition
-                    && metadataPartitionOffset == snapshot.metadataPartitionOffset
-                    && Objects.equals(remoteLogSegmentMetadataSnapshots, snapshot.remoteLogSegmentMetadataSnapshots);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(version, metadataPartition, metadataPartitionOffset, remoteLogSegmentMetadataSnapshots);
-        }
-
-        @Override
-        public String toString() {
-            return "Snapshot{" +
-                    "version=" + version +
-                    ", metadataPartition=" + metadataPartition +
-                    ", metadataPartitionOffset=" + metadataPartitionOffset +
-                    ", remoteLogSegmentMetadataSnapshotsSize" + remoteLogSegmentMetadataSnapshots.size() +
-                    '}';
-        }
-    }
-}
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java
index ec1ed6a66d1..de33a054413 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java
@@ -33,10 +33,8 @@ import java.util.Optional;
 /**
  * This class represents the entry containing the metadata about a remote log segment. This is similar to
  * {@link RemoteLogSegmentMetadata} but it does not contain topic partition information. This class keeps
- * only remote log segment ID but not the topic partition.
- *
- * This class is used in storing the snapshot of remote log metadata for a specific topic partition as mentioned
- * in {@link RemoteLogMetadataSnapshotFile.Snapshot}.
+ * only remote log segment ID but not the topic partition. This class is used in storing the snapshot of
+ * remote log metadata for a specific topic partition.
  */
 public class RemoteLogSegmentMetadataSnapshot extends RemoteLogMetadata {
 
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataEventHandler.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataEventHandler.java
index f4f43b0d883..075cab58817 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataEventHandler.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataEventHandler.java
@@ -22,8 +22,6 @@ import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
 import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
 
-import java.io.IOException;
-
 public abstract class RemotePartitionMetadataEventHandler {
 
     public void handleRemoteLogMetadata(RemoteLogMetadata remoteLogMetadata) {
@@ -44,9 +42,11 @@ public abstract class RemotePartitionMetadataEventHandler {
 
     protected abstract void handleRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata);
 
-    public abstract void syncLogMetadataSnapshot(TopicIdPartition topicIdPartition,
-                                                 int metadataPartition,
-                                                 Long metadataPartitionOffset) throws IOException;
+    public void syncLogMetadataSnapshot(TopicIdPartition topicIdPartition,
+                                        int metadataPartition,
+                                        Long metadataPartitionOffset) {
+        // no-op by default
+    }
 
     public abstract void clearTopicPartition(TopicIdPartition topicIdPartition);
 
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java
index f9394eee99f..14f5b3dd717 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.server.log.remote.metadata.storage;
 
 import org.apache.kafka.common.TopicIdPartition;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ReplicaNotAvailableException;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
@@ -30,9 +29,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
-import java.io.File;
 import java.io.IOException;
-import java.nio.file.Path;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.Map;
@@ -46,16 +43,13 @@ import java.util.concurrent.ConcurrentHashMap;
 public class RemotePartitionMetadataStore extends RemotePartitionMetadataEventHandler implements Closeable {
     private static final Logger log = LoggerFactory.getLogger(RemotePartitionMetadataStore.class);
 
-    private final Path logDir;
-
     private Map<TopicIdPartition, RemotePartitionDeleteMetadata> idToPartitionDeleteMetadata =
             new ConcurrentHashMap<>();
 
-    private Map<TopicIdPartition, FileBasedRemoteLogMetadataCache> idToRemoteLogMetadataCache =
+    private Map<TopicIdPartition, RemoteLogMetadataCache> idToRemoteLogMetadataCache =
             new ConcurrentHashMap<>();
 
-    public RemotePartitionMetadataStore(Path logDir) {
-        this.logDir = logDir;
+    public RemotePartitionMetadataStore() {
     }
 
     @Override
@@ -74,10 +68,6 @@ public class RemotePartitionMetadataStore extends RemotePartitionMetadataEventHa
         }
     }
 
-    private Path partitionLogDirectory(TopicPartition topicPartition) {
-        return new File(logDir.toFile(), topicPartition.topic() + "-" + topicPartition.partition()).toPath();
-    }
-
     @Override
     public void handleRemoteLogSegmentMetadataUpdate(RemoteLogSegmentMetadataUpdate rlsmUpdate) {
         log.debug("Updating remote log segment: [{}]", rlsmUpdate);
@@ -110,22 +100,6 @@ public class RemotePartitionMetadataStore extends RemotePartitionMetadataEventHa
         }
     }
 
-    @Override
-    public void syncLogMetadataSnapshot(TopicIdPartition topicIdPartition,
-                                        int metadataPartition,
-                                        Long metadataPartitionOffset) throws IOException {
-        RemotePartitionDeleteMetadata partitionDeleteMetadata = idToPartitionDeleteMetadata.get(topicIdPartition);
-        if (partitionDeleteMetadata != null) {
-            log.info("Skipping syncing of metadata snapshot as remote partition [{}] is with state: [{}] ", topicIdPartition,
-                     partitionDeleteMetadata);
-        } else {
-            FileBasedRemoteLogMetadataCache remoteLogMetadataCache = idToRemoteLogMetadataCache.get(topicIdPartition);
-            if (remoteLogMetadataCache != null) {
-                remoteLogMetadataCache.flushToFile(metadataPartition, metadataPartitionOffset);
-            }
-        }
-    }
-
     @Override
     public void clearTopicPartition(TopicIdPartition topicIdPartition) {
         idToRemoteLogMetadataCache.remove(topicIdPartition);
@@ -145,16 +119,15 @@ public class RemotePartitionMetadataStore extends RemotePartitionMetadataEventHa
         return getRemoteLogMetadataCache(topicIdPartition).listRemoteLogSegments(leaderEpoch);
     }
 
-    private FileBasedRemoteLogMetadataCache getRemoteLogMetadataCache(TopicIdPartition topicIdPartition)
+    private RemoteLogMetadataCache getRemoteLogMetadataCache(TopicIdPartition topicIdPartition)
             throws RemoteResourceNotFoundException {
-        FileBasedRemoteLogMetadataCache remoteLogMetadataCache = idToRemoteLogMetadataCache.get(topicIdPartition);
+        RemoteLogMetadataCache remoteLogMetadataCache = idToRemoteLogMetadataCache.get(topicIdPartition);
         if (remoteLogMetadataCache == null) {
             throw new RemoteResourceNotFoundException("No resource found for partition: " + topicIdPartition);
         }
 
         if (!remoteLogMetadataCache.isInitialized()) {
-            // Throwing a retriable ReplicaNotAvailableException here for clients retry. We can introduce a new more
-            // appropriate exception with a KIP in the future.
+            // Throwing a retriable ReplicaNotAvailableException here for clients retry.
             throw new ReplicaNotAvailableException("Remote log metadata cache is not initialized for partition: " + topicIdPartition);
         }
 
@@ -189,8 +162,7 @@ public class RemotePartitionMetadataStore extends RemotePartitionMetadataEventHa
 
     @Override
     public void maybeLoadPartition(TopicIdPartition partition) {
-        idToRemoteLogMetadataCache.computeIfAbsent(partition,
-            topicIdPartition -> new FileBasedRemoteLogMetadataCache(topicIdPartition, partitionLogDirectory(topicIdPartition.topicPartition())));
+        idToRemoteLogMetadataCache.computeIfAbsent(partition, idPartition -> new RemoteLogMetadataCache());
     }
 
     @Override
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
index a3fc1c6e9b8..b8e3d106664 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
@@ -40,7 +40,6 @@ import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
@@ -359,7 +358,7 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
 
             rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(configs);
             rlmTopicPartitioner = new RemoteLogMetadataTopicPartitioner(rlmmConfig.metadataTopicPartitionsCount());
-            remotePartitionMetadataStore = new RemotePartitionMetadataStore(new File(rlmmConfig.logDir()).toPath());
+            remotePartitionMetadataStore = new RemotePartitionMetadataStore();
             configured = true;
             log.info("Successfully configured topic-based RLMM with config: {}", rlmmConfig);
 
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java
index 2b36c4bb039..424c86b6df6 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java
@@ -390,10 +390,6 @@ public class ConsumerTaskTest {
         protected void handleRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata) {
         }
 
-        @Override
-        public void syncLogMetadataSnapshot(TopicIdPartition topicIdPartition, int metadataPartition, Long metadataPartitionOffset) {
-        }
-
         @Override
         public void clearTopicPartition(TopicIdPartition topicIdPartition) {
             isPartitionCleared.put(topicIdPartition, true);
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCacheTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCacheTest.java
deleted file mode 100644
index d5341e07b07..00000000000
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCacheTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.server.log.remote.metadata.storage;
-
-import org.apache.kafka.common.TopicIdPartition;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.Uuid;
-import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
-import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
-import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
-import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
-import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
-import org.apache.kafka.test.TestUtils;
-import org.junit.jupiter.api.Test;
-
-import java.nio.file.Path;
-import java.util.Collections;
-import java.util.Optional;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class FileBasedRemoteLogMetadataCacheTest {
-
-    @Test
-    public void testFileBasedRemoteLogMetadataCacheWithUnreferencedSegments() throws Exception {
-        TopicIdPartition partition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("test", 0));
-        int brokerId = 0;
-        Path path = TestUtils.tempDirectory().toPath();
-
-        // Create file based metadata cache.
-        FileBasedRemoteLogMetadataCache cache = new FileBasedRemoteLogMetadataCache(partition, path);
-
-        // Add a segment with start offset as 0 for leader epoch 0.
-        RemoteLogSegmentId segmentId1 = new RemoteLogSegmentId(partition, Uuid.randomUuid());
-        RemoteLogSegmentMetadata metadata1 = new RemoteLogSegmentMetadata(segmentId1,
-                                                                          0, 100, System.currentTimeMillis(), brokerId, System.currentTimeMillis(),
-                                                                          1024 * 1024, Collections.singletonMap(0, 0L));
-        cache.addCopyInProgressSegment(metadata1);
-        RemoteLogSegmentMetadataUpdate metadataUpdate1 = new RemoteLogSegmentMetadataUpdate(
-                segmentId1, System.currentTimeMillis(),
-                Optional.of(new CustomMetadata(new byte[]{0, 1, 2, 3})),
-                RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
-        cache.updateRemoteLogSegmentMetadata(metadataUpdate1);
-        Optional<RemoteLogSegmentMetadata> receivedMetadata = cache.remoteLogSegmentMetadata(0, 0L);
-        assertTrue(receivedMetadata.isPresent());
-        assertEquals(metadata1.createWithUpdates(metadataUpdate1), receivedMetadata.get());
-
-        // Add a new segment with start offset as 0 for leader epoch 0, which should replace the earlier segment.
-        RemoteLogSegmentId segmentId2 = new RemoteLogSegmentId(partition, Uuid.randomUuid());
-        RemoteLogSegmentMetadata metadata2 = new RemoteLogSegmentMetadata(segmentId2,
-                                                                          0, 900, System.currentTimeMillis(), brokerId, System.currentTimeMillis(),
-                                                                          1024 * 1024, Collections.singletonMap(0, 0L));
-        cache.addCopyInProgressSegment(metadata2);
-        RemoteLogSegmentMetadataUpdate metadataUpdate2 = new RemoteLogSegmentMetadataUpdate(
-                segmentId2, System.currentTimeMillis(),
-                Optional.of(new CustomMetadata(new byte[]{4, 5, 6, 7})),
-                RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
-        cache.updateRemoteLogSegmentMetadata(metadataUpdate2);
-
-        // Fetch segment for leader epoch:0 and start offset:0, it should be the newly added segment.
-        Optional<RemoteLogSegmentMetadata> receivedMetadata2 = cache.remoteLogSegmentMetadata(0, 0L);
-        assertTrue(receivedMetadata2.isPresent());
-        assertEquals(metadata2.createWithUpdates(metadataUpdate2), receivedMetadata2.get());
-        // Flush the cache to the file.
-        cache.flushToFile(0, 0L);
-
-        // Create a new cache with loading from the stored path.
-        FileBasedRemoteLogMetadataCache loadedCache = new FileBasedRemoteLogMetadataCache(partition, path);
-
-        // Fetch segment for leader epoch:0 and start offset:0, it should be metadata2.
-        // This ensures that the ordering of metadata is taken care after loading from the stored snapshots.
-        Optional<RemoteLogSegmentMetadata> receivedMetadataAfterLoad = loadedCache.remoteLogSegmentMetadata(0, 0L);
-        assertTrue(receivedMetadataAfterLoad.isPresent());
-        assertEquals(metadata2.createWithUpdates(metadataUpdate2), receivedMetadataAfterLoad.get());
-    }
-}
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFileTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFileTest.java
deleted file mode 100644
index dbfbbf3b044..00000000000
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFileTest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.server.log.remote.metadata.storage;
-
-import org.apache.kafka.common.Uuid;
-import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
-import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
-import org.apache.kafka.test.TestUtils;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Optional;
-
-public class RemoteLogMetadataSnapshotFileTest {
-
-    @Test
-    public void testEmptyCommittedLogMetadataFile() throws Exception {
-        File metadataStoreDir = TestUtils.tempDirectory("_rlmm_committed");
-        RemoteLogMetadataSnapshotFile snapshotFile = new RemoteLogMetadataSnapshotFile(metadataStoreDir.toPath());
-
-        // There should be an empty snapshot as nothing is written into it.
-        Assertions.assertFalse(snapshotFile.read().isPresent());
-    }
-
-    @Test
-    public void testEmptySnapshotWithCommittedLogMetadataFile() throws Exception {
-        File metadataStoreDir = TestUtils.tempDirectory("_rlmm_committed");
-        RemoteLogMetadataSnapshotFile snapshotFile = new RemoteLogMetadataSnapshotFile(metadataStoreDir.toPath());
-
-        snapshotFile.write(new RemoteLogMetadataSnapshotFile.Snapshot(0, 0L, Collections.emptyList()));
-
-        // There should be an empty snapshot as the written snapshot did not have any remote log segment metadata.
-        Assertions.assertTrue(snapshotFile.read().isPresent());
-        Assertions.assertTrue(snapshotFile.read().get().remoteLogSegmentMetadataSnapshots().isEmpty());
-    }
-
-    @Test
-    public void testWriteReadCommittedLogMetadataFile() throws Exception {
-        File metadataStoreDir = TestUtils.tempDirectory("_rlmm_committed");
-        RemoteLogMetadataSnapshotFile snapshotFile = new RemoteLogMetadataSnapshotFile(metadataStoreDir.toPath());
-
-        List<RemoteLogSegmentMetadataSnapshot> remoteLogSegmentMetadatas = new ArrayList<>();
-        long startOffset = 0;
-        for (int i = 0; i < 100; i++) {
-            long endOffset = startOffset + 100L;
-            CustomMetadata customMetadata = new CustomMetadata(new byte[]{(byte) i});
-            remoteLogSegmentMetadatas.add(
-                    new RemoteLogSegmentMetadataSnapshot(Uuid.randomUuid(), startOffset, endOffset,
-                                                         System.currentTimeMillis(), 1, 100, 1024,
-                                                         Optional.of(customMetadata),
-                                                         RemoteLogSegmentState.COPY_SEGMENT_FINISHED, Collections.singletonMap(i, startOffset)
-                    ));
-            startOffset = endOffset + 1;
-        }
-
-        RemoteLogMetadataSnapshotFile.Snapshot snapshot = new RemoteLogMetadataSnapshotFile.Snapshot(0, 120,
-                                                                                                     remoteLogSegmentMetadatas);
-        snapshotFile.write(snapshot);
-
-        Optional<RemoteLogMetadataSnapshotFile.Snapshot> maybeReadSnapshot = snapshotFile.read();
-        Assertions.assertTrue(maybeReadSnapshot.isPresent());
-
-        Assertions.assertEquals(snapshot, maybeReadSnapshot.get());
-        Assertions.assertEquals(new HashSet<>(snapshot.remoteLogSegmentMetadataSnapshots()),
-                                new HashSet<>(maybeReadSnapshot.get().remoteLogSegmentMetadataSnapshots()));
-    }
-}

