dajac commented on code in PR #15721:
URL: https://github.com/apache/kafka/pull/15721#discussion_r1571079283


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -777,11 +778,78 @@ public ClassicGroup classicGroup(
         }
     }
 
+    /**
+     * Validates the online downgrade if a consumer member is fenced from the 
consumer group.
+     *
+     * @param consumerGroup The ConsumerGroup.
+     * @param memberId      The fenced member id.
+     * @return A boolean indicating whether it's valid to online downgrade the 
consumer group.
+     */
+    private boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, 
String memberId) {
+        if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
+            log.info("Cannot downgrade consumer group {} to classic group 
because the online downgrade is disabled.",
+                consumerGroup.groupId());
+            return false;
+        } else if 
(!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
+            log.debug("Cannot downgrade consumer group {} to classic group 
because not all its members use the classic protocol.",
+                consumerGroup.groupId());
+            return false;
+        } else if (consumerGroup.numMembers() <= 1) {
+            log.info("Skip downgrading the consumer group {} to classic group 
because it's empty.",
+                consumerGroup.groupId());
+            return false;
+        } else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) {
+            log.info("Cannot downgrade consumer group {} to classic group 
because its group size is greater than classic group max size.",
+                consumerGroup.groupId());
+        }
+        return true;
+    }
+
+    public CompletableFuture<Void> convertToClassicGroup(ConsumerGroup 
consumerGroup, String leavingMemberId, List<Record> records) {
+        consumerGroup.createGroupTombstoneRecords(records);
+
+        ClassicGroup classicGroup;
+        try {
+            classicGroup = ClassicGroup.fromConsumerGroup(
+                consumerGroup,
+                leavingMemberId,
+                logContext,
+                time,
+                metrics,
+                consumerGroupSessionTimeoutMs,
+                metadataImage
+            );
+        } catch (SchemaException e) {
+            log.warn("Cannot downgrade the consumer group " + 
consumerGroup.groupId() + ": fail to parse " +
+                "the Consumer Protocol " + ConsumerProtocol.PROTOCOL_TYPE + 
".", e);
+
+            throw new GroupIdNotFoundException(String.format("Cannot downgrade 
the classic group %s: %s.",
+                consumerGroup.groupId(), e.getMessage()));
+        }
+        
classicGroup.createConsumerGroupRecords(metadataImage.features().metadataVersion(),
 records);
+
+        removeGroup(consumerGroup.groupId());
+
+        groups.put(consumerGroup.groupId(), classicGroup);
+        metrics.onClassicGroupStateTransition(null, 
classicGroup.currentState());
+
+        classicGroup.allMembers().forEach(member -> 
rescheduleClassicGroupMemberHeartbeat(classicGroup, member));
+        prepareRebalance(classicGroup, String.format("Downgrade group %s.", 
classicGroup.groupId()));
+
+        CompletableFuture<Void> appendFuture = new CompletableFuture<>();
+        appendFuture.whenComplete((__, t) -> {

Review Comment:
   I wonder if we could use `exceptionally` here instead of `whenComplete`, since we only act on the failure path.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -777,11 +778,78 @@ public ClassicGroup classicGroup(
         }
     }
 
+    /**
+     * Validates the online downgrade if a consumer member is fenced from the 
consumer group.
+     *
+     * @param consumerGroup The ConsumerGroup.
+     * @param memberId      The fenced member id.
+     * @return A boolean indicating whether it's valid to online downgrade the 
consumer group.
+     */
+    private boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, 
String memberId) {
+        if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
+            log.info("Cannot downgrade consumer group {} to classic group 
because the online downgrade is disabled.",
+                consumerGroup.groupId());
+            return false;
+        } else if 
(!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
+            log.debug("Cannot downgrade consumer group {} to classic group 
because not all its members use the classic protocol.",
+                consumerGroup.groupId());
+            return false;
+        } else if (consumerGroup.numMembers() <= 1) {
+            log.info("Skip downgrading the consumer group {} to classic group 
because it's empty.",
+                consumerGroup.groupId());
+            return false;
+        } else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) {
+            log.info("Cannot downgrade consumer group {} to classic group 
because its group size is greater than classic group max size.",
+                consumerGroup.groupId());
+        }
+        return true;
+    }
+
+    public CompletableFuture<Void> convertToClassicGroup(ConsumerGroup 
consumerGroup, String leavingMemberId, List<Record> records) {
+        consumerGroup.createGroupTombstoneRecords(records);
+
+        ClassicGroup classicGroup;
+        try {
+            classicGroup = ClassicGroup.fromConsumerGroup(
+                consumerGroup,
+                leavingMemberId,
+                logContext,
+                time,
+                metrics,
+                consumerGroupSessionTimeoutMs,
+                metadataImage
+            );
+        } catch (SchemaException e) {
+            log.warn("Cannot downgrade the consumer group " + 
consumerGroup.groupId() + ": fail to parse " +
+                "the Consumer Protocol " + ConsumerProtocol.PROTOCOL_TYPE + 
".", e);
+
+            throw new GroupIdNotFoundException(String.format("Cannot downgrade 
the classic group %s: %s.",
+                consumerGroup.groupId(), e.getMessage()));
+        }
+        
classicGroup.createConsumerGroupRecords(metadataImage.features().metadataVersion(),
 records);
+
+        removeGroup(consumerGroup.groupId());

Review Comment:
   Let's put a comment about this one. We should explain that these tombstone records are not replayed, so we have to remove the group from the map ourselves.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -777,11 +778,78 @@ public ClassicGroup classicGroup(
         }
     }
 
