This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1059af4eac0 MINOR: Improve docs for client group configs (#19605)
1059af4eac0 is described below
commit 1059af4eac0d9b5d0f439ddca18c73fcceb8b390
Author: Lianet Magrans <[email protected]>
AuthorDate: Wed Apr 30 14:04:16 2025 -0400
MINOR: Improve docs for client group configs (#19605)
Improve java docs for session and HB interval client configs & fix
max.poll.interval description
Reviewers: David Jacot <[email protected]>
---
.../java/org/apache/kafka/clients/CommonClientConfigs.java | 14 +++++++++-----
.../internals/AbstractHeartbeatRequestManager.java | 5 +++--
2 files changed, 12 insertions(+), 7 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index aa3b5c9d628..e2022e0f4d0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -192,7 +192,8 @@ public class CommonClientConfigs {
+ "is considered
failed and the group will rebalance in order to reassign the partitions to
another member. "
+ "For consumers
using a non-null <code>group.instance.id</code> which reach this timeout,
partitions will not be immediately reassigned. "
+ "Instead, the
consumer will stop sending heartbeats and partitions will be reassigned "
- + "after expiration
of <code>session.timeout.ms</code>. This mirrors the behavior of a static
consumer which has shutdown.";
+ + "after expiration
of the session timeout (defined by the client config
<code>session.timeout.ms</code> if using the Classic rebalance protocol, or by
the broker config <code>group.consumer.session.timeout.ms</code> if using the
Consumer protocol). "
+ + "This mirrors the
behavior of a static consumer which has shutdown.";
public static final String REBALANCE_TIMEOUT_MS_CONFIG =
"rebalance.timeout.ms";
public static final String REBALANCE_TIMEOUT_MS_DOC = "The maximum allowed
time for each worker to join the group "
@@ -206,15 +207,18 @@ public class CommonClientConfigs {
+ "to the broker. If
no heartbeats are received by the broker before the expiration of this session
timeout, "
+ "then the broker
will remove this client from the group and initiate a rebalance. Note that the
value "
+ "must be in the
allowable range as configured in the broker configuration by
<code>group.min.session.timeout.ms</code> "
- + "and
<code>group.max.session.timeout.ms</code>. Note that this configuration is not
supported when <code>group.protocol</code> "
- + "is set to
\"consumer\".";
+ + "and
<code>group.max.session.timeout.ms</code>. Note that this client configuration
is not supported when <code>group.protocol</code> "
+ + "is set to
\"consumer\". In that case, session timeout is controlled by the broker config
<code>group.consumer.session.timeout.ms<code>.";
public static final String HEARTBEAT_INTERVAL_MS_CONFIG =
"heartbeat.interval.ms";
public static final String HEARTBEAT_INTERVAL_MS_DOC = "The expected time
between heartbeats to the consumer "
+ "coordinator when
using Kafka's group management facilities. Heartbeats are used to ensure that
the "
+ "consumer's
session stays active and to facilitate rebalancing when new consumers join or
leave the group. "
- + "The value must
be set lower than <code>session.timeout.ms</code>, but typically should be set
no higher "
- + "than 1/3 of that
value. It can be adjusted even lower to control the expected time for normal
rebalances.";
+ + "This config is
only supported if <code>group.protocol</code> is set to \"classic\". In that
case, "
+ + "the value must
be set lower than <code>session.timeout.ms</code>, but typically should be set
no higher "
+ + "than 1/3 of that
value. It can be adjusted even lower to control the expected time for normal
rebalances."
+ + "If
<code>group.protocol</code> is set to \"consumer\", this config is not
supported, as "
+ + "the heartbeat
interval is controlled by the broker with
<code>group.consumer.heartbeat.interval.ms<code>.";
public static final String DEFAULT_API_TIMEOUT_MS_CONFIG =
"default.api.timeout.ms";
public static final String DEFAULT_API_TIMEOUT_MS_DOC = "Specifies the
timeout (in milliseconds) for client APIs. " +
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
index 9d219907926..3998d672006 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
@@ -63,8 +63,9 @@ public abstract class AbstractHeartbeatRequestManager<R
extends AbstractResponse
protected final Logger logger;
/**
- * Time that the group coordinator will wait on member to revoke its
partitions. This is provided by the group
- * coordinator in the heartbeat
+ * Max time allowed between invocations of poll, defined in the {@link
ConsumerConfig#MAX_POLL_INTERVAL_MS_CONFIG} config.
+ * This is sent to the coordinator in the first heartbeat to join a group,
to be used as rebalance timeout.
+ * Also, the consumer will proactively rejoin the group on a call to poll
if this time has expired.
*/
protected final int maxPollIntervalMs;