[GitHub] [kafka] junrao commented on a diff in pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.

2023-07-07 Thread via GitHub


junrao commented on code in PR #13275:
URL: https://github.com/apache/kafka/pull/13275#discussion_r1256287178


##
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -16,6 +16,9 @@
  */
 package org.apache.kafka.storage.internals.log;
 
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;

Review Comment:
   Thanks for the explanation, Divij. This change seems fine to me then.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] junrao commented on a diff in pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.

2023-07-07 Thread via GitHub


junrao commented on code in PR #13275:
URL: https://github.com/apache/kafka/pull/13275#discussion_r1255005974


##
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -0,0 +1,582 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import 
org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.INDEX_FILE_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.TIME_INDEX_FILE_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.TXN_INDEX_FILE_SUFFIX;
+
+/**
+ * This is a LFU (Least Frequently Used) cache of remote index files stored in 
`$logdir/remote-log-index-cache`.
+ * This is helpful to avoid re-fetching the index files like offset, time 
indexes from the remote storage for every
+ * fetch call. The cache is re-initialized from the index files on disk on 
startup, if the index files are available.
+ *
+ * The cache contains a garbage collection thread which will delete the files 
for entries that have been removed from
+ * the cache.
+ *
+ * Note that closing this cache does not delete the index files on disk.
+ * Note that the cache eviction policy is based on the default implementation 
of Caffeine i.e.
+ * https://github.com/ben-manes/caffeine/wiki/Efficiency;>Window 
TinyLfu. TinyLfu relies on a frequency
+ * sketch to probabilistically estimate the historic usage of an entry.
+ *
+ */
+public class RemoteIndexCache implements Closeable {
+
+private static final Logger log = 
LoggerFactory.getLogger(RemoteIndexCache.class);
+
+public static final String DIR_NAME = "remote-log-index-cache";
+
+private static final String TMP_FILE_SUFFIX = ".tmp";
+
+public static final String REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD = 
"remote-log-index-cleaner";
+
+/**
+ * Directory where the index files will be stored on disk.
+ */
+private final File cacheDir;
+
+/**
+ * Represents if the cache is closed or not. Closing the cache is an 
irreversible operation.
+ */
+private final AtomicBoolean isRemoteIndexCacheClosed = new 
AtomicBoolean(false);
+
+/**
+ * Unbounded queue containing the removed entries from the cache which are 
waiting to be garbage collected.
+ */
+private final LinkedBlockingQueue expiredIndexes = new 
LinkedBlockingQueue<>();
+
+/**
+ * Lock used to synchronize close with other read operations. This ensures 
that when we close, we don't have any other
+ * concurrent reads in-progress.
+ */
+private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+/**
+ * Actual cache implementation that this file wraps around.
+ *
+ * The requirements for this internal cache is as follows:
+ * 1. Multiple threads should be 

[GitHub] [kafka] junrao commented on a diff in pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.

2023-07-06 Thread via GitHub


junrao commented on code in PR #13275:
URL: https://github.com/apache/kafka/pull/13275#discussion_r1254942551


##
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -16,6 +16,9 @@
  */
 package org.apache.kafka.storage.internals.log;
 
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;

Review Comment:
   We introduced a new dependency, Caffeine, here. Could you explain why 
Caffeine was chosen and how stable it is? The Caffeine docs mention the use of 
weak references. A few years back, we avoided using weak references in a PR 
because of their poor GC behavior. Have we done any experiments to understand 
the GC impact?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] junrao commented on a diff in pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.

2023-03-07 Thread via GitHub


junrao commented on code in PR #13275:
URL: https://github.com/apache/kafka/pull/13275#discussion_r1128778509