+    /**
+     * Validates the online downgrade if a consumer member is fenced from the 
consumer group.
+     *
+     * @param consumerGroup The ConsumerGroup.
+     * @param memberId      The fenced member id.
+     * @return A boolean indicating whether it's valid to online downgrade the 
consumer group.
+     */
+    private boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, 
String memberId) {
+        if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
+            log.info("Cannot downgrade consumer group {} to classic group 
because the online downgrade is disabled.",
+                consumerGroup.groupId());
+            return false;
+        } else if 
(!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
+            log.debug("Cannot downgrade consumer group {} to classic group 
because not all its members use the classic protocol.",
+                consumerGroup.groupId());
+            return false;
+        } else if (consumerGroup.numMembers() <= 1) {
+            log.info("Skip downgrading the consumer group {} to classic group 
because it's empty.",
+                consumerGroup.groupId());
+            return false;
+        } else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) {
+            log.info("Cannot downgrade consumer group {} to classic group 
because its group size is greater than classic group max size.",
+                consumerGroup.groupId());
+        }
+        return true;
+    }
+
+    public CompletableFuture<Void> convertToClassicGroup(ConsumerGroup 
consumerGroup, String leavingMemberId, List<Record> records) {

Review Comment:
   Could we make it private or package private?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -10331,6 +10333,312 @@ public void 
testClassicGroupOnUnloadedCompletingRebalance() throws Exception {
             .setErrorCode(NOT_COORDINATOR.code()), 
pendingMemberSyncResult.syncFuture.get());
     }
 
