This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 05c9322ba9c KAFKA-19814: Protocol and API changes for KIP-1222 (#20732)
05c9322ba9c is described below
commit 05c9322ba9cec74c42a4141bcccc89557db8e105
Author: Andrew Schofield <[email protected]>
AuthorDate: Wed Oct 29 19:35:23 2025 -0500
KAFKA-19814: Protocol and API changes for KIP-1222 (#20732)
Defines the updates protocol RPC schemas for KIPs 1206 and 1222 ready
for Apache Kafka 4.2.
Defines `AcknowledgeType.RENEW` and the new
`ShareConsumer.acquisitionLockTimeoutMs()` method.
Reviewers: Apoorv Mittal <[email protected]>, Sushant Mahajan
<[email protected]>
---
.../apache/kafka/clients/consumer/AcknowledgeType.java | 7 ++++++-
.../kafka/clients/consumer/KafkaShareConsumer.java | 10 ++++++++++
.../kafka/clients/consumer/MockShareConsumer.java | 5 +++++
.../apache/kafka/clients/consumer/ShareConsumer.java | 5 +++++
.../clients/consumer/internals/ShareConsumerImpl.java | 9 +++++++++
.../kafka/common/requests/ShareAcknowledgeRequest.java | 7 +++++++
.../kafka/common/requests/ShareFetchRequest.java | 7 +++++++
.../common/message/ShareAcknowledgeRequest.json | 8 ++++++--
.../common/message/ShareAcknowledgeResponse.json | 4 +++-
.../resources/common/message/ShareFetchRequest.json | 18 +++++++++++-------
.../resources/common/message/ShareFetchResponse.json | 8 +++++---
11 files changed, 74 insertions(+), 14 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/AcknowledgeType.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/AcknowledgeType.java
index b42bc135363..9812503e2b6 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/AcknowledgeType.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/AcknowledgeType.java
@@ -33,7 +33,10 @@ public enum AcknowledgeType {
RELEASE((byte) 2),
/** The record was not consumed successfully. Reject it and do not release
it for another delivery attempt. */
- REJECT((byte) 3);
+ REJECT((byte) 3),
+
+ /** The record is still being processed. Renew the acquisition lock so
processing can continue. */
+ RENEW((byte) 4);
public final byte id;
@@ -55,6 +58,8 @@ public enum AcknowledgeType {
return RELEASE;
case 3:
return REJECT;
+ case 4:
+ return RENEW;
default:
throw new IllegalArgumentException("Unknown acknowledge type
id: " + id);
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
index b18ec93ec7b..8bd44eee031 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
@@ -632,6 +632,16 @@ public class KafkaShareConsumer<K, V> implements
ShareConsumer<K, V> {
return delegate.clientInstanceId(timeout);
}
+ /**
+ * Returns the acquisition lock timeout for the last set of records
fetched from the cluster.
+ *
+ * @return The acquisition lock timeout in milliseconds, or {@code
Optional.empty()} if the timeout is not known.
+ */
+ @Override
+ public Optional<Integer> acquisitionLockTimeoutMs() {
+ return delegate.acquisitionLockTimeoutMs();
+ }
+
/**
* Get the metrics kept by the consumer
*/
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/MockShareConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/MockShareConsumer.java
index f1dad522d5a..8aeab59a2b4 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/MockShareConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/MockShareConsumer.java
@@ -145,6 +145,11 @@ public class MockShareConsumer<K, V> implements
ShareConsumer<K, V> {
return Collections.emptyMap();
}
+ @Override
+ public Optional<Integer> acquisitionLockTimeoutMs() {
+ return Optional.empty();
+ }
+
@Override
public void registerMetricForSubscription(KafkaMetric metric) {
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/ShareConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/ShareConsumer.java
index 58f5fc4d38e..b61ac72516c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ShareConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ShareConsumer.java
@@ -100,6 +100,11 @@ public interface ShareConsumer<K, V> extends Closeable {
*/
Uuid clientInstanceId(Duration timeout);
+ /**
+ * @see KafkaShareConsumer#acquisitionLockTimeoutMs()
+ */
+ Optional<Integer> acquisitionLockTimeoutMs();
+
/**
* @see KafkaShareConsumer#metrics()
*/
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index 4a7e19a6e56..0530155e538 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -837,6 +837,15 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
return
ClientTelemetryUtils.fetchClientInstanceId(clientTelemetryReporter.get(),
timeout);
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Optional<Integer> acquisitionLockTimeoutMs() {
+ // To be implemented
+ return Optional.empty();
+ }
+
/**
* {@inheritDoc}
*/
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/ShareAcknowledgeRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/ShareAcknowledgeRequest.java
index c73df0b1d56..43bec9c5fda 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/ShareAcknowledgeRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/ShareAcknowledgeRequest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ShareAcknowledgeRequestData;
import org.apache.kafka.common.message.ShareAcknowledgeResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
@@ -75,6 +76,12 @@ public class ShareAcknowledgeRequest extends AbstractRequest
{
@Override
public ShareAcknowledgeRequest build(short version) {
+ if (version < 2) {
+ // The v1 does not support AcknowledgeType RENEW.
+ if (data.isRenewAck()) {
+ throw new UnsupportedVersionException("The v1
ShareAcknowledge does not support AcknowledgeType.RENEW");
+ }
+ }
return new ShareAcknowledgeRequest(data, version);
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
index 5ede165c2ef..122afa9f4c0 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ShareFetchRequestData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
@@ -137,6 +138,12 @@ public class ShareFetchRequest extends AbstractRequest {
@Override
public ShareFetchRequest build(short version) {
+ if (version < 2) {
+ // The v1 does not support AcknowledgeType RENEW.
+ if (data.isRenewAck()) {
+ throw new UnsupportedVersionException("The v1 ShareFetch
does not support AcknowledgeType.RENEW");
+ }
+ }
return new ShareFetchRequest(data, version);
}
diff --git
a/clients/src/main/resources/common/message/ShareAcknowledgeRequest.json
b/clients/src/main/resources/common/message/ShareAcknowledgeRequest.json
index 561f4a84d2f..78b8c140bcd 100644
--- a/clients/src/main/resources/common/message/ShareAcknowledgeRequest.json
+++ b/clients/src/main/resources/common/message/ShareAcknowledgeRequest.json
@@ -21,7 +21,9 @@
// Version 0 was used for early access of KIP-932 in Apache Kafka 4.0 but
removed in Apacke Kafka 4.1.
//
// Version 1 is the initial stable version (KIP-932).
- "validVersions": "1",
+ //
+ // Version 2 introduces Renew acknowledgements (KIP-1222).
+ "validVersions": "1-2",
"flexibleVersions": "0+",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+",
"nullableVersions": "0+", "default": "null", "entityType": "groupId",
@@ -30,6 +32,8 @@
"about": "The member ID." },
{ "name": "ShareSessionEpoch", "type": "int32", "versions": "0+",
"about": "The current share session epoch: 0 to open a share session; -1
to close it; otherwise increments for consecutive requests." },
+ { "name": "IsRenewAck", "type": "bool", "versions": "2+", "default":
"false",
+ "about": "Whether Renew type acknowledgements present in
AcknowledgementBatches." },
{ "name": "Topics", "type": "[]AcknowledgeTopic", "versions": "0+",
"about": "The topics containing records to acknowledge.", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The
unique topic ID.", "mapKey": true },
@@ -44,7 +48,7 @@
{ "name": "LastOffset", "type": "int64", "versions": "0+",
"about": "Last offset (inclusive) of batch of records to
acknowledge." },
{ "name": "AcknowledgeTypes", "type": "[]int8", "versions": "0+",
- "about": "Array of acknowledge types -
0:Gap,1:Accept,2:Release,3:Reject." }
+ "about": "Array of acknowledge types -
0:Gap,1:Accept,2:Release,3:Reject,4:Renew." }
]}
]}
]}
diff --git
a/clients/src/main/resources/common/message/ShareAcknowledgeResponse.json
b/clients/src/main/resources/common/message/ShareAcknowledgeResponse.json
index 65d08756983..3ad91f9339b 100644
--- a/clients/src/main/resources/common/message/ShareAcknowledgeResponse.json
+++ b/clients/src/main/resources/common/message/ShareAcknowledgeResponse.json
@@ -20,7 +20,9 @@
// Version 0 was used for early access of KIP-932 in Apache Kafka 4.0 but
removed in Apacke Kafka 4.1.
//
// Version 1 is the initial stable version (KIP-932).
- "validVersions": "1",
+ //
+ // Version 2 introduces Renew acknowledgements (KIP-1222).
+ "validVersions": "1-2",
"flexibleVersions": "0+",
// Supported errors:
// - GROUP_AUTHORIZATION_FAILED (version 0+)
diff --git a/clients/src/main/resources/common/message/ShareFetchRequest.json
b/clients/src/main/resources/common/message/ShareFetchRequest.json
index d7a4abf1fbb..092c2c6c259 100644
--- a/clients/src/main/resources/common/message/ShareFetchRequest.json
+++ b/clients/src/main/resources/common/message/ShareFetchRequest.json
@@ -21,7 +21,9 @@
// Version 0 was used for early access of KIP-932 in Apache Kafka 4.0 but
removed in Apacke Kafka 4.1.
//
// Version 1 is the initial stable version (KIP-932).
- "validVersions": "1",
+ //
+ // Version 2 introduces ShareAcquireMode and Renew acknowledgements
(KIP-1206 and KIP-1222).
+ "validVersions": "1-2",
"flexibleVersions": "0+",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+",
"nullableVersions": "0+", "default": "null", "entityType": "groupId",
@@ -40,29 +42,31 @@
"about": "The maximum number of records to fetch. This limit can be
exceeded for alignment of batch boundaries." },
{ "name": "BatchSize", "type": "int32", "versions": "1+",
"about": "The optimal number of records for batches of acquired records
and acknowledgements." },
+ { "name": "IsRenewAck", "type": "bool", "versions": "2+", "default":
"false",
+ "about": "Whether Renew type acknowledgements present in
AcknowledgementBatches." },
{ "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
"about": "The topics to fetch.", "fields": [
- { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The
unique topic ID.", "mapKey": true },
+ { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The
unique topic ID.", "mapKey": true },
{ "name": "Partitions", "type": "[]FetchPartition", "versions": "0+",
"about": "The partitions to fetch.", "fields": [
- { "name": "PartitionIndex", "type": "int32", "versions": "0+",
"mapKey": true,
+ { "name": "PartitionIndex", "type": "int32", "versions": "0+",
"mapKey": true,
"about": "The partition index." },
{ "name": "PartitionMaxBytes", "type": "int32", "versions": "0",
"about": "The maximum bytes to fetch from this partition. 0 when
only acknowledgement with no fetching is required. See KIP-74 for cases where
this limit may not be honored." },
{ "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch",
"versions": "0+",
"about": "Record batches to acknowledge.", "fields": [
{ "name": "FirstOffset", "type": "int64", "versions": "0+",
- "about": "First offset of batch of records to acknowledge."},
+ "about": "First offset of batch of records to acknowledge." },
{ "name": "LastOffset", "type": "int64", "versions": "0+",
- "about": "Last offset (inclusive) of batch of records to
acknowledge."},
+ "about": "Last offset (inclusive) of batch of records to
acknowledge." },
{ "name": "AcknowledgeTypes", "type": "[]int8", "versions": "0+",
- "about": "Array of acknowledge types -
0:Gap,1:Accept,2:Release,3:Reject."}
+ "about": "Array of acknowledge types -
0:Gap,1:Accept,2:Release,3:Reject,4:Renew." }
]}
]}
]},
{ "name": "ForgottenTopicsData", "type": "[]ForgottenTopic", "versions":
"0+",
"about": "The partitions to remove from this share session.", "fields": [
- { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The
unique topic ID."},
+ { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The
unique topic ID." },
{ "name": "Partitions", "type": "[]int32", "versions": "0+",
"about": "The partitions indexes to forget." }
]}
diff --git a/clients/src/main/resources/common/message/ShareFetchResponse.json
b/clients/src/main/resources/common/message/ShareFetchResponse.json
index 5d4ede78da3..cb80d470b1e 100644
--- a/clients/src/main/resources/common/message/ShareFetchResponse.json
+++ b/clients/src/main/resources/common/message/ShareFetchResponse.json
@@ -20,7 +20,9 @@
// Version 0 was used for early access of KIP-932 in Apache Kafka 4.0 but
removed in Apacke Kafka 4.1.
//
// Version 1 is the initial stable version (KIP-932).
- "validVersions": "1",
+ //
+ // Version 2 introduces ShareAcquireMode and Renew acknowledgements
(KIP-1206 and KIP-1222).
+ "validVersions": "1-2",
"flexibleVersions": "0+",
// Supported errors for ErrorCode and AcknowledgeErrorCode:
// - GROUP_AUTHORIZATION_FAILED (version 0+)
@@ -68,8 +70,8 @@
"about": "The latest known leader epoch." }
]},
{ "name": "Records", "type": "records", "versions": "0+",
"nullableVersions": "0", "about": "The record data." },
- { "name": "AcquiredRecords", "type": "[]AcquiredRecords", "versions":
"0+", "about": "The acquired records.", "fields": [
- { "name": "FirstOffset", "type": "int64", "versions": "0+",
"about": "The earliest offset in this batch of acquired records." },
+ { "name": "AcquiredRecords", "type": "[]AcquiredRecords", "versions":
"0+", "about": "The acquired records.", "fields": [
+ { "name": "FirstOffset", "type": "int64", "versions": "0+", "about":
"The earliest offset in this batch of acquired records." },
{ "name": "LastOffset", "type": "int64", "versions": "0+", "about":
"The last offset of this batch of acquired records." },
{ "name": "DeliveryCount", "type": "int16", "versions": "0+",
"about": "The delivery count of this batch of acquired records." }
]}