This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 55718212400 KAFKA-19799: Added deliveryCompleteCount to ReadShareGroupStateSummary (#20820)
55718212400 is described below

commit 557182124002ffae2dd3096f0b1afc01e5ade62a
Author: Chirag Wadhwa <[email protected]>
AuthorDate: Tue Nov 4 17:46:35 2025 +0530

    KAFKA-19799: Added deliveryCompleteCount to ReadShareGroupStateSummary (#20820)
    
    This PR is part of
    [KIP-1226](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1226%3A+Introducing+Share+Partition+Lag+Persistence+and+Retrieval).
    
    In [KAFKA-19797](https://github.com/apache/kafka/pull/20810), the share
    partition began sending a new field, deliveryCompleteCount, to the
    persister. This PR adds deliveryCompleteCount to the
    ReadShareGroupStateSummary RPC so that it can later be used to calculate
    the share-partition lag.
    
    Reviewers: Abhinav Dixit <[email protected]>, Sushant Mahajan
     <[email protected]>, Andrew Schofield <[email protected]>
---
 .../requests/ReadShareGroupStateSummaryResponse.java   |  2 ++
 .../message/ReadShareGroupStateSummaryRequest.json     |  5 ++++-
 .../message/ReadShareGroupStateSummaryResponse.json    |  9 +++++++--
 .../test/scala/unit/kafka/server/KafkaApisTest.scala   |  2 ++
 .../server/share/persister/DefaultStatePersister.java  |  2 ++
 .../server/share/persister/NoOpStatePersister.java     |  3 ++-
 .../kafka/server/share/persister/PartitionFactory.java |  4 ++--
 .../share/persister/PartitionStateSummaryData.java     |  2 ++
 .../persister/ReadShareGroupStateSummaryResult.java    |  3 ++-
 .../share/persister/DefaultStatePersisterTest.java     | 18 ++++++++++--------
 .../kafka/coordinator/share/ShareCoordinatorShard.java |  2 ++
 .../coordinator/share/ShareCoordinatorShardTest.java   |  2 ++
 12 files changed, 39 insertions(+), 15 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryResponse.java
index a2787ff82c9..4f8c1c3b0cb 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryResponse.java
@@ -99,6 +99,7 @@ public class ReadShareGroupStateSummaryResponse extends 
AbstractResponse {
         Uuid topicId,
         int partition,
         long startOffset,
+        int deliveryCompleteCount,
         int leaderEpoch,
         int stateEpoch
     ) {
@@ -110,6 +111,7 @@ public class ReadShareGroupStateSummaryResponse extends 
AbstractResponse {
                         new 
ReadShareGroupStateSummaryResponseData.PartitionResult()
                             .setPartition(partition)
                             .setStartOffset(startOffset)
+                            .setDeliveryCompleteCount(deliveryCompleteCount)
                             .setLeaderEpoch(leaderEpoch)
                             .setStateEpoch(stateEpoch)
                     ))
diff --git 
a/clients/src/main/resources/common/message/ReadShareGroupStateSummaryRequest.json
 
b/clients/src/main/resources/common/message/ReadShareGroupStateSummaryRequest.json
index cdbad63bfa2..6e2bc52654a 100644
--- 
a/clients/src/main/resources/common/message/ReadShareGroupStateSummaryRequest.json
+++ 
b/clients/src/main/resources/common/message/ReadShareGroupStateSummaryRequest.json
@@ -18,7 +18,10 @@
   "type": "request",
   "listeners": ["broker"],
   "name": "ReadShareGroupStateSummaryRequest",
-  "validVersions": "0",
+  // Version 0 is the initial version (KIP-932).
+  //
+  // Version 1 introduces DeliveryCompleteCount in the response (KIP-1226).
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "GroupId", "type": "string", "versions": "0+",
diff --git 
a/clients/src/main/resources/common/message/ReadShareGroupStateSummaryResponse.json
 
b/clients/src/main/resources/common/message/ReadShareGroupStateSummaryResponse.json
index 81e3edc554e..993043addee 100644
--- 
a/clients/src/main/resources/common/message/ReadShareGroupStateSummaryResponse.json
+++ 
b/clients/src/main/resources/common/message/ReadShareGroupStateSummaryResponse.json
@@ -17,7 +17,10 @@
   "apiKey": 87,
   "type": "response",
   "name": "ReadShareGroupStateSummaryResponse",
-  "validVersions": "0",
+  // Version 0 is the initial version (KIP-932).
+  //
+  // Version 1 introduces DeliveryCompleteCount (KIP-1226).
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   // - NOT_COORDINATOR (version 0+)
   // - COORDINATOR_NOT_AVAILABLE (version 0+)
@@ -44,7 +47,9 @@
         { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
           "about": "The leader epoch of the share-partition." },
         { "name": "StartOffset", "type": "int64", "versions": "0+",
-          "about": "The share-partition start offset." }
+          "about": "The share-partition start offset." },
+        { "name": "DeliveryCompleteCount", "type": "int32", "versions": "1+", 
"ignorable": "true", "default": "-1",
+          "about": "The number of offsets greater than or equal to 
share-partition start offset for which delivery has been completed." }
       ]}
     ]}
   ]
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 01daa19d7a0..691814b94b4 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -12240,6 +12240,7 @@ class KafkaApisTest extends Logging {
             .setErrorMessage(null)
             .setStateEpoch(1)
             .setStartOffset(10)
+            .setDeliveryCompleteCount(0)
         ))
     )
 