##
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import 
org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.INDEX_FILE_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.TIME_INDEX_FILE_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.TXN_INDEX_FILE_SUFFIX;
+
+/**
+ * This is a LRU cache of remote index files stored in 
`$logdir/remote-log-index-cache`. This is helpful to avoid
+ * re-fetching the index files like offset, time indexes from the remote 
storage for every fetch call.
+ */
+public class RemoteIndexCache implements Closeable {
+
+private static final Logger log = 
LoggerFactory.getLogger(RemoteIndexCache.class);
+
+public static final String DIR_NAME = "remote-log-index-cache";
+
+private static final String TMP_FILE_SUFFIX = ".tmp";
+
+private final File cacheDir;
+private final LinkedBlockingQueue expiredIndexes = new 
LinkedBlockingQueue<>();
+private final Object lock = new Object();
+private final RemoteStorageManager remoteStorageManager;
+private final Map entries;
+private final ShutdownableThread cleanerThread;
+
+private volatile boolean closed = false;
+
+public RemoteIndexCache(RemoteStorageManager remoteStorageManager, String 
logDir) throws IOException {
+this(1024, remoteStorageManager, logDir);
+}
+
+/**
+ * Creates RemoteIndexCache with the given configs.
+ *
+ * @param maxSize  maximum number of segment index entries to 
be cached.
+ * @param remoteStorageManager RemoteStorageManager instance, to be used 
in fetching indexes.
+ * @param logDir   log directory
+ */
+public RemoteIndexCache(int maxSize, RemoteStorageManager 
remoteStorageManager, String logDir) throws IOException {
+this.remoteStorageManager = remoteStorageManager;
+cacheDir = new File(logDir, DIR_NAME);
+
+entries = new LinkedHashMap(maxSize / 2, 
0.75f, true) {
+@Override
+protected boolean removeEldestEntry(Map.Entry eldest) {
+if (this.size() > maxSize) {
+RemoteIndexCache.Entry entry = eldest.getValue();
+// Mark the entries for cleanup, background thread will 
clean them later.
+try {
+entry.markForCleanup();
+} catch (IOException e) {
+throw new KafkaException(e);
+}
+expiredIndexes.add(entry);
+return true;
+} else {
+return false;
+}
+}
+};
+
+init();
+
+// Start cleaner thread that will clean the expired entries.
+cleanerThread = createCleanerThread();
+cleanerThread.start();
+}
+
+

[GitHub] [kafka] junrao commented on a diff in pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.

2023-03-01 Thread via GitHub


junrao commented on code in PR #13275:
URL: https://github.com/apache/kafka/pull/13275#discussion_r1119167627


##
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+

Review Comment:
   Could we remove the extra blank line after the package declaration?



##
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import 
org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.INDEX_FILE_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.TIME_INDEX_FILE_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.TXN_INDEX_FILE_SUFFIX;
+
+/**
+ * This is a LRU cache of remote index files stored in 
`$logdir/remote-log-index-cache`. This is helpful to avoid
+ * re-fetching the index files like offset, time indexes from the remote 
storage for every fetch call.
+ */
+public class RemoteIndexCache implements Closeable {
+
+private static final Logger log = 
LoggerFactory.getLogger(RemoteIndexCache.class);
+
+public static final String DIR_NAME = "remote-log-index-cache";
+
+private static final String TMP_FILE_SUFFIX = ".tmp";
+
+private final File cacheDir;
+private final LinkedBlockingQueue expiredIndexes = new 
LinkedBlockingQueue<>();
+private final Object lock = new Object();
+private final RemoteStorageManager remoteStorageManager;
+private final Map entries;
+private final ShutdownableThread cleanerThread;
+
+private volatile boolean closed = false;
+
+public RemoteIndexCache(RemoteStorageManager remoteStorageManager, String 
logDir) throws IOException {
+this(1024, remoteStorageManager, logDir);
+}
+
+/**
+ * Creates RemoteIndexCache with the given configs.
+ *
+ * @param maxSize  maximum number of segment index entries to 
be cached.
+ * @param remoteStorageManager RemoteStorageManager instance, to be used 
in fetching indexes.
+ * @param logDir   log directory
+ */
+public RemoteIndexCache(int maxSize, RemoteStorageManager 
remoteStorageManager, String logDir) throws IOException {
+