+    @Test
+    public void testLastClassicProtocolMemberLeavingConsumerGroup() {
+        String groupId = "group-id";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+        Uuid zarTopicId = Uuid.randomUuid();
+        String zarTopicName = "zar";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+        List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols = 
Collections.singletonList(
+            new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+                .setName("range")
+                
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new 
ConsumerPartitionAssignor.Subscription(
+                    Arrays.asList(fooTopicName, barTopicName),
+                    null,
+                    Arrays.asList(
+                        new TopicPartition(fooTopicName, 0),
+                        new TopicPartition(fooTopicName, 1),
+                        new TopicPartition(fooTopicName, 2),
+                        new TopicPartition(barTopicName, 0),
+                        new TopicPartition(barTopicName, 1)
+                    )
+                ))))
+        );
+
+        ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder(memberId1)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setRebalanceTimeoutMs(45000)
+            .setClassicMemberMetadata(new 
ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                .setSupportedProtocols(protocols))
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2),
+                mkTopicAssignment(barTopicId, 0, 1)))
+            .build();
+        ConsumerGroupMember member2 = new 
ConsumerGroupMember.Builder(memberId2)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            // Use zar only here to ensure that metadata needs to be 
recomputed.
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar", "zar"))
+            .setServerAssignorName("range")
+            .setRebalanceTimeoutMs(45000)
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 3, 4, 5),
+                mkTopicAssignment(barTopicId, 2)))
+            .build();
+
+        // Consumer group with two members.
+        // Member 1 uses the classic protocol and member 2 uses the consumer 
protocol.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE)
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .addTopic(zarTopicId, zarTopicName, 1)
+                .addRacks()
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(member1)
+                .withMember(member2)
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2),
+                    mkTopicAssignment(barTopicId, 0, 1)))
+                .withAssignment(memberId2, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5),
+                    mkTopicAssignment(barTopicId, 2)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        context.commit();
+        ConsumerGroup consumerGroup = 
context.groupMetadataManager.consumerGroup(groupId);
+
+        // Member 2 leaves the consumer group, triggering the downgrade.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = 
context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId2)
+                .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Collections.emptyList()));
+
+
+        byte[] assignment = 
Utils.toArray(ConsumerProtocol.serializeAssignment(new 
ConsumerPartitionAssignor.Assignment(Arrays.asList(
+            new TopicPartition(fooTopicName, 0), new 
TopicPartition(fooTopicName, 1), new TopicPartition(fooTopicName, 2),
+            new TopicPartition(barTopicName, 0), new 
TopicPartition(barTopicName, 1)
+        ))));
+        Map<String, byte[]> assignments = new HashMap<String, byte[]>() {
+            {
+                put(memberId1, assignment);
+            }
+        };
+
+        ClassicGroup expectedClassicGroup = new ClassicGroup(
+            new LogContext(),
+            groupId,
+            STABLE,
+            context.time,
+            context.metrics,
+            10,
+            Optional.ofNullable(ConsumerProtocol.PROTOCOL_TYPE),
+            Optional.ofNullable("range"),
+            Optional.ofNullable(memberId1),
+            Optional.of(context.time.milliseconds())
+        );
+        expectedClassicGroup.add(
+            new ClassicGroupMember(
+                memberId1,
+                Optional.ofNullable(member1.instanceId()),
+                member1.clientId(),
+                member1.clientHost(),
+                member1.rebalanceTimeoutMs(),
+                45000,
+                ConsumerProtocol.PROTOCOL_TYPE,
+                member1.supportedJoinGroupRequestProtocols(),
+                assignment
+            )
+        );
+
+        List<Record> expectedRecords = Arrays.asList(
+            RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, 
memberId2),
+            RecordHelpers.newTargetAssignmentTombstoneRecord(groupId, 
memberId2),
+            RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId, 
memberId2),
+            // Subscription metadata is recomputed because zar is no longer 
there.
+            RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new 
HashMap<String, TopicMetadata>() {
+                {
+                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6, mkMapOfPartitionRacks(6)));
+                    put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 3, mkMapOfPartitionRacks(3)));
+                }
+            }),
+            RecordHelpers.newGroupEpochRecord(groupId, 11),
+
+            RecordHelpers.newTargetAssignmentEpochTombstoneRecord(groupId),
+            RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord(groupId),
+            RecordHelpers.newGroupEpochTombstoneRecord(groupId),
+            RecordHelpers.newGroupMetadataRecord(expectedClassicGroup, 
assignments, MetadataVersion.latestTesting())
+        );
+
+        assertRecordsEquals(expectedRecords, result.records());
+        verify(context.metrics, 
times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.STABLE,
 null);
+        verify(context.metrics, times(1)).onClassicGroupStateTransition(null, 
STABLE);
+
+        // The new classic member 1 has a heartbeat timeout.
+        ScheduledTimeout<Void, Record> heartbeatTimeout = 
context.timer.timeout(
+            classicGroupHeartbeatKey(groupId, memberId1));
+        assertNotNull(heartbeatTimeout);
+        // The new rebalance has a groupJoin timeout.
+        ScheduledTimeout<Void, Record> groupJoinTimeout = 
context.timer.timeout(
+            classicGroupJoinKey(groupId));
+        assertNotNull(groupJoinTimeout);
+
+        // A new rebalance is triggered.
+        ClassicGroup classicGroup = 
context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, false);
+        assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
+
+        // Simulate a failed write to the log.
+        result.appendFuture().completeExceptionally(new 
NotLeaderOrFollowerException());
+        context.rollback();
+
+        // The group is reverted back to the consumer group.
+        assertEquals(consumerGroup, 
context.groupMetadataManager.consumerGroup(groupId));
+        verify(context.metrics, 
times(1)).onClassicGroupStateTransition(PREPARING_REBALANCE, null);
+    }
+
+    @Test
+    public void testLastClassicProtocolMemberSessionTimeoutInConsumerGroup() {

Review Comment:
   Should we also add a test case for the rebalance timeout path?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java:
##########
@@ -198,6 +204,62 @@ private static void assertApiMessageAndVersionEquals(
                     }
                 }
             }