@@ -12280,6 +12281,7 @@ class KafkaApisTest extends Logging {
             .setErrorMessage(null)
             .setStateEpoch(1)
             .setStartOffset(10)
+            .setDeliveryCompleteCount(0)
         ))
     )
 
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java
 
b/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java
index 776c4f73bdf..4d593a3bd98 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java
@@ -490,6 +490,7 @@ public class DefaultStatePersister implements Persister {
                                     partitionResult.partition(),
                                     partitionResult.stateEpoch(),
                                     partitionResult.startOffset(),
+                                    partitionResult.deliveryCompleteCount(),
                                     partitionResult.leaderEpoch(),
                                     partitionResult.errorCode(),
                                     partitionResult.errorMessage()))
@@ -501,6 +502,7 @@ public class DefaultStatePersister implements Persister {
                                 -1,
                                 -1,
                                 -1,
+                                -1,
                                 Errors.UNKNOWN_SERVER_ERROR.code(),   // No 
specific public error code exists for InterruptedException / ExecutionException
                                 "Error reading state from share coordinator: " 
+ e.getMessage()));
                         }
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/share/persister/NoOpStatePersister.java
 
b/server-common/src/main/java/org/apache/kafka/server/share/persister/NoOpStatePersister.java
index 908891dd463..270d1fabfa3 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/share/persister/NoOpStatePersister.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/share/persister/NoOpStatePersister.java
@@ -93,7 +93,8 @@ public class NoOpStatePersister implements Persister {
             resultArgs.add(new TopicData<>(topicData.topicId(), 
topicData.partitions().stream().
                 map(partitionIdData -> 
PartitionFactory.newPartitionStateSummaryData(
                     partitionIdData.partition(), 
PartitionFactory.DEFAULT_STATE_EPOCH, 
PartitionFactory.UNINITIALIZED_START_OFFSET,
-                    PartitionFactory.DEFAULT_LEADER_EPOCH, 
PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE))
+                    PartitionFactory.UNINITIALIZED_DELIVERY_COMPLETE_COUNT, 
PartitionFactory.DEFAULT_LEADER_EPOCH,
+                    PartitionFactory.DEFAULT_ERROR_CODE, 
PartitionFactory.DEFAULT_ERR_MESSAGE))
                 .collect(Collectors.toList())));
         }
         return CompletableFuture.completedFuture(new 
ReadShareGroupStateSummaryResult.Builder().setTopicsData(resultArgs).build());
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java
 
b/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java
index f0612677fc9..ed998bb0b1f 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java
@@ -48,8 +48,8 @@ public class PartitionFactory {
         return new PartitionData(partition, DEFAULT_STATE_EPOCH, 
UNINITIALIZED_START_OFFSET, UNINITIALIZED_DELIVERY_COMPLETE_COUNT, errorCode, 
errorMessage, DEFAULT_LEADER_EPOCH, null);
     }
 
