jeqo commented on code in PR #13275:
URL: https://github.com/apache/kafka/pull/13275#discussion_r1258867599


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##########
@@ -0,0 +1,671 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import 
org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.INDEX_FILE_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.TIME_INDEX_FILE_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.TXN_INDEX_FILE_SUFFIX;
+
+/**
+ * This is a LFU (Least Frequently Used) cache of remote index files stored in 
`$logdir/remote-log-index-cache`.
+ * This is helpful to avoid re-fetching the index files like offset, time 
indexes from the remote storage for every
+ * fetch call. The cache is re-initialized from the index files on disk on 
startup, if the index files are available.
+ *
+ * The cache contains a garbage collection thread which will delete the files 
for entries that have been removed from
+ * the cache.
+ *
+ * Note that closing this cache does not delete the index files on disk.
+ * Note that the cache eviction policy is based on the default implementation 
of Caffeine i.e.
+ * <a href="https://github.com/ben-manes/caffeine/wiki/Efficiency">Window TinyLfu</a>. TinyLfu relies on a frequency
+ * sketch to probabilistically estimate the historic usage of an entry.
+ *
+ * This class is thread safe.
+ */
+public class RemoteIndexCache implements Closeable {
+
+    private static final Logger log = 
LoggerFactory.getLogger(RemoteIndexCache.class);
+    private static final String TMP_FILE_SUFFIX = ".tmp";
+
+    public static final String REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD = 
"remote-log-index-cleaner";
+    public static final String DIR_NAME = "remote-log-index-cache";
+
+    /**
+     * Directory where the index files will be stored on disk.
+     */
+    private final File cacheDir;
+
+    /**
+     * Represents if the cache is closed or not. Closing the cache is an 
irreversible operation.
+     */
+    private final AtomicBoolean isRemoteIndexCacheClosed = new 
AtomicBoolean(false);
+
+    /**
+     * Unbounded queue containing the removed entries from the cache which are 
waiting to be garbage collected.
+     */
+    private final LinkedBlockingQueue<Entry> expiredIndexes = new 
LinkedBlockingQueue<>();
+
+    /**
+     * Lock used to synchronize close with other read operations. This ensures 
that when we close, we don't have any other
+     * concurrent reads in-progress.
+     */
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+    /**
+     * Actual cache implementation that this file wraps around.
+     *
+     * The requirements for this internal cache is as follows:
+     * 1. Multiple threads should be able to read concurrently.
+     * 2. Fetch for missing keys should not block read for available keys.
+     * 3. Only one thread should fetch for a specific key.
+     * 4. Should support LRU-like policy.
+     *
+     * We use {@link Caffeine} cache instead of implementing a thread safe LRU 
cache on our own.
+     */
+    private final Cache<Uuid, Entry> internalCache;
+    private final RemoteStorageManager remoteStorageManager;
+    private final ShutdownableThread cleanerThread;
+
+    public RemoteIndexCache(RemoteStorageManager remoteStorageManager, String 
logDir) throws IOException {
+        this(1024, remoteStorageManager, logDir);
+    }
+
+    /**
+     * Creates RemoteIndexCache with the given configs.
+     *
+     * @param maxSize              maximum number of segment index entries to 
be cached.
+     * @param remoteStorageManager RemoteStorageManager instance, to be used 
in fetching indexes.
+     * @param logDir               log directory
+     */
+    public RemoteIndexCache(int maxSize, RemoteStorageManager 
remoteStorageManager, String logDir) throws IOException {
+        this.remoteStorageManager = remoteStorageManager;
+        cacheDir = new File(logDir, DIR_NAME);
+
+        internalCache = Caffeine.newBuilder()
+                .maximumSize(maxSize)
+                // removeListener is invoked when either the entry is 
invalidated (means manual removal by the caller) or
+                // evicted (means removal due to the policy)
+                .removalListener((Uuid key, Entry entry, RemovalCause cause) 
-> {
+                    // Mark the entries for cleanup and add them to the queue 
to be garbage collected later by the background thread.
+                    if (entry != null) {
+                        try {
+                            entry.markForCleanup();
+                        } catch (IOException e) {
+                            throw new KafkaException(e);
+                        }
+                        if (!expiredIndexes.offer(entry)) {
+                            log.error("Error while inserting entry {} for key 
{} into the cleaner queue because queue is full.", entry, key);
+                        }
+                    } else {
+                        log.error("Received entry as null for key {} when the 
it is removed from the cache.", key);
+                    }
+                }).build();
+
+        init();
+
+        // Start cleaner thread that will clean the expired entries.
+        cleanerThread = createCleanerThread();
+        cleanerThread.start();
+    }
+
+    public Collection<Entry> expiredIndexes() {
+        return Collections.unmodifiableCollection(expiredIndexes);
+    }
+
+    // Visible for testing
+    public Cache<Uuid, Entry> internalCache() {
+        return internalCache;
+    }
+
+    // Visible for testing
+    public ShutdownableThread cleanerThread() {
+        return cleanerThread;
+    }
+
+    private ShutdownableThread createCleanerThread() {
+        ShutdownableThread thread = new 
ShutdownableThread("remote-log-index-cleaner") {
+            public void doWork() {
+                try {
+                    Entry entry = expiredIndexes.take();
+                    log.debug("Cleaning up index entry {}", entry);
+                    entry.cleanup();
+                } catch (InterruptedException ie) {
+                    // cleaner thread should only be interrupted when cache is 
being closed, else it's an error
+                    if (!isRemoteIndexCacheClosed.get()) {
+                        log.error("Cleaner thread received interruption but 
remote index cache is not closed", ie);
+                        // propagate the InterruptedException outside to 
correctly close the thread.
+                        throw new KafkaException(ie);
+                    } else {
+                        log.debug("Cleaner thread was interrupted on cache 
shutdown");
+                    }
+                } catch (Exception ex) {
+                    // do not exit for exceptions other than 
InterruptedException
+                    log.error("Error occurred while cleaning up expired 
entry", ex);
+                }
+            }
+
+        };
+        thread.setDaemon(true);
+
+        return thread;
+    }
+
+    private void init() throws IOException {
+        long start = Time.SYSTEM.hiResClockMs();
+
+        try {
+            Files.createDirectory(cacheDir.toPath());
+            log.info("Created new file {} for RemoteIndexCache", cacheDir);
+        } catch (FileAlreadyExistsException e) {
+            log.info("RemoteIndexCache directory {} already exists. Re-using 
the same directory.", cacheDir);
+        } catch (Exception e) {
+            log.error("Unable to create directory {} for RemoteIndexCache.", 
cacheDir, e);
+            throw new KafkaException(e);
+        }
+
+        // Delete any .deleted or .tmp files remained from the earlier run of 
the broker.
+        try (Stream<Path> paths = Files.list(cacheDir.toPath())) {
+            paths.forEach(path -> {
+                if (path.endsWith(LogFileUtils.DELETED_FILE_SUFFIX) ||
+                        path.endsWith(TMP_FILE_SUFFIX)) {
+                    try {
+                        if (Files.deleteIfExists(path)) {
+                            log.debug("Deleted file path {} on cache 
initialization", path);
+                        }
+                    } catch (IOException e) {
+                        throw new KafkaException(e);
+                    }
+                }
+            });
+        }
+
+        try (Stream<Path> paths = Files.list(cacheDir.toPath())) {
+            Iterator<Path> iterator = paths.iterator();
+            while (iterator.hasNext()) {
+                Path path = iterator.next();
+                Path fileNamePath = path.getFileName();
+                if (fileNamePath == null)
+                    throw new KafkaException("Empty file name in remote index 
cache directory: " + cacheDir);
+
+                String indexFileName = fileNamePath.toString();
+                Uuid uuid = 
remoteLogSegmentIdFromRemoteIndexFileName(indexFileName);
+
+                // It is safe to update the internalCache non-atomically here 
since this function is always called by a single
+                // thread only.
+                if (!internalCache.asMap().containsKey(uuid)) {
+                    String fileNameWithoutSuffix = indexFileName.substring(0, 
indexFileName.indexOf("."));
+                    File offsetIndexFile = new File(cacheDir, 
fileNameWithoutSuffix + INDEX_FILE_SUFFIX);
+                    File timestampIndexFile = new File(cacheDir, 
fileNameWithoutSuffix + TIME_INDEX_FILE_SUFFIX);
+                    File txnIndexFile = new File(cacheDir, 
fileNameWithoutSuffix + TXN_INDEX_FILE_SUFFIX);
+
+                    // Create entries for each path if all the index files 
exist.
+                    if (Files.exists(offsetIndexFile.toPath()) &&
+                            Files.exists(timestampIndexFile.toPath()) &&
+                            Files.exists(txnIndexFile.toPath())) {
+                        long offset = 
offsetFromRemoteIndexFileName(indexFileName);
+                        OffsetIndex offsetIndex = new 
OffsetIndex(offsetIndexFile, offset, Integer.MAX_VALUE, false);
+                        offsetIndex.sanityCheck();
+
+                        TimeIndex timeIndex = new 
TimeIndex(timestampIndexFile, offset, Integer.MAX_VALUE, false);
+                        timeIndex.sanityCheck();
+
+                        TransactionIndex txnIndex = new 
TransactionIndex(offset, txnIndexFile);
+                        txnIndex.sanityCheck();
+
+                        Entry entry = new Entry(offsetIndex, timeIndex, 
txnIndex);
+                        internalCache.put(uuid, entry);
+                    } else {
+                        // Delete all of them if any one of those indexes is 
not available for a specific segment id
+                        tryAll(Arrays.asList(
+                                () -> {
+                                    
Files.deleteIfExists(offsetIndexFile.toPath());
+                                    return null;
+                                },
+                                () -> {
+                                    
Files.deleteIfExists(timestampIndexFile.toPath());
+                                    return null;
+                                },
+                                () -> {
+                                    
Files.deleteIfExists(txnIndexFile.toPath());
+                                    return null;
+                                }));
+                    }
+                }
+            }
+        }
+        log.info("RemoteIndexCache starts up in {} ms.", 
Time.SYSTEM.hiResClockMs() - start);
+    }
+
+    private <T> T loadIndexFile(File file, RemoteLogSegmentMetadata 
remoteLogSegmentMetadata,
+                                Function<RemoteLogSegmentMetadata, 
InputStream> fetchRemoteIndex,
+                                Function<File, T> readIndex) throws 
IOException {
+        File indexFile = new File(cacheDir, file.getName());
+        T index = null;
+        if (Files.exists(indexFile.toPath())) {
+            try {
+                index = readIndex.apply(indexFile);
+            } catch (CorruptRecordException ex) {
+                log.info("Error occurred while loading the stored index file 
{}", indexFile.getPath(), ex);
+            }
+        }
+
+        if (index == null) {
+            File tmpIndexFile = new File(indexFile.getParentFile(), 
indexFile.getName() + RemoteIndexCache.TMP_FILE_SUFFIX);
+
+            try (InputStream inputStream = 
fetchRemoteIndex.apply(remoteLogSegmentMetadata);) {
+                Files.copy(inputStream, tmpIndexFile.toPath(), 
StandardCopyOption.REPLACE_EXISTING);
+            }
+
+            Utils.atomicMoveWithFallback(tmpIndexFile.toPath(), 
indexFile.toPath(), false);
+            index = readIndex.apply(indexFile);
+        }
+
+        return index;
+    }
+
+    public Entry getIndexEntry(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata) {
+        if (isRemoteIndexCacheClosed.get()) throw new 
IllegalStateException("Unable to fetch index for " +
+                "segment id=" + 
remoteLogSegmentMetadata.remoteLogSegmentId().id() + ". Instance is already 
closed.");
+
+        lock.readLock().lock();
+        try {
+            // while this thread was waiting for lock, another thread may have 
changed the value of isRemoteIndexCacheClosed.
+            // check for index close again
+            if (isRemoteIndexCacheClosed.get()) {
+                throw new IllegalStateException("Unable to fetch index for 
segment id="
+                        + remoteLogSegmentMetadata.remoteLogSegmentId().id() + 
". Index instance is already closed.");
+            }
+
+            return 
internalCache.get(remoteLogSegmentMetadata.remoteLogSegmentId().id(),
+                    (Uuid uuid) -> createCacheEntry(remoteLogSegmentMetadata));
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    private RemoteIndexCache.Entry createCacheEntry(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata) {
+        long startOffset = remoteLogSegmentMetadata.startOffset();
+
+        try {
+            File offsetIndexFile = remoteOffsetIndexFile(cacheDir, 
remoteLogSegmentMetadata);
+            OffsetIndex offsetIndex = loadIndexFile(offsetIndexFile, 
remoteLogSegmentMetadata, rlsMetadata -> {
+                try {
+                    return remoteStorageManager.fetchIndex(rlsMetadata, 
IndexType.OFFSET);
+                } catch (RemoteStorageException e) {
+                    throw new KafkaException(e);
+                }
+            }, file -> {
+                try {
+                    OffsetIndex index = new OffsetIndex(file, startOffset, 
Integer.MAX_VALUE, false);
+                    index.sanityCheck();
+                    return index;
+                } catch (IOException e) {
+                    throw new KafkaException(e);
+                }
+            });
+            File timeIndexFile = remoteTimeIndexFile(cacheDir, 
remoteLogSegmentMetadata);
+            TimeIndex timeIndex = loadIndexFile(timeIndexFile, 
remoteLogSegmentMetadata, rlsMetadata -> {
+                try {
+                    return remoteStorageManager.fetchIndex(rlsMetadata, 
IndexType.TIMESTAMP);
+                } catch (RemoteStorageException e) {
+                    throw new KafkaException(e);
+                }
+            }, file -> {
+                try {
+                    TimeIndex index = new TimeIndex(file, startOffset, 
Integer.MAX_VALUE, false);
+                    index.sanityCheck();
+                    return index;
+                } catch (IOException e) {
+                    throw new KafkaException(e);
+                }
+            });
+            File txnIndexFile = remoteTransactionIndexFile(cacheDir, 
remoteLogSegmentMetadata);
+            TransactionIndex txnIndex = loadIndexFile(txnIndexFile, 
remoteLogSegmentMetadata, rlsMetadata -> {
+                try {
+                    return remoteStorageManager.fetchIndex(rlsMetadata, 
IndexType.TRANSACTION);
+                } catch (RemoteStorageException e) {
+                    throw new KafkaException(e);
+                }
+            }, file -> {
+                try {
+                    TransactionIndex index = new TransactionIndex(startOffset, 
file);
+                    index.sanityCheck();
+                    return index;
+                } catch (IOException e) {
+                    throw new KafkaException(e);
+                }
+            });
+
+            return new Entry(offsetIndex, timeIndex, txnIndex);
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    public int lookupOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, 
long offset) {
+        lock.readLock().lock();
+        try {
+            return 
getIndexEntry(remoteLogSegmentMetadata).lookupOffset(offset).position;
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    public int lookupTimestamp(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata, long timestamp, long startingOffset) throws 
IOException {
+        lock.readLock().lock();
+        try {
+            return 
getIndexEntry(remoteLogSegmentMetadata).lookupTimestamp(timestamp, 
startingOffset).position;
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public void close() {
+        // Make close idempotent and ensure no more reads allowed from 
henceforth. The in-progress reads will continue to
+        // completion (release the read lock) and then close will begin 
executing. Cleaner thread will immediately stop work.
+        if (!isRemoteIndexCacheClosed.getAndSet(true)) {
+            lock.writeLock().lock();
+            try {
+                log.info("Close initiated for RemoteIndexCache. Cache 
stats={}. Cache entries pending delete={}",
+                        internalCache.stats(), expiredIndexes.size());
+                // Initiate shutdown for cleaning thread
+                boolean shutdownRequired = cleanerThread.initiateShutdown();
+                // Close all the opened indexes to force unload mmap memory. 
This does not delete the index files from disk.
+                internalCache.asMap().forEach((uuid, entry) -> entry.close());
+                // wait for cleaner thread to shutdown
+                if (shutdownRequired) cleanerThread.awaitShutdown();
+
+                // Note that internal cache does not require explicit 
cleaning/closing. We don't want to invalidate or cleanup
+                // the cache as both would lead to triggering of removal 
listener.
+
+                log.info("Close completed for RemoteIndexCache");
+            } catch (InterruptedException e) {
+                throw new KafkaException(e);
+            } finally {
+                lock.writeLock().unlock();
+            }
+        }
+    }
+
+    public static class Entry implements AutoCloseable {
+
+        private final OffsetIndex offsetIndex;
+        private final TimeIndex timeIndex;
+        private final TransactionIndex txnIndex;

Review Comment:
   Should this be optional? A remote segment may not have a transaction index, so requiring a non-null `txnIndex` here looks too strict.
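   A minimal sketch of what that could look like, assuming `Entry` holds the
   transaction index as an `Optional` (hypothetical, not the PR's current code;
   only the affected members are shown):

   ```java
   import java.util.Optional;
   import org.apache.kafka.common.utils.Utils;

   public static class Entry implements AutoCloseable {

       private final OffsetIndex offsetIndex;
       private final TimeIndex timeIndex;
       // Optional because a remote segment may not have a transaction index.
       private final Optional<TransactionIndex> txnIndex;

       public Entry(OffsetIndex offsetIndex, TimeIndex timeIndex, Optional<TransactionIndex> txnIndex) {
           this.offsetIndex = offsetIndex;
           this.timeIndex = timeIndex;
           this.txnIndex = txnIndex;
       }

       @Override
       public void close() {
           Utils.closeQuietly(offsetIndex, "OffsetIndex");
           Utils.closeQuietly(timeIndex, "TimeIndex");
           // Close the transaction index only when the segment actually has one.
           txnIndex.ifPresent(index -> Utils.closeQuietly(index, "TransactionIndex"));
       }
   }
   ```

   Callers that need aborted-transaction data would then have to handle the
   empty case explicitly instead of getting an NPE.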



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##########
@@ -0,0 +1,671 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import 
org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.INDEX_FILE_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.TIME_INDEX_FILE_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.TXN_INDEX_FILE_SUFFIX;
+
+/**
+ * This is a LFU (Least Frequently Used) cache of remote index files stored in 
`$logdir/remote-log-index-cache`.
+ * This is helpful to avoid re-fetching the index files like offset, time 
indexes from the remote storage for every
+ * fetch call. The cache is re-initialized from the index files on disk on 
startup, if the index files are available.
+ *
+ * The cache contains a garbage collection thread which will delete the files 
for entries that have been removed from
+ * the cache.
+ *
+ * Note that closing this cache does not delete the index files on disk.
+ * Note that the cache eviction policy is based on the default implementation 
of Caffeine i.e.
+ * <a href="https://github.com/ben-manes/caffeine/wiki/Efficiency">Window TinyLfu</a>. TinyLfu relies on a frequency
+ * sketch to probabilistically estimate the historic usage of an entry.
+ *
+ * This class is thread safe.
+ */
+public class RemoteIndexCache implements Closeable {
+
+    private static final Logger log = 
LoggerFactory.getLogger(RemoteIndexCache.class);
+    private static final String TMP_FILE_SUFFIX = ".tmp";
+
+    public static final String REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD = 
"remote-log-index-cleaner";
+    public static final String DIR_NAME = "remote-log-index-cache";
+
+    /**
+     * Directory where the index files will be stored on disk.
+     */
+    private final File cacheDir;
+
+    /**
+     * Represents if the cache is closed or not. Closing the cache is an 
irreversible operation.
+     */
+    private final AtomicBoolean isRemoteIndexCacheClosed = new 
AtomicBoolean(false);
+
+    /**
+     * Unbounded queue containing the removed entries from the cache which are 
waiting to be garbage collected.
+     */
+    private final LinkedBlockingQueue<Entry> expiredIndexes = new 
LinkedBlockingQueue<>();
+
+    /**
+     * Lock used to synchronize close with other read operations. This ensures 
that when we close, we don't have any other
+     * concurrent reads in-progress.
+     */
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+    /**
+     * Actual cache implementation that this file wraps around.
+     *
+     * The requirements for this internal cache is as follows:
+     * 1. Multiple threads should be able to read concurrently.
+     * 2. Fetch for missing keys should not block read for available keys.
+     * 3. Only one thread should fetch for a specific key.
+     * 4. Should support LRU-like policy.
+     *
+     * We use {@link Caffeine} cache instead of implementing a thread safe LRU 
cache on our own.
+     */
+    private final Cache<Uuid, Entry> internalCache;
+    private final RemoteStorageManager remoteStorageManager;
+    private final ShutdownableThread cleanerThread;
+
+    public RemoteIndexCache(RemoteStorageManager remoteStorageManager, String 
logDir) throws IOException {
+        this(1024, remoteStorageManager, logDir);
+    }
+
+    /**
+     * Creates RemoteIndexCache with the given configs.
+     *
+     * @param maxSize              maximum number of segment index entries to 
be cached.
+     * @param remoteStorageManager RemoteStorageManager instance, to be used 
in fetching indexes.
+     * @param logDir               log directory
+     */
+    public RemoteIndexCache(int maxSize, RemoteStorageManager 
remoteStorageManager, String logDir) throws IOException {
+        this.remoteStorageManager = remoteStorageManager;
+        cacheDir = new File(logDir, DIR_NAME);
+
+        internalCache = Caffeine.newBuilder()
+                .maximumSize(maxSize)
+                // removeListener is invoked when either the entry is 
invalidated (means manual removal by the caller) or
+                // evicted (means removal due to the policy)
+                .removalListener((Uuid key, Entry entry, RemovalCause cause) 
-> {
+                    // Mark the entries for cleanup and add them to the queue 
to be garbage collected later by the background thread.
+                    if (entry != null) {
+                        try {
+                            entry.markForCleanup();
+                        } catch (IOException e) {
+                            throw new KafkaException(e);
+                        }
+                        if (!expiredIndexes.offer(entry)) {
+                            log.error("Error while inserting entry {} for key 
{} into the cleaner queue because queue is full.", entry, key);
+                        }
+                    } else {
+                        log.error("Received entry as null for key {} when the 
it is removed from the cache.", key);
+                    }
+                }).build();
+
+        init();
+
+        // Start cleaner thread that will clean the expired entries.
+        cleanerThread = createCleanerThread();
+        cleanerThread.start();
+    }
+
+    public Collection<Entry> expiredIndexes() {
+        return Collections.unmodifiableCollection(expiredIndexes);
+    }
+
+    // Visible for testing
+    public Cache<Uuid, Entry> internalCache() {
+        return internalCache;
+    }
+
+    // Visible for testing
+    public ShutdownableThread cleanerThread() {
+        return cleanerThread;
+    }
+
+    private ShutdownableThread createCleanerThread() {
+        ShutdownableThread thread = new 
ShutdownableThread("remote-log-index-cleaner") {
+            public void doWork() {
+                try {
+                    Entry entry = expiredIndexes.take();
+                    log.debug("Cleaning up index entry {}", entry);
+                    entry.cleanup();
+                } catch (InterruptedException ie) {
+                    // cleaner thread should only be interrupted when cache is 
being closed, else it's an error
+                    if (!isRemoteIndexCacheClosed.get()) {
+                        log.error("Cleaner thread received interruption but 
remote index cache is not closed", ie);
+                        // propagate the InterruptedException outside to 
correctly close the thread.
+                        throw new KafkaException(ie);
+                    } else {
+                        log.debug("Cleaner thread was interrupted on cache 
shutdown");
+                    }
+                } catch (Exception ex) {
+                    // do not exit for exceptions other than 
InterruptedException
+                    log.error("Error occurred while cleaning up expired 
entry", ex);
+                }
+            }
+
+        };
+        thread.setDaemon(true);
+
+        return thread;
+    }
+
+    private void init() throws IOException {
+        long start = Time.SYSTEM.hiResClockMs();
+
+        try {
+            Files.createDirectory(cacheDir.toPath());
+            log.info("Created new file {} for RemoteIndexCache", cacheDir);
+        } catch (FileAlreadyExistsException e) {
+            log.info("RemoteIndexCache directory {} already exists. Re-using 
the same directory.", cacheDir);
+        } catch (Exception e) {
+            log.error("Unable to create directory {} for RemoteIndexCache.", 
cacheDir, e);
+            throw new KafkaException(e);
+        }
+
+        // Delete any .deleted or .tmp files remained from the earlier run of 
the broker.
+        try (Stream<Path> paths = Files.list(cacheDir.toPath())) {
+            paths.forEach(path -> {
+                if (path.endsWith(LogFileUtils.DELETED_FILE_SUFFIX) ||
+                        path.endsWith(TMP_FILE_SUFFIX)) {
+                    try {
+                        if (Files.deleteIfExists(path)) {
+                            log.debug("Deleted file path {} on cache 
initialization", path);
+                        }
+                    } catch (IOException e) {
+                        throw new KafkaException(e);
+                    }
+                }
+            });
+        }
+
+        try (Stream<Path> paths = Files.list(cacheDir.toPath())) {
+            Iterator<Path> iterator = paths.iterator();
+            while (iterator.hasNext()) {
+                Path path = iterator.next();
+                Path fileNamePath = path.getFileName();
+                if (fileNamePath == null)
+                    throw new KafkaException("Empty file name in remote index 
cache directory: " + cacheDir);
+
+                String indexFileName = fileNamePath.toString();
+                Uuid uuid = 
remoteLogSegmentIdFromRemoteIndexFileName(indexFileName);
+
+                // It is safe to update the internalCache non-atomically here 
since this function is always called by a single
+                // thread only.
+                if (!internalCache.asMap().containsKey(uuid)) {
+                    String fileNameWithoutSuffix = indexFileName.substring(0, 
indexFileName.indexOf("."));
+                    File offsetIndexFile = new File(cacheDir, 
fileNameWithoutSuffix + INDEX_FILE_SUFFIX);
+                    File timestampIndexFile = new File(cacheDir, 
fileNameWithoutSuffix + TIME_INDEX_FILE_SUFFIX);
+                    File txnIndexFile = new File(cacheDir, 
fileNameWithoutSuffix + TXN_INDEX_FILE_SUFFIX);
+
+                    // Create entries for each path if all the index files 
exist.
+                    if (Files.exists(offsetIndexFile.toPath()) &&
+                            Files.exists(timestampIndexFile.toPath()) &&
+                            Files.exists(txnIndexFile.toPath())) {
+                        long offset = 
offsetFromRemoteIndexFileName(indexFileName);
+                        OffsetIndex offsetIndex = new 
OffsetIndex(offsetIndexFile, offset, Integer.MAX_VALUE, false);
+                        offsetIndex.sanityCheck();
+
+                        TimeIndex timeIndex = new 
TimeIndex(timestampIndexFile, offset, Integer.MAX_VALUE, false);
+                        timeIndex.sanityCheck();
+
+                        TransactionIndex txnIndex = new 
TransactionIndex(offset, txnIndexFile);
+                        txnIndex.sanityCheck();
+
+                        Entry entry = new Entry(offsetIndex, timeIndex, 
txnIndex);
+                        internalCache.put(uuid, entry);
+                    } else {
+                        // Delete all of them if any one of those indexes is 
not available for a specific segment id
+                        tryAll(Arrays.asList(
+                                () -> {
+                                    
Files.deleteIfExists(offsetIndexFile.toPath());
+                                    return null;
+                                },
+                                () -> {
+                                    
Files.deleteIfExists(timestampIndexFile.toPath());
+                                    return null;
+                                },
+                                () -> {
+                                    
Files.deleteIfExists(txnIndexFile.toPath());
+                                    return null;
+                                }));
+                    }
+                }
+            }
+        }
+        log.info("RemoteIndexCache starts up in {} ms.", 
Time.SYSTEM.hiResClockMs() - start);
+    }
+
+    private <T> T loadIndexFile(File file, RemoteLogSegmentMetadata 
remoteLogSegmentMetadata,
+                                Function<RemoteLogSegmentMetadata, 
InputStream> fetchRemoteIndex,
+                                Function<File, T> readIndex) throws 
IOException {
+        File indexFile = new File(cacheDir, file.getName());
+        T index = null;
+        if (Files.exists(indexFile.toPath())) {
+            try {
+                index = readIndex.apply(indexFile);
+            } catch (CorruptRecordException ex) {
+                log.info("Error occurred while loading the stored index file 
{}", indexFile.getPath(), ex);
+            }
+        }
+
+        if (index == null) {
+            File tmpIndexFile = new File(indexFile.getParentFile(), 
indexFile.getName() + RemoteIndexCache.TMP_FILE_SUFFIX);
+
+            try (InputStream inputStream = 
fetchRemoteIndex.apply(remoteLogSegmentMetadata);) {
+                Files.copy(inputStream, tmpIndexFile.toPath(), 
StandardCopyOption.REPLACE_EXISTING);
+            }
+
+            Utils.atomicMoveWithFallback(tmpIndexFile.toPath(), 
indexFile.toPath(), false);
+            index = readIndex.apply(indexFile);
+        }
+
+        return index;
+    }
+
+    public Entry getIndexEntry(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata) {
+        if (isRemoteIndexCacheClosed.get()) throw new 
IllegalStateException("Unable to fetch index for " +
+                "segment id=" + 
remoteLogSegmentMetadata.remoteLogSegmentId().id() + ". Instance is already 
closed.");
+
+        lock.readLock().lock();
+        try {
+            // while this thread was waiting for lock, another thread may have 
changed the value of isRemoteIndexCacheClosed.
+            // check for index close again
+            if (isRemoteIndexCacheClosed.get()) {
+                throw new IllegalStateException("Unable to fetch index for 
segment id="
+                        + remoteLogSegmentMetadata.remoteLogSegmentId().id() + 
". Index instance is already closed.");
+            }
+
+            return 
internalCache.get(remoteLogSegmentMetadata.remoteLogSegmentId().id(),
+                    (Uuid uuid) -> createCacheEntry(remoteLogSegmentMetadata));
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    private RemoteIndexCache.Entry createCacheEntry(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata) {
+        long startOffset = remoteLogSegmentMetadata.startOffset();
+
+        try {
+            File offsetIndexFile = remoteOffsetIndexFile(cacheDir, 
remoteLogSegmentMetadata);
+            OffsetIndex offsetIndex = loadIndexFile(offsetIndexFile, 
remoteLogSegmentMetadata, rlsMetadata -> {
+                try {
+                    return remoteStorageManager.fetchIndex(rlsMetadata, 
IndexType.OFFSET);
+                } catch (RemoteStorageException e) {
+                    throw new KafkaException(e);
+                }
+            }, file -> {
+                try {
+                    OffsetIndex index = new OffsetIndex(file, startOffset, 
Integer.MAX_VALUE, false);
+                    index.sanityCheck();
+                    return index;
+                } catch (IOException e) {
+                    throw new KafkaException(e);
+                }
+            });
+            File timeIndexFile = remoteTimeIndexFile(cacheDir, 
remoteLogSegmentMetadata);
+            TimeIndex timeIndex = loadIndexFile(timeIndexFile, 
remoteLogSegmentMetadata, rlsMetadata -> {
+                try {
+                    return remoteStorageManager.fetchIndex(rlsMetadata, 
IndexType.TIMESTAMP);
+                } catch (RemoteStorageException e) {
+                    throw new KafkaException(e);
+                }
+            }, file -> {
+                try {
+                    TimeIndex index = new TimeIndex(file, startOffset, 
Integer.MAX_VALUE, false);
+                    index.sanityCheck();
+                    return index;
+                } catch (IOException e) {
+                    throw new KafkaException(e);
+                }
+            });
+            File txnIndexFile = remoteTransactionIndexFile(cacheDir, 
remoteLogSegmentMetadata);
+            TransactionIndex txnIndex = loadIndexFile(txnIndexFile, 
remoteLogSegmentMetadata, rlsMetadata -> {
+                try {
+                    return remoteStorageManager.fetchIndex(rlsMetadata, 
IndexType.TRANSACTION);

Review Comment:
   Same here: what if this throws `RemoteResourceNotFoundException`, e.g. because the segment has no transaction index?
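   One possible way to handle it, sketched as a hypothetical helper (not in the
   PR; it assumes an empty file is acceptable as an empty transaction index):

   ```java
   import java.io.ByteArrayInputStream;
   import java.io.InputStream;
   import org.apache.kafka.common.KafkaException;
   import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
   import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
   import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
   import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
   import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;

   // Fetch the transaction index, mapping "resource not found" to an empty
   // stream so that segments without a transaction index do not fail the fetch.
   private InputStream fetchTransactionIndexOrEmpty(RemoteStorageManager rsm,
                                                    RemoteLogSegmentMetadata metadata) {
       try {
           return rsm.fetchIndex(metadata, IndexType.TRANSACTION);
       } catch (RemoteResourceNotFoundException e) {
           // The segment has no transaction index; materialize an empty one locally.
           return new ByteArrayInputStream(new byte[0]);
       } catch (RemoteStorageException e) {
           throw new KafkaException(e);
       }
   }
   ```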



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##########
@@ -0,0 +1,671 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import 
org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.INDEX_FILE_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.TIME_INDEX_FILE_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.TXN_INDEX_FILE_SUFFIX;
+
+/**
+ * This is a LFU (Least Frequently Used) cache of remote index files stored in 
`$logdir/remote-log-index-cache`.
+ * This is helpful to avoid re-fetching the index files like offset, time 
indexes from the remote storage for every
+ * fetch call. The cache is re-initialized from the index files on disk on 
startup, if the index files are available.
+ *
+ * The cache contains a garbage collection thread which will delete the files 
for entries that have been removed from
+ * the cache.
+ *
+ * Note that closing this cache does not delete the index files on disk.
+ * Note that the cache eviction policy is based on the default implementation 
of Caffeine i.e.
+ * <a href="https://github.com/ben-manes/caffeine/wiki/Efficiency">Window TinyLfu</a>. TinyLfu relies on a frequency
+ * sketch to probabilistically estimate the historic usage of an entry.
+ *
+ * This class is thread safe.
+ */
+public class RemoteIndexCache implements Closeable {
+
+    private static final Logger log = 
LoggerFactory.getLogger(RemoteIndexCache.class);
+    private static final String TMP_FILE_SUFFIX = ".tmp";
+
+    public static final String REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD = 
"remote-log-index-cleaner";
+    public static final String DIR_NAME = "remote-log-index-cache";
+
+    /**
+     * Directory where the index files will be stored on disk.
+     */
+    private final File cacheDir;
+
+    /**
+     * Represents if the cache is closed or not. Closing the cache is an 
irreversible operation.
+     */
+    private final AtomicBoolean isRemoteIndexCacheClosed = new 
AtomicBoolean(false);
+
+    /**
+     * Unbounded queue containing the removed entries from the cache which are 
waiting to be garbage collected.
+     */
+    private final LinkedBlockingQueue<Entry> expiredIndexes = new 
LinkedBlockingQueue<>();
+
+    /**
+     * Lock used to synchronize close with other read operations. This ensures 
that when we close, we don't have any other
+     * concurrent reads in-progress.
+     */
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+    /**
+     * Actual cache implementation that this file wraps around.
+     *
+     * The requirements for this internal cache is as follows:
+     * 1. Multiple threads should be able to read concurrently.
+     * 2. Fetch for missing keys should not block read for available keys.
+     * 3. Only one thread should fetch for a specific key.
+     * 4. Should support LRU-like policy.
+     *
+     * We use {@link Caffeine} cache instead of implementing a thread safe LRU 
cache on our own.
+     */
+    private final Cache<Uuid, Entry> internalCache;
+    private final RemoteStorageManager remoteStorageManager;
+    private final ShutdownableThread cleanerThread;
+
+    public RemoteIndexCache(RemoteStorageManager remoteStorageManager, String 
logDir) throws IOException {
+        this(1024, remoteStorageManager, logDir);
+    }
+
+    /**
+     * Creates RemoteIndexCache with the given configs.
+     *
+     * @param maxSize              maximum number of segment index entries to 
be cached.
+     * @param remoteStorageManager RemoteStorageManager instance, to be used 
in fetching indexes.
+     * @param logDir               log directory
+     */
+    public RemoteIndexCache(int maxSize, RemoteStorageManager 
remoteStorageManager, String logDir) throws IOException {
+        this.remoteStorageManager = remoteStorageManager;
+        cacheDir = new File(logDir, DIR_NAME);
+
+        internalCache = Caffeine.newBuilder()
+                .maximumSize(maxSize)
+                // removeListener is invoked when either the entry is 
invalidated (means manual removal by the caller) or
+                // evicted (means removal due to the policy)
+                .removalListener((Uuid key, Entry entry, RemovalCause cause) 
-> {
+                    // Mark the entries for cleanup and add them to the queue 
to be garbage collected later by the background thread.
+                    if (entry != null) {
+                        try {
+                            entry.markForCleanup();
+                        } catch (IOException e) {
+                            throw new KafkaException(e);
+                        }
+                        if (!expiredIndexes.offer(entry)) {
+                            log.error("Error while inserting entry {} for key 
{} into the cleaner queue because queue is full.", entry, key);
+                        }
+                    } else {
+                        log.error("Received entry as null for key {} when the 
it is removed from the cache.", key);
+                    }
+                }).build();
+
+        init();
+
+        // Start cleaner thread that will clean the expired entries.
+        cleanerThread = createCleanerThread();
+        cleanerThread.start();
+    }
+
+    public Collection<Entry> expiredIndexes() {
+        return Collections.unmodifiableCollection(expiredIndexes);
+    }
+
+    // Visible for testing
+    public Cache<Uuid, Entry> internalCache() {
+        return internalCache;
+    }
+
+    // Visible for testing
+    public ShutdownableThread cleanerThread() {
+        return cleanerThread;
+    }
+
+    private ShutdownableThread createCleanerThread() {
+        ShutdownableThread thread = new 
ShutdownableThread("remote-log-index-cleaner") {
+            public void doWork() {
+                try {
+                    Entry entry = expiredIndexes.take();
+                    log.debug("Cleaning up index entry {}", entry);
+                    entry.cleanup();
+                } catch (InterruptedException ie) {
+                    // cleaner thread should only be interrupted when cache is 
being closed, else it's an error
+                    if (!isRemoteIndexCacheClosed.get()) {
+                        log.error("Cleaner thread received interruption but 
remote index cache is not closed", ie);
+                        // propagate the InterruptedException outside to 
correctly close the thread.
+                        throw new KafkaException(ie);
+                    } else {
+                        log.debug("Cleaner thread was interrupted on cache 
shutdown");
+                    }
+                } catch (Exception ex) {
+                    // do not exit for exceptions other than 
InterruptedException
+                    log.error("Error occurred while cleaning up expired 
entry", ex);
+                }
+            }
+
+        };
+        thread.setDaemon(true);
+
+        return thread;
+    }
+
+    private void init() throws IOException {
+        long start = Time.SYSTEM.hiResClockMs();
+
+        try {
+            Files.createDirectory(cacheDir.toPath());
+            log.info("Created new file {} for RemoteIndexCache", cacheDir);
+        } catch (FileAlreadyExistsException e) {
+            log.info("RemoteIndexCache directory {} already exists. Re-using 
the same directory.", cacheDir);
+        } catch (Exception e) {
+            log.error("Unable to create directory {} for RemoteIndexCache.", 
cacheDir, e);
+            throw new KafkaException(e);
+        }
+
+        // Delete any .deleted or .tmp files remained from the earlier run of 
the broker.
+        try (Stream<Path> paths = Files.list(cacheDir.toPath())) {
+            paths.forEach(path -> {
+                if (path.endsWith(LogFileUtils.DELETED_FILE_SUFFIX) ||
+                        path.endsWith(TMP_FILE_SUFFIX)) {
+                    try {
+                        if (Files.deleteIfExists(path)) {
+                            log.debug("Deleted file path {} on cache 
initialization", path);
+                        }
+                    } catch (IOException e) {
+                        throw new KafkaException(e);
+                    }
+                }
+            });
+        }
+
+        try (Stream<Path> paths = Files.list(cacheDir.toPath())) {
+            Iterator<Path> iterator = paths.iterator();
+            while (iterator.hasNext()) {
+                Path path = iterator.next();
+                Path fileNamePath = path.getFileName();
+                if (fileNamePath == null)
+                    throw new KafkaException("Empty file name in remote index 
cache directory: " + cacheDir);
+
+                String indexFileName = fileNamePath.toString();
+                Uuid uuid = 
remoteLogSegmentIdFromRemoteIndexFileName(indexFileName);
+
+                // It is safe to update the internalCache non-atomically here 
since this function is always called by a single
+                // thread only.
+                if (!internalCache.asMap().containsKey(uuid)) {
+                    String fileNameWithoutSuffix = indexFileName.substring(0, 
indexFileName.indexOf("."));
+                    File offsetIndexFile = new File(cacheDir, 
fileNameWithoutSuffix + INDEX_FILE_SUFFIX);
+                    File timestampIndexFile = new File(cacheDir, 
fileNameWithoutSuffix + TIME_INDEX_FILE_SUFFIX);
+                    File txnIndexFile = new File(cacheDir, 
fileNameWithoutSuffix + TXN_INDEX_FILE_SUFFIX);

Review Comment:
   What if this segment does not contain a transaction index, since it is optional? Should the condition for creating entries account for that case?
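   A rough sketch of how the condition could look if only the offset and time
   indexes are mandatory (hypothetical; it reuses the surrounding names from
   init() and the Optional-based `Entry` suggested above):

   ```java
   // Only the offset and time indexes are required to rebuild an entry; the
   // transaction index is loaded only when its file is present on disk.
   if (Files.exists(offsetIndexFile.toPath()) && Files.exists(timestampIndexFile.toPath())) {
       long offset = offsetFromRemoteIndexFileName(indexFileName);

       OffsetIndex offsetIndex = new OffsetIndex(offsetIndexFile, offset, Integer.MAX_VALUE, false);
       offsetIndex.sanityCheck();

       TimeIndex timeIndex = new TimeIndex(timestampIndexFile, offset, Integer.MAX_VALUE, false);
       timeIndex.sanityCheck();

       Optional<TransactionIndex> txnIndex = Optional.empty();
       if (Files.exists(txnIndexFile.toPath())) {
           TransactionIndex index = new TransactionIndex(offset, txnIndexFile);
           index.sanityCheck();
           txnIndex = Optional.of(index);
       }

       internalCache.put(uuid, new Entry(offsetIndex, timeIndex, txnIndex));
   } else {
       // Mandatory indexes are missing: clean up whatever is left for this segment.
       Files.deleteIfExists(offsetIndexFile.toPath());
       Files.deleteIfExists(timestampIndexFile.toPath());
       Files.deleteIfExists(txnIndexFile.toPath());
   }
   ```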



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
