This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 754e924ab09 MINOR: Set unequal assignment epochs fallback to default 
epoch for streams group (#21655)
754e924ab09 is described below

commit 754e924ab090aa6a288755eb826337fa2c242ce9
Author: Lucy Liu <[email protected]>
AuthorDate: Tue Mar 10 05:33:00 2026 -0400

    MINOR: Set unequal assignment epochs fallback to default epoch for streams 
group (#21655)
    
    ## Summary
    
    This PR changes how a mismatch between the number of assignment epochs
    and the number of partitions in a record is handled: instead of throwing
    an `IllegalStateException`, the coordinator now logs an error and falls
    back to the default member epoch for all partitions of that subtopology.
    
    Reviewers: Lucas Brutschy <[email protected]>
---
 .../coordinator/group/GroupMetadataManager.java    |  2 +-
 .../group/streams/StreamsGroupMember.java          |  8 +++++++-
 .../group/streams/TasksTupleWithEpochs.java        | 23 ++++++++++++----------
 .../group/streams/StreamsGroupMemberTest.java      |  6 +++++-
 .../group/streams/TasksTupleWithEpochsTest.java    | 22 ++++++++++++++++-----
 5 files changed, 43 insertions(+), 18 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index a03e8900555..1967089ebd5 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -5734,7 +5734,7 @@ public class GroupMetadataManager {
             StreamsGroup streamsGroup = 
getOrMaybeCreatePersistedStreamsGroup(groupId, true);
             StreamsGroupMember oldMember = 
streamsGroup.getOrCreateUninitializedMember(memberId);
             StreamsGroupMember newMember = new 
StreamsGroupMember.Builder(oldMember)
-                .updateWith(value)
+                .updateWith(log, groupId, value)
                 .build();
             streamsGroup.updateMember(newMember);
         } else {
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
index 47e05e72688..887472132a6 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
@@ -20,6 +20,8 @@ import 
org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
 
+import org.slf4j.Logger;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -249,12 +251,14 @@ public record StreamsGroupMember(String memberId,
             return this;
         }
 
-        public Builder updateWith(StreamsGroupCurrentMemberAssignmentValue 
record) {
+        public Builder updateWith(Logger log, String groupId, 
StreamsGroupCurrentMemberAssignmentValue record) {
             setMemberEpoch(record.memberEpoch());
             setPreviousMemberEpoch(record.previousMemberEpoch());
             setState(MemberState.fromValue(record.state()));
             setAssignedTasks(
                 TasksTupleWithEpochs.fromCurrentAssignmentRecord(
+                    log,
+                    groupId,
                     record.activeTasks(),
                     record.standbyTasks(),
                     record.warmupTasks(),
@@ -263,6 +267,8 @@ public record StreamsGroupMember(String memberId,
             );
             setTasksPendingRevocation(
                 TasksTupleWithEpochs.fromCurrentAssignmentRecord(
+                    log,
+                    groupId,
                     record.activeTasksPendingRevocation(),
                     record.standbyTasksPendingRevocation(),
                     record.warmupTasksPendingRevocation(),
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TasksTupleWithEpochs.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TasksTupleWithEpochs.java
index e5d726b0e64..99220d466bc 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TasksTupleWithEpochs.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TasksTupleWithEpochs.java
@@ -18,6 +18,8 @@ package org.apache.kafka.coordinator.group.streams;
 
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue;
 
+import org.slf4j.Logger;
+
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -102,13 +104,15 @@ public record TasksTupleWithEpochs(Map<String, 
Map<Integer, Integer>> activeTask
      * @return The TasksTupleWithEpochs
      */
     public static TasksTupleWithEpochs fromCurrentAssignmentRecord(
+        Logger log,
+        String groupId,
         List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> activeTasks,
         List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> standbyTasks,
         List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> warmupTasks,
         int memberEpoch
     ) {
         return new TasksTupleWithEpochs(
-            parseActiveTasksWithEpochs(activeTasks, memberEpoch),
+            parseActiveTasksWithEpochs(log, groupId, activeTasks, memberEpoch),
             parseSimpleTasks(standbyTasks),
             parseSimpleTasks(warmupTasks)
         );
@@ -125,6 +129,8 @@ public record TasksTupleWithEpochs(Map<String, Map<Integer, 
Integer>> activeTask
     }
 
     private static Map<String, Map<Integer, Integer>> 
parseActiveTasksWithEpochs(
+        Logger log,
+        String groupId,
         List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> taskIdsList,
         int memberEpoch
     ) {
@@ -137,19 +143,16 @@ public record TasksTupleWithEpochs(Map<String, 
Map<Integer, Integer>> activeTask
 
             Map<Integer, Integer> partitionsWithEpochs = new HashMap<>();
 
-            if (epochs != null && !epochs.isEmpty()) {
-                if (epochs.size() != partitions.size()) {
-                    throw new IllegalStateException(
-                        "Assignment epochs must be provided for all 
partitions. " +
-                        "Subtopology " + subtopologyId + " has " + 
partitions.size() +
-                        " partitions but " + epochs.size() + " epochs"
-                    );
-                }
-
+            if (epochs != null && epochs.size() == partitions.size()) {
                 for (int i = 0; i < partitions.size(); i++) {
                     partitionsWithEpochs.put(partitions.get(i), epochs.get(i));
                 }
             } else {
+                if (epochs != null) {
+                    log.error("[GroupId {}] Size of assignment epochs {} is 
not equal to partitions {} for subtopology {}. " +
+                            "Using default epoch {} for all partitions.",
+                        groupId, epochs.size(), partitions.size(), 
subtopologyId, memberEpoch);
+                }
                 // Legacy record without epochs: use member epoch as default
                 for (Integer partition : partitions) {
                     partitionsWithEpochs.put(partition, memberEpoch);
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java
index 5fd33272857..d332ffb632a 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java
@@ -23,6 +23,8 @@ import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataVa
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue.KeyValue;
 
 import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
 import java.util.HashMap;
@@ -46,6 +48,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class StreamsGroupMemberTest {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(StreamsGroupMemberTest.class);
+    private static final String GROUP_ID = "test-group";
     private static final String MEMBER_ID = "member-id";
     private static final int MEMBER_EPOCH = 10;
     private static final int PREVIOUS_MEMBER_EPOCH = 9;
@@ -226,7 +230,7 @@ public class StreamsGroupMemberTest {
             .setWarmupTasksPendingRevocation(List.of(new 
TaskIds().setSubtopologyId(SUBTOPOLOGY2).setPartitions(TASKS6)));
 
         StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_ID)
-            .updateWith(record)
+            .updateWith(LOG, GROUP_ID, record)
             .build();
 
         assertEquals(MEMBER_ID, member.memberId());
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TasksTupleWithEpochsTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TasksTupleWithEpochsTest.java
index f92a23847fa..d31db67af7a 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TasksTupleWithEpochsTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TasksTupleWithEpochsTest.java
@@ -16,9 +16,12 @@
  */
 package org.apache.kafka.coordinator.group.streams;
 
+import org.apache.kafka.common.utils.LogCaptureAppender;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue;
 
 import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -35,6 +38,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TasksTupleWithEpochsTest {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(TasksTupleWithEpochs.class);
+    private static final String GROUP_ID = "test-group";
     private static final String SUBTOPOLOGY_1 = "1";
     private static final String SUBTOPOLOGY_2 = "2";
     private static final String SUBTOPOLOGY_3 = "3";
@@ -97,7 +102,7 @@ public class TasksTupleWithEpochsTest {
             .setPartitions(Arrays.asList(7, 8, 9)));
 
         TasksTupleWithEpochs tuple = 
TasksTupleWithEpochs.fromCurrentAssignmentRecord(
-            activeTasks, standbyTasks, warmupTasks, 100
+            LOG, GROUP_ID, activeTasks, standbyTasks, warmupTasks, 100
         );
 
         assertEquals(
@@ -133,7 +138,7 @@ public class TasksTupleWithEpochsTest {
 
         int memberEpoch = 100;
         TasksTupleWithEpochs tuple = 
TasksTupleWithEpochs.fromCurrentAssignmentRecord(
-            activeTasks, List.of(), List.of(), memberEpoch
+            LOG, GROUP_ID, activeTasks, List.of(), List.of(), memberEpoch
         );
 
         // Should use member epoch as default
@@ -152,9 +157,16 @@ public class TasksTupleWithEpochsTest {
             .setPartitions(Arrays.asList(1, 2, 3))
             .setAssignmentEpochs(Arrays.asList(10, 11))); // Only 2 epochs for 
3 partitions
 
-        assertThrows(IllegalStateException.class, () ->
-            TasksTupleWithEpochs.fromCurrentAssignmentRecord(activeTasks, 
List.of(), List.of(), 100)
-        );
+        try (LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(TasksTupleWithEpochs.class)) {
+            TasksTupleWithEpochs tuple = 
TasksTupleWithEpochs.fromCurrentAssignmentRecord(LOG, GROUP_ID, activeTasks, 
List.of(), List.of(), 100);
+            assertEquals(
+                Map.of(SUBTOPOLOGY_1, Map.of(1, 100, 2, 100, 3, 100)),
+                tuple.activeTasksWithEpochs()
+            );
+            assertEquals(1, appender.getMessages("ERROR").stream()
+                .filter(msg -> msg.contains("[GroupId " + GROUP_ID + "] Size 
of assignment epochs 2 is not equal to partitions 3 for subtopology 1."))
+                .count());
+        }
     }
 
     @Test

Reply via email to