-    public static PartitionStateSummaryData newPartitionStateSummaryData(int 
partition, int stateEpoch, long startOffset, int leaderEpoch, short errorCode, 
String errorMessage) {
-        return new PartitionData(partition, stateEpoch, startOffset, 
UNINITIALIZED_DELIVERY_COMPLETE_COUNT, errorCode, errorMessage, leaderEpoch, 
null);
+    public static PartitionStateSummaryData newPartitionStateSummaryData(int 
partition, int stateEpoch, long startOffset, int deliveryCompleteCount, int 
leaderEpoch, short errorCode, String errorMessage) {
+        return new PartitionData(partition, stateEpoch, startOffset, 
deliveryCompleteCount, errorCode, errorMessage, leaderEpoch, null);
     }
 
     public static PartitionStateBatchData newPartitionStateBatchData(int 
partition, int stateEpoch, long startOffset, int deliveryCompleteCount, int 
leaderEpoch, List<PersisterStateBatch> stateBatches) {
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionStateSummaryData.java
 
b/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionStateSummaryData.java
index 58a9dc10615..7cc8ed2cae1 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionStateSummaryData.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionStateSummaryData.java
@@ -28,6 +28,8 @@ public interface PartitionStateSummaryData extends 
PartitionInfoData, PartitionI
 
     long startOffset();
 
+    int deliveryCompleteCount();
+
     short errorCode();
 
     String errorMessage();
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/share/persister/ReadShareGroupStateSummaryResult.java
 
b/server-common/src/main/java/org/apache/kafka/server/share/persister/ReadShareGroupStateSummaryResult.java
index 249eb20ed94..81a1d200c9d 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/share/persister/ReadShareGroupStateSummaryResult.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/share/persister/ReadShareGroupStateSummaryResult.java
@@ -39,7 +39,8 @@ public class ReadShareGroupStateSummaryResult implements 
PersisterResult {
                                 readStateSummaryResult.partitions().stream()
                                         .map(partitionResult -> 
PartitionFactory.newPartitionStateSummaryData(
                                                 partitionResult.partition(), 
partitionResult.stateEpoch(), partitionResult.startOffset(),
-                                                partitionResult.leaderEpoch(), 
partitionResult.errorCode(), partitionResult.errorMessage()))
+                                                
partitionResult.deliveryCompleteCount(), partitionResult.leaderEpoch(), 
partitionResult.errorCode(),
+                                                
partitionResult.errorMessage()))
                                         .collect(Collectors.toList())))
                         .collect(Collectors.toList()))
                 .build();
diff --git 
a/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java
 
b/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java
index 9dc1b7886ed..51a617c43db 100644
--- 
a/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java
+++ 
b/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java
@@ -869,7 +869,7 @@ class DefaultStatePersisterTest {
 
                 return requestGroupId.equals(groupId) && requestTopicId == 
topicId1 && requestPartition == partition1;
             },
-            new 
ReadShareGroupStateSummaryResponse(ReadShareGroupStateSummaryResponse.toResponseData(topicId1,
 partition1, 0, 1, 1)),
+            new 
ReadShareGroupStateSummaryResponse(ReadShareGroupStateSummaryResponse.toResponseData(topicId1,
 partition1, 0, 0, 1, 1)),
             coordinatorNode1);
 
         client.prepareResponseFrom(
@@ -881,7 +881,7 @@ class DefaultStatePersisterTest {
 
                 return requestGroupId.equals(groupId) && requestTopicId == 
topicId2 && requestPartition == partition2;
             },
-            new 
ReadShareGroupStateSummaryResponse(ReadShareGroupStateSummaryResponse.toResponseData(topicId2,
 partition2, 0, 1, 1)),
+            new 
ReadShareGroupStateSummaryResponse(ReadShareGroupStateSummaryResponse.toResponseData(topicId2,
 partition2, 0, 0, 1, 1)),
             coordinatorNode2);
 
         ShareCoordinatorMetadataCacheHelper cacheHelper = 
