satishd commented on code in PR #14381: URL: https://github.com/apache/kafka/pull/14381#discussion_r1344604731
########## storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java: ########## @@ -197,7 +197,7 @@ public final class RemoteLogManagerConfig { atLeast(0), LOW, REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_DOC) - .defineInternal(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP, + .define(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP, Review Comment: Filed https://issues.apache.org/jira/browse/KAFKA-15535 to follow up on adding documentation. ########## core/src/main/scala/kafka/server/DynamicBrokerConfig.scala: ########## @@ -1129,3 +1131,50 @@ class DynamicProducerStateManagerConfig(val producerStateManagerConfig: Producer override def reconfigurableConfigs: Set[String] = ProducerStateManagerConfig.RECONFIGURABLE_CONFIGS.asScala } + +class DynamicRemoteLogConfig(server: KafkaBroker) extends BrokerReconfigurable with Logging { + override def reconfigurableConfigs: Set[String] = { + DynamicRemoteLogConfig.ReconfigurableConfigs + } + + override def validateReconfiguration(newConfig: KafkaConfig): Unit = { + newConfig.values.forEach { (k, v) => + if (reconfigurableConfigs.contains(k)) { + val newValue = v.asInstanceOf[Long] Review Comment: All the values are assumed to be of type `long`. Please add better checks based on the respective property names and values. ########## storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java: ########## @@ -112,27 +115,55 @@ public class RemoteIndexCache implements Closeable { * * We use {@link Caffeine} cache instead of implementing a thread safe LRU cache on our own. 
*/ - private final Cache<Uuid, Entry> internalCache; - private final RemoteStorageManager remoteStorageManager; - private final ShutdownableThread cleanerThread; + private Cache<Uuid, Entry> internalCache; + // Visible for testing public RemoteIndexCache(RemoteStorageManager remoteStorageManager, String logDir) throws IOException { - this(1024, remoteStorageManager, logDir); + this(DEFAULT_REMOTE_INDEX_CACHE_SIZE_BYTES, remoteStorageManager, logDir); } /** * Creates RemoteIndexCache with the given configs. * - * @param maxSize maximum number of segment index entries to be cached. + * @param maxSize maximum bytes size of segment index entries to be cached. * @param remoteStorageManager RemoteStorageManager instance, to be used in fetching indexes. * @param logDir log directory */ - public RemoteIndexCache(int maxSize, RemoteStorageManager remoteStorageManager, String logDir) throws IOException { + public RemoteIndexCache(long maxSize, RemoteStorageManager remoteStorageManager, String logDir) throws IOException { this.remoteStorageManager = remoteStorageManager; cacheDir = new File(logDir, DIR_NAME); - internalCache = Caffeine.newBuilder() - .maximumSize(maxSize) + internalCache = initEmptyCache(maxSize); + init(); + + // Start cleaner thread that will clean the expired entries. + cleanerThread = createCleanerThread(); + cleanerThread.start(); + } + + public void resizeCacheSize(long remoteLogIndexFileCacheSize) { + lock.writeLock().lock(); + try { + // When resizing the cache, we always start with an empty cache. There are two main reasons: + // 1. Resizing the cache is not a high-frequency operation, and there is no need to fill the data in the old + // cache to the new cache in time when resizing inside. + // 2. Since the eviction of the caffeine cache is cleared asynchronously, it is possible that after the entry + // in the old cache is filled in the new cache, the old cache will clear the entry, and the data in the two caches + // will be inconsistent. 
+ internalCache.invalidateAll(); + log.info("Invalidated all entries in the cache and triggered the cleaning of all index files in the cache dir."); + internalCache = initEmptyCache(remoteLogIndexFileCacheSize); Review Comment: Do we have a JIRA tracking the improvement suggested in the comment? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org