This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 2f733ac5838 KAFKA-16161: Avoid empty remote metadata snapshot file in partition dir (#15636)
2f733ac5838 is described below

commit 2f733ac58386a1669bd021e673fae581172e2b56
Author: Kamal Chandraprakash <kchandraprak...@uber.com>
AuthorDate: Tue Apr 2 07:37:54 2024 +0530

    KAFKA-16161: Avoid empty remote metadata snapshot file in partition dir (#15636)

    Avoid empty remote metadata snapshot file in partition dir

    Reviewers: Luke Chen <show...@gmail.com>, Chia-Ping Tsai <chia7...@gmail.com>, Satish Duggana <sati...@apache.org>
---
 .../storage/FileBasedRemoteLogMetadataCache.java   | 111 ---------
 .../storage/RemoteLogMetadataSnapshotFile.java     | 271 ---------------------
 .../storage/RemoteLogSegmentMetadataSnapshot.java  |   6 +-
 .../RemotePartitionMetadataEventHandler.java       |  10 +-
 .../storage/RemotePartitionMetadataStore.java      |  40 +--
 .../TopicBasedRemoteLogMetadataManager.java        |   3 +-
 .../remote/metadata/storage/ConsumerTaskTest.java  |   4 -
 .../FileBasedRemoteLogMetadataCacheTest.java       |  91 -------
 .../storage/RemoteLogMetadataSnapshotFileTest.java |  86 -------
 9 files changed, 14 insertions(+), 608 deletions(-)

diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCache.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCache.java
deleted file mode 100644
index cc992667ce8..00000000000
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCache.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.server.log.remote.metadata.storage;
-
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.TopicIdPartition;
-import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
-import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * This is a wrapper around {@link RemoteLogMetadataCache} providing a file-based snapshot of
- * {@link RemoteLogMetadataCache} for the given {@code topicIdPartition}. The snapshot is stored in the given
- * {@code partitionDir}.
- */
-public class FileBasedRemoteLogMetadataCache extends RemoteLogMetadataCache {
-    private static final Logger log = LoggerFactory.getLogger(FileBasedRemoteLogMetadataCache.class);
-    private final RemoteLogMetadataSnapshotFile snapshotFile;
-    private final TopicIdPartition topicIdPartition;
-
-    @SuppressWarnings("this-escape")
-    public FileBasedRemoteLogMetadataCache(TopicIdPartition topicIdPartition,
-                                           Path partitionDir) {
-        if (!partitionDir.toFile().exists() || !partitionDir.toFile().isDirectory()) {
-            throw new KafkaException("Given partition directory: " + partitionDir + " must be an existing directory.");
-        }
-
-        this.topicIdPartition = topicIdPartition;
-        snapshotFile = new RemoteLogMetadataSnapshotFile(partitionDir);
-
-        try {
-            snapshotFile.read().ifPresent(snapshot -> loadRemoteLogSegmentMetadata(snapshot));
-        } catch (IOException e) {
-            throw new KafkaException(e);
-        }
-    }
-
-    protected final void loadRemoteLogSegmentMetadata(RemoteLogMetadataSnapshotFile.Snapshot snapshot) {
-        log.info("Loading snapshot for partition {}: {}", topicIdPartition, snapshot);
-        for (RemoteLogSegmentMetadataSnapshot metadataSnapshot : snapshot.remoteLogSegmentMetadataSnapshots()) {
-            switch (metadataSnapshot.state()) {
-                case COPY_SEGMENT_STARTED:
-                    addCopyInProgressSegment(createRemoteLogSegmentMetadata(metadataSnapshot));
-                    break;
-                case COPY_SEGMENT_FINISHED:
-                    handleSegmentWithCopySegmentFinishedState(createRemoteLogSegmentMetadata(metadataSnapshot));
-                    break;
-                case DELETE_SEGMENT_STARTED:
-                    handleSegmentWithDeleteSegmentStartedState(createRemoteLogSegmentMetadata(metadataSnapshot));
-                    break;
-                case DELETE_SEGMENT_FINISHED:
-                default:
-                    throw new IllegalArgumentException("Given remoteLogSegmentMetadata has invalid state: " + metadataSnapshot);
-            }
-        }
-    }
-
-    private RemoteLogSegmentMetadata createRemoteLogSegmentMetadata(RemoteLogSegmentMetadataSnapshot snapshot) {
-        return new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, snapshot.segmentId()), snapshot.startOffset(),
-                snapshot.endOffset(), snapshot.maxTimestampMs(), snapshot.brokerId(), snapshot.eventTimestampMs(),
-                snapshot.segmentSizeInBytes(), snapshot.customMetadata(), snapshot.state(), snapshot.segmentLeaderEpochs()
-        );
-    }
-
-    /**
-     * Flushes the in-memory state to the snapshot file.
-     *
-     * @param metadataPartition       remote log metadata partition from which the messages have been consumed for the given
-     *                                user topic partition.
-     * @param metadataPartitionOffset remote log metadata partition offset up to which the messages have been consumed.
-     * @throws IOException if any errors occurred while writing the snapshot to the file.
-     */
-    public void flushToFile(int metadataPartition,
-                            Long metadataPartitionOffset) throws IOException {
-        List<RemoteLogSegmentMetadataSnapshot> snapshots = new ArrayList<>(idToSegmentMetadata.size());
-        for (RemoteLogLeaderEpochState state : leaderEpochEntries.values()) {
-            // Add unreferenced segments first, so as to maintain the order when these segments are again read from
-            // the snapshot to build RemoteLogMetadataCache.
-            for (RemoteLogSegmentId id : state.unreferencedSegmentIds()) {
-                snapshots.add(RemoteLogSegmentMetadataSnapshot.create(idToSegmentMetadata.get(id)));
-            }
-
-            // Add referenced segments.
-            for (RemoteLogSegmentId id : state.referencedSegmentIds()) {
-                snapshots.add(RemoteLogSegmentMetadataSnapshot.create(idToSegmentMetadata.get(id)));
-            }
-        }
-
-        snapshotFile.write(new RemoteLogMetadataSnapshotFile.Snapshot(metadataPartition, metadataPartitionOffset, snapshots));
-    }
-}
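
For readers skimming the diff: the deleted cache above was exercised roughly as below. This is a minimal sketch based only on the removed API (the constructor plus flushToFile); it mirrors the removed FileBasedRemoteLogMetadataCacheTest near the end of this diff, assumes it runs inside a test method that declares throws Exception, and the directory wiring is illustrative rather than the broker's actual setup.

    TopicIdPartition partition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("test", 0));
    Path partitionDir = TestUtils.tempDirectory().toPath(); // stand-in for the user partition dir
    FileBasedRemoteLogMetadataCache cache = new FileBasedRemoteLogMetadataCache(partition, partitionDir);
    // ... apply consumed remote log metadata events to the cache ...
    cache.flushToFile(0, 100L); // metadata partition 0, consumed up to offset 100
    // On restart, constructing the cache again reloads state from the snapshot file
    // instead of re-reading the remote log metadata topic:
    FileBasedRemoteLogMetadataCache reloaded = new FileBasedRemoteLogMetadataCache(partition, partitionDir);
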
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFile.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFile.java
deleted file mode 100644
index db49bb9cb8a..00000000000
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFile.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.server.log.remote.metadata.storage;
-
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.FileChannel;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
-
-/**
- * This class represents the remote log data snapshot stored in a file for a specific topic partition. This is used by
- * {@link TopicBasedRemoteLogMetadataManager} to store the remote log metadata received for a specific partition from
- * the remote log metadata topic. This avoids reading the remote log metadata messages from the topic again when a
- * broker restarts.
- */
-public class RemoteLogMetadataSnapshotFile {
-    private static final Logger log = LoggerFactory.getLogger(RemoteLogMetadataSnapshotFile.class);
-
-    public static final String COMMITTED_LOG_METADATA_SNAPSHOT_FILE_NAME = "remote_log_snapshot";
-
-    // File format:
-    // <header>[<entry>...]
-    // header: <version:short><metadata-partition:int><metadata-partition-offset:long><entries-size:int>
-    // entry: <entry-length><entry-bytes>
-
-    // header size: 2 (version) + 4 (partition num) + 8 (offset) + 4 (entries size) = 18
-    private static final int HEADER_SIZE = 18;
-
-    private final File metadataStoreFile;
-    private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
-
-    /**
-     * Creates a RemoteLogMetadataSnapshotFile instance backed by a file with the name `remote_log_snapshot` in
-     * the given {@code metadataStoreDir}. It creates the file if it does not exist.
-     *
-     * @param metadataStoreDir directory in which the snapshot file is to be created.
-     */
-    RemoteLogMetadataSnapshotFile(Path metadataStoreDir) {
-        this.metadataStoreFile = new File(metadataStoreDir.toFile(), COMMITTED_LOG_METADATA_SNAPSHOT_FILE_NAME);
-
-        // Create an empty file if it does not exist.
-        try {
-            final boolean fileExists = Files.exists(metadataStoreFile.toPath());
-            if (!fileExists) {
-                Files.createFile(metadataStoreFile.toPath());
-            }
-            log.info("Remote log metadata snapshot file: [{}], newFileCreated: [{}]", metadataStoreFile, !fileExists);
-        } catch (IOException e) {
-            throw new KafkaException(e);
-        }
-    }
-
-    /**
-     * Writes the given snapshot, replacing the earlier snapshot data.
-     *
-     * @param snapshot Snapshot to be stored.
-     * @throws IOException if there is any error in writing the given snapshot to the file.
-     */
-    public synchronized void write(Snapshot snapshot) throws IOException {
-        Path newMetadataSnapshotFilePath = new File(metadataStoreFile.getAbsolutePath() + ".tmp").toPath();
-        try (FileChannel fileChannel = FileChannel.open(newMetadataSnapshotFilePath,
-                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
-
-            // header: <version:short><metadata-partition:int><metadata-partition-offset:long><entries-size:int>
-            ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE);
-
-            // Write version
-            headerBuffer.putShort(snapshot.version());
-
-            // Write metadata partition
-            headerBuffer.putInt(snapshot.metadataPartition());
-
-            // Write metadata partition offset
-            headerBuffer.putLong(snapshot.metadataPartitionOffset());
-
-            // Write entries size
-            Collection<RemoteLogSegmentMetadataSnapshot> metadataSnapshots = snapshot.remoteLogSegmentMetadataSnapshots();
-            headerBuffer.putInt(metadataSnapshots.size());
-
-            // Write header
-            headerBuffer.flip();
-            fileChannel.write(headerBuffer);
-
-            // Write each entry
-            ByteBuffer lenBuffer = ByteBuffer.allocate(4);
-            for (RemoteLogSegmentMetadataSnapshot metadataSnapshot : metadataSnapshots) {
-                final byte[] serializedBytes = serde.serialize(metadataSnapshot);
-                // entry format: <entry-length><entry-bytes>
-
-                // Write entry length
-                lenBuffer.putInt(serializedBytes.length);
-                lenBuffer.flip();
-                fileChannel.write(lenBuffer);
-                lenBuffer.rewind();
-
-                // Write entry bytes
-                fileChannel.write(ByteBuffer.wrap(serializedBytes));
-            }
-
-            fileChannel.force(true);
-        }
-
-        Utils.atomicMoveWithFallback(newMetadataSnapshotFilePath, metadataStoreFile.toPath());
-    }
-
-    /**
-     * @return the Snapshot if it exists.
-     * @throws IOException if there is any error in reading the stored snapshot.
-     */
-    public synchronized Optional<Snapshot> read() throws IOException {
-
-        // Check for an empty file.
-        if (metadataStoreFile.length() == 0) {
-            return Optional.empty();
-        }
-
-        try (ReadableByteChannel channel = Channels.newChannel(new FileInputStream(metadataStoreFile))) {
-
-            // header: <version:short><metadata-partition:int><metadata-partition-offset:long><entries-size:int>
-            // Read header
-            ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE);
-            channel.read(headerBuffer);
-            headerBuffer.rewind();
-            short version = headerBuffer.getShort();
-            int metadataPartition = headerBuffer.getInt();
-            long metadataPartitionOffset = headerBuffer.getLong();
-            int metadataSnapshotsSize = headerBuffer.getInt();
-
-            List<RemoteLogSegmentMetadataSnapshot> result = new ArrayList<>(metadataSnapshotsSize);
-            ByteBuffer lenBuffer = ByteBuffer.allocate(4);
-            int lenBufferReadCt;
-            while ((lenBufferReadCt = channel.read(lenBuffer)) > 0) {
-                lenBuffer.rewind();
-
-                if (lenBufferReadCt != lenBuffer.capacity()) {
-                    throw new IOException("Invalid amount of data read for the length of an entry, file may have been corrupted.");
-                }
-
-                // entry format: <entry-length><entry-bytes>
-
-                // Read the length of each entry
-                final int len = lenBuffer.getInt();
-                lenBuffer.rewind();
-
-                // Read the entry
-                ByteBuffer data = ByteBuffer.allocate(len);
-                final int read = channel.read(data);
-                if (read != len) {
-                    throw new IOException("Invalid amount of data read, file may have been corrupted.");
-                }
-
-                // Only RemoteLogSegmentMetadataSnapshot entries are ever written, as can be seen in the #write() method.
-                // No specific serde was added for RemoteLogSegmentMetadataSnapshot; RemoteLogMetadataSerde is reused.
-                final RemoteLogSegmentMetadataSnapshot remoteLogSegmentMetadata =
-                        (RemoteLogSegmentMetadataSnapshot) serde.deserialize(data.array());
-                result.add(remoteLogSegmentMetadata);
-            }
-
-            if (metadataSnapshotsSize != result.size()) {
-                throw new IOException("Unexpected entries in the snapshot file. Expected size: " + metadataSnapshotsSize
-                        + ", but found: " + result.size());
-            }
-
-            return Optional.of(new Snapshot(version, metadataPartition, metadataPartitionOffset, result));
-        }
-    }
-
-    /**
-     * This class represents the collection of remote log metadata for a specific topic partition.
-     */
-    public static final class Snapshot {
-        private static final short CURRENT_VERSION = 0;
-
-        private final short version;
-        private final int metadataPartition;
-        private final long metadataPartitionOffset;
-        private final Collection<RemoteLogSegmentMetadataSnapshot> remoteLogSegmentMetadataSnapshots;
-
-        public Snapshot(int metadataPartition,
-                        long metadataPartitionOffset,
-                        Collection<RemoteLogSegmentMetadataSnapshot> remoteLogSegmentMetadataSnapshots) {
-            this(CURRENT_VERSION, metadataPartition, metadataPartitionOffset, remoteLogSegmentMetadataSnapshots);
-        }
-
-        public Snapshot(short version,
-                        int metadataPartition,
-                        long metadataPartitionOffset,
-                        Collection<RemoteLogSegmentMetadataSnapshot> remoteLogSegmentMetadataSnapshots) {
-            // We will add multiple version support in the future if needed. For now, the only supported version is CURRENT_VERSION, viz. 0.
-            if (version != CURRENT_VERSION) {
-                throw new IllegalArgumentException("Unexpected version received: " + version);
-            }
-            this.version = version;
-            this.metadataPartition = metadataPartition;
-            this.metadataPartitionOffset = metadataPartitionOffset;
-            this.remoteLogSegmentMetadataSnapshots = remoteLogSegmentMetadataSnapshots;
-        }
-
-        public short version() {
-            return version;
-        }
-
-        public int metadataPartition() {
-            return metadataPartition;
-        }
-
-        public long metadataPartitionOffset() {
-            return metadataPartitionOffset;
-        }
-
-        public Collection<RemoteLogSegmentMetadataSnapshot> remoteLogSegmentMetadataSnapshots() {
-            return remoteLogSegmentMetadataSnapshots;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) return true;
-            if (!(o instanceof Snapshot)) return false;
-            Snapshot snapshot = (Snapshot) o;
-            return version == snapshot.version && metadataPartition == snapshot.metadataPartition
-                    && metadataPartitionOffset == snapshot.metadataPartitionOffset
-                    && Objects.equals(remoteLogSegmentMetadataSnapshots, snapshot.remoteLogSegmentMetadataSnapshots);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(version, metadataPartition, metadataPartitionOffset, remoteLogSegmentMetadataSnapshots);
-        }
-
-        @Override
-        public String toString() {
-            return "Snapshot{" +
-                    "version=" + version +
-                    ", metadataPartition=" + metadataPartition +
-                    ", metadataPartitionOffset=" + metadataPartitionOffset +
-                    ", remoteLogSegmentMetadataSnapshotsSize=" + remoteLogSegmentMetadataSnapshots.size() +
-                    '}';
-        }
-    }
-}
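
The on-disk format this removal retires is documented in the comments above: an 18-byte fixed header followed by length-prefixed serialized entries. As a minimal sketch of decoding just the header (plain JDK only, relying on ByteBuffer's big-endian default; the file name comes from the constant above, the directory argument is illustrative):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;

    public class SnapshotHeaderDump {
        public static void main(String[] args) throws IOException {
            // header: <version:short><metadata-partition:int><metadata-partition-offset:long><entries-size:int>
            Path file = Paths.get(args[0], "remote_log_snapshot");
            ByteBuffer header = ByteBuffer.allocate(18); // 2 + 4 + 8 + 4 bytes
            try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
                channel.read(header);
            }
            header.flip();
            System.out.printf("version=%d metadataPartition=%d metadataPartitionOffset=%d entries=%d%n",
                    header.getShort(), header.getInt(), header.getLong(), header.getInt());
        }
    }
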
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java
index ec1ed6a66d1..de33a054413 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java
@@ -33,10 +33,8 @@ import java.util.Optional;
 /**
  * This class represents the entry containing the metadata about a remote log segment. This is similar to
  * {@link RemoteLogSegmentMetadata} but it does not contain topic partition information. This class keeps
- * only remote log segment ID but not the topic partition.
- *
- * This class is used in storing the snapshot of remote log metadata for a specific topic partition as mentioned
- * in {@link RemoteLogMetadataSnapshotFile.Snapshot}.
+ * only remote log segment ID but not the topic partition. This class is used in storing the snapshot of
+ * remote log metadata for a specific topic partition.
  */
 public class RemoteLogSegmentMetadataSnapshot extends RemoteLogMetadata {
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataEventHandler.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataEventHandler.java
index f4f43b0d883..075cab58817 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataEventHandler.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataEventHandler.java
@@ -22,8 +22,6 @@ import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
 import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
 
-import java.io.IOException;
-
 public abstract class RemotePartitionMetadataEventHandler {
 
     public void handleRemoteLogMetadata(RemoteLogMetadata remoteLogMetadata) {
@@ -44,9 +42,11 @@ public abstract class RemotePartitionMetadataEventHandler {
 
     protected abstract void handleRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata);
 
-    public abstract void syncLogMetadataSnapshot(TopicIdPartition topicIdPartition,
-                                                 int metadataPartition,
-                                                 Long metadataPartitionOffset) throws IOException;
+    public void syncLogMetadataSnapshot(TopicIdPartition topicIdPartition,
+                                        int metadataPartition,
+                                        Long metadataPartitionOffset) {
+        // no-op by default
+    }
 
     public abstract void clearTopicPartition(TopicIdPartition topicIdPartition);
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java
index f9394eee99f..14f5b3dd717 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.server.log.remote.metadata.storage;
 
 import org.apache.kafka.common.TopicIdPartition;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ReplicaNotAvailableException;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
@@ -30,9 +29,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
-import java.io.File;
 import java.io.IOException;
-import java.nio.file.Path;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.Map;
@@ -46,16 +43,13 @@ import java.util.concurrent.ConcurrentHashMap;
 public class RemotePartitionMetadataStore extends RemotePartitionMetadataEventHandler implements Closeable {
     private static final Logger log = LoggerFactory.getLogger(RemotePartitionMetadataStore.class);
 
-    private final Path logDir;
-
     private Map<TopicIdPartition, RemotePartitionDeleteMetadata> idToPartitionDeleteMetadata = new ConcurrentHashMap<>();
 
-    private Map<TopicIdPartition, FileBasedRemoteLogMetadataCache> idToRemoteLogMetadataCache =
+    private Map<TopicIdPartition, RemoteLogMetadataCache> idToRemoteLogMetadataCache =
         new ConcurrentHashMap<>();
 
-    public RemotePartitionMetadataStore(Path logDir) {
-        this.logDir = logDir;
+    public RemotePartitionMetadataStore() {
     }
 
     @Override
@@ -74,10 +68,6 @@ public class RemotePartitionMetadataStore extends RemotePartitionMetadataEventHa
         }
     }
 
-    private Path partitionLogDirectory(TopicPartition topicPartition) {
-        return new File(logDir.toFile(), topicPartition.topic() + "-" + topicPartition.partition()).toPath();
-    }
-
     @Override
     public void handleRemoteLogSegmentMetadataUpdate(RemoteLogSegmentMetadataUpdate rlsmUpdate) {
         log.debug("Updating remote log segment: [{}]", rlsmUpdate);
@@ -110,22 +100,6 @@ public class RemotePartitionMetadataStore extends RemotePartitionMetadataEventHa
         }
     }
 
-    @Override
-    public void syncLogMetadataSnapshot(TopicIdPartition topicIdPartition,
-                                        int metadataPartition,
-                                        Long metadataPartitionOffset) throws IOException {
-        RemotePartitionDeleteMetadata partitionDeleteMetadata = idToPartitionDeleteMetadata.get(topicIdPartition);
-        if (partitionDeleteMetadata != null) {
-            log.info("Skipping syncing of metadata snapshot as remote partition [{}] is in state: [{}]", topicIdPartition,
-                    partitionDeleteMetadata);
-        } else {
-            FileBasedRemoteLogMetadataCache remoteLogMetadataCache = idToRemoteLogMetadataCache.get(topicIdPartition);
-            if (remoteLogMetadataCache != null) {
-                remoteLogMetadataCache.flushToFile(metadataPartition, metadataPartitionOffset);
-            }
-        }
-    }
-
     @Override
     public void clearTopicPartition(TopicIdPartition topicIdPartition) {
         idToRemoteLogMetadataCache.remove(topicIdPartition);
@@ -145,16 +119,15 @@ public class RemotePartitionMetadataStore extends RemotePartitionMetadataEventHa
         return getRemoteLogMetadataCache(topicIdPartition).listRemoteLogSegments(leaderEpoch);
     }
 
-    private FileBasedRemoteLogMetadataCache getRemoteLogMetadataCache(TopicIdPartition topicIdPartition)
+    private RemoteLogMetadataCache getRemoteLogMetadataCache(TopicIdPartition topicIdPartition)
             throws RemoteResourceNotFoundException {
-        FileBasedRemoteLogMetadataCache remoteLogMetadataCache = idToRemoteLogMetadataCache.get(topicIdPartition);
+        RemoteLogMetadataCache remoteLogMetadataCache = idToRemoteLogMetadataCache.get(topicIdPartition);
         if (remoteLogMetadataCache == null) {
            throw new RemoteResourceNotFoundException("No resource found for partition: " + topicIdPartition);
        }

        if (!remoteLogMetadataCache.isInitialized()) {
-            // Throwing a retriable ReplicaNotAvailableException here for clients retry. We can introduce a new more
-            // appropriate exception with a KIP in the future.
+            // Throwing a retriable ReplicaNotAvailableException here for clients to retry.
             throw new ReplicaNotAvailableException("Remote log metadata cache is not initialized for partition: "
                     + topicIdPartition);
         }
@@ -189,8 +162,7 @@ public class RemotePartitionMetadataStore extends RemotePartitionMetadataEventHa
 
     @Override
     public void maybeLoadPartition(TopicIdPartition partition) {
-        idToRemoteLogMetadataCache.computeIfAbsent(partition,
-            topicIdPartition -> new FileBasedRemoteLogMetadataCache(topicIdPartition, partitionLogDirectory(topicIdPartition.topicPartition())));
+        idToRemoteLogMetadataCache.computeIfAbsent(partition, idPartition -> new RemoteLogMetadataCache());
     }
 
     @Override
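
With the file-based store gone, a freshly assigned partition is served only after its in-memory cache has been rebuilt from the metadata topic; until then lookups surface the retriable ReplicaNotAvailableException added above. A caller-side sketch (the retry loop and timing are illustrative, not actual broker logic; it assumes a RemoteLogMetadataManager named rlmm, a TopicIdPartition named partition, and an enclosing method declaring throws RemoteStorageException, InterruptedException):

    Iterator<RemoteLogSegmentMetadata> segments = null;
    while (segments == null) {
        try {
            segments = rlmm.listRemoteLogSegments(partition);
        } catch (ReplicaNotAvailableException e) {
            Thread.sleep(100); // retriable: back off until the cache is initialized
        }
    }
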
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
index a3fc1c6e9b8..b8e3d106664 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
@@ -40,7 +40,6 @@ import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
@@ -359,7 +358,7 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
             rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(configs);
             rlmTopicPartitioner = new RemoteLogMetadataTopicPartitioner(rlmmConfig.metadataTopicPartitionsCount());
-            remotePartitionMetadataStore = new RemotePartitionMetadataStore(new File(rlmmConfig.logDir()).toPath());
+            remotePartitionMetadataStore = new RemotePartitionMetadataStore();
             configured = true;
             log.info("Successfully configured topic-based RLMM with config: {}", rlmmConfig);
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java
index 2b36c4bb039..424c86b6df6 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java
@@ -390,10 +390,6 @@ public class ConsumerTaskTest {
         protected void handleRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata) {
         }
 
-        @Override
-        public void syncLogMetadataSnapshot(TopicIdPartition topicIdPartition, int metadataPartition, Long metadataPartitionOffset) {
-        }
-
         @Override
         public void clearTopicPartition(TopicIdPartition topicIdPartition) {
             isPartitionCleared.put(topicIdPartition, true);
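
The handler change earlier in this diff turns syncLogMetadataSnapshot into a default no-op, which is why the empty override in ConsumerTaskTest above could be deleted. A hedged sketch of what subclassing looks like now (the class name is hypothetical, and it is declared abstract because the handler's other abstract callbacks are omitted for brevity):

    abstract class SnapshottingEventHandler extends RemotePartitionMetadataEventHandler {
        // Only a handler that actually persists snapshots needs to override the hook;
        // all other subclasses silently inherit the base class no-op.
        @Override
        public void syncLogMetadataSnapshot(TopicIdPartition topicIdPartition,
                                            int metadataPartition,
                                            Long metadataPartitionOffset) {
            // write the in-memory state somewhere durable here
        }
    }
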
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCacheTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCacheTest.java
deleted file mode 100644
index d5341e07b07..00000000000
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCacheTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.server.log.remote.metadata.storage;
-
-import org.apache.kafka.common.TopicIdPartition;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.Uuid;
-import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
-import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
-import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
-import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
-import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
-import org.apache.kafka.test.TestUtils;
-import org.junit.jupiter.api.Test;
-
-import java.nio.file.Path;
-import java.util.Collections;
-import java.util.Optional;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class FileBasedRemoteLogMetadataCacheTest {
-
-    @Test
-    public void testFileBasedRemoteLogMetadataCacheWithUnreferencedSegments() throws Exception {
-        TopicIdPartition partition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("test", 0));
-        int brokerId = 0;
-        Path path = TestUtils.tempDirectory().toPath();
-
-        // Create a file-based metadata cache.
-        FileBasedRemoteLogMetadataCache cache = new FileBasedRemoteLogMetadataCache(partition, path);
-
-        // Add a segment with start offset 0 for leader epoch 0.
-        RemoteLogSegmentId segmentId1 = new RemoteLogSegmentId(partition, Uuid.randomUuid());
-        RemoteLogSegmentMetadata metadata1 = new RemoteLogSegmentMetadata(segmentId1,
-                0, 100, System.currentTimeMillis(), brokerId, System.currentTimeMillis(),
-                1024 * 1024, Collections.singletonMap(0, 0L));
-        cache.addCopyInProgressSegment(metadata1);
-        RemoteLogSegmentMetadataUpdate metadataUpdate1 = new RemoteLogSegmentMetadataUpdate(
-                segmentId1, System.currentTimeMillis(),
-                Optional.of(new CustomMetadata(new byte[]{0, 1, 2, 3})),
-                RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
-        cache.updateRemoteLogSegmentMetadata(metadataUpdate1);
-        Optional<RemoteLogSegmentMetadata> receivedMetadata = cache.remoteLogSegmentMetadata(0, 0L);
-        assertTrue(receivedMetadata.isPresent());
-        assertEquals(metadata1.createWithUpdates(metadataUpdate1), receivedMetadata.get());
-
-        // Add a new segment with start offset 0 for leader epoch 0, which should replace the earlier segment.
-        RemoteLogSegmentId segmentId2 = new RemoteLogSegmentId(partition, Uuid.randomUuid());
-        RemoteLogSegmentMetadata metadata2 = new RemoteLogSegmentMetadata(segmentId2,
-                0, 900, System.currentTimeMillis(), brokerId, System.currentTimeMillis(),
-                1024 * 1024, Collections.singletonMap(0, 0L));
-        cache.addCopyInProgressSegment(metadata2);
-        RemoteLogSegmentMetadataUpdate metadataUpdate2 = new RemoteLogSegmentMetadataUpdate(
-                segmentId2, System.currentTimeMillis(),
-                Optional.of(new CustomMetadata(new byte[]{4, 5, 6, 7})),
-                RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
-        cache.updateRemoteLogSegmentMetadata(metadataUpdate2);
-
-        // Fetch the segment for leader epoch 0 and start offset 0; it should be the newly added segment.
-        Optional<RemoteLogSegmentMetadata> receivedMetadata2 = cache.remoteLogSegmentMetadata(0, 0L);
-        assertTrue(receivedMetadata2.isPresent());
-        assertEquals(metadata2.createWithUpdates(metadataUpdate2), receivedMetadata2.get());
-
-        // Flush the cache to the file.
-        cache.flushToFile(0, 0L);
-
-        // Create a new cache, loading from the stored path.
-        FileBasedRemoteLogMetadataCache loadedCache = new FileBasedRemoteLogMetadataCache(partition, path);
-
-        // Fetch the segment for leader epoch 0 and start offset 0; it should be metadata2.
-        // This ensures that the ordering of metadata is taken care of after loading from the stored snapshots.
-        Optional<RemoteLogSegmentMetadata> receivedMetadataAfterLoad = loadedCache.remoteLogSegmentMetadata(0, 0L);
-        assertTrue(receivedMetadataAfterLoad.isPresent());
-        assertEquals(metadata2.createWithUpdates(metadataUpdate2), receivedMetadataAfterLoad.get());
-    }
-}
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFileTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFileTest.java
deleted file mode 100644
index dbfbbf3b044..00000000000
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFileTest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.server.log.remote.metadata.storage;
-
-import org.apache.kafka.common.Uuid;
-import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
-import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
-import org.apache.kafka.test.TestUtils;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Optional;
-
-public class RemoteLogMetadataSnapshotFileTest {
-
-    @Test
-    public void testEmptyCommittedLogMetadataFile() throws Exception {
-        File metadataStoreDir = TestUtils.tempDirectory("_rlmm_committed");
-        RemoteLogMetadataSnapshotFile snapshotFile = new RemoteLogMetadataSnapshotFile(metadataStoreDir.toPath());
-
-        // No snapshot should be returned, as nothing has been written into the file.
-        Assertions.assertFalse(snapshotFile.read().isPresent());
-    }
-
-    @Test
-    public void testEmptySnapshotWithCommittedLogMetadataFile() throws Exception {
-        File metadataStoreDir = TestUtils.tempDirectory("_rlmm_committed");
-        RemoteLogMetadataSnapshotFile snapshotFile = new RemoteLogMetadataSnapshotFile(metadataStoreDir.toPath());
-
-        snapshotFile.write(new RemoteLogMetadataSnapshotFile.Snapshot(0, 0L, Collections.emptyList()));
-
-        // The read snapshot should be empty, as the written snapshot did not have any remote log segment metadata.
-        Assertions.assertTrue(snapshotFile.read().isPresent());
-        Assertions.assertTrue(snapshotFile.read().get().remoteLogSegmentMetadataSnapshots().isEmpty());
-    }
-
-    @Test
-    public void testWriteReadCommittedLogMetadataFile() throws Exception {
-        File metadataStoreDir = TestUtils.tempDirectory("_rlmm_committed");
-        RemoteLogMetadataSnapshotFile snapshotFile = new RemoteLogMetadataSnapshotFile(metadataStoreDir.toPath());
-
-        List<RemoteLogSegmentMetadataSnapshot> remoteLogSegmentMetadatas = new ArrayList<>();
-        long startOffset = 0;
-        for (int i = 0; i < 100; i++) {
-            long endOffset = startOffset + 100L;
-            CustomMetadata customMetadata = new CustomMetadata(new byte[]{(byte) i});
-            remoteLogSegmentMetadatas.add(
-                    new RemoteLogSegmentMetadataSnapshot(Uuid.randomUuid(), startOffset, endOffset,
-                            System.currentTimeMillis(), 1, 100, 1024,
-                            Optional.of(customMetadata),
-                            RemoteLogSegmentState.COPY_SEGMENT_FINISHED, Collections.singletonMap(i, startOffset)
-                    ));
-            startOffset = endOffset + 1;
-        }
-
-        RemoteLogMetadataSnapshotFile.Snapshot snapshot = new RemoteLogMetadataSnapshotFile.Snapshot(0, 120,
-                remoteLogSegmentMetadatas);
-        snapshotFile.write(snapshot);
-
-        Optional<RemoteLogMetadataSnapshotFile.Snapshot> maybeReadSnapshot = snapshotFile.read();
-        Assertions.assertTrue(maybeReadSnapshot.isPresent());
-
-        Assertions.assertEquals(snapshot, maybeReadSnapshot.get());
-        Assertions.assertEquals(new HashSet<>(snapshot.remoteLogSegmentMetadataSnapshots()),
-                new HashSet<>(maybeReadSnapshot.get().remoteLogSegmentMetadataSnapshots()));
-    }
-}
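
Net effect of the patch: RemotePartitionMetadataStore no longer receives a log directory, so no remote_log_snapshot file appears in user partition directories, not even the empty one the deleted RemoteLogMetadataSnapshotFile constructor used to create eagerly. A quick hedged check (the path is illustrative, and java.nio.file.Files, java.nio.file.Paths and java.util.stream.Stream imports are assumed):

    Path partitionDir = Paths.get("/tmp/kafka-logs/test-0"); // illustrative partition dir
    try (Stream<Path> files = Files.list(partitionDir)) {
        boolean present = files.anyMatch(p -> p.getFileName().toString().equals("remote_log_snapshot"));
        System.out.println("remote_log_snapshot present: " + present); // expected: false
    }
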