+        } else if (actual.message() instanceof GroupMetadataValue) {

Review Comment:
   I understand that you replicated the pattern already in place. However, I 
don't like it even if I wrote it. Recently, I have been using a different 
approach which is better, I think.
   
   I do the following:
   1) I duplicate both messages (with `duplicate()` method)
   2) I normalize them in place (e.g. sort lists, etc.)
   3) I use `assertEquals` to compare them.
   
   The benefit of this approach is that it automatically includes new fields. 
Would it work here too?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -777,11 +778,78 @@ public ClassicGroup classicGroup(
         }
     }
 
+    /**
+     * Validates the online downgrade if a consumer member is fenced from the 
consumer group.
+     *
+     * @param consumerGroup The ConsumerGroup.
+     * @param memberId      The fenced member id.
+     * @return A boolean indicating whether it's valid to online downgrade the 
consumer group.
+     */
+    private boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, 
String memberId) {
+        if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
+            log.info("Cannot downgrade consumer group {} to classic group 
because the online downgrade is disabled.",
+                consumerGroup.groupId());
+            return false;
+        } else if 
(!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {

Review Comment:
   Could we use `numMembers() - numClassicProtocolMembers() <= 1`? I think we know that the remaining member is the one using the consumer group protocol.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1440,9 +1508,20 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
                 records = consumerGroupFenceMember(group, member);
             }
         }
-        return new CoordinatorResult<>(records, new 
ConsumerGroupHeartbeatResponseData()
-            .setMemberId(memberId)
-            .setMemberEpoch(memberEpoch));
+
+        CompletableFuture<Void> appendFuture = null;
+        if ((instanceId == null || memberEpoch != 
LEAVE_GROUP_STATIC_MEMBER_EPOCH) &&

Review Comment:
   Instead of repeating the conditions, could we use a boolean that we set in 
the relevant places?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -777,11 +778,78 @@ public ClassicGroup classicGroup(
         }
     }
 
+    /**
+     * Validates the online downgrade if a consumer member is fenced from the 
consumer group.
+     *
+     * @param consumerGroup The ConsumerGroup.
+     * @param memberId      The fenced member id.
+     * @return A boolean indicating whether it's valid to online downgrade the 
consumer group.
+     */
+    private boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, 
String memberId) {
+        if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
+            log.info("Cannot downgrade consumer group {} to classic group 
because the online downgrade is disabled.",
+                consumerGroup.groupId());
+            return false;
+        } else if 
(!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
+            log.debug("Cannot downgrade consumer group {} to classic group 
because not all its members use the classic protocol.",
+                consumerGroup.groupId());
+            return false;
+        } else if (consumerGroup.numMembers() <= 1) {
+            log.info("Skip downgrading the consumer group {} to classic group 
because it's empty.",
+                consumerGroup.groupId());
+            return false;
+        } else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) {
+            log.info("Cannot downgrade consumer group {} to classic group 
because its group size is greater than classic group max size.",
+                consumerGroup.groupId());
+        }
+        return true;
+    }
+
+    public CompletableFuture<Void> convertToClassicGroup(ConsumerGroup 
consumerGroup, String leavingMemberId, List<Record> records) {
+        consumerGroup.createGroupTombstoneRecords(records);
+
+        ClassicGroup classicGroup;
+        try {
+            classicGroup = ClassicGroup.fromConsumerGroup(
+                consumerGroup,
+                leavingMemberId,
+                logContext,
+                time,
+                metrics,
+                consumerGroupSessionTimeoutMs,
+                metadataImage
+            );
+        } catch (SchemaException e) {
+            log.warn("Cannot downgrade the consumer group " + 
consumerGroup.groupId() + ": fail to parse " +
+                "the Consumer Protocol " + ConsumerProtocol.PROTOCOL_TYPE + 
".", e);
+
+            throw new GroupIdNotFoundException(String.format("Cannot downgrade 
the classic group %s: %s.",
+                consumerGroup.groupId(), e.getMessage()));
+        }
+        
classicGroup.createConsumerGroupRecords(metadataImage.features().metadataVersion(),
 records);

Review Comment:
   nit: `createClassicGroupRecords`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to