getDefaultCacheHelper(suppliedNode);
@@ -931,12 +931,12 @@ class DefaultStatePersisterTest {
 
         HashSet<PartitionData> expectedResultMap = new HashSet<>();
         expectedResultMap.add(
-            (PartitionData) 
PartitionFactory.newPartitionStateSummaryData(partition1, 1, 0, 1, 
Errors.NONE.code(),
+            (PartitionData) 
PartitionFactory.newPartitionStateSummaryData(partition1, 1, 0, 0, 1, 
Errors.NONE.code(),
                 null
             ));
 
         expectedResultMap.add(
-            (PartitionData) 
PartitionFactory.newPartitionStateSummaryData(partition2, 1, 0, 1, 
Errors.NONE.code(),
+            (PartitionData) 
PartitionFactory.newPartitionStateSummaryData(partition2, 1, 0, 0, 1, 
Errors.NONE.code(),
                 null
             ));
 
@@ -1438,6 +1438,7 @@ class DefaultStatePersisterTest {
                             tp1.topicId(),
                             tp1.partition(),
                             1L,
+                            0,
                             1,
                             2
                         )
@@ -1470,7 +1471,7 @@ class DefaultStatePersisterTest {
             results.topicsData().contains(
                 new TopicData<>(
                     tp1.topicId(),
-                    
List.of(PartitionFactory.newPartitionStateSummaryData(tp1.partition(), 2, 1L, 
1, Errors.NONE.code(), null))
+                    
List.of(PartitionFactory.newPartitionStateSummaryData(tp1.partition(), 2, 1L, 
0, 1, Errors.NONE.code(), null))
                 )
             )
         );
@@ -1478,7 +1479,7 @@ class DefaultStatePersisterTest {
             results.topicsData().contains(
                 new TopicData<>(
                     tp2.topicId(),
-                    
List.of(PartitionFactory.newPartitionStateSummaryData(tp2.partition(), 0, 0, 0, 
Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), "unknown tp"))
+                    
List.of(PartitionFactory.newPartitionStateSummaryData(tp2.partition(), 0, 0, 
-1, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), "unknown tp"))
                 )
             )
         );
@@ -1498,6 +1499,7 @@ class DefaultStatePersisterTest {
                             tp1.topicId(),
                             tp1.partition(),
                             1L,
+                            0,
                             1,
                             2
                         )
@@ -1520,7 +1522,7 @@ class DefaultStatePersisterTest {
             results.topicsData().contains(
                 new TopicData<>(
                     tp1.topicId(),
-                    
List.of(PartitionFactory.newPartitionStateSummaryData(tp1.partition(), 2, 1L, 
1, Errors.NONE.code(), null))
+                    
List.of(PartitionFactory.newPartitionStateSummaryData(tp1.partition(), 2, 1L, 
0, 1, Errors.NONE.code(), null))
                 )
             )
         );
@@ -1528,7 +1530,7 @@ class DefaultStatePersisterTest {
             results.topicsData().contains(
                 new TopicData<>(
                     tp2.topicId(),
-                    
List.of(PartitionFactory.newPartitionStateSummaryData(tp2.partition(), -1, -1L, 
-1, Errors.UNKNOWN_SERVER_ERROR.code(), "Error reading state from share 
coordinator: java.lang.Exception: scary stuff"))
+                    
List.of(PartitionFactory.newPartitionStateSummaryData(tp2.partition(), -1, -1L, 
-1, -1, Errors.UNKNOWN_SERVER_ERROR.code(), "Error reading state from share 
coordinator: java.lang.Exception: scary stuff"))
                 )
             )
         );
diff --git 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
index db8819425d6..aa098090aef 100644
--- 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
+++ 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
@@ -445,6 +445,7 @@ public class ShareCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
                 topicId,
                 partitionId,
                 PartitionFactory.UNINITIALIZED_START_OFFSET,
+                PartitionFactory.UNINITIALIZED_DELIVERY_COMPLETE_COUNT,
                 PartitionFactory.DEFAULT_LEADER_EPOCH,
                 PartitionFactory.DEFAULT_STATE_EPOCH
             );
@@ -463,6 +464,7 @@ public class ShareCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
                     topicId,
                     partitionId,
                     offsetValue.startOffset(),
+                    offsetValue.deliveryCompleteCount(),
                     offsetValue.leaderEpoch(),
                     offsetValue.stateEpoch()
                 );
diff --git 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
index f82843f2de5..aab870e4d8a 100644
--- 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
+++ 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
@@ -169,6 +169,7 @@ class ShareCoordinatorShardTest {
                 .setPartitions(List.of(new 
WriteShareGroupStateRequestData.PartitionData()
                     .setPartition(PARTITION)
                     .setStartOffset(0)
+                    .setDeliveryCompleteCount(0)
                     .setStateEpoch(0)
                     .setLeaderEpoch(leaderEpoch)
                     .setStateBatches(List.of(new 
WriteShareGroupStateRequestData.StateBatch()
@@ -613,6 +614,7 @@ class ShareCoordinatorShardTest {
             PARTITION,
             0,
             0,
+            0,
             0
         ), result.response());
 

Reply via email to