This is an automated email from the ASF dual-hosted git repository.
bryanck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new c235305c98 Kafka Connect: Don't check that consumer group is stable
for coordinator leader election (#14395)
c235305c98 is described below
commit c235305c985e2f4427fa735a1a7460252da8005d
Author: Fenil Doshi <[email protected]>
AuthorDate: Tue Nov 4 11:27:52 2025 -0800
Kafka Connect: Don't check that consumer group is stable for coordinator
leader election (#14395)
---
.../iceberg/connect/channel/CommitterImpl.java | 16 +++---
.../iceberg/connect/channel/TestCommitterImpl.java | 67 ++++++++++++++++++++--
2 files changed, 70 insertions(+), 13 deletions(-)
diff --git
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java
index fcba5fb629..04602a66a5 100644
---
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java
+++
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java
@@ -29,7 +29,6 @@ import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTest
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.MemberDescription;
-import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
@@ -74,18 +73,19 @@ public class CommitterImpl implements Committer {
}
}
- private boolean hasLeaderPartition(Collection<TopicPartition>
currentAssignedPartitions) {
+ @VisibleForTesting
+ boolean hasLeaderPartition(Collection<TopicPartition>
currentAssignedPartitions) {
ConsumerGroupDescription groupDesc;
try (Admin admin = clientFactory.createAdmin()) {
groupDesc = KafkaUtils.consumerGroupDescription(config.connectGroupId(),
admin);
}
- if (groupDesc.state() == ConsumerGroupState.STABLE) {
- Collection<MemberDescription> members = groupDesc.members();
- if (containsFirstPartition(members, currentAssignedPartitions)) {
- membersWhenWorkerIsCoordinator = members;
- return true;
- }
+
+ Collection<MemberDescription> members = groupDesc.members();
+ if (containsFirstPartition(members, currentAssignedPartitions)) {
+ membersWhenWorkerIsCoordinator = members;
+ return true;
}
+
return false;
}
diff --git
a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCommitterImpl.java
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCommitterImpl.java
index c8fcbab255..c6b7c86e4c 100644
---
a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCommitterImpl.java
+++
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCommitterImpl.java
@@ -19,22 +19,55 @@
package org.apache.iceberg.connect.channel;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
+import java.lang.reflect.Field;
import java.util.List;
import java.util.Optional;
+import org.apache.iceberg.connect.IcebergSinkConfig;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.MemberAssignment;
import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
public class TestCommitterImpl {
@Test
public void testIsLeader() {
+ MemberAssignment assignment1 =
+ new MemberAssignment(
+ ImmutableSet.of(new TopicPartition("topic1", 0), new
TopicPartition("topic2", 1)));
+ MemberDescription member1 =
+ new MemberDescription(null, Optional.empty(), null, null, assignment1);
+
+ MemberAssignment assignment2 =
+ new MemberAssignment(
+ ImmutableSet.of(new TopicPartition("topic2", 0), new
TopicPartition("topic1", 1)));
+ MemberDescription member2 =
+ new MemberDescription(null, Optional.empty(), null, null, assignment2);
+
+ List<MemberDescription> members = ImmutableList.of(member1, member2);
+
+ List<TopicPartition> leaderAssignments =
+ ImmutableList.of(new TopicPartition("topic2", 1), new
TopicPartition("topic1", 0));
+ List<TopicPartition> nonLeaderAssignments =
+ ImmutableList.of(new TopicPartition("topic2", 0), new
TopicPartition("topic1", 1));
+
CommitterImpl committer = new CommitterImpl();
+ assertThat(committer.containsFirstPartition(members,
leaderAssignments)).isTrue();
+ assertThat(committer.containsFirstPartition(members,
nonLeaderAssignments)).isFalse();
+ }
+ @Test
+ public void testHasLeaderPartition() throws NoSuchFieldException,
IllegalAccessException {
MemberAssignment assignment1 =
new MemberAssignment(
ImmutableSet.of(new TopicPartition("topic1", 0), new
TopicPartition("topic2", 1)));
@@ -49,12 +82,36 @@ public class TestCommitterImpl {
List<MemberDescription> members = ImmutableList.of(member1, member2);
- List<TopicPartition> assignments =
+ List<TopicPartition> leaderAssignments =
ImmutableList.of(new TopicPartition("topic2", 1), new
TopicPartition("topic1", 0));
- assertThat(committer.containsFirstPartition(members,
assignments)).isTrue();
-
- assignments =
+ List<TopicPartition> nonLeaderAssignments =
ImmutableList.of(new TopicPartition("topic2", 0), new
TopicPartition("topic1", 1));
- assertThat(committer.containsFirstPartition(members,
assignments)).isFalse();
+
+ CommitterImpl committer = new CommitterImpl();
+ Field configField = CommitterImpl.class.getDeclaredField("config");
+ Field clientFactoryField =
CommitterImpl.class.getDeclaredField("clientFactory");
+ configField.setAccessible(true);
+ clientFactoryField.setAccessible(true);
+
+ IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+ when(config.connectGroupId()).thenReturn("test-group");
+ configField.set(committer, config);
+
+ KafkaClientFactory clientFactory = mock(KafkaClientFactory.class);
+ Admin admin = mock(Admin.class);
+ when(clientFactory.createAdmin()).thenReturn(admin);
+ clientFactoryField.set(committer, clientFactory);
+
+ try (MockedStatic<KafkaUtils> mockKafkaUtils =
mockStatic(KafkaUtils.class)) {
+ ConsumerGroupDescription consumerGroupDescription =
mock(ConsumerGroupDescription.class);
+ mockKafkaUtils
+ .when(() -> KafkaUtils.consumerGroupDescription(any(), any()))
+ .thenReturn(consumerGroupDescription);
+
+ when(consumerGroupDescription.members()).thenReturn(members);
+
+ assertThat(committer.hasLeaderPartition(leaderAssignments)).isTrue();
+ assertThat(committer.hasLeaderPartition(nonLeaderAssignments)).isFalse();
+ }
}
}