This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 05c9322ba9c KAFKA-19814: Protocol and API changes for KIP-1222 (#20732)
05c9322ba9c is described below

commit 05c9322ba9cec74c42a4141bcccc89557db8e105
Author: Andrew Schofield <[email protected]>
AuthorDate: Wed Oct 29 19:35:23 2025 -0500

    KAFKA-19814: Protocol and API changes for KIP-1222 (#20732)
    
    Defines the updated protocol RPC schemas for KIPs 1206 and 1222, ready
    for Apache Kafka 4.2.
    
    Defines `AcknowledgeType.RENEW` and the new
    `ShareConsumer.acquisitionLockTimeoutMs()` method.
    
    Reviewers: Apoorv Mittal <[email protected]>, Sushant Mahajan
     <[email protected]>
---
 .../apache/kafka/clients/consumer/AcknowledgeType.java |  7 ++++++-
 .../kafka/clients/consumer/KafkaShareConsumer.java     | 10 ++++++++++
 .../kafka/clients/consumer/MockShareConsumer.java      |  5 +++++
 .../apache/kafka/clients/consumer/ShareConsumer.java   |  5 +++++
 .../clients/consumer/internals/ShareConsumerImpl.java  |  9 +++++++++
 .../kafka/common/requests/ShareAcknowledgeRequest.java |  7 +++++++
 .../kafka/common/requests/ShareFetchRequest.java       |  7 +++++++
 .../common/message/ShareAcknowledgeRequest.json        |  8 ++++++--
 .../common/message/ShareAcknowledgeResponse.json       |  4 +++-
 .../resources/common/message/ShareFetchRequest.json    | 18 +++++++++++-------
 .../resources/common/message/ShareFetchResponse.json   |  8 +++++---
 11 files changed, 74 insertions(+), 14 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/AcknowledgeType.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/AcknowledgeType.java
index b42bc135363..9812503e2b6 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/AcknowledgeType.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/AcknowledgeType.java
@@ -33,7 +33,10 @@ public enum AcknowledgeType {
     RELEASE((byte) 2),
 
     /** The record was not consumed successfully. Reject it and do not release 
it for another delivery attempt. */
-    REJECT((byte) 3);
+    REJECT((byte) 3),
+
+    /** The record is still being processed. Renew the acquisition lock so 
processing can continue. */
+    RENEW((byte) 4);
 
     public final byte id;
 
@@ -55,6 +58,8 @@ public enum AcknowledgeType {
                 return RELEASE;
             case 3:
                 return REJECT;
+            case 4:
+                return RENEW;
             default:
                 throw new IllegalArgumentException("Unknown acknowledge type 
id: " + id);
         }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
index b18ec93ec7b..8bd44eee031 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
@@ -632,6 +632,16 @@ public class KafkaShareConsumer<K, V> implements 
ShareConsumer<K, V> {
         return delegate.clientInstanceId(timeout);
     }
 
+    /**
+     * Returns the acquisition lock timeout for the last set of records 
fetched from the cluster.
+     *
+     * @return The acquisition lock timeout in milliseconds, or {@code 
Optional.empty()} if the timeout is not known.
+     */
+    @Override
+    public Optional<Integer> acquisitionLockTimeoutMs() {
+        return delegate.acquisitionLockTimeoutMs();
+    }
+
     /**
      * Get the metrics kept by the consumer
      */
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/MockShareConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/MockShareConsumer.java
index f1dad522d5a..8aeab59a2b4 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/MockShareConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/MockShareConsumer.java
@@ -145,6 +145,11 @@ public class MockShareConsumer<K, V> implements 
ShareConsumer<K, V> {
         return Collections.emptyMap();
     }
 
+    @Override
+    public Optional<Integer> acquisitionLockTimeoutMs() {
+        return Optional.empty();
+    }
+
     @Override
     public void registerMetricForSubscription(KafkaMetric metric) {
     }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ShareConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ShareConsumer.java
index 58f5fc4d38e..b61ac72516c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ShareConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ShareConsumer.java
@@ -100,6 +100,11 @@ public interface ShareConsumer<K, V> extends Closeable {
      */
     Uuid clientInstanceId(Duration timeout);
 
+    /**
+     * @see KafkaShareConsumer#acquisitionLockTimeoutMs()
+     */
+    Optional<Integer> acquisitionLockTimeoutMs();
+
     /**
      * @see KafkaShareConsumer#metrics()
      */
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index 4a7e19a6e56..0530155e538 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -837,6 +837,15 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
         return 
ClientTelemetryUtils.fetchClientInstanceId(clientTelemetryReporter.get(), 
timeout);
     }
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public Optional<Integer> acquisitionLockTimeoutMs() {
+        // To be implemented
+        return Optional.empty();
+    }
+
     /**
      * {@inheritDoc}
      */
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ShareAcknowledgeRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ShareAcknowledgeRequest.java
index c73df0b1d56..43bec9c5fda 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ShareAcknowledgeRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ShareAcknowledgeRequest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.ShareAcknowledgeRequestData;
 import org.apache.kafka.common.message.ShareAcknowledgeResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
@@ -75,6 +76,12 @@ public class ShareAcknowledgeRequest extends AbstractRequest 
{
 
         @Override
         public ShareAcknowledgeRequest build(short version) {
+            if (version < 2) {
+                // The v1 does not support AcknowledgeType RENEW.
+                if (data.isRenewAck()) {
+                    throw new UnsupportedVersionException("The v1 
ShareAcknowledge does not support AcknowledgeType.RENEW");
+                }
+            }
             return new ShareAcknowledgeRequest(data, version);
         }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
index 5ede165c2ef..122afa9f4c0 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.ShareFetchRequestData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
@@ -137,6 +138,12 @@ public class ShareFetchRequest extends AbstractRequest {
 
         @Override
         public ShareFetchRequest build(short version) {
+            if (version < 2) {
+                // The v1 does not support AcknowledgeType RENEW.
+                if (data.isRenewAck()) {
+                    throw new UnsupportedVersionException("The v1 ShareFetch 
does not support AcknowledgeType.RENEW");
+                }
+            }
             return new ShareFetchRequest(data, version);
         }
 
diff --git 
a/clients/src/main/resources/common/message/ShareAcknowledgeRequest.json 
b/clients/src/main/resources/common/message/ShareAcknowledgeRequest.json
index 561f4a84d2f..78b8c140bcd 100644
--- a/clients/src/main/resources/common/message/ShareAcknowledgeRequest.json
+++ b/clients/src/main/resources/common/message/ShareAcknowledgeRequest.json
@@ -21,7 +21,9 @@
   // Version 0 was used for early access of KIP-932 in Apache Kafka 4.0 but 
removed in Apacke Kafka 4.1.
   //
   // Version 1 is the initial stable version (KIP-932).
-  "validVersions": "1",
+  //
+  // Version 2 introduces Renew acknowledgements (KIP-1222).
+  "validVersions": "1-2",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "GroupId", "type": "string", "versions": "0+", 
"nullableVersions": "0+", "default": "null", "entityType": "groupId",
@@ -30,6 +32,8 @@
       "about": "The member ID." },
     { "name": "ShareSessionEpoch", "type": "int32", "versions": "0+",
       "about": "The current share session epoch: 0 to open a share session; -1 
to close it; otherwise increments for consecutive requests." },
+    { "name": "IsRenewAck", "type": "bool", "versions": "2+", "default": 
"false",
+      "about": "Whether Renew type acknowledgements present in 
AcknowledgementBatches." },
     { "name": "Topics", "type": "[]AcknowledgeTopic", "versions": "0+",
       "about": "The topics containing records to acknowledge.", "fields": [
       { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The 
unique topic ID.", "mapKey": true },
@@ -44,7 +48,7 @@
           { "name": "LastOffset", "type": "int64", "versions": "0+",
             "about": "Last offset (inclusive) of batch of records to 
acknowledge." },
           { "name": "AcknowledgeTypes", "type": "[]int8", "versions": "0+",
-            "about": "Array of acknowledge types - 
0:Gap,1:Accept,2:Release,3:Reject." }
+            "about": "Array of acknowledge types - 
0:Gap,1:Accept,2:Release,3:Reject,4:Renew." }
         ]}
       ]}
     ]}
diff --git 
a/clients/src/main/resources/common/message/ShareAcknowledgeResponse.json 
b/clients/src/main/resources/common/message/ShareAcknowledgeResponse.json
index 65d08756983..3ad91f9339b 100644
--- a/clients/src/main/resources/common/message/ShareAcknowledgeResponse.json
+++ b/clients/src/main/resources/common/message/ShareAcknowledgeResponse.json
@@ -20,7 +20,9 @@
   // Version 0 was used for early access of KIP-932 in Apache Kafka 4.0 but 
removed in Apacke Kafka 4.1.
   //
   // Version 1 is the initial stable version (KIP-932).
-  "validVersions": "1",
+  //
+  // Version 2 introduces Renew acknowledgements (KIP-1222).
+  "validVersions": "1-2",
   "flexibleVersions": "0+",
   // Supported errors:
   // - GROUP_AUTHORIZATION_FAILED (version 0+)
diff --git a/clients/src/main/resources/common/message/ShareFetchRequest.json 
b/clients/src/main/resources/common/message/ShareFetchRequest.json
index d7a4abf1fbb..092c2c6c259 100644
--- a/clients/src/main/resources/common/message/ShareFetchRequest.json
+++ b/clients/src/main/resources/common/message/ShareFetchRequest.json
@@ -21,7 +21,9 @@
   // Version 0 was used for early access of KIP-932 in Apache Kafka 4.0 but 
removed in Apacke Kafka 4.1.
   //
   // Version 1 is the initial stable version (KIP-932).
-  "validVersions": "1",
+  //
+  // Version 2 introduces ShareAcquireMode and Renew acknowledgements 
(KIP-1206 and KIP-1222).
+  "validVersions": "1-2",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "GroupId", "type": "string", "versions": "0+", 
"nullableVersions": "0+", "default": "null", "entityType": "groupId",
@@ -40,29 +42,31 @@
       "about": "The maximum number of records to fetch. This limit can be 
exceeded for alignment of batch boundaries." },
     { "name": "BatchSize", "type": "int32", "versions": "1+",
       "about": "The optimal number of records for batches of acquired records 
and acknowledgements." },
+    { "name": "IsRenewAck", "type": "bool", "versions": "2+", "default": 
"false",
+      "about": "Whether Renew type acknowledgements present in 
AcknowledgementBatches." },
     { "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
       "about": "The topics to fetch.", "fields": [
-      { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The 
unique topic ID.", "mapKey":  true },
+      { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The 
unique topic ID.", "mapKey": true },
       { "name": "Partitions", "type": "[]FetchPartition", "versions": "0+",
         "about": "The partitions to fetch.", "fields": [
-        { "name": "PartitionIndex", "type": "int32", "versions": "0+", 
"mapKey":  true,
+        { "name": "PartitionIndex", "type": "int32", "versions": "0+", 
"mapKey": true,
           "about": "The partition index." },
         { "name": "PartitionMaxBytes", "type": "int32", "versions": "0",
           "about": "The maximum bytes to fetch from this partition. 0 when 
only acknowledgement with no fetching is required. See KIP-74 for cases where 
this limit may not be honored." },
         { "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", 
"versions": "0+",
           "about": "Record batches to acknowledge.", "fields": [
           { "name": "FirstOffset", "type": "int64", "versions": "0+",
-            "about": "First offset of batch of records to acknowledge."},
+            "about": "First offset of batch of records to acknowledge." },
           { "name": "LastOffset", "type": "int64", "versions": "0+",
-            "about": "Last offset (inclusive) of batch of records to 
acknowledge."},
+            "about": "Last offset (inclusive) of batch of records to 
acknowledge." },
           { "name": "AcknowledgeTypes", "type": "[]int8", "versions": "0+",
-            "about": "Array of acknowledge types - 
0:Gap,1:Accept,2:Release,3:Reject."}
+            "about": "Array of acknowledge types - 
0:Gap,1:Accept,2:Release,3:Reject,4:Renew." }
         ]}
       ]}
     ]},
     { "name": "ForgottenTopicsData", "type": "[]ForgottenTopic", "versions": 
"0+",
       "about": "The partitions to remove from this share session.", "fields": [
-      { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The 
unique topic ID."},
+      { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The 
unique topic ID." },
       { "name": "Partitions", "type": "[]int32", "versions": "0+",
         "about": "The partitions indexes to forget." }
     ]}
diff --git a/clients/src/main/resources/common/message/ShareFetchResponse.json 
b/clients/src/main/resources/common/message/ShareFetchResponse.json
index 5d4ede78da3..cb80d470b1e 100644
--- a/clients/src/main/resources/common/message/ShareFetchResponse.json
+++ b/clients/src/main/resources/common/message/ShareFetchResponse.json
@@ -20,7 +20,9 @@
   // Version 0 was used for early access of KIP-932 in Apache Kafka 4.0 but 
removed in Apacke Kafka 4.1.
   //
   // Version 1 is the initial stable version (KIP-932).
-  "validVersions": "1",
+  //
+  // Version 2 introduces ShareAcquireMode and Renew acknowledgements 
(KIP-1206 and KIP-1222).
+  "validVersions": "1-2",
   "flexibleVersions": "0+",
   // Supported errors for ErrorCode and AcknowledgeErrorCode:
   // - GROUP_AUTHORIZATION_FAILED (version 0+)
@@ -68,8 +70,8 @@
             "about": "The latest known leader epoch." }
         ]},
         { "name": "Records", "type": "records", "versions": "0+", 
"nullableVersions": "0", "about": "The record data." },
-        { "name": "AcquiredRecords", "type": "[]AcquiredRecords", "versions": 
"0+", "about": "The acquired records.", "fields":  [
-          { "name": "FirstOffset", "type":  "int64", "versions": "0+", 
"about": "The earliest offset in this batch of acquired records." },
+        { "name": "AcquiredRecords", "type": "[]AcquiredRecords", "versions": 
"0+", "about": "The acquired records.", "fields": [
+          { "name": "FirstOffset", "type": "int64", "versions": "0+", "about": 
"The earliest offset in this batch of acquired records." },
           { "name": "LastOffset", "type": "int64", "versions": "0+", "about": 
"The last offset of this batch of acquired records." },
           { "name": "DeliveryCount", "type": "int16", "versions": "0+", 
"about": "The delivery count of this batch of acquired records." }
         ]}

Reply via email to