Re: [PR] KAFKA-14912: Add a dynamic config for remote index cache size [kafka]

2023-10-08 Thread via GitHub


satishd merged PR #14381:
URL: https://github.com/apache/kafka/pull/14381


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14912: Add a dynamic config for remote index cache size [kafka]

2023-10-08 Thread via GitHub


satishd commented on PR #14381:
URL: https://github.com/apache/kafka/pull/14381#issuecomment-1751951542

   There are a few unrelated test failures; merging it to trunk.



Re: [PR] KAFKA-14912: Add a dynamic config for remote index cache size [kafka]

2023-10-08 Thread via GitHub


hudeqi commented on PR #14381:
URL: https://github.com/apache/kafka/pull/14381#issuecomment-1751943708

   > Thanks @hudeqi for addressing the review comments. LGTM.
   
   Thanks for your review, @satishd; please merge it. I have resolved several conflicts. I see there seems to be another PR that conflicts with this one. :)



Re: [PR] KAFKA-14912: Add a dynamic config for remote index cache size [kafka]

2023-10-07 Thread via GitHub


hudeqi commented on code in PR #14381:
URL: https://github.com/apache/kafka/pull/14381#discussion_r1349610922


##
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:
##
@@ -1140,12 +1140,14 @@ class DynamicRemoteLogConfig(server: KafkaBroker) extends BrokerReconfigurable w
   override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
     newConfig.values.forEach { (k, v) =>
       if (reconfigurableConfigs.contains(k)) {
-        val newValue = v.asInstanceOf[Long]
-        val oldValue = getValue(server.config, k)
-        if (newValue != oldValue) {
-          val errorMsg = s"Dynamic remote log manager config update validation failed for $k=$v"
-          if (newValue <= 0)
-            throw new ConfigException(s"$errorMsg, value should be at least 1")
+        if (k.equals(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)) {
+          val newValue = v.asInstanceOf[Long]
+          val oldValue = getValue(server.config, k)
+          if (newValue != oldValue) {
+            val errorMsg = s"Dynamic remote log manager config update validation failed for $k=$v"

Review Comment:
   Updated. Thanks.




Re: [PR] KAFKA-14912: Add a dynamic config for remote index cache size [kafka]

2023-10-07 Thread via GitHub


satishd commented on code in PR #14381:
URL: https://github.com/apache/kafka/pull/14381#discussion_r1349603877


##
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:
##
@@ -1140,12 +1140,14 @@ class DynamicRemoteLogConfig(server: KafkaBroker) extends BrokerReconfigurable w
   override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
     newConfig.values.forEach { (k, v) =>
       if (reconfigurableConfigs.contains(k)) {
-        val newValue = v.asInstanceOf[Long]
-        val oldValue = getValue(server.config, k)
-        if (newValue != oldValue) {
-          val errorMsg = s"Dynamic remote log manager config update validation failed for $k=$v"
-          if (newValue <= 0)
-            throw new ConfigException(s"$errorMsg, value should be at least 1")
+        if (k.equals(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)) {
+          val newValue = v.asInstanceOf[Long]
+          val oldValue = getValue(server.config, k)
+          if (newValue != oldValue) {
+            val errorMsg = s"Dynamic remote log manager config update validation failed for $k=$v"

Review Comment:
   Define this inside the `if` condition below, since it is used only there.
   
   ```
   if (newValue <= 0) {
     val errorMsg = s"Dynamic remote log manager config update validation failed for $k=$v"
     throw new ConfigException(s"$errorMsg, value should be at least 1")
   }
   ```




Re: [PR] KAFKA-14912: Add a dynamic config for remote index cache size [kafka]

2023-10-06 Thread via GitHub


hudeqi commented on code in PR #14381:
URL: https://github.com/apache/kafka/pull/14381#discussion_r1349454991


##
storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java:
##
@@ -197,7 +197,7 @@ public final class RemoteLogManagerConfig {
                   atLeast(0),
                   LOW,
                   REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_DOC)
-          .defineInternal(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,
+          .define(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,

Review Comment:
   This configuration name and its doc were provided by you earlier but were never used; this change simply reuses that configuration name. I also went through the other configuration docs related to tiered storage and found nothing missing. @satishd
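
   For context on the `defineInternal` -> `define` change above: internal configs are omitted from Kafka's generated configuration documentation, while `define` makes the property part of the public, documented config surface. A minimal sketch using `ConfigDef`; the default value and doc string below are placeholders for illustration, not the actual ones:

   ```java
   import org.apache.kafka.common.config.ConfigDef;

   public class ConfigVisibilitySketch {
       public static void main(String[] args) {
           ConfigDef configDef = new ConfigDef()
                   // Public config: validated and included in the generated documentation.
                   .define("remote.log.index.file.cache.total.size.bytes",
                           ConfigDef.Type.LONG,
                           1024 * 1024 * 1024L,              // placeholder default (1 GiB)
                           ConfigDef.Range.atLeast(1),
                           ConfigDef.Importance.LOW,
                           "Placeholder doc: total bytes of remote index files to cache.")
                   // Internal config: usable by the broker but left out of the generated docs.
                   .defineInternal("hypothetical.internal.prop",
                           ConfigDef.Type.LONG,
                           1024L,
                           ConfigDef.Importance.LOW);

           System.out.println(configDef.names());
       }
   }
   ```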




Re: [PR] KAFKA-14912: Add a dynamic config for remote index cache size [kafka]

2023-10-06 Thread via GitHub


hudeqi commented on code in PR #14381:
URL: https://github.com/apache/kafka/pull/14381#discussion_r1349450982


##
storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java:
##
@@ -197,7 +197,7 @@ public final class RemoteLogManagerConfig {
                   atLeast(0),
                   LOW,
                   REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_DOC)
-          .defineInternal(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,
+          .define(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,

Review Comment:
   Thanks, I have assigned it to myself.




Re: [PR] KAFKA-14912: Add a dynamic config for remote index cache size [kafka]

2023-10-06 Thread via GitHub


hudeqi commented on code in PR #14381:
URL: https://github.com/apache/kafka/pull/14381#discussion_r1349453329


##
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:
##
@@ -1129,3 +1131,50 @@ class DynamicProducerStateManagerConfig(val producerStateManagerConfig: Producer
   override def reconfigurableConfigs: Set[String] = ProducerStateManagerConfig.RECONFIGURABLE_CONFIGS.asScala
 
 }
+
+class DynamicRemoteLogConfig(server: KafkaBroker) extends BrokerReconfigurable with Logging {
+  override def reconfigurableConfigs: Set[String] = {
+    DynamicRemoteLogConfig.ReconfigurableConfigs
+  }
+
+  override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
+    newConfig.values.forEach { (k, v) =>
+      if (reconfigurableConfigs.contains(k)) {
+        val newValue = v.asInstanceOf[Long]

Review Comment:
   Updated.




Re: [PR] KAFKA-14912: Add a dynamic config for remote index cache size [kafka]

2023-10-03 Thread via GitHub


showuon commented on code in PR #14381:
URL: https://github.com/apache/kafka/pull/14381#discussion_r1345081564


##
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -112,27 +115,55 @@ public class RemoteIndexCache implements Closeable {
     *
     * We use {@link Caffeine} cache instead of implementing a thread safe LRU cache on our own.
     */
-    private final Cache internalCache;
-    private final RemoteStorageManager remoteStorageManager;
-    private final ShutdownableThread cleanerThread;
+    private Cache internalCache;
 
+    // Visible for testing
     public RemoteIndexCache(RemoteStorageManager remoteStorageManager, String logDir) throws IOException {
-        this(1024, remoteStorageManager, logDir);
+        this(DEFAULT_REMOTE_INDEX_CACHE_SIZE_BYTES, remoteStorageManager, logDir);
     }
 
     /**
     * Creates RemoteIndexCache with the given configs.
     *
-    * @param maxSize              maximum number of segment index entries to be cached.
+    * @param maxSize              maximum bytes size of segment index entries to be cached.
     * @param remoteStorageManager RemoteStorageManager instance, to be used in fetching indexes.
     * @param logDir               log directory
     */
-    public RemoteIndexCache(int maxSize, RemoteStorageManager remoteStorageManager, String logDir) throws IOException {
+    public RemoteIndexCache(long maxSize, RemoteStorageManager remoteStorageManager, String logDir) throws IOException {
         this.remoteStorageManager = remoteStorageManager;
         cacheDir = new File(logDir, DIR_NAME);
 
-        internalCache = Caffeine.newBuilder()
-                .maximumSize(maxSize)
+        internalCache = initEmptyCache(maxSize);
+        init();
+
+        // Start cleaner thread that will clean the expired entries.
+        cleanerThread = createCleanerThread();
+        cleanerThread.start();
+    }
+
+    public void resizeCacheSize(long remoteLogIndexFileCacheSize) {
+        lock.writeLock().lock();
+        try {
+            // When resizing the cache, we always start with an empty cache. There are two main reasons:
+            // 1. Resizing the cache is not a high-frequency operation, and there is no need to fill the data in the old
+            // cache to the new cache in time when resizing inside.
+            // 2. Since the eviction of the caffeine cache is cleared asynchronously, it is possible that after the entry
+            // in the old cache is filled in the new cache, the old cache will clear the entry, and the data in the two caches
+            // will be inconsistent.
+            internalCache.invalidateAll();
+            log.info("Invalidated all entries in the cache and triggered the cleaning of all index files in the cache dir.");
+            internalCache = initEmptyCache(remoteLogIndexFileCacheSize);

Review Comment:
   Good point!
   Created https://issues.apache.org/jira/browse/KAFKA-15536 for this issue and assigned it to @hudeqi. Thanks.
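
   For context, the change as merged resizes by invalidating every entry and rebuilding an empty cache (see the diff above). One direction a follow-up could explore, though not necessarily what KAFKA-15536 will end up doing, is resizing the existing Caffeine cache in place through its eviction policy, which keeps the entries that still fit under the new bound. A hypothetical sketch, not the PR's code; `IndexEntry` and the byte-based weigher are stand-ins:

   ```java
   import com.github.benmanes.caffeine.cache.Cache;
   import com.github.benmanes.caffeine.cache.Caffeine;

   public class ResizeInPlaceSketch {

       // Stand-in for the cached remote index entry type.
       record IndexEntry(String segment, long sizeBytes) { }

       public static void main(String[] args) {
           Cache<String, IndexEntry> cache = Caffeine.newBuilder()
                   .maximumWeight(1024L * 1024L)   // initial bound: 1 MiB of index data
                   .weigher((String key, IndexEntry e) -> (int) Math.min(e.sizeBytes(), Integer.MAX_VALUE))
                   .build();

           cache.put("segment-1", new IndexEntry("segment-1", 256 * 1024));

           // Raise the bound in place: existing entries are kept, and Caffeine only
           // evicts if the current total weight exceeds the new maximum.
           long newMaxBytes = 2L * 1024L * 1024L;
           cache.policy().eviction().ifPresent(eviction -> eviction.setMaximum(newMaxBytes));
       }
   }
   ```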




Re: [PR] KAFKA-14912: Add a dynamic config for remote index cache size [kafka]

2023-10-03 Thread via GitHub


satishd commented on code in PR #14381:
URL: https://github.com/apache/kafka/pull/14381#discussion_r1344604731


##
storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java:
##
@@ -197,7 +197,7 @@ public final class RemoteLogManagerConfig {
                   atLeast(0),
                   LOW,
                   REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_DOC)
-          .defineInternal(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,
+          .define(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,

Review Comment:
   Filed https://issues.apache.org/jira/browse/KAFKA-15535 to follow up on adding documentation.



##
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:
##
@@ -1129,3 +1131,50 @@ class DynamicProducerStateManagerConfig(val producerStateManagerConfig: Producer
   override def reconfigurableConfigs: Set[String] = ProducerStateManagerConfig.RECONFIGURABLE_CONFIGS.asScala
 
 }
+
+class DynamicRemoteLogConfig(server: KafkaBroker) extends BrokerReconfigurable with Logging {
+  override def reconfigurableConfigs: Set[String] = {
+    DynamicRemoteLogConfig.ReconfigurableConfigs
+  }
+
+  override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
+    newConfig.values.forEach { (k, v) =>
+      if (reconfigurableConfigs.contains(k)) {
+        val newValue = v.asInstanceOf[Long]

Review Comment:
   All the values are assumed to be of type `Long` here. Please add stricter checks based on the respective property names and values.



##
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -112,27 +115,55 @@ public class RemoteIndexCache implements Closeable {
     *
     * We use {@link Caffeine} cache instead of implementing a thread safe LRU cache on our own.
     */
-    private final Cache internalCache;
-    private final RemoteStorageManager remoteStorageManager;
-    private final ShutdownableThread cleanerThread;
+    private Cache internalCache;
 
+    // Visible for testing
     public RemoteIndexCache(RemoteStorageManager remoteStorageManager, String logDir) throws IOException {
-        this(1024, remoteStorageManager, logDir);
+        this(DEFAULT_REMOTE_INDEX_CACHE_SIZE_BYTES, remoteStorageManager, logDir);
     }
 
     /**
     * Creates RemoteIndexCache with the given configs.
     *
-    * @param maxSize              maximum number of segment index entries to be cached.
+    * @param maxSize              maximum bytes size of segment index entries to be cached.
    * @param remoteStorageManager RemoteStorageManager instance, to be used in fetching indexes.
    * @param logDir               log directory
    */
-    public RemoteIndexCache(int maxSize, RemoteStorageManager remoteStorageManager, String logDir) throws IOException {
+    public RemoteIndexCache(long maxSize, RemoteStorageManager remoteStorageManager, String logDir) throws IOException {
         this.remoteStorageManager = remoteStorageManager;
         cacheDir = new File(logDir, DIR_NAME);
 
-        internalCache = Caffeine.newBuilder()
-                .maximumSize(maxSize)
+        internalCache = initEmptyCache(maxSize);
+        init();
+
+        // Start cleaner thread that will clean the expired entries.
+        cleanerThread = createCleanerThread();
+        cleanerThread.start();
+    }
+
+    public void resizeCacheSize(long remoteLogIndexFileCacheSize) {
+        lock.writeLock().lock();
+        try {
+            // When resizing the cache, we always start with an empty cache. There are two main reasons:
+            // 1. Resizing the cache is not a high-frequency operation, and there is no need to fill the data in the old
+            // cache to the new cache in time when resizing inside.
+            // 2. Since the eviction of the caffeine cache is cleared asynchronously, it is possible that after the entry
+            // in the old cache is filled in the new cache, the old cache will clear the entry, and the data in the two caches
+            // will be inconsistent.
+            internalCache.invalidateAll();
+            log.info("Invalidated all entries in the cache and triggered the cleaning of all index files in the cache dir.");
+            internalCache = initEmptyCache(remoteLogIndexFileCacheSize);

Review Comment:
   Do we have a JIRA tracking the improvement suggested in the comment?
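
   For readers following the diff above: the body of `initEmptyCache` is not shown in this hunk. Below is a minimal, self-contained sketch of how a bytes-bounded cache plus the invalidate-and-recreate resize could fit together, assuming the bound is enforced with Caffeine's `maximumWeight` and a per-entry weigher; `IndexEntry` is a hypothetical stand-in and this is not the PR's actual code:

   ```java
   import com.github.benmanes.caffeine.cache.Cache;
   import com.github.benmanes.caffeine.cache.Caffeine;

   import java.util.concurrent.locks.ReentrantReadWriteLock;

   public class ResizableIndexCacheSketch {

       // Stand-in for the real remote index entry type.
       record IndexEntry(long entrySizeBytes) { }

       private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
       private volatile Cache<String, IndexEntry> internalCache;

       public ResizableIndexCacheSketch(long maxSizeBytes) {
           this.internalCache = initEmptyCache(maxSizeBytes);
       }

       // Bound the cache by the total byte size of its entries rather than by entry count.
       private Cache<String, IndexEntry> initEmptyCache(long maxSizeBytes) {
           return Caffeine.newBuilder()
                   .maximumWeight(maxSizeBytes)
                   .weigher((String key, IndexEntry entry) ->
                           (int) Math.min(entry.entrySizeBytes(), Integer.MAX_VALUE))
                   .build();
       }

       // Mirrors the approach in the diff: drop every entry and start over with the new bound.
       public void resizeCacheSize(long newMaxSizeBytes) {
           lock.writeLock().lock();
           try {
               internalCache.invalidateAll();
               internalCache = initEmptyCache(newMaxSizeBytes);
           } finally {
               lock.writeLock().unlock();
           }
       }
   }
   ```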


