This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 02c794dfd3b KAFKA-15776: Introduce remote.fetch.max.timeout.ms to 
configure DelayedRemoteFetch timeout (#14778)
02c794dfd3b is described below

commit 02c794dfd3bda12066031155bd62234ad863860b
Author: Kamal Chandraprakash <kchandraprak...@uber.com>
AuthorDate: Wed Jun 5 12:12:23 2024 +0530

    KAFKA-15776: Introduce remote.fetch.max.timeout.ms to configure 
DelayedRemoteFetch timeout (#14778)
    
    KIP-1018, part1, Introduce remote.fetch.max.timeout.ms to configure 
DelayedRemoteFetch timeout
    
    Reviewers: Luke Chen <show...@gmail.com>
---
 .../kafka/clients/consumer/ConsumerConfig.java     |   5 +-
 .../scala/kafka/server/DelayedRemoteFetch.scala    |   3 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |   3 +-
 .../kafka/server/DelayedRemoteFetchTest.scala      |  21 +-
 .../log/remote/storage/RemoteLogManagerConfig.java | 342 +++++++++++----------
 .../remote/storage/RemoteLogManagerConfigTest.java |   6 +-
 6 files changed, 203 insertions(+), 177 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 7ec147e9e3c..76bfe7e91a1 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -198,7 +198,10 @@ public class ConsumerConfig extends AbstractConfig {
      * <code>fetch.max.wait.ms</code>
      */
     public static final String FETCH_MAX_WAIT_MS_CONFIG = "fetch.max.wait.ms";
-    private static final String FETCH_MAX_WAIT_MS_DOC = "The maximum amount of 
time the server will block before answering the fetch request if there isn't 
sufficient data to immediately satisfy the requirement given by 
fetch.min.bytes.";
+    private static final String FETCH_MAX_WAIT_MS_DOC = "The maximum amount of 
time the server will block before " +
+            "answering the fetch request there isn't sufficient data to 
immediately satisfy the requirement given by " +
+            "fetch.min.bytes. This config is used only for local log fetch. To 
tune the remote fetch maximum wait " +
+            "time, please refer to 'remote.fetch.max.wait.ms' broker config";
     public static final int DEFAULT_FETCH_MAX_WAIT_MS = 500;
 
     /** <code>metadata.max.age.ms</code> */
diff --git a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala 
b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
index 00d6afb89ff..58a866aa4a6 100644
--- a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
@@ -35,12 +35,13 @@ import scala.collection._
 class DelayedRemoteFetch(remoteFetchTask: Future[Void],
                          remoteFetchResult: 
CompletableFuture[RemoteLogReadResult],
                          remoteFetchInfo: RemoteStorageFetchInfo,
+                         remoteFetchMaxWaitMs: Long,
                          fetchPartitionStatus: Seq[(TopicIdPartition, 
FetchPartitionStatus)],
                          fetchParams: FetchParams,
                          localReadResults: Seq[(TopicIdPartition, 
LogReadResult)],
                          replicaManager: ReplicaManager,
                          responseCallback: Seq[(TopicIdPartition, 
FetchPartitionData)] => Unit)
-  extends DelayedOperation(fetchParams.maxWaitMs) {
+  extends DelayedOperation(remoteFetchMaxWaitMs) {
 
   if (fetchParams.isFromFollower) {
     throw new IllegalStateException(s"The follower should not invoke remote 
fetch. Fetch params are: $fetchParams")
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index a2a070bcd03..74023dea1aa 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1479,7 +1479,8 @@ class ReplicaManager(val config: KafkaConfig,
         return Some(createLogReadResult(e))
     }
 
-    val remoteFetch = new DelayedRemoteFetch(remoteFetchTask, 
remoteFetchResult, remoteFetchInfo,
+    val remoteFetchMaxWaitMs = 
config.remoteLogManagerConfig.remoteFetchMaxWaitMs()
+    val remoteFetch = new DelayedRemoteFetch(remoteFetchTask, 
remoteFetchResult, remoteFetchInfo, remoteFetchMaxWaitMs,
       fetchPartitionStatus, params, logReadResults, this, responseCallback)
 
     delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, Seq(key))
diff --git 
a/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala 
b/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
index c35385bcbc8..ea1ffaf0b11 100644
--- a/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
@@ -40,6 +40,7 @@ class DelayedRemoteFetchTest {
   private val fetchOffset = 500L
   private val logStartOffset = 0L
   private val currentLeaderEpoch = Optional.of[Integer](10)
+  private val remoteFetchMaxWaitMs = 500
 
   private val fetchStatus = FetchPartitionStatus(
     startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
@@ -64,8 +65,8 @@ class DelayedRemoteFetchTest {
     val leaderLogStartOffset = 10
     val logReadInfo = buildReadResult(Errors.NONE, highWatermark, 
leaderLogStartOffset)
 
-    val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, 
Seq(topicIdPartition -> fetchStatus), fetchParams,
-      Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
+    val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, 
remoteFetchMaxWaitMs,
+      Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition 
-> logReadInfo), replicaManager, callback)
 
     
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
       .thenReturn(mock(classOf[Partition]))
@@ -100,8 +101,8 @@ class DelayedRemoteFetchTest {
     val leaderLogStartOffset = 10
     val logReadInfo = buildReadResult(Errors.NONE, highWatermark, 
leaderLogStartOffset)
     val fetchParams = buildFetchParams(replicaId = 1, maxWaitMs = 500)
-    assertThrows(classOf[IllegalStateException], () => new 
DelayedRemoteFetch(null, future, fetchInfo, Seq(topicIdPartition -> 
fetchStatus), fetchParams,
-      Seq(topicIdPartition -> logReadInfo), replicaManager, callback))
+    assertThrows(classOf[IllegalStateException], () => new 
DelayedRemoteFetch(null, future, fetchInfo, remoteFetchMaxWaitMs,
+      Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition 
-> logReadInfo), replicaManager, callback))
   }
 
   @Test
@@ -124,8 +125,8 @@ class DelayedRemoteFetchTest {
 
     val logReadInfo = buildReadResult(Errors.NONE)
 
-    val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, 
Seq(topicIdPartition -> fetchStatus), fetchParams,
-      Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
+    val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, 
remoteFetchMaxWaitMs,
+      Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition 
-> logReadInfo), replicaManager, callback)
 
     // delayed remote fetch should still be able to complete
     assertTrue(delayedRemoteFetch.tryComplete())
@@ -155,8 +156,8 @@ class DelayedRemoteFetchTest {
     // build a read result with error
     val logReadInfo = buildReadResult(Errors.FENCED_LEADER_EPOCH)
 
-    val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, 
Seq(topicIdPartition -> fetchStatus), fetchParams,
-      Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
+    val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, 
remoteFetchMaxWaitMs,
+      Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition 
-> logReadInfo), replicaManager, callback)
 
     assertTrue(delayedRemoteFetch.tryComplete())
     assertTrue(delayedRemoteFetch.isCompleted)
@@ -184,8 +185,8 @@ class DelayedRemoteFetchTest {
     val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, 
false, topicIdPartition.topicPartition(), null, null, false)
     val logReadInfo = buildReadResult(Errors.NONE, highWatermark, 
leaderLogStartOffset)
 
-    val delayedRemoteFetch = new DelayedRemoteFetch(remoteFetchTask, future, 
fetchInfo, Seq(topicIdPartition -> fetchStatus), fetchParams,
-      Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
+    val delayedRemoteFetch = new DelayedRemoteFetch(remoteFetchTask, future, 
fetchInfo, remoteFetchMaxWaitMs,
+      Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition 
-> logReadInfo), replicaManager, callback)
 
     
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
       .thenReturn(mock(classOf[Partition]))
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
index 1add933d788..e41cc011c31 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
@@ -54,7 +54,6 @@ public final class RemoteLogManagerConfig {
             "implementation. For example this value can be `rlmm.config.`.";
     public static final String 
DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX = "rlmm.config.";
 
-
     public static final String REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP = 
"remote.log.storage.system.enable";
     public static final String REMOTE_LOG_STORAGE_SYSTEM_ENABLE_DOC = "Whether 
to enable tiered storage functionality in a broker or not. Valid values " +
             "are `true` or `false` and the default value is false. When it is 
true broker starts all the services required for the tiered storage 
functionality.";
@@ -185,167 +184,178 @@ public final class RemoteLogManagerConfig {
             "The default value is 1 second.";
     public static final int 
DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS = 1;
 
+    public static final String REMOTE_FETCH_MAX_WAIT_MS_PROP = 
"remote.fetch.max.wait.ms";
+    public static final String REMOTE_FETCH_MAX_WAIT_MS_DOC = "The maximum 
amount of time the server will wait before answering the remote fetch request";
+    public static final int DEFAULT_REMOTE_FETCH_MAX_WAIT_MS = 500;
+
     public static final ConfigDef CONFIG_DEF = new ConfigDef();
 
     static {
-        CONFIG_DEF.define(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
-                                  BOOLEAN,
-                                  DEFAULT_REMOTE_LOG_STORAGE_SYSTEM_ENABLE,
-                                  null,
-                                  MEDIUM,
-                                  REMOTE_LOG_STORAGE_SYSTEM_ENABLE_DOC)
-                  .define(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP,
-                                  STRING,
-                                  DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX,
-                                  new ConfigDef.NonEmptyString(),
-                                  MEDIUM,
-                                  REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_DOC)
-                  .define(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP,
-                                  STRING,
-                                  
DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX,
-                                  new ConfigDef.NonEmptyString(),
-                                  MEDIUM,
-                                  
REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_DOC)
-                  .define(REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, STRING,
-                                  null,
-                                  new ConfigDef.NonEmptyString(),
-                                  MEDIUM,
-                                  REMOTE_STORAGE_MANAGER_CLASS_NAME_DOC)
-                  .define(REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP, STRING,
-                                  null,
-                                  null,
-                                  MEDIUM,
-                                  REMOTE_STORAGE_MANAGER_CLASS_PATH_DOC)
-                  .define(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
-                                  STRING,
-                                  
DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME,
-                                  new ConfigDef.NonEmptyString(),
-                                  MEDIUM,
-                                  REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_DOC)
-                  .define(REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP,
-                                  STRING,
-                                  null,
-                                  null,
-                                  MEDIUM,
-                                  REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_DOC)
-                  .define(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, 
STRING,
-                                  null,
-                                  new ConfigDef.NonEmptyString(),
-                                  MEDIUM,
-                                  
REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_DOC)
-                  .define(REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP,
-                                  INT,
-                                  
DEFAULT_REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES,
-                                  atLeast(0),
-                                  LOW,
-                                  
REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_DOC)
-                  .define(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,
-                                  LONG,
-                                  
DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES,
-                                  atLeast(1),
-                                  LOW,
-                                  
REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_DOC)
-                  .define(REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP,
-                                  INT,
-                                  DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE,
-                                  atLeast(1),
-                                  MEDIUM,
-                                  REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_DOC)
-                  
.defineInternal(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP,
+        CONFIG_DEF
+                .define(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
+                        BOOLEAN,
+                        DEFAULT_REMOTE_LOG_STORAGE_SYSTEM_ENABLE,
+                        null,
+                        MEDIUM,
+                        REMOTE_LOG_STORAGE_SYSTEM_ENABLE_DOC)
+                .define(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP,
+                        STRING,
+                        DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX,
+                        new ConfigDef.NonEmptyString(),
+                        MEDIUM,
+                        REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_DOC)
+                .define(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP,
+                        STRING,
+                        DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX,
+                        new ConfigDef.NonEmptyString(),
+                        MEDIUM,
+                        REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_DOC)
+                .define(REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, STRING,
+                        null,
+                        new ConfigDef.NonEmptyString(),
+                        MEDIUM,
+                        REMOTE_STORAGE_MANAGER_CLASS_NAME_DOC)
+                .define(REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP, STRING,
+                        null,
+                        null,
+                        MEDIUM,
+                        REMOTE_STORAGE_MANAGER_CLASS_PATH_DOC)
+                .define(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
+                        STRING,
+                        DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME,
+                        new ConfigDef.NonEmptyString(),
+                        MEDIUM,
+                        REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_DOC)
+                .define(REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP,
+                        STRING,
+                        null,
+                        null,
+                        MEDIUM,
+                        REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_DOC)
+                .define(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, STRING,
+                        null,
+                        new ConfigDef.NonEmptyString(),
+                        MEDIUM,
+                        REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_DOC)
+                .define(REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP,
+                        INT,
+                        DEFAULT_REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES,
+                        atLeast(0),
+                        LOW,
+                        REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_DOC)
+                .define(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,
+                        LONG,
+                        DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES,
+                        atLeast(1),
+                        LOW,
+                        REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_DOC)
+                .define(REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP,
+                        INT,
+                        DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE,
+                        atLeast(1),
+                        MEDIUM,
+                        REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_DOC)
+                
.defineInternal(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP,
                         INT,
                         DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE,
                         atLeast(1),
                         MEDIUM,
                         REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_DOC)
-                  
.defineInternal(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP,
+                
.defineInternal(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP,
                         INT,
                         DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE,
                         atLeast(1),
                         MEDIUM,
                         REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_DOC)
-                  .define(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP,
-                                  LONG,
-                                  DEFAULT_REMOTE_LOG_MANAGER_TASK_INTERVAL_MS,
-                                  atLeast(1),
-                                  LOW,
-                                  REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_DOC)
-                  
.defineInternal(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_PROP,
-                                  LONG,
-                                  
DEFAULT_REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS,
-                                  atLeast(1),
-                                  LOW,
-                                  
REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_DOC)
-                  
.defineInternal(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_PROP,
-                                  LONG,
-                                  
DEFAULT_REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS,
-                                  atLeast(1), LOW,
-                                  
REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_DOC)
-                  .defineInternal(REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_PROP,
-                                  DOUBLE,
-                                  DEFAULT_REMOTE_LOG_MANAGER_TASK_RETRY_JITTER,
-                                  between(0, 0.5),
-                                  LOW,
-                                  REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_DOC)
-                  .define(REMOTE_LOG_READER_THREADS_PROP,
-                                  INT,
-                                  DEFAULT_REMOTE_LOG_READER_THREADS,
-                                  atLeast(1),
-                                  MEDIUM,
-                                  REMOTE_LOG_READER_THREADS_DOC)
-                  .define(REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP,
-                                  INT,
-                                  DEFAULT_REMOTE_LOG_READER_MAX_PENDING_TASKS,
-                                  atLeast(1),
-                                  MEDIUM,
-                                  REMOTE_LOG_READER_MAX_PENDING_TASKS_DOC)
-                  .define(LOG_LOCAL_RETENTION_MS_PROP,
-                                  LONG,
-                                  DEFAULT_LOG_LOCAL_RETENTION_MS,
-                                  atLeast(DEFAULT_LOG_LOCAL_RETENTION_MS),
-                                  MEDIUM,
-                                  LOG_LOCAL_RETENTION_MS_DOC)
-                  .define(LOG_LOCAL_RETENTION_BYTES_PROP,
-                                  LONG,
-                                  DEFAULT_LOG_LOCAL_RETENTION_BYTES,
-                                  atLeast(DEFAULT_LOG_LOCAL_RETENTION_BYTES),
-                                  MEDIUM,
-                                  LOG_LOCAL_RETENTION_BYTES_DOC)
-                  .define(REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP,
-                                  LONG,
-                                  
DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND,
-                                  atLeast(1),
-                                  MEDIUM,
-                                  
REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_DOC)
-                  .define(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP,
-                                  INT,
-                                  
DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM,
-                                  atLeast(1),
-                                  MEDIUM,
-                                  REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_DOC)
-                  
.define(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP,
-                                 INT,
-                                 
DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS,
-                                 atLeast(1),
-                                 MEDIUM,
-                                 
REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_DOC)
-                  .define(REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP,
-                                 LONG,
-                                 
DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND,
-                                 atLeast(1),
-                                 MEDIUM,
-                                 
REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_DOC)
-                  .define(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP,
-                                 INT,
-                                 
DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM,
-                                 atLeast(1),
-                                 MEDIUM,
-                                 REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_DOC)
-                  
.define(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP,
-                                 INT,
-                                 
DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS,
-                                 atLeast(1),
-                                 MEDIUM,
-                                 
REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_DOC);
+                .define(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP,
+                        LONG,
+                        DEFAULT_REMOTE_LOG_MANAGER_TASK_INTERVAL_MS,
+                        atLeast(1),
+                        LOW,
+                        REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_DOC)
+                .defineInternal(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_PROP,
+                        LONG,
+                        DEFAULT_REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS,
+                        atLeast(1),
+                        LOW,
+                        REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_DOC)
+                
.defineInternal(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_PROP,
+                        LONG,
+                        DEFAULT_REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS,
+                        atLeast(1), LOW,
+                        REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_DOC)
+                .defineInternal(REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_PROP,
+                        DOUBLE,
+                        DEFAULT_REMOTE_LOG_MANAGER_TASK_RETRY_JITTER,
+                        between(0, 0.5),
+                        LOW,
+                        REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_DOC)
+                .define(REMOTE_LOG_READER_THREADS_PROP,
+                        INT,
+                        DEFAULT_REMOTE_LOG_READER_THREADS,
+                        atLeast(1),
+                        MEDIUM,
+                        REMOTE_LOG_READER_THREADS_DOC)
+                .define(REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP,
+                        INT,
+                        DEFAULT_REMOTE_LOG_READER_MAX_PENDING_TASKS,
+                        atLeast(1),
+                        MEDIUM,
+                        REMOTE_LOG_READER_MAX_PENDING_TASKS_DOC)
+                .define(LOG_LOCAL_RETENTION_MS_PROP,
+                        LONG,
+                        DEFAULT_LOG_LOCAL_RETENTION_MS,
+                        atLeast(DEFAULT_LOG_LOCAL_RETENTION_MS),
+                        MEDIUM,
+                        LOG_LOCAL_RETENTION_MS_DOC)
+                .define(LOG_LOCAL_RETENTION_BYTES_PROP,
+                        LONG,
+                        DEFAULT_LOG_LOCAL_RETENTION_BYTES,
+                        atLeast(DEFAULT_LOG_LOCAL_RETENTION_BYTES),
+                        MEDIUM,
+                        LOG_LOCAL_RETENTION_BYTES_DOC)
+                .define(REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP,
+                        LONG,
+                        DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND,
+                        atLeast(1),
+                        MEDIUM,
+                        REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_DOC)
+                .define(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP,
+                        INT,
+                        DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM,
+                        atLeast(1),
+                        MEDIUM,
+                        REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_DOC)
+                .define(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP,
+                        INT,
+                        
DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS,
+                        atLeast(1),
+                        MEDIUM,
+                        REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_DOC)
+                .define(REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP,
+                        LONG,
+                        DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND,
+                        atLeast(1),
+                        MEDIUM,
+                        REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_DOC)
+                .define(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP,
+                        INT,
+                        DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM,
+                        atLeast(1),
+                        MEDIUM,
+                        REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_DOC)
+                
.define(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP,
+                        INT,
+                        
DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS,
+                        atLeast(1),
+                        MEDIUM,
+                        REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_DOC)
+                .define(REMOTE_FETCH_MAX_WAIT_MS_PROP,
+                        INT,
+                        DEFAULT_REMOTE_FETCH_MAX_WAIT_MS,
+                        atLeast(1),
+                        MEDIUM,
+                        REMOTE_FETCH_MAX_WAIT_MS_DOC);
     }
 
     private final boolean enableRemoteStorageSystem;
@@ -375,6 +385,7 @@ public final class RemoteLogManagerConfig {
     private final long remoteLogManagerFetchMaxBytesPerSecond;
     private final int remoteLogManagerFetchNumQuotaSamples;
     private final int remoteLogManagerFetchQuotaWindowSizeSeconds;
+    private final int remoteFetchMaxWaitMs;
 
     public RemoteLogManagerConfig(AbstractConfig config) {
         this(config.getBoolean(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP),
@@ -407,7 +418,8 @@ public final class RemoteLogManagerConfig {
             
config.getInt(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP),
             config.getLong(REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP),
             config.getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP),
-            
config.getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP));
+            
config.getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP),
+            config.getInt(REMOTE_FETCH_MAX_WAIT_MS_PROP));
     }
 
     // Visible for testing
@@ -437,8 +449,8 @@ public final class RemoteLogManagerConfig {
                                   int 
remoteLogManagerCopyQuotaWindowSizeSeconds,
                                   long remoteLogManagerFetchMaxBytesPerSecond,
                                   int remoteLogManagerFetchNumQuotaSamples,
-                                  int 
remoteLogManagerFetchQuotaWindowSizeSeconds
-    ) {
+                                  int 
remoteLogManagerFetchQuotaWindowSizeSeconds,
+                                  int remoteFetchMaxWaitMs) {
         this.enableRemoteStorageSystem = enableRemoteStorageSystem;
         this.remoteStorageManagerClassName = remoteStorageManagerClassName;
         this.remoteStorageManagerClassPath = remoteStorageManagerClassPath;
@@ -466,6 +478,7 @@ public final class RemoteLogManagerConfig {
         this.remoteLogManagerFetchMaxBytesPerSecond = 
remoteLogManagerFetchMaxBytesPerSecond;
         this.remoteLogManagerFetchNumQuotaSamples = 
remoteLogManagerFetchNumQuotaSamples;
         this.remoteLogManagerFetchQuotaWindowSizeSeconds = 
remoteLogManagerFetchQuotaWindowSizeSeconds;
+        this.remoteFetchMaxWaitMs = remoteFetchMaxWaitMs;
     }
 
     public boolean enableRemoteStorageSystem() {
@@ -576,6 +589,9 @@ public final class RemoteLogManagerConfig {
         return remoteLogManagerFetchQuotaWindowSizeSeconds;
     }
 
+    public int remoteFetchMaxWaitMs() {
+        return remoteFetchMaxWaitMs;
+    }
 
     @Override
     public boolean equals(Object o) {
@@ -608,20 +624,22 @@ public final class RemoteLogManagerConfig {
                 && remoteLogManagerCopyQuotaWindowSizeSeconds == 
that.remoteLogManagerCopyQuotaWindowSizeSeconds
                 && remoteLogManagerFetchMaxBytesPerSecond == 
that.remoteLogManagerFetchMaxBytesPerSecond
                 && remoteLogManagerFetchNumQuotaSamples == 
that.remoteLogManagerFetchNumQuotaSamples
-                && remoteLogManagerFetchQuotaWindowSizeSeconds == 
that.remoteLogManagerFetchQuotaWindowSizeSeconds;
+                && remoteLogManagerFetchQuotaWindowSizeSeconds == 
that.remoteLogManagerFetchQuotaWindowSizeSeconds
+                && remoteFetchMaxWaitMs == that.remoteFetchMaxWaitMs;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(enableRemoteStorageSystem, 
remoteStorageManagerClassName, remoteStorageManagerClassPath,
-                            remoteLogMetadataManagerClassName, 
remoteLogMetadataManagerClassPath, remoteLogMetadataManagerListenerName,
-                            remoteLogMetadataCustomMetadataMaxBytes, 
remoteLogIndexFileCacheTotalSizeBytes, remoteLogManagerThreadPoolSize,
-                            remoteLogManagerCopierThreadPoolSize, 
remoteLogManagerExpirationThreadPoolSize, remoteLogManagerTaskIntervalMs,
-                            remoteLogManagerTaskRetryBackoffMs, 
remoteLogManagerTaskRetryBackoffMaxMs, remoteLogManagerTaskRetryJitter,
-                            remoteLogReaderThreads, 
remoteLogReaderMaxPendingTasks, remoteStorageManagerProps, 
remoteLogMetadataManagerProps,
-                            remoteStorageManagerPrefix, 
remoteLogMetadataManagerPrefix, remoteLogManagerCopyMaxBytesPerSecond,
-                            remoteLogManagerCopyNumQuotaSamples, 
remoteLogManagerCopyQuotaWindowSizeSeconds, 
remoteLogManagerFetchMaxBytesPerSecond,
-                            remoteLogManagerFetchNumQuotaSamples, 
remoteLogManagerFetchQuotaWindowSizeSeconds);
+        return Objects.hash(
+                enableRemoteStorageSystem, remoteStorageManagerClassName, 
remoteStorageManagerClassPath,
+                remoteLogMetadataManagerClassName, 
remoteLogMetadataManagerClassPath, remoteLogMetadataManagerListenerName,
+                remoteLogMetadataCustomMetadataMaxBytes, 
remoteLogIndexFileCacheTotalSizeBytes, remoteLogManagerThreadPoolSize,
+                remoteLogManagerCopierThreadPoolSize, 
remoteLogManagerExpirationThreadPoolSize, remoteLogManagerTaskIntervalMs,
+                remoteLogManagerTaskRetryBackoffMs, 
remoteLogManagerTaskRetryBackoffMaxMs, remoteLogManagerTaskRetryJitter,
+                remoteLogReaderThreads, remoteLogReaderMaxPendingTasks, 
remoteStorageManagerProps, remoteLogMetadataManagerProps,
+                remoteStorageManagerPrefix, remoteLogMetadataManagerPrefix, 
remoteLogManagerCopyMaxBytesPerSecond,
+                remoteLogManagerCopyNumQuotaSamples, 
remoteLogManagerCopyQuotaWindowSizeSeconds, 
remoteLogManagerFetchMaxBytesPerSecond,
+                remoteLogManagerFetchNumQuotaSamples, 
remoteLogManagerFetchQuotaWindowSizeSeconds, remoteFetchMaxWaitMs);
     }
 
     public static void main(String[] args) {
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java
index d04c42409b2..058f81626d1 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java
@@ -95,7 +95,8 @@ public class RemoteLogManagerConfigTest {
                 1,
                 Long.MAX_VALUE,
                 11,
-                1);
+                1,
+                500);
     }
 
     private Map<String, Object> extractProps(RemoteLogManagerConfig 
remoteLogManagerConfig) {
@@ -189,7 +190,8 @@ public class RemoteLogManagerConfigTest {
                 1,
                 Long.MAX_VALUE,
                 11,
-                1);
+                1,
+                500);
         
         assertNotEquals(config1.hashCode(), config3.hashCode());
         assertNotEquals(config1, config3);


Reply via email to